From 2836f43c78b42f5c6ba126b67d3b816c79f39e37 Mon Sep 17 00:00:00 2001 From: KyleKlenk <kyle.c.klenk@gmail.com> Date: Tue, 21 Jun 2022 15:41:08 -0600 Subject: [PATCH] Compiles with new actors deleted unnecessary files --- build/includes/summa_actor/summa_client.hpp | 16 + build/includes/summa_actor/summa_server.hpp | 14 + build/makefile | 10 +- .../actors/file_access_actor/FileAccess.h | 85 --- .../file_access_actor/FileAccessActor.h | 372 ------------- .../actors/file_access_actor/OutputManager.h | 306 ----------- .../source/actors/global/fortran_dataTypes.h | 102 ---- build/source/actors/global/global.h | 27 - build/source/actors/global/messageAtoms.h | 44 -- build/source/actors/hru_actor/HRU.h | 139 ----- build/source/actors/hru_actor/HRUActor.h | 504 ------------------ .../hru_actor/hru_subroutine_wrappers.h | 104 ---- build/source/actors/job_actor/Job.h | 72 --- .../job_actor/job_subroutine_wrappers.h | 12 - build/source/actors/main.cpp | 74 +-- build/source/actors/summa_actor/SummaActor.h | 120 ----- .../source/actors/summa_actor/SummaManager.h | 27 - .../{SummaClient.h => summa_client.cpp} | 26 +- .../{SummaServer.h => summa_server.cpp} | 18 +- 19 files changed, 87 insertions(+), 1985 deletions(-) create mode 100644 build/includes/summa_actor/summa_client.hpp create mode 100644 build/includes/summa_actor/summa_server.hpp delete mode 100644 build/source/actors/file_access_actor/FileAccess.h delete mode 100644 build/source/actors/file_access_actor/FileAccessActor.h delete mode 100644 build/source/actors/file_access_actor/OutputManager.h delete mode 100644 build/source/actors/global/fortran_dataTypes.h delete mode 100644 build/source/actors/global/global.h delete mode 100644 build/source/actors/global/messageAtoms.h delete mode 100644 build/source/actors/hru_actor/HRU.h delete mode 100644 build/source/actors/hru_actor/HRUActor.h delete mode 100644 build/source/actors/hru_actor/hru_subroutine_wrappers.h delete mode 100644 build/source/actors/job_actor/Job.h delete mode 100644 build/source/actors/job_actor/job_subroutine_wrappers.h delete mode 100644 build/source/actors/summa_actor/SummaActor.h delete mode 100644 build/source/actors/summa_actor/SummaManager.h rename build/source/actors/summa_actor/{SummaClient.h => summa_client.cpp} (84%) rename build/source/actors/summa_actor/{SummaServer.h => summa_server.cpp} (73%) diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp new file mode 100644 index 0000000..28d5b8f --- /dev/null +++ b/build/includes/summa_actor/summa_client.hpp @@ -0,0 +1,16 @@ +#pragma once + +#include "caf/all.hpp" +#include "caf/io/all.hpp" + +namespace caf { + +struct summa_client_state { + strong_actor_ptr current_server; +}; +behavior summa_client(stateful_actor<summa_client_state>* self); +behavior unconnected(stateful_actor<summa_client_state>*); +void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port); +behavior running(stateful_actor<summa_client_state>*, const actor& summa_server); + +} \ No newline at end of file diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp new file mode 100644 index 0000000..bed891e --- /dev/null +++ b/build/includes/summa_actor/summa_server.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "caf/all.hpp" +#include "caf/io/all.hpp" + +namespace caf { + +struct summa_server_state { + +}; + +behavior summa_server(stateful_actor<summa_server_state>* self); + +} \ No newline at end of file diff --git a/build/makefile b/build/makefile index ef56fd2..9c0bace 100644 --- a/build/makefile +++ b/build/makefile @@ -251,6 +251,8 @@ GLOBAL = $(SOURCE_DIR)/global/global.cpp SUMMA_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/summa_actor SUMMA_ACTOR = $(SOURCE_DIR)/summa_actor/summa_actor.cpp +SUMMA_CLIENT = $(SOURCE_DIR)/summa_actor/summa_client.cpp +SUMMA_SERVER = $(SOURCE_DIR)/summa_actor/summa_server.cpp JOB_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/job_actor JOB_ACTOR = $(SOURCE_DIR)/job_actor/job_actor.cpp @@ -280,7 +282,7 @@ all: fortran cpp fortran: compile_noah compile_comm compile_summa link clean_fortran cpp: compile_globals compile_hru_actor compile_file_access_actor compile_job_actor compile_summa_actor \ - compile_main link_cpp + compile_summa_client compile_summa_server compile_main link_cpp clean_cpp test: actors_test actors_testLink actorsClean @@ -332,6 +334,12 @@ compile_summa_actor: $(CC) $(FLAGS_ACTORS) -c $(SUMMA_ACTOR) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) \ $(JOB_ACTOR_INCLUDES) +compile_summa_client: + $(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES) + +compile_summa_server: + $(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(SUMMA_ACTOR_INCLUDES) + compile_main: $(CC) $(FLAGS_ACTORS) -c $(MAIN) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) $(JOB_ACTOR_INCLUDES) diff --git a/build/source/actors/file_access_actor/FileAccess.h b/build/source/actors/file_access_actor/FileAccess.h deleted file mode 100644 index 9dee306..0000000 --- a/build/source/actors/file_access_actor/FileAccess.h +++ /dev/null @@ -1,85 +0,0 @@ -#ifndef FILEACCESS_H_ -#define FILEACCESS_H_ -#include "caf/all.hpp" - -#include "../global/fortran_dataTypes.h" -#include "../global/messageAtoms.h" -#include "../global/global.h" -#include "../global/json.hpp" -#include "fileAccess_subroutine_wrappers.h" -#include "OutputManager.h" -#include <vector> -#include <chrono> - - - -class forcingFile { - private: - int fileID; // which file are we relative the forcing file list saved in fortran - int numSteps; // the number of steps in this forcing file - bool isLoaded; // is this file actually loaded in to RAM yet. - public: - forcingFile(int fileID) { - this->fileID = fileID; - this->numSteps = 0; - this->isLoaded = false; - } - - int getNumSteps() { - return this->numSteps; - } - - bool isFileLoaded() { - return this->isLoaded; - } - - void updateIsLoaded() { - this->isLoaded = true; - } - - void updateNumSteps(int numSteps) { - this->numSteps = numSteps; - this->isLoaded = true; - } -}; - -struct file_access_state { - // Variables set on Spwan - caf::actor parent; - int startGRU; - int numGRU; - - - void *handle_forcFileInfo = new_handle_file_info(); // Handle for the forcing file information - void *handle_ncid = new_handle_var_i(); // output file ids - OutputManager *output_manager; - int num_vectors_in_output_manager; - int num_steps; - int outputStrucSize; - int stepsInCurrentFile; - int numFiles; - int filesLoaded; - int err = 0; - - std::vector<forcingFile> forcFileList; // list of steps in file - std::vector<bool> outputFileInitHRU; - - std::chrono::time_point<std::chrono::system_clock> readStart; - std::chrono::time_point<std::chrono::system_clock> readEnd; - double readDuration = 0.0; - - std::chrono::time_point<std::chrono::system_clock> writeStart; - std::chrono::time_point<std::chrono::system_clock> writeEnd; - double writeDuration = 0.0; - - -}; - - - - - - - - -#endif \ No newline at end of file diff --git a/build/source/actors/file_access_actor/FileAccessActor.h b/build/source/actors/file_access_actor/FileAccessActor.h deleted file mode 100644 index 61c4da3..0000000 --- a/build/source/actors/file_access_actor/FileAccessActor.h +++ /dev/null @@ -1,372 +0,0 @@ -#ifndef FILEACCESSACTOR_H_ -#define FILEACCESSACTOR_H_ - -#include "FileAccess.h" - -using namespace caf; -using json = nlohmann::json; - - -void initalizeFileAccessActor(stateful_actor<file_access_state>* self); -int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, int numStepsToWrite, int returnMessage, caf::actor actorRef); -int readForcing(stateful_actor<file_access_state>* self, int currentFile); -int write(stateful_actor<file_access_state>* self, int listIndex); -int parseSettings(stateful_actor<file_access_state>* self, std::string configPath); - - - -behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU, - int outputStrucSize, std::string configPath, actor parent) { - // Set File_Access_Actor variables - self->state.parent = parent; - self->state.numGRU = numGRU; - self->state.startGRU = startGRU; - self->state.outputStrucSize = outputStrucSize; - - // Get Settings from configuration file - if (parseSettings(self, configPath) == -1) { - aout(self) << "Error with JSON Settings File!!!\n"; - self->quit(); - } else { - aout(self) << "\nSETTINGS FOR FILE_ACCESS_ACTOR\n" << - "Number of Vectors in Output Structure = " << self->state.num_vectors_in_output_manager << "\n"; - } - initalizeFileAccessActor(self); - - return { - [=](initalize_outputStructure) { - aout(self) << "Initalizing Output Structure" << std::endl; - Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize, - &self->state.numGRU, &self->state.err); - }, - - [=](write_param, int indxGRU, int indxHRU) { - int err; - err = 0; - Write_HRU_Param(self->state.handle_ncid, &indxGRU, &indxHRU, &err); - if (err != 0) { - aout(self) << "ERROR: Write_HRU_PARAM -- For HRU = " << indxHRU << "\n"; - } - }, - - [=](access_forcing, int currentFile, caf::actor refToRespondTo) { - // aout(self) << "Received Current FIle = " << currentFile << std::endl; - if (currentFile <= self->state.numFiles) { - if(self->state.forcFileList[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1 - // aout(self) << "ForcingFile Already Loaded \n"; - self->send(refToRespondTo, run_hru_v, - self->state.forcFileList[currentFile - 1].getNumSteps()); - - } else { - self->state.readStart = std::chrono::high_resolution_clock::now(); - - // Load the file - FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, ¤tFile, - &self->state.stepsInCurrentFile, &self->state.startGRU, - &self->state.numGRU, &self->state.err); - if (self->state.err != 0) { - aout(self) << "ERROR: Reading Forcing" << std::endl; - } - self->state.filesLoaded += 1; - self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); - - self->state.readEnd = std::chrono::high_resolution_clock::now(); - self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); - // Check if we have loaded all forcing files - if(self->state.filesLoaded <= self->state.numFiles) { - self->send(self, access_forcing_internal_v, currentFile + 1); - } - - self->send(refToRespondTo, run_hru_v, - self->state.forcFileList[currentFile - 1].getNumSteps()); - } - } else { - aout(self) << currentFile << " is larger than expected" << std::endl; - } - - }, - - [=](access_forcing_internal, int currentFile) { - if (self->state.filesLoaded <= self->state.numFiles && - currentFile <= self->state.numFiles) { - // aout(self) << "Loading in background, File:" << currentFile << "\n"; - if (self->state.forcFileList[currentFile - 1].isFileLoaded()) { - aout(self) << "File Loaded when shouldn't be \n"; - } - self->state.readStart = std::chrono::high_resolution_clock::now(); - FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, ¤tFile, - &self->state.stepsInCurrentFile, &self->state.startGRU, - &self->state.numGRU, &self->state.err); - if (self->state.err != 0) { - aout(self) << "ERROR: Reading Forcing" << std::endl; - } - self->state.filesLoaded += 1; - self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); - - self->state.readEnd = std::chrono::high_resolution_clock::now(); - self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); - - self->send(self, access_forcing_internal_v, currentFile + 1); - } else { - aout(self) << "All Forcing Files Loaded \n"; - } - }, - - - [=](write_output, int indxGRU, int indxHRU, int numStepsToWrite, - caf::actor refToRespondTo) { - int err; - int returnMessage = 9999; - - err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, returnMessage, refToRespondTo); - if (err != 0) { - aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n"; - } - - - }, - - [=](read_and_write, int indxGRU, int indxHRU, int numStepsToWrite, int currentFile, - caf::actor refToRespondTo) { - int err; - - err = readForcing(self, currentFile); - if (err != 0) - aout(self) << "\nERROR: FILE_ACCESS_ACTOR - READING_FORCING FAILED\n"; - - err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, currentFile, refToRespondTo); - if (err != 0) - aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n"; - }, - - [=](run_failure, int indxGRU) { - int listIndex; - - // update the list in Fortran - updateFailed(&indxGRU); - - listIndex = self->state.output_manager->decrementMaxSize(indxGRU); - - // Check if this list is now full - if(self->state.output_manager->isFull(listIndex)) { - write(self, listIndex); - } - }, - - /** - * Message from JobActor - * OutputManager needs to be adjusted so the failed HRUs can run again - */ - [=](restart_failures) { - resetFailedArray(); - self->state.output_manager->restartFailures(); - }, - - [=](deallocate_structures) { - aout(self) << "Deallocating Structure" << std::endl; - FileAccessActor_DeallocateStructures(self->state.handle_forcFileInfo, self->state.handle_ncid); - - self->state.readDuration = self->state.readDuration / 1000; // Convert to milliseconds - self->state.readDuration = self->state.readDuration / 1000; // Convert to seconds - - self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds - self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds - - self->send(self->state.parent, file_access_actor_done_v, self->state.readDuration, - self->state.writeDuration); - self->quit(); - }, - - [=](reset_outputCounter, int indxGRU) { - resetOutputCounter(&indxGRU); - } - - }; -} - -void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { - int indx = 1; - int err = 0; - // aout(self) << "Set Up the forcing file" << std::endl; - ffile_info_C(&indx, self->state.handle_forcFileInfo, &self->state.numFiles, &err); - if (err != 0) { - aout(self) << "Error: ffile_info_C - File_Access_Actor \n"; - std::string function = "ffile_info_C"; - self->send(self->state.parent, file_access_actor_err_v, function); - self->quit(); - return; - } - - mDecisions_C(&self->state.num_steps, &err); - if (err != 0) { - aout(self) << "Error: mDecisions - FileAccess Actor \n"; - std::string function = "mDecisions_C"; - self->send(self->state.parent, file_access_actor_err_v, function); - self->quit(); - return; - } - - read_pinit_C(&err); - if (err != 0) { - aout(self) << "ERROR: read_pinit_C\n"; - std::string function = "read_pinit_C"; - self->send(self->state.parent, file_access_actor_err_v, function); - self->quit(); - return; - } - - read_vegitationTables(&err); - if (err != 0) { - aout(self) << "ERROR: read_vegitationTables\n"; - std::string function = "read_vegitationTables"; - self->send(self->state.parent, file_access_actor_err_v, function); - self->quit(); - return; - } - - initFailedHRUTracker(&self->state.numGRU); - - Create_Output_File(self->state.handle_ncid, &self->state.numGRU, &self->state.startGRU, &err); - if (err != 0) { - aout(self) << "ERROR: Create_OutputFile\n"; - std::string function = "Create_Output_File"; - self->send(self->state.parent, file_access_actor_err_v, function); - self->quit(); - return; - } - - // Initalize the output Structure - aout(self) << "Initalizing Output Structure" << std::endl; - Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize, - &self->state.numGRU, &self->state.err); - - // Initalize the output manager - self->state.output_manager = new OutputManager(self->state.num_vectors_in_output_manager, self->state.numGRU); - - self->send(self->state.parent, done_file_access_actor_init_v); - // initalize the forcingFile array - self->state.filesLoaded = 0; - for (int i = 1; i <= self->state.numFiles; i++) { - self->state.forcFileList.push_back(forcingFile(i)); - } -} - -int write(stateful_actor<file_access_state>* self, int listIndex) { - int err = 0; - int minGRU = self->state.output_manager->getMinIndex(listIndex); - int maxGRU = self->state.output_manager->getMaxIndex(listIndex); - int numStepsToWrite = self->state.output_manager->getNumStepsToWrite(listIndex); - FileAccessActor_WriteOutput(self->state.handle_ncid, - &numStepsToWrite, &minGRU, - &maxGRU, &err); - - // Pop The actors and send them the correct continue message - while(!self->state.output_manager->isEmpty(listIndex)) { - std::tuple<caf::actor, int> actor = self->state.output_manager->popActor(listIndex); - if (get<1>(actor) == 9999) { - - self->send(get<0>(actor), done_write_v); - - } else { - self->send(get<0>(actor), run_hru_v, - self->state.forcFileList[get<1>(actor) - 1].getNumSteps()); - } - } - - return 0; -} - -int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, - int numStepsToWrite, int returnMessage, caf::actor actorRef) { - self->state.writeStart = std::chrono::high_resolution_clock::now(); - if (debug) { - aout(self) << "Recieved Write Request From GRU: " << indxGRU << "\n"; - } - int err = 0; - int listIndex = self->state.output_manager->addActor(actorRef, indxGRU, returnMessage, numStepsToWrite); - if (self->state.output_manager->isFull(listIndex)) { - if (debug) { - aout(self) << "List with Index " << listIndex << " is full and ready to write\n"; - aout(self) << "Minimum GRU Index = " << self->state.output_manager->getMinIndex(listIndex) << "\n"; - aout(self) << "Maximum GRU Index = " << self->state.output_manager->getMaxIndex(listIndex) << "\n"; - } - - err = write(self, listIndex); - - } else { - if (debug) { - aout(self) << "List with Index " << listIndex << " is not full yet waiting to write\n"; - aout(self) << "Size of list is " << self->state.output_manager->getSize(listIndex) << "\n"; - } - } - - - self->state.writeEnd = std::chrono::high_resolution_clock::now(); - self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd); - - return err; - -} - -int readForcing(stateful_actor<file_access_state>* self, int currentFile) { - // Check if we have already loaded this file - if(self->state.forcFileList[currentFile -1].isFileLoaded()) { - if (debug) - aout(self) << "ForcingFile Already Loaded \n"; - return 0; - - } else { - - // File Needs to be loaded - self->state.readStart = std::chrono::high_resolution_clock::now(); - - // Load the file - FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, ¤tFile, - &self->state.stepsInCurrentFile, &self->state.startGRU, - &self->state.numGRU, &self->state.err); - - if (self->state.err != 0) { - if (debug) - aout(self) << "ERROR: FileAccessActor_ReadForcing\n" << - "currentFile = " << currentFile << "\n" << "number of steps = " - << self->state.stepsInCurrentFile << "\n"; - return -1; - } else { - self->state.filesLoaded += 1; - self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); - - self->state.readEnd = std::chrono::high_resolution_clock::now(); - self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); - return 0; - } - } - -} - -int parseSettings(stateful_actor<file_access_state>* self, std::string configPath) { - json settings; - std::string SummaActorsSettigs = "/Summa_Actors_Settings.json"; - std::ifstream settings_file(configPath + SummaActorsSettigs); - settings_file >> settings; - settings_file.close(); - - if (settings.find("FileAccessActor") != settings.end()) { - json FileAccessActorConfig = settings["FileAccessActor"]; - // Find the File Manager Path - if (FileAccessActorConfig.find("num_vectors_in_output_manager") != FileAccessActorConfig.end()) { - self->state.num_vectors_in_output_manager = FileAccessActorConfig["num_vectors_in_output_manager"]; - } else { - aout(self) << "Error Finding FileManagerPath - Exiting as this is needed\n"; - return -1; - } - - return 0; - } else { - aout(self) << "Error Finding JobActor in JSON file - Exiting as there is no path for the fileManger\n"; - return -1; - } -} - - - -#endif \ No newline at end of file diff --git a/build/source/actors/file_access_actor/OutputManager.h b/build/source/actors/file_access_actor/OutputManager.h deleted file mode 100644 index 5faa5f1..0000000 --- a/build/source/actors/file_access_actor/OutputManager.h +++ /dev/null @@ -1,306 +0,0 @@ -#ifndef OutputManager_H_ -#define OutputManager_H_ - -#include "caf/all.hpp" -#include <vector> -#include <algorithm> -/** - * @brief Basic Container class to hold actor references. This has a size component for checking when it is full. - * - */ -class ActorRefList { - private: - int numStepsToWrite; // We can save this value here so that we know how many steps to write - int currentSize; - unsigned int maxSize; - int minIndex = -1; // minimum index of the actor being stored on this list - int maxIndex = 0; // maximum index of the actor being stored on this list - std::vector<std::tuple<caf::actor, int>> list; - - public: - // Constructor - ActorRefList(int maxSize){ - this->currentSize = 0; - this->maxSize = maxSize; - } - - // Deconstructor - ~ActorRefList(){}; - - int getMaxIndex() { - return this->maxIndex; - } - - int getMinIndex() { - return this->minIndex; - } - - int getCurrentSize() { - return this->currentSize; - } - - int getMaxSize() { - return this->maxSize; - } - - int getNumStepsToWrite() { - return this->numStepsToWrite; - } - - bool isFull() { - return list.size() == this->maxSize; - } - - /** - * Adds An Actor and its return message as a tuple to this->list - * actor - the actor ref of the actor being added to this->list - * returnMessage - Either 9999 (place holder and specifies to send a done_write_v message) or - * this is the current forcingFileList index that allows the file_access actor to know the number - * of steps the HRU actor that needs to compute - */ - void addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) { - if (this->isFull()) { - throw "List is full, cannot add actor to this list"; - } - if (index > this->maxIndex) { - this->maxIndex = index; - } - if (index < this->minIndex || this->minIndex < 0) { - this->minIndex = index; - } - this->numStepsToWrite = numStepsToWrite; - this->currentSize++; - list.push_back(std::make_tuple(actor, returnMessage)); - } - - /** - * Return a tuple of an actor and its returnMessage. - * The return message is 9999 or the index of the forcingFile it needs to acces - */ - std::tuple<caf::actor,int> popActor() { - if (list.empty()) { - throw "List is empty, nothing to pop"; - } - auto actor = list.back(); - list.pop_back(); - this->currentSize--; - return actor; - } - - - bool isEmpty() { - return list.empty(); - } - - - /** - * When an actor fails we need to decrement the count - * so that this list becomes full when there is a failure - * - * indexHRU - index of the HRU causing the error - */ - void decrementMaxSize() { - this->maxSize--; - } - - /** - * Remove the failed HRU from the list - * - */ - void removeFailed(caf::actor actorRef) { - bool found = false; - for(std::vector<std::tuple<caf::actor, int>>::iterator it = this->list.begin(); it != this->list.end(); it++) { - if (std::get<0>(*it) == actorRef) { - found = true; - this->list.erase(it); - this->currentSize--; this->maxSize--; - break; - } - } - - if (!found) { - throw "Element To Remove Not Found"; - } - } -}; - - -/** - * @brief Class that manages which structure actors are held on - * - */ -class OutputManager { - private: - - int numVectors; - int avgSizeOfActorList; - bool runningFailures; - std::vector<ActorRefList*> list; - std::vector<int> failedHRU; - std::vector<int> failureReRun; // index used so we can add failedHRUs if they fail a second time - - - - public: - // Constructor - OutputManager(int numVectors, int totalNumActors){ - this->numVectors = numVectors; - int sizeOfOneVector = totalNumActors / numVectors; - this->avgSizeOfActorList = sizeOfOneVector; - this->runningFailures = false; - // Create the first n-1 vectors with the same size - for (int i = 0; i < numVectors - 1; i++) { - auto refList = new ActorRefList(sizeOfOneVector); - totalNumActors = totalNumActors - sizeOfOneVector; - list.push_back(refList); - } - // Create the last vector with size however many actors are left - auto refList = new ActorRefList(totalNumActors); - list.push_back(refList); - } - // Deconstructor - ~OutputManager(){}; - - /** - * @brief Adds an actor to its respective list - * - * @param actor Actor reference - * @param index Actor Index - * @param returnMessage Forcing File index or 9999 - * @return int The list index that actor is added to. - */ - int addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) { - int listIndex; - if (this->runningFailures) { - // find the index of the structure this HRU is in - auto it = find(this->failureReRun.begin(), this->failureReRun.end(), index); - - if (it != this->failureReRun.end()) { - listIndex = it - this->failureReRun.begin(); - } else { - throw "Element Not Found in failureReRun list"; - } - - this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite); - - } else { - // Index has to be subtracted by 1 because Fortran array starts at 1 - listIndex = (index - 1) / this->avgSizeOfActorList; - if (listIndex > this->numVectors - 1) { - listIndex = this->numVectors - 1; - } - - this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite); - } - - return listIndex; - } - - /** - * Remove tuple from list[index] - * - */ - std::tuple<caf::actor,int> popActor(int index) { - if (index > this->numVectors - 1 || index < 0) { - throw "List Index Out Of Range"; - } else if (this->list[index]->isEmpty()) { - throw "List is Empty, Nothing to pop"; - } - - return this->list[index]->popActor(); - } - - - /** When a failure occurs an actor most likley will not already be on this list - * This method may and probably should not be used. Although needing to remove a - * specific element from a list may be needed. - * Remove the failed actor from the list - * Return the index of the list we removed the actor from - * This is so we can check if it is full - */ - int removeFailed(caf::actor actorRef, int index) { - // Find the list this actor is on - int listIndex = (index - 1) / this->avgSizeOfActorList; - if (listIndex > this->numVectors - 1) { - listIndex = this->numVectors - 1; - } - - this->list[listIndex]->removeFailed(actorRef); - - return listIndex; - } - - /** - * Decrease the size of the list - * Add this GRU to the failed list - */ - int decrementMaxSize(int indexHRU) { - - this->failedHRU.push_back(indexHRU); - - // Find the list this actor is on - int listIndex = (indexHRU - 1) / this->avgSizeOfActorList; - if (listIndex > this->numVectors - 1) { - listIndex = this->numVectors - 1; - } - - this->list[listIndex]->decrementMaxSize(); - return listIndex; - } - - void restartFailures() { - this->list.clear(); - this->numVectors = this->failedHRU.size(); - for (unsigned int i = 0; i < this->failedHRU.size(); i++) { - auto refList = new ActorRefList(1); - this->list.push_back(refList); - } - - this->failureReRun = this->failedHRU; - this->failedHRU.clear(); - - this->runningFailures = true; - - } - - /** - * Get the number of steps to write from the correct listIndex - */ - int getNumStepsToWrite(int listIndex) { - - return this->list[listIndex]->getNumStepsToWrite(); - } - - bool isFull(int listIndex) { - if (listIndex > this->numVectors - 1) { - throw "List Index Out Of Range"; - } - return this->list[listIndex]->isFull(); - } - - bool isEmpty(int listIndex) { - return this->list[listIndex]->isEmpty(); - } - - int getSize(int listIndex) { - if (listIndex > this->numVectors - 1) { - throw "List Index Out Of Range"; - } - return this->list[listIndex]->getCurrentSize(); - } - - int getMinIndex(int listIndex) { - return this->list[listIndex]->getMinIndex(); - } - - int getMaxIndex(int listIndex) { - return this->list[listIndex]->getMaxIndex(); - } - - void addFailed(int indxHRU) { - this->failedHRU.push_back(indxHRU); - } - -}; - -#endif \ No newline at end of file diff --git a/build/source/actors/global/fortran_dataTypes.h b/build/source/actors/global/fortran_dataTypes.h deleted file mode 100644 index 2fceb62..0000000 --- a/build/source/actors/global/fortran_dataTypes.h +++ /dev/null @@ -1,102 +0,0 @@ -#ifndef FORTRAN_DATATYPES_H_ -#define FORTRAN_DATATYPES_H_ - -extern "C" { - // flagVec - void* new_handle_flagVec(); - void delete_handle_flagVec(void* handle); - void set_data_flagVec(void* handle, const int* array, int size); - void get_size_data_flagVec(void* handle, int* size); - void get_data_flagVec(void* handle, int* array); - - // var_i - void* new_handle_var_i(); - void delete_handle_var_i(void* handle); - void set_data_var_i(void* handle, const int* array, int size); - void get_size_data_var_i(void* handle, int* size); - void get_data_var_i(void* handle, int* array); - - // var_i8 - void* new_handle_var_i8(); - void delete_handle_var_i8(void* handle); - void set_data_var_i8(void* handle, const long int* array, int size); - void get_size_data_var_i8(void* handle, int* size); - void get_data_var_i8(void* handle, long int* array); - - // var_d - void* new_handle_var_d(); - void delete_handle_var_d(void* handle); - void set_data_var_d(void* handle, const double* array, int size); - void get_size_data_var_d(void* handle, int* size); - void get_data_var_d(void* handle, double* array); - - // ilength - void* new_handle_ilength(); - void delete_handle_ilength(void* handle); - void set_data_ilength(void* handle, const int* array, int size); - void get_size_data_ilength(void* handle, int* size); - void get_data_ilength(void* handle, int* array); - - // i8length - void* new_handle_i8length(); - void delete_handle_i8length(void* handle); - void set_data_i8length(void* handle, const long int* array, int size); - void get_size_data_i8length(void* handle, int* size); - void get_data_i8length(void* handle, long int* array); - - // dlength - void* new_handle_dlength(); - void delete_handle_dlength(void* handle); - void set_data_dlength(void* handle, const double* array, int size); - void get_size_data_dlength(void* handle, int* size); - void get_data_dlength(void* handle, double* array); - - // var_flagVec - void* new_handle_var_flagVec(); - void delete_handle_var_flagVec(void* handle); - void set_data_var_flagVec(void* handle, const int* array, int num_row, const int* num_col, int num_elements); - void get_size_var_flagVec(void* handle, int* num_var); - void get_size_data_var_flagVec(void* handle, int* num_var, int* num_dat); - void get_data_var_flagVec(void* handle, int* array); - - // var_ilength - void* new_handle_var_ilength(); - void delete_handle_var_ilength(void* handle); - void set_data_var_ilength(void* handle, const int* array, int num_row, const int* num_col, int num_elements); - void get_size_var_ilength(void* handle, int* num_var); - void get_size_data_var_ilength(void* handle, int* num_var, int* num_dat); - void get_data_var_ilength(void* handle, int* array); - - // var_i8length - void* new_handle_var_i8length(); - void delete_handle_var_i8length(void* handle); - void set_data_var_i8length(void* handle, const long int* array, int num_row, const int* num_col, int num_elements); - void get_size_var_i8length(void* handle, int* num_var); - void get_size_data_var_i8length(void* handle, int* num_var, int* num_dat); - void get_data_var_i8length(void* handle, long int* array); - - // var_dlength - void* new_handle_var_dlength(); - void delete_handle_var_dlength(void* handle); - void set_data_var_dlength(void* handle, const double* array, int num_row, const int* num_col, int num_elements); - void get_size_var_dlength(void* handle, int* num_var); - void get_size_data_var_dlength(void* handle, int* num_var, int* num_dat); - void get_data_var_dlength(void* handle, double* array); - - // var_dlength_array - void* new_handle_dlength_array(); - void delete_handle_dlength_array(void* handle); - - // var_info - void* new_handle_var_info(); - void delete_handle_var_info(void* handle); - void set_data_var_info(void* handle, char const *str1, char const *str2, char const *str3, - int type, const int* ncid, int ncid_size, const int* index, int index_size, int flag); - - // file_info - void* new_handle_file_info(); - void delete_handle_file_info(void* handle); - -} - -#endif \ No newline at end of file diff --git a/build/source/actors/global/global.h b/build/source/actors/global/global.h deleted file mode 100644 index b3104cb..0000000 --- a/build/source/actors/global/global.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef COMMONFUNCTIONS_H_ -#define COMMONFUNCTIONS_H_ - -#include <chrono> - - -// Gobal Flag for Debuging only main function will change this -bool debug; - -/** - * Return the time between to time points - */ -double calculateTime(std::chrono::time_point<std::chrono::system_clock> start, - std::chrono::time_point<std::chrono::system_clock> end); - - - -double calculateTime(std::chrono::time_point<std::chrono::system_clock> start, - std::chrono::time_point<std::chrono::system_clock> end) { - - return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); -} - - - - -#endif \ No newline at end of file diff --git a/build/source/actors/global/messageAtoms.h b/build/source/actors/global/messageAtoms.h deleted file mode 100644 index d8a3c4f..0000000 --- a/build/source/actors/global/messageAtoms.h +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef MESSAGEATOMS_H_ -#define MESSAGEATOMS_H_ - -CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) - // Summa Actor - CAF_ADD_ATOM(summa, start_summa) - CAF_ADD_ATOM(summa, done_job) - CAF_ADD_ATOM(summa, err) - // Job Actor - CAF_ADD_ATOM(summa, done_reading_forcingFile) - CAF_ADD_ATOM(summa, done_reading_first_forcing_file) - CAF_ADD_ATOM(summa, init_hru) - CAF_ADD_ATOM(summa, done_init_hru) - CAF_ADD_ATOM(summa, done_write) - CAF_ADD_ATOM(summa, doneFile) - CAF_ADD_ATOM(summa, done_hru) - CAF_ADD_ATOM(summa, done_final_write) - CAF_ADD_ATOM(summa, run_failure) - CAF_ADD_ATOM(summa, done_file_access_actor_init) - CAF_ADD_ATOM(summa, file_access_actor_done) - CAF_ADD_ATOM(summa, file_access_actor_err) - // FileAccess Actor - CAF_ADD_ATOM(summa, initalize_outputStructure) - CAF_ADD_ATOM(summa, access_forcing) - CAF_ADD_ATOM(summa, access_first_forcing_file) - CAF_ADD_ATOM(summa, access_forcing_internal) - CAF_ADD_ATOM(summa, write_output) - CAF_ADD_ATOM(summa, write_output_final) - CAF_ADD_ATOM(summa, deallocate_structures) - CAF_ADD_ATOM(summa, update_completed) - CAF_ADD_ATOM(summa, update_failed) - CAF_ADD_ATOM(summa, reset_outputCounter) - CAF_ADD_ATOM(summa, read_and_write) - CAF_ADD_ATOM(summa, write_param) - CAF_ADD_ATOM(summa, restart_failures) - // HRU Actor - CAF_ADD_ATOM(summa, run_hru) - CAF_ADD_ATOM(summa, start_hru) - CAF_ADD_ATOM(summa, file_information) - CAF_ADD_ATOM(summa, dt_init_factor) - -CAF_END_TYPE_ID_BLOCK(summa) - -#endif \ No newline at end of file diff --git a/build/source/actors/hru_actor/HRU.h b/build/source/actors/hru_actor/HRU.h deleted file mode 100644 index 8dd21f2..0000000 --- a/build/source/actors/hru_actor/HRU.h +++ /dev/null @@ -1,139 +0,0 @@ -#ifndef HRU_H_ -#define HRU_H_ -#include "caf/all.hpp" -#include "hru_subroutine_wrappers.h" -#include "../global/fortran_dataTypes.h" -#include "../global/messageAtoms.h" -#include "../global/json.hpp" -#include "../global/global.h" - -#include <fstream> -#include <string> -#include <typeinfo> -#include <stdio.h> -#include <sys/time.h> -#include <sys/resource.h> -#include <chrono> -#include <iostream> - - -using namespace caf; - -struct hru_state { - // Actor References - caf::actor file_access_actor; - caf::actor parent; - - // Info about which HRU we are and our indexes - // into global structures in Fortran - int indxHRU; // index for hru part of derived types in FORTRAN - int indxGRU; // index for gru part of derived types in FORTRAN - int refGRU; // The actual ID of the GRU we are - - // Variables for output/forcing structures - int outputStrucSize; - int outputStep; - int stepsInCurrentFFile; - int forcingFileStep; - int currentForcingFile = 1; - - - // statistics structures - void *handle_forcStat = new_handle_var_dlength(); // model forcing data - void *handle_progStat = new_handle_var_dlength(); // model prognostic (state) variables - void *handle_diagStat = new_handle_var_dlength(); // model diagnostic variables - void *handle_fluxStat = new_handle_var_dlength(); // model fluxes - void *handle_indxStat = new_handle_var_dlength(); // model indices - void *handle_bvarStat = new_handle_var_dlength(); // basin-average variables - // primary data structures (scalars) - void *handle_timeStruct = new_handle_var_i(); // model time data - void *handle_forcStruct = new_handle_var_d(); // model forcing data - void *handle_attrStruct = new_handle_var_d(); // local attributes for each HRU - void *handle_typeStruct = new_handle_var_i(); // local classification of soil veg etc. for each HRU - void *handle_idStruct = new_handle_var_i8(); - // primary data structures (variable length vectors) - void *handle_indxStruct = new_handle_var_ilength(); // model indices - void *handle_mparStruct = new_handle_var_dlength(); // model parameters - void *handle_progStruct = new_handle_var_dlength(); // model prognostic (state) variables - void *handle_diagStruct = new_handle_var_dlength(); // model diagnostic variables - void *handle_fluxStruct = new_handle_var_dlength(); // model fluxes - // basin-average structures - void *handle_bparStruct = new_handle_var_d(); // basin-average parameters - void *handle_bvarStruct = new_handle_var_dlength(); // basin-average variables - // ancillary data structures - void *handle_dparStruct = new_handle_var_d(); // default model parameters - // Local hru data - void *handle_ncid = new_handle_var_i(); // output file ids - void *handle_statCounter = new_handle_var_i(); - void *handle_outputTimeStep = new_handle_var_i(); - void *handle_resetStats = new_handle_flagVec(); - void *handle_finalizeStats = new_handle_flagVec(); - void *handle_oldTime = new_handle_var_i(); - void *handle_refTime = new_handle_var_i(); - void *handle_finshTime = new_handle_var_i(); - void *handle_startTime = new_handle_var_i(); - // Misc Variables - int timestep = 1; // Current Timestep of HRU simulation - int computeVegFlux; // flag to indicate if we are computing fluxes over vegetation - double dt_init; // used to initialize the length of the sub-step for each HRU - double upArea; // area upslope of each HRU - int num_steps = 0; // number of time steps - int forcingStep; // index of current time step in current forcing file - int iFile; // index of current forcing file from forcing file list - int dt_init_factor = 1; // factor of dt_init (coupled_em) - bool printOutput; - int outputFrequency; - - - // Julian Day variables - double fracJulDay; - double tmZoneOffsetFracDay; - int yearLength; - int err = 0; // error conotrol - - std::chrono::time_point<std::chrono::system_clock> start; - std::chrono::time_point<std::chrono::system_clock> end; - double duration = 0.0; - std::chrono::time_point<std::chrono::system_clock> initStart; - std::chrono::time_point<std::chrono::system_clock> initEnd; - double initDuration = 0.0; - std::chrono::time_point<std::chrono::system_clock> forcingStart; - std::chrono::time_point<std::chrono::system_clock> forcingEnd; - double forcingDuration = 0.0; - std::chrono::time_point<std::chrono::system_clock> runPhysicsStart; - std::chrono::time_point<std::chrono::system_clock> runPhysicsEnd; - double runPhysicsDuration = 0.0; - std::chrono::time_point<std::chrono::system_clock> writeOutputStart; - std::chrono::time_point<std::chrono::system_clock> writeOutputEnd; - double writeOutputDuration = 0.0; - -}; - -/** - * @brief Get the settings from the settings JSON file - * - * @param self Actor State - * @param configPath Path to the directory that contains the settings file - */ -void parseSettings(stateful_actor<hru_state>* self, std::string configPath); - -/** - Function to initalize the HRU for running - */ -void Initialize_HRU(stateful_actor<hru_state>* self); - -/** - Function runs all of the hru time_steps - */ -int Run_HRU(stateful_actor<hru_state>* self); - -bool check_HRU(stateful_actor<hru_state>* self, int err); - -void initalizeTimeVars(stateful_actor<hru_state>* self); - -void finalizeTimeVars(stateful_actor<hru_state>* self); - -void deallocateHRUStructures(stateful_actor<hru_state>* self); - -void printOutput(stateful_actor<hru_state>* self); -#endif \ No newline at end of file diff --git a/build/source/actors/hru_actor/HRUActor.h b/build/source/actors/hru_actor/HRUActor.h deleted file mode 100644 index 12ea480..0000000 --- a/build/source/actors/hru_actor/HRUActor.h +++ /dev/null @@ -1,504 +0,0 @@ -#ifndef HRUActor_H_ -#define HRUActor_H_ - -#include "HRU.h" -using json = nlohmann::json; - - -/** - * @brief HRU Actor is reponsible for carrying out the computation component of SUMMA - * - * @param self The Actor Ref - * @param refGRU The GRU we are computing in reference to the forcingFile - * @param indxGRU The GRU we are computing's index in gru_struc - * @param parent - * @return behavior - */ -behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, - std::string configPath, - caf::actor file_access_actor, int outputStrucSize, caf::actor parent) { - // Timing Information - self->state.start = std::chrono::high_resolution_clock::now(); - self->state.duration = 0.0; - self->state.initDuration = 0.0; - - // Actor References - self->state.file_access_actor = file_access_actor; - self->state.parent = parent; - - // Indexes into global structures - self->state.indxHRU = 1; - self->state.indxGRU = indxGRU; - self->state.refGRU = refGRU; - - // OutputStructure Size (how many timesteps we can compute before we need to write) - self->state.outputStrucSize = outputStrucSize; - - // initialize counters - self->state.timestep = 1; // Timestep of total simulation - self->state.outputStep = 1; // Index of the output structure - self->state.forcingStep = 1; // Index into the forcing file - self->state.iFile = 1; - - // Get the settings for the HRU - parseSettings(self, configPath); - // We only want to print this once - if (indxGRU == 1) { - aout(self) << "\nSETTINGS FOR HRU_ACTOR\n"; - aout(self) << "Print Output = " << self->state.printOutput << "\n"; - aout(self) << "Print Output every " << self->state.outputFrequency << " timesteps\n\n"; - } - - - Initialize_HRU(self); - - self->state.end = std::chrono::high_resolution_clock::now(); - self->state.duration += calculateTime(self->state.start, self->state.end); - - self->send(self->state.parent, done_init_hru_v); - - return { - // Starts the HRU and tells it to ask for data from the file_access_actor - [=](start_hru) { - self->state.start = std::chrono::high_resolution_clock::now(); - - int err; - - err = 0; - // Write Paramaters to OutputStruc - Write_Param_C(&self->state.indxGRU, &self->state.indxHRU, - self->state.handle_attrStruct, self->state.handle_typeStruct, - self->state.handle_mparStruct, self->state.handle_bparStruct, - &err); - - // ask file_access_actor to write paramaters - self->send(self->state.file_access_actor, write_param_v, self->state.indxGRU, self->state.indxHRU); - - - self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile, self); - self->state.end = std::chrono::high_resolution_clock::now(); - self->state.duration += calculateTime(self->state.start, self->state.end); - }, - - [=](done_write) { - self->state.start = std::chrono::high_resolution_clock::now(); - - // We receive a done_write message so we ensure that - // stepsInCurrentFFile remains unchanged - if (self->state.timestep >= self->state.num_steps) { - - self->state.end = std::chrono::high_resolution_clock::now(); - self->state.duration += calculateTime(self->state.start, self->state.end); - // Tell our parent we are done, convert all timings to seconds - - self->state.duration = self->state.duration / 1000; // Convert to milliseconds - self->state.initDuration = self->state.initDuration / 1000; // Convert to milliseconds - self->state.forcingDuration = self->state.forcingDuration / 1000; // Convert to milliseconds - self->state.runPhysicsDuration = self->state.runPhysicsDuration / 1000; // Convert to milliseconds - self->state.writeOutputDuration = self->state.writeOutputDuration / 1000; // Convert to milliseconds - - self->send(self->state.parent, - done_hru_v, - self->state.indxGRU, - self->state.duration / 1000, - self->state.initDuration / 1000, - self->state.forcingDuration / 1000, - self->state.runPhysicsDuration / 1000, - self->state.writeOutputDuration / 1000); - - deallocateHRUStructures(self); - - self->quit(); - return; - } - - self->state.end = std::chrono::high_resolution_clock::now(); - self->state.duration += calculateTime(self->state.start, self->state.end); - - self->send(self, run_hru_v, self->state.stepsInCurrentFFile); - }, - - [=](run_hru, int stepsInCurrentFFile) { - self->state.start = std::chrono::high_resolution_clock::now(); - bool keepRunning = true; - int err = 0; - self->state.stepsInCurrentFFile = stepsInCurrentFFile; - - while( keepRunning ) { - - err = Run_HRU(self); // Simulate a Timestep - - // update Timings - self->state.timestep += 1; - self->state.outputStep += 1; - self->state.forcingStep += 1; - - // if (self->state.timestep == 450 && self->state.indxGRU == 5) { - // err = 20; - // } - - keepRunning = check_HRU(self, err); // check if we are done, need to write - - } - - self->state.end = std::chrono::high_resolution_clock::now(); - self->state.duration += calculateTime(self->state.start, self->state.end); - - }, - - [=](dt_init_factor, int dt_init_factor) { - aout(self) << "Recieved New dt_init_factor to attempt on next run \n"; - self->state.dt_init_factor = dt_init_factor; - }, - }; - /********************************************************************************************************* - *********************************** END ACTOR MESSAGE HANDLERS ****************************************** - *********************************************************************************************************/ -} - -void parseSettings(stateful_actor<hru_state>* self, std::string configPath) { - json settings; - std::string SummaActorsSettings = "/Summa_Actors_Settings.json"; - std::ifstream settings_file(configPath + SummaActorsSettings); - settings_file >> settings; - settings_file.close(); - - if (settings.find("HRUActor") != settings.end()) { - json HRUActorConfig = settings["HRUActor"]; - // find if we want to print output to stdout - if (HRUActorConfig.find("printOutput") != HRUActorConfig.end()) { - self->state.printOutput = HRUActorConfig["printOutput"]; - } else { - aout(self) << "Error finding printOutput in JSON File - Reverting to default value\n"; - self->state.printOutput = true; - } - - if (self->state.printOutput) { - // get the frequency in number of timesteps we want to print the output - if(HRUActorConfig.find("outputFrequency") != HRUActorConfig.end()) { - self->state.outputFrequency = HRUActorConfig["outputFrequency"]; - } else { - aout(self) << "Error finding outputFrequency in JSON File - Reverting to default value\n"; - self->state.outputFrequency = 10000; - } - } - } else { - aout(self) << "Error finding HRUActor in JSON File - Reverting to default values for HRUs\n"; - self->state.printOutput = true; - self->state.outputFrequency = 10000; - } - - -} - -void Initialize_HRU(stateful_actor<hru_state>* self) { - self->state.initStart = std::chrono::high_resolution_clock::now(); - // aout(self) << "Initalizing HRU" << std::endl; - // aout(self) << "Entering Initalize \n"; - Initialize(&self->state.indxGRU, - &self->state.num_steps, - self->state.handle_forcStat, - self->state.handle_progStat, - self->state.handle_diagStat, - self->state.handle_fluxStat, - self->state.handle_indxStat, - self->state.handle_bvarStat, - self->state.handle_timeStruct, - self->state.handle_forcStruct, - self->state.handle_attrStruct, - self->state.handle_typeStruct, - self->state.handle_idStruct, - self->state.handle_indxStruct, - self->state.handle_mparStruct, - self->state.handle_progStruct, - self->state.handle_diagStruct, - self->state.handle_fluxStruct, - self->state.handle_bparStruct, - self->state.handle_bvarStruct, - self->state.handle_dparStruct, - self->state.handle_startTime, - self->state.handle_finshTime, - self->state.handle_refTime, - self->state.handle_oldTime, &self->state.err); - - if (self->state.err != 0) { - aout(self) << "Error: Initialize - HRU = " << self->state.indxHRU << - " - indxGRU = " << self->state.indxGRU << " - refGRU = "<< self->state.refGRU << std::endl; - aout(self) << "Error = " << self->state.err << "\n"; - self->quit(); - return; - } - - SetupParam(&self->state.indxGRU, - &self->state.indxHRU, - self->state.handle_attrStruct, - self->state.handle_typeStruct, - self->state.handle_idStruct, - self->state.handle_mparStruct, - self->state.handle_bparStruct, - self->state.handle_bvarStruct, - self->state.handle_dparStruct, - self->state.handle_startTime, - self->state.handle_oldTime, - &self->state.upArea, &self->state.err); - if (self->state.err != 0) { - aout(self) << "Error: SetupParam - HRU = " << self->state.indxHRU << - " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << std::endl; - self->quit(); - return; - } - // aout(self) << "Restart" << std::endl; - - Restart(&self->state.indxGRU, - &self->state.indxHRU, - self->state.handle_indxStruct, - self->state.handle_mparStruct, - self->state.handle_progStruct, - self->state.handle_diagStruct, - self->state.handle_fluxStruct, - self->state.handle_bvarStruct, - &self->state.dt_init, &self->state.err); - if (self->state.err != 0) { - aout(self) << "Error: Restart - HRU = " << self->state.indxHRU << - " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << std::endl; - self->quit(); - return; - } - - // aout(self) << self->state.refGRU << " - Done Init" << std::endl; - self->state.initEnd = std::chrono::high_resolution_clock::now(); - self->state.initDuration = calculateTime(self->state.initStart, self->state.initEnd); -} - -int Run_HRU(stateful_actor<hru_state>* self) { - /********************************************************************** - ** READ FORCING - **********************************************************************/ - self->state.forcingStart = std::chrono::high_resolution_clock::now(); - Forcing(&self->state.indxGRU, - &self->state.timestep, - self->state.handle_timeStruct, - self->state.handle_forcStruct, - &self->state.iFile, - &self->state.forcingStep, - &self->state.fracJulDay, - &self->state.tmZoneOffsetFracDay, - &self->state.yearLength, - &self->state.err); - if (self->state.err != 0) { - aout(self) << "Error: Forcing - HRU = " << self->state.indxHRU << - " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << - " - Timestep = " << self->state.timestep << std::endl; - return 10; - - } - self->state.forcingEnd = std::chrono::high_resolution_clock::now(); - self->state.forcingDuration += calculateTime(self->state.forcingStart, self->state.forcingEnd); - - - if (self->state.printOutput && - self->state.timestep % self->state.outputFrequency == 0) { - printOutput(self); - } - - - /********************************************************************** - ** RUN_PHYSICS - **********************************************************************/ - self->state.runPhysicsStart = std::chrono::high_resolution_clock::now(); - self->state.err = 0; - RunPhysics(&self->state.indxHRU, - &self->state.timestep, - self->state.handle_timeStruct, - self->state.handle_forcStruct, - self->state.handle_attrStruct, - self->state.handle_typeStruct, - self->state.handle_indxStruct, - self->state.handle_mparStruct, - self->state.handle_progStruct, - self->state.handle_diagStruct, - self->state.handle_fluxStruct, - self->state.handle_bvarStruct, - &self->state.fracJulDay, - &self->state.tmZoneOffsetFracDay, - &self->state.yearLength, - &self->state.computeVegFlux, - &self->state.dt_init, - &self->state.dt_init_factor, - &self->state.err); - if (self->state.err != 0) { - aout(self) << "Error: RunPhysics - HRU = " << self->state.indxHRU << - " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << - " - Timestep = " << self->state.timestep << std::endl; - return 20; - } - self->state.runPhysicsEnd = std::chrono::high_resolution_clock::now(); - self->state.runPhysicsDuration += calculateTime(self->state.runPhysicsStart, self->state.runPhysicsEnd); - - /********************************************************************** - ** WRITE_OUTPUT - **********************************************************************/ - self->state.writeOutputStart = std::chrono::high_resolution_clock::now(); - WriteOutput(&self->state.indxHRU, - &self->state.indxGRU, - &self->state.timestep, - self->state.handle_forcStat, - self->state.handle_progStat, - self->state.handle_diagStat, - self->state.handle_fluxStat, - self->state.handle_indxStat, - self->state.handle_bvarStat, - self->state.handle_timeStruct, - self->state.handle_forcStruct, - self->state.handle_attrStruct, - self->state.handle_typeStruct, - self->state.handle_indxStruct, - self->state.handle_mparStruct, - self->state.handle_progStruct, - self->state.handle_diagStruct, - self->state.handle_fluxStruct, - self->state.handle_bparStruct, - self->state.handle_bvarStruct, - self->state.handle_statCounter, - self->state.handle_outputTimeStep, - self->state.handle_resetStats, - self->state.handle_finalizeStats, - self->state.handle_finshTime, - self->state.handle_oldTime, - &self->state.outputStep, - &self->state.err); - if (self->state.err != 0) { - aout(self) << "Error: WriteOutput - HRU = " << self->state.indxHRU << - " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << - " - Timestep = " << self->state.timestep << std::endl; - return 30; - } - self->state.writeOutputEnd = std::chrono::high_resolution_clock::now(); - self->state.writeOutputDuration += calculateTime(self->state.writeOutputStart, self->state.writeOutputEnd); - - return 0; -} - -bool check_HRU(stateful_actor<hru_state>* self, int err) { - - if (err != 0) { - // check for error - - self->send(self->state.parent, run_failure_v, self, self->state.indxGRU, err); - self->quit(); - return false; - - } else if (self->state.timestep > self->state.num_steps) { - // check if simulation is finished - self->state.outputStep -= 1; // prevents segfault - - if (debug) - aout(self) << "Sending Final Write" << - "forcingStep = " << self->state.forcingStep << "\n" << - "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << - "timeStep = " << self->state.timestep << "\n" << - "outputStep = " << self->state.outputStep << "\n"; - - self->send(self->state.file_access_actor, write_output_v, - self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self); - - self->state.end = std::chrono::high_resolution_clock::now(); - self->state.duration += calculateTime(self->state.start, self->state.end); - - return false; - - } else if (self->state.outputStep > self->state.outputStrucSize && - self->state.forcingStep > self->state.stepsInCurrentFFile) { - // Special case where we need both reading and writing - self->state.outputStep -= 1; // prevents segfault - - if (debug) - aout(self) << "Need to read forcing and write to outputstruc\n" << - "forcingStep = " << self->state.forcingStep << "\n" << - "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << - "timeStep = " << self->state.timestep << "\n" << - "outputStep = " << self->state.outputStep << "\n"; - - - self->send(self->state.file_access_actor, read_and_write_v, self->state.indxGRU, - self->state.indxHRU, self->state.outputStep, self->state.iFile + 1, self); - self->state.outputStep = 1; - - return false; - - } else if (self->state.outputStep > self->state.outputStrucSize) { - // check if we need to clear the output struc - self->state.outputStep -= 1; - - if (debug) - aout(self) << "Sending Write \n" << - "forcingStep = " << self->state.forcingStep << "\n" << - "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << - "timeStep = " << self->state.timestep << "\n" << - "outputStep = " << self->state.outputStep << "\n"; - - - self->send(self->state.file_access_actor, write_output_v, - self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self); - self->state.outputStep = 1; - - return false; - - } else if (self->state.forcingStep > self->state.stepsInCurrentFFile) { - // we need more forcing data - - if (debug) - aout(self) << "Asking for more forcing data\n" << - "forcingStep = " << self->state.forcingStep << "\n" << - "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << - "timeStep = " << self->state.timestep << "\n" << - "outputStep = " << self->state.outputStep << "\n"; - - self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile + 1, self); - - return false; - - } else { - return true; - } -} - -void deallocateHRUStructures(stateful_actor<hru_state>* self) { - - DeallocateStructures(self->state.handle_forcStat, - self->state.handle_progStat, - self->state.handle_diagStat, - self->state.handle_fluxStat, - self->state.handle_indxStat, - self->state.handle_bvarStat, - self->state.handle_timeStruct, - self->state.handle_forcStruct, - self->state.handle_attrStruct, - self->state.handle_typeStruct, - self->state.handle_idStruct, - self->state.handle_indxStruct, - self->state.handle_mparStruct, - self->state.handle_progStruct, - self->state.handle_diagStruct, - self->state.handle_fluxStruct, - self->state.handle_bparStruct, - self->state.handle_bvarStruct, - self->state.handle_dparStruct, - self->state.handle_startTime, - self->state.handle_finshTime, - self->state.handle_refTime, - self->state.handle_oldTime, - self->state.handle_ncid, - self->state.handle_statCounter, - self->state.handle_outputTimeStep, - self->state.handle_resetStats, - self->state.handle_finalizeStats, - &self->state.err); -} - -void printOutput(stateful_actor<hru_state>* self) { - aout(self) << self->state.refGRU << " - Timestep = " << self->state.timestep << std::endl; - aout(self) << self->state.refGRU << ":Accumulated Run Physics Time = " << - self->state.runPhysicsDuration << std::endl; -} -#endif \ No newline at end of file diff --git a/build/source/actors/hru_actor/hru_subroutine_wrappers.h b/build/source/actors/hru_actor/hru_subroutine_wrappers.h deleted file mode 100644 index 638580d..0000000 --- a/build/source/actors/hru_actor/hru_subroutine_wrappers.h +++ /dev/null @@ -1,104 +0,0 @@ -#ifndef HRU_SUBROUTINE_WRAPPERS_H_ -#define HRU_SUBROUTINE_WRAPPERS_H_ - - -extern "C" { - // Initialize HRU data_structures - void Initialize( - int* indxGRU, int* num_steps, - // Statistics Structures - void* forcStat, void* progStat, void* diagStat, void* fluxStat, void* indxStat, void* bvarStat, - // Primary Data Structures (scalars) - void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct, void* idStruct, - // primary data structures (variable length vectors) - void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct, - // basin-average structures - void* bparStruct, void* bvarStruct, - // ancillary data structures - void* dparStruct, - // local HRU data - void* startTime, void* finshTime, void* refTime, void* oldTime, int* err); - - // SetupParam for HRU - void SetupParam( - int* indxGRU, int* indxHRU, - // primary data structures (scalars) - void* attrStruct, void* typeStruct, void* idStruct, - // primary data structures (variable length vectors) - void* mparStruct, void* bparStruct, void* bvarStruct, void* dparStruct, - // local HRU data - void* startTime, void* oldTime, - // miscellaneous - double* upArea, int* err); - - - // Setup Restart File if this option has been chosen - void Restart( - int* indxGRU, int* indxHRU, - // primary data structures (variable length vectors) - void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct, - // basin-average structures - void* bvarStruct, - // misc - double* dtInit, int* err); - - // Read Forcing for HRU - void Forcing( - int* indxGRU, int* stepIndex, - void* timeStruct, void* forcStruct, - int* iFile, int* forcingStep, - double* fracJulDay, double* tmZoneOffsetFracDay, int* yearLength, - int* err); - - // Run the model for one timestep - void RunPhysics( - int* id, int* stepIndex, - // primary data structures (scalars) - void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct, - // primary data structures (variable length vectors) - void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct, - // basin-average structures - void* bvarStruct, - double* fracJulDay, double* tmZoneOffsetFracDay, int* yearLength, - // misc - int* flag, double* dt, int* dt_int_factor, int* err); - - // Write output to the output structure - void WriteOutput( - int* indHRU, int* indxGRU, int* indexStep, - // statistics structures - void* forcStat, void* progStat, void* diagStat, void* fluxStat, void* indxStat, void* bvarStat, - // primary data structures (scalars) - void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct, - // primary data structures (variable length vectors) - void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct, - // basin-average structures - void* bparStruct, void* bvarStruct, - // local vars - void* statCounter, void* outputTimeStep, void* resetStats, void* finalizeStats, - void* finshTime, void* oldTime, int* outputStep, int* err); - - void DeallocateStructures( - void* handle_forcStat, void* handle_progStat, void* handle_diagStat, void* handle_fluxStat, - void* handle_indxStat, void* handle_bvarStat, void* handle_timeStruct, void* handle_forcStruct, - void* handle_attrStruct, void* handle_typeStruct, void* handle_idStruct, void* handle_indxStruct, - void* handle_mparStruct, void* handle_progStruct, void* handle_diagStruct, void* handle_fluxStruct, - void* handle_bparStruct, void* handle_bvarStruct, void* handle_dparStruct, - void* handle_startTime, void* handle_finishTime, - void* handle_refTime, void* handle_oldTime, - void* handle_ncid, - void* handle_statCounter, - void* handle_outputTimeStep, - void* handle_resetStats, - void* handle_finalizeStats, - int* err); - - void Write_Param_C( - int* indxGRU, int* indxHRU, - void* handle_attrStruct, void* handle_typeStruct, void* handle_mparStruct, void* handle_bparStruct, - int* err); -} - - - -# endif diff --git a/build/source/actors/job_actor/Job.h b/build/source/actors/job_actor/Job.h deleted file mode 100644 index 7162a62..0000000 --- a/build/source/actors/job_actor/Job.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef SUMMACLIENT_H_ -#define SUMMACLIENT_H_ - -#include "caf/all.hpp" -#include "caf/io/all.hpp" -#include "../file_access_actor/FileAccessActor.h" -#include "../hru_actor/HRUActor.h" -#include "../global/messageAtoms.h" -#include "../global/global.h" -#include "GRUinfo.h" -#include "job_subroutine_wrappers.h" - -#include "string.h" -#include <unistd.h> -#include <vector> -#include <chrono> -#include <iostream> -#include <fstream> -#include <sys/stat.h> - - - -struct job_state { - // Actor References - caf::actor file_access_actor; // actor reference for the file_access_actor - caf::actor parent; // actor reference to the top-level SummaActor - - // Job Parameters - int startGRU; // Starting GRU for this job - int numGRU; // Number of GRUs for this job - std::string configPath; - - std::string fileManager; // Path of the fileManager.txt file - - // Variables for GRU monitoring - int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em) - int maxRunAttempts = 3; // Max number of attemtps to solve a GRU - std::vector<GRUinfo*> GRUList; // List of all GRUs under this job actor - int numGRUDone = 0; // The number of GRUs that have completed - int GRUInit = 0; // Number of GRUs initalized - int err = 0; // Error Code - int numGRUFailed = 0; // Number of GRUs that have failed - int outputStrucSize; - - // Timing Variables - std::chrono::time_point<std::chrono::system_clock> start; - std::chrono::time_point<std::chrono::system_clock> end; - double duration; - - // Output File Names for Timings - bool outputCSV; - std::string csvOut; - std::string csvPath; - std::string successOutputFile; - std::string failedOutputFile = "failedHRU"; - std::string fileAccessActorStats = "fileAccessActor.csv"; - -}; - - - -int parseSettings(stateful_actor<job_state>* self, std::string configPath); - -void initJob(stateful_actor<job_state>* self); - -void initalizeGRU(stateful_actor<job_state>* self); - -void runGRUs(stateful_actor<job_state>* self); - -void restartFailures(stateful_actor<job_state>* self); - -#endif \ No newline at end of file diff --git a/build/source/actors/job_actor/job_subroutine_wrappers.h b/build/source/actors/job_actor/job_subroutine_wrappers.h deleted file mode 100644 index 8326cb0..0000000 --- a/build/source/actors/job_actor/job_subroutine_wrappers.h +++ /dev/null @@ -1,12 +0,0 @@ -#ifndef JOB_SUBROUTINE_WRAPPERS_H_ -#define JOB_SUBROUTINE_WRAPPERS_H_ - -extern "C" { - void initGlobals(char const*str1, int* totalGRUs, int* totalHRUs, - int* numGRUs, int* numHRUs, int* startGRUIndex, int* err); - - void cleanUpJobActor(int* err); - -} - -#endif \ No newline at end of file diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index 7de2ab6..5a6662b 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -1,6 +1,8 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" #include "summa_actor.hpp" +#include "summa_client.hpp" +#include "summa_server.hpp" #include "global.hpp" #include "message_atoms.hpp" #include <string> @@ -37,41 +39,41 @@ class config : public actor_system_config { } }; -// void run_client(actor_system& system, const config& cfg) { -// scoped_actor self{system}; -// if (cfg.distributed) { -// aout(self) << "Starting SUMMA-Client in Distributed Mode\n"; -// auto c = system.spawn(summa_client); -// if (!cfg.host.empty() && cfg.port > 0) { -// anon_send(c, connect_atom_v, cfg.host, cfg.port); -// } else { -// aout(self) << "No Server Config" << std::endl; -// } +void run_client(actor_system& system, const config& cfg) { + scoped_actor self{system}; + if (cfg.distributed) { + aout(self) << "Starting SUMMA-Client in Distributed Mode\n"; + auto c = system.spawn(summa_client); + if (!cfg.host.empty() && cfg.port > 0) { + anon_send(c, connect_atom_v, cfg.host, cfg.port); + } else { + aout(self) << "No Server Config" << std::endl; + } -// } else { -// aout(self) << "Starting SUMMA in non-distributed mode \n"; -// auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath); -// } + } else { + aout(self) << "Starting SUMMA in non-distributed mode \n"; + auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath); + } -// } +} -// void run_server(actor_system& system, const config& cfg) { -// scoped_actor self{system}; -// auto server = system.spawn(summa_server); -// aout(self) << "SEVER" << std::endl; -// aout(self) << "Attempting to publish actor" << cfg.port << std::endl; -// auto is_port = io::publish(server, cfg.port); -// if (!is_port) { -// std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << std::endl; -// return; -// } -// aout(self) << "Successfully Published" << *is_port << std::endl; -// std::string dummy; -// std::getline(std::cin, dummy); -// std::cout << "...cya" << std::endl; -// anon_send_exit(server, exit_reason::user_shutdown); -// } +void run_server(actor_system& system, const config& cfg) { + scoped_actor self{system}; + auto server = system.spawn(summa_server); + aout(self) << "SEVER" << std::endl; + aout(self) << "Attempting to publish actor" << cfg.port << std::endl; + auto is_port = io::publish(server, cfg.port); + if (!is_port) { + std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << std::endl; + return; + } + aout(self) << "Successfully Published" << *is_port << std::endl; + std::string dummy; + std::getline(std::cin, dummy); + std::cout << "...cya" << std::endl; + anon_send_exit(server, exit_reason::user_shutdown); +} void caf_main(actor_system& sys, const config& cfg) { scoped_actor self{sys}; @@ -97,10 +99,18 @@ void caf_main(actor_system& sys, const config& cfg) { aout(self) << "Starting SUMMA-Actors in DebugMode\n"; bool debug = true; } + + // Start the Actors + if (cfg.distributed) { + aout(self) << "Starting SUMMA-Actors in Distributed Mode \n"; + auto system = cfg.server_mode ? run_server : run_client; + system(sys, cfg); + } else { + auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath); + } // start SUMMA // auto system = cfg.server_mode ? run_server : run_client; // system(sys, cfg); - auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath); } CAF_MAIN(id_block::summa, io::middleman) \ No newline at end of file diff --git a/build/source/actors/summa_actor/SummaActor.h b/build/source/actors/summa_actor/SummaActor.h deleted file mode 100644 index 444bdc7..0000000 --- a/build/source/actors/summa_actor/SummaActor.h +++ /dev/null @@ -1,120 +0,0 @@ -#ifndef SUMMAACTOR_H_ -#define SUMMAACTOR_H_ - -#include "SummaManager.h" - -using namespace caf; -using json = nlohmann::json; -/** - * Top Level Actor for Summa. This actor recieves the number of GRUs to compute from main and - * divides them into jobs that compute one at a time. - * - * @param startGRU - starting GRU for the simulation - * @param numGRU - total number of GRUs to compute - * @param configPath - location of file information for SUMMA - * @return behavior - */ -behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string configPath) { - self->state.start = std::chrono::high_resolution_clock::now(); - // Set Variables - self->state.startGRU = startGRU; - self->state.numGRU = numGRU; - self->state.configPath = configPath; - - parseSettings(self, configPath); - aout(self) << "SETTINGS FOR SUMMA_ACTOR\n"; - aout(self) << "Output Structure Size = " << self->state.outputStrucSize << "\n"; - aout(self) << "Max GRUs Per Job = " << self->state.maxGRUPerJob << "\n"; - - // Create the job_actor and start SUMMA - spawnJob(self); - - return { - [=](done_job, int numFailed) { - self->state.numFailed += numFailed; - aout(self) << "Job Done\n"; - if (self->state.numGRU <= 0) { - - self->state.end = std::chrono::high_resolution_clock::now(); - self->state.duration = calculateTime(self->state.start, self->state.end); - - self->state.duration = self->state.duration / 1000; // Convert to milliseconds - - - aout(self) << "Total Program Duration:\n"; - aout(self) << " " << self->state.duration / 1000 << " Seconds\n"; - aout(self) << " " << (self->state.duration / 1000) / 60 << " Minutes\n"; - aout(self) << " " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n"; - - aout(self) << "Program Finished \n"; - - } else { - // spawn a new job - spawnJob(self); - } - }, - - [=](err) { - aout(self) << "Unrecoverable Error: Attempting To Fail Gracefully\n"; - self->quit(); - } - }; -} - - -void spawnJob(stateful_actor<summa_manager>* self) { - // Ensure we do not start a job with too many GRUs - if (self->state.numGRU > self->state.maxGRUPerJob) { - // spawn the job actor - aout(self) << "\n Starting Job with startGRU = " << self->state.startGRU << "\n"; - self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.maxGRUPerJob, - self->state.configPath, self->state.outputStrucSize, self); - - // Update GRU count - self->state.numGRU = self->state.numGRU - self->state.maxGRUPerJob; - self->state.startGRU = self->state.startGRU + self->state.maxGRUPerJob; - - } else { - - self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.numGRU, - self->state.configPath, self->state.outputStrucSize, self); - self->state.numGRU = 0; - } -} - -void parseSettings(stateful_actor<summa_manager>* self, std::string configPath) { - json settings; - std::string SummaActorsSettings = "/Summa_Actors_Settings.json"; - std::ifstream settings_file(configPath + SummaActorsSettings); - settings_file >> settings; - settings_file.close(); - - if (settings.find("SummaActor") != settings.end()) { - json SummaActorConfig = settings["SummaActor"]; - - // Find the desired OutputStrucSize - if (SummaActorConfig.find("OuputStructureSize") != SummaActorConfig.end()) { - self->state.outputStrucSize = SummaActorConfig["OuputStructureSize"]; - } else { - aout(self) << "Error Finding OutputStructureSize in JOSN - Reverting to default value\n"; - self->state.outputStrucSize = 250; - } - - // Find the desired maxGRUPerJob size - if (SummaActorConfig.find("maxGRUPerJob") != SummaActorConfig.end()) { - self->state.maxGRUPerJob = SummaActorConfig["maxGRUPerJob"]; - } else { - aout(self) << "Error Finding maxGRUPerJob in JOSN - Reverting to default value\n"; - self->state.maxGRUPerJob = 500; - } - - } else { - aout(self) << "Error Finding SummaActor in JSON - Reverting to default values\n"; - self->state.outputStrucSize = 250; - self->state.maxGRUPerJob = 500; - } -} - - - -#endif \ No newline at end of file diff --git a/build/source/actors/summa_actor/SummaManager.h b/build/source/actors/summa_actor/SummaManager.h deleted file mode 100644 index 95ab4b0..0000000 --- a/build/source/actors/summa_actor/SummaManager.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef SUMMAMANGER_H_ -#define SUMMAMANGER_H_ - -#include "caf/all.hpp" -#include "caf/io/all.hpp" -#include "../job_actor/JobActor.h" -#include "../global/json.hpp" -#include "../global/global.h" - - -#include <iostream> -#include <chrono> -#include <string> -#include <fstream> - - - - -behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string configPath); - -void spawnJob(stateful_actor<summa_manager>* self); - -void parseSettings(stateful_actor<summa_manager>* self, std::string configPath); - - - -#endif \ No newline at end of file diff --git a/build/source/actors/summa_actor/SummaClient.h b/build/source/actors/summa_actor/summa_client.cpp similarity index 84% rename from build/source/actors/summa_actor/SummaClient.h rename to build/source/actors/summa_actor/summa_client.cpp index 464f388..b859680 100644 --- a/build/source/actors/summa_actor/SummaClient.h +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -1,25 +1,11 @@ -#ifndef SUMMACLIENT_H_ -#define SUMMACLIENT_H_ - #include "caf/all.hpp" #include "caf/io/all.hpp" -using namespace caf; - -struct summa_client_state { - strong_actor_ptr current_server; - -}; +#include "summa_client.hpp" -behavior unconnected(stateful_actor<summa_client_state>*); -void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port); -behavior running(stateful_actor<summa_client_state>*, const actor& say_hello); +namespace caf { - -/** - * @brief Set up the client and its down handler - */ behavior summa_client(stateful_actor<summa_client_state>* self) { self->set_down_handler([=](const down_msg& dm){ if(dm.source == self->state.current_server) { @@ -81,10 +67,4 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a } }; } - - - - - - -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/build/source/actors/summa_actor/SummaServer.h b/build/source/actors/summa_actor/summa_server.cpp similarity index 73% rename from build/source/actors/summa_actor/SummaServer.h rename to build/source/actors/summa_actor/summa_server.cpp index 736791a..2c2d837 100644 --- a/build/source/actors/summa_actor/SummaServer.h +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -1,15 +1,9 @@ -#ifndef SUMMASERVER_H_ -#define SUMMASERVER_H_ - #include "caf/all.hpp" #include "caf/io/all.hpp" -using namespace caf; - -struct summa_server_state { - -}; +#include "summa_server.hpp" +namespace caf { behavior summa_server(stateful_actor<summa_server_state>* self) { aout(self) << "Summa Server has Started \n"; @@ -21,11 +15,5 @@ behavior summa_server(stateful_actor<summa_server_state>* self) { } }; - } - - - - - -#endif \ No newline at end of file +} -- GitLab