diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index 73fba206bd4c340099764c52c2bb2ad09732f82b..14b4cc456e80a098b55855493b92243059e1c945 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -119,6 +119,9 @@ struct distributed_job_state { std::vector<std::pair<caf::actor, hru>> hrus_to_balance; + std::unordered_map<caf::actor, std::vector<std::pair<caf::actor, hru>>> + node_to_hru_to_balance_map; + // Forcing information int iFile = 1; // index of current forcing file from forcing file list int stepsInCurrentFFile; diff --git a/build/source/job_actor/distributed_job_actor.cpp b/build/source/job_actor/distributed_job_actor.cpp index fed9ca60b67613c42787c3710fcddb02040ae393..e03e0284ac3c72f615643f59f35b7763d69b5250 100644 --- a/build/source/job_actor/distributed_job_actor.cpp +++ b/build/source/job_actor/distributed_job_actor.cpp @@ -90,8 +90,10 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, aout(self) << "Received a connect request from: " << hostname << "\n"; self->monitor(client_actor); - self->state.connected_nodes.push_back(client_actor); + // Create an empty vector for the nodes hru pairs + self->state.node_to_hru_to_balance_map[client_actor] = + std::vector<std::pair<caf::actor, hru>>(); if (self->state.connected_nodes.size() == distributed_settings.num_nodes) { @@ -133,9 +135,9 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, }, [=](load_balance) { - self->state.load_balance_start_time = std::chrono::system_clock::now(); - + + aout(self) << "Distributed Job_Actor: Finding max and min elements\n"; // Find the node with the highest walltime from the map auto max_node = std::max_element( self->state.node_walltimes_map.begin(), @@ -179,12 +181,14 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, // Get the 5 HRUs with the highest walltimes states from the // max node + aout(self) << "Distributed Job_Actor: Requesting serialized state from max\n"; for (int i = 0; i < 5; i++) { self->send(max_hru_times[i].first, serialize_hru_v); } // Get the 5 HRUs with the lowest walltimes states from the // min node + aout(self) << "Distributed Job_Actor: Requesting serialized state from min\n"; for (int i = 0; i < 5; i++) { self->send(min_hru_times[i].first, serialize_hru_v); } @@ -195,47 +199,44 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, [=] (caf::actor actor_ref, hru hru_data) { self->state.num_serialize_messages_received++; - self->state.hrus_to_balance.push_back( + // Get the node the actor_ref is part of + auto node = self->state.hru_to_node_map[actor_ref]; + self->state.node_to_hru_to_balance_map[node].push_back( std::make_pair(actor_ref, hru_data)); if (self->state.num_serialize_messages_received >= self->state.num_serialize_messages_sent) { + aout(self) << "Distributed Job_Actor: Redistributing HRUs\n"; + int num_sent = 0; // Redistribute the HRU data - while(!self->state.hrus_to_balance.empty()) { - // Find two HRUs that have different node owners - auto hru1 = self->state.hrus_to_balance.back(); - self->state.hrus_to_balance.pop_back(); - auto node_1 = self->state.hru_to_node_map[hru1.first]; - for (auto hru2 : self->state.hrus_to_balance) { - auto node_2 = self->state.hru_to_node_map[hru2.first]; - if (node_1 != node_2) { - - self->send(node_1, reinit_hru_v, hru1.first, hru2.second); - self->send(node_2, reinit_hru_v, hru2.first, hru1.second); - - self->state.hrus_to_balance.erase( - std::remove_if(self->state.hrus_to_balance.begin(), - self->state.hrus_to_balance.end(), - [&hru2](const auto& pair) { - return pair.first == hru2.first; } - ), - self->state.hrus_to_balance.end() - ); - break; - } - } + auto node_1_vector = self->state.node_to_hru_to_balance_map[ + self->state.connected_nodes[0]]; + auto node_2_vector = self->state.node_to_hru_to_balance_map[ + self->state.connected_nodes[1]]; + for (int i = 0; i < 5; i++) { + auto hru1 = node_1_vector[i]; + auto hru2 = node_2_vector[i]; + self->send(self->state.connected_nodes[0], reinit_hru_v, + hru1.first, hru2.second); + self->send(self->state.connected_nodes[1], reinit_hru_v, + hru2.first, hru1.second); + num_sent += 2; } + aout(self) << "Distributed Job_Actor: All requests sent: " + << num_sent << "\n"; + self->state.num_serialize_messages_received = 0; } }, [=](reinit_hru) { self->state.num_serialize_messages_received++; - + aout(self) << "Recieved: " << self->state.num_serialize_messages_received << "\n"; if (self->state.num_serialize_messages_received >= self->state.num_serialize_messages_sent) { + aout(self) << "Distributed Job_Actor: Done Redistributing HRUs\n"; self->state.num_serialize_messages_received = 0; self->state.num_serialize_messages_sent = 0; diff --git a/build/source/job_actor/node_actor.cpp b/build/source/job_actor/node_actor.cpp index b1b0c2d31ca42b85b7a02cfab4a66dfaef93eacc..944930a14f85bf4c0bed0faa2dae624a08173455 100644 --- a/build/source/job_actor/node_actor.cpp +++ b/build/source/job_actor/node_actor.cpp @@ -72,8 +72,9 @@ behavior node_actor(stateful_actor<node_state>* self, std::string host, num_gru = self->state.num_gru_info.num_gru_local; num_hru = self->state.num_gru_info.num_gru_local; } - - + + self->state.node_timing.addTimePoint("node_init"); + self->state.node_timing.updateStartPoint("node_init"); int err, file_gru_to_remove; job_init_fortran(self->state.job_actor_settings.file_manager_path.c_str(), &start_gru, &num_gru, &num_hru, &file_gru_to_remove, &err); @@ -89,6 +90,7 @@ behavior node_actor(stateful_actor<node_state>* self, std::string host, self->monitor(self->state.file_access_actor); self->send(self->state.file_access_actor, def_output_v, self->state.num_gru_info.file_gru); + self->state.node_timing.updateEndPoint("node_init"); }, [=](init_file_access_actor, int num_timesteps) { @@ -248,6 +250,7 @@ behavior node_actor(stateful_actor<node_state>* self, std::string host, aout(self) << "Total Duration: " << total_duration << " seconds\n" << "Total Duration: " << total_dur_min << " minutes\n" << "Total Duration: " << total_dur_hr << " hours\n" + << "Init Duration: " << init_duration << " seconds\n" << "___________________Node Finished__________________\n"; exit(1); });