Program Listing for File iprocessor.cpp

Return to documentation for file (falcon/iprocessor.cpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------
#include <fstream>
#include <iostream>
#include <regex>

#include "iprocessor.hpp"
#include "logging/log.hpp"
#include "utilities/general.hpp"

// Validates a name and normalizes it in place.
//
// A valid name starts with a word character and continues with word
// characters, where a single space or dash is accepted as long as it is
// directly followed by a word character. On success every space and
// underscore in `s` is replaced by a dash.
//
// Throws ProcessorInternalError when `s` does not match that pattern; `s` is
// left untouched in that case.
void convert_name(std::string &s) {
  const std::regex valid_name("^\\w(?:(?:[ -][\\w])|\\w)*$");
  if (!std::regex_match(s, valid_name)) {
    throw ProcessorInternalError(s + " is not a valid name.");
  }

  const std::regex separators("[ _]");
  s = std::regex_replace(s, separators, "-");
}

// Returns the names (map keys) of all input ports of this processor.
const std::set<std::string> IProcessor::input_port_names() const {
  std::set<std::string> result;
  for (auto port = input_ports_.begin(); port != input_ports_.end(); ++port) {
    result.insert(port->first);
  }
  return result;
}

// Returns the names (map keys) of all output ports of this processor.
const std::set<std::string> IProcessor::output_port_names() const {
  std::set<std::string> result;
  for (auto port = output_ports_.begin(); port != output_ports_.end();
       ++port) {
    result.insert(port->first);
  }
  return result;
}

// Builds a YAML description of this processor.
//
// The returned node contains:
//   - "inports" / "outports": one entry per port, keyed by port name, holding
//     the result of that port's own ExportYAML();
//   - "states": for every shared state its external permission (as string),
//     its description and - unless the external permission is
//     Permission::NONE - its current value as string;
//   - "methods": the names of the exposed methods.
YAML::Node IProcessor::ExportYAML() {
  YAML::Node root;

  for (auto &port : input_ports_) {
    const auto &port_name = port.first;
    root["inports"][port_name] = port.second->ExportYAML();
  }

  for (auto &port : output_ports_) {
    const auto &port_name = port.first;
    root["outports"][port_name] = port.second->ExportYAML();
  }

  for (auto &entry : shared_states_) {
    const auto &state_name = entry.first;
    auto &state = entry.second;

    root["states"][state_name]["permission"] =
        permission_to_string(state->external_permission());

    // the value is only published when the state is externally accessible
    if (state->external_permission() != Permission::NONE) {
      root["states"][state_name]["value"] = state->get_string();
    }

    root["states"][state_name]["description"] = state->description();
  }

  for (auto &method : exposed_methods_) {
    root["methods"].push_back(method.first);
  }

  return root;
}

// Removes the option called `name` from this processor's regular options
// (options_); the advanced options are not touched.
void IProcessor::remove_option(std::string name) {
  options_.remove(name);
}

// Looks up an input port using the port name stored in `address`; the
// lookup itself is delegated to the name-based input_port overload.
IPortIn *IProcessor::input_port(const PortAddress &address) {
  const auto &port_name = address.port();
  return input_port(port_name);
}

// Looks up an output port using the port name stored in `address`; the
// lookup itself is delegated to the name-based output_port overload.
IPortOut *IProcessor::output_port(const PortAddress &address) {
  const auto &port_name = address.port();
  return output_port(port_name);
}

// Returns the input slot identified by the port name and slot index stored
// in `address`.
ISlotIn *IProcessor::input_slot(const SlotAddress &address) {
  auto *port = input_port(address.port());
  return port->slot(address.slot());
}

// Returns the output slot identified by the port name and slot index stored
// in `address`.
ISlotOut *IProcessor::output_slot(const SlotAddress &address) {
  auto *port = output_port(address.port());
  return port->slot(address.slot());
}

// Returns the name of the only input port of this processor.
//
// A default exists only when the processor has exactly one input port;
// otherwise std::runtime_error is thrown.
std::string IProcessor::default_input_port() const {
  const bool unambiguous = (input_ports_.size() == 1);
  if (unambiguous) {
    return input_ports_.begin()->first;
  }

  throw std::runtime_error(
      "Cannot determine default input port for processor \"" + name() +
      "\".");
}

// Returns the name of the only output port of this processor.
//
// A default exists only when the processor has exactly one output port;
// otherwise ProcessorInternalError is thrown.
std::string IProcessor::default_output_port() const {
  const bool unambiguous = (output_ports_.size() == 1);
  if (unambiguous) {
    return output_ports_.begin()->first;
  }

  throw ProcessorInternalError("Cannot determine default output port.",
                               name());
}

// Finalizes the stream info of every slot of every output port.
void IProcessor::CompleteStreamInfo() {
  for (auto &entry : output_ports_) {
    auto &port = entry.second;
    for (int slot_index = 0; slot_index < port->number_of_slots();
         ++slot_index) {
      port->slot(slot_index)->streaminfo().Finalize();
    }
  }
}

// Loads the regular and advanced options from `node` and then calls
// Configure(context).
//
// The "options" and "advanced" sections are both optional. When a section is
// absent an empty map is parsed instead, so that from_yaml still runs and
// the check of required options is triggered.
//
// Any std::runtime_error raised while reading or parsing the options is
// rethrown as a std::runtime_error whose message is prefixed with the
// processor name. Exceptions raised by Configure() are not intercepted.
void IProcessor::internal_Configure(const YAML::Node &node,
                                    const GlobalContext &context) {
  YAML::Node no_settings(YAML::NodeType::Map);

  try {
    if (node["options"]) {
      options_.from_yaml(node["options"]);
    } else {
      // section missing: parse an empty map to trigger the required-option
      // check
      options_.from_yaml(no_settings);
    }

    if (node["advanced"]) {
      advanced_options_.from_yaml(node["advanced"]);
    } else {
      // same fallback for the advanced options
      advanced_options_.from_yaml(no_settings);
    }
  } catch (const std::runtime_error &failure) {
    const std::string prefixed = name() + ": " + failure.what();
    throw std::runtime_error(prefixed);
  }

  Configure(context);
}

// Calls CreatePorts() and afterwards applies the requested ring buffer
// sizes, if any were requested.
//
// A request is applied only if it names an existing output port and asks for
// a size of at least 2; other requests are skipped with a warning.
void IProcessor::internal_CreatePorts() {
  CreatePorts();

  if (!requested_buffer_sizes_.is_null()) {
    for (auto &request : requested_buffer_sizes_()) {
      const bool applicable =
          has_output_port(request.first) && request.second >= 2;

      if (applicable) {
        output_port(request.first)->set_buffer_size(request.second);
        LOG(INFO) << "Set ringbuffer size to " << request.second
                  << " for port " << name() << "." << request.first;
      } else {
        LOG(WARNING) << "Could not set ringbuffer size to " << request.second
                     << " for port " << name() << "." << request.first;
      }
    }
  }
}

// Prepares the input side of a connection and completes `address` in place.
//
// Steps:
//   - verify that `address` refers to this processor;
//   - substitute the default input port when no port name is given;
//   - verify that the input port exists;
//   - reserve the requested slot on that port;
//   - store the reserved slot index and the port's datatype in `address`.
//
// Throws std::runtime_error on a processor name mismatch and
// std::out_of_range for an unknown port or when ReserveSlot returns a
// negative value. default_input_port() may throw as well.
void IProcessor::internal_PrepareConnectionIn(SlotAddress &address) {
  if (address.processor() != name()) {
    throw std::runtime_error(
        "Internal error: processor name does not match address.");
  }

  // fall back to the default port when none was specified
  if (address.port().empty()) {
    address.set_port(default_input_port());
  }

  if (!has_input_port(address.port())) {
    throw std::out_of_range("Unknown input port \"" + address.processor() +
                            "." + address.port() + "\".");
  }

  auto *port = input_port(address);

  // a negative result signals that the slot could not be reserved
  const int reserved = port->ReserveSlot(address.slot());
  if (reserved < 0) {
    throw std::out_of_range("Unable to reserve slot \"" +
                            std::to_string(address.slot()) +
                            "\" for input port \"" + address.processor() + "." +
                            address.port() + "\".");
  }

  address.set_slot(reserved);
  address.set_port_datatype(port->datatype());
}

// Prepares the output side of a connection and completes `address` in place.
//
// Steps:
//   - verify that `address` refers to this processor;
//   - substitute the default output port when no port name is given;
//   - verify that the output port exists;
//   - reserve the requested slot on that port;
//   - store the reserved slot index and the port's datatype in `address`.
//
// Throws std::runtime_error on a processor name mismatch and
// std::out_of_range for an unknown port or when ReserveSlot returns a
// negative value. default_output_port() may throw as well.
void IProcessor::internal_PrepareConnectionOut(SlotAddress &address) {
  if (address.processor() != name()) {
    throw std::runtime_error(
        "Internal error: processor name does not match address.");
  }

  // fall back to the default port when none was specified
  if (address.port().empty()) {
    address.set_port(default_output_port());
  }

  if (!has_output_port(address.port())) {
    throw std::out_of_range("Unknown output port \"" + address.port() +
                            "\" on processor \"" + address.processor() + "\".");
  }

  auto *port = output_port(address);

  // a negative result signals that the slot could not be reserved
  const int reserved = port->ReserveSlot(address.slot());
  if (reserved < 0) {
    throw std::out_of_range("Unable to reserve slot \"" +
                            std::to_string(address.slot()) +
                            "\" for output port \"" + address.processor() +
                            "." + address.port() + "\".");
  }

  address.set_slot(reserved);
  address.set_port_datatype(port->datatype());
}

// Checks that the input port at `address` accepts the upstream output port
// at `upstream_address`.
//
// Any std::exception raised during the check is translated into a
// ProcessorInternalError that names both endpoints and carries the original
// message in parentheses.
void IProcessor::internal_ConnectionCompatibilityCheck(
    const SlotAddress &address, IProcessor *upstream,
    const SlotAddress &upstream_address) {
  try {
    auto *sink = input_port(address);
    sink->VerifyCompatibility(upstream->output_port(upstream_address));
  } catch (const std::exception &failure) {
    std::string reason("Incompatible ports ");
    reason += upstream_address.string(true);
    reason += " -> ";
    reason += address.string(true);
    reason += " (";
    reason += failure.what();
    reason += ")";
    throw ProcessorInternalError(reason, name());
  }
}

// Connects the input slot described by `address` to the output slot
// `upstream_address` of the `upstream` processor.
void IProcessor::internal_ConnectIn(const SlotAddress &address,
                                    IProcessor *upstream,
                                    const SlotAddress &upstream_address) {
  auto *port = input_port(address);
  auto *source_slot = upstream->output_slot(upstream_address);
  port->Connect(address.slot(), source_slot);
}

// Connects the output slot described by `address` to the input slot
// `downstream_address` of the `downstream` processor.
void IProcessor::internal_ConnectOut(const SlotAddress &address,
                                     IProcessor *downstream,
                                     const SlotAddress &downstream_address) {
  auto *port = output_port(address);
  auto *target_slot = downstream->input_slot(downstream_address);
  port->Connect(address.slot(), target_slot);
}

void IProcessor::internal_NegotiateConnections() {
  if (!negotiated_) {
    // check if all input slots are connected
    for (auto &it : input_ports_) {
      for (int k = 0; k < it.second->number_of_slots(); ++k) {
        if (!it.second->slot(k)->connected()) {
          LOG(ERROR) << name() << ": input slot \"" << it.first + "."
                     << std::to_string(k) << "\" is not connected.";
          throw ProcessorInternalError("input slot \"" + it.first + "." +
                                           std::to_string(k) +
                                           "\" is not connected.",
                                       name());
        }

        try {
          it.second->slot(k)->Validate();
        } catch (std::exception &e) {
          LOG(ERROR) << name() << ": Incompatible stream on slot \"" << it.first
                     << "." << std::to_string(k) << "\" (" << e.what() << ")";
          throw ProcessorInternalError("Incompatible stream on slot \"" +
                                           it.first + "." + std::to_string(k) +
                                           "\" (" + e.what() + ")",
                                       name());
        }
      }
    }

    for (auto &it : output_ports_) {
      for (int k = 0; k < it.second->number_of_slots(); ++k) {
        if (!it.second->slot(k)->connected()) {
          LOG(WARNING) << name() << ": output slot \"" << it.first + "."
                       << std::to_string(k) << "\" is not connected.";
        }
      }
    }

    CompleteStreamInfo();

    // OK, so let's finalize right here, locking streaminfo forever after
    // this also requires that set_stream_rate and set_parameters check &
    // respect the lock
    for (auto &it : output_ports_) {
      for (int k = 0; k < it.second->number_of_slots(); ++k) {
        it.second->slot(k)->streaminfo().Finalize();
      }
    }

    negotiated_ = true;
  }
}

// Asks every output port to create its ring buffers.
void IProcessor::internal_CreateRingBuffers() {
  for (auto port = output_ports_.begin(); port != output_ports_.end();
       ++port) {
    port->second->CreateRingBuffers();
  }
}

// Calls PrepareProcessing() on all input ports first and on all output ports
// afterwards.
void IProcessor::internal_PrepareProcessing() {
  for (auto &in : input_ports_) {
    auto &port = in.second;
    port->PrepareProcessing();
  }

  // per the original note, this resets all output slot cursors to 0
  for (auto &out : output_ports_) {
    auto &port = out.second;
    port->PrepareProcessing();
  }
}

// Body of the processor thread: runs all processing stages in order.
//
// Sequence, as implemented below:
//   1. build a ProcessingContext whose test flag is new_test_flag_ when that
//      is not null, and the run-wide runcontext.test() otherwise;
//   2. internal_PrepareProcessing();
//   3. TestPrepare() and Preprocess();
//   4. set the running flag and block until runcontext.go_signal is set;
//   5. Process(), Postprocess() and TestFinalize();
//   6. clear the running flag.
//
// Every stage call has its own try/catch: a std::exception is reported via
// context.TerminateWithError(<stage label>, what()).
// NOTE(review): unless TerminateWithError does not return (its behavior is
// not visible in this file), the remaining stages are still executed after a
// failed stage - confirm this is intended. Exceptions that do not derive from
// std::exception are not caught here.
void IProcessor::internal_ThreadEntry(RunContext &runcontext) {
  LOG(DEBUG) << "Entering thread for processor " << name_;

  // previous implementation, kept for reference:
  // ProcessingContext context( runcontext, name_, has_test_flag_.load() ?
  // test_flag_.load() : runcontext.test() );
  ProcessingContext context(runcontext, name_,
                            new_test_flag_.is_null() ? runcontext.test()
                                                     : new_test_flag_());

  LOG(DEBUG) << name_ << ": processor test flag set to " << context.test();

  internal_PrepareProcessing();

  try {
    TestPrepare(context);
  } catch (std::exception &e) {
    context.TerminateWithError("TestPrepare", e.what());
  }

  try {
    Preprocess(context);
  } catch (std::exception &e) {
    context.TerminateWithError("PreProcess", e.what());
  }

  // from here on internal_Start() will refuse to launch another thread
  running_.store(true);

  // wait for the go signal; the flag is re-checked after every wake-up while
  // holding runcontext.mutex
  {
    std::unique_lock<std::mutex> lock(runcontext.mutex);
    while (!runcontext.go_signal) {
      runcontext.go_condition.wait(lock);
    }
  }

  try {
    Process(context);
  } catch (std::exception &e) {
    context.TerminateWithError("Process", e.what());
  }

  try {
    Postprocess(context);
  } catch (std::exception &e) {
    context.TerminateWithError("PostProcess", e.what());
  }

  try {
    TestFinalize(context);
  } catch (std::exception &e) {
    context.TerminateWithError("TestFinalize", e.what());
  }

  running_.store(false);

  LOG(DEBUG) << "Exiting thread for processor " << name_;
}

// Launches the processor thread, unless the processor is already running.
//
// A previously started thread is joined first (internal_Stop), then a new
// thread executing internal_ThreadEntry(runcontext) is created. Afterwards
// the requested real-time priority and core affinity are applied to the new
// thread; a failure of either is only logged as a warning.
//
// `runcontext` is handed to the thread by reference, so it must stay alive
// for as long as the thread runs.
void IProcessor::internal_Start(RunContext &runcontext) {
  if (running_) {
    return;
  }

  // make sure an earlier thread has been joined before thread_ is reassigned
  internal_Stop();

  thread_ = std::thread(&IProcessor::internal_ThreadEntry, this,
                        std::ref(runcontext));

  // thread priority
  if (!set_realtime_priority(thread_.native_handle(), thread_priority())) {
    LOG(WARNING) << "Unable to set thread priority for " << name_;
  } else if (thread_priority() >= PRIORITY_LOW) {
    LOG(INFO) << "Successfully set thread priority for " << name_ << " to "
              << thread_priority() << "%.";
  }

  // core affinity
  if (!set_thread_core(thread_.native_handle(), thread_core())) {
    LOG(WARNING) << "Unable to pin thread for " << name_ << " to core "
                 << thread_core();
  } else if (thread_core() >= 0) {
    LOG(INFO) << "Successfully pinned thread for " << name_ << " to core "
              << thread_core() << ".";
  }
}

// Joins the processor thread if there is one to join.
//
// Note that the debug message below is emitted in either case, i.e. also when
// no thread was joinable.
void IProcessor::internal_Stop() {
  if (thread_.joinable()) {
    thread_.join();
  }

  LOG(DEBUG) << name() << ": thread joined";
}

// Calls UnlockSlots() on every output port and then on every input port.
// NOTE(review): presumably used to release a processor thread that is
// blocked on one of its slots - confirm against the port implementation.
void IProcessor::internal_Alert() {
  for (auto &out : output_ports_) {
    auto &port = out.second;
    port->UnlockSlots();
  }

  for (auto &in : input_ports_) {
    auto &port = in.second;
    port->UnlockSlots();
  }
}

// Invokes the exposed method registered under `name` with `node` as its
// argument and returns the method's result.
YAML::Node IProcessor::internal_ApplyMethod(std::string name,
                                            const YAML::Node &node) {
  auto &&method = exposed_method(name);
  return method(node);
}

// Creates the binary output file "<prefix>.<variable_name>.<extension>" and
// registers its stream in streams_ under the key `variable_name`.
//
// Throws ProcessorInternalError if the file already exists or if it cannot
// be opened for writing.
void IProcessor::create_file(std::string prefix, std::string variable_name,
                             std::string extension) {
  const std::string full_path =
      prefix + "." + variable_name + "." + extension;

  // never overwrite an existing file
  if (path_exists(full_path)) {
    throw ProcessorInternalError(
        "Output file " + full_path + " already exists.", name());
  }

  // A shared_ptr is used on purpose: the original author noted that a
  // unique_ptr gave a compilation error here.
  std::shared_ptr<std::ostream> stream = std::make_shared<std::ofstream>(
      full_path, std::ofstream::out | std::ofstream::binary);

  if (!stream->good()) {
    throw ProcessorInternalError("Error opening output file " + full_path + ".",
                                 name());
  }

  LOG(INFO) << name() << ". " + full_path + " opened correctly for writing";
  streams_[variable_name] = std::move(stream);
}

// Prepares a latency test: creates the "SourceTimestamps" output file and
// sizes the in-memory timestamp buffer.
//
// The file prefix is the path resolved from "test://" followed by the
// processor name. The buffer is resized (not merely reserved) to
// MAX_TEST_SAMPLING_FREQUENCY * 3600 * MAX_N_HOURS_TEST elements.
//
// Errors raised by create_file() are propagated to the caller.
void IProcessor::prepare_latency_test(ProcessingContext &context) {
  const auto test_root = context.resolve_path("test://", "test");
  create_file(test_root + name(), "SourceTimestamps");

  LOG(UPDATE) << name()
              << ". Resizing the source timestamp vector for testing ...";

  // allocate enough elements up front for the longest supported test
  const auto n_elements =
      MAX_TEST_SAMPLING_FREQUENCY * (3600 * MAX_N_HOURS_TEST);
  test_source_timestamps_.resize(n_elements);

  LOG(INFO) << name() << ". Source timestamp vector resized with "
            << test_source_timestamps_.size() << " elements";
}

void IProcessor::save_source_timestamps_to_disk(std::uint64_t n_timestamps) {
  test_source_timestamps_.resize(n_timestamps);
  for (auto source_ts : test_source_timestamps_) {
    streams_["SourceTimestamps"]->write(
        reinterpret_cast<const char *>(&source_ts), sizeof(TimePoint));
  }
  LOG(INFO) << name() << ". " << test_source_timestamps_.size()
            << " source timestamps were written to disk.";
}