Skip to content
Snippets Groups Projects
Commit 2836f43c authored by KyleKlenk's avatar KyleKlenk
Browse files

Compiles with new actors

deleted unnecessary files
parent bd7ddf97
No related branches found
No related tags found
No related merge requests found
Showing
with 87 additions and 1985 deletions
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
namespace caf {
struct summa_client_state {
strong_actor_ptr current_server;
};
behavior summa_client(stateful_actor<summa_client_state>* self);
behavior unconnected(stateful_actor<summa_client_state>*);
void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port);
behavior running(stateful_actor<summa_client_state>*, const actor& summa_server);
}
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
namespace caf {
struct summa_server_state {
};
behavior summa_server(stateful_actor<summa_server_state>* self);
}
\ No newline at end of file
......@@ -251,6 +251,8 @@ GLOBAL = $(SOURCE_DIR)/global/global.cpp
SUMMA_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/summa_actor
SUMMA_ACTOR = $(SOURCE_DIR)/summa_actor/summa_actor.cpp
SUMMA_CLIENT = $(SOURCE_DIR)/summa_actor/summa_client.cpp
SUMMA_SERVER = $(SOURCE_DIR)/summa_actor/summa_server.cpp
JOB_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/job_actor
JOB_ACTOR = $(SOURCE_DIR)/job_actor/job_actor.cpp
......@@ -280,7 +282,7 @@ all: fortran cpp
fortran: compile_noah compile_comm compile_summa link clean_fortran
cpp: compile_globals compile_hru_actor compile_file_access_actor compile_job_actor compile_summa_actor \
compile_main link_cpp
compile_summa_client compile_summa_server compile_main link_cpp clean_cpp
test: actors_test actors_testLink actorsClean
......@@ -332,6 +334,12 @@ compile_summa_actor:
$(CC) $(FLAGS_ACTORS) -c $(SUMMA_ACTOR) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) \
$(JOB_ACTOR_INCLUDES)
compile_summa_client:
$(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES)
compile_summa_server:
$(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(SUMMA_ACTOR_INCLUDES)
compile_main:
$(CC) $(FLAGS_ACTORS) -c $(MAIN) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) $(JOB_ACTOR_INCLUDES)
......
#ifndef FILEACCESS_H_
#define FILEACCESS_H_
#include "caf/all.hpp"
#include "../global/fortran_dataTypes.h"
#include "../global/messageAtoms.h"
#include "../global/global.h"
#include "../global/json.hpp"
#include "fileAccess_subroutine_wrappers.h"
#include "OutputManager.h"
#include <vector>
#include <chrono>
class forcingFile {
private:
int fileID; // which file are we relative the forcing file list saved in fortran
int numSteps; // the number of steps in this forcing file
bool isLoaded; // is this file actually loaded in to RAM yet.
public:
forcingFile(int fileID) {
this->fileID = fileID;
this->numSteps = 0;
this->isLoaded = false;
}
int getNumSteps() {
return this->numSteps;
}
bool isFileLoaded() {
return this->isLoaded;
}
void updateIsLoaded() {
this->isLoaded = true;
}
void updateNumSteps(int numSteps) {
this->numSteps = numSteps;
this->isLoaded = true;
}
};
struct file_access_state {
// Variables set on Spwan
caf::actor parent;
int startGRU;
int numGRU;
void *handle_forcFileInfo = new_handle_file_info(); // Handle for the forcing file information
void *handle_ncid = new_handle_var_i(); // output file ids
OutputManager *output_manager;
int num_vectors_in_output_manager;
int num_steps;
int outputStrucSize;
int stepsInCurrentFile;
int numFiles;
int filesLoaded;
int err = 0;
std::vector<forcingFile> forcFileList; // list of steps in file
std::vector<bool> outputFileInitHRU;
std::chrono::time_point<std::chrono::system_clock> readStart;
std::chrono::time_point<std::chrono::system_clock> readEnd;
double readDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> writeStart;
std::chrono::time_point<std::chrono::system_clock> writeEnd;
double writeDuration = 0.0;
};
#endif
\ No newline at end of file
#ifndef FILEACCESSACTOR_H_
#define FILEACCESSACTOR_H_
#include "FileAccess.h"
using namespace caf;
using json = nlohmann::json;
void initalizeFileAccessActor(stateful_actor<file_access_state>* self);
int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, int numStepsToWrite, int returnMessage, caf::actor actorRef);
int readForcing(stateful_actor<file_access_state>* self, int currentFile);
int write(stateful_actor<file_access_state>* self, int listIndex);
int parseSettings(stateful_actor<file_access_state>* self, std::string configPath);
behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU,
int outputStrucSize, std::string configPath, actor parent) {
// Set File_Access_Actor variables
self->state.parent = parent;
self->state.numGRU = numGRU;
self->state.startGRU = startGRU;
self->state.outputStrucSize = outputStrucSize;
// Get Settings from configuration file
if (parseSettings(self, configPath) == -1) {
aout(self) << "Error with JSON Settings File!!!\n";
self->quit();
} else {
aout(self) << "\nSETTINGS FOR FILE_ACCESS_ACTOR\n" <<
"Number of Vectors in Output Structure = " << self->state.num_vectors_in_output_manager << "\n";
}
initalizeFileAccessActor(self);
return {
[=](initalize_outputStructure) {
aout(self) << "Initalizing Output Structure" << std::endl;
Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize,
&self->state.numGRU, &self->state.err);
},
[=](write_param, int indxGRU, int indxHRU) {
int err;
err = 0;
Write_HRU_Param(self->state.handle_ncid, &indxGRU, &indxHRU, &err);
if (err != 0) {
aout(self) << "ERROR: Write_HRU_PARAM -- For HRU = " << indxHRU << "\n";
}
},
[=](access_forcing, int currentFile, caf::actor refToRespondTo) {
// aout(self) << "Received Current FIle = " << currentFile << std::endl;
if (currentFile <= self->state.numFiles) {
if(self->state.forcFileList[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1
// aout(self) << "ForcingFile Already Loaded \n";
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
} else {
self->state.readStart = std::chrono::high_resolution_clock::now();
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
}
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
// Check if we have loaded all forcing files
if(self->state.filesLoaded <= self->state.numFiles) {
self->send(self, access_forcing_internal_v, currentFile + 1);
}
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
}
} else {
aout(self) << currentFile << " is larger than expected" << std::endl;
}
},
[=](access_forcing_internal, int currentFile) {
if (self->state.filesLoaded <= self->state.numFiles &&
currentFile <= self->state.numFiles) {
// aout(self) << "Loading in background, File:" << currentFile << "\n";
if (self->state.forcFileList[currentFile - 1].isFileLoaded()) {
aout(self) << "File Loaded when shouldn't be \n";
}
self->state.readStart = std::chrono::high_resolution_clock::now();
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
}
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
self->send(self, access_forcing_internal_v, currentFile + 1);
} else {
aout(self) << "All Forcing Files Loaded \n";
}
},
[=](write_output, int indxGRU, int indxHRU, int numStepsToWrite,
caf::actor refToRespondTo) {
int err;
int returnMessage = 9999;
err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, returnMessage, refToRespondTo);
if (err != 0) {
aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
}
},
[=](read_and_write, int indxGRU, int indxHRU, int numStepsToWrite, int currentFile,
caf::actor refToRespondTo) {
int err;
err = readForcing(self, currentFile);
if (err != 0)
aout(self) << "\nERROR: FILE_ACCESS_ACTOR - READING_FORCING FAILED\n";
err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, currentFile, refToRespondTo);
if (err != 0)
aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
},
[=](run_failure, int indxGRU) {
int listIndex;
// update the list in Fortran
updateFailed(&indxGRU);
listIndex = self->state.output_manager->decrementMaxSize(indxGRU);
// Check if this list is now full
if(self->state.output_manager->isFull(listIndex)) {
write(self, listIndex);
}
},
/**
* Message from JobActor
* OutputManager needs to be adjusted so the failed HRUs can run again
*/
[=](restart_failures) {
resetFailedArray();
self->state.output_manager->restartFailures();
},
[=](deallocate_structures) {
aout(self) << "Deallocating Structure" << std::endl;
FileAccessActor_DeallocateStructures(self->state.handle_forcFileInfo, self->state.handle_ncid);
self->state.readDuration = self->state.readDuration / 1000; // Convert to milliseconds
self->state.readDuration = self->state.readDuration / 1000; // Convert to seconds
self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds
self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds
self->send(self->state.parent, file_access_actor_done_v, self->state.readDuration,
self->state.writeDuration);
self->quit();
},
[=](reset_outputCounter, int indxGRU) {
resetOutputCounter(&indxGRU);
}
};
}
void initalizeFileAccessActor(stateful_actor<file_access_state>* self) {
int indx = 1;
int err = 0;
// aout(self) << "Set Up the forcing file" << std::endl;
ffile_info_C(&indx, self->state.handle_forcFileInfo, &self->state.numFiles, &err);
if (err != 0) {
aout(self) << "Error: ffile_info_C - File_Access_Actor \n";
std::string function = "ffile_info_C";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
mDecisions_C(&self->state.num_steps, &err);
if (err != 0) {
aout(self) << "Error: mDecisions - FileAccess Actor \n";
std::string function = "mDecisions_C";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
read_pinit_C(&err);
if (err != 0) {
aout(self) << "ERROR: read_pinit_C\n";
std::string function = "read_pinit_C";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
read_vegitationTables(&err);
if (err != 0) {
aout(self) << "ERROR: read_vegitationTables\n";
std::string function = "read_vegitationTables";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
initFailedHRUTracker(&self->state.numGRU);
Create_Output_File(self->state.handle_ncid, &self->state.numGRU, &self->state.startGRU, &err);
if (err != 0) {
aout(self) << "ERROR: Create_OutputFile\n";
std::string function = "Create_Output_File";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
// Initalize the output Structure
aout(self) << "Initalizing Output Structure" << std::endl;
Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize,
&self->state.numGRU, &self->state.err);
// Initalize the output manager
self->state.output_manager = new OutputManager(self->state.num_vectors_in_output_manager, self->state.numGRU);
self->send(self->state.parent, done_file_access_actor_init_v);
// initalize the forcingFile array
self->state.filesLoaded = 0;
for (int i = 1; i <= self->state.numFiles; i++) {
self->state.forcFileList.push_back(forcingFile(i));
}
}
int write(stateful_actor<file_access_state>* self, int listIndex) {
int err = 0;
int minGRU = self->state.output_manager->getMinIndex(listIndex);
int maxGRU = self->state.output_manager->getMaxIndex(listIndex);
int numStepsToWrite = self->state.output_manager->getNumStepsToWrite(listIndex);
FileAccessActor_WriteOutput(self->state.handle_ncid,
&numStepsToWrite, &minGRU,
&maxGRU, &err);
// Pop The actors and send them the correct continue message
while(!self->state.output_manager->isEmpty(listIndex)) {
std::tuple<caf::actor, int> actor = self->state.output_manager->popActor(listIndex);
if (get<1>(actor) == 9999) {
self->send(get<0>(actor), done_write_v);
} else {
self->send(get<0>(actor), run_hru_v,
self->state.forcFileList[get<1>(actor) - 1].getNumSteps());
}
}
return 0;
}
int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU,
int numStepsToWrite, int returnMessage, caf::actor actorRef) {
self->state.writeStart = std::chrono::high_resolution_clock::now();
if (debug) {
aout(self) << "Recieved Write Request From GRU: " << indxGRU << "\n";
}
int err = 0;
int listIndex = self->state.output_manager->addActor(actorRef, indxGRU, returnMessage, numStepsToWrite);
if (self->state.output_manager->isFull(listIndex)) {
if (debug) {
aout(self) << "List with Index " << listIndex << " is full and ready to write\n";
aout(self) << "Minimum GRU Index = " << self->state.output_manager->getMinIndex(listIndex) << "\n";
aout(self) << "Maximum GRU Index = " << self->state.output_manager->getMaxIndex(listIndex) << "\n";
}
err = write(self, listIndex);
} else {
if (debug) {
aout(self) << "List with Index " << listIndex << " is not full yet waiting to write\n";
aout(self) << "Size of list is " << self->state.output_manager->getSize(listIndex) << "\n";
}
}
self->state.writeEnd = std::chrono::high_resolution_clock::now();
self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd);
return err;
}
int readForcing(stateful_actor<file_access_state>* self, int currentFile) {
// Check if we have already loaded this file
if(self->state.forcFileList[currentFile -1].isFileLoaded()) {
if (debug)
aout(self) << "ForcingFile Already Loaded \n";
return 0;
} else {
// File Needs to be loaded
self->state.readStart = std::chrono::high_resolution_clock::now();
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
if (debug)
aout(self) << "ERROR: FileAccessActor_ReadForcing\n" <<
"currentFile = " << currentFile << "\n" << "number of steps = "
<< self->state.stepsInCurrentFile << "\n";
return -1;
} else {
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
return 0;
}
}
}
int parseSettings(stateful_actor<file_access_state>* self, std::string configPath) {
json settings;
std::string SummaActorsSettigs = "/Summa_Actors_Settings.json";
std::ifstream settings_file(configPath + SummaActorsSettigs);
settings_file >> settings;
settings_file.close();
if (settings.find("FileAccessActor") != settings.end()) {
json FileAccessActorConfig = settings["FileAccessActor"];
// Find the File Manager Path
if (FileAccessActorConfig.find("num_vectors_in_output_manager") != FileAccessActorConfig.end()) {
self->state.num_vectors_in_output_manager = FileAccessActorConfig["num_vectors_in_output_manager"];
} else {
aout(self) << "Error Finding FileManagerPath - Exiting as this is needed\n";
return -1;
}
return 0;
} else {
aout(self) << "Error Finding JobActor in JSON file - Exiting as there is no path for the fileManger\n";
return -1;
}
}
#endif
\ No newline at end of file
#ifndef OutputManager_H_
#define OutputManager_H_
#include "caf/all.hpp"
#include <vector>
#include <algorithm>
/**
* @brief Basic Container class to hold actor references. This has a size component for checking when it is full.
*
*/
class ActorRefList {
private:
int numStepsToWrite; // We can save this value here so that we know how many steps to write
int currentSize;
unsigned int maxSize;
int minIndex = -1; // minimum index of the actor being stored on this list
int maxIndex = 0; // maximum index of the actor being stored on this list
std::vector<std::tuple<caf::actor, int>> list;
public:
// Constructor
ActorRefList(int maxSize){
this->currentSize = 0;
this->maxSize = maxSize;
}
// Deconstructor
~ActorRefList(){};
int getMaxIndex() {
return this->maxIndex;
}
int getMinIndex() {
return this->minIndex;
}
int getCurrentSize() {
return this->currentSize;
}
int getMaxSize() {
return this->maxSize;
}
int getNumStepsToWrite() {
return this->numStepsToWrite;
}
bool isFull() {
return list.size() == this->maxSize;
}
/**
* Adds An Actor and its return message as a tuple to this->list
* actor - the actor ref of the actor being added to this->list
* returnMessage - Either 9999 (place holder and specifies to send a done_write_v message) or
* this is the current forcingFileList index that allows the file_access actor to know the number
* of steps the HRU actor that needs to compute
*/
void addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
if (this->isFull()) {
throw "List is full, cannot add actor to this list";
}
if (index > this->maxIndex) {
this->maxIndex = index;
}
if (index < this->minIndex || this->minIndex < 0) {
this->minIndex = index;
}
this->numStepsToWrite = numStepsToWrite;
this->currentSize++;
list.push_back(std::make_tuple(actor, returnMessage));
}
/**
* Return a tuple of an actor and its returnMessage.
* The return message is 9999 or the index of the forcingFile it needs to acces
*/
std::tuple<caf::actor,int> popActor() {
if (list.empty()) {
throw "List is empty, nothing to pop";
}
auto actor = list.back();
list.pop_back();
this->currentSize--;
return actor;
}
bool isEmpty() {
return list.empty();
}
/**
* When an actor fails we need to decrement the count
* so that this list becomes full when there is a failure
*
* indexHRU - index of the HRU causing the error
*/
void decrementMaxSize() {
this->maxSize--;
}
/**
* Remove the failed HRU from the list
*
*/
void removeFailed(caf::actor actorRef) {
bool found = false;
for(std::vector<std::tuple<caf::actor, int>>::iterator it = this->list.begin(); it != this->list.end(); it++) {
if (std::get<0>(*it) == actorRef) {
found = true;
this->list.erase(it);
this->currentSize--; this->maxSize--;
break;
}
}
if (!found) {
throw "Element To Remove Not Found";
}
}
};
/**
* @brief Class that manages which structure actors are held on
*
*/
class OutputManager {
private:
int numVectors;
int avgSizeOfActorList;
bool runningFailures;
std::vector<ActorRefList*> list;
std::vector<int> failedHRU;
std::vector<int> failureReRun; // index used so we can add failedHRUs if they fail a second time
public:
// Constructor
OutputManager(int numVectors, int totalNumActors){
this->numVectors = numVectors;
int sizeOfOneVector = totalNumActors / numVectors;
this->avgSizeOfActorList = sizeOfOneVector;
this->runningFailures = false;
// Create the first n-1 vectors with the same size
for (int i = 0; i < numVectors - 1; i++) {
auto refList = new ActorRefList(sizeOfOneVector);
totalNumActors = totalNumActors - sizeOfOneVector;
list.push_back(refList);
}
// Create the last vector with size however many actors are left
auto refList = new ActorRefList(totalNumActors);
list.push_back(refList);
}
// Deconstructor
~OutputManager(){};
/**
* @brief Adds an actor to its respective list
*
* @param actor Actor reference
* @param index Actor Index
* @param returnMessage Forcing File index or 9999
* @return int The list index that actor is added to.
*/
int addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
int listIndex;
if (this->runningFailures) {
// find the index of the structure this HRU is in
auto it = find(this->failureReRun.begin(), this->failureReRun.end(), index);
if (it != this->failureReRun.end()) {
listIndex = it - this->failureReRun.begin();
} else {
throw "Element Not Found in failureReRun list";
}
this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
} else {
// Index has to be subtracted by 1 because Fortran array starts at 1
listIndex = (index - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
}
this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
}
return listIndex;
}
/**
* Remove tuple from list[index]
*
*/
std::tuple<caf::actor,int> popActor(int index) {
if (index > this->numVectors - 1 || index < 0) {
throw "List Index Out Of Range";
} else if (this->list[index]->isEmpty()) {
throw "List is Empty, Nothing to pop";
}
return this->list[index]->popActor();
}
/** When a failure occurs an actor most likley will not already be on this list
* This method may and probably should not be used. Although needing to remove a
* specific element from a list may be needed.
* Remove the failed actor from the list
* Return the index of the list we removed the actor from
* This is so we can check if it is full
*/
int removeFailed(caf::actor actorRef, int index) {
// Find the list this actor is on
int listIndex = (index - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
}
this->list[listIndex]->removeFailed(actorRef);
return listIndex;
}
/**
* Decrease the size of the list
* Add this GRU to the failed list
*/
int decrementMaxSize(int indexHRU) {
this->failedHRU.push_back(indexHRU);
// Find the list this actor is on
int listIndex = (indexHRU - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
}
this->list[listIndex]->decrementMaxSize();
return listIndex;
}
void restartFailures() {
this->list.clear();
this->numVectors = this->failedHRU.size();
for (unsigned int i = 0; i < this->failedHRU.size(); i++) {
auto refList = new ActorRefList(1);
this->list.push_back(refList);
}
this->failureReRun = this->failedHRU;
this->failedHRU.clear();
this->runningFailures = true;
}
/**
* Get the number of steps to write from the correct listIndex
*/
int getNumStepsToWrite(int listIndex) {
return this->list[listIndex]->getNumStepsToWrite();
}
bool isFull(int listIndex) {
if (listIndex > this->numVectors - 1) {
throw "List Index Out Of Range";
}
return this->list[listIndex]->isFull();
}
bool isEmpty(int listIndex) {
return this->list[listIndex]->isEmpty();
}
int getSize(int listIndex) {
if (listIndex > this->numVectors - 1) {
throw "List Index Out Of Range";
}
return this->list[listIndex]->getCurrentSize();
}
int getMinIndex(int listIndex) {
return this->list[listIndex]->getMinIndex();
}
int getMaxIndex(int listIndex) {
return this->list[listIndex]->getMaxIndex();
}
void addFailed(int indxHRU) {
this->failedHRU.push_back(indxHRU);
}
};
#endif
\ No newline at end of file
#ifndef FORTRAN_DATATYPES_H_
#define FORTRAN_DATATYPES_H_
extern "C" {
// flagVec
void* new_handle_flagVec();
void delete_handle_flagVec(void* handle);
void set_data_flagVec(void* handle, const int* array, int size);
void get_size_data_flagVec(void* handle, int* size);
void get_data_flagVec(void* handle, int* array);
// var_i
void* new_handle_var_i();
void delete_handle_var_i(void* handle);
void set_data_var_i(void* handle, const int* array, int size);
void get_size_data_var_i(void* handle, int* size);
void get_data_var_i(void* handle, int* array);
// var_i8
void* new_handle_var_i8();
void delete_handle_var_i8(void* handle);
void set_data_var_i8(void* handle, const long int* array, int size);
void get_size_data_var_i8(void* handle, int* size);
void get_data_var_i8(void* handle, long int* array);
// var_d
void* new_handle_var_d();
void delete_handle_var_d(void* handle);
void set_data_var_d(void* handle, const double* array, int size);
void get_size_data_var_d(void* handle, int* size);
void get_data_var_d(void* handle, double* array);
// ilength
void* new_handle_ilength();
void delete_handle_ilength(void* handle);
void set_data_ilength(void* handle, const int* array, int size);
void get_size_data_ilength(void* handle, int* size);
void get_data_ilength(void* handle, int* array);
// i8length
void* new_handle_i8length();
void delete_handle_i8length(void* handle);
void set_data_i8length(void* handle, const long int* array, int size);
void get_size_data_i8length(void* handle, int* size);
void get_data_i8length(void* handle, long int* array);
// dlength
void* new_handle_dlength();
void delete_handle_dlength(void* handle);
void set_data_dlength(void* handle, const double* array, int size);
void get_size_data_dlength(void* handle, int* size);
void get_data_dlength(void* handle, double* array);
// var_flagVec
void* new_handle_var_flagVec();
void delete_handle_var_flagVec(void* handle);
void set_data_var_flagVec(void* handle, const int* array, int num_row, const int* num_col, int num_elements);
void get_size_var_flagVec(void* handle, int* num_var);
void get_size_data_var_flagVec(void* handle, int* num_var, int* num_dat);
void get_data_var_flagVec(void* handle, int* array);
// var_ilength
void* new_handle_var_ilength();
void delete_handle_var_ilength(void* handle);
void set_data_var_ilength(void* handle, const int* array, int num_row, const int* num_col, int num_elements);
void get_size_var_ilength(void* handle, int* num_var);
void get_size_data_var_ilength(void* handle, int* num_var, int* num_dat);
void get_data_var_ilength(void* handle, int* array);
// var_i8length
void* new_handle_var_i8length();
void delete_handle_var_i8length(void* handle);
void set_data_var_i8length(void* handle, const long int* array, int num_row, const int* num_col, int num_elements);
void get_size_var_i8length(void* handle, int* num_var);
void get_size_data_var_i8length(void* handle, int* num_var, int* num_dat);
void get_data_var_i8length(void* handle, long int* array);
// var_dlength
void* new_handle_var_dlength();
void delete_handle_var_dlength(void* handle);
void set_data_var_dlength(void* handle, const double* array, int num_row, const int* num_col, int num_elements);
void get_size_var_dlength(void* handle, int* num_var);
void get_size_data_var_dlength(void* handle, int* num_var, int* num_dat);
void get_data_var_dlength(void* handle, double* array);
// var_dlength_array
void* new_handle_dlength_array();
void delete_handle_dlength_array(void* handle);
// var_info
void* new_handle_var_info();
void delete_handle_var_info(void* handle);
void set_data_var_info(void* handle, char const *str1, char const *str2, char const *str3,
int type, const int* ncid, int ncid_size, const int* index, int index_size, int flag);
// file_info
void* new_handle_file_info();
void delete_handle_file_info(void* handle);
}
#endif
\ No newline at end of file
#ifndef COMMONFUNCTIONS_H_
#define COMMONFUNCTIONS_H_
#include <chrono>
// Gobal Flag for Debuging only main function will change this
bool debug;
/**
* Return the time between to time points
*/
double calculateTime(std::chrono::time_point<std::chrono::system_clock> start,
std::chrono::time_point<std::chrono::system_clock> end);
double calculateTime(std::chrono::time_point<std::chrono::system_clock> start,
std::chrono::time_point<std::chrono::system_clock> end) {
return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
}
#endif
\ No newline at end of file
#ifndef MESSAGEATOMS_H_
#define MESSAGEATOMS_H_
CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
// Summa Actor
CAF_ADD_ATOM(summa, start_summa)
CAF_ADD_ATOM(summa, done_job)
CAF_ADD_ATOM(summa, err)
// Job Actor
CAF_ADD_ATOM(summa, done_reading_forcingFile)
CAF_ADD_ATOM(summa, done_reading_first_forcing_file)
CAF_ADD_ATOM(summa, init_hru)
CAF_ADD_ATOM(summa, done_init_hru)
CAF_ADD_ATOM(summa, done_write)
CAF_ADD_ATOM(summa, doneFile)
CAF_ADD_ATOM(summa, done_hru)
CAF_ADD_ATOM(summa, done_final_write)
CAF_ADD_ATOM(summa, run_failure)
CAF_ADD_ATOM(summa, done_file_access_actor_init)
CAF_ADD_ATOM(summa, file_access_actor_done)
CAF_ADD_ATOM(summa, file_access_actor_err)
// FileAccess Actor
CAF_ADD_ATOM(summa, initalize_outputStructure)
CAF_ADD_ATOM(summa, access_forcing)
CAF_ADD_ATOM(summa, access_first_forcing_file)
CAF_ADD_ATOM(summa, access_forcing_internal)
CAF_ADD_ATOM(summa, write_output)
CAF_ADD_ATOM(summa, write_output_final)
CAF_ADD_ATOM(summa, deallocate_structures)
CAF_ADD_ATOM(summa, update_completed)
CAF_ADD_ATOM(summa, update_failed)
CAF_ADD_ATOM(summa, reset_outputCounter)
CAF_ADD_ATOM(summa, read_and_write)
CAF_ADD_ATOM(summa, write_param)
CAF_ADD_ATOM(summa, restart_failures)
// HRU Actor
CAF_ADD_ATOM(summa, run_hru)
CAF_ADD_ATOM(summa, start_hru)
CAF_ADD_ATOM(summa, file_information)
CAF_ADD_ATOM(summa, dt_init_factor)
CAF_END_TYPE_ID_BLOCK(summa)
#endif
\ No newline at end of file
#ifndef HRU_H_
#define HRU_H_
#include "caf/all.hpp"
#include "hru_subroutine_wrappers.h"
#include "../global/fortran_dataTypes.h"
#include "../global/messageAtoms.h"
#include "../global/json.hpp"
#include "../global/global.h"
#include <fstream>
#include <string>
#include <typeinfo>
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <chrono>
#include <iostream>
using namespace caf;
struct hru_state {
// Actor References
caf::actor file_access_actor;
caf::actor parent;
// Info about which HRU we are and our indexes
// into global structures in Fortran
int indxHRU; // index for hru part of derived types in FORTRAN
int indxGRU; // index for gru part of derived types in FORTRAN
int refGRU; // The actual ID of the GRU we are
// Variables for output/forcing structures
int outputStrucSize;
int outputStep;
int stepsInCurrentFFile;
int forcingFileStep;
int currentForcingFile = 1;
// statistics structures
void *handle_forcStat = new_handle_var_dlength(); // model forcing data
void *handle_progStat = new_handle_var_dlength(); // model prognostic (state) variables
void *handle_diagStat = new_handle_var_dlength(); // model diagnostic variables
void *handle_fluxStat = new_handle_var_dlength(); // model fluxes
void *handle_indxStat = new_handle_var_dlength(); // model indices
void *handle_bvarStat = new_handle_var_dlength(); // basin-average variables
// primary data structures (scalars)
void *handle_timeStruct = new_handle_var_i(); // model time data
void *handle_forcStruct = new_handle_var_d(); // model forcing data
void *handle_attrStruct = new_handle_var_d(); // local attributes for each HRU
void *handle_typeStruct = new_handle_var_i(); // local classification of soil veg etc. for each HRU
void *handle_idStruct = new_handle_var_i8();
// primary data structures (variable length vectors)
void *handle_indxStruct = new_handle_var_ilength(); // model indices
void *handle_mparStruct = new_handle_var_dlength(); // model parameters
void *handle_progStruct = new_handle_var_dlength(); // model prognostic (state) variables
void *handle_diagStruct = new_handle_var_dlength(); // model diagnostic variables
void *handle_fluxStruct = new_handle_var_dlength(); // model fluxes
// basin-average structures
void *handle_bparStruct = new_handle_var_d(); // basin-average parameters
void *handle_bvarStruct = new_handle_var_dlength(); // basin-average variables
// ancillary data structures
void *handle_dparStruct = new_handle_var_d(); // default model parameters
// Local hru data
void *handle_ncid = new_handle_var_i(); // output file ids
void *handle_statCounter = new_handle_var_i();
void *handle_outputTimeStep = new_handle_var_i();
void *handle_resetStats = new_handle_flagVec();
void *handle_finalizeStats = new_handle_flagVec();
void *handle_oldTime = new_handle_var_i();
void *handle_refTime = new_handle_var_i();
void *handle_finshTime = new_handle_var_i();
void *handle_startTime = new_handle_var_i();
// Misc Variables
int timestep = 1; // Current Timestep of HRU simulation
int computeVegFlux; // flag to indicate if we are computing fluxes over vegetation
double dt_init; // used to initialize the length of the sub-step for each HRU
double upArea; // area upslope of each HRU
int num_steps = 0; // number of time steps
int forcingStep; // index of current time step in current forcing file
int iFile; // index of current forcing file from forcing file list
int dt_init_factor = 1; // factor of dt_init (coupled_em)
bool printOutput;
int outputFrequency;
// Julian Day variables
double fracJulDay;
double tmZoneOffsetFracDay;
int yearLength;
int err = 0; // error conotrol
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration = 0.0;
std::chrono::time_point<std::chrono::system_clock> initStart;
std::chrono::time_point<std::chrono::system_clock> initEnd;
double initDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> forcingStart;
std::chrono::time_point<std::chrono::system_clock> forcingEnd;
double forcingDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> runPhysicsStart;
std::chrono::time_point<std::chrono::system_clock> runPhysicsEnd;
double runPhysicsDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> writeOutputStart;
std::chrono::time_point<std::chrono::system_clock> writeOutputEnd;
double writeOutputDuration = 0.0;
};
/**
* @brief Get the settings from the settings JSON file
*
* @param self Actor State
* @param configPath Path to the directory that contains the settings file
*/
void parseSettings(stateful_actor<hru_state>* self, std::string configPath);
/**
Function to initalize the HRU for running
*/
void Initialize_HRU(stateful_actor<hru_state>* self);
/**
Function runs all of the hru time_steps
*/
int Run_HRU(stateful_actor<hru_state>* self);
bool check_HRU(stateful_actor<hru_state>* self, int err);
void initalizeTimeVars(stateful_actor<hru_state>* self);
void finalizeTimeVars(stateful_actor<hru_state>* self);
void deallocateHRUStructures(stateful_actor<hru_state>* self);
void printOutput(stateful_actor<hru_state>* self);
#endif
\ No newline at end of file
This diff is collapsed.
#ifndef HRU_SUBROUTINE_WRAPPERS_H_
#define HRU_SUBROUTINE_WRAPPERS_H_
extern "C" {
// Initialize HRU data_structures
void Initialize(
int* indxGRU, int* num_steps,
// Statistics Structures
void* forcStat, void* progStat, void* diagStat, void* fluxStat, void* indxStat, void* bvarStat,
// Primary Data Structures (scalars)
void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct, void* idStruct,
// primary data structures (variable length vectors)
void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct,
// basin-average structures
void* bparStruct, void* bvarStruct,
// ancillary data structures
void* dparStruct,
// local HRU data
void* startTime, void* finshTime, void* refTime, void* oldTime, int* err);
// SetupParam for HRU
void SetupParam(
int* indxGRU, int* indxHRU,
// primary data structures (scalars)
void* attrStruct, void* typeStruct, void* idStruct,
// primary data structures (variable length vectors)
void* mparStruct, void* bparStruct, void* bvarStruct, void* dparStruct,
// local HRU data
void* startTime, void* oldTime,
// miscellaneous
double* upArea, int* err);
// Setup Restart File if this option has been chosen
void Restart(
int* indxGRU, int* indxHRU,
// primary data structures (variable length vectors)
void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct,
// basin-average structures
void* bvarStruct,
// misc
double* dtInit, int* err);
// Read Forcing for HRU
void Forcing(
int* indxGRU, int* stepIndex,
void* timeStruct, void* forcStruct,
int* iFile, int* forcingStep,
double* fracJulDay, double* tmZoneOffsetFracDay, int* yearLength,
int* err);
// Run the model for one timestep
void RunPhysics(
int* id, int* stepIndex,
// primary data structures (scalars)
void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct,
// primary data structures (variable length vectors)
void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct,
// basin-average structures
void* bvarStruct,
double* fracJulDay, double* tmZoneOffsetFracDay, int* yearLength,
// misc
int* flag, double* dt, int* dt_int_factor, int* err);
// Write output to the output structure
void WriteOutput(
int* indHRU, int* indxGRU, int* indexStep,
// statistics structures
void* forcStat, void* progStat, void* diagStat, void* fluxStat, void* indxStat, void* bvarStat,
// primary data structures (scalars)
void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct,
// primary data structures (variable length vectors)
void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct,
// basin-average structures
void* bparStruct, void* bvarStruct,
// local vars
void* statCounter, void* outputTimeStep, void* resetStats, void* finalizeStats,
void* finshTime, void* oldTime, int* outputStep, int* err);
void DeallocateStructures(
void* handle_forcStat, void* handle_progStat, void* handle_diagStat, void* handle_fluxStat,
void* handle_indxStat, void* handle_bvarStat, void* handle_timeStruct, void* handle_forcStruct,
void* handle_attrStruct, void* handle_typeStruct, void* handle_idStruct, void* handle_indxStruct,
void* handle_mparStruct, void* handle_progStruct, void* handle_diagStruct, void* handle_fluxStruct,
void* handle_bparStruct, void* handle_bvarStruct, void* handle_dparStruct,
void* handle_startTime, void* handle_finishTime,
void* handle_refTime, void* handle_oldTime,
void* handle_ncid,
void* handle_statCounter,
void* handle_outputTimeStep,
void* handle_resetStats,
void* handle_finalizeStats,
int* err);
void Write_Param_C(
int* indxGRU, int* indxHRU,
void* handle_attrStruct, void* handle_typeStruct, void* handle_mparStruct, void* handle_bparStruct,
int* err);
}
# endif
#ifndef SUMMACLIENT_H_
#define SUMMACLIENT_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "../file_access_actor/FileAccessActor.h"
#include "../hru_actor/HRUActor.h"
#include "../global/messageAtoms.h"
#include "../global/global.h"
#include "GRUinfo.h"
#include "job_subroutine_wrappers.h"
#include "string.h"
#include <unistd.h>
#include <vector>
#include <chrono>
#include <iostream>
#include <fstream>
#include <sys/stat.h>
struct job_state {
// Actor References
caf::actor file_access_actor; // actor reference for the file_access_actor
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int startGRU; // Starting GRU for this job
int numGRU; // Number of GRUs for this job
std::string configPath;
std::string fileManager; // Path of the fileManager.txt file
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int maxRunAttempts = 3; // Max number of attemtps to solve a GRU
std::vector<GRUinfo*> GRUList; // List of all GRUs under this job actor
int numGRUDone = 0; // The number of GRUs that have completed
int GRUInit = 0; // Number of GRUs initalized
int err = 0; // Error Code
int numGRUFailed = 0; // Number of GRUs that have failed
int outputStrucSize;
// Timing Variables
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration;
// Output File Names for Timings
bool outputCSV;
std::string csvOut;
std::string csvPath;
std::string successOutputFile;
std::string failedOutputFile = "failedHRU";
std::string fileAccessActorStats = "fileAccessActor.csv";
};
int parseSettings(stateful_actor<job_state>* self, std::string configPath);
void initJob(stateful_actor<job_state>* self);
void initalizeGRU(stateful_actor<job_state>* self);
void runGRUs(stateful_actor<job_state>* self);
void restartFailures(stateful_actor<job_state>* self);
#endif
\ No newline at end of file
#ifndef JOB_SUBROUTINE_WRAPPERS_H_
#define JOB_SUBROUTINE_WRAPPERS_H_
extern "C" {
void initGlobals(char const*str1, int* totalGRUs, int* totalHRUs,
int* numGRUs, int* numHRUs, int* startGRUIndex, int* err);
void cleanUpJobActor(int* err);
}
#endif
\ No newline at end of file
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "summa_actor.hpp"
#include "summa_client.hpp"
#include "summa_server.hpp"
#include "global.hpp"
#include "message_atoms.hpp"
#include <string>
......@@ -37,41 +39,41 @@ class config : public actor_system_config {
}
};
// void run_client(actor_system& system, const config& cfg) {
// scoped_actor self{system};
// if (cfg.distributed) {
// aout(self) << "Starting SUMMA-Client in Distributed Mode\n";
// auto c = system.spawn(summa_client);
// if (!cfg.host.empty() && cfg.port > 0) {
// anon_send(c, connect_atom_v, cfg.host, cfg.port);
// } else {
// aout(self) << "No Server Config" << std::endl;
// }
void run_client(actor_system& system, const config& cfg) {
scoped_actor self{system};
if (cfg.distributed) {
aout(self) << "Starting SUMMA-Client in Distributed Mode\n";
auto c = system.spawn(summa_client);
if (!cfg.host.empty() && cfg.port > 0) {
anon_send(c, connect_atom_v, cfg.host, cfg.port);
} else {
aout(self) << "No Server Config" << std::endl;
}
// } else {
// aout(self) << "Starting SUMMA in non-distributed mode \n";
// auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath);
// }
} else {
aout(self) << "Starting SUMMA in non-distributed mode \n";
auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath);
}
// }
}
// void run_server(actor_system& system, const config& cfg) {
// scoped_actor self{system};
// auto server = system.spawn(summa_server);
// aout(self) << "SEVER" << std::endl;
// aout(self) << "Attempting to publish actor" << cfg.port << std::endl;
// auto is_port = io::publish(server, cfg.port);
// if (!is_port) {
// std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << std::endl;
// return;
// }
// aout(self) << "Successfully Published" << *is_port << std::endl;
// std::string dummy;
// std::getline(std::cin, dummy);
// std::cout << "...cya" << std::endl;
// anon_send_exit(server, exit_reason::user_shutdown);
// }
void run_server(actor_system& system, const config& cfg) {
scoped_actor self{system};
auto server = system.spawn(summa_server);
aout(self) << "SEVER" << std::endl;
aout(self) << "Attempting to publish actor" << cfg.port << std::endl;
auto is_port = io::publish(server, cfg.port);
if (!is_port) {
std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << std::endl;
return;
}
aout(self) << "Successfully Published" << *is_port << std::endl;
std::string dummy;
std::getline(std::cin, dummy);
std::cout << "...cya" << std::endl;
anon_send_exit(server, exit_reason::user_shutdown);
}
void caf_main(actor_system& sys, const config& cfg) {
scoped_actor self{sys};
......@@ -97,10 +99,18 @@ void caf_main(actor_system& sys, const config& cfg) {
aout(self) << "Starting SUMMA-Actors in DebugMode\n";
bool debug = true;
}
// Start the Actors
if (cfg.distributed) {
aout(self) << "Starting SUMMA-Actors in Distributed Mode \n";
auto system = cfg.server_mode ? run_server : run_client;
system(sys, cfg);
} else {
auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath);
}
// start SUMMA
// auto system = cfg.server_mode ? run_server : run_client;
// system(sys, cfg);
auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath);
}
CAF_MAIN(id_block::summa, io::middleman)
\ No newline at end of file
#ifndef SUMMAACTOR_H_
#define SUMMAACTOR_H_
#include "SummaManager.h"
using namespace caf;
using json = nlohmann::json;
/**
* Top Level Actor for Summa. This actor recieves the number of GRUs to compute from main and
* divides them into jobs that compute one at a time.
*
* @param startGRU - starting GRU for the simulation
* @param numGRU - total number of GRUs to compute
* @param configPath - location of file information for SUMMA
* @return behavior
*/
behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string configPath) {
self->state.start = std::chrono::high_resolution_clock::now();
// Set Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.configPath = configPath;
parseSettings(self, configPath);
aout(self) << "SETTINGS FOR SUMMA_ACTOR\n";
aout(self) << "Output Structure Size = " << self->state.outputStrucSize << "\n";
aout(self) << "Max GRUs Per Job = " << self->state.maxGRUPerJob << "\n";
// Create the job_actor and start SUMMA
spawnJob(self);
return {
[=](done_job, int numFailed) {
self->state.numFailed += numFailed;
aout(self) << "Job Done\n";
if (self->state.numGRU <= 0) {
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration = calculateTime(self->state.start, self->state.end);
self->state.duration = self->state.duration / 1000; // Convert to milliseconds
aout(self) << "Total Program Duration:\n";
aout(self) << " " << self->state.duration / 1000 << " Seconds\n";
aout(self) << " " << (self->state.duration / 1000) / 60 << " Minutes\n";
aout(self) << " " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n";
aout(self) << "Program Finished \n";
} else {
// spawn a new job
spawnJob(self);
}
},
[=](err) {
aout(self) << "Unrecoverable Error: Attempting To Fail Gracefully\n";
self->quit();
}
};
}
void spawnJob(stateful_actor<summa_manager>* self) {
// Ensure we do not start a job with too many GRUs
if (self->state.numGRU > self->state.maxGRUPerJob) {
// spawn the job actor
aout(self) << "\n Starting Job with startGRU = " << self->state.startGRU << "\n";
self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.maxGRUPerJob,
self->state.configPath, self->state.outputStrucSize, self);
// Update GRU count
self->state.numGRU = self->state.numGRU - self->state.maxGRUPerJob;
self->state.startGRU = self->state.startGRU + self->state.maxGRUPerJob;
} else {
self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.numGRU,
self->state.configPath, self->state.outputStrucSize, self);
self->state.numGRU = 0;
}
}
void parseSettings(stateful_actor<summa_manager>* self, std::string configPath) {
json settings;
std::string SummaActorsSettings = "/Summa_Actors_Settings.json";
std::ifstream settings_file(configPath + SummaActorsSettings);
settings_file >> settings;
settings_file.close();
if (settings.find("SummaActor") != settings.end()) {
json SummaActorConfig = settings["SummaActor"];
// Find the desired OutputStrucSize
if (SummaActorConfig.find("OuputStructureSize") != SummaActorConfig.end()) {
self->state.outputStrucSize = SummaActorConfig["OuputStructureSize"];
} else {
aout(self) << "Error Finding OutputStructureSize in JOSN - Reverting to default value\n";
self->state.outputStrucSize = 250;
}
// Find the desired maxGRUPerJob size
if (SummaActorConfig.find("maxGRUPerJob") != SummaActorConfig.end()) {
self->state.maxGRUPerJob = SummaActorConfig["maxGRUPerJob"];
} else {
aout(self) << "Error Finding maxGRUPerJob in JOSN - Reverting to default value\n";
self->state.maxGRUPerJob = 500;
}
} else {
aout(self) << "Error Finding SummaActor in JSON - Reverting to default values\n";
self->state.outputStrucSize = 250;
self->state.maxGRUPerJob = 500;
}
}
#endif
\ No newline at end of file
#ifndef SUMMAMANGER_H_
#define SUMMAMANGER_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "../job_actor/JobActor.h"
#include "../global/json.hpp"
#include "../global/global.h"
#include <iostream>
#include <chrono>
#include <string>
#include <fstream>
behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string configPath);
void spawnJob(stateful_actor<summa_manager>* self);
void parseSettings(stateful_actor<summa_manager>* self, std::string configPath);
#endif
\ No newline at end of file
#ifndef SUMMACLIENT_H_
#define SUMMACLIENT_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
using namespace caf;
struct summa_client_state {
strong_actor_ptr current_server;
};
#include "summa_client.hpp"
behavior unconnected(stateful_actor<summa_client_state>*);
void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port);
behavior running(stateful_actor<summa_client_state>*, const actor& say_hello);
namespace caf {
/**
* @brief Set up the client and its down handler
*/
behavior summa_client(stateful_actor<summa_client_state>* self) {
self->set_down_handler([=](const down_msg& dm){
if(dm.source == self->state.current_server) {
......@@ -81,10 +67,4 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
}
};
}
#endif
\ No newline at end of file
}
\ No newline at end of file
#ifndef SUMMASERVER_H_
#define SUMMASERVER_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
using namespace caf;
struct summa_server_state {
};
#include "summa_server.hpp"
namespace caf {
behavior summa_server(stateful_actor<summa_server_state>* self) {
aout(self) << "Summa Server has Started \n";
......@@ -21,11 +15,5 @@ behavior summa_server(stateful_actor<summa_server_state>* self) {
}
};
}
#endif
\ No newline at end of file
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment