.. _program_listing_file_falcon_processorgraph.cpp: Program Listing for File processorgraph.cpp =========================================== |exhale_lsh| :ref:`Return to documentation for file ` (``falcon/processorgraph.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // --------------------------------------------------------------------- // This file is part of falcon-core. // // Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders // // Falcon-server is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Falcon-server is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with falcon-core. If not, see . // --------------------------------------------------------------------- #include #include #include #include #include #include #include "processorgraph.hpp" #include "sharedstate.hpp" #include "buildconstant.hpp" using namespace graph; std::string graph_state_string(GraphState state) { std::string s; #define PROCESS_STATE(p) \ case (GraphState::p): \ s = #p; \ break; switch (state) { PROCESS_STATE(NOGRAPH); PROCESS_STATE(CONSTRUCTING); PROCESS_STATE(PREPARING); PROCESS_STATE(READY); PROCESS_STATE(STARTING); PROCESS_STATE(PROCESSING); PROCESS_STATE(STOPPING); PROCESS_STATE(ERROR); } #undef PROCESS_STATE return s; } std::vector expandProcessorName(std::string s) { static const int name_group = 1; static const int range_group = 2; static const int first_range_id = 1; static const int end_range_id = 2; std::vector result; int startid, endid; // name# or name[#, #-#] std::regex re( "^([a-zA-Z]+(?:[ -_][a-zA-Z]+)*)[ ]*((?:\\d+)|(?:\\([\\d,\\-]+\\)))?$"); std::smatch match; // match regular expression if (!std::regex_match(s, match, re)) { throw std::runtime_error("Invalid processor name: \"" + s + "\""); } // get base name if (!match[name_group].matched) { throw std::runtime_error("Invalid processor name (no base name): \"" + s + "\""); } std::string name = match[name_group].str(); // remove trimming spaces name = std::regex_replace(name, std::regex("^ +| +$"), std::string("")); //name = std::regex_replace(name, std::regex("[ _]"), "-"); // parse part identifiers std::vector identifiers; if (!match[range_group].matched) { result.push_back(name); } else { std::string range = match[range_group].str(); //Example: (1-2) // remove trimming spaces range = std::regex_replace(range, std::regex("^ +| +$"), std::string("")); if (range[0] == '(') { // match ID range vector // remove brackets and spaces range.erase(std::remove_if(range.begin(), range.end(), [](char x) { return (x == '(' || x == ')' || std::isspace(x)); }), range.end()); // split on comma auto id_range = split(range, ','); std::regex re_range("(\\d+)(?:\\-(\\d+))?"); std::smatch match_range; // match start and end id of ranges for (const auto &q : id_range) { if (std::regex_match(q, match_range, re_range)) { startid = stoi(match_range[first_range_id].str()); if (match_range[end_range_id].matched) { endid = stoi(match_range[end_range_id].str()); } else { endid = startid; } for (auto kk = startid; kk <= endid; kk++) { result.push_back(name + std::to_string(kk)); } } else { throw std::runtime_error( "Invalid processor name (invalid identifiers): \"" + s + "\""); } } } else { // try to convert to int try { result.push_back(name + std::to_string(stoi(range))); } catch (std::invalid_argument &e) { throw std::runtime_error( "Invalid processor name (invalid identifiers): \"" + s + "\""); } } } return result; } void ProcessorGraph::ConstructProcessorEngines(const YAML::Node &node) { std::vector processor_name_list; std::string processor_name; std::string processor_class; std::unique_ptr processor; // loop through all processors defined in YAML document for (YAML::const_iterator it = node.begin(); it != node.end(); ++it) { // expand processor name // e.g. name -> name, name1 -> name1, name(1-2, 4) -> name1, name2, name4 processor_name_list = expandProcessorName(it->first.as()); // get processor definition YAML::Node processor_node = it->second; if (processor_node["class"]) { processor_class = processor_node["class"].as(); // loop through expanded name list for (auto &name_it : processor_name_list) { processor_name = name_it; // does processor already exist? auto it2 = processors_.find(processor_name); if (it2 == processors_.end()) { // no processor with this name known try { processor.reset( ProcessorFactory::instance().create(processor_class)); } catch (factory::UnknownClass &e) { throw InvalidProcessorError("Cannot create processor " + processor_name + " of unknown class " + processor_class + "."); } processor->set_name_and_type(processor_name, processor_class); processor->internal_Configure(processor_node, global_context_); processors_[processor_name] = std::make_pair(processor_class, std::move(processor)); LOG(DEBUG) << "Constructed and configured " << processor_name << " (" << processor_class << ")."; } else if (it2->second.first == processor_class) { // processor with this name and class found it2->second.second->internal_Configure(processor_node, global_context_); LOG(DEBUG) << "Configured processor " << processor_name << " (" << processor_class << ")"; } else { // processor with this name, but different class found throw InvalidProcessorError("Processor " + processor_name + " of different class (" + it2->second.first + ") already exists."); } } } else { throw InvalidProcessorError("No class specified for processor " + processor_name + "."); } } } void ParseConnectionRules(const YAML::Node &node, StreamConnections &connections) { for (YAML::const_iterator it = node.begin(); it != node.end(); ++it) { expandConnectionRule(parseConnectionRule(it->as()), connections); LOG(DEBUG) << "Parsed connection rule " << it->as(); } } ProcessorGraph::ProcessorGraph(GlobalContext &context) : global_context_(context), terminate_signal_(false) { // log list of registered processors std::vector processors = ProcessorFactory::instance().listEntries(); for (auto item : processors) { documentation_[item] = LoadProcessorDoc(item); if (documentation_[item].IsMap() && documentation_[item]["Description"]) { LOG(INFO) << "Registered processor " << item << " - " << documentation_[item]["Description"]; } else { LOG(INFO) << "Registered processor " << item; } } } YAML::Node LoadProcessorDoc(std::string processor) { std::transform(processor.begin(), processor.end(), processor.begin(), ::tolower); std::string filename = DOC_PATH + processor + "/doc.yaml"; YAML::Node node; try { return YAML::LoadFile(filename); } catch (YAML::BadFile &e) { return YAML::Load("No available documentation.\n"); } catch (YAML::ParserException &e) { LOG(DEBUG) << processor << " - " << e.what(); return YAML::Load("Error when parsing this documentation.\n"); } } YAML::Node ProcessorGraph::GetProcessorDocumentation() { return documentation_; } std::string ProcessorGraph::state_string() const { return graph_state_string(state_); } IProcessor *ProcessorGraph::LookUpProcessor(std::string name) { if (processors_.count(name) == 0) { throw InvalidProcessorError("Processor \"" + name + "\" not found."); } return processors_[name].second.get(); } std::vector>> ProcessorGraph::LookUpStates(std::vector state_addresses) { std::vector>> states; for (auto &state_address : state_addresses) { // parse processor.state name std::vector address = split(state_address, '.'); if (address.size() != 2) { throw InvalidGraphError("Error parsing state address \"" + state_address + "\""); } // expand processor part of address auto expanded_processor = expandProcessorName(address[0]); for (auto &itv : expanded_processor) { IProcessor *processor = LookUpProcessor(itv); auto state = processor->shared_state( address[1]); // fix error message when this fails states.push_back(std::make_pair(itv + "." + address[1], state)); } } return states; } void ProcessorGraph::BuildSharedStates(const YAML::Node &node) { // states: // - [processor.state, processor.state, ...] // - group-name: // permission: xxx // description: xxx // states: [...] // - group-name: [...] std::vector>> states; std::string alias; Permission permission; std::string description; int group_index = 0; // loop through items in sequence: for (YAML::const_iterator link = node.begin(); link != node.end(); ++link) { ++group_index; description = ""; permission = Permission::WRITE; if (link->IsSequence()) { alias = "alias_" + std::to_string(group_index); states = LookUpStates(link->as>()); } else if (link->IsMap() && link->size() == 1 && link->begin()->second.IsSequence()) { alias = link->begin()->first.as(); states = LookUpStates(link->begin()->second.as>()); } else if (link->IsMap() && link->size() == 1 && link->begin()->second.IsMap()) { alias = link->begin()->first.as(); description = link->begin()->second["description"].as(""); permission = permission_from_string( link->begin()->second["permission"].as("unspecified")); states = LookUpStates( link->begin()->second["states"].as>()); } else { throw InvalidGraphError("Error parsing linked state request."); } shared_state_map_.AddAlias(alias, permission, description); for (auto const &state : states) { shared_state_map_.ShareState(alias, state.first, state.second); LOG(DEBUG) << "Successfully linked state " << state.first << " to alias " << alias; } } } void ProcessorGraph::CreateConnection(SlotAddress &out, SlotAddress &in) { // get ProcessorEngine for output and input IProcessor *processor_out, *processor_in; try { processor_out = this->processors_.at(out.processor()).second.get(); } catch (std::out_of_range &e) { throw std::out_of_range("Unknown processor \"" + out.processor() + "\""); } out.set_processor_class(processor_out->type()); try { processor_in = this->processors_.at(in.processor()).second.get(); } catch (std::out_of_range &e) { throw std::out_of_range("Unknown processor \"" + in.processor() + "\""); } in.set_processor_class(processor_in->type()); // let engine prepare connections ( get default port, check port, reserve // slot, update address ) processor_out->internal_PrepareConnectionOut(out); processor_in->internal_PrepareConnectionIn(in); // check compatibility processor_in->internal_ConnectionCompatibilityCheck(in, processor_out, out); // connect in to out, connect out to in processor_in->internal_ConnectIn(in, processor_out, out); try { processor_out->internal_ConnectOut(out, processor_in, in); } catch (...) { // internal error // in_connector_->Disconnect(); throw std::runtime_error("Internal error: cannot connect to output slot"); } } void ProcessorGraph::Build(const YAML::Node &node) { if (state_ != GraphState::NOGRAPH) { throw InvalidStateError( "A graph has already been built. Destroy old graph first."); } if (!node["processors"] || !node["processors"].IsMap()) { throw InvalidGraphError("No processors found in graph definition."); } set_state(GraphState::CONSTRUCTING); try { ConstructProcessorEngines(node["processors"]); LOG(INFO) << "Constructed and configured all processors"; for (auto &it : this->processors_) { it.second.second->internal_CreatePorts(); LOG(DEBUG) << "Created ports for processor " << it.first; } LOG(INFO) << "All ports have been created."; if (node["connections"] && node["connections"].IsSequence()) { ParseConnectionRules(node["connections"], connections_); LOG(INFO) << "Parsed all connection rules."; for (auto &it : connections_) { CreateConnection(it.first, it.second); LOG(DEBUG) << "Established connection " << it.first.string(true) << "->" << it.second.string(true); } LOG(INFO) << "All connections have been established."; } if (node["states"] && node["states"].IsSequence()) { BuildSharedStates(node["states"]); LOG(INFO) << "Linked all shared states."; } } catch (...) { Destroy(); throw; } try { // negiotiate connections for (auto &it : this->processors_) { it.second.second->internal_NegotiateConnections(); LOG(DEBUG) << "Negotiated data streams for processor " << it.first; } LOG(INFO) << "All data streams have been negotiated."; // build ringbuffers for (auto &it : this->processors_) { it.second.second->internal_CreateRingBuffers(); LOG(DEBUG) << "Constructed ring buffer for processor " << it.first; } } catch (...) { Destroy(); throw; } set_state(GraphState::PREPARING); // prepare processors try { for (auto &it : this->processors_) { it.second.second->Prepare(global_context_); LOG(DEBUG) << "Successfully prepared processor " << it.first; } LOG(INFO) << "All processors have been prepared."; } catch (...) { Destroy(); throw; } yaml_ = node; LOG(INFO) << "Graph was successfully constructed."; set_state(GraphState::READY); } void ProcessorGraph::Destroy() { // can only destroy graph if state is CONSTRUCTING, PREPARING or READY if (state_ == GraphState::PROCESSING || state_ == GraphState::STARTING || state_ == GraphState::STOPPING) { throw InvalidStateError("Cannot destroy graph while processing."); } else if (state_ == GraphState::NOGRAPH) { // nothing to destroy return; } if (state_ != GraphState::CONSTRUCTING) { // unprepare processors for (auto &it : this->processors_) { try { it.second.second->Unprepare(global_context_); LOG(DEBUG) << "Successfully unprepared processor " << it.first; } catch (...) { connections_.clear(); processors_.clear(); set_state(GraphState::NOGRAPH); throw InvalidGraphError( "Error while unpreparing processors. Forced destruction of graph. " "Possible corruption of internal state."); } } } // destroy connections and processors shared_state_map_.clear(); // will unlink all states and remove groups connections_.clear(); processors_.clear(); // will destroy processors and all their ports/states yaml_ = YAML::Null; LOG(INFO) << "Graph has been destroyed."; set_state(GraphState::NOGRAPH); } void ProcessorGraph::StartProcessing(std::string run_group_id, std::string run_id, std::string template_id, bool test_flag) { // start processing only if state is READY if (state_ == GraphState::READY) { // construct RunInfo object // runinfo_.reset( new RunInfo( terminate_signal_, context_, run_identifier, // destination, source ) ); run_context_.reset(new RunContext(global_context_, terminate_signal_, run_group_id, run_id, template_id, test_flag)); set_state(GraphState::STARTING); // prepare all processors for processing // (i.e. flush buffers) for (auto &it : this->processors_) { it.second.second->internal_PrepareProcessing(); LOG(DEBUG) << "Prepared data stream ports of processor " << it.first; } LOG(INFO) << "Prepared all data stream ports for processing."; try { // loop through all processors for (auto &it : this->processors_) { it.second.second->internal_Start(*run_context_); LOG(DEBUG) << "Started thread for processor " << it.first; } LOG(INFO) << "Started all processors."; } catch (...) { StopProcessing(); throw; } // wait until all processors are in running state while (!all_processors_running()) { if (run_context_->terminated()) { // processor terminated during preparation or preprocessing // other processors need to be unlocked still break; } } // all processors have either passed the preprocessing step // or have terminated with error, which will be dealt with in // graphmanager::run let's signal everyone to GO { std::unique_lock lock(run_context_->mutex); run_context_->go_signal = true; run_context_->go_condition.notify_all(); } set_state(GraphState::PROCESSING); } else if (state_ == GraphState::NOGRAPH || state_ == GraphState::CONSTRUCTING || state_ == GraphState::PREPARING) { throw InvalidStateError("Graph is not yet assembled."); } } void ProcessorGraph::StopProcessing() { if (state_ == GraphState::PROCESSING || state_ == GraphState::STARTING) { set_state(GraphState::STOPPING); if (run_context_->error()) { LOG(ERROR) << "Processing terminated with error. " << run_context_->error_message(); } // signal stop run_context_->Terminate(); // alert waiting processors for (auto &it : this->processors_) { it.second.second->internal_Alert(); } // join processor threads for (auto &it : this->processors_) { it.second.second->internal_Stop(); } LOG(INFO) << "Stopped all processors."; LOG(INFO) << "Graph was processing for " << std::to_string(run_context_->seconds()) << " seconds"; run_context_.reset(); terminate_signal_.store(false); set_state(GraphState::READY); } else if (state_ == GraphState::NOGRAPH || state_ == GraphState::CONSTRUCTING || state_ == GraphState::PREPARING) { throw InvalidStateError("Graph is not yet assembled."); } else { // READY, STOPPING // pass } } void ProcessorGraph::Update(YAML::Node &node) { // YAML // shared-state: value // processor: {state: value} // make sure node is a map if (!node.IsMap()) { throw InvalidProcessorError("No valid map with states found."); } // loop through all keys for (YAML::iterator it = node.begin(); it != node.end(); ++it) { std::string key = it->first.as(); if (it->second.IsMap()) { // find corresponding processor engine if (processors_.count(key) == 0) { LOG(ERROR) << "No processor named " << key; continue; } IProcessor *processor = processors_[key].second.get(); // loop through states for (YAML::iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) { try { auto state_name = it2->first.as(); auto state_value = it2->second.as(); // it2->second = processor->internal_UpdateState( state_name, // state_value ); auto pstate = processor->shared_state(state_name); // check if externally settable?? if (pstate->external_permission() == Permission::WRITE) { // set from string it2->second = pstate->set_string(state_value); } else { throw std::runtime_error("Shared state " + state_name + " on processor " + key + " can not be controlled externally."); } LOG(UPDATE) << "State " << key << "." << state_name << " set to " << state_value; } catch (std::exception &e) { it2->second = false; LOG(ERROR) << "Unable to update state value: " << e.what(); } } } else { // key points to shared state alias try { auto state_value = it->second.as(); it->second = shared_state_map_.UpdateAlias(key, state_value); LOG(UPDATE) << "Alias state " << key << " set to " << state_value; } catch (std::exception &e) { it->second = false; LOG(ERROR) << "Unable to update state value: " << e.what(); } } } } void ProcessorGraph::Retrieve(YAML::Node &node) { // YAML // processor: // state: // make sure node is a map if (!node.IsMap()) { throw InvalidProcessorError("No valid map with states found."); } // loop through all keys for (YAML::iterator it = node.begin(); it != node.end(); ++it) { std::string key = it->first.as(); if (it->second.IsMap()) { // find corresponding processor engine if (processors_.count(key) == 0) { LOG(ERROR) << "No processor named " << key; continue; } IProcessor *processor = processors_[key].second.get(); // loop through states for (YAML::iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) { try { auto state_name = it2->first.as(); // it2->second = processor->internal_RetrieveState( state_name ); auto pstate = processor->shared_state(state_name); if (pstate->external_permission() != Permission::NONE) { it2->second = pstate->get_string(); } else { throw std::runtime_error("Shared state " + state_name + " on processor " + key + " can not be read externally."); } } catch (std::exception &e) { it2->second = YAML::Null; LOG(ERROR) << "Unable to retrieve state value: " << e.what(); } } } else { // key points to shared state alias try { it->second = shared_state_map_.RetrieveAlias(key); } catch (std::exception &e) { it->second = YAML::Null; LOG(ERROR) << "Unable to retrieve state value: " << e.what(); } } } } void ProcessorGraph::Apply(YAML::Node &node) { // YAML // falcon: // version: 1.0 // graph: // processor: // method: // parameter: value if (!node.IsMap()) { throw InvalidProcessorError("No processors found in method definition."); } // loop through all processors, make sure value is another map for (YAML::iterator it = node.begin(); it != node.end(); ++it) { std::string processor_name = it->first.as(); if (!it->second.IsMap()) { LOG(ERROR) << "Invalid method definition for processor " << processor_name; continue; } // find corresponding processor engine if (processors_.count(processor_name) == 0) { LOG(ERROR) << "In method definition: no processor named " << processor_name; continue; } IProcessor *processor = processors_[processor_name].second.get(); // loop through all states for (YAML::iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) { try { it2->second = processor->internal_ApplyMethod( it2->first.as(), it2->second); } catch (std::exception &e) { it2->second = YAML::Null; LOG(ERROR) << "Unable to apply method: " << e.what(); } } } } std::string ProcessorGraph::ExportYAML() { std::string s = ""; YAML::Node node; YAML::Emitter out; node["falcon"]["version"] = 1.0; if (state_ != GraphState::NOGRAPH) { for (YAML::const_iterator it = yaml_["processors"].begin(); it != yaml_["processors"].end(); ++it) { std::vector processor_list = expandProcessorName(it->first.as()); for(auto &name : processor_list ){ node["graph"]["processors"][name] = this->processors_[name].second->ExportYAML(); node["graph"]["processors"][name]["class"] = this->processors_[name].first; if (yaml_["processors"][it->first]["options"]) { node["graph"]["processors"][name]["options"] = yaml_["processors"][it->first]["options"]; } if (yaml_["processors"][it->first]["advanced"]) { node["graph"]["processors"][name]["advanced"] = yaml_["processors"][it->first]["advanced"]; } } } for (auto &it : this->connections_) { node["graph"]["connections"].push_back(it.first.string() + "=" + it.second.string()); } node["graph"]["states"] = shared_state_map_.ExportYAML(); out << node; s = out.c_str(); } return s; }