Program Listing for File runinfo.hpp

Return to documentation for file (falcon/runinfo.hpp)

// ---------------------------------------------------------------------
// This file is part of falcon-core.
//
// Copyright (C) 2015, 2016, 2017 Neuro-Electronics Research Flanders
//
// Falcon-server is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Falcon-server is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with falcon-core. If not, see <http://www.gnu.org/licenses/>.
// ---------------------------------------------------------------------

#pragma once
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <ctime>
#include <map>
#include <mutex>
#include <stdexcept>
#include <string>

#include "context.hpp"
#include "logging/log.hpp"
#include "utilities/time.hpp"

// Forward declaration: graph::ProcessorGraph is only named in a friend
// declaration of RunContext below, so its full definition is not needed here.
namespace graph {
class ProcessorGraph;
}

// State of a single run: the storage locations belonging to the run, its
// start/stop times, a reference to the externally owned termination flag and
// the error message (if any) recorded when the run was terminated.
class RunContext : public StorageContext {
 public:
  friend class graph::ProcessorGraph;
  friend class IProcessor;

 public:
  // Prepares the on-disk environment of a new run and registers the
  // corresponding storage contexts.
  //
  // Parameters:
  //   context          global context; also used to initialise the
  //                    StorageContext base (presumably this is where the
  //                    "runroot" location comes from -- TODO confirm).
  //   terminate_signal flag owned by the caller; stored by reference.
  //   run_group_id     name of the run group folder; "default" when empty.
  //   run_id           name of the run folder; a local-time stamp of the form
  //                    YYYYMMDD_HHMMSS is generated when empty.
  //   template_id      optional name of an existing folder inside the run
  //                    group that is registered as "templatebase".
  //   test_flag        value reported by test().
  //
  // Registered storage contexts: "rungroup", "templatebase" (only with a
  // template id), "runbase", "lastrunbase" and "lastrungroup". "runbase"
  // becomes the default context.
  //
  // Throws std::runtime_error when the template folder is missing or is not a
  // directory, when the run group folder cannot be created, when the run
  // folder already exists or cannot be created, or when no default run id can
  // be generated. Failure to create the `_last_run` / `_last_run_group`
  // symbolic links is only logged as a warning.
  RunContext(GlobalContext &context, std::atomic<bool> &terminate_signal,
             std::string run_group_id, std::string run_id,
             std::string template_id, bool test_flag)
      : StorageContext(context), global_context_(context),
        start_time_(Clock::now()), stop_time_(start_time_),
        terminate_signal_(terminate_signal), template_id_(template_id),
        default_test_flag_(test_flag) {
    // use default run group id if none given
    run_group_id_ = run_group_id.empty() ? "default" : run_group_id;

    // add run group storage site
    add_storage_context("rungroup",
                        storage_context("runroot") + "/" + run_group_id_);

    if (!template_id.empty()) {
      add_storage_context("templatebase",
                          storage_context("rungroup") + "/" + template_id);

      // S_ISDIR compares the complete file-type field. Testing the S_IFDIR
      // bit alone would also accept other file types whose type code happens
      // to contain that bit (e.g. block devices and sockets).
      struct stat info;
      if ((stat(storage_context("templatebase").c_str(), &info) < 0) ||
          !S_ISDIR(info.st_mode)) {
        throw std::runtime_error(
            "Run source folder does not exist or is not accessible. (" +
            storage_context("templatebase") + ")");
      }
    }

    // create run group folder; an already existing folder is fine
    if (mkdir(storage_context("rungroup").c_str(),
              S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) != 0) {
      if (errno != EEXIST) {
        throw std::runtime_error("Cannot create run environment " +
                                 storage_context("rungroup"));
      }
    }

    // generate default destination
    if (run_id.empty()) {
      char buffer[20];

      // localtime_r writes into a caller-provided struct instead of the
      // shared static buffer used by localtime, so it is safe to call while
      // other threads convert times as well.
      const time_t rawtime = time(nullptr);
      struct tm timeinfo;
      if (localtime_r(&rawtime, &timeinfo) == nullptr ||
          strftime(buffer, sizeof(buffer), "%Y%m%d_%H%M%S", &timeinfo) == 0) {
        throw std::runtime_error("Cannot generate default run id.");
      }

      run_id = buffer;
    }

    add_storage_context("runbase", storage_context("rungroup") + "/" + run_id);
    run_id_ = run_id;

    // create run destination folder; unlike the run group folder, it must
    // not exist yet
    if (mkdir(storage_context("runbase").c_str(),
              S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) != 0) {
      if (errno == EEXIST) {
        throw std::runtime_error("Run base folder already exists. (" +
                                 storage_context("runbase") + ")");
      } else {
        throw std::runtime_error("Cannot create run base folder " +
                                 storage_context("runbase"));
      }
    }

    // create symbolic link rungroup/_last_run pointing to run base folder
    std::string symlinkname = storage_context("rungroup") + "/_last_run";
    // remove old symlink (if present); the result is deliberately ignored
    std::remove(symlinkname.c_str());
    // create new symlink
    if (symlink(storage_context("runbase").c_str(), symlinkname.c_str()) != 0) {
      LOG(WARNING) << "Could not create symbolic link for last run.";
    }

    // create symbolic link runroot/_last_run_group pointing to run group folder
    symlinkname = storage_context("runroot") + "/_last_run_group";
    // remove old symlink (if present); the result is deliberately ignored
    std::remove(symlinkname.c_str());
    // create new symlink
    if (symlink(storage_context("rungroup").c_str(), symlinkname.c_str()) !=
        0) {
      LOG(WARNING) << "Could not create symbolic link for last run group.";
    }

    add_storage_context("lastrunbase",
                        storage_context("rungroup") + "/_last_run");
    add_storage_context("lastrungroup",
                        storage_context("runroot") + "/_last_run_group");

    set_default_context("runbase");
  }

  // Returns the global context this run was created from.
  GlobalContext &global() { return global_context_; }

  // Returns true once the termination flag has been set.
  bool terminated() const { return terminate_signal_.load(); }

  // Sets the termination flag. Only the call that actually flips the flag
  // records the stop time; later calls have no effect.
  //
  // NOTE(review): the flag is set before stop_time_ is written, so a thread
  // that observes terminated() == true at that moment may read stop_time_
  // while it is being updated. Confirm that callers tolerate this.
  void Terminate() {
    if (!terminate_signal_.exchange(true)) {
      stop_time_ = Clock::now();
    }
  }

  // Like Terminate(), but additionally records an error message of the form
  // "Processor `<name>` failed in `<step>`: <message>". Only the call that
  // actually flips the flag records anything.
  //
  // NOTE(review): the same ordering remark as for Terminate() applies to
  // error_message_.
  void TerminateWithError(std::string processor_name, std::string step,
                          std::string error_message) {
    if (!terminate_signal_.exchange(true)) {
      stop_time_ = Clock::now();
      error_message_ = "Processor `" + processor_name + "` failed in `" + step +
                       "`: " + error_message;
    }
  }

  // Time at which this context was constructed.
  TimePoint start_time() const { return start_time_; }

  // Recorded stop time once terminated, the current time otherwise.
  TimePoint stop_time() const {
    if (terminated()) {
      return stop_time_;
    } else {
      return Clock::now();
    }
  }

  // Whole minutes between start_time() and stop_time().
  int minutes() const {
    return (std::chrono::duration_cast<std::chrono::minutes>(stop_time() -
                                                             start_time())
                .count());
  }

  // Whole seconds between start_time() and stop_time().
  int seconds() const {
    return (std::chrono::duration_cast<std::chrono::seconds>(stop_time() -
                                                             start_time())
                .count());
  }

  // Recorded error message; empty while the run has not been terminated or
  // when it was terminated without an error.
  std::string error_message() const {
    if (terminated()) {
      return error_message_;
    } else {
      return "";
    }
  }

  // True when the run has been terminated and an error message was recorded.
  bool error() const {
    if (terminated()) {
      return error_message_.size() > 0;
    } else {
      return false;
    }
  }

  std::string run_group_id() const { return run_group_id_; }
  std::string run_id() const { return run_id_; }
  std::string template_id() const { return template_id_; }

  // Test flag passed to the constructor.
  bool test() const { return default_test_flag_.load(); }

 protected:
  // Not used inside this class; accessible to the friend classes declared
  // above and to derived classes.
  std::mutex mutex;
  std::condition_variable go_condition;
  bool go_signal = false;

 private:
  GlobalContext &global_context_;

 protected:
  TimePoint start_time_;
  TimePoint stop_time_;  // only meaningful once the run has been terminated
  std::atomic<bool> &terminate_signal_;  // owned by the creator of the context
  std::string error_message_;

 private:
  std::string run_group_id_;
  std::string run_id_;
  std::string template_id_;
  std::atomic<bool> default_test_flag_;
};

// Per-processor view on a run: adds the processor specific storage locations
// on top of those of the RunContext and forwards termination requests to it.
class ProcessingContext : public StorageContext {
 public:
  // Creates the processor's folder inside the run folder and registers the
  // processor specific storage contexts.
  //
  // Parameters:
  //   context         run context; also used to initialise the StorageContext
  //                   base.
  //   processor_name  name of the processor; used as folder name and in error
  //                   messages produced by TerminateWithError().
  //   test            when true, a "test" sub-folder is created as well.
  //
  // Registered storage contexts: "run", "template" (only when a
  // "templatebase" context exists), "test" (only in test mode),
  // "templatetest" (only in test mode with a template), "lastrun" and
  // "lasttest". "run" becomes the default context.
  //
  // Throws std::runtime_error when the run or test folder cannot be created.
  ProcessingContext(RunContext &context, std::string processor_name, bool test)
      : StorageContext(context), run_context_(context),
        processor_name_(processor_name), test_flag_(test) {
    add_storage_context("run",
                        storage_context("runbase") + "/" + processor_name);
    make_directory(storage_context("run"),
                   "Cannot create processor run environment ");

    if (storage_map().count("templatebase") == 1) {
      add_storage_context("template", storage_context("templatebase") + "/" +
                                          processor_name);
    }

    // add processor specific test storage site
    if (test) {
      add_storage_context("test", storage_context("run") + "/test");
      make_directory(storage_context("test"),
                     "Cannot create processor test environment ");

      if (storage_map().count("template") == 1) {
        add_storage_context("templatetest",
                            storage_context("template") + "/test");
      }
    }

    // these locations are only registered, no folders are created for them
    add_storage_context("lastrun",
                        storage_context("lastrunbase") + "/" + processor_name);
    add_storage_context("lasttest", storage_context("lastrun") + "/test");
    set_default_context("run");
  }

  // Returns the run context this processing context belongs to.
  RunContext &run() { return run_context_; }

  // Test flag passed to the constructor.
  bool test() const { return test_flag_.load(); }

  // The following members forward to the run context; TerminateWithError
  // supplies this context's processor name.
  bool terminated() const { return run_context_.terminated(); }
  void Terminate() { run_context_.Terminate(); }
  void TerminateWithError(std::string step, std::string error_message) {
    run_context_.TerminateWithError(processor_name_, step, error_message);
  }

 private:
  // Creates the directory `path`; an already existing entry is accepted.
  // Throws std::runtime_error(error_prefix + path) on any other failure.
  static void make_directory(const std::string &path,
                             const std::string &error_prefix) {
    if (mkdir(path.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) != 0) {
      if (errno != EEXIST) {
        throw std::runtime_error(error_prefix + path);
      }
    }
  }

  RunContext &run_context_;
  std::string processor_name_;
  std::atomic<bool> test_flag_;
};