Program Listing for File streamports.ipp¶
↰ Return to documentation for file (falcon/streamports.ipp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
//#include "streamports.hpp"
//#include "g3log/src/g2log.hpp"
template <typename DATATYPE>
inline uint64_t SlotOut<DATATYPE>::nitems_produced() const {
return ringbuffer_serial_number_;
}
template <typename DATATYPE>
inline typename DATATYPE::Data *SlotOut<DATATYPE>::ClaimData(bool clear) {
next_batch(1);
has_publishable_data_ = true;
typename DATATYPE::Data *data = ringbuffer_->Get(ring_batch_.Start());
if (clear) {
data->ClearData();
}
data->set_serial_number(ringbuffer_serial_number_++);
return data;
}
template <typename DATATYPE>
inline std::vector<typename DATATYPE::Data *>
SlotOut<DATATYPE>::ClaimDataN(uint64_t n, bool clear) {
std::vector<typename DATATYPE::Data *> data;
next_batch(n);
int64_t start = ring_batch_.Start();
for (int64_t k = start; k <= ring_batch_.end(); k++) {
data.push_back((typename DATATYPE::Data *)ringbuffer_->Get(k));
}
if (clear) {
for (auto &it : data) {
it->ClearData();
}
}
for (auto &it : data) {
it->set_serial_number(ringbuffer_serial_number_++);
}
has_publishable_data_ = true;
return data;
}
template <typename DATATYPE> inline void SlotOut<DATATYPE>::PublishData() {
if (has_publishable_data_ && ringbuffer_->GetCursor() != INT64_MAX) {
ringbuffer_->Publish(ring_batch_);
has_publishable_data_ = false;
}
}
template <typename DATATYPE>
void SlotOut<DATATYPE>::CreateRingBuffer(int buffer_size,
WaitStrategy wait_strategy) {
// make sure buffer size is power of 2 and at least 2
buffer_size_ = buffer_size < 2 ? 2 : next_pow2(buffer_size);
datafactory_.reset(new DataFactory<DATATYPE>(streaminfo_.parameters()));
try {
ringbuffer_.reset(new RingBuffer<typename DATATYPE::Data>(
datafactory_.get(), buffer_size_,
ClaimStrategy::kSingleThreadedStrategy, wait_strategy));
} catch (std::runtime_error &e) {
throw;
}
barrier_.reset(ringbuffer_->NewBarrier(std::vector<RingSequence *>(0)));
ringbuffer_->set_gating_sequences(gating_sequences());
}
template <typename DATATYPE> void SlotOut<DATATYPE>::Unlock() {
if (connected()) {
ringbuffer_->ForcePublish(INT64_MAX);
}
}
template <typename DATATYPE>
inline RingBatch *SlotOut<DATATYPE>::next_batch(uint64_t n) {
ring_batch_.set_size((int)n);
ring_batch_.set_end(-1);
return ringbuffer_->Next(&ring_batch_);
}
template <typename DATATYPE>
void PortOut<DATATYPE>::Connect(int slot, ISlotIn *downstream) {
if (slot < 0 || slot >= number_of_slots()) {
throw std::out_of_range("Error connecting to slot " + std::to_string(slot) +
" (invalid slot number)");
}
this->slots_.at(slot)->Connect(downstream);
}
template <typename DATATYPE> int PortOut<DATATYPE>::ReserveSlot(int slot) {
int open_slot = -1;
int nconnections = 0;
for (int n = 0; n < (int)slots_.size(); n++) {
nconnections += slots_[n]->nconnected();
if (open_slot == -1 && !slots_[n]->connected()) {
open_slot = n;
}
}
if (open_slot == -1) {
open_slot = nconnections;
}
int reserved_slot = IdentifyNextSlot(slot, open_slot, true, policy());
if (reserved_slot < 0) {
throw std::runtime_error("Cannot reserve slot.");
}
if (reserved_slot == number_of_slots()) {
this->NewSlot();
}
return reserved_slot;
}
template <typename DATATYPE> void PortOut<DATATYPE>::CreateRingBuffers() {
for (auto &slot_it : slots_) {
slot_it->CreateRingBuffer(policy().buffer_size(), policy().wait_strategy());
}
}
template <typename DATATYPE> void PortOut<DATATYPE>::UnlockSlots() {
for (auto &slot_it : slots_) {
slot_it->Unlock();
}
}
template <typename DATATYPE> void PortOut<DATATYPE>::NewSlot(int n) {
SlotAddress address(this->address_, 0);
for (int k = 0; k < n; k++) {
address.set_slot(this->slots_.size());
auto slot = std::make_unique<SlotOut<DATATYPE>>(this, address, parameters_);
slots_.push_back(std::move(slot));
}
}
template <typename DATATYPE>
const typename DATATYPE::Data *SlotIn<DATATYPE>::GetDataPrototype() const {
const typename DATATYPE::Data *data;
data = (const typename DATATYPE::Data *)upstream_->DataAt(0);
return data;
}
template <typename DATATYPE> void SlotIn<DATATYPE>::check_high_water_level() {
if (status_.backlog > HIGH_WATER_LEVEL * upstream_->buffer_size() and
n_messages_ == 0) {
LOG(WARNING) << "high-water level reached for "
<< upstream_address().string();
++n_messages_;
if (n_messages_ == MAX_N_MESSAGES) {
n_messages_ = 0;
}
}
}
template <typename DATATYPE>
bool SlotIn<DATATYPE>::RetrieveData(typename DATATYPE::Data *&data) {
data = nullptr;
status_.read = status_.backlog = 0;
status_.alive = true;
if (!connected()) {
return status_.alive;
}
int64_t requested_sequence = sequence_.sequence();
if (requested_sequence == INT64_MAX) {
status_.alive = false;
return status_.alive;
}
requested_sequence += ncached_ + 1L;
try {
if (time_out_ < 0) {
int64_t available_sequence = upstream_->WaitFor(requested_sequence);
if (available_sequence == INT64_MAX) {
status_.alive = false;
} else {
data = (typename DATATYPE::Data *)upstream_->DataAt(requested_sequence);
++nretrieved_;
status_.read = 1;
status_.backlog = available_sequence - requested_sequence;
}
} else {
int64_t available_sequence =
upstream_->WaitFor(requested_sequence, time_out_);
if (available_sequence < requested_sequence) {
// timed out
if (cache_enabled_) {
data = cache_;
status_.read = 1;
}
} else if (available_sequence == INT64_MAX) {
status_.alive = false;
} else {
data = (typename DATATYPE::Data *)upstream_->DataAt(requested_sequence);
++nretrieved_;
status_.read = 1;
status_.backlog = available_sequence - requested_sequence;
if (cache_enabled_) {
if (ncached_ == 0) {
--nretrieved_;
}
cache_ = data;
ncached_ = 1;
}
}
}
} catch (const RingAlertException &e) {
// terminate processing
status_.alive = false;
}
check_high_water_level();
return status_.alive;
}
template <typename DATATYPE>
bool SlotIn<DATATYPE>::RetrieveDataN(
uint64_t n, std::vector<typename DATATYPE::Data *> &data) {
// will only cache last value, but does not return cached values when timed
// out if n>1
data.clear();
status_.read = status_.backlog = 0;
status_.alive = true;
if (!connected() || n == 0) {
return status_.alive;
}
int64_t current_sequence = sequence_.sequence();
if (current_sequence == INT64_MAX) {
status_.alive = false;
return status_.alive;
}
current_sequence += ncached_;
int64_t requested_sequence = current_sequence + n;
try {
if (time_out_ < 0) {
int64_t available_sequence = upstream_->WaitFor(requested_sequence);
if (available_sequence == INT64_MAX) {
status_.alive = false;
} else {
for (int64_t k = current_sequence + 1; k <= requested_sequence; k++) {
data.push_back((typename DATATYPE::Data *)upstream_->DataAt(k));
++nretrieved_;
++status_.read;
}
status_.backlog = available_sequence - requested_sequence;
}
} else {
int64_t available_sequence =
upstream_->WaitFor(requested_sequence, time_out_);
if (available_sequence < requested_sequence) {
// timed out
if (n == 1 && cache_enabled_) {
data.push_back(cache_);
status_.read = 1;
}
} else if (available_sequence == INT64_MAX) {
status_.alive = false;
} else {
for (int64_t k = current_sequence + 1; k <= requested_sequence; k++) {
data.push_back((typename DATATYPE::Data *)upstream_->DataAt(k));
++nretrieved_;
++status_.read;
}
status_.backlog = available_sequence - requested_sequence;
if (cache_enabled_) {
if (ncached_ == 0) {
--nretrieved_;
}
cache_ = data.back();
ncached_ = 1;
}
}
}
} catch (const RingAlertException &e) {
status_.alive = false;
}
check_high_water_level();
return status_.alive;
}
template <typename DATATYPE>
bool SlotIn<DATATYPE>::RetrieveDataAll(
std::vector<typename DATATYPE::Data *> &data) {
// supports single item caching
data.clear();
status_.read = status_.backlog = 0;
status_.alive = true;
if (!connected()) {
return status_.alive;
}
int64_t current_sequence = sequence_.sequence();
if (current_sequence == INT64_MAX) {
status_.alive = false;
return status_.alive;
}
current_sequence += ncached_;
int64_t requested_sequence = current_sequence + 1L;
try {
if (time_out_ < 0) {
int64_t available_sequence = upstream_->WaitFor(requested_sequence);
if (available_sequence == INT64_MAX) {
status_.alive = false;
} else {
for (int64_t k = current_sequence + 1; k <= available_sequence; k++) {
data.push_back((typename DATATYPE::Data *)upstream_->DataAt(k));
++nretrieved_;
++status_.read;
}
}
} else {
int64_t available_sequence =
upstream_->WaitFor(requested_sequence, time_out_);
if (available_sequence < requested_sequence) {
// timed out
if (cache_enabled_) {
data.push_back(cache_);
status_.read = 1;
}
} else if (available_sequence == INT64_MAX) {
status_.alive = false;
} else {
for (int64_t k = current_sequence + 1; k <= available_sequence; k++) {
data.push_back((typename DATATYPE::Data *)upstream_->DataAt(k));
++nretrieved_;
++status_.read;
}
if (cache_enabled_) {
if (ncached_ == 0) {
--nretrieved_;
}
cache_ = data.back();
ncached_ = 1;
}
}
}
} catch (const RingAlertException &e) {
status_.alive = false;
}
check_high_water_level();
return status_.alive;
}
template <typename DATATYPE> void SlotIn<DATATYPE>::Unlock() {
sequence_.set_sequence(INT64_MAX);
}
template <typename DATATYPE>
void PortIn<DATATYPE>::Connect(int slot, ISlotOut *upstream) {
if (slot >= policy().min_slot_number() && slot == number_of_slots() &&
slot < policy().max_slot_number()) {
// create new slot
NewSlot();
}
if (slot < 0 || slot >= number_of_slots()) {
throw std::out_of_range("Error connecting to slot " + std::to_string(slot) +
" (number of available slots = " +
std::to_string(number_of_slots()) + ")");
}
slots_.at(slot)->Connect(upstream);
}
template <typename DATATYPE> int PortIn<DATATYPE>::ReserveSlot(int slot) {
int nconnected;
for (nconnected = 0; nconnected < (int)slots_.size(); nconnected++) {
if (!slots_[nconnected]->connected()) {
break;
}
}
int reserved_slot = IdentifyNextSlot(slot, nconnected, false, policy());
if (reserved_slot < 0) {
throw std::runtime_error("Cannot reserve slot.");
}
if (reserved_slot == number_of_slots()) {
this->NewSlot();
}
return reserved_slot;
}
template <typename DATATYPE>
void PortIn<DATATYPE>::VerifyCompatibility(IPortOut *upstream) {
try {
// the data type of upstream ports should be the same as or a more
// derived type than the data type of the downstream port
// dynamic_cast is used to test for upcast ability
auto cast = dynamic_cast<const typename DATATYPE::Capabilities &>(
upstream->capabilities());
// at this point, upstream capabilities have been cast up to the
// same data type as the downstream capabilities
// here we test further compatibility of the capabilities
capabilities_.VerifyCompatibility(cast);
} catch (const std::bad_cast &e) {
// casting failed, upstream and downstream data types are not compatible
throw std::runtime_error(std::string("Incompatible data types"));
}
}
template <typename DATATYPE> void PortIn<DATATYPE>::NewSlot(int n) {
SlotAddress address(this->address_, 0);
for (int k = 0; k < n; k++) {
address.set_slot(slots_.size());
auto slot = std::make_unique<SlotIn<DATATYPE>>(this, address, capabilities_, policy().time_out(),
policy().cache_enabled());
slots_.push_back(std::move(slot));
}
}
template <typename DATATYPE> void PortIn<DATATYPE>::UnlockSlots() {
for (auto &slot_it : slots_) {
slot_it->Unlock();
}
}