diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index 88f9ea4d702fd8aca5c443cc7624e8afeb05410e..cab8e6471aeea49df0050b6133f6167fd7e32a5f 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -20,7 +20,8 @@ * Job Actor Fortran Functions *********************************************/ extern "C" { - void job_init_fortran(char const* file_manager, int* start_gru_index, int* num_gru, int* num_hru, int* err); + void job_init_fortran(char const* file_manager, int* start_gru_index, + int* num_gru, int* num_hru, int* err); void deallocateJobActor(int* err); } diff --git a/build/source/hru_actor/hru_batch_actor.cpp b/build/source/hru_actor/hru_batch_actor.cpp index 82b6d6d6174e7ee19aefc5bfbce7d34d56ec6759..91e3b4efd7a7ba2326d494e64427b0e59e31c636 100644 --- a/build/source/hru_actor/hru_batch_actor.cpp +++ b/build/source/hru_actor/hru_batch_actor.cpp @@ -22,21 +22,18 @@ behavior hru_batch_actor(stateful_actor<hru_batch_state>* self, return { [=](update_timeZoneOffset, int iFile) { - // aout(self) << "HRU Batch Actor - Update Time Zone Offset\n"; for (auto& hru_actor : self->state.hru_actors) { self->send(hru_actor, update_timeZoneOffset_v, iFile); } }, [=](update_hru, int timestep, int forcingstep) { - // aout(self) << "HRU Batch Actor - Update HRU\n"; for (auto& hru_actor : self->state.hru_actors) { self->send(hru_actor, update_hru_v, timestep, forcingstep); } }, [=](done_update) { - // aout(self) << "HRU Batch Actor - Done Update\n"; self->state.num_done++; if (self->state.num_done == self->state.hru_actors.size()) { self->send(self->state.parent, done_update_v); diff --git a/build/source/job_actor/distributed_job_actor.cpp b/build/source/job_actor/distributed_job_actor.cpp index af33849d762051e885b803ec3f8466d79a0abc99..c27b5eea3f078333773cfee8df6c7cb407c4475d 100644 --- a/build/source/job_actor/distributed_job_actor.cpp +++ b/build/source/job_actor/distributed_job_actor.cpp @@ -5,12 +5,15 @@ namespace caf { behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, - int start_gru, int num_gru, - Distributed_Settings distributed_settings, - File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, - HRU_Actor_Settings hru_actor_settings) { + int start_gru, int num_gru, Distributed_Settings distributed_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings) { aout(self) << "Starting Distributed Job Actor\n"; + self->state.job_timing = TimingInfo(); + self->state.job_timing.addTimePoint("total_duration"); + self->state.job_timing.updateStartPoint("total_duration"); + self->set_down_handler([=](const down_msg& dm){ aout(self) << "Received Down Message\n"; }); @@ -59,23 +62,22 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, } auto is_published = self->system().middleman().publish(self, - distributed_settings.port); - + distributed_settings.port); if (!is_published) { aout(self) << "Unable to publish actor\n"; self->quit(); return {}; } - aout(self) << "Published Actor\n"; + aout(self) << "Distributed Job Actor Has Been Published\n"; // Spawn the local node actor auto node = self->spawn(node_actor, "", self, distributed_settings, - file_access_actor_settings, job_actor_settings, - hru_actor_settings); + file_access_actor_settings, job_actor_settings, hru_actor_settings); return { + // Message from nodes when they are connected [=](connect_to_server, actor client_actor, std::string hostname) { aout(self) << "Received a connect request from: " << hostname << "\n"; @@ -85,6 +87,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, if (self->state.connected_nodes.size() == distributed_settings.num_nodes) { + aout(self) << "All Nodes Connected\n"; for (int i = 0; i < distributed_settings.num_nodes; i++) { @@ -94,43 +97,38 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, } } }, - + + // Get the next forcing file in the simulation [=](new_forcing_file, int num_steps_in_iFile, int nextFile, int num_timesteps) { - aout(self) << "New forcing file loaded for node\n"; self->state.messages_returned++; self->state.iFile = nextFile; self->state.stepsInCurrentFFile = num_steps_in_iFile; self->state.forcingStep = 1; if (self->state.num_steps == 0) { - aout(self) << "Setting num_steps\n"; self->state.num_steps = num_timesteps; aout(self) << "Num Steps: " << self->state.num_steps << "\n"; } if (self->state.messages_returned >= distributed_settings.num_nodes) { - aout(self) << "All files loaded\n"; - self->state.messages_returned = 0; self->send(self, update_hru_v); } - }, - + + // Run the simulation for one timestep [=](update_hru) { for(auto node : self->state.connected_nodes) { self->send(node, update_hru_v); } }, + // Message from nodes when they have finished a timestep [=](done_update) { self->state.messages_returned++; if (self->state.messages_returned >= distributed_settings.num_nodes) { - aout(self) << "Distributed Job_Actor: Done Update for timestep:" - << self->state.timestep << "\n"; - int steps_to_write = 1; - + for (auto node : self->state.connected_nodes) { self->send(node, write_output_v, steps_to_write); } @@ -139,10 +137,11 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, } }, + // Message from nodes when they have finished writing output [=](write_output) { self->state.messages_returned++; if (self->state.messages_returned >= distributed_settings.num_nodes) { - aout(self) << "Distributed Job_Actor: Done Writing Output for timestep:" + aout(self) << "Distributed Job_Actor: Done timestep:" << self->state.timestep << "\n"; self->state.timestep++; @@ -151,17 +150,17 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, if (self->state.timestep > self->state.num_steps) { aout(self) << "Distributed Job_Actor: Done Simulation\n"; + for(auto node : self->state.connected_nodes) { + self->send(node, finalize_v); + } self->send(self, finalize_v); - } - - else if(self->state.forcingStep > self->state.stepsInCurrentFFile) { + + } else if(self->state.forcingStep > self->state.stepsInCurrentFFile) { aout(self) << "Distributed Job_Actor: Done Forcing File\n"; - self->send(self, new_forcing_file_v, - self->state.stepsInCurrentFFile, self->state.iFile + 1); - } - - else { - aout(self) << "Distributed Job_Actor: Updating HRUs\n"; + for(auto node : self->state.connected_nodes) { + self->send(node, access_forcing_v, self->state.iFile + 1); + } + } else { self->send(self, update_hru_v); } } @@ -173,8 +172,18 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, }, [=](finalize) { - aout(self) << "Finalizing\n"; - self->quit(); + aout(self) << "Simulation Finished\n"; + self->state.job_timing.updateEndPoint("total_duration"); + double total_dur_sec = self->state.job_timing.getDuration( + "total_duration").value_or(-1.0); + double total_dur_min = total_dur_sec / 60; + double total_dur_hr = total_dur_min / 60; + aout(self) << "Total Duration = " << total_dur_sec << " Seconds\n" + << "Total Duration = " << total_dur_min << " Minutes\n" + << "Total Duration = " << total_dur_hr << " Hours\n" + << "___________________Program Finished__________________\n"; + + std::exit(0); } diff --git a/build/source/job_actor/node_actor.cpp b/build/source/job_actor/node_actor.cpp index 0c644ae4e1e0f580409c44fca7b2704ba784667c..de9d466711444e47c26baa454d55f0df1b6c3b4c 100644 --- a/build/source/job_actor/node_actor.cpp +++ b/build/source/job_actor/node_actor.cpp @@ -88,6 +88,11 @@ behavior node_actor(stateful_actor<node_state>* self, self->state.iFile, self); }, + [=](access_forcing, int new_iFile) { + self->send(self->state.file_access_actor, access_forcing_v, new_iFile, + self); + }, + [=](new_forcing_file, int num_steps_in_iFile, int nextFile) { aout(self) << "Received New Forcing File\n"; self->state.iFile = nextFile; @@ -103,7 +108,6 @@ behavior node_actor(stateful_actor<node_state>* self, }, [=](update_hru) { - aout(self) << "Updating HRUs\n"; for (auto gru : self->state.gru_container.gru_list) { self->send(gru->getGRUActor(), update_hru_v, self->state.timestep, self->state.forcingStep); @@ -146,12 +150,12 @@ behavior node_actor(stateful_actor<node_state>* self, }); self->send(self->state.current_server, write_output_v); - } - - - - + }, + [=](finalize) { + aout(self) << "Done Simulation\n"; + std::exit(0); + } }; }