Program Listing for File iprocessor.hpp¶
↰ Return to documentation for file (falcon/iprocessor.hpp
)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#pragma once
#include <memory>
#include <set>
#include <string>
#include <functional>
#include <map>
#include <vector>
#include <utility>
#include "portpolicy.hpp"
#include "runinfo.hpp"
#include "threadutilities.hpp"
#include "streamports.hpp"
#include "factory/factory.hpp"
#include "yaml-cpp/yaml.h"
#include "graphexceptions.hpp"
#include "sharedstate.hpp"
#include "options/options.hpp"
// exception class for all processor related errors
GRAPHERROR(ProcessorInternalError);
GRAPHERROR(ProcessingError);
GRAPHERROR(ProcessingConfigureError);
GRAPHERROR(ProcessingCreatePortsError);
GRAPHERROR(ProcessingStreamInfoError);
GRAPHERROR(ProcessingPrepareError);
GRAPHERROR(ProcessingPreprocessingError);
void convert_name(std::string &s);
namespace graph {
class ProcessorGraph;
}
class IProcessor {
friend class ISlotIn;
friend class graph::ProcessorGraph;
public: // called by anyone
IProcessor(ThreadPriority priority = PRIORITY_NONE)
: running_(false), thread_() {
// add test option
add_option("test", new_test_flag_);
// add advanced options
add_advanced_option("threadcore", thread_core_);
add_advanced_option("threadpriority", thread_priority_ = priority);
add_advanced_option("buffer_sizes", requested_buffer_sizes_);
}
virtual ~IProcessor() { internal_Stop(); }
const std::string name() const { return name_; }
const std::string type() const { return type_; }
unsigned int n_input_ports() const { return input_ports_.size(); }
unsigned int n_output_ports() const { return output_ports_.size(); }
const std::set<std::string> input_port_names() const;
const std::set<std::string> output_port_names() const;
bool has_input_port(std::string port) {
return input_ports_.count(port) == 1;
}
bool has_output_port(std::string port) {
return output_ports_.count(port) == 1;
}
virtual bool issource() const { return n_input_ports() == 0; }
virtual bool issink() const { return n_output_ports() == 0; }
virtual bool isfilter() const { return (!issource() && !issink()); }
virtual bool isautonomous() const { return (issource() && issink()); }
ThreadPriority thread_priority() const { return thread_priority_(); }
ThreadCore thread_core() const { return thread_core_(); }
bool running() const { return running_.load(); }
YAML::Node ExportYAML();
protected:
std::map<std::string, std::shared_ptr<std::ostream>> streams_;
std::vector<TimePoint> test_source_timestamps_;
/* this methods creates a file whose access key is filename and whose
fullpath is prefix.filename.extension*/
void create_file(std::string prefix, std::string variable_name,
std::string extension = "bin");
void prepare_latency_test(ProcessingContext &context);
void save_source_timestamps_to_disk(std::uint64_t n_timestamps);
protected: // callable by derived processors, but not others
template <typename TValue>
void add_option(std::string name, TValue &value, std::string description = "",
bool required = false) {
options_.add(name, value, description, required);
}
void remove_option(std::string name);
template <typename DATATYPE>
PortOut<DATATYPE> *
create_output_port(std::string name,
const typename DATATYPE::Capabilities &capabilities,
const typename DATATYPE::Parameters ¶meters,
const PortOutPolicy &policy) {
if (name.size() == 0) {
name = DATATYPE::dataname();
}
convert_name(name);
if (output_ports_.count(name) == 1) {
throw std::runtime_error("Output port name \"" + name +
"\" is invalid or already exists.");
}
output_ports_[name] = std::move(std::unique_ptr<IPortOut>(
(IPortOut *)new PortOut<DATATYPE>(this, PortAddress(this->name(), name),
capabilities, parameters, policy)));
return ((PortOut<DATATYPE> *)output_ports_[name].get());
}
template <typename DATATYPE>
PortOut<DATATYPE> *
create_output_port(const typename DATATYPE::Capabilities &capabilities,
const typename DATATYPE::Parameters ¶meters,
const PortOutPolicy &policy) {
return create_output_port<DATATYPE>(DATATYPE::dataname(), capabilities,
parameters, policy);
}
template <typename DATATYPE>
PortIn<DATATYPE> *
create_input_port(std::string name,
const typename DATATYPE::Capabilities &capabilities,
const PortInPolicy &policy) {
if (name.size() == 0) {
name = DATATYPE::dataname();
}
convert_name(name);
if (input_ports_.count(name) == 1) {
throw std::runtime_error("Input port name \"" + name +
"\" is invalid or already exists.");
}
input_ports_[name] =
std::move(std::unique_ptr<IPortIn>((IPortIn *)new PortIn<DATATYPE>(
this, PortAddress(this->name(), name), capabilities, policy)));
return ((PortIn<DATATYPE> *)input_ports_[name].get());
}
template <typename DATATYPE>
PortIn<DATATYPE> *
create_input_port(const typename DATATYPE::Capabilities &capabilities,
const PortInPolicy &policy) {
return create_input_port<DATATYPE>(DATATYPE::dataname(), capabilities,
policy);
}
IPortIn *input_port(std::string port) { return input_ports_.at(port).get(); }
IPortOut *output_port(std::string port) {
return output_ports_.at(port).get();
}
IPortIn *input_port(const PortAddress &address);
IPortOut *output_port(const PortAddress &address);
ISlotIn *input_slot(const SlotAddress &address);
ISlotOut *output_slot(const SlotAddress &address);
/*self – peers – external
static variable: value is not changed by self or others
R – N – N (static variable) -> not very useful
R – N – R (externally observed static variable)
R – R – N (static shared observable)
R – R – R (externally observed static shared observable)
R – N – W (externally controlled static variable)
R – R – W (externally controlled shared static variable)
isolated producer: value is changed by self only
W – N – N (isolated producer) -> not very useful
W – N – R (externally observed isolated producer)
W – N – W (bi-directional channel)
co-operative producer: self and others change the value
W – W – N (co-operative producer)
W – W – R (externally observed co-operative producer)
W – W – W (externally controlled co-operative producer)
follower: value is changed by others, self only reads
R – W – N (follower)
R – W – R (externally observed follower)
R – W – W (externally controlled follower)
broadcaster: self changes the value, others follow
W – R – N (broadcaster)
W – R – R (externally observed broadcaster)
W – R – W (externally controlled broadcaster)
*/
template <typename T>
StaticState<T> *create_static_state(std::string state, T default_value,
bool shared = true,
Permission external = Permission::WRITE,
std::string description = "") {
if (shared) {
return ((StaticState<T> *)create_readable_shared_state<T>(
state, default_value, Permission::READ, external, description));
} else {
return ((StaticState<T> *)create_readable_shared_state<T>(
state, default_value, Permission::NONE, external, description));
}
}
template <typename T>
ProducerState<T> *create_producer_state(
std::string state, T default_value, bool cooperative = false,
Permission external = Permission::READ, std::string description = "") {
if (cooperative) {
return ((ProducerState<T> *)create_writable_shared_state<T>(
state, default_value, Permission::WRITE, external, description));
} else {
return ((ProducerState<T> *)create_writable_shared_state<T>(
state, default_value, Permission::NONE, external, description));
}
}
template <typename T>
BroadcasterState<T> *
create_broadcaster_state(std::string state, T default_value,
Permission external = Permission::NONE,
std::string description = "") {
return ((BroadcasterState<T> *)create_writable_shared_state<T>(
state, default_value, Permission::READ, external, description));
}
template <typename T>
FollowerState<T> *
create_follower_state(std::string state, T default_value,
Permission external = Permission::NONE,
std::string description = "") {
return ((FollowerState<T> *)create_readable_shared_state<T>(
state, default_value, Permission::WRITE, external, description));
}
template <typename T>
ReadableState<T> *create_readable_shared_state(
std::string state, T default_value, Permission peers = Permission::WRITE,
Permission external = Permission::NONE, std::string description = "") {
if (shared_states_.count(state) == 1) {
throw ProcessorInternalError("Shared state \"" + state +
"\" is invalid or already exists.",
name());
}
shared_states_[state] =
std::move(std::unique_ptr<IState>((IState *)new ReadableState<T>(
default_value, description, peers, external)));
return ((ReadableState<T> *)shared_states_[state].get());
}
template <typename T>
WritableState<T> *create_writable_shared_state(
std::string state, T default_value, Permission peers = Permission::READ,
Permission external = Permission::NONE, std::string description = "") {
if (shared_states_.count(state) == 1) {
throw ProcessorInternalError("Shared state \"" + state +
"\" is invalid or already exists.",
name());
}
shared_states_[state] =
std::move(std::unique_ptr<IState>((IState *)new WritableState<T>(
default_value, description, peers, external)));
return ((WritableState<T> *)shared_states_[state].get());
}
std::shared_ptr<IState> shared_state(std::string state) {
if (this->shared_states_.count(state) == 0) {
throw ProcessorInternalError(
"Shared state \"" + state + "\" does not exist.", name());
}
return shared_states_[state];
}
template <class T>
void expose_method(std::string methodname,
YAML::Node (T::*method)(const YAML::Node &)) {
if (exposed_methods_.count(methodname) == 1) {
throw ProcessorInternalError("Exposed method \"" + methodname +
"\" is invalid or already exists.",
name());
}
exposed_methods_[methodname] =
std::bind(method, static_cast<T *>(this), std::placeholders::_1);
}
std::function<YAML::Node(const YAML::Node &)> &
exposed_method(std::string method) {
if (this->exposed_methods_.count(method) == 0) {
throw ProcessorInternalError(
"Exposed method \"" + method + "\" does not exist.", name());
}
return exposed_methods_[method];
}
protected: // to be overridden and callable by derived processors
virtual std::string default_input_port() const;
virtual std::string default_output_port() const;
private: // to be overridden by derived processors, callable internally
virtual void Configure(const GlobalContext &context) {}
virtual void CreatePorts() = 0;
virtual void Preprocess(ProcessingContext &context) {}
virtual void Process(ProcessingContext &context) = 0;
virtual void Postprocess(ProcessingContext &context) {}
virtual void CompleteStreamInfo();
virtual void Prepare(GlobalContext &context) {}
virtual void Unprepare(GlobalContext &context) {}
virtual void TestPrepare(ProcessingContext &context) {}
virtual void TestFinalize(ProcessingContext &context) {}
private: // callable internally only
void internal_Configure(const YAML::Node &node,
const GlobalContext &context); // from engine
void internal_CreatePorts();
void internal_PrepareConnectionIn(SlotAddress &in);
void internal_PrepareConnectionOut(SlotAddress &out);
void
internal_ConnectionCompatibilityCheck(const SlotAddress &address,
IProcessor *upstream,
const SlotAddress &upstream_address);
void internal_ConnectIn(const SlotAddress &address, IProcessor *upstream,
const SlotAddress &upstream_address);
void internal_ConnectOut(const SlotAddress &address, IProcessor *downstream,
const SlotAddress &downstream_address);
void internal_NegotiateConnections();
void internal_CreateRingBuffers();
void internal_PrepareProcessing();
void internal_ThreadEntry(RunContext &runcontext);
void internal_Start(RunContext &runcontext);
void internal_Stop();
void internal_Alert();
YAML::Node internal_ApplyMethod(std::string name, const YAML::Node &node);
void set_name_and_type(std::string name, std::string type) {
name_ = name;
type_ = type;
}
template <typename TValue>
void add_advanced_option(std::string name, TValue &value,
std::string description = "",
bool required = false) {
advanced_options_.add(name, value, description, required);
}
private:
std::string name_;
std::string type_;
std::map<std::string, std::unique_ptr<IPortIn>> input_ports_;
std::map<std::string, std::unique_ptr<IPortOut>> output_ports_;
std::map<std::string, std::function<YAML::Node(const YAML::Node &)>>
exposed_methods_;
std::map<std::string, std::shared_ptr<IState>> shared_states_;
bool negotiated_ = false;
bool prepared_ = false;
std::atomic<bool> running_;
std::thread thread_;
options::Value<ThreadPriority, false> thread_priority_{
PRIORITY_NONE,
options::inrange<ThreadPriority>(PRIORITY_NONE, PRIORITY_HIGH)};
options::Value<ThreadCore, false> thread_core_{
CORE_NOT_PINNED,
options::inrange<ThreadCore>(
CORE_NOT_PINNED, (ThreadCore)sysconf(_SC_NPROCESSORS_ONLN) - 1)};
options::NullableBool new_test_flag_;
options::Value<std::map<std::string, int>> requested_buffer_sizes_{};
protected:
options::OptionList options_;
options::OptionList advanced_options_;
};
typedef std::map<std::string,
std::pair<std::string, std::unique_ptr<IProcessor>>>
ProcessorMap;
typedef factory::ObjectFactory<IProcessor, std::string> ProcessorFactory;
template <class PROCESSOR> class ProcessorRegistrar {
public:
ProcessorRegistrar(std::string name);
};
template <class PROCESSOR>
ProcessorRegistrar<PROCESSOR>::ProcessorRegistrar(std::string name) {
ProcessorFactory::instance().registerClass(
name, factory::createInstance<IProcessor, PROCESSOR>);
}
#define REGISTERPROCESSOR(PROCESSOR) \
namespace { \
static ProcessorRegistrar<PROCESSOR> _registrar(#PROCESSOR); \
};