Program Listing for File istreamports.hpp¶
↰ Return to documentation for file (falcon/istreamports.hpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#pragma once
#include <memory>
#include <set>
#include <string>
#include <vector>
#include "connections.hpp"
#include "idata.hpp"
#include "portpolicy.hpp"
#include "streaminfo.hpp"
#include "yaml-cpp/yaml.h"
class IPortOut;
class ISlotOut;
class IPortIn;
class ISlotIn;
class IProcessor;
class ISlotOut {
friend class ISlotIn;
template <typename DATATYPE> friend class SlotIn;
friend class IPortOut;
public:
ISlotOut(IPortOut *parent, const SlotAddress &address)
: ring_batch_(1), buffer_size_(-1), parent_(parent), address_(address) {}
virtual ~ISlotOut(){}
const SlotAddress &address() const { return address_; }
IPortOut *parent() { return parent_; }
bool connected() const { return downstream_slots_.size() > 0; }
int nconnected() const { return downstream_slots_.size(); }
virtual IStreamInfo &streaminfo() = 0;
int buffer_size() const { return buffer_size_; }
protected:
// called by IPortOut
void Connect(ISlotIn *downstream);
// called by SlotIn
int64_t WaitFor(int64_t sequence) const {
return barrier_->WaitFor(sequence);
}
int64_t WaitFor(int64_t sequence, int64_t time_out) const {
return barrier_->WaitFor(sequence, time_out);
}
virtual typename AnyType::Data *DataAt(int64_t sequence) const = 0;
std::vector<RingSequence *> gating_sequences();
protected:
RingBatch ring_batch_;
bool has_publishable_data_ = false;
// need to go through base class, since we don't know
// the exact datatype of downstream slots
std::set<ISlotIn *> downstream_slots_;
std::unique_ptr<RingBarrier> barrier_ = nullptr;
int buffer_size_;
IPortOut *parent_; // observing pointer
SlotAddress address_;
};
class IPortOut {
friend class ProcessorEngine;
friend class IProcessor;
public:
IPortOut(IProcessor *parent, const PortAddress &address, PortOutPolicy policy)
: parent_(parent), address_(address), policy_(policy) {}
virtual ~IPortOut(){}
const PortAddress &address() const { return address_; }
const PortOutPolicy &policy() const { return policy_; }
IProcessor *parent() { return parent_; }
virtual std::string datatype() const = 0;
virtual ISlotOut *slot(std::size_t index) = 0;
virtual SlotType number_of_slots() const = 0;
virtual const typename AnyType::Capabilities &capabilities() const = 0;
YAML::Node ExportYAML() const;
std::string name() const { return address_.port(); }
protected:
// called by StreamOutConnector
virtual void Connect(int slot, ISlotIn *downstream) = 0;
virtual int ReserveSlot(int slot) = 0;
virtual void CreateRingBuffers() = 0;
virtual void UnlockSlots() = 0;
virtual void PrepareProcessing() = 0;
virtual void NewSlot(int n = 1) = 0;
void set_buffer_size(int sz) { policy_.set_buffer_size(sz); }
IProcessor *parent_; // observing pointer
PortAddress address_;
private:
std::string name_;
PortOutPolicy policy_;
};
class ISlotIn {
friend class IPortIn;
template <typename DATATYPE> friend class PortIn;
friend class ISlotOut;
public:
ISlotIn(IPortIn *parent, const SlotAddress &address, int64_t time_out = -1,
bool cache = false)
: time_out_(time_out), cache_enabled_(cache), parent_(parent),
address_(address) {}
virtual ~ISlotIn(){}
const SlotAddress &address() const { return address_; }
IPortIn *parent() { return parent_; }
void NegotiateUpstream();
bool connected() const { return upstream_ != nullptr; }
void ReleaseData();
const SlotAddress &upstream_address() {
if (upstream_ == nullptr) {
throw std::runtime_error(
"Cannot get upstream address: slot is not connected.");
}
return upstream_->address();
}
const PortOutPolicy &upstream_policy() const {
if (upstream_ == nullptr) {
throw std::runtime_error(
"Cannot get upstream policy: slot is not connected.");
}
return upstream_->parent()->policy();
}
virtual void Validate() = 0;
protected:
// called by upstream ISlotOut
RingSequence *sequence() { return &sequence_; }
// called by IPortIn
void Connect(ISlotOut *upstream);
void PrepareProcessing();
protected:
int64_t time_out_;
bool cache_enabled_;
int64_t ncached_ = 0;
int64_t nretrieved_ = 0;
typename AnyType::Data *cache_ = nullptr;
// access to upstream slot needs to go through base pointer
// (since we don't know the exact datatype)
ISlotOut *upstream_ = nullptr;
RingSequence sequence_; // the input slot's read cursor into the buffer
IPortIn *parent_; // observing pointer
SlotAddress address_;
};
class IPortIn {
friend class ProcessorEngine;
friend class IProcessor;
public:
IPortIn(IProcessor *parent, const PortAddress &address, PortInPolicy policy)
: parent_(parent), address_(address), policy_(policy) {}
virtual ~IPortIn(){}
const PortAddress &address() const { return address_; }
const PortInPolicy &policy() const { return policy_; }
IProcessor *parent() { return parent_; }
virtual std::string datatype() const = 0;
virtual SlotType number_of_slots() const = 0;
virtual ISlotIn *slot(std::size_t index) = 0;
YAML::Node ExportYAML() const;
std::string name() const { return address_.port(); }
protected:
// called by StreamInConnector
virtual void Connect(int slot, ISlotOut *upstream) = 0;
virtual int ReserveSlot(int slot) = 0;
virtual void VerifyCompatibility(IPortOut *upstream) = 0;
// called by ...
virtual void PrepareProcessing() = 0;
virtual void UnlockSlots() = 0;
IProcessor *parent_; // observing pointer
PortAddress address_;
private:
std::string name_;
PortInPolicy policy_;
};