.. _program_listing_file_falcon_streamports.hpp: Program Listing for File streamports.hpp ======================================== |exhale_lsh| :ref:`Return to documentation for file ` (``falcon/streamports.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // --------------------------------------------------------------------- // This file is part of falcon-core. // // Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders // // Falcon-server is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Falcon-server is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with falcon-core. If not, see . // --------------------------------------------------------------------- #pragma once #include #include #include #include #include "istreamports.hpp" #include "utilities/math_numeric.hpp" struct RingBufferStatus { uint64_t read; uint64_t backlog; bool alive; }; // forward declarations template class SlotIn; template class PortOut; template class PortIn; int IdentifyNextSlot(int slot_request, int connected_slot_number, bool allow_multi_connect, const PortPolicy &policy); template class SlotOut : public ISlotOut { friend class PortOut; public: SlotOut(PortOut *parent, const SlotAddress &address, const typename DATATYPE::Parameters ¶meters) : ISlotOut(parent, address), streaminfo_(parameters), ringbuffer_serial_number_(0) {} // public interface typename DATATYPE::Data *ClaimData(bool clear); std::vector ClaimDataN(uint64_t n, bool clear); void PublishData(); virtual StreamInfo &streaminfo() { return streaminfo_; } uint64_t nitems_produced() const; protected: // called by SlotIn virtual typename DATATYPE::Data *DataAt(int64_t sequence) const { return ringbuffer_->Get(sequence); } void CreateRingBuffer(int buffer_size, WaitStrategy wait_strategy); void Unlock(); RingBatch *next_batch(uint64_t n = 1); virtual void PrepareProcessing() { ringbuffer_serial_number_ = 0; if (!connected()) { return; } ringbuffer_->ForcePublish(-1L); ringbuffer_->Claim(-1L); } public: StreamInfo streaminfo_; // owned by SlotOut, once finalized, the streaminfo (and // datatype) are fixed for the life time of the slot(?) std::unique_ptr> datafactory_ = nullptr; std::unique_ptr> ringbuffer_ = nullptr; protected: uint64_t ringbuffer_serial_number_; }; template class PortOut : public IPortOut { public: PortOut(IProcessor *parent, const PortAddress &address, const typename DATATYPE::Capabilities &capabilities, const typename DATATYPE::Parameters ¶meters, const PortOutPolicy &policy) : IPortOut(parent, address, policy), capabilities_(capabilities), parameters_(parameters) { NewSlot(policy.min_slot_number()); } SlotType number_of_slots() const override { return slots_.size(); } std::string datatype() const override { return DATATYPE::datatype(); } StreamInfo &streaminfo(std::size_t index) { return slots_[index]->streaminfo(); } virtual SlotOut *slot(std::size_t index) { return slots_[index].get(); } SlotOut *dataslot(std::size_t index) { return slots_[index].get(); } virtual const typename DATATYPE::Capabilities &capabilities() const { return capabilities_; } protected: // called by StreamOutConnector void Connect(int slot, ISlotIn *downstream) override; int ReserveSlot(int slot) override; // called by IPortOut void CreateRingBuffers() override; void UnlockSlots() override; // called by PortOut::Connect virtual void NewSlot(int n = 1); void PrepareProcessing() override { for (auto &it : slots_) { it->PrepareProcessing(); } } private: typename DATATYPE::Capabilities capabilities_; typename DATATYPE::Parameters parameters_; // default parameters std::vector>> slots_; }; template class SlotIn : public ISlotIn { friend class PortIn; public: SlotIn(PortIn *parent, const SlotAddress &address, typename DATATYPE::Capabilities capabilities, int64_t time_out = -1, bool cache = false) : ISlotIn(parent, address, time_out, cache), capabilities_(capabilities) { } // methods called by processor implementation const typename DATATYPE::Data *GetDataPrototype() const; bool RetrieveData(typename DATATYPE::Data *&data); bool RetrieveDataN(uint64_t n, std::vector &data); bool RetrieveDataAll(std::vector &data); const StreamInfo &streaminfo() { if (!connected()) { throw std::runtime_error("Input slot is not connected"); } NegotiateUpstream(); return (StreamInfo &)upstream_->streaminfo(); } bool status_alive() const { return status_.alive; } uint64_t status_read() const { return status_.read; } uint64_t status_backlog() const { return status_.backlog; } void Validate() override { capabilities_.Validate(this->streaminfo().parameters()); } protected: void Unlock(); void check_high_water_level(); RingBufferStatus status_; const double HIGH_WATER_LEVEL = 0.85; unsigned int n_messages_; const unsigned int MAX_N_MESSAGES = 20; typename DATATYPE::Capabilities capabilities_; public: typename DATATYPE::Data *cache_; }; template class PortIn : public IPortIn { public: PortIn(IProcessor *parent, const PortAddress &address, const typename DATATYPE::Capabilities &capabilities, const PortInPolicy &policy) : IPortIn(parent, address, policy), capabilities_(capabilities) { NewSlot(policy.min_slot_number()); } SlotType number_of_slots() const override { return slots_.size(); } virtual SlotIn *slot(std::size_t index) { return slots_[index].get(); } SlotIn *dataslot(std::size_t index) { return slots_[index].get(); } std::string datatype() const override { return DATATYPE::datatype(); } const StreamInfo &streaminfo(std::size_t index) { return slots_[index]->streaminfo(); } void PrepareProcessing() override { for (auto &it : slots_) { it->PrepareProcessing(); } } protected: // called by StreamInConnector virtual void Connect(int slot, ISlotOut *upstream); virtual int ReserveSlot(int slot); virtual void VerifyCompatibility(IPortOut *upstream); void UnlockSlots() override; void NewSlot(int n = 1); private: // DATATYPE datatype_; typename DATATYPE::Capabilities capabilities_; std::vector>> slots_; }; #include "streamports.ipp"