diff --git a/build/compile_summa.sh b/build/compile_summa.sh index 176d05b41eebd1c7ca840ed74b1a3b0c130ff049..e8535b0e3cafc6ad45a09b34f390b76dfc61eaee 100644 --- a/build/compile_summa.sh +++ b/build/compile_summa.sh @@ -7,7 +7,7 @@ module load openblas module load caf #### Specifiy Master Directory, parent of build directory -export F_MASTER=/globalhome/kck540/HPC/SummaProjects/Summa-Actors +export ROOT_DIR=/globalhome/kck540/HPC/SummaProjects/Summa-Actors #### Specifiy Compilers #### export FC=gfortran @@ -28,11 +28,11 @@ export ACTORS_LIBRARIES="-L$EBROOTCAF/lib\ -L$EBROOTCAF/lib64\ -L$EBROOTNETCDFMINFORTRAN/lib64\ -L$EBROOTOPENBLAS/lib\ - -L$F_MASTER/bin\ + -L$ROOT_DIR/bin\ -lcaf_core -lcaf_io -lsumma -lopenblas -lnetcdff" #### Compile with the Makefile #### -make -f ${F_MASTER}/build/makefile +make -f ${ROOT_DIR}/build/makefile all -export LD_LIBRARY_PATH=${F_MASTER}/bin \ No newline at end of file +export LD_LIBRARY_PATH=${ROOT_DIR}/bin \ No newline at end of file diff --git a/build/includes/file_access_actor/file_access_actor.hpp b/build/includes/file_access_actor/file_access_actor.hpp new file mode 100644 index 0000000000000000000000000000000000000000..197d167d4950fadb8bc25e0fbdaa978a57aff4d9 --- /dev/null +++ b/build/includes/file_access_actor/file_access_actor.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "caf/all.hpp" +#include "output_manager.hpp" +#include "forcing_file_info.hpp" + +namespace caf { +struct file_access_state { + // Variables set on Spwan + caf::actor parent; + int startGRU; + int numGRU; + + + void *handle_forcing_file_info; // Handle for the forcing file information + void *handle_ncid; // output file ids + OutputManager *output_manager; + int num_vectors_in_output_manager; + int num_steps; + int outputStrucSize; + int stepsInCurrentFile; + int numFiles; + int filesLoaded; + int err; + + std::vector<Forcing_File_Info> forcing_file_list; // list of steps in file + std::vector<bool> outputFileInitHRU; + + std::chrono::time_point<std::chrono::system_clock> readStart; + std::chrono::time_point<std::chrono::system_clock> readEnd; + double readDuration = 0.0; + + std::chrono::time_point<std::chrono::system_clock> writeStart; + std::chrono::time_point<std::chrono::system_clock> writeEnd; + double writeDuration = 0.0; +}; + +behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU, + int outputStrucSize, std::string configPath, actor parent); +void initalizeFileAccessActor(stateful_actor<file_access_state>* self); +int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, int numStepsToWrite, int returnMessage, caf::actor actorRef); +int readForcing(stateful_actor<file_access_state>* self, int currentFile); +int write(stateful_actor<file_access_state>* self, int listIndex); +int parseSettings(stateful_actor<file_access_state>* self, std::string configPath); + +} // end namespace \ No newline at end of file diff --git a/build/source/actors/file_access_actor/fileAccess_subroutine_wrappers.h b/build/includes/file_access_actor/file_access_actor_subroutine_wrappers.hpp similarity index 91% rename from build/source/actors/file_access_actor/fileAccess_subroutine_wrappers.h rename to build/includes/file_access_actor/file_access_actor_subroutine_wrappers.hpp index 795afc439ccd5fa47ff58cd5108a673e07af041e..31b04753a566b40a9bd1ef61c7df4a0996e5771f 100644 --- a/build/source/actors/file_access_actor/fileAccess_subroutine_wrappers.h +++ b/build/includes/file_access_actor/file_access_actor_subroutine_wrappers.hpp @@ -1,9 +1,7 @@ -#ifndef fileAccess_SUBROUTINE_WRAPPERS_H_ -#define fileAccess_SUBROUTINE_WRAPPERS_H_ +#pragma once extern "C" { - void read_pinit_C(int* err); void read_vegitationTables(int* err); @@ -34,6 +32,3 @@ extern "C" { void Write_HRU_Param(void* handle_ncid, int* indxGRU, int* indxHRU, int* err); } - - -#endif \ No newline at end of file diff --git a/build/includes/file_access_actor/forcing_file_info.hpp b/build/includes/file_access_actor/forcing_file_info.hpp new file mode 100644 index 0000000000000000000000000000000000000000..14945aabb5352fa0dad6ec0b9da49f00d0e2910a --- /dev/null +++ b/build/includes/file_access_actor/forcing_file_info.hpp @@ -0,0 +1,20 @@ +#pragma once + +class Forcing_File_Info { + private: + int file_ID; + int num_steps; + bool is_loaded; + + public: + Forcing_File_Info(int file_ID); + + int getNumSteps(); + + bool isFileLoaded(); + + void updateIsLoaded(); + + void updateNumSteps(int num_steps); + +}; \ No newline at end of file diff --git a/build/includes/file_access_actor/output_manager.hpp b/build/includes/file_access_actor/output_manager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..20f18c740ffbb55cd76c04fcd42746c2d9a798f9 --- /dev/null +++ b/build/includes/file_access_actor/output_manager.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include "caf/all.hpp" +#include <vector> +#include <algorithm> + + +class ActorRefList { + private: + int numStepsToWrite; // We can save this value here so that we know how many steps to write + int currentSize; + unsigned int maxSize; + int minIndex = -1; // minimum index of the actor being stored on this list + int maxIndex = 0; // maximum index of the actor being stored on this list + std::vector<std::tuple<caf::actor, int>> list; + + public: + ActorRefList(int maxSize); + ~ActorRefList(); + int getMaxIndex(); + int getMinIndex(); + int getCurrentSize(); + int getMaxSize(); + int getNumStepsToWrite(); + bool isFull(); + void addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite); + std::tuple<caf::actor,int> popActor(); + bool isEmpty(); + void decrementMaxSize(); + void removeFailed(caf::actor actorRef); +}; + +class OutputManager { + private: + int numVectors; + int avgSizeOfActorList; + bool runningFailures; + std::vector<ActorRefList*> list; + std::vector<int> failedHRU; + std::vector<int> failureReRun; // index used so we can add failedHRUs if they fail a second time + public: + OutputManager(int numVectors, int totalNumActors); + ~OutputManager(); + int addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite); + std::tuple<caf::actor,int> popActor(int index); + int removeFailed(caf::actor actorRef, int index); + int decrementMaxSize(int indexHRU); + void restartFailures(); + int getNumStepsToWrite(int listIndex); + bool isFull(int listIndex); + bool isEmpty(int listIndex); + int getSize(int listIndex); + int getMinIndex(int listIndex); + int getMaxIndex(int listIndex); + void addFailed(int indxHRU); +}; \ No newline at end of file diff --git a/build/includes/global/fortran_data_types.hpp b/build/includes/global/fortran_data_types.hpp new file mode 100644 index 0000000000000000000000000000000000000000..7552b971c55fe790187cb7445494de767a40529b --- /dev/null +++ b/build/includes/global/fortran_data_types.hpp @@ -0,0 +1,99 @@ +#pragma once + +extern "C" { + // flagVec + void* new_handle_flagVec(); + void delete_handle_flagVec(void* handle); + void set_data_flagVec(void* handle, const int* array, int size); + void get_size_data_flagVec(void* handle, int* size); + void get_data_flagVec(void* handle, int* array); + + // var_i + void* new_handle_var_i(); + void delete_handle_var_i(void* handle); + void set_data_var_i(void* handle, const int* array, int size); + void get_size_data_var_i(void* handle, int* size); + void get_data_var_i(void* handle, int* array); + + // var_i8 + void* new_handle_var_i8(); + void delete_handle_var_i8(void* handle); + void set_data_var_i8(void* handle, const long int* array, int size); + void get_size_data_var_i8(void* handle, int* size); + void get_data_var_i8(void* handle, long int* array); + + // var_d + void* new_handle_var_d(); + void delete_handle_var_d(void* handle); + void set_data_var_d(void* handle, const double* array, int size); + void get_size_data_var_d(void* handle, int* size); + void get_data_var_d(void* handle, double* array); + + // ilength + void* new_handle_ilength(); + void delete_handle_ilength(void* handle); + void set_data_ilength(void* handle, const int* array, int size); + void get_size_data_ilength(void* handle, int* size); + void get_data_ilength(void* handle, int* array); + + // i8length + void* new_handle_i8length(); + void delete_handle_i8length(void* handle); + void set_data_i8length(void* handle, const long int* array, int size); + void get_size_data_i8length(void* handle, int* size); + void get_data_i8length(void* handle, long int* array); + + // dlength + void* new_handle_dlength(); + void delete_handle_dlength(void* handle); + void set_data_dlength(void* handle, const double* array, int size); + void get_size_data_dlength(void* handle, int* size); + void get_data_dlength(void* handle, double* array); + + // var_flagVec + void* new_handle_var_flagVec(); + void delete_handle_var_flagVec(void* handle); + void set_data_var_flagVec(void* handle, const int* array, int num_row, const int* num_col, int num_elements); + void get_size_var_flagVec(void* handle, int* num_var); + void get_size_data_var_flagVec(void* handle, int* num_var, int* num_dat); + void get_data_var_flagVec(void* handle, int* array); + + // var_ilength + void* new_handle_var_ilength(); + void delete_handle_var_ilength(void* handle); + void set_data_var_ilength(void* handle, const int* array, int num_row, const int* num_col, int num_elements); + void get_size_var_ilength(void* handle, int* num_var); + void get_size_data_var_ilength(void* handle, int* num_var, int* num_dat); + void get_data_var_ilength(void* handle, int* array); + + // var_i8length + void* new_handle_var_i8length(); + void delete_handle_var_i8length(void* handle); + void set_data_var_i8length(void* handle, const long int* array, int num_row, const int* num_col, int num_elements); + void get_size_var_i8length(void* handle, int* num_var); + void get_size_data_var_i8length(void* handle, int* num_var, int* num_dat); + void get_data_var_i8length(void* handle, long int* array); + + // var_dlength + void* new_handle_var_dlength(); + void delete_handle_var_dlength(void* handle); + void set_data_var_dlength(void* handle, const double* array, int num_row, const int* num_col, int num_elements); + void get_size_var_dlength(void* handle, int* num_var); + void get_size_data_var_dlength(void* handle, int* num_var, int* num_dat); + void get_data_var_dlength(void* handle, double* array); + + // var_dlength_array + void* new_handle_dlength_array(); + void delete_handle_dlength_array(void* handle); + + // var_info + void* new_handle_var_info(); + void delete_handle_var_info(void* handle); + void set_data_var_info(void* handle, char const *str1, char const *str2, char const *str3, + int type, const int* ncid, int ncid_size, const int* index, int index_size, int flag); + + // file_info + void* new_handle_file_info(); + void delete_handle_file_info(void* handle); + +} \ No newline at end of file diff --git a/build/includes/global/global.hpp b/build/includes/global/global.hpp new file mode 100644 index 0000000000000000000000000000000000000000..c4d4b8d5321042680a53cc3b752701c0a9cb57c4 --- /dev/null +++ b/build/includes/global/global.hpp @@ -0,0 +1,11 @@ +#pragma once + +#include <chrono> + +extern bool debug; + +/** + * Return the time between to time points + */ +double calculateTime(std::chrono::time_point<std::chrono::system_clock> start, + std::chrono::time_point<std::chrono::system_clock> end); \ No newline at end of file diff --git a/build/source/actors/global/json.hpp b/build/includes/global/json.hpp similarity index 100% rename from build/source/actors/global/json.hpp rename to build/includes/global/json.hpp diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp new file mode 100644 index 0000000000000000000000000000000000000000..1b680f98d743f35a13bff70174d31e1507f74240 --- /dev/null +++ b/build/includes/global/message_atoms.hpp @@ -0,0 +1,41 @@ +#pragma once + +CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) + // Summa Actor + CAF_ADD_ATOM(summa, start_summa) + CAF_ADD_ATOM(summa, done_job) + CAF_ADD_ATOM(summa, err) + // Job Actor + CAF_ADD_ATOM(summa, done_reading_forcingFile) + CAF_ADD_ATOM(summa, done_reading_first_forcing_file) + CAF_ADD_ATOM(summa, init_hru) + CAF_ADD_ATOM(summa, done_init_hru) + CAF_ADD_ATOM(summa, done_write) + CAF_ADD_ATOM(summa, doneFile) + CAF_ADD_ATOM(summa, done_hru) + CAF_ADD_ATOM(summa, done_final_write) + CAF_ADD_ATOM(summa, run_failure) + CAF_ADD_ATOM(summa, done_file_access_actor_init) + CAF_ADD_ATOM(summa, file_access_actor_done) + CAF_ADD_ATOM(summa, file_access_actor_err) + // FileAccess Actor + CAF_ADD_ATOM(summa, initalize_outputStructure) + CAF_ADD_ATOM(summa, access_forcing) + CAF_ADD_ATOM(summa, access_first_forcing_file) + CAF_ADD_ATOM(summa, access_forcing_internal) + CAF_ADD_ATOM(summa, write_output) + CAF_ADD_ATOM(summa, write_output_final) + CAF_ADD_ATOM(summa, deallocate_structures) + CAF_ADD_ATOM(summa, update_completed) + CAF_ADD_ATOM(summa, update_failed) + CAF_ADD_ATOM(summa, reset_outputCounter) + CAF_ADD_ATOM(summa, read_and_write) + CAF_ADD_ATOM(summa, write_param) + CAF_ADD_ATOM(summa, restart_failures) + // HRU Actor + CAF_ADD_ATOM(summa, run_hru) + CAF_ADD_ATOM(summa, start_hru) + CAF_ADD_ATOM(summa, file_information) + CAF_ADD_ATOM(summa, dt_init_factor) + +CAF_END_TYPE_ID_BLOCK(summa) \ No newline at end of file diff --git a/build/includes/hru_actor/hru_actor.hpp b/build/includes/hru_actor/hru_actor.hpp new file mode 100644 index 0000000000000000000000000000000000000000..9449f185bf7fd452f247dd723327b358f54090f7 --- /dev/null +++ b/build/includes/hru_actor/hru_actor.hpp @@ -0,0 +1,135 @@ +#pragma once + +#include "caf/all.hpp" +#include "fortran_data_types.hpp" + +#include <chrono> +#include <string> + + + +namespace caf { +struct hru_state { + // Actor References + caf::actor file_access_actor; + caf::actor parent; + + // Info about which HRU we are and our indexes + // into global structures in Fortran + int indxHRU; // index for hru part of derived types in FORTRAN + int indxGRU; // index for gru part of derived types in FORTRAN + int refGRU; // The actual ID of the GRU we are + + // Variables for output/forcing structures + int outputStrucSize; + int outputStep; + int stepsInCurrentFFile; + int forcingFileStep; + int currentForcingFile = 1; + + + // statistics structures + void *handle_forcStat = new_handle_var_dlength(); // model forcing data + void *handle_progStat = new_handle_var_dlength(); // model prognostic (state) variables + void *handle_diagStat = new_handle_var_dlength(); // model diagnostic variables + void *handle_fluxStat = new_handle_var_dlength(); // model fluxes + void *handle_indxStat = new_handle_var_dlength(); // model indices + void *handle_bvarStat = new_handle_var_dlength(); // basin-average variables + // primary data structures (scalars) + void *handle_timeStruct = new_handle_var_i(); // model time data + void *handle_forcStruct = new_handle_var_d(); // model forcing data + void *handle_attrStruct = new_handle_var_d(); // local attributes for each HRU + void *handle_typeStruct = new_handle_var_i(); // local classification of soil veg etc. for each HRU + void *handle_idStruct = new_handle_var_i8(); + // primary data structures (variable length vectors) + void *handle_indxStruct = new_handle_var_ilength(); // model indices + void *handle_mparStruct = new_handle_var_dlength(); // model parameters + void *handle_progStruct = new_handle_var_dlength(); // model prognostic (state) variables + void *handle_diagStruct = new_handle_var_dlength(); // model diagnostic variables + void *handle_fluxStruct = new_handle_var_dlength(); // model fluxes + // basin-average structures + void *handle_bparStruct = new_handle_var_d(); // basin-average parameters + void *handle_bvarStruct = new_handle_var_dlength(); // basin-average variables + // ancillary data structures + void *handle_dparStruct = new_handle_var_d(); // default model parameters + // Local hru data + void *handle_ncid = new_handle_var_i(); // output file ids + void *handle_statCounter = new_handle_var_i(); + void *handle_outputTimeStep = new_handle_var_i(); + void *handle_resetStats = new_handle_flagVec(); + void *handle_finalizeStats = new_handle_flagVec(); + void *handle_oldTime = new_handle_var_i(); + void *handle_refTime = new_handle_var_i(); + void *handle_finshTime = new_handle_var_i(); + void *handle_startTime = new_handle_var_i(); + // Misc Variables + int timestep = 1; // Current Timestep of HRU simulation + int computeVegFlux; // flag to indicate if we are computing fluxes over vegetation + double dt_init; // used to initialize the length of the sub-step for each HRU + double upArea; // area upslope of each HRU + int num_steps = 0; // number of time steps + int forcingStep; // index of current time step in current forcing file + int iFile; // index of current forcing file from forcing file list + int dt_init_factor = 1; // factor of dt_init (coupled_em) + bool printOutput; + int outputFrequency; + + + // Julian Day variables + double fracJulDay; + double tmZoneOffsetFracDay; + int yearLength; + int err = 0; // error conotrol + + std::chrono::time_point<std::chrono::system_clock> start; + std::chrono::time_point<std::chrono::system_clock> end; + double duration = 0.0; + std::chrono::time_point<std::chrono::system_clock> initStart; + std::chrono::time_point<std::chrono::system_clock> initEnd; + double initDuration = 0.0; + std::chrono::time_point<std::chrono::system_clock> forcingStart; + std::chrono::time_point<std::chrono::system_clock> forcingEnd; + double forcingDuration = 0.0; + std::chrono::time_point<std::chrono::system_clock> runPhysicsStart; + std::chrono::time_point<std::chrono::system_clock> runPhysicsEnd; + double runPhysicsDuration = 0.0; + std::chrono::time_point<std::chrono::system_clock> writeOutputStart; + std::chrono::time_point<std::chrono::system_clock> writeOutputEnd; + double writeOutputDuration = 0.0; + +}; + +behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, + std::string configPath, + caf::actor file_access_actor, int outputStrucSize, caf::actor parent); + +/** + * @brief Get the settings from the settings JSON file + * + * @param self Actor State + * @param configPath Path to the directory that contains the settings file + */ +void parseSettings(stateful_actor<hru_state>* self, std::string configPath); + +/** + Function to initalize the HRU for running + */ +void Initialize_HRU(stateful_actor<hru_state>* self); + +/** + Function runs all of the hru time_steps + */ +int Run_HRU(stateful_actor<hru_state>* self); + +bool check_HRU(stateful_actor<hru_state>* self, int err); + +void initalizeTimeVars(stateful_actor<hru_state>* self); + +void finalizeTimeVars(stateful_actor<hru_state>* self); + +void deallocateHRUStructures(stateful_actor<hru_state>* self); + +void printOutput(stateful_actor<hru_state>* self); + + +} \ No newline at end of file diff --git a/build/includes/hru_actor/hru_actor_subroutine_wrappers.hpp b/build/includes/hru_actor/hru_actor_subroutine_wrappers.hpp new file mode 100644 index 0000000000000000000000000000000000000000..6c14753313e753e0fed423dfaacec47a419b1baa --- /dev/null +++ b/build/includes/hru_actor/hru_actor_subroutine_wrappers.hpp @@ -0,0 +1,98 @@ +#pragma once + +extern "C" { + // Initialize HRU data_structures + void Initialize( + int* indxGRU, int* num_steps, + // Statistics Structures + void* forcStat, void* progStat, void* diagStat, void* fluxStat, void* indxStat, void* bvarStat, + // Primary Data Structures (scalars) + void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct, void* idStruct, + // primary data structures (variable length vectors) + void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct, + // basin-average structures + void* bparStruct, void* bvarStruct, + // ancillary data structures + void* dparStruct, + // local HRU data + void* startTime, void* finshTime, void* refTime, void* oldTime, int* err); + + // SetupParam for HRU + void SetupParam( + int* indxGRU, int* indxHRU, + // primary data structures (scalars) + void* attrStruct, void* typeStruct, void* idStruct, + // primary data structures (variable length vectors) + void* mparStruct, void* bparStruct, void* bvarStruct, void* dparStruct, + // local HRU data + void* startTime, void* oldTime, + // miscellaneous + double* upArea, int* err); + + + // Setup Restart File if this option has been chosen + void Restart( + int* indxGRU, int* indxHRU, + // primary data structures (variable length vectors) + void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct, + // basin-average structures + void* bvarStruct, + // misc + double* dtInit, int* err); + + // Read Forcing for HRU + void Forcing( + int* indxGRU, int* stepIndex, + void* timeStruct, void* forcStruct, + int* iFile, int* forcingStep, + double* fracJulDay, double* tmZoneOffsetFracDay, int* yearLength, + int* err); + + // Run the model for one timestep + void RunPhysics( + int* id, int* stepIndex, + // primary data structures (scalars) + void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct, + // primary data structures (variable length vectors) + void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct, + // basin-average structures + void* bvarStruct, + double* fracJulDay, double* tmZoneOffsetFracDay, int* yearLength, + // misc + int* flag, double* dt, int* dt_int_factor, int* err); + + // Write output to the output structure + void WriteOutput( + int* indHRU, int* indxGRU, int* indexStep, + // statistics structures + void* forcStat, void* progStat, void* diagStat, void* fluxStat, void* indxStat, void* bvarStat, + // primary data structures (scalars) + void* timeStruct, void* forcStruct, void* attrStruct, void* typeStruct, + // primary data structures (variable length vectors) + void* indxStruct, void* mparStruct, void* progStruct, void* diagStruct, void* fluxStruct, + // basin-average structures + void* bparStruct, void* bvarStruct, + // local vars + void* statCounter, void* outputTimeStep, void* resetStats, void* finalizeStats, + void* finshTime, void* oldTime, int* outputStep, int* err); + + void DeallocateStructures( + void* handle_forcStat, void* handle_progStat, void* handle_diagStat, void* handle_fluxStat, + void* handle_indxStat, void* handle_bvarStat, void* handle_timeStruct, void* handle_forcStruct, + void* handle_attrStruct, void* handle_typeStruct, void* handle_idStruct, void* handle_indxStruct, + void* handle_mparStruct, void* handle_progStruct, void* handle_diagStruct, void* handle_fluxStruct, + void* handle_bparStruct, void* handle_bvarStruct, void* handle_dparStruct, + void* handle_startTime, void* handle_finishTime, + void* handle_refTime, void* handle_oldTime, + void* handle_ncid, + void* handle_statCounter, + void* handle_outputTimeStep, + void* handle_resetStats, + void* handle_finalizeStats, + int* err); + + void Write_Param_C( + int* indxGRU, int* indxHRU, + void* handle_attrStruct, void* handle_typeStruct, void* handle_mparStruct, void* handle_bparStruct, + int* err); +} \ No newline at end of file diff --git a/build/includes/job_actor/GRUinfo.hpp b/build/includes/job_actor/GRUinfo.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b46b84e5d006ad5a9f453178f7cc1f26b90573ff --- /dev/null +++ b/build/includes/job_actor/GRUinfo.hpp @@ -0,0 +1,70 @@ +#pragma once + +#include "caf/all.hpp" +#include <iostream> +#include <fstream> + +class GRUinfo { + private: + int refGRU; // This will be the same as the refGRU + int indxGRU; + caf::actor GRU; + + // Variable to update + int dt_init; + + // Completed Information + int currentAttempt; + int maxAttempts; + bool completed; + bool failed; + + // Timing information for the GRU + double runTime; + double initDuration; + double forcingDuration; + double runPhysicsDuration; + double writeOutputDuration; + + public: + + // Constructor + GRUinfo(int refGRU, int indxGRU, caf::actor gru, int dt_init, int maxAttempts); + + // Deconstructor + ~GRUinfo(); + + + int getRefGRU(); + + int getIndxGRU(); + + int getDt_init(); + + caf::actor getActor(); + + void updateGRU(caf::actor gru); + + void updateFailed(); + + void updateCompletedToTrue(); + + void updateDt_init(); + + void updateCurrentAttempt(); + + bool isMaxAttemptsReached(); + + bool isFailed(); + + bool isCompleted(); + + void doneRun(double runTime, double initDuration, double forcingDuration, + double runPhysicsDuration, double writeOutputDuration); + + void writeSuccess(std::string fileName); + + void writeFail(std::string fileName); + + void printOutput(); +}; \ No newline at end of file diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp new file mode 100644 index 0000000000000000000000000000000000000000..c4095cb592b15bd69399c88f90e95585be4ce0c7 --- /dev/null +++ b/build/includes/job_actor/job_actor.hpp @@ -0,0 +1,57 @@ +#pragma once +#include "caf/all.hpp" +#include "caf/io/all.hpp" +#include "GRUinfo.hpp" + +namespace caf { +struct job_state { + // Actor References + caf::actor file_access_actor; // actor reference for the file_access_actor + caf::actor parent; // actor reference to the top-level SummaActor + + // Job Parameters + int startGRU; // Starting GRU for this job + int numGRU; // Number of GRUs for this job + std::string configPath; + + std::string fileManager; // Path of the fileManager.txt file + + // Variables for GRU monitoring + int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em) + int maxRunAttempts = 3; // Max number of attemtps to solve a GRU + std::vector<GRUinfo*> GRUList; // List of all GRUs under this job actor + int numGRUDone = 0; // The number of GRUs that have completed + int GRUInit = 0; // Number of GRUs initalized + int err = 0; // Error Code + int numGRUFailed = 0; // Number of GRUs that have failed + int outputStrucSize; + + // Timing Variables + std::chrono::time_point<std::chrono::system_clock> start; + std::chrono::time_point<std::chrono::system_clock> end; + double duration; + + // Output File Names for Timings + bool outputCSV; + std::string csvOut; + std::string csvPath; + std::string successOutputFile; + std::string failedOutputFile = "failedHRU"; + std::string fileAccessActorStats = "fileAccessActor.csv"; + +}; + +behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU, + std::string configPath, int outputStrucSize, actor parent); + +int parseSettings(stateful_actor<job_state>* self, std::string configPath); + +void initJob(stateful_actor<job_state>* self); + +void initalizeGRU(stateful_actor<job_state>* self); + +void runGRUs(stateful_actor<job_state>* self); + +void restartFailures(stateful_actor<job_state>* self); + +} // end namespace \ No newline at end of file diff --git a/build/includes/job_actor/job_actor_subroutine_wrappers.hpp b/build/includes/job_actor/job_actor_subroutine_wrappers.hpp new file mode 100644 index 0000000000000000000000000000000000000000..df66a121eccd367fdb5b4981aa1a1d14b3f0e042 --- /dev/null +++ b/build/includes/job_actor/job_actor_subroutine_wrappers.hpp @@ -0,0 +1,9 @@ +#pragma once + +extern "C" { + void initGlobals(char const*str1, int* totalGRUs, int* totalHRUs, + int* numGRUs, int* numHRUs, int* startGRUIndex, int* err); + + void cleanUpJobActor(int* err); + +} \ No newline at end of file diff --git a/build/includes/main.h b/build/includes/main.h new file mode 100644 index 0000000000000000000000000000000000000000..8c4b0b91c53f8dc6ba4a7a7039d9bdaf07517f4f --- /dev/null +++ b/build/includes/main.h @@ -0,0 +1,7 @@ +#ifndef MAIN_H_ +#define MAIN_H_ + + + + +#endif \ No newline at end of file diff --git a/build/includes/summa_actor/summa_actor.hpp b/build/includes/summa_actor/summa_actor.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b86256f5200ffe1563e03c28986fccf4fb9af816 --- /dev/null +++ b/build/includes/summa_actor/summa_actor.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include "caf/all.hpp" +#include "caf/io/all.hpp" + +#include <chrono> +#include <string> + +namespace caf { + +struct summa_actor_state { + // Timing Information + std::chrono::time_point<std::chrono::system_clock> start; + std::chrono::time_point<std::chrono::system_clock> end; + double duration; + // Program Parameters + int startGRU; // starting GRU for the simulation + int numGRU; // number of GRUs to compute + std::string configPath;// path to the fileManager.txt file + // Information about the jobs + int numFailed = 0; // Number of jobs that have failed + + // Values Set By Summa_Actors_Settings.json + int maxGRUPerJob; // maximum number of GRUs a job can compute at once + int outputStrucSize; + + caf::actor currentJob; // Reference to the current job actor + +}; + +behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath); + +void spawnJob(stateful_actor<summa_actor_state>* self); + +void parseSettings(stateful_actor<summa_actor_state>* self, std::string configPath); + + + +} // namespace caf \ No newline at end of file diff --git a/build/makefile b/build/makefile index 9368cc4df61b49792bade25d648f9a09a39feb13..ef56fd25d6282afff99558420bb7c6cc95f254e7 100644 --- a/build/makefile +++ b/build/makefile @@ -1,25 +1,25 @@ #### parent directory of the 'build' directory #### -# F_MASTER = +ROOT_DIR = /globalhome/kck540/HPC/SummaProjects/Summa-Actors -#### fortran compiler #### -# FC = - -#### C++ compiler #### -# CC = +#### Compilers #### +FC = gfortran # Fortran +CC = g++ # C++ #### Includes AND Libraries #### -# INCLUDES = -# LIBRARIES = +INCLUDES = -I$(EBROOTNETCDFMINFORTRAN)/include +LIBRARIES = -L$(EBROOTNETCDFMINFORTRAN)/lib64 \ + -L$(EBROOTOPENBLAS)/lib -lnetcdff -lopenblas -# ACTORS_INCLUDES = -# ACTORS_LIBRARIES = +ACTORS_INCLUDES = -I$(EBROOTCAF)/include -I$(EBROOTNETCDFMINFORTRAN)/include +ACTORS_LIBRARIES = -L$(EBROOTCAF)/lib -L$(EBROOTCAF)/lib64 -L$(EBROOTNETCDFMINFORTRAN)/lib64 \ + -L$(EBROOTOPENBLAS)/lib -L$(ROOT_DIR)/bin -lcaf_core -lcaf_io -lsumma -lopenblas -lnetcdff # Production runs -FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC -FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC -FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC -FLAGS_ACTORS = -O3 +FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC -Wfatal-errors +FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC -Wfatal-errors +FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC -Wfatal-errors +FLAGS_ACTORS = -O3 -Wfatal-errors -std=c++17 # # Debug runs # FLAGS_NOAH = -pg -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC @@ -33,18 +33,17 @@ FLAGS_ACTORS = -O3 #======================================================================== # Core directory that contains source code -F_KORE_DIR = $(F_MASTER)/build/source +F_KORE_DIR = $(ROOT_DIR)/build/source # Location of the compiled modules -MOD_PATH = $(F_MASTER)/build +MOD_PATH = $(ROOT_DIR)/build # Define the directory for the executables -EXE_PATH = $(F_MASTER)/bin - -#======================================================================== -# PART 2: Assemble all of the SUMMA sub-routines -#======================================================================== +EXE_PATH = $(ROOT_DIR)/bin +#################################################################################################### +###################################### Assemble Fortran Files ###################################### +#################################################################################################### # Define directories DRIVER_DIR = $(F_KORE_DIR)/driver HOOKUP_DIR = $(F_KORE_DIR)/hookup @@ -235,25 +234,59 @@ SUMMA_DRIVER= \ DRIVER = $(patsubst %, $(DRIVER_DIR)/%, $(SUMMA_DRIVER)) -ACTORC = $(F_KORE_DIR)/actors/main.cc +#################################################################################################### +###################################### Assemble Fortran Files ###################################### +#################################################################################################### + +#################################################################################################### +######################################## Assemble C++ Files ######################################## +#################################################################################################### + +INCLUDE_DIR = /globalhome/kck540/HPC/SummaProjects/Summa-Actors/build/includes +SOURCE_DIR = /globalhome/kck540/HPC/SummaProjects/Summa-Actors/build/source/actors + + +GLOBAL_INCLUDES = -I$(INCLUDE_DIR)/global +GLOBAL = $(SOURCE_DIR)/global/global.cpp + +SUMMA_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/summa_actor +SUMMA_ACTOR = $(SOURCE_DIR)/summa_actor/summa_actor.cpp + +JOB_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/job_actor +JOB_ACTOR = $(SOURCE_DIR)/job_actor/job_actor.cpp +GRUinfo = $(SOURCE_DIR)/job_actor/GRUinfo.cpp + +FILE_ACCESS_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/file_access_actor +FILE_ACCESS_ACTOR = $(SOURCE_DIR)/file_access_actor/file_access_actor.cpp +FORCING_FILE_INFO = $(SOURCE_DIR)/file_access_actor/forcing_file_info.cpp +OUTPUT_MANAGER = $(SOURCE_DIR)/file_access_actor/output_manager.cpp + +HRU_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/hru_actor +HRU_ACTOR = $(SOURCE_DIR)/hru_actor/hru_actor.cpp + +MAIN = $(F_KORE_DIR)/actors/main.cpp ACTOR_TEST = $(F_KORE_DIR)/testing/testing_main.cc +#################################################################################################### +######################################## Assemble C++ Files ######################################## +#################################################################################################### #======================================================================== # PART 3: compilation #====================================================================== +all: fortran cpp -# Compile -all: lib main +fortran: compile_noah compile_comm compile_summa link clean_fortran -lib: compile_noah compile_comm compile_summa link clean - -main: actors actorsLink actorsClean +cpp: compile_globals compile_hru_actor compile_file_access_actor compile_job_actor compile_summa_actor \ + compile_main link_cpp test: actors_test actors_testLink actorsClean - -# compile Noah-MP routines + +################################################################################################################### +############################################## COMPILE SUMMA-Fortran ############################################## +################################################################################################################### compile_noah: $(FC) $(FLAGS_NOAH) -c $(NRUTIL) $(NOAHMP) @@ -268,30 +301,60 @@ compile_summa: # generate library link: $(FC) -shared *.o -o libsumma.so - mv libsumma.so $(F_MASTER)/bin + mv libsumma.so $(ROOT_DIR)/bin -# compile the c++ portion of SummaActors -actors: - $(CC) $(FLAGS_ACTORS) -c $(ACTORC) -std=c++17 $(ACTORS_INCLUDES) +# Remove object files +clean_fortran: + rm -f *.o *.mod soil_veg_gen_parm__genmod.f90 +################################################################################################################### +############################################## COMPILE SUMMA-Fortran ############################################## +################################################################################################################### + + +################################################################################################################### +################################################ COMPILE SUMMA-C++ ################################################ +################################################################################################################### +compile_globals: + $(CC) $(FLAGS_ACTORS) -c $(GLOBAL) $(GLOBAL_INCLUDES) + +compile_hru_actor: + $(CC) $(FLAGS_ACTORS) -c $(HRU_ACTOR) $(HRU_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) + +compile_file_access_actor: + $(CC) $(FLAGS_ACTORS) -c $(FILE_ACCESS_ACTOR) $(FORCING_FILE_INFO) $(OUTPUT_MANAGER) \ + $(FILE_ACCESS_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) + +compile_job_actor: + $(CC) $(FLAGS_ACTORS) -c $(JOB_ACTOR) $(GRUinfo) $(JOB_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) \ + $(FILE_ACCESS_ACTOR_INCLUDES) $(HRU_ACTOR_INCLUDES) + +compile_summa_actor: + $(CC) $(FLAGS_ACTORS) -c $(SUMMA_ACTOR) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) \ + $(JOB_ACTOR_INCLUDES) + +compile_main: + $(CC) $(FLAGS_ACTORS) -c $(MAIN) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) $(JOB_ACTOR_INCLUDES) -actorsLink: - $(CC) -o summaMain *.o $(ACTORS_LIBRARIES) - mv summaMain $(F_MASTER)/bin +link_cpp: + $(CC) $(FLAGS_ACTORS) -o summaMain *.o $(ACTORS_LIBRARIES) + mv summaMain $(ROOT_DIR)/bin -#### COMPILE TESTING FILES #### +clean_cpp: + rm *.o +################################################################################################################### +################################################ COMPILE SUMMA-C++ ################################################ +################################################################################################################### + + +################################################################################################################### +################################################## COMPILE TESTS ################################################## +################################################################################################################### actors_test: $(CC) $(FLAGS_ACTORS) -c $(ACTOR_TEST) -std=c++17 $(ACTORS_INCLUDES) actors_testLink: $(CC) -o summaTest *.o $(ACTORS_LIBRARIES) -actorsClean: - rm *.o -# Remove object files -clean: - rm -f *.o *.mod soil_veg_gen_parm__genmod.f90 - - clean_lib: rm *.so diff --git a/build/source/actors/file_access_actor/file_access_actor.cpp b/build/source/actors/file_access_actor/file_access_actor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..75ee6b97f2ed48cc35a1efa8630cf779cc6dcde0 --- /dev/null +++ b/build/source/actors/file_access_actor/file_access_actor.cpp @@ -0,0 +1,373 @@ +#include "caf/all.hpp" +#include "file_access_actor.hpp" +#include "output_manager.hpp" +#include "forcing_file_info.hpp" +#include "file_access_actor_subroutine_wrappers.hpp" +#include "fortran_data_types.hpp" +#include "message_atoms.hpp" +#include "global.hpp" +#include "json.hpp" + +using json = nlohmann::json; + +bool debug; + + +namespace caf { + +behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU, + int outputStrucSize, std::string configPath, actor parent) { + // Set File_Access_Actor variables + self->state.parent = parent; + self->state.numGRU = numGRU; + self->state.startGRU = startGRU; + self->state.outputStrucSize = outputStrucSize; + self->state.handle_forcing_file_info = new_handle_file_info(); + self->state.handle_ncid = new_handle_var_i(); + self->state.err = 0; + + // Get Settings from configuration file + if (parseSettings(self, configPath) == -1) { + aout(self) << "Error with JSON Settings File!!!\n"; + self->quit(); + } else { + aout(self) << "\nSETTINGS FOR FILE_ACCESS_ACTOR\n" << + "Number of Vectors in Output Structure = " << self->state.num_vectors_in_output_manager << "\n"; + } + initalizeFileAccessActor(self); + + return { + [=](initalize_outputStructure) { + aout(self) << "Initalizing Output Structure" << std::endl; + Init_OutputStruct(self->state.handle_forcing_file_info, &self->state.outputStrucSize, + &self->state.numGRU, &self->state.err); + }, + + [=](write_param, int indxGRU, int indxHRU) { + int err; + err = 0; + Write_HRU_Param(self->state.handle_ncid, &indxGRU, &indxHRU, &err); + if (err != 0) { + aout(self) << "ERROR: Write_HRU_PARAM -- For HRU = " << indxHRU << "\n"; + } + }, + + [=](access_forcing, int currentFile, caf::actor refToRespondTo) { + // aout(self) << "Received Current FIle = " << currentFile << std::endl; + if (currentFile <= self->state.numFiles) { + if(self->state.forcing_file_list[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1 + // aout(self) << "ForcingFile Already Loaded \n"; + self->send(refToRespondTo, run_hru_v, + self->state.forcing_file_list[currentFile - 1].getNumSteps()); + + } else { + self->state.readStart = std::chrono::high_resolution_clock::now(); + + // Load the file + FileAccessActor_ReadForcing(self->state.handle_forcing_file_info, ¤tFile, + &self->state.stepsInCurrentFile, &self->state.startGRU, + &self->state.numGRU, &self->state.err); + if (self->state.err != 0) { + aout(self) << "ERROR: Reading Forcing" << std::endl; + } + self->state.filesLoaded += 1; + self->state.forcing_file_list[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); + + self->state.readEnd = std::chrono::high_resolution_clock::now(); + self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); + // Check if we have loaded all forcing files + if(self->state.filesLoaded <= self->state.numFiles) { + self->send(self, access_forcing_internal_v, currentFile + 1); + } + + self->send(refToRespondTo, run_hru_v, + self->state.forcing_file_list[currentFile - 1].getNumSteps()); + } + } else { + aout(self) << currentFile << " is larger than expected" << std::endl; + } + + }, + + [=](access_forcing_internal, int currentFile) { + if (self->state.filesLoaded <= self->state.numFiles && + currentFile <= self->state.numFiles) { + // aout(self) << "Loading in background, File:" << currentFile << "\n"; + if (self->state.forcing_file_list[currentFile - 1].isFileLoaded()) { + aout(self) << "File Loaded when shouldn't be \n"; + } + self->state.readStart = std::chrono::high_resolution_clock::now(); + FileAccessActor_ReadForcing(self->state.handle_forcing_file_info, ¤tFile, + &self->state.stepsInCurrentFile, &self->state.startGRU, + &self->state.numGRU, &self->state.err); + if (self->state.err != 0) { + aout(self) << "ERROR: Reading Forcing" << std::endl; + } + self->state.filesLoaded += 1; + self->state.forcing_file_list[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); + + self->state.readEnd = std::chrono::high_resolution_clock::now(); + self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); + + self->send(self, access_forcing_internal_v, currentFile + 1); + } else { + aout(self) << "All Forcing Files Loaded \n"; + } + }, + + + [=](write_output, int indxGRU, int indxHRU, int numStepsToWrite, + caf::actor refToRespondTo) { + int err; + int returnMessage = 9999; + + err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, returnMessage, refToRespondTo); + if (err != 0) { + aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n"; + } + + + }, + + [=](read_and_write, int indxGRU, int indxHRU, int numStepsToWrite, int currentFile, + caf::actor refToRespondTo) { + int err; + + err = readForcing(self, currentFile); + if (err != 0) + aout(self) << "\nERROR: FILE_ACCESS_ACTOR - READING_FORCING FAILED\n"; + + err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, currentFile, refToRespondTo); + if (err != 0) + aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n"; + }, + + [=](run_failure, int indxGRU) { + int listIndex; + + // update the list in Fortran + updateFailed(&indxGRU); + + listIndex = self->state.output_manager->decrementMaxSize(indxGRU); + + // Check if this list is now full + if(self->state.output_manager->isFull(listIndex)) { + write(self, listIndex); + } + }, + + /** + * Message from JobActor + * OutputManager needs to be adjusted so the failed HRUs can run again + */ + [=](restart_failures) { + resetFailedArray(); + self->state.output_manager->restartFailures(); + }, + + [=](deallocate_structures) { + aout(self) << "Deallocating Structure" << std::endl; + FileAccessActor_DeallocateStructures(self->state.handle_forcing_file_info, self->state.handle_ncid); + + self->state.readDuration = self->state.readDuration / 1000; // Convert to milliseconds + self->state.readDuration = self->state.readDuration / 1000; // Convert to seconds + + self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds + self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds + + self->send(self->state.parent, file_access_actor_done_v, self->state.readDuration, + self->state.writeDuration); + self->quit(); + }, + + [=](reset_outputCounter, int indxGRU) { + resetOutputCounter(&indxGRU); + } + + }; +} + +void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { + int indx = 1; + int err = 0; + // aout(self) << "Set Up the forcing file" << std::endl; + ffile_info_C(&indx, self->state.handle_forcing_file_info, &self->state.numFiles, &err); + if (err != 0) { + aout(self) << "Error: ffile_info_C - File_Access_Actor \n"; + std::string function = "ffile_info_C"; + self->send(self->state.parent, file_access_actor_err_v, function); + self->quit(); + return; + } + + mDecisions_C(&self->state.num_steps, &err); + if (err != 0) { + aout(self) << "Error: mDecisions - FileAccess Actor \n"; + std::string function = "mDecisions_C"; + self->send(self->state.parent, file_access_actor_err_v, function); + self->quit(); + return; + } + + read_pinit_C(&err); + if (err != 0) { + aout(self) << "ERROR: read_pinit_C\n"; + std::string function = "read_pinit_C"; + self->send(self->state.parent, file_access_actor_err_v, function); + self->quit(); + return; + } + + read_vegitationTables(&err); + if (err != 0) { + aout(self) << "ERROR: read_vegitationTables\n"; + std::string function = "read_vegitationTables"; + self->send(self->state.parent, file_access_actor_err_v, function); + self->quit(); + return; + } + + initFailedHRUTracker(&self->state.numGRU); + + Create_Output_File(self->state.handle_ncid, &self->state.numGRU, &self->state.startGRU, &err); + if (err != 0) { + aout(self) << "ERROR: Create_OutputFile\n"; + std::string function = "Create_Output_File"; + self->send(self->state.parent, file_access_actor_err_v, function); + self->quit(); + return; + } + + // Initalize the output Structure + aout(self) << "Initalizing Output Structure" << std::endl; + Init_OutputStruct(self->state.handle_forcing_file_info, &self->state.outputStrucSize, + &self->state.numGRU, &self->state.err); + + // Initalize the output manager + self->state.output_manager = new OutputManager(self->state.num_vectors_in_output_manager, self->state.numGRU); + + self->send(self->state.parent, done_file_access_actor_init_v); + // initalize the forcingFile array + self->state.filesLoaded = 0; + for (int i = 1; i <= self->state.numFiles; i++) { + self->state.forcing_file_list.push_back(Forcing_File_Info(i)); + } +} + +int write(stateful_actor<file_access_state>* self, int listIndex) { + int err = 0; + int minGRU = self->state.output_manager->getMinIndex(listIndex); + int maxGRU = self->state.output_manager->getMaxIndex(listIndex); + int numStepsToWrite = self->state.output_manager->getNumStepsToWrite(listIndex); + FileAccessActor_WriteOutput(self->state.handle_ncid, + &numStepsToWrite, &minGRU, + &maxGRU, &err); + + // Pop The actors and send them the correct continue message + while(!self->state.output_manager->isEmpty(listIndex)) { + std::tuple<caf::actor, int> actor = self->state.output_manager->popActor(listIndex); + if (get<1>(actor) == 9999) { + + self->send(get<0>(actor), done_write_v); + + } else { + self->send(get<0>(actor), run_hru_v, + self->state.forcing_file_list[get<1>(actor) - 1].getNumSteps()); + } + } + + return 0; +} + +int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, + int numStepsToWrite, int returnMessage, caf::actor actorRef) { + self->state.writeStart = std::chrono::high_resolution_clock::now(); + if (debug) { + aout(self) << "Recieved Write Request From GRU: " << indxGRU << "\n"; + } + int err = 0; + int listIndex = self->state.output_manager->addActor(actorRef, indxGRU, returnMessage, numStepsToWrite); + if (self->state.output_manager->isFull(listIndex)) { + if (debug) { + aout(self) << "List with Index " << listIndex << " is full and ready to write\n"; + aout(self) << "Minimum GRU Index = " << self->state.output_manager->getMinIndex(listIndex) << "\n"; + aout(self) << "Maximum GRU Index = " << self->state.output_manager->getMaxIndex(listIndex) << "\n"; + } + + err = write(self, listIndex); + + } else { + if (debug) { + aout(self) << "List with Index " << listIndex << " is not full yet waiting to write\n"; + aout(self) << "Size of list is " << self->state.output_manager->getSize(listIndex) << "\n"; + } + } + + + self->state.writeEnd = std::chrono::high_resolution_clock::now(); + self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd); + + return err; + +} + +int readForcing(stateful_actor<file_access_state>* self, int currentFile) { + // Check if we have already loaded this file + if(self->state.forcing_file_list[currentFile -1].isFileLoaded()) { + if (debug) + aout(self) << "ForcingFile Already Loaded \n"; + return 0; + + } else { + + // File Needs to be loaded + self->state.readStart = std::chrono::high_resolution_clock::now(); + + // Load the file + FileAccessActor_ReadForcing(self->state.handle_forcing_file_info, ¤tFile, + &self->state.stepsInCurrentFile, &self->state.startGRU, + &self->state.numGRU, &self->state.err); + + if (self->state.err != 0) { + if (debug) + aout(self) << "ERROR: FileAccessActor_ReadForcing\n" << + "currentFile = " << currentFile << "\n" << "number of steps = " + << self->state.stepsInCurrentFile << "\n"; + return -1; + } else { + self->state.filesLoaded += 1; + self->state.forcing_file_list[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); + + self->state.readEnd = std::chrono::high_resolution_clock::now(); + self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); + return 0; + } + } + +} + +int parseSettings(stateful_actor<file_access_state>* self, std::string configPath) { + json settings; + std::string SummaActorsSettigs = "/Summa_Actors_Settings.json"; + std::ifstream settings_file(configPath + SummaActorsSettigs); + settings_file >> settings; + settings_file.close(); + + if (settings.find("FileAccessActor") != settings.end()) { + json FileAccessActorConfig = settings["FileAccessActor"]; + // Find the File Manager Path + if (FileAccessActorConfig.find("num_vectors_in_output_manager") != FileAccessActorConfig.end()) { + self->state.num_vectors_in_output_manager = FileAccessActorConfig["num_vectors_in_output_manager"]; + } else { + aout(self) << "Error Finding FileManagerPath - Exiting as this is needed\n"; + return -1; + } + + return 0; + } else { + aout(self) << "Error Finding JobActor in JSON file - Exiting as there is no path for the fileManger\n"; + return -1; + } +} + +} // end namespace \ No newline at end of file diff --git a/build/source/actors/file_access_actor/forcing_file_info.cpp b/build/source/actors/file_access_actor/forcing_file_info.cpp new file mode 100644 index 0000000000000000000000000000000000000000..13f1ec0934d301e6aa4fe6e9e5b391b88e6c29b6 --- /dev/null +++ b/build/source/actors/file_access_actor/forcing_file_info.cpp @@ -0,0 +1,27 @@ +#include "forcing_file_info.hpp" + +Forcing_File_Info::Forcing_File_Info(int file_ID) { + this->file_ID = file_ID; + this->num_steps = 0; + this->is_loaded = false; +} + +int Forcing_File_Info::getNumSteps() { + return this->num_steps; +} + +bool Forcing_File_Info::isFileLoaded() { + return this->is_loaded; +} + +void Forcing_File_Info::updateIsLoaded() { + this->is_loaded = true; +} + + +void Forcing_File_Info::updateNumSteps(int num_steps) { + this->num_steps = num_steps; + this->is_loaded = true; +} + + diff --git a/build/source/actors/file_access_actor/output_manager.cpp b/build/source/actors/file_access_actor/output_manager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f8637ef598b9fffd8811230151f39c70bb26ec7b --- /dev/null +++ b/build/source/actors/file_access_actor/output_manager.cpp @@ -0,0 +1,215 @@ +#include "output_manager.hpp" +#include "caf/all.hpp" + +// ActorRefList +ActorRefList::ActorRefList(int maxSize){ + this->currentSize = 0; + this->maxSize = maxSize; +} + +ActorRefList::~ActorRefList(){} + +int ActorRefList::getMaxIndex() { + return this->maxIndex; +} + +int ActorRefList::getMinIndex() { + return this->minIndex; +} + +int ActorRefList::getCurrentSize() { + return this->currentSize; +} + +int ActorRefList::getMaxSize() { + return this->maxSize; +} + +int ActorRefList::getNumStepsToWrite() { + return this->numStepsToWrite; +} + +bool ActorRefList::isFull() { + return list.size() == this->maxSize; +} + +void ActorRefList::addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) { + if (this->isFull()) { + throw "List is full, cannot add actor to this list"; + } + if (index > this->maxIndex) { + this->maxIndex = index; + } + if (index < this->minIndex || this->minIndex < 0) { + this->minIndex = index; + } + this->numStepsToWrite = numStepsToWrite; + this->currentSize++; + list.push_back(std::make_tuple(actor, returnMessage)); +} + +std::tuple<caf::actor,int> ActorRefList::popActor() { + if (list.empty()) { + throw "List is empty, nothing to pop"; + } + auto actor = list.back(); + list.pop_back(); + this->currentSize--; + return actor; +} + +bool ActorRefList::isEmpty() { + return list.empty(); +} + +void ActorRefList::decrementMaxSize() { + this->maxSize--; +} + +void ActorRefList::removeFailed(caf::actor actorRef) { + bool found = false; + for(std::vector<std::tuple<caf::actor, int>>::iterator it = this->list.begin(); it != this->list.end(); it++) { + if (std::get<0>(*it) == actorRef) { + found = true; + this->list.erase(it); + this->currentSize--; this->maxSize--; + break; + } + } + + if (!found) { + throw "Element To Remove Not Found"; + } +} + +// OutputManager +OutputManager::OutputManager(int numVectors, int totalNumActors){ + this->numVectors = numVectors; + int sizeOfOneVector = totalNumActors / numVectors; + this->avgSizeOfActorList = sizeOfOneVector; + this->runningFailures = false; + // Create the first n-1 vectors with the same size + for (int i = 0; i < numVectors - 1; i++) { + auto refList = new ActorRefList(sizeOfOneVector); + totalNumActors = totalNumActors - sizeOfOneVector; + list.push_back(refList); + } + // Create the last vector with size however many actors are left + auto refList = new ActorRefList(totalNumActors); + list.push_back(refList); +} + +OutputManager::~OutputManager(){}; + +int OutputManager::addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) { + int listIndex; + if (this->runningFailures) { + // find the index of the structure this HRU is in + auto it = find(this->failureReRun.begin(), this->failureReRun.end(), index); + + if (it != this->failureReRun.end()) { + listIndex = it - this->failureReRun.begin(); + } else { + throw "Element Not Found in failureReRun list"; + } + + this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite); + + } else { + // Index has to be subtracted by 1 because Fortran array starts at 1 + listIndex = (index - 1) / this->avgSizeOfActorList; + if (listIndex > this->numVectors - 1) { + listIndex = this->numVectors - 1; + } + + this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite); + } + + return listIndex; +} + +std::tuple<caf::actor,int> OutputManager::popActor(int index) { + if (index > this->numVectors - 1 || index < 0) { + throw "List Index Out Of Range"; + } else if (this->list[index]->isEmpty()) { + throw "List is Empty, Nothing to pop"; + } + + return this->list[index]->popActor(); +} + +int OutputManager::removeFailed(caf::actor actorRef, int index) { + // Find the list this actor is on + int listIndex = (index - 1) / this->avgSizeOfActorList; + if (listIndex > this->numVectors - 1) { + listIndex = this->numVectors - 1; + } + + this->list[listIndex]->removeFailed(actorRef); + + return listIndex; +} + +int OutputManager::decrementMaxSize(int indexHRU) { + + this->failedHRU.push_back(indexHRU); + + // Find the list this actor is on + int listIndex = (indexHRU - 1) / this->avgSizeOfActorList; + if (listIndex > this->numVectors - 1) { + listIndex = this->numVectors - 1; + } + + this->list[listIndex]->decrementMaxSize(); + return listIndex; +} + +void OutputManager::restartFailures() { + this->list.clear(); + this->numVectors = this->failedHRU.size(); + for (unsigned int i = 0; i < this->failedHRU.size(); i++) { + auto refList = new ActorRefList(1); + this->list.push_back(refList); + } + + this->failureReRun = this->failedHRU; + this->failedHRU.clear(); + + this->runningFailures = true; + +} + +int OutputManager::getNumStepsToWrite(int listIndex) { + + return this->list[listIndex]->getNumStepsToWrite(); +} + +bool OutputManager::isFull(int listIndex) { + if (listIndex > this->numVectors - 1) { + throw "List Index Out Of Range"; + } + return this->list[listIndex]->isFull(); +} + +bool OutputManager::isEmpty(int listIndex) { + return this->list[listIndex]->isEmpty(); +} + +int OutputManager::getSize(int listIndex) { + if (listIndex > this->numVectors - 1) { + throw "List Index Out Of Range"; + } + return this->list[listIndex]->getCurrentSize(); +} + +int OutputManager::getMinIndex(int listIndex) { + return this->list[listIndex]->getMinIndex(); +} + +int OutputManager::getMaxIndex(int listIndex) { + return this->list[listIndex]->getMaxIndex(); +} + +void OutputManager::addFailed(int indxHRU) { + this->failedHRU.push_back(indxHRU); +} \ No newline at end of file diff --git a/build/source/actors/global/global.cpp b/build/source/actors/global/global.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d1911da064b807c07a2ef875952f34562212606d --- /dev/null +++ b/build/source/actors/global/global.cpp @@ -0,0 +1,8 @@ +#include "global.hpp" +#include <chrono> + +double calculateTime(std::chrono::time_point<std::chrono::system_clock> start, + std::chrono::time_point<std::chrono::system_clock> end) { + + return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); +} \ No newline at end of file diff --git a/build/source/actors/hru_actor/hru_actor.cpp b/build/source/actors/hru_actor/hru_actor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..300c978775b94dd1b5c67cf0de46b8c36e3dff1a --- /dev/null +++ b/build/source/actors/hru_actor/hru_actor.cpp @@ -0,0 +1,500 @@ +#include "caf/all.hpp" +#include "hru_actor.hpp" +#include "global.hpp" +#include "message_atoms.hpp" +#include "hru_actor_subroutine_wrappers.hpp" +#include "json.hpp" + +using json = nlohmann::json; + +namespace caf { + +behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, + std::string configPath, + caf::actor file_access_actor, int outputStrucSize, caf::actor parent) { + // Timing Information + self->state.start = std::chrono::high_resolution_clock::now(); + self->state.duration = 0.0; + self->state.initDuration = 0.0; + + // Actor References + self->state.file_access_actor = file_access_actor; + self->state.parent = parent; + + // Indexes into global structures + self->state.indxHRU = 1; + self->state.indxGRU = indxGRU; + self->state.refGRU = refGRU; + + // OutputStructure Size (how many timesteps we can compute before we need to write) + self->state.outputStrucSize = outputStrucSize; + + // initialize counters + self->state.timestep = 1; // Timestep of total simulation + self->state.outputStep = 1; // Index of the output structure + self->state.forcingStep = 1; // Index into the forcing file + self->state.iFile = 1; + + // Get the settings for the HRU + parseSettings(self, configPath); + // We only want to print this once + if (indxGRU == 1) { + aout(self) << "\nSETTINGS FOR HRU_ACTOR\n"; + aout(self) << "Print Output = " << self->state.printOutput << "\n"; + aout(self) << "Print Output every " << self->state.outputFrequency << " timesteps\n\n"; + } + + + Initialize_HRU(self); + + self->state.end = std::chrono::high_resolution_clock::now(); + self->state.duration += calculateTime(self->state.start, self->state.end); + + self->send(self->state.parent, done_init_hru_v); + + return { + // Starts the HRU and tells it to ask for data from the file_access_actor + [=](start_hru) { + self->state.start = std::chrono::high_resolution_clock::now(); + + int err; + + err = 0; + // Write Paramaters to OutputStruc + Write_Param_C(&self->state.indxGRU, &self->state.indxHRU, + self->state.handle_attrStruct, self->state.handle_typeStruct, + self->state.handle_mparStruct, self->state.handle_bparStruct, + &err); + + // ask file_access_actor to write paramaters + self->send(self->state.file_access_actor, write_param_v, self->state.indxGRU, self->state.indxHRU); + + + self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile, self); + self->state.end = std::chrono::high_resolution_clock::now(); + self->state.duration += calculateTime(self->state.start, self->state.end); + }, + + [=](done_write) { + self->state.start = std::chrono::high_resolution_clock::now(); + + // We receive a done_write message so we ensure that + // stepsInCurrentFFile remains unchanged + if (self->state.timestep >= self->state.num_steps) { + + self->state.end = std::chrono::high_resolution_clock::now(); + self->state.duration += calculateTime(self->state.start, self->state.end); + // Tell our parent we are done, convert all timings to seconds + + self->state.duration = self->state.duration / 1000; // Convert to milliseconds + self->state.initDuration = self->state.initDuration / 1000; // Convert to milliseconds + self->state.forcingDuration = self->state.forcingDuration / 1000; // Convert to milliseconds + self->state.runPhysicsDuration = self->state.runPhysicsDuration / 1000; // Convert to milliseconds + self->state.writeOutputDuration = self->state.writeOutputDuration / 1000; // Convert to milliseconds + + self->send(self->state.parent, + done_hru_v, + self->state.indxGRU, + self->state.duration / 1000, + self->state.initDuration / 1000, + self->state.forcingDuration / 1000, + self->state.runPhysicsDuration / 1000, + self->state.writeOutputDuration / 1000); + + deallocateHRUStructures(self); + + self->quit(); + return; + } + + self->state.end = std::chrono::high_resolution_clock::now(); + self->state.duration += calculateTime(self->state.start, self->state.end); + + self->send(self, run_hru_v, self->state.stepsInCurrentFFile); + }, + + [=](run_hru, int stepsInCurrentFFile) { + self->state.start = std::chrono::high_resolution_clock::now(); + bool keepRunning = true; + int err = 0; + self->state.stepsInCurrentFFile = stepsInCurrentFFile; + + while( keepRunning ) { + + err = Run_HRU(self); // Simulate a Timestep + + // update Timings + self->state.timestep += 1; + self->state.outputStep += 1; + self->state.forcingStep += 1; + + // if (self->state.timestep == 450 && self->state.indxGRU == 5) { + // err = 20; + // } + + keepRunning = check_HRU(self, err); // check if we are done, need to write + + } + + self->state.end = std::chrono::high_resolution_clock::now(); + self->state.duration += calculateTime(self->state.start, self->state.end); + + }, + + [=](dt_init_factor, int dt_init_factor) { + aout(self) << "Recieved New dt_init_factor to attempt on next run \n"; + self->state.dt_init_factor = dt_init_factor; + }, + }; + /********************************************************************************************************* + *********************************** END ACTOR MESSAGE HANDLERS ****************************************** + *********************************************************************************************************/ +} + +void parseSettings(stateful_actor<hru_state>* self, std::string configPath) { + json settings; + std::string SummaActorsSettings = "/Summa_Actors_Settings.json"; + std::ifstream settings_file(configPath + SummaActorsSettings); + settings_file >> settings; + settings_file.close(); + + if (settings.find("HRUActor") != settings.end()) { + json HRUActorConfig = settings["HRUActor"]; + // find if we want to print output to stdout + if (HRUActorConfig.find("printOutput") != HRUActorConfig.end()) { + self->state.printOutput = HRUActorConfig["printOutput"]; + } else { + aout(self) << "Error finding printOutput in JSON File - Reverting to default value\n"; + self->state.printOutput = true; + } + + if (self->state.printOutput) { + // get the frequency in number of timesteps we want to print the output + if(HRUActorConfig.find("outputFrequency") != HRUActorConfig.end()) { + self->state.outputFrequency = HRUActorConfig["outputFrequency"]; + } else { + aout(self) << "Error finding outputFrequency in JSON File - Reverting to default value\n"; + self->state.outputFrequency = 10000; + } + } + } else { + aout(self) << "Error finding HRUActor in JSON File - Reverting to default values for HRUs\n"; + self->state.printOutput = true; + self->state.outputFrequency = 10000; + } + + +} + +void Initialize_HRU(stateful_actor<hru_state>* self) { + self->state.initStart = std::chrono::high_resolution_clock::now(); + // aout(self) << "Initalizing HRU" << std::endl; + // aout(self) << "Entering Initalize \n"; + Initialize(&self->state.indxGRU, + &self->state.num_steps, + self->state.handle_forcStat, + self->state.handle_progStat, + self->state.handle_diagStat, + self->state.handle_fluxStat, + self->state.handle_indxStat, + self->state.handle_bvarStat, + self->state.handle_timeStruct, + self->state.handle_forcStruct, + self->state.handle_attrStruct, + self->state.handle_typeStruct, + self->state.handle_idStruct, + self->state.handle_indxStruct, + self->state.handle_mparStruct, + self->state.handle_progStruct, + self->state.handle_diagStruct, + self->state.handle_fluxStruct, + self->state.handle_bparStruct, + self->state.handle_bvarStruct, + self->state.handle_dparStruct, + self->state.handle_startTime, + self->state.handle_finshTime, + self->state.handle_refTime, + self->state.handle_oldTime, &self->state.err); + + if (self->state.err != 0) { + aout(self) << "Error: Initialize - HRU = " << self->state.indxHRU << + " - indxGRU = " << self->state.indxGRU << " - refGRU = "<< self->state.refGRU << std::endl; + aout(self) << "Error = " << self->state.err << "\n"; + self->quit(); + return; + } + + SetupParam(&self->state.indxGRU, + &self->state.indxHRU, + self->state.handle_attrStruct, + self->state.handle_typeStruct, + self->state.handle_idStruct, + self->state.handle_mparStruct, + self->state.handle_bparStruct, + self->state.handle_bvarStruct, + self->state.handle_dparStruct, + self->state.handle_startTime, + self->state.handle_oldTime, + &self->state.upArea, &self->state.err); + if (self->state.err != 0) { + aout(self) << "Error: SetupParam - HRU = " << self->state.indxHRU << + " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << std::endl; + self->quit(); + return; + } + // aout(self) << "Restart" << std::endl; + + Restart(&self->state.indxGRU, + &self->state.indxHRU, + self->state.handle_indxStruct, + self->state.handle_mparStruct, + self->state.handle_progStruct, + self->state.handle_diagStruct, + self->state.handle_fluxStruct, + self->state.handle_bvarStruct, + &self->state.dt_init, &self->state.err); + if (self->state.err != 0) { + aout(self) << "Error: Restart - HRU = " << self->state.indxHRU << + " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << std::endl; + self->quit(); + return; + } + + // aout(self) << self->state.refGRU << " - Done Init" << std::endl; + self->state.initEnd = std::chrono::high_resolution_clock::now(); + self->state.initDuration = calculateTime(self->state.initStart, self->state.initEnd); +} + +int Run_HRU(stateful_actor<hru_state>* self) { + /********************************************************************** + ** READ FORCING + **********************************************************************/ + self->state.forcingStart = std::chrono::high_resolution_clock::now(); + Forcing(&self->state.indxGRU, + &self->state.timestep, + self->state.handle_timeStruct, + self->state.handle_forcStruct, + &self->state.iFile, + &self->state.forcingStep, + &self->state.fracJulDay, + &self->state.tmZoneOffsetFracDay, + &self->state.yearLength, + &self->state.err); + if (self->state.err != 0) { + aout(self) << "Error: Forcing - HRU = " << self->state.indxHRU << + " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << + " - Timestep = " << self->state.timestep << std::endl; + return 10; + + } + self->state.forcingEnd = std::chrono::high_resolution_clock::now(); + self->state.forcingDuration += calculateTime(self->state.forcingStart, self->state.forcingEnd); + + + if (self->state.printOutput && + self->state.timestep % self->state.outputFrequency == 0) { + printOutput(self); + } + + + /********************************************************************** + ** RUN_PHYSICS + **********************************************************************/ + self->state.runPhysicsStart = std::chrono::high_resolution_clock::now(); + self->state.err = 0; + RunPhysics(&self->state.indxHRU, + &self->state.timestep, + self->state.handle_timeStruct, + self->state.handle_forcStruct, + self->state.handle_attrStruct, + self->state.handle_typeStruct, + self->state.handle_indxStruct, + self->state.handle_mparStruct, + self->state.handle_progStruct, + self->state.handle_diagStruct, + self->state.handle_fluxStruct, + self->state.handle_bvarStruct, + &self->state.fracJulDay, + &self->state.tmZoneOffsetFracDay, + &self->state.yearLength, + &self->state.computeVegFlux, + &self->state.dt_init, + &self->state.dt_init_factor, + &self->state.err); + if (self->state.err != 0) { + aout(self) << "Error: RunPhysics - HRU = " << self->state.indxHRU << + " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << + " - Timestep = " << self->state.timestep << std::endl; + return 20; + } + self->state.runPhysicsEnd = std::chrono::high_resolution_clock::now(); + self->state.runPhysicsDuration += calculateTime(self->state.runPhysicsStart, self->state.runPhysicsEnd); + + /********************************************************************** + ** WRITE_OUTPUT + **********************************************************************/ + self->state.writeOutputStart = std::chrono::high_resolution_clock::now(); + WriteOutput(&self->state.indxHRU, + &self->state.indxGRU, + &self->state.timestep, + self->state.handle_forcStat, + self->state.handle_progStat, + self->state.handle_diagStat, + self->state.handle_fluxStat, + self->state.handle_indxStat, + self->state.handle_bvarStat, + self->state.handle_timeStruct, + self->state.handle_forcStruct, + self->state.handle_attrStruct, + self->state.handle_typeStruct, + self->state.handle_indxStruct, + self->state.handle_mparStruct, + self->state.handle_progStruct, + self->state.handle_diagStruct, + self->state.handle_fluxStruct, + self->state.handle_bparStruct, + self->state.handle_bvarStruct, + self->state.handle_statCounter, + self->state.handle_outputTimeStep, + self->state.handle_resetStats, + self->state.handle_finalizeStats, + self->state.handle_finshTime, + self->state.handle_oldTime, + &self->state.outputStep, + &self->state.err); + if (self->state.err != 0) { + aout(self) << "Error: WriteOutput - HRU = " << self->state.indxHRU << + " - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << + " - Timestep = " << self->state.timestep << std::endl; + return 30; + } + self->state.writeOutputEnd = std::chrono::high_resolution_clock::now(); + self->state.writeOutputDuration += calculateTime(self->state.writeOutputStart, self->state.writeOutputEnd); + + return 0; +} + +bool check_HRU(stateful_actor<hru_state>* self, int err) { + + if (err != 0) { + // check for error + + self->send(self->state.parent, run_failure_v, self, self->state.indxGRU, err); + self->quit(); + return false; + + } else if (self->state.timestep > self->state.num_steps) { + // check if simulation is finished + self->state.outputStep -= 1; // prevents segfault + + if (debug) + aout(self) << "Sending Final Write" << + "forcingStep = " << self->state.forcingStep << "\n" << + "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << + "timeStep = " << self->state.timestep << "\n" << + "outputStep = " << self->state.outputStep << "\n"; + + self->send(self->state.file_access_actor, write_output_v, + self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self); + + self->state.end = std::chrono::high_resolution_clock::now(); + self->state.duration += calculateTime(self->state.start, self->state.end); + + return false; + + } else if (self->state.outputStep > self->state.outputStrucSize && + self->state.forcingStep > self->state.stepsInCurrentFFile) { + // Special case where we need both reading and writing + self->state.outputStep -= 1; // prevents segfault + + if (debug) + aout(self) << "Need to read forcing and write to outputstruc\n" << + "forcingStep = " << self->state.forcingStep << "\n" << + "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << + "timeStep = " << self->state.timestep << "\n" << + "outputStep = " << self->state.outputStep << "\n"; + + + self->send(self->state.file_access_actor, read_and_write_v, self->state.indxGRU, + self->state.indxHRU, self->state.outputStep, self->state.iFile + 1, self); + self->state.outputStep = 1; + + return false; + + } else if (self->state.outputStep > self->state.outputStrucSize) { + // check if we need to clear the output struc + self->state.outputStep -= 1; + + if (debug) + aout(self) << "Sending Write \n" << + "forcingStep = " << self->state.forcingStep << "\n" << + "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << + "timeStep = " << self->state.timestep << "\n" << + "outputStep = " << self->state.outputStep << "\n"; + + + self->send(self->state.file_access_actor, write_output_v, + self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self); + self->state.outputStep = 1; + + return false; + + } else if (self->state.forcingStep > self->state.stepsInCurrentFFile) { + // we need more forcing data + + if (debug) + aout(self) << "Asking for more forcing data\n" << + "forcingStep = " << self->state.forcingStep << "\n" << + "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << + "timeStep = " << self->state.timestep << "\n" << + "outputStep = " << self->state.outputStep << "\n"; + + self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile + 1, self); + + return false; + + } else { + return true; + } +} + +void deallocateHRUStructures(stateful_actor<hru_state>* self) { + + DeallocateStructures(self->state.handle_forcStat, + self->state.handle_progStat, + self->state.handle_diagStat, + self->state.handle_fluxStat, + self->state.handle_indxStat, + self->state.handle_bvarStat, + self->state.handle_timeStruct, + self->state.handle_forcStruct, + self->state.handle_attrStruct, + self->state.handle_typeStruct, + self->state.handle_idStruct, + self->state.handle_indxStruct, + self->state.handle_mparStruct, + self->state.handle_progStruct, + self->state.handle_diagStruct, + self->state.handle_fluxStruct, + self->state.handle_bparStruct, + self->state.handle_bvarStruct, + self->state.handle_dparStruct, + self->state.handle_startTime, + self->state.handle_finshTime, + self->state.handle_refTime, + self->state.handle_oldTime, + self->state.handle_ncid, + self->state.handle_statCounter, + self->state.handle_outputTimeStep, + self->state.handle_resetStats, + self->state.handle_finalizeStats, + &self->state.err); +} + +void printOutput(stateful_actor<hru_state>* self) { + aout(self) << self->state.refGRU << " - Timestep = " << self->state.timestep << std::endl; + aout(self) << self->state.refGRU << ":Accumulated Run Physics Time = " << + self->state.runPhysicsDuration << std::endl; +} + +} \ No newline at end of file diff --git a/build/source/actors/job_actor/GRUinfo.cpp b/build/source/actors/job_actor/GRUinfo.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c23be65df8ae5bb62641060e08bcae7928195e29 --- /dev/null +++ b/build/source/actors/job_actor/GRUinfo.cpp @@ -0,0 +1,113 @@ +#include "caf/all.hpp" +#include "GRUinfo.hpp" +#include <iostream> +#include <fstream> + + +GRUinfo::GRUinfo(int refGRU, int indxGRU, caf::actor gru, int dt_init, int maxAttempts) { + this->refGRU = refGRU; + this->indxGRU = indxGRU; + this->GRU = gru; + this->dt_init = dt_init; + this->currentAttempt = 1; + this->maxAttempts = maxAttempts; + this->completed = false; + this->failed = false; +} +GRUinfo::~GRUinfo(){}; + +// Getters +int GRUinfo::getRefGRU() { + return this->refGRU; +} + +int GRUinfo::getIndxGRU() { + return this->indxGRU; +} + +int GRUinfo::getDt_init() { + return this->dt_init; +} + +caf::actor GRUinfo::getActor() { + return GRU; +} +// Setters +void GRUinfo::updateGRU(caf::actor gru) { + this->GRU = gru; +} + +void GRUinfo::updateFailed() { + if (this->failed) { + this->failed = false; + } else { + this->failed = true; + } +} + +void GRUinfo::updateCompletedToTrue(){ + this->completed = true; +} + +void GRUinfo::updateDt_init() { + this->dt_init = this->dt_init * 2; +} + +void GRUinfo::updateCurrentAttempt() { + this->currentAttempt++; +} + +// Methods that return Booleans +bool GRUinfo::isMaxAttemptsReached() { + return this->maxAttempts <= this->currentAttempt; +} + +bool GRUinfo::isFailed() { + return this->failed; +} + +bool GRUinfo::isCompleted() { + return this->completed; +} + +void GRUinfo::doneRun(double runTime, double initDuration, double forcingDuration, + double runPhysicsDuration, double writeOutputDuration) { + this->runTime = runTime; + this->initDuration = initDuration; + this->forcingDuration = forcingDuration; + this->runPhysicsDuration = runPhysicsDuration; + this->writeOutputDuration = writeOutputDuration; +} + +// Methods for writing statistics to a file +void GRUinfo::writeSuccess(std::string fileName) { + std::ofstream file; + file.open(fileName, std::ios_base::app); + file << this->refGRU << "," + << this->runTime << "," + << this->initDuration << "," + << this->forcingDuration << "," + << this->runPhysicsDuration << "," + << this->writeOutputDuration << "," + << this->dt_init << "," + << this->currentAttempt << "\n"; + file.close(); +} + +void GRUinfo::writeFail(std::string fileName) { + std::ofstream file; + file.open(fileName, std::ios_base::app); + file << this->refGRU << "," + << this->dt_init << "," + << this->currentAttempt << "\n"; + file.close(); +} + +void GRUinfo::printOutput() { + std::cout << "\nGRU = " << this->refGRU << "\n" << + "RunTime = " << this->runTime << "\n" << + "initDuration = " << this->initDuration << "\n" << + "forcingDuration = " << this->forcingDuration << "\n" << + "runPhysicsDuration = " << this->runPhysicsDuration << "\n" << + "writeOutputDuration = " << this->writeOutputDuration << "\n\n"; +} \ No newline at end of file diff --git a/build/source/actors/job_actor/GRUinfo.h b/build/source/actors/job_actor/GRUinfo.h deleted file mode 100644 index 51153f0d28afdf9a59dea6927744cccc5d03c88f..0000000000000000000000000000000000000000 --- a/build/source/actors/job_actor/GRUinfo.h +++ /dev/null @@ -1,146 +0,0 @@ -#ifndef GRUinfo_H_ -#define GRUinfo_H_ - -#include "caf/all.hpp" -#include <iostream> -#include <fstream> -#include "Job.h" - - -class GRUinfo { - private: - int refGRU; // This will be the same as the refGRU - int indxGRU; - caf::actor GRU; - - // Variable to update - int dt_init; - - // Completed Information - int currentAttempt; - int maxAttempts; - bool completed; - bool failed; - - // Timing information for the GRU - double runTime; - double initDuration; - double forcingDuration; - double runPhysicsDuration; - double writeOutputDuration; - - public: - // Constructor - GRUinfo(int refGRU, int indxGRU, caf::actor gru, int dt_init, int maxAttempts) { - this->refGRU = refGRU; - this->indxGRU = indxGRU; - this->GRU = gru; - this->dt_init = dt_init; - this->currentAttempt = 1; - this->maxAttempts = maxAttempts; - this->completed = false; - this->failed = false; - } - // Deconstructor - ~GRUinfo(){}; - - // Getters - int getRefGRU() { - return this->refGRU; - } - - int getIndxGRU() { - return this->indxGRU; - } - - int getDt_init() { - return this->dt_init; - } - - caf::actor getActor() { - return GRU; - } - // Setters - void updateGRU(caf::actor gru) { - this->GRU = gru; - } - - void updateFailed() { - if (this->failed) { - this->failed = false; - } else { - this->failed = true; - } - } - - void updateCompletedToTrue(){ - this->completed = true; - } - - void updateDt_init() { - this->dt_init = this->dt_init * 2; - } - - void updateCurrentAttempt() { - this->currentAttempt++; - } - - // Methods that return Booleans - bool isMaxAttemptsReached() { - return this->maxAttempts <= this->currentAttempt; - } - - bool isFailed() { - return this->failed; - } - - bool isCompleted() { - return this->completed; - } - - void doneRun(double runTime, double initDuration, double forcingDuration, - double runPhysicsDuration, double writeOutputDuration) { - this->runTime = runTime; - this->initDuration = initDuration; - this->forcingDuration = forcingDuration; - this->runPhysicsDuration = runPhysicsDuration; - this->writeOutputDuration = writeOutputDuration; - } - - // Methods for writing statistics to a file - void writeSuccess(std::string fileName) { - std::ofstream file; - file.open(fileName, std::ios_base::app); - file << this->refGRU << "," - << this->runTime << "," - << this->initDuration << "," - << this->forcingDuration << "," - << this->runPhysicsDuration << "," - << this->writeOutputDuration << "," - << this->dt_init << "," - << this->currentAttempt << "\n"; - file.close(); - } - - void writeFail(std::string fileName) { - std::ofstream file; - file.open(fileName, std::ios_base::app); - file << this->refGRU << "," - << this->dt_init << "," - << this->currentAttempt << "\n"; - file.close(); - } - - void printOutput() { - std::cout << "\nGRU = " << this->refGRU << "\n" << - "RunTime = " << this->runTime << "\n" << - "initDuration = " << this->initDuration << "\n" << - "forcingDuration = " << this->forcingDuration << "\n" << - "runPhysicsDuration = " << this->runPhysicsDuration << "\n" << - "writeOutputDuration = " << this->writeOutputDuration << "\n\n"; - } - - - -}; -#endif \ No newline at end of file diff --git a/build/source/actors/job_actor/Job.h b/build/source/actors/job_actor/Job.h index dd44e5d8d93da7ae479f3e32249bc28c80d14da5..7162a62dfed7b6370d4e5df0271c56fb8f25efff 100644 --- a/build/source/actors/job_actor/Job.h +++ b/build/source/actors/job_actor/Job.h @@ -6,7 +6,6 @@ #include "../file_access_actor/FileAccessActor.h" #include "../hru_actor/HRUActor.h" #include "../global/messageAtoms.h" -#include "../global/json.hpp" #include "../global/global.h" #include "GRUinfo.h" #include "job_subroutine_wrappers.h" @@ -19,6 +18,8 @@ #include <fstream> #include <sys/stat.h> + + struct job_state { // Actor References caf::actor file_access_actor; // actor reference for the file_access_actor diff --git a/build/source/actors/job_actor/JobActor.h b/build/source/actors/job_actor/job_actor.cpp similarity index 98% rename from build/source/actors/job_actor/JobActor.h rename to build/source/actors/job_actor/job_actor.cpp index 890bfde6b0621022a0118d5d7d750c6f2fdd731b..241c5dcff25aec0a82cdf536abc425de10be090d 100644 --- a/build/source/actors/job_actor/JobActor.h +++ b/build/source/actors/job_actor/job_actor.cpp @@ -1,11 +1,15 @@ -#ifndef SUMMACLIENTACTOR_H_ -#define SUMMACLIENTACTOR_H_ +#include "job_actor.hpp" +#include "file_access_actor.hpp" +#include "json.hpp" +#include "message_atoms.hpp" +#include "global.hpp" +#include "job_actor_subroutine_wrappers.hpp" +#include "hru_actor.hpp" -#include "Job.h" - -using namespace caf; using json = nlohmann::json; +namespace caf { + /** * @brief First Actor that is spawned that is not the Coordinator Actor. * @@ -305,6 +309,6 @@ void restartFailures(stateful_actor<job_state>* self) { } } +} // End Namespace caf -#endif diff --git a/build/source/actors/main.cc b/build/source/actors/main.cc deleted file mode 100644 index fbecb3b905a981828f00eb55fd45c7eb93d31c50..0000000000000000000000000000000000000000 --- a/build/source/actors/main.cc +++ /dev/null @@ -1,183 +0,0 @@ -#include "caf/all.hpp" -#include "caf/io/all.hpp" -#include "summa_actor/SummaActor.h" -#include "global/messageAtoms.h" -#include "global/global.h" - -#include <string> -#include <bits/stdc++.h> -#include <unistd.h> -#include <iostream> - -using namespace caf; - - -/* Configuration class that handles the config and -/ command line options for the actors program */ -class config : public actor_system_config { - public: - int startGRU = -1; - int countGRU = -1; - std::string configPath = ""; // master file - bool debugMode = false; - uint16_t port = 4444; - std::string host = "localhost"; - bool server_mode = false; - - config() { - opt_group{custom_options_, "global"} - .add(startGRU, "gru,g", "Starting GRU Index") - .add(countGRU, "numGRU,n", "Total Number of GRUs") - .add(configPath, "config,c", "Path name of the config directory") - .add(debugMode, "debug-mode,d", "enable debug mode") - .add(port, "port,p", "set port") - .add(host, "host,h", "set Host (ignored in server mode)") - .add(server_mode, "server-mode,s", "enable server mode"); - } -}; - -struct server_state { - -}; - -behavior say_hello(stateful_actor<server_state>* self) { - aout(self) << "Hello" << std::endl; - return { - [=](int i) { - aout(self) << "Recieved" << i << std::endl; - return "Got It"; - }, - }; -} - -void run_server(actor_system& system, const config& cfg) { - scoped_actor self{system}; - auto hello = system.spawn(say_hello); - aout(self) << "SEVER" << std::endl; - aout(self) << "Attempting to publish actor" << cfg.port << std::endl; - auto is_port = io::publish(hello, cfg.port); - if (!is_port) { - std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << std::endl; - return; - } - aout(self) << "Successfully Published" << *is_port << std::endl; - std::string dummy; - std::getline(std::cin, dummy); - std::cout << "...cya" << std::endl; - anon_send_exit(hello, exit_reason::user_shutdown); -} - -struct client_state { - strong_actor_ptr current_server; -}; - -behavior unconnected(stateful_actor<client_state>*); -void connecting(stateful_actor<client_state>*, const std::string& host, uint16_t port); -behavior running(stateful_actor<client_state>*, const actor& say_hello); - -behavior client(stateful_actor<client_state>* self) { - self->set_down_handler([=](const down_msg& dm){ - if(dm.source == self->state.current_server) { - aout(self) << "*** Lost Connection to Server" << std::endl; - self->state.current_server = nullptr; - self->become(unconnected(self)); - } - }); - return unconnected(self); -} - -behavior unconnected(stateful_actor<client_state>* self) { - return { - [=] (connect_atom, const std::string& host, uint16_t port) { - connecting(self, host, port); - }, - }; -} - -void connecting(stateful_actor<client_state>* self, const std::string& host, uint16_t port) { - self->state.current_server = nullptr; - - auto mm = self->system().middleman().actor_handle(); - self->request(mm, infinite, connect_atom_v, host, port) - .await( - [=](const node_id&, strong_actor_ptr serv, - const std::set<std::string>& ifs) { - if (!serv) { - aout(self) << R"(*** no server found at ")" << host << R"(":)" << port - << std::endl; - return; - } - if (!ifs.empty()) { - aout(self) << R"(*** typed actor found at ")" << host << R"(":)" - << port << ", but expected an untyped actor " << std::endl; - return; - } - aout(self) << "*** successfully connected to server" << std::endl; - self->state.current_server = serv; - auto hdl = actor_cast<actor>(serv); - self->monitor(hdl); - self->become(running(self, hdl)); - }, - [=](const error& err) { - aout(self) << R"(*** cannot connect to ")" << host << R"(":)" << port - << " => " << to_string(err) << std::endl; - self->become(unconnected(self)); - }); -} - -behavior running(stateful_actor<client_state>* self, const actor& say_hello) { - aout(self) << "HERE" << std::endl; - self->send(say_hello, infinite, 80); - return { - [=](std::string test) { - aout(self) << test << std::endl; - } - }; -} - - -void run_client(actor_system& system, const config& cfg) { - scoped_actor self{system}; - aout(self) << "CLIENT" << std::endl; - auto c = system.spawn(client); - if (!cfg.host.empty() && cfg.port > 0) { - anon_send(c, connect_atom_v, cfg.host, cfg.port); - } else { - aout(self) << "No Server Config" << std::endl; - } -} - - -void caf_main(actor_system& sys, const config& cfg) { - // scoped_actor self{sys}; - // if (cfg.startGRU == -1) { - // aout(self) << "Starting GRU was not defined!! " << - // "startGRU is set with the \"-g\" option\n"; - // aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; - // return; - // } - // if (cfg.countGRU == -1) { - // aout(self) << "Number of GRUs was not defined!! " << - // "countGRU is set with the \"-n\" option\n"; - // aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; - // return; - // } - // if (cfg.configPath == "") { - // aout(self) << "File Manager was not defined!! " << - // "fileManger is set with the \"-c\" option\n"; - // aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; - // return; - // } - // if (cfg.debugMode) { - // aout(self) << "Starting SUMMA-Actors in DebugMode\n"; - // debug = true; - // } - // start SUMMA - auto system = cfg.server_mode ? run_server : run_client; - system(sys, cfg); - - - // auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath); -} - -CAF_MAIN(id_block::summa, io::middleman) \ No newline at end of file diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7de2ab645608a8b3b33be5b98f4403e229da789a --- /dev/null +++ b/build/source/actors/main.cpp @@ -0,0 +1,106 @@ +#include "caf/all.hpp" +#include "caf/io/all.hpp" +#include "summa_actor.hpp" +#include "global.hpp" +#include "message_atoms.hpp" +#include <string> +#include <bits/stdc++.h> +#include <unistd.h> +#include <iostream> + +using namespace caf; + + +/* Configuration class that handles the config and +/ command line options for the actors program */ +class config : public actor_system_config { + public: + int startGRU = -1; + int countGRU = -1; + std::string configPath = ""; + bool debugMode = false; + uint16_t port = 4444; + std::string host = "localhost"; + bool server_mode = false; + bool distributed = false; + + config() { + opt_group{custom_options_, "global"} + .add(startGRU, "gru,g", "Starting GRU Index") + .add(countGRU, "numGRU,n", "Total Number of GRUs") + .add(configPath, "config,c", "Path name of the config directory") + .add(debugMode, "debug-mode,debug", "enable debug mode") + .add(distributed, "distributed-mode,d", "enable distributed mode") + .add(port, "port,p", "set port") + .add(host, "host,h", "set Host (ignored in server mode)") + .add(server_mode, "server-mode,s", "enable server mode"); + } +}; + +// void run_client(actor_system& system, const config& cfg) { +// scoped_actor self{system}; +// if (cfg.distributed) { +// aout(self) << "Starting SUMMA-Client in Distributed Mode\n"; +// auto c = system.spawn(summa_client); +// if (!cfg.host.empty() && cfg.port > 0) { +// anon_send(c, connect_atom_v, cfg.host, cfg.port); +// } else { +// aout(self) << "No Server Config" << std::endl; +// } + +// } else { +// aout(self) << "Starting SUMMA in non-distributed mode \n"; +// auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath); +// } + +// } + + +// void run_server(actor_system& system, const config& cfg) { +// scoped_actor self{system}; +// auto server = system.spawn(summa_server); +// aout(self) << "SEVER" << std::endl; +// aout(self) << "Attempting to publish actor" << cfg.port << std::endl; +// auto is_port = io::publish(server, cfg.port); +// if (!is_port) { +// std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << std::endl; +// return; +// } +// aout(self) << "Successfully Published" << *is_port << std::endl; +// std::string dummy; +// std::getline(std::cin, dummy); +// std::cout << "...cya" << std::endl; +// anon_send_exit(server, exit_reason::user_shutdown); +// } + +void caf_main(actor_system& sys, const config& cfg) { + scoped_actor self{sys}; + if (cfg.startGRU == -1) { + aout(self) << "Starting GRU was not defined!! " << + "startGRU is set with the \"-g\" option\n"; + aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; + return; + } + if (cfg.countGRU == -1) { + aout(self) << "Number of GRUs was not defined!! " << + "countGRU is set with the \"-n\" option\n"; + aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; + return; + } + if (cfg.configPath == "") { + aout(self) << "File Manager was not defined!! " << + "fileManger is set with the \"-c\" option\n"; + aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; + return; + } + if (cfg.debugMode) { + aout(self) << "Starting SUMMA-Actors in DebugMode\n"; + bool debug = true; + } + // start SUMMA + // auto system = cfg.server_mode ? run_server : run_client; + // system(sys, cfg); + auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath); +} + +CAF_MAIN(id_block::summa, io::middleman) \ No newline at end of file diff --git a/build/source/actors/summa_actor/SummaClient.h b/build/source/actors/summa_actor/SummaClient.h new file mode 100644 index 0000000000000000000000000000000000000000..464f388dc398f0665acd294d3a9e3509f0a4a224 --- /dev/null +++ b/build/source/actors/summa_actor/SummaClient.h @@ -0,0 +1,90 @@ +#ifndef SUMMACLIENT_H_ +#define SUMMACLIENT_H_ + +#include "caf/all.hpp" +#include "caf/io/all.hpp" + +using namespace caf; + +struct summa_client_state { + strong_actor_ptr current_server; + +}; + + +behavior unconnected(stateful_actor<summa_client_state>*); +void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port); +behavior running(stateful_actor<summa_client_state>*, const actor& say_hello); + + +/** + * @brief Set up the client and its down handler + */ +behavior summa_client(stateful_actor<summa_client_state>* self) { + self->set_down_handler([=](const down_msg& dm){ + if(dm.source == self->state.current_server) { + aout(self) << "*** Lost Connection to Server" << std::endl; + self->state.current_server = nullptr; + self->become(unconnected(self)); + } + }); + return unconnected(self); +} +/** + * Attempt to connect to the server + */ +behavior unconnected(stateful_actor<summa_client_state>* self) { + return { + [=] (connect_atom, const std::string& host, uint16_t port) { + connecting(self, host, port); + }, + }; +} + +void connecting(stateful_actor<summa_client_state>* self, const std::string& host, uint16_t port) { + self->state.current_server = nullptr; + + auto mm = self->system().middleman().actor_handle(); + self->request(mm, infinite, connect_atom_v, host, port) + .await( + [=](const node_id&, strong_actor_ptr serv, + const std::set<std::string>& ifs) { + if (!serv) { + aout(self) << R"(*** no server found at ")" << host << R"(":)" << port + << std::endl; + return; + } + if (!ifs.empty()) { + aout(self) << R"(*** typed actor found at ")" << host << R"(":)" + << port << ", but expected an untyped actor " << std::endl; + return; + } + aout(self) << "*** successfully connected to server" << std::endl; + self->state.current_server = serv; + auto hdl = actor_cast<actor>(serv); + self->monitor(hdl); + self->become(running(self, hdl)); + }, + [=](const error& err) { + aout(self) << R"(*** cannot connect to ")" << host << R"(":)" << port + << " => " << to_string(err) << std::endl; + self->become(unconnected(self)); + }); +} + +behavior running(stateful_actor<summa_client_state>* self, const actor& server_actor) { + aout(self) << "Client Has Started Successfully" << std::endl; + self->send(server_actor, 80); + return { + [=](std::string test) { + aout(self) << test << std::endl; + } + }; +} + + + + + + +#endif \ No newline at end of file diff --git a/build/source/actors/summa_actor/SummaManager.h b/build/source/actors/summa_actor/SummaManager.h index da875782b25c35ea1b589823d68a467a32321581..95ab4b0badf4374831f577633e71729785a2ffb4 100644 --- a/build/source/actors/summa_actor/SummaManager.h +++ b/build/source/actors/summa_actor/SummaManager.h @@ -15,30 +15,13 @@ -struct summa_manager { - // Timing Information - std::chrono::time_point<std::chrono::system_clock> start; - std::chrono::time_point<std::chrono::system_clock> end; - double duration; - // Program Parameters - int startGRU; // starting GRU for the simulation - int numGRU; // number of GRUs to compute - std::string configPath;// path to the fileManager.txt file - // Information about the jobs - int numFailed = 0; // Number of jobs that have failed - - // Values Set By Summa_Actors_Settings.json - int maxGRUPerJob; // maximum number of GRUs a job can compute at once - int outputStrucSize; - - caf::actor currentJob; // Reference to the current job actor - -}; - -/** - * @brief Function to spawn a job actor - */ + +behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string configPath); + void spawnJob(stateful_actor<summa_manager>* self); void parseSettings(stateful_actor<summa_manager>* self, std::string configPath); + + + #endif \ No newline at end of file diff --git a/build/source/actors/summa_actor/SummaServer.h b/build/source/actors/summa_actor/SummaServer.h new file mode 100644 index 0000000000000000000000000000000000000000..736791a041c86735b1be4c3067a239bcdb87c10f --- /dev/null +++ b/build/source/actors/summa_actor/SummaServer.h @@ -0,0 +1,31 @@ +#ifndef SUMMASERVER_H_ +#define SUMMASERVER_H_ + +#include "caf/all.hpp" +#include "caf/io/all.hpp" + +using namespace caf; + +struct summa_server_state { + +}; + + +behavior summa_server(stateful_actor<summa_server_state>* self) { + aout(self) << "Summa Server has Started \n"; + + return { + [=](int i) { + aout(self) << "Received " << i << "\n"; + return "Received"; + } + + }; + +} + + + + + +#endif \ No newline at end of file diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6984ef3a3cb86ddfe15538dc1d860e240bf143c0 --- /dev/null +++ b/build/source/actors/summa_actor/summa_actor.cpp @@ -0,0 +1,117 @@ +#include "caf/all.hpp" +#include "caf/io/all.hpp" +#include "message_atoms.hpp" +#include "summa_actor.hpp" +#include "global.hpp" +#include "job_actor.hpp" +#include "json.hpp" +#include <iostream> +#include <chrono> +#include <string> +#include <fstream> + +using json = nlohmann::json; + +namespace caf { + +behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath) { + self->state.start = std::chrono::high_resolution_clock::now(); + // Set Variables + self->state.startGRU = startGRU; + self->state.numGRU = numGRU; + self->state.configPath = configPath; + + parseSettings(self, configPath); + aout(self) << "SETTINGS FOR SUMMA_ACTOR\n"; + aout(self) << "Output Structure Size = " << self->state.outputStrucSize << "\n"; + aout(self) << "Max GRUs Per Job = " << self->state.maxGRUPerJob << "\n"; + + // Create the job_actor and start SUMMA + spawnJob(self); + + return { + [=](done_job, int numFailed) { + self->state.numFailed += numFailed; + aout(self) << "Job Done\n"; + if (self->state.numGRU <= 0) { + + self->state.end = std::chrono::high_resolution_clock::now(); + self->state.duration = calculateTime(self->state.start, self->state.end); + + self->state.duration = self->state.duration / 1000; // Convert to milliseconds + + + aout(self) << "Total Program Duration:\n"; + aout(self) << " " << self->state.duration / 1000 << " Seconds\n"; + aout(self) << " " << (self->state.duration / 1000) / 60 << " Minutes\n"; + aout(self) << " " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n"; + + aout(self) << "Program Finished \n"; + + } else { + // spawn a new job + spawnJob(self); + } + }, + + [=](err) { + aout(self) << "Unrecoverable Error: Attempting To Fail Gracefully\n"; + self->quit(); + } + }; +} + + +void spawnJob(stateful_actor<summa_actor_state>* self) { + // Ensure we do not start a job with too many GRUs + if (self->state.numGRU > self->state.maxGRUPerJob) { + // spawn the job actor + aout(self) << "\n Starting Job with startGRU = " << self->state.startGRU << "\n"; + self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.maxGRUPerJob, + self->state.configPath, self->state.outputStrucSize, self); + + // Update GRU count + self->state.numGRU = self->state.numGRU - self->state.maxGRUPerJob; + self->state.startGRU = self->state.startGRU + self->state.maxGRUPerJob; + + } else { + + self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.numGRU, + self->state.configPath, self->state.outputStrucSize, self); + self->state.numGRU = 0; + } +} + +void parseSettings(stateful_actor<summa_actor_state>* self, std::string configPath) { + json settings; + std::string SummaActorsSettings = "/Summa_Actors_Settings.json"; + std::ifstream settings_file(configPath + SummaActorsSettings); + settings_file >> settings; + settings_file.close(); + + if (settings.find("SummaActor") != settings.end()) { + json SummaActorConfig = settings["SummaActor"]; + + // Find the desired OutputStrucSize + if (SummaActorConfig.find("OuputStructureSize") != SummaActorConfig.end()) { + self->state.outputStrucSize = SummaActorConfig["OuputStructureSize"]; + } else { + aout(self) << "Error Finding OutputStructureSize in JOSN - Reverting to default value\n"; + self->state.outputStrucSize = 250; + } + + // Find the desired maxGRUPerJob size + if (SummaActorConfig.find("maxGRUPerJob") != SummaActorConfig.end()) { + self->state.maxGRUPerJob = SummaActorConfig["maxGRUPerJob"]; + } else { + aout(self) << "Error Finding maxGRUPerJob in JOSN - Reverting to default value\n"; + self->state.maxGRUPerJob = 500; + } + + } else { + aout(self) << "Error Finding SummaActor in JSON - Reverting to default values\n"; + self->state.outputStrucSize = 250; + self->state.maxGRUPerJob = 500; + } +} +} // end namespace