.. _program_listing_file_falcon_istreamports.hpp:

Program Listing for File istreamports.hpp
=========================================

|exhale_lsh| :ref:`Return to documentation for file <file_falcon_istreamports.hpp>` (``falcon/istreamports.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // ---------------------------------------------------------------------
   // This file is part of falcon-core.
   //
   // Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
   //
   // Falcon-server is free software: you can redistribute it and/or modify
   // it under the terms of the GNU General Public License as published by
   // the Free Software Foundation, either version 3 of the License, or
   // (at your option) any later version.
   //
   // Falcon-server is distributed in the hope that it will be useful,
   // but WITHOUT ANY WARRANTY; without even the implied warranty of
   // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   // GNU General Public License for more details.
   //
   // You should have received a copy of the GNU General Public License
   // along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// --------------------------------------------------------------------- #pragma once #include #include #include #include #include "connections.hpp" #include "idata.hpp" #include "portpolicy.hpp" #include "streaminfo.hpp" #include "yaml-cpp/yaml.h" class IPortOut; class ISlotOut; class IPortIn; class ISlotIn; class IProcessor; class ISlotOut { friend class ISlotIn; template friend class SlotIn; friend class IPortOut; public: ISlotOut(IPortOut *parent, const SlotAddress &address) : ring_batch_(1), buffer_size_(-1), parent_(parent), address_(address) {} virtual ~ISlotOut(){} const SlotAddress &address() const { return address_; } IPortOut *parent() { return parent_; } bool connected() const { return downstream_slots_.size() > 0; } int nconnected() const { return downstream_slots_.size(); } virtual IStreamInfo &streaminfo() = 0; int buffer_size() const { return buffer_size_; } protected: // called by IPortOut void Connect(ISlotIn *downstream); // called by SlotIn int64_t WaitFor(int64_t sequence) const { return barrier_->WaitFor(sequence); } int64_t WaitFor(int64_t sequence, int64_t time_out) const { return barrier_->WaitFor(sequence, time_out); } virtual typename AnyType::Data *DataAt(int64_t sequence) const = 0; std::vector gating_sequences(); protected: RingBatch ring_batch_; bool has_publishable_data_ = false; // need to go through base class, since we don't know // the exact datatype of downstream slots std::set downstream_slots_; std::unique_ptr barrier_ = nullptr; int buffer_size_; IPortOut *parent_; // observing pointer SlotAddress address_; }; class IPortOut { friend class ProcessorEngine; friend class IProcessor; public: IPortOut(IProcessor *parent, const PortAddress &address, PortOutPolicy policy) : parent_(parent), address_(address), policy_(policy) {} virtual ~IPortOut(){} const PortAddress &address() const { return address_; } const PortOutPolicy &policy() const { return policy_; } IProcessor *parent() { return parent_; } virtual std::string datatype() 
const = 0; virtual ISlotOut *slot(std::size_t index) = 0; virtual SlotType number_of_slots() const = 0; virtual const typename AnyType::Capabilities &capabilities() const = 0; YAML::Node ExportYAML() const; std::string name() const { return address_.port(); } protected: // called by StreamOutConnector virtual void Connect(int slot, ISlotIn *downstream) = 0; virtual int ReserveSlot(int slot) = 0; virtual void CreateRingBuffers() = 0; virtual void UnlockSlots() = 0; virtual void PrepareProcessing() = 0; virtual void NewSlot(int n = 1) = 0; void set_buffer_size(int sz) { policy_.set_buffer_size(sz); } IProcessor *parent_; // observing pointer PortAddress address_; private: std::string name_; PortOutPolicy policy_; }; class ISlotIn { friend class IPortIn; template friend class PortIn; friend class ISlotOut; public: ISlotIn(IPortIn *parent, const SlotAddress &address, int64_t time_out = -1, bool cache = false) : time_out_(time_out), cache_enabled_(cache), parent_(parent), address_(address) {} virtual ~ISlotIn(){} const SlotAddress &address() const { return address_; } IPortIn *parent() { return parent_; } void NegotiateUpstream(); bool connected() const { return upstream_ != nullptr; } void ReleaseData(); const SlotAddress &upstream_address() { if (upstream_ == nullptr) { throw std::runtime_error( "Cannot get upstream address: slot is not connected."); } return upstream_->address(); } const PortOutPolicy &upstream_policy() const { if (upstream_ == nullptr) { throw std::runtime_error( "Cannot get upstream policy: slot is not connected."); } return upstream_->parent()->policy(); } virtual void Validate() = 0; protected: // called by upstream ISlotOut RingSequence *sequence() { return &sequence_; } // called by IPortIn void Connect(ISlotOut *upstream); void PrepareProcessing(); protected: int64_t time_out_; bool cache_enabled_; int64_t ncached_ = 0; int64_t nretrieved_ = 0; typename AnyType::Data *cache_ = nullptr; // access to upstream slot needs to go through base 
pointer // (since we don't know the exact datatype) ISlotOut *upstream_ = nullptr; RingSequence sequence_; // the input slot's read cursor into the buffer IPortIn *parent_; // observing pointer SlotAddress address_; }; class IPortIn { friend class ProcessorEngine; friend class IProcessor; public: IPortIn(IProcessor *parent, const PortAddress &address, PortInPolicy policy) : parent_(parent), address_(address), policy_(policy) {} virtual ~IPortIn(){} const PortAddress &address() const { return address_; } const PortInPolicy &policy() const { return policy_; } IProcessor *parent() { return parent_; } virtual std::string datatype() const = 0; virtual SlotType number_of_slots() const = 0; virtual ISlotIn *slot(std::size_t index) = 0; YAML::Node ExportYAML() const; std::string name() const { return address_.port(); } protected: // called by StreamInConnector virtual void Connect(int slot, ISlotOut *upstream) = 0; virtual int ReserveSlot(int slot) = 0; virtual void VerifyCompatibility(IPortOut *upstream) = 0; // called by ... virtual void PrepareProcessing() = 0; virtual void UnlockSlots() = 0; IProcessor *parent_; // observing pointer PortAddress address_; private: std::string name_; PortInPolicy policy_; };