From f7b89f66520f970476a74f904ac4e933d1b04b7f Mon Sep 17 00:00:00 2001 From: KyleKlenk <kyle.c.klenk@gmail.com> Date: Wed, 9 Aug 2023 13:09:21 -0600 Subject: [PATCH] Removed some unecessary messages: Replaced some messages with request statements. This is to show the direct relationship the actors have in this request, instead of relying on a send and send. --- build/includes/global/message_atoms.hpp | 21 +- build/includes/job_actor/job_actor.hpp | 11 +- .../cpp_code/file_access_actor.cpp | 20 +- .../actors/hru_actor/cpp_code/hru_actor.cpp | 3 +- build/source/actors/job_actor/job_actor.cpp | 245 +++++++++--------- build/source/actors/main.cpp | 28 +- .../source/actors/summa_actor/summa_actor.cpp | 6 +- 7 files changed, 162 insertions(+), 172 deletions(-) diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index dbb7dd3..aa5cd4d 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -16,6 +16,7 @@ enum class hru_error : uint8_t { enum class file_access_error : uint8_t { writing_error = 1, + unhandleable_error = 2, }; // HRU Errors @@ -76,18 +77,6 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) // Sender: // Reciever: // Summary: - CAF_ADD_ATOM(summa, done_file_access_actor_init) - // Sender: - // Reciever: - // Summary: - CAF_ADD_ATOM(summa, file_access_actor_done) - // Sender: - // Reciever: - // Summary: - CAF_ADD_ATOM(summa, file_access_actor_err) - // Sender: - // Reciever: - // Summary: CAF_ADD_ATOM(summa, access_forcing) // Sender: // Reciever: @@ -193,6 +182,10 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) // Reciever: // Summary: CAF_ADD_ATOM(summa, get_num_output_steps) + // Sender: + // Reciever: + // Summary: + CAF_ADD_ATOM(summa, finalize) // Struct Types CAF_ADD_TYPE_ID(summa, (Distributed_Settings)) @@ -216,12 +209,16 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) CAF_ADD_TYPE_ID(summa, (std::vector<std::tuple<caf::actor, std::string>>)) CAF_ADD_TYPE_ID(summa, (std::vector<serializable_netcdf_gru_actor_info>)) + // GRU Parameter/Attribute Vectors CAF_ADD_TYPE_ID(summa, (std::tuple<std::vector<double>, std::vector<int>, std::vector<long int>, std::vector<double>, std::vector<double>, std::vector<std::vector<double>>>)) + + // File_Access_Actor Read/Write times + CAF_ADD_TYPE_ID(summa, (std::tuple<double, double>)) CAF_ADD_TYPE_ID(summa, (std::optional<caf::strong_actor_ptr>)) diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index 7659d19..47fa10b 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -60,12 +60,15 @@ struct job_state { /** The Job Actor */ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, - File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, - HRU_Actor_Settings hru_actor_settings, actor parent); + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings, + actor parent); /** Get the information for the GRUs that will be written to the netcdf file */ -std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, std::vector<GRU*> &gru_list); +std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, + std::vector<GRU*> &gru_list); -void handleGRUError(stateful_actor<job_state>* self, const error& err, caf::actor src); +void handleGRUError(stateful_actor<job_state>* self, caf::actor src); } // end namespace \ No newline at end of file diff --git a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp index b773634..32afa0c 100644 --- a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp @@ -241,11 +241,9 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr aout(self) << "Total Read Duration = " << self->state.file_access_timing.getDuration("read_duration").value_or(-1.0) << " Seconds\n"; aout(self) << "Total Write Duration = " << self->state.file_access_timing.getDuration("write_duration").value_or(-1.0) << " Seconds\n"; - self->send(self->state.parent, - file_access_actor_done_v, - self->state.file_access_timing.getDuration("read_duration").value_or(-1.0), - self->state.file_access_timing.getDuration("write_duration").value_or(-1.0)); self->quit(); + return std::make_tuple(self->state.file_access_timing.getDuration("read_duration").value_or(-1.0), + self->state.file_access_timing.getDuration("write_duration").value_or(-1.0)); }, }; @@ -262,7 +260,7 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { if (err != 0) { aout(self) << "Error: ffile_info_C - File_Access_Actor \n"; std::string function = "ffile_info_C"; - self->send(self->state.parent, file_access_actor_err_v, function); + self->send(self->state.parent, file_access_error::unhandleable_error, self); self->quit(); return; } @@ -272,7 +270,7 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { if (err != 0) { aout(self) << "ERROR: File_Access_Actor in mDecisions\n"; std::string function = "mDecisions"; - self->send(self->state.parent, file_access_actor_err_v, function); + self->send(self->state.parent, file_access_error::unhandleable_error, self); self->quit(); return; } @@ -287,7 +285,7 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { if (err != 0) { aout(self) << "ERROR: read_pinit_C\n"; std::string function = "read_pinit_C"; - self->send(self->state.parent, file_access_actor_err_v, function); + self->send(self->state.parent, file_access_error::unhandleable_error, self); self->quit(); return; } @@ -296,7 +294,7 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { if (err != 0) { aout(self) << "ERROR: read_vegitationTables\n"; std::string function = "read_vegitationTables"; - self->send(self->state.parent, file_access_actor_err_v, function); + self->send(self->state.parent, file_access_error::unhandleable_error, self); self->quit(); return; } @@ -308,7 +306,7 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { if (err != 0) { aout(self) << "ERROR: Create_OutputFile\n"; std::string function = "def_output"; - self->send(self->state.parent, file_access_actor_err_v, function); + self->send(self->state.parent, file_access_error::unhandleable_error, self); self->quit(); return; } @@ -321,7 +319,7 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { if (self->state.err != 0) { aout(self) << "ERROR: Init_OutputStruct\n"; std::string function = "Init_OutputStruct"; - self->send(self->state.parent, file_access_actor_err_v, function); + self->send(self->state.parent, file_access_error::unhandleable_error, self); self->quit(); return; } @@ -330,7 +328,7 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { if (self->state.err != 0) { aout(self) << "ERROR: Init_OutputTimeStep\n"; std::string function = "Init_OutputTimeStep"; - self->send(self->state.parent, file_access_actor_err_v, function); + self->send(self->state.parent, file_access_error::unhandleable_error, self); self->quit(); return; } diff --git a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp index 3e84385..6d6f40a 100644 --- a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp +++ b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp @@ -97,8 +97,7 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, Initialize_HRU(self); - self->send(self, start_hru_v); - }); + self->send(self, start_hru_v); }); return { diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index aee7e8f..fc86f75 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -109,10 +109,10 @@ behavior job_actor(stateful_actor<job_state>* self, // Create the GRU object (Job uses this to keep track of GRU status) gru_container.gru_list.push_back(new GRU(global_gru_index, - local_gru_index, - gru, - self->state.dt_init_start_factor, - self->state.max_run_attempts)); + local_gru_index, + gru, + self->state.dt_init_start_factor, + self->state.max_run_attempts)); } }, // end init_gru @@ -139,95 +139,108 @@ behavior job_actor(stateful_actor<job_state>* self, gru_container.num_gru_done++; - // Check if we have finished all active GRUs + + // Check if all GRUs are finished if (gru_container.num_gru_done >= gru_container.num_gru_in_run_domain) { - // Check for failures - if(gru_container.num_gru_failed == 0 || gru_container.run_attempts_left == 0) { - //TODO: RENAME DEALLOCATE_STURCTURES this is more of a finalize - std::vector<serializable_netcdf_gru_actor_info> netcdf_gru_info = getGruNetcdfInfo( - self->state.max_run_attempts, - gru_container.gru_list); - self->send(self->state.file_access_actor, deallocate_structures_v, netcdf_gru_info); - + if(self->state.gru_container.num_gru_failed == 0 || self->state.max_run_attempts == 1) { + self->send(self, finalize_v); } else { - aout(self) << "Job_Actor: Restarting GRUs that Failed\n"; - gru_container.num_gru_done = 0; - gru_container.num_gru_in_run_domain = gru_container.num_gru_failed; - gru_container.num_gru_failed = 0; - self->send(self->state.file_access_actor, restart_failures_v); - - for(auto GRU : gru_container.gru_list) { - if(GRU->isFailed()) { - GRU->setRunning(); - GRU->decrementAttemptsLeft(); - self->state.hru_actor_settings.dt_init_factor *= 2; - auto global_gru_index = GRU->getGlobalGRUIndex(); - auto local_gru_index = GRU->getLocalGRUIndex(); - auto gru_actor = self->spawn(hru_actor, - global_gru_index, - local_gru_index, - self->state.hru_actor_settings, - self->state.file_access_actor, - self); - gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor); - } - } + self->send(self, restart_failures_v); } } + }, - [=](const error& err, caf::actor src) { - - aout(self) << "\n\n ********** ERROR HANDLER \n"; - switch(err.category()) { - case type_id_v<hru_error>: - aout(self) << "HRU Error: " << to_string(err) << "\n"; - handleGRUError(self, err, src); - break; - case type_id_v<file_access_error>: - aout(self) << "File Access Error: " << to_string(err) << "\n"; - break; - default: - aout(self) << "Unknown Error: " << to_string(err) << "\n"; - break; + [=](restart_failures) { + aout(self) << "Job_Actor: Restarting GRUs that Failed\n"; + + self->state.gru_container.num_gru_done = 0; + self->state.gru_container.num_gru_in_run_domain = self->state.gru_container.num_gru_failed; + self->state.gru_container.num_gru_failed = 0; + + self->send(self->state.file_access_actor, restart_failures_v); // notify file_access_actor + + for(auto GRU : self->state.gru_container.gru_list) { + if(GRU->isFailed()) { + GRU->setRunning(); + GRU->decrementAttemptsLeft(); + self->state.hru_actor_settings.dt_init_factor *= 2; + auto global_gru_index = GRU->getGlobalGRUIndex(); + auto local_gru_index = GRU->getLocalGRUIndex(); + auto gru_actor = self->spawn(hru_actor, + global_gru_index, + local_gru_index, + self->state.hru_actor_settings, + self->state.file_access_actor, + self); + self->state.gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor); } + } }, + [=](finalize) { + + std::vector<serializable_netcdf_gru_actor_info> + netcdf_gru_info = getGruNetcdfInfo(self->state.max_run_attempts,self->state.gru_container.gru_list); + + self->request(self->state.file_access_actor, + infinite, + deallocate_structures_v, netcdf_gru_info) + .await( + [=](std::tuple<double, double> read_write_duration) { + + int err = 0; + + + for (auto GRU : self->state.gru_container.gru_list) { + delete GRU; + } + self->state.gru_container.gru_list.clear(); + + self->state.job_timing.updateEndPoint("total_duration"); + + aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n" + << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n" + << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n" + << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" + << "________________________________________________________________________\n\n"; + + deallocateJobActor(&err); + + // Tell Parent we are done + self->send(self->state.parent, + done_job_v, + self->state.num_gru_failed, + self->state.job_timing.getDuration("total_duration").value_or(-1.0), + std::get<0>(read_write_duration), + std::get<1>(read_write_duration)); + self->quit(); + + }); - [=](file_access_actor_done, double read_duration, double write_duration) { - int err = 0; - // Delete GRUs - for (auto GRU : self->state.gru_container.gru_list) { - delete GRU; - } - self->state.gru_container.gru_list.clear(); - - - self->state.job_timing.updateEndPoint("total_duration"); - - aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n" - << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n" - << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n" - << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" - << "________________________________________________________________________\n\n"; - - deallocateJobActor(&err); - // Tell Parent we are done - self->send(self->state.parent, - done_job_v, - self->state.num_gru_failed, - self->state.job_timing.getDuration("total_duration").value_or(-1.0), - read_duration, - write_duration); - self->quit(); }, - [=](file_access_actor_err, const std::string& err) { - aout(self) << "Job_Actor: Error Handling for File_Access_Actor error: " << err << " not implemented\n"; - self->quit(); + [=](const error& err, caf::actor src) { + + aout(self) << "\n\n ********** ERROR HANDLER \n"; + + switch(err.category()) { + + case type_id_v<hru_error>: + aout(self) << "HRU Error: " << to_string(err) << "\n"; + handleGRUError(self, src); + + break; + case type_id_v<file_access_error>: + aout(self) << "File Access Error: " << to_string(err) << "No Hanlding Implemented\n"; + self->quit(); + break; + default: + aout(self) << "Unknown Error: " << to_string(err) << "\n"; + break; + } }, - }; } @@ -252,57 +265,35 @@ std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_att return gru_netcdf_info; } -void handleGRUError(stateful_actor<job_state>* self, const error& err, caf::actor src) { - - // Find the GRU that failed - for(auto GRU : self->state.gru_container.gru_list) { - if (GRU->getGRUActor() == src) { - GRU->setFailed(); - GRU->decrementAttemptsLeft(); - self->state.gru_container.num_gru_done++; - self->state.gru_container.num_gru_failed++; - self->send(self->state.file_access_actor, run_failure_v, GRU->getLocalGRUIndex()); - - // Check if we have finished all active GRUs - if (self->state.gru_container.num_gru_done >= self->state.gru_container.num_gru_in_run_domain) { - // Check for failures - if(self->state.gru_container.num_gru_failed == 0 || self->state.max_run_attempts == 1) { - //TODO: RENAME DEALLOCATE_STURCTURES this is more of a finalize - std::vector<serializable_netcdf_gru_actor_info> netcdf_gru_info = getGruNetcdfInfo( - self->state.max_run_attempts, - self->state.gru_container.gru_list); - self->send(self->state.file_access_actor, deallocate_structures_v, netcdf_gru_info); - - } else { - aout(self) << "Job_Actor: Restarting GRUs that Failed\n"; - self->send(self->state.file_access_actor, restart_failures_v); - self->state.gru_container.num_gru_done = 0; - self->state.gru_container.num_gru_in_run_domain = self->state.gru_container.num_gru_failed; - self->state.gru_container.num_gru_failed = 0; - for(auto GRU : self->state.gru_container.gru_list) { - if(GRU->isFailed()) { - GRU->setRunning(); - GRU->decrementAttemptsLeft(); - self->state.hru_actor_settings.dt_init_factor *= 2; - auto global_gru_index = GRU->getGlobalGRUIndex(); - auto local_gru_index = GRU->getLocalGRUIndex(); - auto gru_actor = self->spawn(hru_actor, - global_gru_index, - local_gru_index, - self->state.hru_actor_settings, - self->state.file_access_actor, - self); - self->state.gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor); - } - } - } - } - break; - } - } +void handleGRUError(stateful_actor<job_state>* self, caf::actor src) { + auto it = std::find_if(self->state.gru_container.gru_list.begin(), + self->state.gru_container.gru_list.end(), + [src](auto& gru) { + return gru->getGRUActor() == src; + }); + + if (it != self->state.gru_container.gru_list.end()) { + (*it)->setFailed(); + (*it)->decrementAttemptsLeft(); + self->state.gru_container.num_gru_done++; + self->state.gru_container.num_gru_failed++; + self->send(self->state.file_access_actor, run_failure_v, (*it)->getLocalGRUIndex()); + } else { + aout(self) << "ERROR: Job_Actor: Could not find GRU in GRU_Container\n"; + } + + // Check if all GRUs are finished + if (self->state.gru_container.num_gru_done >= self->state.gru_container.num_gru_in_run_domain) { + // Check for failures + if(self->state.gru_container.num_gru_failed == 0 || self->state.max_run_attempts == 1) { + self->send(self, finalize_v); + } else { + self->send(self, restart_failures_v); + } + } } diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index ecab954..058c18f 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -72,8 +72,8 @@ void run_client(actor_system& system, const config& cfg, Distributed_Settings di } void run_server(actor_system& system, const config& cfg, Distributed_Settings distributed_settings, - Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { + Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { scoped_actor self{system}; int err; @@ -85,18 +85,23 @@ void run_server(actor_system& system, const config& cfg, Distributed_Settings di // Check if we have are the backup server if (cfg.backup_server) { auto server = system.spawn(summa_backup_server_init, - distributed_settings,summa_actor_settings,file_access_actor_settings, - job_actor_settings,hru_actor_settings); + distributed_settings, + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); + publish_server(server, distributed_settings.port); connect_client(server, distributed_settings.servers_list[0], distributed_settings.port); - // self->send(server, connect_as_backup_v); } else { - auto server = system.spawn(summa_server_init, distributed_settings, - summa_actor_settings, - file_access_actor_settings, - job_actor_settings, - hru_actor_settings); + auto server = system.spawn(summa_server_init, + distributed_settings, + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); + publish_server(server, distributed_settings.port); } @@ -164,9 +169,6 @@ void caf_main(actor_system& sys, const config& cfg) { return; } - std::pair<int, char**> openCARP_args_cstyle = cfg.c_args_remainder(); - - auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp index 6dd013a..24603dc 100644 --- a/build/source/actors/summa_actor/summa_actor.cpp +++ b/build/source/actors/summa_actor/summa_actor.cpp @@ -54,10 +54,10 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int for (size_t i = 0; i < timing_info.job_duration.size(); ++i) { - aout(self) << "\n________________Job " << ++i << " Info_____________\n" + aout(self) << "\n________________Job " << i + 1 << " Info_____________\n" << "Job Duration = " << timing_info.job_duration[i] << "\n" - << "Job Read Duration = " << timing_info.job_read_duration[i - 1] << "\n" - << "Job Write Duration = " << timing_info.job_write_duration[i - 1] << "\n" + << "Job Read Duration = " << timing_info.job_read_duration[i] << "\n" + << "Job Write Duration = " << timing_info.job_write_duration[i] << "\n" << "_____________________________________________________\n\n"; } -- GitLab