// Program listing for file processorgraph.hpp
//
// Return to documentation for file (falcon/processorgraph.hpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// falcon-core is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// falcon-core is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------

#pragma once

#include <atomic>
#include <cstring>
#include <exception>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "yaml-cpp/yaml.h"
#include "logging/log.hpp"
#include "iprocessor.hpp"
#include "graphexceptions.hpp"
#include "connectionparser.hpp"
#include "runinfo.hpp"

// Free function declared at global scope (outside namespace graph); its
// definition is not part of this header. Takes a string named `processor` by
// value and returns a YAML::Node.
// NOTE(review): presumably loads the documentation for the named processor
// type -- confirm against the definition.
YAML::Node LoadProcessorDoc(std::string processor);



namespace graph {

// States a ProcessorGraph can be in. ProcessorGraph starts in NOGRAPH and
// only reports done() while in PROCESSING.
// NOTE(review): the remaining enumerators presumably follow the
// build -> prepare -> start -> process -> stop life cycle suggested by their
// names; the transitions themselves are implemented outside this header.
// NOTE(review): `ERROR` is defined as a macro by some platform headers
// (e.g. <windows.h>); verify if this header must build there.
enum class GraphState {
  NOGRAPH,
  CONSTRUCTING,
  PREPARING,
  READY,
  STARTING,
  PROCESSING,
  STOPPING,
  ERROR
};

// Holds the processors, stream connections and shared states of a processing
// graph together with the graph's current GraphState.
//
// Only the small accessors and status queries are defined inline; all other
// members are declared here and defined elsewhere.
class ProcessorGraph {
 public:
  // Constructs a graph bound to the given global context.
  // NOTE(review): this single-argument constructor is not `explicit`; it is
  // left that way so that existing call sites keep compiling.
  ProcessorGraph(GlobalContext &context);

  // Returns the current value of the terminate signal.
  bool terminated() const { return terminate_signal_.load(); }

  // Reports whether processing has finished.
  //
  // Returns false whenever the graph is not in the PROCESSING state.
  // While PROCESSING, returns true if the terminate signal is set or if no
  // processor reports that it is running.
  bool done() const {
    if (state_ != GraphState::PROCESSING) {
      return false;
    }

    if (terminated()) {
      LOG(DEBUG) << "done: terminated=true.";
      return true;
    }

    return !any_processor_running();
  }

  // Returns true if every processor in the graph reports running()
  // (and therefore also true when the graph holds no processors).
  bool all_processors_running() const {
    for (const auto &entry : processors_) {
      if (!entry.second.second->running()) {
        return false;
      }
    }
    return true;
  }

  // Returns true if at least one processor in the graph reports running()
  // (and therefore false when the graph holds no processors).
  bool any_processor_running() const {
    for (const auto &entry : processors_) {
      if (entry.second.second->running()) {
        return true;
      }
    }
    return false;
  }

  // NOTE(review): presumably instantiates the processors described by `node`
  // -- confirm against the definition.
  void ConstructProcessorEngines(const YAML::Node &node);

  // Returns processor documentation as a YAML node (defined elsewhere).
  YAML::Node GetProcessorDocumentation();

  // Graph life-cycle and configuration operations; all defined elsewhere.
  // NOTE(review): the exact state transitions each call performs are not
  // visible from this header.
  void Build(const YAML::Node &node);
  void Destroy();
  void StartProcessing(std::string run_group_id, std::string run_id,
                       std::string template_id, bool test_flag);
  void StopProcessing();
  void Update(YAML::Node &node);
  void Retrieve(YAML::Node &node);
  void Apply(YAML::Node &node);

  // Returns a YAML representation of the graph as a string (defined
  // elsewhere).
  std::string ExportYAML();

  // Returns the current graph state (by value).
  GraphState state() const { return state_; }

  // Returns a textual form of the current state (defined elsewhere).
  std::string state_string() const;

  // Sets the graph state and logs its textual form at the STATE log level.
  void set_state(GraphState state) {
    state_ = state;
    LOG(STATE) << state_string();
  }

  // Read-only access to the processors and stream connections of the graph.
  const ProcessorMap &processors() const { return processors_; }
  const StreamConnections &connections() const { return connections_; }

  // Look-up helpers, defined elsewhere.
  // NOTE(review): behaviour for unknown names/addresses is not visible here.
  IProcessor *LookUpProcessor(std::string name);
  std::vector<std::pair<std::string, std::shared_ptr<IState>>>
  LookUpStates(std::vector<std::string> state_addresses);

  // NOTE(review): presumably creates the shared states described by `node`
  // -- confirm against the definition.
  void BuildSharedStates(const YAML::Node &node);

 protected:
  // Connects the output slot `out` to the input slot `in` (defined elsewhere).
  void CreateConnection(SlotAddress &out, SlotAddress &in);

 private:
  YAML::Node yaml_;
  YAML::Node documentation_;

  GlobalContext &global_context_;

  ProcessorMap processors_;
  StreamConnections connections_;

  SharedStateMap shared_state_map_;

  GraphState state_ = GraphState::NOGRAPH;

  std::unique_ptr<RunContext> run_context_;

  // Explicitly initialised: before C++20 a default-constructed std::atomic
  // holds an indeterminate value. A constructor mem-initializer, if present,
  // still takes precedence over this default.
  std::atomic<bool> terminate_signal_{false};
};

}  // namespace graph