.. _program_listing_file_falcon_streamports.hpp: Program Listing for File streamports.hpp ======================================== |exhale_lsh| :ref:`Return to documentation for file ` (``falcon/streamports.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // --------------------------------------------------------------------- // This file is part of falcon-core. // // Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders // // Falcon-server is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Falcon-server is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with falcon-core. If not, see . // --------------------------------------------------------------------- #pragma once #include #include #include #include #include "istreamports.hpp" #include "utilities/math_numeric.hpp" struct RingBufferStatus { uint64_t read; uint64_t backlog; bool alive; }; // forward declarations template class SlotIn; template class PortOut; template class PortIn; int IdentifyNextSlot(int slot_request, int connected_slot_number, bool allow_multi_connect, const PortPolicy &policy); template class SlotOut : public ISlotOut { friend class PortOut; public: SlotOut(PortOut *parent, const SlotAddress &address, const typename DATATYPE::Parameters ¶meters) : ISlotOut(parent, address), streaminfo_(parameters), ringbuffer_serial_number_(0) {} // public interface typename DATATYPE::Data *ClaimData(bool clear); std::vector ClaimDataN(uint64_t n, bool clear); void PublishData(); virtual StreamInfo &streaminfo() { return streaminfo_; } const typename DATATYPE::Data &prototype() const { return streaminfo_.template getDataPrototype(); } uint64_t nitems_produced() const; protected: // called by SlotIn virtual typename DATATYPE::Data *DataAt(int64_t sequence) const { return ringbuffer_->Get(sequence); } void CreateRingBuffer(int buffer_size, WaitStrategy wait_strategy); void Unlock(); RingBatch *next_batch(uint64_t n = 1); virtual void PrepareProcessing() { ringbuffer_serial_number_ = 0; if (!connected()) { return; } ringbuffer_->ForcePublish(-1L); ringbuffer_->Claim(-1L); } public: StreamInfo streaminfo_; // owned by SlotOut, once finalized, the streaminfo (and // datatype) are fixed for the life time of the slot(?) std::unique_ptr> ringbuffer_ = nullptr; protected: uint64_t ringbuffer_serial_number_; }; template class PortOut : public IPortOut { public: PortOut(IProcessor *parent, const PortAddress &address, const typename DATATYPE::Parameters ¶meters, const PortOutPolicy &policy) : IPortOut(parent, address, policy), parameters_(parameters) { NewSlot(policy.min_slot_number()); } SlotType number_of_slots() const override { return slots_.size(); } std::string datatype() const override { return DATATYPE::datatype(); } StreamInfo &streaminfo(std::size_t index) { return slots_[index]->streaminfo(); } const typename DATATYPE::Data &prototype(std::size_t index) const { return slots_[index]->prototype(); } virtual SlotOut *slot(std::size_t index) { return slots_[index].get(); } SlotOut *dataslot(std::size_t index) { return slots_[index].get(); } protected: // called by StreamOutConnector void Connect(int slot, ISlotIn *downstream) override; int ReserveSlot(int slot) override; // called by IPortOut void CreateRingBuffers() override; void UnlockSlots() override; // called by PortOut::Connect virtual void NewSlot(int n = 1); void PrepareProcessing() override { for (auto &it : slots_) { it->PrepareProcessing(); } } private: typename DATATYPE::Parameters parameters_; // default parameters std::vector>> slots_; }; template class SlotIn : public ISlotIn { friend class PortIn; public: SlotIn(PortIn *parent, const SlotAddress &address, typename DATATYPE::Capabilities capabilities, bool cache = false) : ISlotIn(parent, address, cache), capabilities_(capabilities) { } //const typename DATATYPE::Data *GetDataPrototype() const; bool RetrieveData(typename DATATYPE::Data *&data, int64_t time_out=-1); bool RetrieveDataN(uint64_t n, std::vector &data, int64_t time_out=-1); bool RetrieveDataAll(std::vector &data, int64_t time_out=-1); bool FlushData(){ std::vector data; if(!RetrieveDataAll(data, 0)){ return false; } auto nread = status_read(); if (nread == 0) { ReleaseData(); } return true; } const IStreamInfo &streaminfo() { if (!connected()) { throw std::runtime_error("Input slot is not connected"); } NegotiateUpstream(); return upstream_->streaminfo(); } const typename DATATYPE::Data & prototype() { return streaminfo().template getDataPrototype(); } bool status_alive() const { return status_.alive; } uint64_t status_read() const { return status_.read; } uint64_t status_backlog() const { return status_.backlog; } void Validate() override { try { // the template keyword is necessary here for the compiler // to know how to interpret the code // see https://stackoverflow.com/a/613132 auto prototype = this->streaminfo().template getDataPrototype(); capabilities_.Validate(prototype); } catch (const std::bad_cast &e) { throw std::runtime_error("Incompatible data types (" + this->streaminfo().datatype() + " -> " + DATATYPE::datatype() + ")."); } catch (const std::exception &e) { throw; } } protected: void Unlock(); void check_high_water_level(); RingBufferStatus status_; const double HIGH_WATER_LEVEL = 0.85; unsigned int n_messages_; const unsigned int MAX_N_MESSAGES = 20; typename DATATYPE::Capabilities capabilities_; public: typename DATATYPE::Data *cache_; }; template class PortIn : public IPortIn { public: PortIn(IProcessor *parent, const PortAddress &address, const typename DATATYPE::Capabilities &capabilities, const PortInPolicy &policy) : IPortIn(parent, address, policy), capabilities_(capabilities) { NewSlot(policy.min_slot_number()); } SlotType number_of_slots() const override { return slots_.size(); } virtual SlotIn *slot(std::size_t index) { return slots_[index].get(); } SlotIn *dataslot(std::size_t index) { return slots_[index].get(); } std::string datatype() const override { return DATATYPE::datatype(); } const IStreamInfo &streaminfo(std::size_t index) { return slots_[index]->streaminfo(); } const typename DATATYPE::Data & prototype(std::size_t index) { return slots_[index]->prototype(); } void PrepareProcessing() override { for (auto &it : slots_) { it->PrepareProcessing(); } } protected: // called by StreamInConnector virtual void Connect(int slot, ISlotOut *upstream); virtual int ReserveSlot(int slot); void UnlockSlots() override; void NewSlot(int n = 1); private: typename DATATYPE::Capabilities capabilities_; std::vector>> slots_; }; #include "streamports.ipp"