From 1204e739fc0bc77b8c9e62c7035d01b1e5636b78 Mon Sep 17 00:00:00 2001
From: Kyle Klenk <kyle.c.klenk@gmail.com>
Date: Mon, 1 Apr 2024 22:57:37 +0000
Subject: [PATCH] Started pre-allocating structures in load balancing routines

---
 build/includes/job_actor/job_actor.hpp        |  3 +
 .../job_actor/distributed_job_actor.cpp       | 83 ++++++++++++-------
 2 files changed, 56 insertions(+), 30 deletions(-)

diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp
index 14b4cc4..d0ab14e 100644
--- a/build/includes/job_actor/job_actor.hpp
+++ b/build/includes/job_actor/job_actor.hpp
@@ -121,6 +121,9 @@ struct distributed_job_state {
 
   std::unordered_map<caf::actor, std::vector<std::pair<caf::actor, hru>>> 
       node_to_hru_to_balance_map;
+  std::unordered_map<caf::actor, int> node_to_hru_to_balance_map_size; 
+
+  int num_hrus_to_swap = 0; // We want to swap 25% of the HRUs
 
   // Forcing information
   int iFile = 1; // index of current forcing file from forcing file list
diff --git a/build/source/job_actor/distributed_job_actor.cpp b/build/source/job_actor/distributed_job_actor.cpp
index e03e028..11af21b 100644
--- a/build/source/job_actor/distributed_job_actor.cpp
+++ b/build/source/job_actor/distributed_job_actor.cpp
@@ -69,6 +69,18 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
                 << self->state.node_num_gru_info[i].num_gru_local << "\n"; 
   }
 
+  // Get the number of HRUs that will be load balanced (25% of HRUs per node)
+  self->state.num_hrus_to_swap = std::round( (self->state.num_gru / 
+      distributed_settings.num_nodes) * 0.25);
+  aout(self) << "Distributed Job Actor: Number of HRUs to Swap = " 
+             << self->state.num_hrus_to_swap << "\n";
+
+  // Preallocate the vectors
+  self->state.node_walltimes.resize(distributed_settings.num_nodes);
+
+
+
+
   auto is_published = self->system().middleman().publish(self, 
       distributed_settings.port);
   if (!is_published) {
@@ -92,8 +104,11 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
       self->monitor(client_actor);
       self->state.connected_nodes.push_back(client_actor);
       // Create an empty vector for the nodes hru pairs
-      self->state.node_to_hru_to_balance_map[client_actor] = 
-          std::vector<std::pair<caf::actor, hru>>();
+      std::vector<std::pair<caf::actor, hru>> tempVector;
+      tempVector.resize(self->state.num_hrus_to_swap);
+      self->state.node_to_hru_to_balance_map[client_actor] = tempVector;
+      self->state.node_to_hru_to_balance_map_size[client_actor] = 0;
+
 
       if (self->state.connected_nodes.size() == 
           distributed_settings.num_nodes) {
@@ -161,7 +176,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
       auto min_node_hru_times =
           self->state.node_to_hru_map[min_node->first];
 
-      // Find the 5 HRUs with the highest walltimes from the 
+
       // max_node_hru_times map
       std::vector<std::pair<caf::actor, double>> max_hru_times(
           max_node_hru_times.begin(), max_node_hru_times.end());
@@ -171,6 +186,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
             return p1.second > p2.second;
           });
       
+      // min_node_hru_times map
       std::vector<std::pair<caf::actor, double>> min_hru_times(
           min_node_hru_times.begin(), min_node_hru_times.end());
       std::sort(min_hru_times.begin(), min_hru_times.end(),
@@ -179,21 +195,22 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
             return p1.second < p2.second;
           });
 
-      // Get the 5 HRUs with the highest walltimes states from the 
+      // Get the states of the 25% of HRUs with the highest walltimes from the 
       // max node
       aout(self) << "Distributed Job_Actor: Requesting serialized state from max\n";
-      for (int i = 0; i < 5; i++) {
+      for (int i = 0; i < self->state.num_hrus_to_swap; i++) {
         self->send(max_hru_times[i].first, serialize_hru_v);
       }
 
-      // Get the 5 HRUs with the lowest walltimes states from the
+      // Get the states of the 25% of HRUs with the lowest walltimes from the
       // min node
       aout(self) << "Distributed Job_Actor: Requesting serialized state from min\n";
-      for (int i = 0; i < 5; i++) {
+      for (int i = 0; i < self->state.num_hrus_to_swap; i++) {
         self->send(min_hru_times[i].first, serialize_hru_v);
       }
 
-      self->state.num_serialize_messages_sent = 10;
+      self->state.num_serialize_messages_sent = 
+          self->state.num_hrus_to_swap * 2;
     },
 
     [=] (caf::actor actor_ref, hru hru_data) {
@@ -201,8 +218,12 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
 
       // Get the node the actor_ref is part of 
       auto node = self->state.hru_to_node_map[actor_ref];
-      self->state.node_to_hru_to_balance_map[node].push_back(
-          std::make_pair(actor_ref, hru_data));
+
+      // Add the actor and hru data to the map and increment the size
+      self->state.node_to_hru_to_balance_map[node][
+          self->state.node_to_hru_to_balance_map_size[node]++
+          ] = std::make_pair(actor_ref, hru_data);
+      
 
       if (self->state.num_serialize_messages_received >= 
           self->state.num_serialize_messages_sent) {
@@ -210,11 +231,11 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
         int num_sent = 0;
 
         // Redistribute the HRU data
-        auto node_1_vector = self->state.node_to_hru_to_balance_map[
+        auto& node_1_vector = self->state.node_to_hru_to_balance_map[
             self->state.connected_nodes[0]];
-        auto node_2_vector = self->state.node_to_hru_to_balance_map[
+        auto& node_2_vector = self->state.node_to_hru_to_balance_map[
             self->state.connected_nodes[1]];
-        for (int i = 0; i < 5; i++) {
+        for (int i = 0; i < self->state.num_hrus_to_swap; i++) {
           auto hru1 = node_1_vector[i];
           auto hru2 = node_2_vector[i];
           self->send(self->state.connected_nodes[0], reinit_hru_v, 
@@ -227,13 +248,16 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
         aout(self) << "Distributed Job_Actor: All requests sent: " 
                    << num_sent << "\n";
 
+        self->state.node_to_hru_to_balance_map_size[
+            self->state.connected_nodes[0]] = 0;
+        self->state.node_to_hru_to_balance_map_size[
+            self->state.connected_nodes[1]] = 0;
         self->state.num_serialize_messages_received = 0;
       }
     },
 
     [=](reinit_hru) {
       self->state.num_serialize_messages_received++;
-      aout(self) << "Recieved: " << self->state.num_serialize_messages_received << "\n";
       if (self->state.num_serialize_messages_received >= 
           self->state.num_serialize_messages_sent) {
         aout(self) << "Distributed Job_Actor: Done Redistributing HRUs\n";
@@ -246,7 +270,6 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
             self->state.load_balance_end_time - 
             self->state.load_balance_start_time);
         self->state.load_balance_time += duration.count();
-        self->state.node_walltimes.clear();
         self->send(self, update_hru_v);
       }
     },
@@ -260,16 +283,12 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
       self->state.node_walltimes_map[sender] = node_walltime;
       self->state.node_to_hru_map[sender] = hru_walltimes;
 
-      // self->state.node_walltimes.push_back(node_walltime);
 
       for (auto& hru_walltime : hru_walltimes) {
         self->state.hru_walltimes[hru_walltime.first] = hru_walltime.second;
       }
 
       if (self->state.messages_returned >= distributed_settings.num_nodes) {
-
-
-
         int steps_to_write = 1;
         for (auto node : self->state.connected_nodes) {
           self->send(node, write_output_v, steps_to_write);
@@ -292,24 +311,28 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self,
 
         bool load_balance = false;
         if (self->state.distributed_settings.load_balancing) {
-          // Calculate the standard deviation of the node_wall_times_map
+
+          size_t index = 0;
           for (auto& node_walltime : self->state.node_walltimes_map) {
-            self->state.node_walltimes.push_back(node_walltime.second);
+            self->state.node_walltimes[index] = node_walltime.second;
+            index++;
+          }
+
+          if (self->state.node_walltimes.size() > 2) {
+            aout(self) << "ERROR: Too Many Nodes\n";
+            self->quit();
           }
 
-          // Calculate the standard Deviation of the walltimes
+          // Calculate the percent difference between the max and min node
+          double diff = std::abs(self->state.node_walltimes[0] - 
+              self->state.node_walltimes[1]);
           double sum = std::accumulate(self->state.node_walltimes.begin(), 
               self->state.node_walltimes.end(), 0.0);
           double mean = sum / self->state.node_walltimes.size();
-          double sq_sum = std::inner_product(self->state.node_walltimes.begin(), 
-              self->state.node_walltimes.end(), 
-              self->state.node_walltimes.begin(), 0.0);
-          double variance = 
-              sq_sum / self->state.node_walltimes.size() - mean * mean;
-          double stdev = std::sqrt(variance);
+          double percent_diff = (diff / mean) * 100;
           
-          double load_balancing_threshold = 0.005;
-          if (stdev > load_balancing_threshold) {
+          double load_balancing_threshold = 10;
+          if (percent_diff > load_balancing_threshold) {
             load_balance = true;
           }
         }
-- 
GitLab