diff --git a/build/source/job_actor/distributed_job_actor.cpp b/build/source/job_actor/distributed_job_actor.cpp index 11af21b37c65934fd5387fd4efdb2b727f70e08e..67da8f85c5825b29ed075a6259b89dc2385e8dbf 100644 --- a/build/source/job_actor/distributed_job_actor.cpp +++ b/build/source/job_actor/distributed_job_actor.cpp @@ -1,7 +1,8 @@ #include "job_actor.hpp" #include "summa_actor.hpp" #include "node_actor.hpp" - +#include <fstream> +#include <iostream> namespace caf { behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, @@ -71,7 +72,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, // Get the number of actors that will be load balanced self->state.num_hrus_to_swap = std::round( (self->state.num_gru / - distributed_settings.num_nodes) * 0.25); + distributed_settings.num_nodes) * 0.10); aout(self) << "Distributed Job Actor: Number of HRUs to Swap = " << self->state.num_hrus_to_swap << "\n"; @@ -171,11 +172,66 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, return p1.second < p2.second; }); + auto max_node_hru_times = self->state.node_to_hru_map[max_node->first]; + aout(self) << "Distributed Job_Actor: Max Node is = " + << max_node->first << "\n"; + aout(self) << "Distributed Job_Actor: Max Node Walltime Size = " + << max_node_hru_times.size() << "\n"; + + std::string max_file = "max_node_times.txt"; + std::ifstream max_file_stream(max_file); + bool max_file_exists = max_file_stream.good(); + + // Open the file for writing + std::ofstream max_file_out(max_file,max_file_exists ? + std::ios::app : std::ios::out); + + if (!max_file_out.is_open()) { + aout(self) << "Error opening file\n"; + exit(1); + } + + // Output all the values to the file + for (auto& hru_time : max_node_hru_times) { + std::string actor_ref = to_string(hru_time.first); + max_file_out << "(" << actor_ref << ":" + << hru_time.second << "), "; + } + max_file_out << "\n"; + + + auto min_node_hru_times = self->state.node_to_hru_map[min_node->first]; + aout(self) << "Distributed Job_Actor: Min Node is = " + << min_node->first << "\n"; + aout(self) << "Distributed Job_Actor: Min Node Walltime Size = " + << min_node_hru_times.size() << "\n"; + + std::string min_file = "min_node_times.txt"; + std::ifstream min_file_stream(min_file); + bool min_file_exists = min_file_stream.good(); + + // Open the file for writing + std::ofstream min_file_out(min_file,min_file_exists ? + std::ios::app : std::ios::out); + + if (!min_file_out.is_open()) { + aout(self) << "Error opening file\n"; + exit(1); + } + // Output all the values to the file + for (auto& hru_time : min_node_hru_times) { + std::string actor_ref = to_string(hru_time.first); + min_file_out << "(" << actor_ref << ":" + << hru_time.second << "), "; + } + min_file_out << "\n"; + + // max_node_hru_times map std::vector<std::pair<caf::actor, double>> max_hru_times( @@ -186,6 +242,15 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, return p1.second > p2.second; }); + // Output all the values to the file + for (auto& hru_time : max_node_hru_times) { + std::string actor_ref = to_string(hru_time.first); + max_file_out << "(" << actor_ref << ":" + << hru_time.second << "), "; + } + max_file_out << "\n"; + + // min_node_hru_times map std::vector<std::pair<caf::actor, double>> min_hru_times( min_node_hru_times.begin(), min_node_hru_times.end()); @@ -195,6 +260,17 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, return p1.second < p2.second; }); + // Output all the values to the file + for (auto& hru_time : min_node_hru_times) { + std::string actor_ref = to_string(hru_time.first); + min_file_out << "(" << actor_ref << ":" + << hru_time.second << "), "; + } + min_file_out << "\n"; + + max_file_out.close(); + min_file_out.close(); + // Get the 25% HRUs with the highest walltimes states from the // max node aout(self) << "Distributed Job_Actor: Requesting serialized state from max\n"; @@ -260,7 +336,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, self->state.num_serialize_messages_received++; if (self->state.num_serialize_messages_received >= self->state.num_serialize_messages_sent) { - aout(self) << "Distributed Job_Actor: Done Redistributing HRUs\n"; + self->state.num_serialize_messages_received = 0; self->state.num_serialize_messages_sent = 0; @@ -270,6 +346,9 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, self->state.load_balance_end_time - self->state.load_balance_start_time); self->state.load_balance_time += duration.count(); + + aout(self) << "Distributed Job_Actor: Load Balance Time = " + << duration.count() / 1000 << " s\n"; self->send(self, update_hru_v); } }, @@ -311,8 +390,10 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, bool load_balance = false; if (self->state.distributed_settings.load_balancing) { - + size_t index = 0; + chrono_time load_balance_check_start_time = + std::chrono::system_clock::now(); for (auto& node_walltime : self->state.node_walltimes_map) { self->state.node_walltimes[index] = node_walltime.second; index++; @@ -330,8 +411,18 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, self->state.node_walltimes.end(), 0.0); double mean = sum / self->state.node_walltimes.size(); double percent_diff = (diff / mean) * 100; + aout(self) << "Distributed Job_Actor: Percent Diff = " + << percent_diff << "\n"; + + chrono_time load_balance_check_end_time = + std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( + load_balance_check_end_time - load_balance_check_start_time); + + aout(self) << "Distributed Job_Actor: Load Balance Check Time = " + << duration.count() / 1000 << " s\n"; - double load_balancing_threshold = 10; + double load_balancing_threshold = 25; if (percent_diff > load_balancing_threshold) { load_balance = true; }