From 76e881310fe9242859f94386b28c1d7d9b5ee2f6 Mon Sep 17 00:00:00 2001 From: KyleKlenk <kyle.c.klenk@gmail.com> Date: Wed, 10 Apr 2024 09:49:41 -0600 Subject: [PATCH] Adjusted load balancing strategy, removed output files to check GRU times --- build/source/hru_actor/hru_actor.cpp | 1 + .../job_actor/distributed_job_actor.cpp | 110 ++---------------- build/source/job_actor/node_actor.cpp | 26 ----- 3 files changed, 8 insertions(+), 129 deletions(-) diff --git a/build/source/hru_actor/hru_actor.cpp b/build/source/hru_actor/hru_actor.cpp index c59aa51..3b7282f 100644 --- a/build/source/hru_actor/hru_actor.cpp +++ b/build/source/hru_actor/hru_actor.cpp @@ -146,6 +146,7 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, self->quit(); return; } + self->send(self->state.parent, done_update_v, self->state.walltime_timestep, self->state.indxGRU); }, diff --git a/build/source/job_actor/distributed_job_actor.cpp b/build/source/job_actor/distributed_job_actor.cpp index 8e1efef..c4b0088 100644 --- a/build/source/job_actor/distributed_job_actor.cpp +++ b/build/source/job_actor/distributed_job_actor.cpp @@ -72,7 +72,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, // Get the number of actors that will be load balanced self->state.num_hrus_to_swap = std::round( (self->state.num_gru / - distributed_settings.num_nodes) * 0.10); + distributed_settings.num_nodes) * 0.15); aout(self) << "Distributed Job Actor: Number of HRUs to Swap = " << self->state.num_hrus_to_swap << "\n"; @@ -153,7 +153,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, [=](load_balance) { self->state.load_balance_start_time = std::chrono::system_clock::now(); - aout(self) << "Distributed Job_Actor: Finding max and min elements\n"; + // aout(self) << "Distributed Job_Actor: Finding max and min elements\n"; // Find the node with the highest walltime from the map auto max_node = std::max_element( self->state.node_walltimes_map.begin(), @@ -172,66 +172,11 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, return p1.second < p2.second; }); - auto max_node_hru_times = self->state.node_to_hru_map[max_node->first]; - aout(self) << "Distributed Job_Actor: Max Node is = " - << max_node->first << "\n"; - aout(self) << "Distributed Job_Actor: Max Node Walltime Size = " - << max_node_hru_times.size() << "\n"; - - std::string max_file = "max_node_times.txt"; - std::ifstream max_file_stream(max_file); - bool max_file_exists = max_file_stream.good(); - - // Open the file for writing - std::ofstream max_file_out(max_file,max_file_exists ? - std::ios::app : std::ios::out); - - if (!max_file_out.is_open()) { - aout(self) << "Error opening file\n"; - exit(1); - } - - // Output all the values to the file - for (auto& hru_time : max_node_hru_times) { - std::string actor_ref = to_string(hru_time.first); - max_file_out << "(" << actor_ref << ":" - << hru_time.second << "), "; - } - max_file_out << "\n"; - - auto min_node_hru_times = self->state.node_to_hru_map[min_node->first]; - aout(self) << "Distributed Job_Actor: Min Node is = " - << min_node->first << "\n"; - aout(self) << "Distributed Job_Actor: Min Node Walltime Size = " - << min_node_hru_times.size() << "\n"; - - std::string min_file = "min_node_times.txt"; - std::ifstream min_file_stream(min_file); - bool min_file_exists = min_file_stream.good(); - - // Open the file for writing - std::ofstream min_file_out(min_file,min_file_exists ? - std::ios::app : std::ios::out); - - if (!min_file_out.is_open()) { - aout(self) << "Error opening file\n"; - exit(1); - } - - // Output all the values to the file - for (auto& hru_time : min_node_hru_times) { - std::string actor_ref = to_string(hru_time.first); - min_file_out << "(" << actor_ref << ":" - << hru_time.second << "), "; - } - min_file_out << "\n"; - - // max_node_hru_times map std::vector<std::pair<caf::actor, double>> max_hru_times( @@ -242,15 +187,6 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, return p1.second > p2.second; }); - // Output all the values to the file - for (auto& hru_time : max_hru_times) { - std::string actor_ref = to_string(hru_time.first); - max_file_out << "(" << actor_ref << ":" - << hru_time.second << "), "; - } - max_file_out << "\n"; - - // min_node_hru_times map std::vector<std::pair<caf::actor, double>> min_hru_times( min_node_hru_times.begin(), min_node_hru_times.end()); @@ -260,27 +196,17 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, return p1.second < p2.second; }); - // Output all the values to the file - for (auto& hru_time : min_hru_times) { - std::string actor_ref = to_string(hru_time.first); - min_file_out << "(" << actor_ref << ":" - << hru_time.second << "), "; - } - min_file_out << "\n"; - - max_file_out.close(); - min_file_out.close(); // Get the 25% HRUs with the highest walltimes states from the // max node - aout(self) << "Distributed Job_Actor: Requesting serialized state from max\n"; + // aout(self) << "Distributed Job_Actor: Requesting serialized state from max\n"; for (int i = 0; i < self->state.num_hrus_to_swap; i++) { self->send(max_hru_times[i].first, serialize_hru_v); } // Get the 25% HRUs with the lowest walltimes states from the // min node - aout(self) << "Distributed Job_Actor: Requesting serialized state from min\n"; + // aout(self) << "Distributed Job_Actor: Requesting serialized state from min\n"; for (int i = 0; i < self->state.num_hrus_to_swap; i++) { self->send(min_hru_times[i].first, serialize_hru_v); } @@ -303,7 +229,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, if (self->state.num_serialize_messages_received >= self->state.num_serialize_messages_sent) { - aout(self) << "Distributed Job_Actor: Redistributing HRUs\n"; + // aout(self) << "Distributed Job_Actor: Redistributing HRUs\n"; int num_sent = 0; // Redistribute the HRU data @@ -371,29 +297,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, int steps_to_write = 1; for (auto node : self->state.connected_nodes) { auto node_hru_walltimes = self->state.node_to_hru_map[node]; - std::string node_file = "node_" + to_string(node) + "_times.txt"; - std::ifstream node_file_stream(node_file); - bool node_file_exists = node_file_stream.good(); - - // Open the file for writing - std::ofstream node_file_out(node_file,node_file_exists ? - std::ios::app : std::ios::out); - if (!node_file_out.is_open()) { - aout(self) << "Error opening file\n"; - exit(1); - } - - // Output all the values to the file - for (auto& hru_time : node_hru_walltimes) { - std::string actor_ref = to_string(hru_time.first); - node_file_out << "(" << actor_ref << ":" - << hru_time.second << "), "; - } - node_file_out << "\n"; - node_file_out.close(); - - self->send(node, write_output_v, steps_to_write); } @@ -443,8 +347,8 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( load_balance_check_end_time - load_balance_check_start_time); - aout(self) << "Distributed Job_Actor: Load Balance Check Time = " - << duration.count() / 1000 << " s\n"; + // aout(self) << "Distributed Job_Actor: Load Balance Check Time = " + // << duration.count() / 1000 << " s\n"; double load_balancing_threshold = 25; if (percent_diff > load_balancing_threshold) { diff --git a/build/source/job_actor/node_actor.cpp b/build/source/job_actor/node_actor.cpp index 25e1d4f..76d4bca 100644 --- a/build/source/job_actor/node_actor.cpp +++ b/build/source/job_actor/node_actor.cpp @@ -147,32 +147,6 @@ behavior node_actor(stateful_actor<node_state>* self, std::string host, if (self->state.num_gru_done_timestep >= self->state.gru_container.gru_list.size()) { - - std::string actual_node_file = "self-node_" + - to_string(caf::actor_cast<caf::actor>(self)) + ".txt"; - - std::ifstream node_file(actual_node_file); - bool file_exists = node_file.good(); - - std::ofstream node_file_out(actual_node_file, file_exists - ? std::ios_base::app : std::ios::out); - - if (!node_file_out.is_open()) { - aout(self) << "Error Opening File\n"; - exit(1); - } - - for(const auto& pair : self->state.hru_walltimes) { - std::string actor_ref = to_string(pair.first); - node_file_out << "(" << actor_ref << ":" - << pair.second << "), "; - } - node_file_out << "\n"; - - node_file_out.close(); - - - // Get Time Taken for the entire timestep self->state.timestep_end_time -- GitLab