Program Listing for File processorgraph.cpp
↰ Return to documentation for file (falcon/processorgraph.cpp)
// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include <algorithm>
#include <cctype>
#include <exception>
#include <iostream>
#include <regex>
#include <stdexcept>
#include <string>
#include <vector>
#include "processorgraph.hpp"
#include "sharedstate.hpp"
#include "buildconstant.hpp"
using namespace graph;
// Returns the enumerator name of a graph state as text (e.g. "READY").
// A value that matches none of the listed enumerators yields an empty string.
std::string graph_state_string(GraphState state) {
  switch (state) {
    case GraphState::NOGRAPH:
      return "NOGRAPH";
    case GraphState::CONSTRUCTING:
      return "CONSTRUCTING";
    case GraphState::PREPARING:
      return "PREPARING";
    case GraphState::READY:
      return "READY";
    case GraphState::STARTING:
      return "STARTING";
    case GraphState::PROCESSING:
      return "PROCESSING";
    case GraphState::STOPPING:
      return "STOPPING";
    case GraphState::ERROR:
      return "ERROR";
  }
  return std::string();
}
// Expands a processor name specification into the concrete processor names it
// denotes.
//
// Accepted forms (enforced by the regular expression below):
//   name          -> { "name" }
//   name#         -> { "name#" }        e.g. "src1"       -> { "src1" }
//   name(#,#-#)   -> one name per id    e.g. "src(1-2,4)" -> { "src1", "src2", "src4" }
//
// Identifiers pass through integer conversion, so leading zeros are dropped
// ("src007" -> "src7"). A descending range such as "(3-1)" contributes no
// names.
//
// Throws std::runtime_error when the specification does not match the expected
// syntax or when an identifier is malformed or does not fit in an int.
std::vector<std::string> expandProcessorName(std::string s) {
  static const int name_group = 1;
  static const int range_group = 2;
  static const int first_range_id = 1;
  static const int end_range_id = 2;

  // name, name# or name(#,#-#,...)
  // NOTE(review): inside the character class, " -_" is a range from ' ' (0x20)
  // to '_' (0x5F), so digits and most punctuation are accepted between letter
  // groups, not only space, hyphen and underscore. Left unchanged because
  // existing graph definitions may depend on it.
  static const std::regex re(
      "^([a-zA-Z]+(?:[ -_][a-zA-Z]+)*)[ ]*((?:\\d+)|(?:\\([\\d,\\-]+\\)))?$");
  static const std::regex re_range("(\\d+)(?:\\-(\\d+))?");
  static const std::regex re_trim("^ +| +$");

  auto invalid_identifiers = [&s]() {
    return std::runtime_error(
        "Invalid processor name (invalid identifiers): \"" + s + "\"");
  };

  // std::stoi reports overflow with std::out_of_range and non-numeric input
  // with std::invalid_argument; both derive from std::logic_error and are
  // reported uniformly as an invalid processor name.
  auto parse_id = [&invalid_identifiers](const std::string &digits) -> int {
    try {
      return std::stoi(digits);
    } catch (const std::logic_error &) {
      throw invalid_identifiers();
    }
  };

  std::smatch match;
  if (!std::regex_match(s, match, re)) {
    throw std::runtime_error("Invalid processor name: \"" + s + "\"");
  }

  // get base name
  if (!match[name_group].matched) {
    throw std::runtime_error("Invalid processor name (no base name): \"" + s +
                             "\"");
  }
  std::string name = match[name_group].str();
  // strip leading/trailing spaces
  name = std::regex_replace(name, re_trim, std::string(""));

  std::vector<std::string> result;

  // no identifier part: the base name is the only processor
  if (!match[range_group].matched) {
    result.push_back(name);
    return result;
  }

  std::string range = match[range_group].str();  // Example: (1-2)
  range = std::regex_replace(range, re_trim, std::string(""));

  if (range[0] != '(') {
    // single trailing number, e.g. "name12"
    result.push_back(name + std::to_string(parse_id(range)));
    return result;
  }

  // remove brackets and spaces; the cast keeps std::isspace well-defined for
  // characters with a negative char value
  range.erase(std::remove_if(range.begin(), range.end(),
                             [](char x) {
                               return x == '(' || x == ')' ||
                                      std::isspace(
                                          static_cast<unsigned char>(x));
                             }),
              range.end());

  // split on comma, then expand every "#" or "#-#" item
  auto id_range = split(range, ',');
  std::smatch match_range;
  for (const auto &q : id_range) {
    if (!std::regex_match(q, match_range, re_range)) {
      throw invalid_identifiers();
    }
    const int startid = parse_id(match_range[first_range_id].str());
    const int endid = match_range[end_range_id].matched
                          ? parse_id(match_range[end_range_id].str())
                          : startid;
    // count in a wider type so that an end id equal to INT_MAX cannot
    // overflow the loop counter
    for (long long kk = startid; kk <= endid; ++kk) {
      result.push_back(name + std::to_string(kk));
    }
  }
  return result;
}
// Creates and configures the processors described by the "processors" map of
// a graph definition.
//
// Every key is expanded with expandProcessorName(), so one entry can describe
// several processors sharing a single definition, e.g.
//   name(1-2, 4) -> name1, name2, name4
// For each expanded name:
//   - unknown name: a processor of the requested class is created through
//     ProcessorFactory, named, configured and stored in processors_;
//   - known name, same class: the existing processor is configured again with
//     this definition;
//   - known name, different class: InvalidProcessorError is thrown.
//
// Throws InvalidProcessorError when a definition has no "class" entry, when the
// factory does not know the class, or on a class mismatch. Exceptions raised by
// expandProcessorName() or internal_Configure() propagate unchanged.
void ProcessorGraph::ConstructProcessorEngines(const YAML::Node &node) {
  // loop through all processors defined in YAML document
  for (YAML::const_iterator it = node.begin(); it != node.end(); ++it) {
    std::string name_spec = it->first.as<std::string>();
    std::vector<std::string> processor_name_list =
        expandProcessorName(name_spec);

    // get processor definition; check IsMap() first so that a non-map
    // definition is reported as a missing class instead of being subscripted
    YAML::Node processor_node = it->second;
    if (!processor_node.IsMap() || !processor_node["class"]) {
      // report the name as written in the document: no expanded name has been
      // processed for this entry yet
      throw InvalidProcessorError("No class specified for processor " +
                                  name_spec + ".");
    }
    std::string processor_class = processor_node["class"].as<std::string>();

    // loop through expanded name list
    for (auto &processor_name : processor_name_list) {
      auto existing = processors_.find(processor_name);

      if (existing == processors_.end()) {
        // no processor with this name known: create and configure a new one
        std::unique_ptr<IProcessor> processor;
        try {
          processor.reset(
              ProcessorFactory::instance().create(processor_class));
        } catch (const factory::UnknownClass &) {
          throw InvalidProcessorError("Cannot create processor " +
                                      processor_name + " of unknown class " +
                                      processor_class + ".");
        }
        processor->set_name_and_type(processor_name, processor_class);
        processor->internal_Configure(processor_node, global_context_);
        processors_[processor_name] =
            std::make_pair(processor_class, std::move(processor));
        LOG(DEBUG) << "Constructed and configured " << processor_name << " ("
                   << processor_class << ").";
      } else if (existing->second.first == processor_class) {
        // processor with this name and class found: configure it again
        existing->second.second->internal_Configure(processor_node,
                                                    global_context_);
        LOG(DEBUG) << "Configured processor " << processor_name << " ("
                   << processor_class << ")";
      } else {
        // processor with this name, but different class found
        throw InvalidProcessorError("Processor " + processor_name +
                                    " of different class (" +
                                    existing->second.first +
                                    ") already exists.");
      }
    }
  }
}
// Parses every connection rule string in the given YAML node and expands it
// into `connections`. Each successfully handled rule is logged at debug level.
void ParseConnectionRules(const YAML::Node &node,
                          StreamConnections &connections) {
  for (const auto &rule_node : node) {
    expandConnectionRule(parseConnectionRule(rule_node.as<std::string>()),
                         connections);
    LOG(DEBUG) << "Parsed connection rule " << rule_node.as<std::string>();
  }
}
// Constructs an empty graph bound to the given global context.
// For every processor class registered with ProcessorFactory, its
// documentation is loaded into documentation_ and the class is logged,
// together with its "Description" entry when the documentation provides one.
ProcessorGraph::ProcessorGraph(GlobalContext &context)
    : global_context_(context), terminate_signal_(false) {
  std::vector<std::string> registered =
      ProcessorFactory::instance().listEntries();

  for (const auto &class_name : registered) {
    documentation_[class_name] = LoadProcessorDoc(class_name);

    const bool has_description = documentation_[class_name].IsMap() &&
                                 documentation_[class_name]["Description"];
    if (!has_description) {
      LOG(INFO) << "Registered processor " << class_name;
      continue;
    }
    LOG(INFO) << "Registered processor " << class_name << " - "
              << documentation_[class_name]["Description"];
  }
}
// Loads the documentation file of a processor class.
//
// The class name is lower-cased and the file DOC_PATH + <name> + "/doc.yaml"
// is parsed. When the file cannot be opened or parsed, a scalar node holding
// an explanatory message is returned instead of propagating the error; parse
// errors are additionally logged at debug level.
YAML::Node LoadProcessorDoc(std::string processor) {
  // go through unsigned char: passing a negative char value to tolower is
  // undefined behaviour
  std::transform(processor.begin(), processor.end(), processor.begin(),
                 [](unsigned char c) {
                   return static_cast<char>(std::tolower(c));
                 });
  std::string filename = DOC_PATH + processor + "/doc.yaml";
  try {
    return YAML::LoadFile(filename);
  } catch (const YAML::BadFile &) {
    return YAML::Load("No available documentation.\n");
  } catch (const YAML::ParserException &e) {
    LOG(DEBUG) << processor << " - " << e.what();
    return YAML::Load("Error when parsing this documentation.\n");
  }
}
// Returns the documentation node that the constructor filled with one entry
// per registered processor class.
YAML::Node ProcessorGraph::GetProcessorDocumentation() {
  return this->documentation_;
}
std::string ProcessorGraph::state_string() const {
return graph_state_string(state_);
}
// Returns a non-owning pointer to the processor registered under `name`.
// Throws InvalidProcessorError when no processor with that name exists.
IProcessor *ProcessorGraph::LookUpProcessor(std::string name) {
  auto found = processors_.find(name);
  if (found == processors_.end()) {
    throw InvalidProcessorError("Processor \"" + name + "\" not found.");
  }
  return found->second.second.get();
}
std::vector<std::pair<std::string, std::shared_ptr<IState>>>
ProcessorGraph::LookUpStates(std::vector<std::string> state_addresses) {
std::vector<std::pair<std::string, std::shared_ptr<IState>>> states;
for (auto &state_address : state_addresses) {
// parse processor.state name
std::vector<std::string> address = split(state_address, '.');
if (address.size() != 2) {
throw InvalidGraphError("Error parsing state address \"" + state_address +
"\"");
}
// expand processor part of address
auto expanded_processor = expandProcessorName(address[0]);
for (auto &itv : expanded_processor) {
IProcessor *processor = LookUpProcessor(itv);
auto state = processor->shared_state(
address[1]); // fix error message when this fails
states.push_back(std::make_pair(itv + "." + address[1], state));
}
}
return states;
}
// Registers shared-state aliases described by the "states" sequence of a graph
// definition. Each item takes one of three forms:
//
//   states:
//     - [processor.state, processor.state, ...]   # alias "alias_<index>"
//     - group-name: [...]                         # alias "group-name"
//     - group-name:
//         permission: xxx
//         description: xxx
//         states: [...]
//
// <index> is the 1-based position of the item in the sequence. Permission
// defaults to Permission::WRITE and description to the empty string unless the
// third form is used, in which case they are read from the item (with
// "unspecified" / "" as fallbacks).
//
// Throws InvalidGraphError for an item of any other shape.
void ProcessorGraph::BuildSharedStates(const YAML::Node &node) {
  int group_index = 0;

  // loop through items in sequence:
  for (YAML::const_iterator link = node.begin(); link != node.end(); ++link) {
    ++group_index;

    std::string alias;
    std::string description = "";
    Permission permission = Permission::WRITE;
    std::vector<std::pair<std::string, std::shared_ptr<IState>>> states;

    if (link->IsSequence()) {
      // anonymous group: generate the alias from the item position
      alias = "alias_" + std::to_string(group_index);
      states = LookUpStates(link->as<std::vector<std::string>>());
    } else {
      const bool single_entry_map = link->IsMap() && link->size() == 1;
      if (!single_entry_map) {
        throw InvalidGraphError("Error parsing linked state request.");
      }

      const YAML::Node group_body = link->begin()->second;
      if (group_body.IsSequence()) {
        // named group with a plain list of state addresses
        alias = link->begin()->first.as<std::string>();
        states = LookUpStates(group_body.as<std::vector<std::string>>());
      } else if (group_body.IsMap()) {
        // named group with explicit permission / description / states
        alias = link->begin()->first.as<std::string>();
        description = group_body["description"].as<std::string>("");
        permission = permission_from_string(
            group_body["permission"].as<std::string>("unspecified"));
        states =
            LookUpStates(group_body["states"].as<std::vector<std::string>>());
      } else {
        throw InvalidGraphError("Error parsing linked state request.");
      }
    }

    shared_state_map_.AddAlias(alias, permission, description);
    for (auto const &state : states) {
      shared_state_map_.ShareState(alias, state.first, state.second);
      LOG(DEBUG) << "Successfully linked state " << state.first << " to alias "
                 << alias;
    }
  }
}
// Connects an output slot to an input slot.
//
// Both addresses are updated in place: the processor class is filled in from
// the resolved processor before the address is handed to that processor's
// connection-preparation call.
//
// Throws std::out_of_range when either address names an unknown processor and
// std::runtime_error when the final output-side connection step fails.
// Exceptions from the preparation, compatibility-check and input-side
// connection steps propagate unchanged.
void ProcessorGraph::CreateConnection(SlotAddress &out, SlotAddress &in) {
  // resolve upstream processor
  auto out_entry = this->processors_.find(out.processor());
  if (out_entry == this->processors_.end()) {
    throw std::out_of_range("Unknown processor \"" + out.processor() + "\"");
  }
  IProcessor *processor_out = out_entry->second.second.get();
  out.set_processor_class(processor_out->type());

  // resolve downstream processor
  auto in_entry = this->processors_.find(in.processor());
  if (in_entry == this->processors_.end()) {
    throw std::out_of_range("Unknown processor \"" + in.processor() + "\"");
  }
  IProcessor *processor_in = in_entry->second.second.get();
  in.set_processor_class(processor_in->type());

  // let processors prepare connections ( get default port, check port,
  // reserve slot, update address )
  processor_out->internal_PrepareConnectionOut(out);
  processor_in->internal_PrepareConnectionIn(in);

  // check compatibility
  processor_in->internal_ConnectionCompatibilityCheck(in, processor_out, out);

  // connect in to out, then out to in
  processor_in->internal_ConnectIn(in, processor_out, out);
  try {
    processor_out->internal_ConnectOut(out, processor_in, in);
  } catch (...) {
    // internal error; note that the input side is not disconnected here
    throw std::runtime_error("Internal error: cannot connect to output slot");
  }
}
// Builds a complete graph from a YAML definition.
//
// Steps, in order:
//   1. (state CONSTRUCTING) construct/configure processors, create their
//      ports, establish the connections listed under "connections" and link
//      the shared states listed under "states";
//   2. negotiate connections and create ring buffers for every processor;
//   3. (state PREPARING) prepare every processor;
//   4. remember the definition in yaml_ and switch to state READY.
//
// Any exception raised in steps 1-3 triggers Destroy() and is then rethrown.
//
// Throws InvalidStateError when a graph already exists and InvalidGraphError
// when the definition has no "processors" map.
void ProcessorGraph::Build(const YAML::Node &node) {
  if (state_ != GraphState::NOGRAPH) {
    throw InvalidStateError(
        "A graph has already been built. Destroy old graph first.");
  }

  const bool has_processor_map =
      node["processors"] && node["processors"].IsMap();
  if (!has_processor_map) {
    throw InvalidGraphError("No processors found in graph definition.");
  }

  set_state(GraphState::CONSTRUCTING);

  // step 1: processors, ports, connections, shared states
  try {
    ConstructProcessorEngines(node["processors"]);
    LOG(INFO) << "Constructed and configured all processors";

    for (auto &entry : processors_) {
      entry.second.second->internal_CreatePorts();
      LOG(DEBUG) << "Created ports for processor " << entry.first;
    }
    LOG(INFO) << "All ports have been created.";

    if (node["connections"] && node["connections"].IsSequence()) {
      ParseConnectionRules(node["connections"], connections_);
      LOG(INFO) << "Parsed all connection rules.";

      for (auto &connection : connections_) {
        CreateConnection(connection.first, connection.second);
        LOG(DEBUG) << "Established connection "
                   << connection.first.string(true) << "->"
                   << connection.second.string(true);
      }
      LOG(INFO) << "All connections have been established.";
    }

    if (node["states"] && node["states"].IsSequence()) {
      BuildSharedStates(node["states"]);
      LOG(INFO) << "Linked all shared states.";
    }
  } catch (...) {
    Destroy();
    throw;
  }

  // step 2: negotiate connections, then build ring buffers
  try {
    for (auto &entry : processors_) {
      entry.second.second->internal_NegotiateConnections();
      LOG(DEBUG) << "Negotiated data streams for processor " << entry.first;
    }
    LOG(INFO) << "All data streams have been negotiated.";

    for (auto &entry : processors_) {
      entry.second.second->internal_CreateRingBuffers();
      LOG(DEBUG) << "Constructed ring buffer for processor " << entry.first;
    }
  } catch (...) {
    Destroy();
    throw;
  }

  set_state(GraphState::PREPARING);

  // step 3: prepare processors
  try {
    for (auto &entry : processors_) {
      entry.second.second->Prepare(global_context_);
      LOG(DEBUG) << "Successfully prepared processor " << entry.first;
    }
    LOG(INFO) << "All processors have been prepared.";
  } catch (...) {
    Destroy();
    throw;
  }

  // step 4: keep the definition and publish the graph as ready
  yaml_ = node;
  LOG(INFO) << "Graph was successfully constructed.";
  set_state(GraphState::READY);
}
// Tears down the current graph and returns to state NOGRAPH.
//
// - In state PROCESSING, STARTING or STOPPING the graph cannot be destroyed
//   and InvalidStateError is thrown.
// - In state NOGRAPH there is nothing to do.
// - In state CONSTRUCTING the processors are released without being
//   unprepared; in every other state each processor is unprepared first.
//
// When unpreparing a processor fails, the graph is still released completely
// (shared states, connections, processors and the stored definition), the
// state is set to NOGRAPH and InvalidGraphError is thrown.
void ProcessorGraph::Destroy() {
  if (state_ == GraphState::PROCESSING || state_ == GraphState::STARTING ||
      state_ == GraphState::STOPPING) {
    throw InvalidStateError("Cannot destroy graph while processing.");
  }
  if (state_ == GraphState::NOGRAPH) {
    // nothing to destroy
    return;
  }

  // Single teardown used by both the regular and the forced path, so neither
  // can leave aliases or the previous definition behind. The shared state map
  // is cleared before the processors, as in the regular destruction order.
  auto release_graph = [this]() {
    shared_state_map_.clear();  // will unlink all states and remove groups
    connections_.clear();
    processors_.clear();  // will destroy processors and all their ports/states
    yaml_ = YAML::Null;
  };

  if (state_ != GraphState::CONSTRUCTING) {
    // unprepare processors
    for (auto &it : this->processors_) {
      try {
        it.second.second->Unprepare(global_context_);
        LOG(DEBUG) << "Successfully unprepared processor " << it.first;
      } catch (...) {
        // forced destruction: processors_ is cleared while being iterated, so
        // the loop must be left immediately (done by the throw below)
        release_graph();
        set_state(GraphState::NOGRAPH);
        throw InvalidGraphError(
            "Error while unpreparing processors. Forced destruction of graph. "
            "Possible corruption of internal state.");
      }
    }
  }

  // destroy connections and processors
  release_graph();
  LOG(INFO) << "Graph has been destroyed.";
  set_state(GraphState::NOGRAPH);
}
void ProcessorGraph::StartProcessing(std::string run_group_id,
std::string run_id,
std::string template_id, bool test_flag) {
// start processing only if state is READY
if (state_ == GraphState::READY) {
// construct RunInfo object
// runinfo_.reset( new RunInfo( terminate_signal_, context_, run_identifier,
// destination, source ) );
run_context_.reset(new RunContext(global_context_, terminate_signal_,
run_group_id, run_id, template_id,
test_flag));
set_state(GraphState::STARTING);
// prepare all processors for processing
// (i.e. flush buffers)
for (auto &it : this->processors_) {
it.second.second->internal_PrepareProcessing();
LOG(DEBUG) << "Prepared data stream ports of processor " << it.first;
}
LOG(INFO) << "Prepared all data stream ports for processing.";
try {
// loop through all processors
for (auto &it : this->processors_) {
it.second.second->internal_Start(*run_context_);
LOG(DEBUG) << "Started thread for processor " << it.first;
}
LOG(INFO) << "Started all processors.";
} catch (...) {
StopProcessing();
throw;
}
// wait until all processors are in running state
while (!all_processors_running()) {
if (run_context_->terminated()) {
// processor terminated during preparation or preprocessing
// other processors need to be unlocked still
break;
}
}
// all processors have either passed the preprocessing step
// or have terminated with error, which will be dealt with in
// graphmanager::run let's signal everyone to GO
{
std::unique_lock<std::mutex> lock(run_context_->mutex);
run_context_->go_signal = true;
run_context_->go_condition.notify_all();
}
set_state(GraphState::PROCESSING);
} else if (state_ == GraphState::NOGRAPH ||
state_ == GraphState::CONSTRUCTING ||
state_ == GraphState::PREPARING) {
throw InvalidStateError("Graph is not yet assembled.");
}
}
void ProcessorGraph::StopProcessing() {
if (state_ == GraphState::PROCESSING || state_ == GraphState::STARTING) {
set_state(GraphState::STOPPING);
if (run_context_->error()) {
LOG(ERROR) << "Processing terminated with error. "
<< run_context_->error_message();
}
// signal stop
run_context_->Terminate();
// alert waiting processors
for (auto &it : this->processors_) {
it.second.second->internal_Alert();
}
// join processor threads
for (auto &it : this->processors_) {
it.second.second->internal_Stop();
}
LOG(INFO) << "Stopped all processors.";
LOG(INFO) << "Graph was processing for "
<< std::to_string(run_context_->seconds()) << " seconds";
run_context_.reset();
terminate_signal_.store(false);
set_state(GraphState::READY);
} else if (state_ == GraphState::NOGRAPH ||
state_ == GraphState::CONSTRUCTING ||
state_ == GraphState::PREPARING) {
throw InvalidStateError("Graph is not yet assembled.");
} else { // READY, STOPPING
// pass
}
}
// Updates state values from a YAML map and writes the outcome back into it.
//
//   shared-state-alias: value
//   processor: {state: value, ...}
//
// A key whose value is a map is treated as a processor name; every nested
// state is set from its string value, provided its external permission is
// Permission::WRITE. Any other key is treated as a shared-state alias.
// Each value in `node` is replaced by the result of the update call, or by
// `false` when the update failed; failures are logged, not thrown. Unknown
// processors are logged and skipped.
//
// Throws InvalidProcessorError when `node` is not a map.
void ProcessorGraph::Update(YAML::Node &node) {
  if (!node.IsMap()) {
    throw InvalidProcessorError("No valid map with states found.");
  }

  for (YAML::iterator entry = node.begin(); entry != node.end(); ++entry) {
    std::string key = entry->first.as<std::string>();

    if (!entry->second.IsMap()) {
      // key points to shared state alias
      try {
        auto state_value = entry->second.as<std::string>();
        entry->second = shared_state_map_.UpdateAlias(key, state_value);
        LOG(UPDATE) << "Alias state " << key << " set to " << state_value;
      } catch (std::exception &e) {
        entry->second = false;
        LOG(ERROR) << "Unable to update state value: " << e.what();
      }
      continue;
    }

    // key points to a processor: find it
    auto found = processors_.find(key);
    if (found == processors_.end()) {
      LOG(ERROR) << "No processor named " << key;
      continue;
    }
    IProcessor *processor = found->second.second.get();

    // loop through states of this processor
    for (YAML::iterator state_it = entry->second.begin();
         state_it != entry->second.end(); ++state_it) {
      try {
        auto state_name = state_it->first.as<std::string>();
        auto state_value = state_it->second.as<std::string>();
        auto pstate = processor->shared_state(state_name);

        // only externally writable states may be set
        if (pstate->external_permission() != Permission::WRITE) {
          throw std::runtime_error("Shared state " + state_name +
                                   " on processor " + key +
                                   " can not be controlled externally.");
        }

        // set from string
        state_it->second = pstate->set_string(state_value);
        LOG(UPDATE) << "State " << key << "." << state_name << " set to "
                    << state_value;
      } catch (std::exception &e) {
        state_it->second = false;
        LOG(ERROR) << "Unable to update state value: " << e.what();
      }
    }
  }
}
// Reads state values into a YAML map.
//
//   processor:
//     state: <null>
//   shared-state-alias: <null>
//
// A key whose value is a map is treated as a processor name; every nested
// state value is replaced by the state's string value, provided its external
// permission is not Permission::NONE. Any other key is treated as a
// shared-state alias. A value that cannot be retrieved is set to YAML::Null
// and the failure is logged, not thrown. Unknown processors are logged and
// skipped.
//
// Throws InvalidProcessorError when `node` is not a map.
void ProcessorGraph::Retrieve(YAML::Node &node) {
  if (!node.IsMap()) {
    throw InvalidProcessorError("No valid map with states found.");
  }

  for (YAML::iterator entry = node.begin(); entry != node.end(); ++entry) {
    std::string key = entry->first.as<std::string>();

    if (!entry->second.IsMap()) {
      // key points to shared state alias
      try {
        entry->second = shared_state_map_.RetrieveAlias(key);
      } catch (std::exception &e) {
        entry->second = YAML::Null;
        LOG(ERROR) << "Unable to retrieve state value: " << e.what();
      }
      continue;
    }

    // key points to a processor: find it
    auto found = processors_.find(key);
    if (found == processors_.end()) {
      LOG(ERROR) << "No processor named " << key;
      continue;
    }
    IProcessor *processor = found->second.second.get();

    // loop through states of this processor
    for (YAML::iterator state_it = entry->second.begin();
         state_it != entry->second.end(); ++state_it) {
      try {
        auto state_name = state_it->first.as<std::string>();
        auto pstate = processor->shared_state(state_name);

        // states without any external permission may not be read
        if (pstate->external_permission() == Permission::NONE) {
          throw std::runtime_error("Shared state " + state_name +
                                   " on processor " + key +
                                   " can not be read externally.");
        }
        state_it->second = pstate->get_string();
      } catch (std::exception &e) {
        state_it->second = YAML::Null;
        LOG(ERROR) << "Unable to retrieve state value: " << e.what();
      }
    }
  }
}
// Applies processor methods described by a YAML map and writes each result
// back into the map.
//
//   processor:
//     method:
//       parameter: value
//
// Entries whose value is not a map, or whose key names no known processor,
// are logged and skipped. For every method entry the value is replaced by the
// result of internal_ApplyMethod(); when that call throws a std::exception the
// value is set to YAML::Null and the failure is logged.
//
// Throws InvalidProcessorError when `node` is not a map.
void ProcessorGraph::Apply(YAML::Node &node) {
  if (!node.IsMap()) {
    throw InvalidProcessorError("No processors found in method definition.");
  }

  for (YAML::iterator entry = node.begin(); entry != node.end(); ++entry) {
    std::string processor_name = entry->first.as<std::string>();

    // the value must be a map of methods
    if (!entry->second.IsMap()) {
      LOG(ERROR) << "Invalid method definition for processor "
                 << processor_name;
      continue;
    }

    // find corresponding processor
    auto found = processors_.find(processor_name);
    if (found == processors_.end()) {
      LOG(ERROR) << "In method definition: no processor named "
                 << processor_name;
      continue;
    }
    IProcessor *processor = found->second.second.get();

    // loop through all methods requested for this processor
    for (YAML::iterator method_it = entry->second.begin();
         method_it != entry->second.end(); ++method_it) {
      try {
        method_it->second = processor->internal_ApplyMethod(
            method_it->first.as<std::string>(), method_it->second);
      } catch (std::exception &e) {
        method_it->second = YAML::Null;
        LOG(ERROR) << "Unable to apply method: " << e.what();
      }
    }
  }
}
// Serialises the current graph to a YAML document string.
//
// Returns an empty string in state NOGRAPH. Otherwise the document contains
//   falcon: {version: 1.0}
//   graph:
//     processors: one entry per expanded processor name, built from the
//                 processor's own ExportYAML() plus its "class" and, when
//                 present in the stored definition, "options" and "advanced"
//     connections: one "<out>=<in>" string per connection
//     states: the export of the shared state map
std::string ProcessorGraph::ExportYAML() {
  YAML::Node node;
  YAML::Emitter out;
  node["falcon"]["version"] = 1.0;

  if (state_ == GraphState::NOGRAPH) {
    return std::string("");
  }

  for (YAML::const_iterator it = yaml_["processors"].begin();
       it != yaml_["processors"].end(); ++it) {
    // one stored definition may stand for several processors
    std::vector<std::string> expanded_names =
        expandProcessorName(it->first.as<std::string>());

    for (auto &name : expanded_names) {
      auto &processor_entry = this->processors_[name];

      node["graph"]["processors"][name] =
          processor_entry.second->ExportYAML();
      node["graph"]["processors"][name]["class"] = processor_entry.first;

      // copy over the optional sections of the stored definition
      if (yaml_["processors"][it->first]["options"]) {
        node["graph"]["processors"][name]["options"] =
            yaml_["processors"][it->first]["options"];
      }
      if (yaml_["processors"][it->first]["advanced"]) {
        node["graph"]["processors"][name]["advanced"] =
            yaml_["processors"][it->first]["advanced"];
      }
    }
  }

  for (auto &connection : this->connections_) {
    node["graph"]["connections"].push_back(connection.first.string() + "=" +
                                           connection.second.string());
  }

  node["graph"]["states"] = shared_state_map_.ExportYAML();

  out << node;
  return std::string(out.c_str());
}