Skip to content
Snippets Groups Projects
Commit 3e8b895d authored by Kyle's avatar Kyle
Browse files

Refactor job_actor and hru_batch_actor functions

parent 555c40c6
No related branches found
No related tags found
1 merge request!7Data assimilation mode
......@@ -20,7 +20,8 @@
* Job Actor Fortran Functions
*********************************************/
extern "C" {
void job_init_fortran(char const* file_manager, int* start_gru_index, int* num_gru, int* num_hru, int* err);
void job_init_fortran(char const* file_manager, int* start_gru_index,
int* num_gru, int* num_hru, int* err);
void deallocateJobActor(int* err);
}
......
......@@ -22,21 +22,18 @@ behavior hru_batch_actor(stateful_actor<hru_batch_state>* self,
return {
[=](update_timeZoneOffset, int iFile) {
// aout(self) << "HRU Batch Actor - Update Time Zone Offset\n";
for (auto& hru_actor : self->state.hru_actors) {
self->send(hru_actor, update_timeZoneOffset_v, iFile);
}
},
[=](update_hru, int timestep, int forcingstep) {
// aout(self) << "HRU Batch Actor - Update HRU\n";
for (auto& hru_actor : self->state.hru_actors) {
self->send(hru_actor, update_hru_v, timestep, forcingstep);
}
},
[=](done_update) {
// aout(self) << "HRU Batch Actor - Done Update\n";
self->state.num_done++;
if (self->state.num_done == self->state.hru_actors.size()) {
self->send(self->state.parent, done_update_v);
......
......@@ -5,12 +5,15 @@
namespace caf {
behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
int start_gru, int num_gru,
Distributed_Settings distributed_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings) {
int start_gru, int num_gru, Distributed_Settings distributed_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings) {
aout(self) << "Starting Distributed Job Actor\n";
self->state.job_timing = TimingInfo();
self->state.job_timing.addTimePoint("total_duration");
self->state.job_timing.updateStartPoint("total_duration");
self->set_down_handler([=](const down_msg& dm){
aout(self) << "Received Down Message\n";
});
......@@ -59,23 +62,22 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
}
auto is_published = self->system().middleman().publish(self,
distributed_settings.port);
distributed_settings.port);
if (!is_published) {
aout(self) << "Unable to publish actor\n";
self->quit();
return {};
}
aout(self) << "Published Actor\n";
aout(self) << "Distributed Job Actor Has Been Published\n";
// Spawn the local node actor
auto node = self->spawn(node_actor, "", self, distributed_settings,
file_access_actor_settings, job_actor_settings,
hru_actor_settings);
file_access_actor_settings, job_actor_settings, hru_actor_settings);
return {
// Message from nodes when they are connected
[=](connect_to_server, actor client_actor, std::string hostname) {
aout(self) << "Received a connect request from: " << hostname << "\n";
......@@ -85,6 +87,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
if (self->state.connected_nodes.size() ==
distributed_settings.num_nodes) {
aout(self) << "All Nodes Connected\n";
for (int i = 0; i < distributed_settings.num_nodes; i++) {
......@@ -94,43 +97,38 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
}
}
},
// Get the next forcing file in the simulation
[=](new_forcing_file, int num_steps_in_iFile, int nextFile,
int num_timesteps) {
aout(self) << "New forcing file loaded for node\n";
self->state.messages_returned++;
self->state.iFile = nextFile;
self->state.stepsInCurrentFFile = num_steps_in_iFile;
self->state.forcingStep = 1;
if (self->state.num_steps == 0) {
aout(self) << "Setting num_steps\n";
self->state.num_steps = num_timesteps;
aout(self) << "Num Steps: " << self->state.num_steps << "\n";
}
if (self->state.messages_returned >= distributed_settings.num_nodes) {
aout(self) << "All files loaded\n";
self->state.messages_returned = 0;
self->send(self, update_hru_v);
}
},
// Run the simulation for one timestep
[=](update_hru) {
for(auto node : self->state.connected_nodes) {
self->send(node, update_hru_v);
}
},
// Message from nodes when they have finished a timestep
[=](done_update) {
self->state.messages_returned++;
if (self->state.messages_returned >= distributed_settings.num_nodes) {
aout(self) << "Distributed Job_Actor: Done Update for timestep:"
<< self->state.timestep << "\n";
int steps_to_write = 1;
for (auto node : self->state.connected_nodes) {
self->send(node, write_output_v, steps_to_write);
}
......@@ -139,10 +137,11 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
}
},
// Message from nodes when they have finished writing output
[=](write_output) {
self->state.messages_returned++;
if (self->state.messages_returned >= distributed_settings.num_nodes) {
aout(self) << "Distributed Job_Actor: Done Writing Output for timestep:"
aout(self) << "Distributed Job_Actor: Done timestep:"
<< self->state.timestep << "\n";
self->state.timestep++;
......@@ -151,17 +150,17 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
if (self->state.timestep > self->state.num_steps) {
aout(self) << "Distributed Job_Actor: Done Simulation\n";
for(auto node : self->state.connected_nodes) {
self->send(node, finalize_v);
}
self->send(self, finalize_v);
}
else if(self->state.forcingStep > self->state.stepsInCurrentFFile) {
} else if(self->state.forcingStep > self->state.stepsInCurrentFFile) {
aout(self) << "Distributed Job_Actor: Done Forcing File\n";
self->send(self, new_forcing_file_v,
self->state.stepsInCurrentFFile, self->state.iFile + 1);
}
else {
aout(self) << "Distributed Job_Actor: Updating HRUs\n";
for(auto node : self->state.connected_nodes) {
self->send(node, access_forcing_v, self->state.iFile + 1);
}
} else {
self->send(self, update_hru_v);
}
}
......@@ -173,8 +172,18 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
},
[=](finalize) {
aout(self) << "Finalizing\n";
self->quit();
aout(self) << "Simulation Finished\n";
self->state.job_timing.updateEndPoint("total_duration");
double total_dur_sec = self->state.job_timing.getDuration(
"total_duration").value_or(-1.0);
double total_dur_min = total_dur_sec / 60;
double total_dur_hr = total_dur_min / 60;
aout(self) << "Total Duration = " << total_dur_sec << " Seconds\n"
<< "Total Duration = " << total_dur_min << " Minutes\n"
<< "Total Duration = " << total_dur_hr << " Hours\n"
<< "___________________Program Finished__________________\n";
std::exit(0);
}
......
......@@ -88,6 +88,11 @@ behavior node_actor(stateful_actor<node_state>* self,
self->state.iFile, self);
},
[=](access_forcing, int new_iFile) {
self->send(self->state.file_access_actor, access_forcing_v, new_iFile,
self);
},
[=](new_forcing_file, int num_steps_in_iFile, int nextFile) {
aout(self) << "Received New Forcing File\n";
self->state.iFile = nextFile;
......@@ -103,7 +108,6 @@ behavior node_actor(stateful_actor<node_state>* self,
},
[=](update_hru) {
aout(self) << "Updating HRUs\n";
for (auto gru : self->state.gru_container.gru_list) {
self->send(gru->getGRUActor(), update_hru_v,
self->state.timestep, self->state.forcingStep);
......@@ -146,12 +150,12 @@ behavior node_actor(stateful_actor<node_state>* self,
});
self->send(self->state.current_server, write_output_v);
}
},
[=](finalize) {
aout(self) << "Done Simulation\n";
std::exit(0);
}
};
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment