diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index e92a7a1f15ab3601090057a106436b2654255177..5acb7c6ec340023c2c737fecacd21f4be72f21cf 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -12,7 +12,10 @@ namespace caf { using chrono_time = std::chrono::time_point<std::chrono::system_clock>; struct GRU_Container { + std::vector<GRU*> gru_list; + + chrono_time gru_start_time; // Vector of start times for each GRU int num_gru_done = 0; int num_gru_failed = 0; // number of grus that are waiting to be restarted diff --git a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp index 62ef5d24c3be21088dc3ac6e6b2471484183df0e..e6b6f1ca263d5d7e502edc54cb994907dd805978 100644 --- a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp @@ -269,7 +269,7 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { // save model decisions as named integers mDecisions_C(&self->state.num_steps, &err); if (err != 0) { - aout(self) << "\033[31mFile_Access_Actor: Error in mDecisions\033[0m\n"; + aout(self) << "ERROR: File_Access_Actor in mDecisions\n"; std::string function = "mDecisions"; self->send(self->state.parent, file_access_actor_err_v, function); self->quit(); diff --git a/build/source/actors/file_access_actor/fortran_code/cppwrap_fileAccess.f90 b/build/source/actors/file_access_actor/fortran_code/cppwrap_fileAccess.f90 index de44faf9c506f9b085b94fdebc8c98843e9bb1fd..e40da86eea35f544acce6a9eda12be3229052c05 100644 --- a/build/source/actors/file_access_actor/fortran_code/cppwrap_fileAccess.f90 +++ b/build/source/actors/file_access_actor/fortran_code/cppwrap_fileAccess.f90 @@ -30,7 +30,7 @@ subroutine mDecisions_C(num_steps, err) bind(C, name='mDecisions_C') character(len=256) :: message ! error message call mDecisions(err,message) - if(err/=0)then; print*, char(27),'[33m',message,char(27),'[0m'; return; endif + if(err/=0)then; print*,message; return; endif num_steps = numtim end subroutine mDecisions_C diff --git a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp index 9a66514301dd3e8f802ad7941d264162c01bd481..11de6a7a0cc456acdb80c1e8523090ea0090de02 100644 --- a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp +++ b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp @@ -35,6 +35,11 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, self->state.handle_progStruct, self->state.handle_diagStruct, self->state.handle_fluxStruct, self->state.handle_bvarStruct, self->state.handle_startTime, self->state.handle_finshTime, self->state.handle_refTime,self->state.handle_oldTime, &self->state.err); + + + + + if (self->state.err != 0) { aout(self) << "Error: HRU_Actor - Initialize - HRU = " << self->state.indxHRU << " - indxGRU = " << self->state.indxGRU << " - refGRU = "<< self->state.refGRU << std::endl; diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index af8df6d67ca95d6c7547b1087a9d68640c55067d..b1545735eba1794142a6a96573f5eab1754a86fc 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -15,9 +15,12 @@ using chrono_time = std::chrono::time_point<std::chrono::system_clock>; namespace caf { // First Actor that is spawned that is not the Coordinator Actor. -behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, - File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, - HRU_Actor_Settings hru_actor_settings, caf::actor parent) { +behavior job_actor(stateful_actor<job_state>* self, + int start_gru, int num_gru, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings, + caf::actor parent) { self->set_down_handler([=](const down_msg& dm) { aout(self) << "\n\n ********** DOWN HANDLER ********** \n"; @@ -55,37 +58,27 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, gethostname(host, HOST_NAME_MAX); self->state.hostname = host; - // Initalize global variables + // Initalize global variables calling Fortran Routines int err = 0; setTimesDirsAndFiles(self->state.job_actor_settings.file_manager_path.c_str(), &err); - if (err != 0) { - aout(self) << "ERROR: Job_Actor - setTimesDirsAndFiles\n"; - return {}; // Failure - } + if (err != 0) { aout(self) << "\nERROR: Job_Actor - setTimesDirsAndFiles\n"; return {}; } + defineGlobalData(&self->state.start_gru, &err); - if (err != 0) { - aout(self) << "ERROR: Job_Actor - defineGlobalData\n"; - return {}; // Failure - } + if (err != 0) { aout(self) << "\nERROR: Job_Actor - defineGlobalData\n"; return {}; } + readDimension(&self->state.num_gru, &self->state.num_hru, &self->state.start_gru, &err); - if (err != 0) { - aout(self) << "ERROR: Job_Actor - readDimension\n"; - return {}; // Failure - } + if (err != 0) { aout(self) << "\nERROR: Job_Actor - readDimension\n"; return {}; } + readIcondNLayers(&self->state.num_gru, &err); - if (err != 0) { - aout(self) << "ERROR: Job_Actor - readIcondNLayers\n"; - return {}; // Failure - } + if (err != 0) { aout(self) << "\nERROR: Job_Actor - readIcondNLayers\n"; return {};} + allocateTimeStructure(&err); - if (err != 0) { - aout(self) << "ERROR: Job_Actor - allocateTimeStructure\n"; - return {}; // Failure - } + if (err != 0) { aout(self) << "\nERROR: Job_Actor - allocateTimeStructure\n"; return {}; } // Spawn the file_access_actor. This will return the number of forcing files we are working with - self->state.file_access_actor = self->spawn(file_access_actor, self->state.start_gru, self->state.num_gru, - self->state.file_access_actor_settings, self); + self->state.file_access_actor = self->spawn(file_access_actor, + self->state.start_gru, self->state.num_gru, + self->state.file_access_actor_settings, self); aout(self) << "Job Actor Initalized \n"; @@ -93,71 +86,97 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, return { [=](init_gru) { - initGRUs(self); - }, + auto& gru_container = self->state.gru_container; + + gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); + gru_container.run_attempts_left = self->state.max_run_attempts; + gru_container.run_attempts_left--; + + + // Spawn the GRUs + for(int i = 0; i < self->state.gru_container.num_gru_in_run_domain; i++) { + auto global_gru_index = self->state.gru_container.gru_list.size() + self->state.start_gru; + auto local_gru_index = self->state.gru_container.gru_list.size() + 1; // Fortran reference starts at 1 + + auto gru = self->spawn(hru_actor, + global_gru_index, + local_gru_index, + self->state.hru_actor_settings, + self->state.file_access_actor, + self); + + // Create the GRU object (Job uses this to keep track of GRU status) + self->state.gru_container.gru_list.push_back(new GRU(global_gru_index, + local_gru_index, + gru, + self->state.dt_init_start_factor, + self->state.max_run_attempts)); + } + }, // end init_gru [=](done_hru, int local_gru_index) { - chrono_time end_point = std::chrono::high_resolution_clock::now(); - double total_duration = std::chrono::duration_cast<std::chrono::seconds>(end_point - - self->state.gru_container.gru_start_time).count(); - - aout(self) << "\nJob_Actor: GRU Finished: \n" << - " global_gru_index = " << - self->state.gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() << "\n" << - " local_gru_index = " << local_gru_index << "\n" << - " total_duration = " << total_duration << "\n\n"; + auto& gru_container = self->state.gru_container; + using namespace std::chrono; + + chrono_time end_point = high_resolution_clock::now(); + double total_duration = duration_cast<seconds>(end_point - gru_container.gru_start_time).count(); + + aout(self) << "\nJob_Actor: GRU Finished: \n" + << " global_gru_index = " + << gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() << "\n" + << " local_gru_index = " << local_gru_index << "\n" + << " total_duration = " << total_duration << "\n\n"; // Update Timing - self->state.gru_container.gru_list[local_gru_index-1]->setRunTime(total_duration); - self->state.gru_container.gru_list[local_gru_index-1]->setInitDuration(-1); - self->state.gru_container.gru_list[local_gru_index-1]->setForcingDuration(-1); - self->state.gru_container.gru_list[local_gru_index-1]->setRunPhysicsDuration(-1); - self->state.gru_container.gru_list[local_gru_index-1]->setWriteOutputDuration(-1); + gru_container.gru_list[local_gru_index-1]->setRunTime(total_duration); + gru_container.gru_list[local_gru_index-1]->setInitDuration(-1); + gru_container.gru_list[local_gru_index-1]->setForcingDuration(-1); + gru_container.gru_list[local_gru_index-1]->setRunPhysicsDuration(-1); + gru_container.gru_list[local_gru_index-1]->setWriteOutputDuration(-1); - self->state.gru_container.gru_list[local_gru_index-1]->setSuccess(); + gru_container.gru_list[local_gru_index-1]->setSuccess(); - self->state.gru_container.num_gru_done++; + gru_container.num_gru_done++; // Check if we have finished all active GRUs - if (self->state.gru_container.num_gru_done >= self->state.gru_container.num_gru_in_run_domain) { + if (gru_container.num_gru_done >= gru_container.num_gru_in_run_domain) { // Check for failures - if(self->state.gru_container.num_gru_failed == 0 || self->state.gru_container.run_attempts_left == 0) { + if(gru_container.num_gru_failed == 0 || gru_container.run_attempts_left == 0) { //TODO: RENAME DEALLOCATE_STURCTURES this is more of a finalize std::vector<serializable_netcdf_gru_actor_info> netcdf_gru_info = getGruNetcdfInfo( self->state.max_run_attempts, - self->state.gru_container.gru_list); + gru_container.gru_list); self->send(self->state.file_access_actor, deallocate_structures_v, netcdf_gru_info); } else { aout(self) << "Job_Actor: Restarting GRUs that Failed\n"; - self->state.gru_container.num_gru_done = 0; - self->state.gru_container.num_gru_in_run_domain = self->state.gru_container.num_gru_failed; - self->state.gru_container.num_gru_failed = 0; + gru_container.num_gru_done = 0; + gru_container.num_gru_in_run_domain = gru_container.num_gru_failed; + gru_container.num_gru_failed = 0; self->send(self->state.file_access_actor, restart_failures_v); - for(auto GRU : self->state.gru_container.gru_list) { + for(auto GRU : gru_container.gru_list) { if(GRU->isFailed()) { - GRU->setRunning(); - GRU->decrementAttemptsLeft(); - self->state.hru_actor_settings.dt_init_factor *= 2; - auto global_gru_index = GRU->getGlobalGRUIndex(); - auto local_gru_index = GRU->getLocalGRUIndex(); - auto gru_actor = self->spawn(hru_actor, - global_gru_index, - local_gru_index, - self->state.hru_actor_settings, - self->state.file_access_actor, - self); - self->state.gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor); + GRU->setRunning(); + GRU->decrementAttemptsLeft(); + self->state.hru_actor_settings.dt_init_factor *= 2; + auto global_gru_index = GRU->getGlobalGRUIndex(); + auto local_gru_index = GRU->getLocalGRUIndex(); + auto gru_actor = self->spawn(hru_actor, + global_gru_index, + local_gru_index, + self->state.hru_actor_settings, + self->state.file_access_actor, + self); + gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor); } } - - } } }, [=](const error& err, caf::actor src) { + aout(self) << "\n\n ********** ERROR HANDLER \n"; switch(err.category()) { case type_id_v<hru_error>: @@ -185,61 +204,31 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, self->state.job_timing.updateEndPoint("total_duration"); - aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n"; - aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n"; - aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n"; - aout(self) << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n\n"; + aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n" + << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n" + << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n" + << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" + << "________________________________________________________________________\n\n"; deallocateJobActor(&err); // Tell Parent we are done self->send(self->state.parent, - done_job_v, - self->state.num_gru_failed, - self->state.job_timing.getDuration("total_duration").value_or(-1.0), - read_duration, write_duration); + done_job_v, + self->state.num_gru_failed, + self->state.job_timing.getDuration("total_duration").value_or(-1.0), + read_duration, + write_duration); self->quit(); }, - - [=](file_access_actor_err, const std::string& err) { - aout(self) << "\n\033[31mJob_Actor: Error Handling for File_Access_Actor error: " << - err << " not implemented\033[0m\n"; + aout(self) << "Job_Actor: Error Handling for File_Access_Actor error: " << err << " not implemented\n"; self->quit(); - } - - - - + }, }; } -void initGRUs(stateful_actor<job_state>* self) { - self->state.gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); - self->state.gru_container.run_attempts_left = self->state.max_run_attempts; - self->state.gru_container.run_attempts_left--; - for(int i = 0; i < self->state.gru_container.num_gru_in_run_domain; i++) { - // Spawn the GRU Actor - auto global_gru_index = self->state.gru_container.gru_list.size() + self->state.start_gru; - auto local_gru_index = self->state.gru_container.gru_list.size() + 1; // Fortran reference starts at 1 - auto gru = self->spawn(hru_actor, - global_gru_index, - local_gru_index, - self->state.hru_actor_settings, - self->state.file_access_actor, - self); - - // Create the GRU object - self->state.gru_container.gru_list.push_back( - new GRU(global_gru_index, - local_gru_index, - gru, - self->state.dt_init_start_factor, - self->state.max_run_attempts)); - } -} - std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, std::vector<GRU*> &gru_list) { std::vector<serializable_netcdf_gru_actor_info> gru_netcdf_info; diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index ca5b8eea2d3ef962c2f380d95c349d46f6a599bd..ecab9547d26fdf344a04a562ac42c55388c7aa9b 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -109,8 +109,8 @@ void caf_main(actor_system& sys, const config& cfg) { struct stat file_to_check; // Check if config file exists if (stat(cfg.config_file.c_str(), &file_to_check) != 0) { - aout(self) << "Config File Path Does Not Exist\n"; - aout(self) << "EXAMPLE: ./summa_actors -g 1 -n 10 -c location/of/config \n"; + aout(self) << "Config File Path Does Not Exist\n" + << "EXAMPLE: ./summa_actors -g 1 -n 10 -c location/of/config \n"; return; } @@ -122,43 +122,59 @@ void caf_main(actor_system& sys, const config& cfg) { aout(self) << "Printing Settings For SUMMA Simulation\n"; check_settings_from_json(distributed_settings, - summa_actor_settings, - file_access_actor_settings, - job_actor_settings, - hru_actor_settings); + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); if (distributed_settings.distributed_mode) { // only command line arguments needed are config_file and server-mode if (cfg.server_mode) { - run_server(sys, cfg, distributed_settings, summa_actor_settings, - file_access_actor_settings, job_actor_settings, hru_actor_settings); + run_server(sys, + cfg, + distributed_settings, + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); } else { - run_client(sys, cfg, distributed_settings); + run_client(sys, + cfg, + distributed_settings); } } else { // Configure command line arguments if (cfg.startGRU == -1) { - aout(self) << "Starting GRU was not defined!! " << - "startGRU is set with the \"-g\" option\n"; - aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; + aout(self) << "Starting GRU was not defined!! " + << "startGRU is set with the \"-g\" option\n" + << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; return; } if (cfg.countGRU == -1) { - aout(self) << "Number of GRUs was not defined!! " << - "countGRU is set with the \"-n\" option\n"; - aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; + aout(self) << "Number of GRUs was not defined!! " + << "countGRU is set with the \"-n\" option\n" + << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; return; } if (cfg.config_file == "") { - aout(self) << "File Manager was not defined!! " << - "fileManger is set with the \"-c\" option\n"; - aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; + aout(self) << "File Manager was not defined!! " + << "fileManger is set with the \"-c\" option\n" + << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; return; } - auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, summa_actor_settings, - file_access_actor_settings, job_actor_settings, hru_actor_settings, self); + std::pair<int, char**> openCARP_args_cstyle = cfg.c_args_remainder(); + + + auto summa = sys.spawn(summa_actor, + cfg.startGRU, + cfg.countGRU, + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings, + self); } } diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp index d564bdf9fd9e1c9d7ab0cf113985abc02529f44f..6dd013a1c33d65e7a247f0b782cda012881ba8f6 100644 --- a/build/source/actors/summa_actor/summa_actor.cpp +++ b/build/source/actors/summa_actor/summa_actor.cpp @@ -72,9 +72,9 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int aout(self) << "\n________________SUMMA_ACTOR TIMING INFO________________\n" << "Total Duration = " << self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n" << "Total Duration = " << self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n" - << "Total Duration = " << (self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n\n" - << "Total Read Duration = " << total_read_duration << "Seconds" - << "Total Write Duration = " << total_write_duration << "Seconds" + << "Total Duration = " << (self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" + << "Total Read Duration = " << total_read_duration << "Seconds\n" + << "Total Write Duration = " << total_write_duration << "Seconds\n" << "___________________Program Finished__________________\n"; @@ -85,7 +85,6 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int total_write_duration); } else { - // spawn a new job spawnJob(self); } },