Program Listing for File graphmanager.cpp

Return to documentation for file (falcon/graphmanager.cpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include <unistd.h>
#include <fstream>

#include "graphmanager.hpp"
#include "utilities/general.hpp"
#include "logging/log.hpp"
#include "utilities/zmqutil.hpp"


using namespace graph;

GraphManager::GraphManager(GlobalContext &context)
    : global_context_(&context), graph_(context) {}

void GraphManager::HandleCommand(std::string command,
                                 std::deque<std::string> &extra,
                                 std::deque<std::string> &reply) {
  if (command == "build") {
    if (extra.size() < 1) {
      throw std::runtime_error("Missing YAML graph definition.");
    }
    YAML::Node node = YAML::Load(extra[0]);

    if (!node.IsMap()) {
      std::string file = node.as<std::string>();
      file = global_context_->resolve_path(file, "graphs");
      try {
        node = YAML::LoadFile(file);
      } catch (YAML::BadFile &e) {
        throw std::runtime_error("Cannot open YAML graph definition file " +
                                 file + ". Check if file actually exists.");
      }
    }
    ParseGraph(node);
    // save YAML to global_context_.resolve_path( "graphs://_last_graph" )
    std::ofstream fout(
        global_context_->resolve_path("graphs://_last_graph.yaml"));
    fout << node;

  } else if (command == "destroy") {
    graph_.Destroy();
  } else if (command == "start" || command == "test") {
    std::string run_env = extra.size() > 0 ? extra[0] : "";
    std::string destination = extra.size() > 1 ? extra[1] : "";
    std::string source = extra.size() > 2 ? extra[2] : "";
    graph_.StartProcessing(run_env, destination, source,
                           command == "test" || global_context_->test());
  } else if (command == "stop") {
    graph_.StopProcessing();
  } else if (command == "state") {
    reply.push_back(graph_.state_string());
  } else if (command == "update") {
    if (extra.size() > 0) {
      YAML::Node node = YAML::Load(extra[0]);
      graph_.Update(node);
      YAML::Emitter out;
      out << node;
      reply.push_back(std::string(out.c_str()));
    }
  } else if (command == "retrieve") {
    if (extra.size() > 0) {
      YAML::Node node = YAML::Load(extra[0]);
      graph_.Retrieve(node);
      YAML::Emitter out;
      out << node;
      reply.push_back(std::string(out.c_str()));
    }
  } else if (command == "apply") {
    if (extra.size() > 0) {
      YAML::Node node = YAML::Load(extra[0]);
      graph_.Apply(node);
      YAML::Emitter out;
      out << node;
      reply.push_back(std::string(out.c_str()));
    }
  } else if (command == "documentation") {
    YAML::Node docs = graph_.GetProcessorDocumentation();
    YAML::Emitter out;
    out << docs;
    reply.push_back(std::string(out.c_str()));
  } else if (command == "yaml") {
    reply.push_back(graph_.ExportYAML());
  } else {
    throw std::runtime_error("Unknown graph command \"" + command + "\".");
  }
}

void GraphManager::ParseGraph(YAML::Node &node) {
  if (node["graph"]) {
    if (node["processors"]) {
      LOG(WARNING)
          << "Detected mixed use of old and new style graph definition."
             " Only the new style graph definition will be used and top-level "
             "processors, connections"
             " and states maps will be ignored.";
    }

    if (!node["graph"].IsMap()) {
      std::string graph_template_path =
          global_context_->resolve_path(node["graph"].as<std::string>());
      try {
        node["graph"] = YAML::LoadFile(graph_template_path);
      } catch (YAML::BadFile &e) {
        throw std::runtime_error(
            "Cannot open YAML graph template definition file " +
            graph_template_path + ". Check if file actually exists.");
      }
    }

    if (node["options"]) {
      YAML::Node options_node;
      if (!node["options"].IsMap()) {
        std::string graph_options_path =
            global_context_->resolve_path(node["options"].as<std::string>());

        try {
          options_node = YAML::LoadFile(graph_options_path);
        } catch (YAML::BadFile &e) {
          throw std::runtime_error(
              "Cannot open YAML graph options definition file " +
              graph_options_path + ". Check if file actually exists.");
        }

      } else {
        options_node = node["options"];
      }

      for (YAML::const_iterator it = options_node.begin();
           it != options_node.end(); ++it) {
        std::string processor_name = it->first.as<std::string>();
        if (!node["graph"]["processors"][processor_name]) {
          throw std::runtime_error(
              "Mismatch between the options graph and the template graph.");
        }

        for (YAML::const_iterator options_type_it = it->second.begin();
             options_type_it != it->second.end(); ++options_type_it) {
          for (YAML::const_iterator options_it =
                   options_type_it->second.begin();
               options_it != options_type_it->second.end(); ++options_it) {
            std::string processor_option_name =
                options_it->first.as<std::string>();
            node["graph"]["processors"][processor_name]
                [options_type_it->first.as<std::string>()]
                [processor_option_name] = options_it->second;
          }
        }
      }
    }
    graph_.Build(node["graph"]);
  } else if (node["processors"]) {
    LOG(WARNING) << "The graph definition seems to have the server-side format. "
                    "Consider to use a user-side format to override options.";
    graph_.Build(node);
  } else {
    throw std::runtime_error("Invalid graph description.");
  }
}

void GraphManager::Run() {
  // initialize
  zmq::socket_t socket(global_context_->zmq(), ZMQ_REP);
  socket.bind("inproc://graph");

  zmq_frames request;
  zmq_frames reply;

  while (!terminated()) {
    // sleep a bit, since we are continuously polling
    usleep(1000);   // 1 msec

    // process commands
    request.clear();
    if (s_nonblocking_recv_multi(socket, request)) {
      // handle command

      reply.clear();

      std::string command = request[0];
      request.pop_front();

      LOG(DEBUG) << "GraphManager received command \"" << command << "\"";

      try {
        HandleCommand(command, request, reply);

        if (reply.size() == 0) {
          reply.push_back("OK");
        }
      } catch (GraphException &e) {
        if (e.isFatal()) {
          reply.push_back("ERR");
        } else {
          reply.push_back("WARN");
        }
        reply.push_back(e.gettype());
        reply.push_back(e.what());

        LOG(ERROR) << "Error handling command: " << command
                   << " Error type: " << e.gettype() << "  Error: " << e.what();
      } catch (std::exception &e) {
        reply.push_back("ERR");
        reply.push_back("exception");
        reply.push_back(e.what());

        LOG(ERROR) << "Error handling command: " << command
                   << " Error: " << e.what();
      } catch (...) {
        reply.push_back("ERR");
        reply.push_back("Unknown");
        reply.push_back("Unknown error.");

        LOG(ERROR) << "Error handling command: " << command;
      }

      // reply
      s_send_multi(socket, reply);

      LOG(DEBUG) << "GraphManager replied to command \"" << command
                 << "\" with \""
                 << join(reply.begin(), reply.end(), std::string(" | "))
                 << "\"";
    }

    // check if graph processing was terminated by a processor
    // or if all processors are done and waiting to be killed
    if (graph_.done()) {
      LOG(DEBUG) << "Processing is done.";
      graph_.StopProcessing();
    }
  }

  // finish
}