diff --git a/build/CMakeLists.txt b/build/CMakeLists.txt index 9b57ffdc52f279e4c2bf5387a9818d45b9d3e1c8..a3cc267c2ca82f444097dd946b22d68aa51d4dee 100644 --- a/build/CMakeLists.txt +++ b/build/CMakeLists.txt @@ -234,6 +234,8 @@ set(JOB_ACTOR ${ACTORS_DIR}/job_actor/GRU.cpp ${ACTORS_DIR}/job_actor/job_actor.cpp ${ACTORS_DIR}/job_actor/async_mode.cpp + ${ACTORS_DIR}/job_actor/data_assimilation_mode.cpp + ${ACTORS_DIR}/job_actor/job_utils.cpp ${ACTORS_DIR}/job_actor/distributed_job_actor.cpp ${ACTORS_DIR}/job_actor/node_actor.cpp) set(HRU_ACTOR diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index 0476edfb4dbe9e14771f6048c1ff54be2d91cca6..6c42a0007d91709be06f79371feca7459f43c45b 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -144,18 +144,16 @@ struct distributed_job_state { int num_serialize_messages_received = 0; }; -/** The Job Actor */ +/** The Job Actor Behaviors */ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings, actor parent); -// TODO: Implement the following behaviors -// behavior data_assimilation_mode() +behavior data_assimilation_mode(stateful_actor<job_state>* self); behavior async_mode(stateful_actor<job_state>* self); - /** The Job Actor For Internode Communication */ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, int start_gru, int num_gru, @@ -166,14 +164,13 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, /********************************************* - * Functions for the Job Actor + * Functions for the Job Actor (job_utils.cpp) *********************************************/ // Spawn HRU Actors Individually -void spawnHRUActors(stateful_actor<job_state>* self, bool normal_mode); +void spawnHRUActors(stateful_actor<job_state>* self); // Spawn HRU Batch Actors void spawnHRUBatches(stateful_actor<job_state>* self); - void handleFinishedGRU(stateful_actor<job_state>* self, int local_gru_index); void finalizeJob(stateful_actor<job_state>* self); diff --git a/build/source/job_actor/async_mode.cpp b/build/source/job_actor/async_mode.cpp index 0e494079116aba18a2716f1b9b24f3e34614f56f..e16ddb1b52df4786e7998ba32e57a032546b6f3c 100644 --- a/build/source/job_actor/async_mode.cpp +++ b/build/source/job_actor/async_mode.cpp @@ -8,7 +8,8 @@ behavior async_mode(stateful_actor<job_state>* self) { /*** From file access actor after it spawns ***/ [=](init_file_access_actor, int num_timesteps) { aout(self) << "Async Mode: init_file_access_actor\n"; - spawnHRUActors(self, true); + self->state.num_steps = num_timesteps; + spawnHRUActors(self); for(auto& gru : self->state.gru_container.gru_list) { self->send(gru->getGRUActor(), init_hru_v); self->send(gru->getGRUActor(), update_hru_async_v); @@ -25,83 +26,4 @@ behavior async_mode(stateful_actor<job_state>* self) { }; } -void finalizeJob(stateful_actor<job_state>* self) { - std::vector<serializable_netcdf_gru_actor_info> netcdf_gru_info = - getGruNetcdfInfo( - self->state.max_run_attempts,self->state.gru_container.gru_list); - - self->state.num_gru_failed = std::count_if(netcdf_gru_info.begin(), - netcdf_gru_info.end(), [](auto& gru_info) { - return !gru_info.successful; - }); - - self->request(self->state.file_access_actor, infinite, finalize_v).await( - [=](std::tuple<double, double> read_write_duration) { - int err = 0; - for (auto GRU : self->state.gru_container.gru_list) - delete GRU; - self->state.gru_container.gru_list.clear(); - self->state.job_timing.updateEndPoint("total_duration"); - aout(self) << "\n________________" - << "PRINTING JOB_ACTOR TIMING INFO RESULTS" - << "________________\n" - << "Total Duration = " - << self->state.job_timing.getDuration("total_duration") - .value_or(-1.0) << " Seconds\n" - << "Total Duration = " - << self->state.job_timing.getDuration("total_duration") - .value_or(-1.0) / 60 << " Minutes\n" - << "Total Duration = " - << (self->state.job_timing.getDuration("total_duration") - .value_or(-1.0) / 60) / 60 << " Hours\n" - << "Job Init Duration = " - << self->state.job_timing.getDuration("init_duration") - .value_or(-1.0) << " Seconds\n" - << "_________________________________" - << "_______________________________________\n\n"; - - deallocateJobActor(&err); - - // Tell Parent we are done - self->send(self->state.parent, done_job_v, self->state.num_gru_failed, - self->state.job_timing.getDuration("total_duration").value_or(-1.0), - std::get<0>(read_write_duration), - std::get<1>(read_write_duration)); - self->quit(); - }); -} - -void handleFinishedGRU(stateful_actor<job_state>* self, int local_gru_index) { - using namespace std::chrono; - auto& gru_container = self->state.gru_container; - chrono_time end_point = high_resolution_clock::now(); - double total_duration = duration_cast<seconds>(end_point - - gru_container.gru_start_time).count(); - gru_container.num_gru_done++; - - aout(self) << "GRU Finished: " << gru_container.num_gru_done << "/" - << gru_container.num_gru_in_run_domain << " -- GlobalGRU=" - << gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() - << " -- LocalGRU=" << local_gru_index << "\n"; - - gru_container.gru_list[local_gru_index-1]->setRunTime(total_duration); - gru_container.gru_list[local_gru_index-1]->setInitDuration(-1); - gru_container.gru_list[local_gru_index-1]->setForcingDuration(-1); - gru_container.gru_list[local_gru_index-1]->setRunPhysicsDuration(-1); - gru_container.gru_list[local_gru_index-1]->setWriteOutputDuration(-1); - gru_container.gru_list[local_gru_index-1]->setSuccess(); - - - // Check if all GRUs are done - if (gru_container.num_gru_done >= gru_container.num_gru_in_run_domain) { - if(gru_container.num_gru_failed == 0 || self->state.max_run_attempts == 1) - self->send(self, finalize_v); - else - self->send(self, restart_failures_v); - } -} - - - - } // End of Namespace \ No newline at end of file diff --git a/build/source/job_actor/data_assimilation_mode.cpp b/build/source/job_actor/data_assimilation_mode.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3e12870d1bc34d56f97daecd742cae91b9c4b581 --- /dev/null +++ b/build/source/job_actor/data_assimilation_mode.cpp @@ -0,0 +1,102 @@ +#include "job_actor.hpp" + +namespace caf { +behavior data_assimilation_mode(stateful_actor<job_state>* self) { + aout(self) << "Data Assimilation Mode: Started\n"; + + return { + [=](init_file_access_actor, int num_timesteps) { + aout(self) << "Data Assimilation Mode: init_file_access_actor\n"; + self->state.num_steps = num_timesteps; + self->state.job_actor_settings.batch_size > 0 ? + spawnHRUBatches(self) : spawnHRUActors(self); + aout(self) << "Data Assimilation Mode: GRUs Initalized\n"; + self->send(self->state.file_access_actor, access_forcing_v, + self->state.iFile, self); + }, + + [=](new_forcing_file, int num_steps_in_iFile, int nextFile) { + aout(self) << "Data Assimilation Mode: New Forcing File\n"; + self->state.iFile = nextFile; + self->state.stepsInCurrentFFile = num_steps_in_iFile; + self->state.forcingStep = 1; + for(auto gru : self->state.gru_container.gru_list) { + self->send(gru->getGRUActor(), update_timeZoneOffset_v, + self->state.iFile); + } + + self->send(self, update_hru_v); + }, + + [=](update_hru){ + for(auto gru : self->state.gru_container.gru_list) { + self->send(gru->getGRUActor(), update_hru_v, self->state.timestep, + self->state.forcingStep); + } + }, + + [=](done_update, std::unordered_map<caf::actor, double> walltimes){ + self->state.num_gru_done_timestep++; + + if (self->state.num_gru_done_timestep >= + self->state.gru_container.gru_list.size()) { + aout(self) << "Job_Actor: Done Update for timestep:" + << self->state.timestep << "\n"; + + // write the output + int steps_to_write = 1; + int start_gru = 1; + self->request(self->state.file_access_actor, caf::infinite, + write_output_v, steps_to_write, start_gru, self->state.num_gru).await( + [=](int err) { + if (err != 0) { + aout(self) << "Job_Actor: Error Writing Output\n"; + for (auto GRU : self->state.gru_container.gru_list) + self->send(GRU->getGRUActor(), exit_msg_v); + + self->send_exit(self->state.file_access_actor, + exit_reason::user_shutdown); + self->quit(); + } + }); + + self->state.timestep++; + self->state.forcingStep++; + + // Check if we are done the simulation + if (self->state.timestep > self->state.num_steps) { + aout(self) << "Job_Actor: Done Job\n"; + for (auto GRU : self->state.gru_container.gru_list) { + self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); + GRU->setSuccess(); + } + self->send(self, finalize_v); + } else if (self->state.forcingStep > self->state.stepsInCurrentFFile) { + // Check if we need another forcing file + aout(self) << "Job_Actor: Done Forcing Step\n"; + self->send(self->state.file_access_actor, access_forcing_v, + self->state.iFile+1, self); + } else { + // otherwise update all HRUs + self->send(self, update_hru_v); + } + self->state.num_gru_done_timestep = 0; + } + }, + + [=](std::vector<actor> hru_actors) {}, + + [=](serialize_hru, hru hru_data) { + aout(self) << "Job_Actor: Recieved HRU Data\n"; + auto sender = actor_cast<actor>(self->current_sender()); + + self->send(sender, reinit_hru_v, hru_data); + }, + + + [=](finalize) { finalizeJob(self); }, + + + }; +} +} // End of Namespace caf \ No newline at end of file diff --git a/build/source/job_actor/job_actor.cpp b/build/source/job_actor/job_actor.cpp index 625b718840dc6ff96ec4194d6eae09f5e756acff..19f70cf8d21e172712cda807f71075cb03d1df5e 100644 --- a/build/source/job_actor/job_actor.cpp +++ b/build/source/job_actor/job_actor.cpp @@ -69,7 +69,8 @@ behavior job_actor(stateful_actor<job_state>* self, if (job_actor_settings.data_assimilation_mode) { - aout(self) << "Job_Actor: Data Assimilation Mode\n"; + self->become(data_assimilation_mode(self)); + return {}; } else { self->become(async_mode(self)); return {}; @@ -79,124 +80,72 @@ behavior job_actor(stateful_actor<job_state>* self, return { /*** From file access actor after it spawns ***/ - [=](init_file_access_actor, int num_timesteps) { - self->state.num_steps = num_timesteps; - aout(self) << "Num Steps: " << self->state.num_steps << "\n"; + // [=](init_file_access_actor, int num_timesteps) { + // self->state.num_steps = num_timesteps; + // aout(self) << "Num Steps: " << self->state.num_steps << "\n"; - // ##################################################### - // # Data Assimilation Mode - // ##################################################### - if (self->state.job_actor_settings.data_assimilation_mode) { - aout(self) << "Job_Actor: Data Assimilation Mode\n"; + // // ##################################################### + // // # Data Assimilation Mode + // // ##################################################### + // if (self->state.job_actor_settings.data_assimilation_mode) { + // aout(self) << "Job_Actor: Data Assimilation Mode\n"; - auto& gru_container = self->state.gru_container; + // auto& gru_container = self->state.gru_container; - // Spawn HRUs in batches or individually - if (self->state.job_actor_settings.batch_size > 1) - spawnHRUBatches(self); - else - spawnHRUActors(self, false); + // // Spawn HRUs in batches or individually + // if (self->state.job_actor_settings.batch_size > 1) + // spawnHRUBatches(self); + // else + // spawnHRUActors(self); - aout(self) << "GRUs Initialized\n"; - self->send(self->state.file_access_actor, access_forcing_v, - self->state.iFile, self); - } else { - // ##################################################### - // # Normal Mode - // ##################################################### - aout(self) << "Job_Actor: Normal Mode\n"; - spawnHRUActors(self, true); - } - }, + // aout(self) << "GRUs Initialized\n"; + // self->send(self->state.file_access_actor, access_forcing_v, + // self->state.iFile, self); + // } else { + // // ##################################################### + // // # Normal Mode + // // ##################################################### + // aout(self) << "Job_Actor: Normal Mode\n"; + // spawnHRUActors(self); + // } + // }, // ##################################################### // # Data Assimilation Mode Start // ##################################################### - [=](new_forcing_file, int num_steps_in_iFile, int nextFile) { - aout(self) << "Job_Actor: New Forcing File\n"; - self->state.iFile = nextFile; - self->state.stepsInCurrentFFile = num_steps_in_iFile; - self->state.forcingStep = 1; - for(auto gru : self->state.gru_container.gru_list) { - self->send(gru->getGRUActor(), update_timeZoneOffset_v, - self->state.iFile); - } + // [=](new_forcing_file, int num_steps_in_iFile, int nextFile) { + // aout(self) << "Job_Actor: New Forcing File\n"; + // self->state.iFile = nextFile; + // self->state.stepsInCurrentFFile = num_steps_in_iFile; + // self->state.forcingStep = 1; + // for(auto gru : self->state.gru_container.gru_list) { + // self->send(gru->getGRUActor(), update_timeZoneOffset_v, + // self->state.iFile); + // } - self->send(self, update_hru_v); // update HRUs - }, + // self->send(self, update_hru_v); // update HRUs + // }, + + // [=](update_hru){ + // // aout(self) << "Job_Actor: Updating HRUs\n"; + // for(auto gru : self->state.gru_container.gru_list) { + // self->send(gru->getGRUActor(), update_hru_v, + // self->state.timestep, self->state.forcingStep); + // } + // }, - [=](update_hru){ - // aout(self) << "Job_Actor: Updating HRUs\n"; - for(auto gru : self->state.gru_container.gru_list) { - self->send(gru->getGRUActor(), update_hru_v, - self->state.timestep, self->state.forcingStep); - } - }, - [=](done_update, std::unordered_map<caf::actor, double> walltimes){ - self->state.num_gru_done_timestep++; - - if (self->state.num_gru_done_timestep >= - self->state.gru_container.gru_list.size()) { - aout(self) << "Job_Actor: Done Update for timestep:" - << self->state.timestep << "\n"; - - // write the output - int steps_to_write = 1; - int start_gru = 1; - self->request(self->state.file_access_actor, caf::infinite, - write_output_v, steps_to_write, start_gru, self->state.num_gru).await( - [=](int err) { - if (err != 0) { - aout(self) << "Job_Actor: Error Writing Output\n"; - for (auto GRU : self->state.gru_container.gru_list) - self->send(GRU->getGRUActor(), exit_msg_v); - - self->send_exit(self->state.file_access_actor, - exit_reason::user_shutdown); - self->quit(); - } - }); - - self->state.timestep++; - self->state.forcingStep++; - - // Check if we are done the simulation - if (self->state.timestep > self->state.num_steps) { - aout(self) << "Job_Actor: Done Job\n"; - for (auto GRU : self->state.gru_container.gru_list) { - self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); - GRU->setSuccess(); - } - self->send(self, finalize_v); - } else if (self->state.forcingStep > self->state.stepsInCurrentFFile) { - // Check if we need another forcing file - aout(self) << "Job_Actor: Done Forcing Step\n"; - self->send(self->state.file_access_actor, access_forcing_v, - self->state.iFile+1, self); - } else { - // otherwise update all HRUs - self->send(self, update_hru_v); - } - self->state.num_gru_done_timestep = 0; - } - }, - [=](serialize_hru, hru hru_data) { - aout(self) << "Job_Actor: Recieved HRU Data\n"; - auto sender = actor_cast<actor>(self->current_sender()); - self->send(sender, reinit_hru_v, hru_data); - }, [=](reinit_hru) { aout(self) << "Job_Actor: HRU Actor Re-initialized\n"; self->send(self, update_hru_v); }, - [=](std::vector<actor> hru_actors) { - }, + // [=](std::vector<actor> hru_actors) { + // }, // ##################################################### // # Data Assimilation Mode End @@ -208,44 +157,44 @@ behavior job_actor(stateful_actor<job_state>* self, // ##################################################### // # Normal Mode Start // ##################################################### - [=](done_hru, int local_gru_index) { - auto& gru_container = self->state.gru_container; - using namespace std::chrono; + // [=](done_hru, int local_gru_index) { + // auto& gru_container = self->state.gru_container; + // using namespace std::chrono; - chrono_time end_point = high_resolution_clock::now(); - double total_duration = duration_cast<seconds>(end_point - - gru_container.gru_start_time).count(); - gru_container.num_gru_done++; + // chrono_time end_point = high_resolution_clock::now(); + // double total_duration = duration_cast<seconds>(end_point - + // gru_container.gru_start_time).count(); + // gru_container.num_gru_done++; - aout(self) << "GRU Finished: " << gru_container.num_gru_done << "/" - << gru_container.num_gru_in_run_domain << " -- " - << "GlobalGRU=" - << gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() - << " -- LocalGRU=" << local_gru_index << "\n"; + // aout(self) << "GRU Finished: " << gru_container.num_gru_done << "/" + // << gru_container.num_gru_in_run_domain << " -- " + // << "GlobalGRU=" + // << gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() + // << " -- LocalGRU=" << local_gru_index << "\n"; - // Update Timing - gru_container.gru_list[local_gru_index-1]->setRunTime(total_duration); - gru_container.gru_list[local_gru_index-1]->setInitDuration(-1); - gru_container.gru_list[local_gru_index-1]->setForcingDuration(-1); - gru_container.gru_list[local_gru_index-1]->setRunPhysicsDuration(-1); - gru_container.gru_list[local_gru_index-1]->setWriteOutputDuration(-1); + // // Update Timing + // gru_container.gru_list[local_gru_index-1]->setRunTime(total_duration); + // gru_container.gru_list[local_gru_index-1]->setInitDuration(-1); + // gru_container.gru_list[local_gru_index-1]->setForcingDuration(-1); + // gru_container.gru_list[local_gru_index-1]->setRunPhysicsDuration(-1); + // gru_container.gru_list[local_gru_index-1]->setWriteOutputDuration(-1); - gru_container.gru_list[local_gru_index-1]->setSuccess(); + // gru_container.gru_list[local_gru_index-1]->setSuccess(); - // Check if all GRUs are finished - if (gru_container.num_gru_done >= gru_container.num_gru_in_run_domain) { - // Check for failures - if(self->state.gru_container.num_gru_failed == 0 || - self->state.max_run_attempts == 1) { - self->send(self, finalize_v); - } else { - self->send(self, restart_failures_v); - } - } - - }, + // // Check if all GRUs are finished + // if (gru_container.num_gru_done >= gru_container.num_gru_in_run_domain) { + // // Check for failures + // if(self->state.gru_container.num_gru_failed == 0 || + // self->state.max_run_attempts == 1) { + // self->send(self, finalize_v); + // } else { + // self->send(self, restart_failures_v); + // } + // } + + // }, [=](restart_failures) { aout(self) << "Job_Actor: Restarting GRUs that Failed\n"; @@ -286,57 +235,57 @@ behavior job_actor(stateful_actor<job_state>* self, } }, - [=](finalize) { - std::vector<serializable_netcdf_gru_actor_info> - netcdf_gru_info = getGruNetcdfInfo( - self->state.max_run_attempts,self->state.gru_container.gru_list); + // [=](finalize) { + // std::vector<serializable_netcdf_gru_actor_info> + // netcdf_gru_info = getGruNetcdfInfo( + // self->state.max_run_attempts,self->state.gru_container.gru_list); - self->state.num_gru_failed = std::count_if(netcdf_gru_info.begin(), - netcdf_gru_info.end(), [](auto& gru_info) { - return !gru_info.successful; - }); - - self->request(self->state.file_access_actor, infinite, finalize_v).await( - [=](std::tuple<double, double> read_write_duration) { - int err = 0; - for (auto GRU : self->state.gru_container.gru_list) { - delete GRU; - } - self->state.gru_container.gru_list.clear(); - - self->state.job_timing.updateEndPoint("total_duration"); - - aout(self) << "\n________________" - << "PRINTING JOB_ACTOR TIMING INFO RESULTS" - << "________________\n" - << "Total Duration = " - << self->state.job_timing.getDuration("total_duration") - .value_or(-1.0) << " Seconds\n" - << "Total Duration = " - << self->state.job_timing.getDuration("total_duration") - .value_or(-1.0) / 60 << " Minutes\n" - << "Total Duration = " - << (self->state.job_timing.getDuration("total_duration") - .value_or(-1.0) / 60) / 60 << " Hours\n" - << "Job Init Duration = " - << self->state.job_timing.getDuration("init_duration") - .value_or(-1.0) << " Seconds\n" - << "_________________________________" - << "_______________________________________\n\n"; - - deallocateJobActor(&err); - - // Tell Parent we are done - self->send(self->state.parent, done_job_v, self->state.num_gru_failed, - self->state.job_timing.getDuration("total_duration") - .value_or(-1.0), - std::get<0>(read_write_duration), - std::get<1>(read_write_duration)); - self->quit(); - - }); - }, + // self->state.num_gru_failed = std::count_if(netcdf_gru_info.begin(), + // netcdf_gru_info.end(), [](auto& gru_info) { + // return !gru_info.successful; + // }); + + // self->request(self->state.file_access_actor, infinite, finalize_v).await( + // [=](std::tuple<double, double> read_write_duration) { + // int err = 0; + // for (auto GRU : self->state.gru_container.gru_list) { + // delete GRU; + // } + // self->state.gru_container.gru_list.clear(); + + // self->state.job_timing.updateEndPoint("total_duration"); + + // aout(self) << "\n________________" + // << "PRINTING JOB_ACTOR TIMING INFO RESULTS" + // << "________________\n" + // << "Total Duration = " + // << self->state.job_timing.getDuration("total_duration") + // .value_or(-1.0) << " Seconds\n" + // << "Total Duration = " + // << self->state.job_timing.getDuration("total_duration") + // .value_or(-1.0) / 60 << " Minutes\n" + // << "Total Duration = " + // << (self->state.job_timing.getDuration("total_duration") + // .value_or(-1.0) / 60) / 60 << " Hours\n" + // << "Job Init Duration = " + // << self->state.job_timing.getDuration("init_duration") + // .value_or(-1.0) << " Seconds\n" + // << "_________________________________" + // << "_______________________________________\n\n"; + + // deallocateJobActor(&err); + + // // Tell Parent we are done + // self->send(self->state.parent, done_job_v, self->state.num_gru_failed, + // self->state.job_timing.getDuration("total_duration") + // .value_or(-1.0), + // std::get<0>(read_write_duration), + // std::get<1>(read_write_duration)); + // self->quit(); + + // }); + // }, // Handle Sundials Error [=](err_atom, caf::actor src, double rtol, double atol) { @@ -376,77 +325,8 @@ behavior job_actor(stateful_actor<job_state>* self, } -void spawnHRUActors(stateful_actor<job_state>* self, bool normal_mode) { - auto& gru_container = self->state.gru_container; - gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); - gru_container.run_attempts_left = self->state.max_run_attempts; - gru_container.run_attempts_left--; - - for (int i = 0; i < gru_container.num_gru_in_run_domain; i++) { - auto global_gru_index = gru_container.gru_list.size() + - self->state.start_gru; - auto local_gru_index = gru_container.gru_list.size() + 1; - auto gru = self->spawn(hru_actor, global_gru_index, local_gru_index, - self->state.hru_actor_settings, self->state.file_access_actor, self); - // Create the GRU object (Job uses this to keep track of GRU status) - gru_container.gru_list.push_back(new GRU(global_gru_index, - local_gru_index, gru, self->state.dt_init_start_factor, - self->state.hru_actor_settings.rel_tol, - self->state.hru_actor_settings.abs_tol, self->state.max_run_attempts)); - - // if (normal_mode) self->send(gru, update_hru_async_v); - } - -} - -void spawnHRUBatches(stateful_actor<job_state>* self) { - aout(self) << "Job_Actor: Spawning HRU Batches\n"; - int batch_size; - - auto& gru_container = self->state.gru_container; - gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); - gru_container.run_attempts_left = self->state.max_run_attempts; - gru_container.run_attempts_left--; - - if (self->state.job_actor_settings.batch_size == 9999) { - batch_size = std::ceil(gru_container.num_gru_in_run_domain / - (std::thread::hardware_concurrency() * 2)); - } else { - batch_size = self->state.job_actor_settings.batch_size; - } - - // Correct when number of batches is greater than number of HRUs - if (batch_size == 0) { - batch_size = 1; - } - - // Correct if number of GRUs is less than the desired batch size - aout(self) << "Job_Actor: Batch Size=" << batch_size << "\n"; - - int remaining_hru_to_batch = gru_container.num_gru_in_run_domain; - int start_hru_global = self->state.start_gru; - int start_hru_local = 1; - - while (remaining_hru_to_batch > 0) { - int current_batch_size = std::min(batch_size, remaining_hru_to_batch); - auto gru_batch = self->spawn(hru_batch_actor, start_hru_local, - start_hru_global, current_batch_size, self->state.hru_actor_settings, - self->state.file_access_actor, self); - - gru_container.gru_list.push_back(new GRU(start_hru_global, - start_hru_local, gru_batch, self->state.dt_init_start_factor, - self->state.hru_actor_settings.rel_tol, - self->state.hru_actor_settings.abs_tol, self->state.max_run_attempts)); - - remaining_hru_to_batch -= current_batch_size; - start_hru_local += current_batch_size; - start_hru_global += current_batch_size; - } - aout(self) << "Number of HRU_Batch_Actors: " - << gru_container.gru_list.size() << "\n"; -} diff --git a/build/source/job_actor/job_utils.cpp b/build/source/job_actor/job_utils.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ada32377151afdc4d7afc78a496b87da33eef558 --- /dev/null +++ b/build/source/job_actor/job_utils.cpp @@ -0,0 +1,153 @@ +#include "job_actor.hpp" + +namespace caf { + +void spawnHRUActors(stateful_actor<job_state>* self) { + auto& gru_container = self->state.gru_container; + gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); + gru_container.run_attempts_left = self->state.max_run_attempts; + gru_container.run_attempts_left--; + + for (int i = 0; i < gru_container.num_gru_in_run_domain; i++) { + auto global_gru_index = gru_container.gru_list.size() + + self->state.start_gru; + auto local_gru_index = gru_container.gru_list.size() + 1; + + auto gru = self->spawn(hru_actor, global_gru_index, local_gru_index, + self->state.hru_actor_settings, self->state.file_access_actor, self); + + // Create the GRU object (Job uses this to keep track of GRU status) + gru_container.gru_list.push_back(new GRU(global_gru_index, + local_gru_index, gru, self->state.dt_init_start_factor, + self->state.hru_actor_settings.rel_tol, + self->state.hru_actor_settings.abs_tol, self->state.max_run_attempts)); + + // if (normal_mode) self->send(gru, update_hru_async_v); + } + +} + +void spawnHRUBatches(stateful_actor<job_state>* self) { + aout(self) << "Job_Actor: Spawning HRU Batches\n"; + int batch_size; + + auto& gru_container = self->state.gru_container; + gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); + gru_container.run_attempts_left = self->state.max_run_attempts; + gru_container.run_attempts_left--; + + if (self->state.job_actor_settings.batch_size == 9999) { + batch_size = std::ceil(gru_container.num_gru_in_run_domain / + (std::thread::hardware_concurrency() * 2)); + } else { + batch_size = self->state.job_actor_settings.batch_size; + } + + // Correct when number of batches is greater than number of HRUs + if (batch_size == 0) { + batch_size = 1; + } + + // Correct if number of GRUs is less than the desired batch size + aout(self) << "Job_Actor: Batch Size=" << batch_size << "\n"; + + int remaining_hru_to_batch = gru_container.num_gru_in_run_domain; + int start_hru_global = self->state.start_gru; + int start_hru_local = 1; + + while (remaining_hru_to_batch > 0) { + int current_batch_size = std::min(batch_size, remaining_hru_to_batch); + auto gru_batch = self->spawn(hru_batch_actor, start_hru_local, + start_hru_global, current_batch_size, self->state.hru_actor_settings, + self->state.file_access_actor, self); + + gru_container.gru_list.push_back(new GRU(start_hru_global, + start_hru_local, gru_batch, self->state.dt_init_start_factor, + self->state.hru_actor_settings.rel_tol, + self->state.hru_actor_settings.abs_tol, self->state.max_run_attempts)); + + remaining_hru_to_batch -= current_batch_size; + start_hru_local += current_batch_size; + start_hru_global += current_batch_size; + } + aout(self) << "Number of HRU_Batch_Actors: " + << gru_container.gru_list.size() << "\n"; +} + + +void finalizeJob(stateful_actor<job_state>* self) { + std::vector<serializable_netcdf_gru_actor_info> netcdf_gru_info = + getGruNetcdfInfo( + self->state.max_run_attempts,self->state.gru_container.gru_list); + + self->state.num_gru_failed = std::count_if(netcdf_gru_info.begin(), + netcdf_gru_info.end(), [](auto& gru_info) { + return !gru_info.successful; + }); + + self->request(self->state.file_access_actor, infinite, finalize_v).await( + [=](std::tuple<double, double> read_write_duration) { + int err = 0; + for (auto GRU : self->state.gru_container.gru_list) + delete GRU; + self->state.gru_container.gru_list.clear(); + self->state.job_timing.updateEndPoint("total_duration"); + aout(self) << "\n________________" + << "PRINTING JOB_ACTOR TIMING INFO RESULTS" + << "________________\n" + << "Total Duration = " + << self->state.job_timing.getDuration("total_duration") + .value_or(-1.0) << " Seconds\n" + << "Total Duration = " + << self->state.job_timing.getDuration("total_duration") + .value_or(-1.0) / 60 << " Minutes\n" + << "Total Duration = " + << (self->state.job_timing.getDuration("total_duration") + .value_or(-1.0) / 60) / 60 << " Hours\n" + << "Job Init Duration = " + << self->state.job_timing.getDuration("init_duration") + .value_or(-1.0) << " Seconds\n" + << "_________________________________" + << "_______________________________________\n\n"; + + deallocateJobActor(&err); + + // Tell Parent we are done + self->send(self->state.parent, done_job_v, self->state.num_gru_failed, + self->state.job_timing.getDuration("total_duration").value_or(-1.0), + std::get<0>(read_write_duration), + std::get<1>(read_write_duration)); + self->quit(); + }); +} + +void handleFinishedGRU(stateful_actor<job_state>* self, int local_gru_index) { + using namespace std::chrono; + auto& gru_container = self->state.gru_container; + chrono_time end_point = high_resolution_clock::now(); + double total_duration = duration_cast<seconds>(end_point - + gru_container.gru_start_time).count(); + gru_container.num_gru_done++; + + aout(self) << "GRU Finished: " << gru_container.num_gru_done << "/" + << gru_container.num_gru_in_run_domain << " -- GlobalGRU=" + << gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() + << " -- LocalGRU=" << local_gru_index << "\n"; + + gru_container.gru_list[local_gru_index-1]->setRunTime(total_duration); + gru_container.gru_list[local_gru_index-1]->setInitDuration(-1); + gru_container.gru_list[local_gru_index-1]->setForcingDuration(-1); + gru_container.gru_list[local_gru_index-1]->setRunPhysicsDuration(-1); + gru_container.gru_list[local_gru_index-1]->setWriteOutputDuration(-1); + gru_container.gru_list[local_gru_index-1]->setSuccess(); + + + // Check if all GRUs are done + if (gru_container.num_gru_done >= gru_container.num_gru_in_run_domain) { + if(gru_container.num_gru_failed == 0 || self->state.max_run_attempts == 1) + self->send(self, finalize_v); + else + self->send(self, restart_failures_v); + } +} +} // End of Namespace caf \ No newline at end of file