From ccd8689960fb1096ce4257c2c438c44284b5e18a Mon Sep 17 00:00:00 2001
From: Kyle <kyle.c.klenk@gmail.com>
Date: Fri, 1 Mar 2024 22:38:04 +0000
Subject: [PATCH] Add HRU Batch Actors and handle lost connection to server

---
 build/includes/hru_actor/hru_batch_actor.hpp  |  22 ++
 build/includes/job_actor/job_actor.hpp        |   6 +
 build/includes/summa_actor/summa_actor.hpp    |  70 ++--
 .../actors/hru_actor/hru_batch_actor.cpp      |  55 +++
 build/source/actors/job_actor/job_actor.cpp   | 365 ++++++++++--------
 .../source/actors/summa_actor/summa_actor.cpp | 113 +++---
 .../actors/summa_actor/summa_client.cpp       |  56 ++-
 7 files changed, 395 insertions(+), 292 deletions(-)
 create mode 100644 build/includes/hru_actor/hru_batch_actor.hpp
 create mode 100644 build/source/actors/hru_actor/hru_batch_actor.cpp

diff --git a/build/includes/hru_actor/hru_batch_actor.hpp b/build/includes/hru_actor/hru_batch_actor.hpp
new file mode 100644
index 0000000..af3f3b3
--- /dev/null
+++ b/build/includes/hru_actor/hru_batch_actor.hpp
@@ -0,0 +1,22 @@
+#pragma once
+#include "caf/all.hpp"
+#include "hru_actor.hpp"
+
+namespace caf {
+
+struct hru_batch_state {
+  // Actor References
+	caf::actor file_access_actor;
+	caf::actor parent;
+
+  std::vector<caf::actor> hru_actors;
+  int num_done = 0;
+};
+
+behavior hru_batch_actor(stateful_actor<hru_batch_state>* self, 
+                         int start_gru_local, int start_gru_global, int num_gru, 
+                         HRU_Actor_Settings hru_actor_settings,
+                         caf::actor file_access_actor, caf::actor parent);
+
+
+} // namespace caf
\ No newline at end of file
diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp
index 5ee14cd..ba69073 100644
--- a/build/includes/job_actor/job_actor.hpp
+++ b/build/includes/job_actor/job_actor.hpp
@@ -7,6 +7,7 @@
 #include "global.hpp"
 #include "json.hpp"
 #include "hru_actor.hpp"
+#include "hru_batch_actor.hpp"
 #include "message_atoms.hpp"
 #include "file_access_actor.hpp"
 #include <unistd.h>
@@ -89,6 +90,11 @@ behavior job_actor(stateful_actor<job_state>* self,
 /*********************************************
  * Functions for the Job Actor
  *********************************************/
+// Spawn HRU Actors Individually
+void spawnHRUActors(stateful_actor<job_state>* self, bool normal_mode);
+// Spawn HRU Batch Actors
+void spawnHRUBatches(stateful_actor<job_state>* self);
+
 
 /** Get the information for the GRUs that will be written to the netcdf file */
 std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, 
diff --git a/build/includes/summa_actor/summa_actor.hpp b/build/includes/summa_actor/summa_actor.hpp
index 36457e1..8a736af 100644
--- a/build/includes/summa_actor/summa_actor.hpp
+++ b/build/includes/summa_actor/summa_actor.hpp
@@ -13,47 +13,53 @@ namespace caf {
 
 
 struct job_timing_info {
-    std::vector<double> job_duration;
-    std::vector<double> job_read_duration;
-    std::vector<double> job_write_duration;
+  std::vector<double> job_duration;
+  std::vector<double> job_read_duration;
+  std::vector<double> job_write_duration;
 };
 
 struct summa_actor_state {
-    // Timing Information For Summa-Actor
-    TimingInfo summa_actor_timing;
-    struct job_timing_info timing_info_for_jobs;
-
-    // Program Parameters
-    int startGRU;           // starting GRU for the simulation
-    int numGRU;             // number of GRUs to compute
-    int fileGRU;            // number of GRUs in the file
-    std::string configPath; // path to the fileManager.txt file
-    int numFailed = 0;      // Number of jobs that have failed
-    caf::actor currentJob;  // Reference to the current job actor
-    caf::actor parent;
-
-    // Batches
-    Batch_Container batch_container;
-    int current_batch_id;
-
-
-    // settings for all child actors (save in case we need to recover)
-    Summa_Actor_Settings summa_actor_settings;
-    File_Access_Actor_Settings file_access_actor_settings;
-    Job_Actor_Settings job_actor_settings; 
-    HRU_Actor_Settings hru_actor_settings;
+  // Timing Information For Summa-Actor
+  TimingInfo summa_actor_timing;
+  struct job_timing_info timing_info_for_jobs;
+
+  // Program Parameters
+  int startGRU;           // starting GRU for the simulation
+  int numGRU;             // number of GRUs to compute
+  int fileGRU;            // number of GRUs in the file
+  std::string configPath; // path to the fileManager.txt file
+  int numFailed = 0;      // Number of jobs that have failed
+  caf::actor currentJob;  // Reference to the current job actor
+  caf::actor parent;
+
+  // Batches
+  Batch_Container batch_container;
+  int current_batch_id;
+
+
+  // settings for all child actors (save in case we need to recover)
+  Summa_Actor_Settings summa_actor_settings;
+  File_Access_Actor_Settings file_access_actor_settings;
+  Job_Actor_Settings job_actor_settings; 
+  HRU_Actor_Settings hru_actor_settings;
 };
 
+
 behavior summa_actor(stateful_actor<summa_actor_state>* self, 
-    int startGRU, int numGRU, 
-    Summa_Actor_Settings summa_actor_settings, 
-    File_Access_Actor_Settings file_access_actor_settings,
-    Job_Actor_Settings job_actor_settings, 
-    HRU_Actor_Settings hru_actor_settings, actor parent);
+                     int startGRU, int numGRU, 
+                     Summa_Actor_Settings summa_actor_settings, 
+                     File_Access_Actor_Settings file_access_actor_settings,
+                     Job_Actor_Settings job_actor_settings, 
+                     HRU_Actor_Settings hru_actor_settings, actor parent);
 
 void spawnJob(stateful_actor<summa_actor_state>* self);
+} // namespace caf
 
 
+// Helper Function to extract a string from the line of a file that 
+// is enclosed in quotes
+std::string extractEnclosed(const std::string& line);
 
+// Gets the number of GRUs from the attribute file
+int getNumGRUInFile(const std::string &file_manager);
 
-} // namespace caf
\ No newline at end of file
diff --git a/build/source/actors/hru_actor/hru_batch_actor.cpp b/build/source/actors/hru_actor/hru_batch_actor.cpp
new file mode 100644
index 0000000..9cde3fd
--- /dev/null
+++ b/build/source/actors/hru_actor/hru_batch_actor.cpp
@@ -0,0 +1,55 @@
+#include "hru_batch_actor.hpp"
+
+namespace caf {
+
+behavior hru_batch_actor(stateful_actor<hru_batch_state>* self,
+                         int start_gru_local, int start_gru_global, int num_gru,
+                         HRU_Actor_Settings hru_actor_settings,
+                         caf::actor file_access_actor, caf::actor parent) {
+  // aout(self) << "HRU Batch Actor Started\n"
+  //            << "\tStart GRU Local: " << start_gru_local << "\n"
+  //            << "\tStart GRU Global: " << start_gru_global << "\n"
+  //            << "\tNum GRU: " << num_gru << "\n";
+  
+  self->state.file_access_actor = file_access_actor;
+  self->state.parent = parent;
+
+  
+  for (int i = 0; i < num_gru; i++) {
+    self->state.hru_actors.push_back(
+      self->spawn(hru_actor, start_gru_global + i, start_gru_local + i, 
+                  hru_actor_settings, file_access_actor, self)
+    );
+  }
+
+
+
+  return {
+    [=](update_timeZoneOffset, int iFile) {
+      // aout(self) << "HRU Batch Actor - Update Time Zone Offset\n";
+      for (auto& hru_actor : self->state.hru_actors) {
+        self->send(hru_actor, update_timeZoneOffset_v, iFile);
+      }
+    },
+
+    [=](update_hru, int timestep, int forcingstep) {
+      // aout(self) << "HRU Batch Actor - Update HRU\n";
+      for (auto& hru_actor : self->state.hru_actors) {
+        self->send(hru_actor, update_hru_v, timestep, forcingstep);
+      }
+    },
+    [=](done_update) {
+      // aout(self) << "HRU Batch Actor - Done Update\n";
+      self->state.num_done++;
+      if (self->state.num_done == self->state.hru_actors.size()) {
+        self->send(self->state.parent, done_update_v);
+        self->state.num_done = 0;
+      }
+    }
+  };
+
+}
+                            
+
+
+}
\ No newline at end of file
diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp
index 412e854..1220813 100644
--- a/build/source/actors/job_actor/job_actor.cpp
+++ b/build/source/actors/job_actor/job_actor.cpp
@@ -3,6 +3,8 @@
 using json = nlohmann::json;
 using chrono_time = std::chrono::time_point<std::chrono::system_clock>;
 
+bool batching = true;
+int batch_size = 10;
 
 namespace caf {
 
@@ -91,24 +93,12 @@ behavior job_actor(stateful_actor<job_state>* self,
 
         auto& gru_container = self->state.gru_container;
         
-        // Spawn GRUs
-        for (int i = 0; i < gru_container.num_gru_in_run_domain; i++) {
-          auto global_gru_index = gru_container.gru_list.size() 
-                                          + self->state.start_gru;
-          auto local_gru_index = gru_container.gru_list.size() + 1;                                
-          
-          auto gru = self->spawn(hru_actor,global_gru_index, local_gru_index,               
-                                 self->state.hru_actor_settings,                                
-                                 self->state.file_access_actor, self);
-
-          // Create the GRU object (Job uses this to keep track of GRU status)
-          gru_container.gru_list.push_back(new GRU(global_gru_index, 
-                  local_gru_index, gru, 
-                  self->state.dt_init_start_factor, 
-                  self->state.hru_actor_settings.rel_tol,
-                  self->state.hru_actor_settings.abs_tol,
-                  self->state.max_run_attempts));  
-        }
+        // Spawn HRUs in batches or individually
+        if (batching)
+          spawnHRUBatches(self);
+        else
+          spawnHRUActors(self, false);
+        
 
         aout(self) << "GRUs Initialized\n";
         self->send(self->state.file_access_actor, access_forcing_v, 
@@ -118,7 +108,7 @@ behavior job_actor(stateful_actor<job_state>* self,
         // # Normal Mode
         // #####################################################
         aout(self) << "Job_Actor: Normal Mode\n";
-        self->send(self, init_normal_mode_v);
+        spawnHRUActors(self, true);
       }
     },
 
@@ -149,7 +139,8 @@ behavior job_actor(stateful_actor<job_state>* self,
     [=](done_update){
       self->state.num_gru_done_timestep++;
       
-      if (self->state.num_gru_done_timestep >= self->state.num_gru) {
+      if (self->state.num_gru_done_timestep >= 
+          self->state.gru_container.gru_list.size()) {
         aout(self) << "Job_Actor: Done Update for timestep:" 
                    << self->state.timestep << "\n";
         // write the output
@@ -203,38 +194,6 @@ behavior job_actor(stateful_actor<job_state>* self,
     // #####################################################
     // # Normal Mode Start
     // #####################################################
-    [=](init_normal_mode) {
-      auto& gru_container = self->state.gru_container;
-
-      gru_container.gru_start_time = std::chrono::high_resolution_clock::now();
-      gru_container.run_attempts_left = self->state.max_run_attempts;
-      gru_container.run_attempts_left--;
-
-
-      // Spawn the GRUs
-      for(int i = 0; i < gru_container.num_gru_in_run_domain; i++) {
-        auto global_gru_index = gru_container.gru_list.size() 
-                                + self->state.start_gru;
-        auto local_gru_index = gru_container.gru_list.size() + 1; 
-
-        auto gru = self->spawn(hru_actor, global_gru_index, local_gru_index,               
-                               self->state.hru_actor_settings,                                
-                               self->state.file_access_actor, self);
-
-        // Create the GRU object (Job uses this to keep track of GRU status)
-        gru_container.gru_list.push_back(
-            new GRU(global_gru_index, local_gru_index, gru, 
-                    self->state.dt_init_start_factor, 
-                    self->state.hru_actor_settings.rel_tol,
-                    self->state.hru_actor_settings.abs_tol,
-                    self->state.max_run_attempts));  
-
-        // Start the HRU_Actor to compute all timesteps asynchonously
-        self->send(gru, update_hru_async_v);  
-      }
-    }, // end init_gru
-
-
     [=](done_hru, int local_gru_index) {
       auto& gru_container = self->state.gru_container;
       using namespace std::chrono;
@@ -271,141 +230,205 @@ behavior job_actor(stateful_actor<job_state>* self,
 
     },
 
-      [=](restart_failures) {
-        aout(self) << "Job_Actor: Restarting GRUs that Failed\n";
+    [=](restart_failures) {
+      aout(self) << "Job_Actor: Restarting GRUs that Failed\n";
 
-        self->state.gru_container.num_gru_done = 0;
-        self->state.gru_container.num_gru_in_run_domain = self->state.gru_container.num_gru_failed;
-        self->state.gru_container.num_gru_failed = 0;
+      self->state.gru_container.num_gru_done = 0;
+      self->state.gru_container.num_gru_in_run_domain = self->state.gru_container.num_gru_failed;
+      self->state.gru_container.num_gru_failed = 0;
 
-        self->send(self->state.file_access_actor, restart_failures_v); // notify file_access_actor
+      self->send(self->state.file_access_actor, restart_failures_v); // notify file_access_actor
 
-        // Set Sundials tolerance or decrease timestep length
-        if (self->state.hru_actor_settings.rel_tol > 0 && 
-          self->state.hru_actor_settings.abs_tol > 0) {
-          self->state.hru_actor_settings.rel_tol /= 10;
-          self->state.hru_actor_settings.abs_tol /= 10;
-        } else {
-          self->state.hru_actor_settings.dt_init_factor *= 2;
-        }
+      // Set Sundials tolerance or decrease timestep length
+      if (self->state.hru_actor_settings.rel_tol > 0 && 
+        self->state.hru_actor_settings.abs_tol > 0) {
+        self->state.hru_actor_settings.rel_tol /= 10;
+        self->state.hru_actor_settings.abs_tol /= 10;
+      } else {
+        self->state.hru_actor_settings.dt_init_factor *= 2;
+      }
 
 
-        for(auto GRU : self->state.gru_container.gru_list) {
-          if(GRU->isFailed()) {
-            GRU->setRunning();
-            GRU->decrementAttemptsLeft();
-            auto global_gru_index = GRU->getGlobalGRUIndex();
-            auto local_gru_index = GRU->getLocalGRUIndex();
-            auto gru_actor = self->spawn(hru_actor, 
-                                          global_gru_index, 
-                                          local_gru_index, 
-                                          self->state.hru_actor_settings,
-                                          self->state.file_access_actor, 
-                                          self);
-            self->state.gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor);
-          }
+      for(auto GRU : self->state.gru_container.gru_list) {
+        if(GRU->isFailed()) {
+          GRU->setRunning();
+          GRU->decrementAttemptsLeft();
+          auto global_gru_index = GRU->getGlobalGRUIndex();
+          auto local_gru_index = GRU->getLocalGRUIndex();
+          auto gru_actor = self->spawn(hru_actor, 
+                                        global_gru_index, 
+                                        local_gru_index, 
+                                        self->state.hru_actor_settings,
+                                        self->state.file_access_actor, 
+                                        self);
+          self->state.gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor);
         }
-      },
+      }
+    },
 
-      [=](finalize) {            
-        std::vector<serializable_netcdf_gru_actor_info> 
-            netcdf_gru_info = getGruNetcdfInfo(
-              self->state.max_run_attempts,self->state.gru_container.gru_list);
-          
-              
-        self->state.num_gru_failed = std::count_if(netcdf_gru_info.begin(), 
-            netcdf_gru_info.end(), [](auto& gru_info) {
-          return !gru_info.successful;
-        });
-
-        self->request(self->state.file_access_actor, infinite, finalize_v).await(
-          [=](std::tuple<double, double> read_write_duration) {
-            int err = 0;
-            for (auto GRU : self->state.gru_container.gru_list) {
-              delete GRU;
-            }
-            self->state.gru_container.gru_list.clear();
-
-            self->state.job_timing.updateEndPoint("total_duration");
-
-            aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n"
-                        << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n"
-                        << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n"
-                        << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n"
-                        << "________________________________________________________________________\n\n";
-
-            deallocateJobActor(&err);
-
-              // Tell Parent we are done
-            self->send(self->state.parent, 
-                        done_job_v, 
-                        self->state.num_gru_failed, 
-                        self->state.job_timing.getDuration("total_duration").value_or(-1.0),
-                        std::get<0>(read_write_duration), 
-                        std::get<1>(read_write_duration));
-            self->quit();
-
-        });
-      },
-
-      // Handle Sundials Error
-      [=](err_atom, caf::actor src, double rtol, double atol) {
-        self->state.hru_actor_settings.rel_tol = rtol;
-        self->state.hru_actor_settings.abs_tol = atol;
-        handleGRUError(self, src);
-      },
-
-      [=](const error& err, caf::actor src) {
+    [=](finalize) {            
+      std::vector<serializable_netcdf_gru_actor_info> 
+          netcdf_gru_info = getGruNetcdfInfo(
+            self->state.max_run_attempts,self->state.gru_container.gru_list);
         
-        aout(self) << "\n\n ********** ERROR HANDLER \n";
+            
+      self->state.num_gru_failed = std::count_if(netcdf_gru_info.begin(), 
+          netcdf_gru_info.end(), [](auto& gru_info) {
+        return !gru_info.successful;
+      });
+
+      self->request(self->state.file_access_actor, infinite, finalize_v).await(
+        [=](std::tuple<double, double> read_write_duration) {
+          int err = 0;
+          for (auto GRU : self->state.gru_container.gru_list) {
+            delete GRU;
+          }
+          self->state.gru_container.gru_list.clear();
+
+          self->state.job_timing.updateEndPoint("total_duration");
+
+          aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n"
+                      << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n"
+                      << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n"
+                      << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n"
+                      << "________________________________________________________________________\n\n";
+
+          deallocateJobActor(&err);
+
+            // Tell Parent we are done
+          self->send(self->state.parent, 
+                      done_job_v, 
+                      self->state.num_gru_failed, 
+                      self->state.job_timing.getDuration("total_duration").value_or(-1.0),
+                      std::get<0>(read_write_duration), 
+                      std::get<1>(read_write_duration));
+          self->quit();
+
+      });
+    },
+
+    // Handle Sundials Error
+    [=](err_atom, caf::actor src, double rtol, double atol) {
+      self->state.hru_actor_settings.rel_tol = rtol;
+      self->state.hru_actor_settings.abs_tol = atol;
+      handleGRUError(self, src);
+    },
+
+    [=](const error& err, caf::actor src) {
+      
+      aout(self) << "\n\n ********** ERROR HANDLER \n";
+      
+      switch(err.category()) {
         
-        switch(err.category()) {
-          
-          case type_id_v<hru_error>:
-            aout(self) << "HRU Error: " << to_string(err) << "\n";
-            handleGRUError(self, src);
-
-            break;
-          case type_id_v<file_access_error>:
-            if (err == file_access_error::mDecisions_error) {
-              aout(self) << "Check mDecisions File For Correctness";
-            } else {
-              aout(self) << "File Access Error: " << to_string(err) << "No Handling Implemented\n";
-            }
-            for (auto GRU : self->state.gru_container.gru_list) {
-              self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown);
-            }
-            self->quit();
-            break;
-          default:
-            aout(self) << "Unknown Error: " << to_string(err) << "\n";
-            break;
-        }
-      },
+        case type_id_v<hru_error>:
+          aout(self) << "HRU Error: " << to_string(err) << "\n";
+          handleGRUError(self, src);
+
+          break;
+        case type_id_v<file_access_error>:
+          if (err == file_access_error::mDecisions_error) {
+            aout(self) << "Check mDecisions File For Correctness";
+          } else {
+            aout(self) << "File Access Error: " << to_string(err) << "No Handling Implemented\n";
+          }
+          for (auto GRU : self->state.gru_container.gru_list) {
+            self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown);
+          }
+          self->quit();
+          break;
+        default:
+          aout(self) << "Unknown Error: " << to_string(err) << "\n";
+          break;
+      }
+    },
   };
 }
 
-std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, std::vector<GRU*> &gru_list) {
 
-    std::vector<serializable_netcdf_gru_actor_info> gru_netcdf_info;
-    for(auto gru : gru_list) {
+void spawnHRUActors(stateful_actor<job_state>* self, bool normal_mode) {
+  auto& gru_container = self->state.gru_container;
+  gru_container.gru_start_time = std::chrono::high_resolution_clock::now();
+  gru_container.run_attempts_left = self->state.max_run_attempts;
+  gru_container.run_attempts_left--;
+
+  for (int i = 0; i < gru_container.num_gru_in_run_domain; i++) {
+    auto global_gru_index = gru_container.gru_list.size() 
+                            + self->state.start_gru;
+    auto local_gru_index = gru_container.gru_list.size() + 1;                                
+
+    auto gru = self->spawn(hru_actor, global_gru_index, local_gru_index,               
+                           self->state.hru_actor_settings,                                
+                           self->state.file_access_actor, self);
+
+    // Create the GRU object (Job uses this to keep track of GRU status)
+    gru_container.gru_list.push_back(new GRU(global_gru_index, 
+                                     local_gru_index, gru, 
+                                     self->state.dt_init_start_factor, 
+                                     self->state.hru_actor_settings.rel_tol,
+                                     self->state.hru_actor_settings.abs_tol,
+                                     self->state.max_run_attempts));  
+    
+    if (normal_mode) self->send(gru, update_hru_async_v);
+  }                        
+          
+}
 
-        serializable_netcdf_gru_actor_info gru_info;
-        gru_info.run_time = gru->getRunTime();
-        gru_info.init_duration = gru->getInitDuration();
-        gru_info.forcing_duration = gru->getForcingDuration();
-        gru_info.run_physics_duration = gru->getRunPhysicsDuration();
-        gru_info.write_output_duration = gru->getWriteOutputDuration();
-        
-        gru_info.num_attempts = max_run_attempts - gru->getAttemptsLeft() + 1;
-        gru_info.successful = is_success(gru->getStatus());
-        gru_info.rel_tol = gru->getRelTol();
-        gru_info.abs_tol = gru->getAbsTol();
+void spawnHRUBatches(stateful_actor<job_state>* self) {
+  auto& gru_container = self->state.gru_container;
+  gru_container.gru_start_time = std::chrono::high_resolution_clock::now();
+  gru_container.run_attempts_left = self->state.max_run_attempts;
+  gru_container.run_attempts_left--;
+
+  int remaining_hru_to_batch = gru_container.num_gru_in_run_domain;
+  int start_hru_global = self->state.start_gru;
+  int start_hru_local = 1;
+
+  while (remaining_hru_to_batch > 0) {
+    int current_batch_size = std::min(batch_size, remaining_hru_to_batch);
+    auto gru_batch = self->spawn(hru_batch_actor, start_hru_local,
+                                 start_hru_global, current_batch_size,
+                                  self->state.hru_actor_settings,
+                                 self->state.file_access_actor, self);
 
-        gru_netcdf_info.push_back(gru_info);
+    gru_container.gru_list.push_back(new GRU(start_hru_global, 
+                                    start_hru_local, gru_batch, 
+                                    self->state.dt_init_start_factor, 
+                                    self->state.hru_actor_settings.rel_tol,
+                                    self->state.hru_actor_settings.abs_tol,
+                                    self->state.max_run_attempts));  
 
-    }
-    return gru_netcdf_info;
+    remaining_hru_to_batch -= current_batch_size;
+    start_hru_local += current_batch_size;
+    start_hru_global += current_batch_size;
+  }
+}
+
+
+
+
+
+
+std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(
+    int max_run_attempts, std::vector<GRU*> &gru_list) {
+  std::vector<serializable_netcdf_gru_actor_info> gru_netcdf_info;
+  
+  for(auto gru : gru_list) {
+    serializable_netcdf_gru_actor_info gru_info;
+    gru_info.run_time = gru->getRunTime();
+    gru_info.init_duration = gru->getInitDuration();
+    gru_info.forcing_duration = gru->getForcingDuration();
+    gru_info.run_physics_duration = gru->getRunPhysicsDuration();
+    gru_info.write_output_duration = gru->getWriteOutputDuration();
+    
+    gru_info.num_attempts = max_run_attempts - gru->getAttemptsLeft() + 1;
+    gru_info.successful = is_success(gru->getStatus());
+    gru_info.rel_tol = gru->getRelTol();
+    gru_info.abs_tol = gru->getAbsTol();
+
+    gru_netcdf_info.push_back(gru_info);
+  }
+  
+  return gru_netcdf_info;
 }
 
 
diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp
index 5327808..9e378d6 100644
--- a/build/source/actors/summa_actor/summa_actor.cpp
+++ b/build/source/actors/summa_actor/summa_actor.cpp
@@ -12,55 +12,6 @@
 #include <netcdf.h>
 
 using json = nlohmann::json;
-// Helper function to extract the information from the file_manager
-std::string extractEnclosed(const std::string& line) {
-    std::size_t first_quote = line.find_first_of("'");
-    std::size_t last_quote = line.find_last_of("'");
-    if (first_quote != std::string::npos && last_quote != std::string::npos 
-        && first_quote < last_quote) {
-        return line.substr(first_quote + 1, last_quote - first_quote - 1);
-    }
-    return "";
-}
-
-// Check the number of GRUs in the attribute file
-int getNumGRUInFile(const std::string &file_manager) {
-  std::ifstream file(file_manager);
-  std::string attributeFile, settingPath;
-  if (!file.is_open())
-    return -1;
-  
-  std::string line;
-  while (std::getline(file, line)) {
-    if (line.compare(0, 13, "attributeFile") == 0)
-      attributeFile = extractEnclosed(line);
-    if (line.compare(0, 12, "settingsPath") == 0)
-      settingPath = extractEnclosed(line);
-  }
-
-  file.close();
-
-  size_t fileGRU = -1;
-  int ncid, gru_dim;
-  if (attributeFile.empty() || settingPath.empty())
-    return fileGRU;
-  
-  std::string combined = settingPath + attributeFile;
-
-  if (NC_NOERR != nc_open(combined.c_str(), NC_NOWRITE, &ncid))
-    return fileGRU;
-  if (NC_NOERR != nc_inq_dimid(ncid, "gru", &gru_dim)) {
-    nc_close(ncid);
-    return -1;
-  }
-  if (NC_NOERR != nc_inq_dimlen(ncid, gru_dim, &fileGRU)) {
-    nc_close(ncid);
-    return -1;
-  }
-  nc_close(ncid);
-  return fileGRU;
-}
-
 
 namespace caf {
 behavior summa_actor(stateful_actor<summa_actor_state>* self, 
@@ -100,17 +51,18 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self,
       self->state.numGRU, 
       self->state.summa_actor_settings.max_gru_per_job);
   
-  aout(self) << "Starting SUMMA With " << 
-      self->state.batch_container.getBatchesRemaining() << " Batches\n";
-  aout(self) << "###################################################\n"
-              << self->state.batch_container.getBatchesAsString()
-              << "###################################################\n";
+  aout(self) << "Starting SUMMA With "
+             << self->state.batch_container.getBatchesRemaining() 
+             << " Batches\n"
+             << "###################################################\n"
+             << self->state.batch_container.getBatchesAsString()
+            << "###################################################\n";
 
   std::optional<Batch> batch = 
       self->state.batch_container.getUnsolvedBatch();
   if (!batch.has_value()) {
     aout(self) << "ERROR--Summa_Actor: No Batches To Solve\n";
-    self->quit();
+    self->quit(); return {};
   } 
   self->state.current_batch_id = batch->getBatchID();
   aout(self) << "Starting Batch " << self->state.current_batch_id + 1 << "\n";
@@ -189,3 +141,54 @@ void spawnJob(stateful_actor<summa_actor_state>* self) {
 }
 
 } // end namespace
+
+
+std::string extractEnclosed(const std::string& line) {
+  std::size_t first_quote = line.find_first_of("'");
+  std::size_t last_quote = line.find_last_of("'");
+  if (first_quote != std::string::npos && last_quote != std::string::npos 
+      && first_quote < last_quote) {
+    return line.substr(first_quote + 1, last_quote - first_quote - 1);
+  }
+  return "";
+}
+
+int getNumGRUInFile(const std::string &file_manager) {
+  std::ifstream file(file_manager);
+  std::string attributeFile, settingPath;
+  if (!file.is_open())
+    return -1;
+  
+  std::string line;
+  while (std::getline(file, line)) {
+    if (line.compare(0, 13, "attributeFile") == 0)
+      attributeFile = extractEnclosed(line);
+    if (line.compare(0, 12, "settingsPath") == 0)
+      settingPath = extractEnclosed(line);
+  }
+
+  file.close();
+
+  size_t fileGRU = -1;
+  int ncid, gru_dim;
+  if (attributeFile.empty() || settingPath.empty())
+    return fileGRU;
+  
+  std::string combined = settingPath + attributeFile;
+
+  if (NC_NOERR != nc_open(combined.c_str(), NC_NOWRITE, &ncid))
+    return fileGRU;
+  if (NC_NOERR != nc_inq_dimid(ncid, "gru", &gru_dim)) {
+    nc_close(ncid);
+    return -1;
+  }
+  if (NC_NOERR != nc_inq_dimlen(ncid, gru_dim, &fileGRU)) {
+    nc_close(ncid);
+    return -1;
+  }
+  nc_close(ncid);
+  return fileGRU;
+}
+
+
+
diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp
index 6f11d6c..f2b086e 100644
--- a/build/source/actors/summa_actor/summa_client.cpp
+++ b/build/source/actors/summa_actor/summa_client.cpp
@@ -1,44 +1,32 @@
 #include "summa_client.hpp"
 namespace caf {
 
-// behavior summa_client_init(stateful_actor<summa_client_state>* self) {
-//   // aout(self) << "Client Has Started Successfully" << std::endl;
-
-//   char host[HOST_NAME_MAX];
-//   gethostname(host, HOST_NAME_MAX);
-//   self->state.hostname = host;
-//   self->set_down_handler([=](const down_msg& dm){
-//     if(dm.source == self->state.current_server) {
-//       aout(self) << "*** Lost Connection to Server" << std::endl;
-//       self->state.current_server = nullptr;
-//       // try to connect to new server
-//       if (self->state.backup_servers_list.size() > 0) {
-//           aout(self) << "Trying to connect to backup server\n";
-//           std::this_thread::sleep_for(std::chrono::seconds(3)); 
-//           // TODO: Not obvious where the code goes from here. 
-//           connecting(self, std::get<1>(self->state.backup_servers_list[0]), 
-//                      self->state.port);
-
-//       } else {
-//           aout(self) << "No backup servers available" << std::endl;
-//       }
-//     }
-//   });
-
-//   return {
-//     [=] (connect_atom, const std::string& host, uint16_t port) {
-//       aout(self) << "Received a connect request while not running\n";
-//       connecting(self, host, port);
-//     }
-//   };
-// }
-
- 
 behavior summa_client(stateful_actor<summa_client_state>* self,
                       Distributed_Settings distributed_settings) {
-
   self->state.running = true;
   self->state.distributed_settings = distributed_settings;
+  char host[HOST_NAME_MAX];
+  gethostname(host, HOST_NAME_MAX);
+  self->state.hostname = host;
+
+  self->set_down_handler([=](const down_msg& dm){
+    if(dm.source == self->state.current_server) {
+      aout(self) << "*** Lost Connection to Server" << std::endl;
+      self->state.current_server = nullptr;
+      // try to connect to new server
+      if (self->state.backup_servers_list.size() > 0) {
+        aout(self) << "Trying to connect to backup server\n";
+        std::this_thread::sleep_for(std::chrono::seconds(3)); 
+        // TODO: Not obvious where the code goes from here. 
+        // connecting(self, std::get<1>(self->state.backup_servers_list[0]), 
+        //            self->state.port);
+
+      } else {
+        aout(self) << "No backup servers available" << std::endl;
+      }
+    }
+  });
+
   for (auto host : distributed_settings.servers_list) {
     auto server = self->system().middleman().remote_actor(host, 
                                            distributed_settings.port);
-- 
GitLab