From 8c988881821434adfa2cd67dc3d874409af7f3cd Mon Sep 17 00:00:00 2001
From: KyleKlenk <kyle.c.klenk@gmail.com>
Date: Mon, 18 Mar 2024 16:41:06 -0600
Subject: [PATCH] Fix compilation errors and improve code structure

---
 build/includes/global/message_atoms.hpp       |  1 -
 build/includes/job_actor/job_actor.hpp        |  3 ++-
 .../actors/hru_actor/hru_batch_actor.cpp      |  2 ++
 build/source/actors/job_actor/job_actor.cpp   | 23 ++++++++++++-------
 4 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp
index b20a369..34b5d53 100644
--- a/build/includes/global/message_atoms.hpp
+++ b/build/includes/global/message_atoms.hpp
@@ -47,7 +47,6 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
     CAF_ADD_ATOM(summa, init_normal_mode)
     CAF_ADD_ATOM(summa, update_hru_async)
 
-
     // Sender: job_actor 
     // Reciever: summa_actor
     // Summary: job_actor finished job
diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp
index ba69073..8082439 100644
--- a/build/includes/job_actor/job_actor.hpp
+++ b/build/includes/job_actor/job_actor.hpp
@@ -12,6 +12,7 @@
 #include "file_access_actor.hpp"
 #include <unistd.h>
 #include <limits.h>
+#include <cmath>
 
 
 /*********************************************
@@ -52,7 +53,7 @@ struct job_state {
     int max_run_attempts = 1;     // Max number of attempts to solve a GRU
 
     GRU_Container gru_container;
-
+    
     // Variables for GRU monitoring
     int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
     int num_gru_done = 0;         // The number of GRUs that have completed
diff --git a/build/source/actors/hru_actor/hru_batch_actor.cpp b/build/source/actors/hru_actor/hru_batch_actor.cpp
index 52801e3..82b6d6d 100644
--- a/build/source/actors/hru_actor/hru_batch_actor.cpp
+++ b/build/source/actors/hru_actor/hru_batch_actor.cpp
@@ -34,6 +34,7 @@ behavior hru_batch_actor(stateful_actor<hru_batch_state>* self,
         self->send(hru_actor, update_hru_v, timestep, forcingstep);
       }
     },
+
     [=](done_update) {
       // aout(self) << "HRU Batch Actor - Done Update\n";
       self->state.num_done++;
@@ -42,6 +43,7 @@ behavior hru_batch_actor(stateful_actor<hru_batch_state>* self,
         self->state.num_done = 0;
       }
     },
+
     [=](exit_msg) {
       for(auto& hru_actor : self->state.hru_actors) {
         self->send_exit(hru_actor, exit_reason::user_shutdown);
diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp
index 4fb0ba9..0b7f6f9 100644
--- a/build/source/actors/job_actor/job_actor.cpp
+++ b/build/source/actors/job_actor/job_actor.cpp
@@ -3,8 +3,6 @@
 using json = nlohmann::json;
 using chrono_time = std::chrono::time_point<std::chrono::system_clock>;
 
-int batch_size = 20;
-
 namespace caf {
 
 // First Actor that is spawned that is not the Coordinator Actor.
@@ -25,7 +23,7 @@ behavior job_actor(stateful_actor<job_state>* self,
       aout(self) << "\n\n ********** EXIT HANDLER ********** \n";
       aout(self) << "Exit Reason: " << to_string(em.reason) << "\n";
   });
-    
+
 
   // Timing Information
   self->state.job_timing = TimingInfo(); 
@@ -69,7 +67,6 @@ behavior job_actor(stateful_actor<job_state>* self,
     aout(self) << "\nERROR: Job_Actor - job_init_fortran\n"; 
     return {};
   }
-  
 
   // Spawn the file_access_actor.
   self->state.file_access_actor = self->spawn(file_access_actor, 
@@ -154,13 +151,11 @@ behavior job_actor(stateful_actor<job_state>* self,
               aout(self) << "Job_Actor: Error Writing Output\n";
               for (auto GRU : self->state.gru_container.gru_list)
                 self->send(GRU->getGRUActor(), exit_msg_v);
-                // self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown);
               
               self->send_exit(self->state.file_access_actor, 
                               exit_reason::user_shutdown);
               self->quit();
             } 
-            // else {  aout(self) << "Job_Actor: Done Writing Output\n"; }
           });
 
         self->state.timestep++;
@@ -168,6 +163,8 @@ behavior job_actor(stateful_actor<job_state>* self,
 
         // Check if we are done the simulation
         if (self->state.timestep > self->state.num_steps) {
+          for (auto GRU : self->state.gru_container.gru_list)
+            GRU->setSuccess();
           aout(self) << "Job_Actor: Done Job\n";
           self->send(self, finalize_v);
         }
@@ -376,18 +373,26 @@ void spawnHRUActors(stateful_actor<job_state>* self, bool normal_mode) {
 }
 
 void spawnHRUBatches(stateful_actor<job_state>* self) {
+  int batch_size;
+
   auto& gru_container = self->state.gru_container;
   gru_container.gru_start_time = std::chrono::high_resolution_clock::now();
   gru_container.run_attempts_left = self->state.max_run_attempts;
   gru_container.run_attempts_left--;
 
+  if (self->state.job_actor_settings.batch_size == 9999) {
+    batch_size = std::ceil(gru_container.num_gru_in_run_domain / 
+                           (std::thread::hardware_concurrency() * 2));
+  } else {
+    batch_size = self->state.job_actor_settings.batch_size;
+  }
+
   int remaining_hru_to_batch = gru_container.num_gru_in_run_domain;
   int start_hru_global = self->state.start_gru;
   int start_hru_local = 1;
 
   while (remaining_hru_to_batch > 0) {
-    int current_batch_size = std::min(self->state.job_actor_settings.batch_size, 
-                                      remaining_hru_to_batch);
+    int current_batch_size = std::min(batch_size, remaining_hru_to_batch);
     auto gru_batch = self->spawn(hru_batch_actor, start_hru_local,
                                  start_hru_global, current_batch_size,
                                   self->state.hru_actor_settings,
@@ -404,6 +409,8 @@ void spawnHRUBatches(stateful_actor<job_state>* self) {
     start_hru_local += current_batch_size;
     start_hru_global += current_batch_size;
   }
+  aout(self) << "Number of HRU_Batch_Actors: " 
+             << gru_container.gru_list.size() << "\n";
 }
 
 
-- 
GitLab