From 8480aa56b815d58ee14929ee4923bc349798e99b Mon Sep 17 00:00:00 2001
From: KyleKlenk <kyle.c.klenk@gmail.com>
Date: Wed, 13 Jul 2022 14:30:53 -0600
Subject: [PATCH] can run and output results

---
 build/includes/summa_actor/batch_manager.hpp  |  2 +
 build/includes/summa_actor/summa_server.hpp   |  3 +-
 build/module_load.sh                          |  4 +-
 build/source/actors/job_actor/job_actor.cpp   | 15 +++-
 .../actors/summa_actor/batch_manager.cpp      | 15 ++++
 .../actors/summa_actor/summa_client.cpp       |  7 +-
 .../actors/summa_actor/summa_server.cpp       | 81 ++++++++++++++-----
 7 files changed, 98 insertions(+), 29 deletions(-)

diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp
index 2e8f46d..062cd06 100644
--- a/build/includes/summa_actor/batch_manager.hpp
+++ b/build/includes/summa_actor/batch_manager.hpp
@@ -42,6 +42,8 @@ class Batch {
         void assignedBatch(std::string hostname, caf::actor actor_ref);
 
         void updateRunTime(double run_time);
+
+        void writeBatchToFile(std::string file_name);
 };
 
 
diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp
index 83d4fee..2a082b3 100644
--- a/build/includes/summa_actor/summa_server.hpp
+++ b/build/includes/summa_actor/summa_server.hpp
@@ -21,9 +21,10 @@ struct summa_server_state {
     std::vector<Batch> solved_batches;
     std::vector<Batch> failed_batches;
     std::vector<Client> client_list;
+    std::string csv_output_name;
 };
 
 behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path);
 int assembleBatches(stateful_actor<summa_server_state>* self);
-std::optional<Batch> getUnsolvedBatch(stateful_actor<summa_server_state>* self);
+std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self);
 }
\ No newline at end of file
diff --git a/build/module_load.sh b/build/module_load.sh
index 6359abc..041d0e1 100755
--- a/build/module_load.sh
+++ b/build/module_load.sh
@@ -4,4 +4,6 @@
 module load gcc/9.3.0
 module load netcdf-fortran
 module load openblas
-module load caf
\ No newline at end of file
+module load caf
+
+export LD_LIBRARY_PATH=/globalhome/kck540/HPC/SummaProjects/Summa-Actors/bin
\ No newline at end of file
diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp
index 11381b8..1fb5f5e 100644
--- a/build/source/actors/job_actor/job_actor.cpp
+++ b/build/source/actors/job_actor/job_actor.cpp
@@ -170,9 +170,10 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
         },
 
         [=](file_access_actor_err, std::string function) {
-            aout(self) << "Failure in File Access Actor in function" << function << "\n";
+            aout(self) << "Failure in File Access Actor in function: " << function << "\n";
             if (function == "def_output") {
                 aout(self) << "Error with the output file, will try creating it agian\n";
+                std::this_thread::sleep_for(std::chrono::seconds(5));
                 self->state.file_access_actor = self->spawn(file_access_actor, self->state.startGRU, self->state.numGRU, 
                     self->state.outputStrucSize, self->state.configPath, self);
             } else {
@@ -197,9 +198,15 @@ void initJob(stateful_actor<job_state>* self) {
         self->state.successOutputFile = self->state.csvPath += success += 
             std::to_string(self->state.startGRU) += ".csv";
         file.open(self->state.successOutputFile, std::ios_base::out);
-        file << "GRU" << "," << "totalDuration" << "," << "initDuration" << "," << 
-                    "forcingDuration" << "," << "runPhysicsDuration" << "," << "writeOutputDuration" << 
-                    "," << "dt_init" << "," << "numAttemtps" << "\n";
+        file << 
+            "GRU,"                 << 
+            "totalDuration,"       <<
+            "initDuration,"        << 
+            "forcingDuration,"     << 
+            "runPhysicsDuration,"  << 
+            "writeOutputDuration," << 
+            "dt_init,"             << 
+            "numAttemtps\n";
         file.close();
     }
 
diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp
index 5fa2677..85beab3 100644
--- a/build/source/actors/summa_actor/batch_manager.cpp
+++ b/build/source/actors/summa_actor/batch_manager.cpp
@@ -50,4 +50,19 @@ void Batch::updateRunTime(double run_time) {
     this->run_time = run_time;
 }
 
+void Batch::writeBatchToFile(std::string file_name) {
+    std::ofstream output_file;
+    output_file.open(file_name, std::ios_base::app);
+    output_file <<
+        this->batch_id      << "," <<
+        this->start_hru     << "," << 
+        this->num_hru       << "," << 
+        this->assigned_host << "," <<
+        this->run_time      << "," << 
+        this->read_time     << "," <<
+        this->write_time    << "," <<
+        this->status        << "\n";
+    output_file.close();
+}
+
 
diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp
index 1ef8231..522b510 100644
--- a/build/source/actors/summa_actor/summa_client.cpp
+++ b/build/source/actors/summa_actor/summa_client.cpp
@@ -73,7 +73,12 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
     self->send(server_actor, connect_to_server_v, self, self->state.hostname);
     return {
         [=](batch, int client_id, int batch_id, int start_hru, int num_hru, std::string config_path) {
-            aout(self) << "Received batch to compute" << std::endl;
+            aout(self) << "\nReceived batch to compute" << "\n";
+            aout(self) << "BatchID = " << batch_id << "\n";
+            aout(self) << "Start HRU = " << start_hru << "\n";
+            aout(self) << "Num HRU = " << num_hru << "\n";
+
+
             self->state.client_id = client_id;
             self->state.batch_id = batch_id;
             self->state.summa_actor_ref = self->spawn(summa_actor, start_hru, num_hru, config_path, self);
diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp
index 814a107..fec6fd0 100644
--- a/build/source/actors/summa_actor/summa_server.cpp
+++ b/build/source/actors/summa_actor/summa_server.cpp
@@ -12,11 +12,11 @@ namespace caf {
 
 behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) {
     aout(self) << "Summa Server has Started \n";
-    std::string returnType;
-    getSettingsTest(std::vector<std::string> {"test", "test2"} ,returnType);
-
-
     self->state.config_path = config_path;
+    // std::string returnType;
+    // getSettingsTest(std::vector<std::string> {"test", "test2"} ,returnType);
+
+    // --------------------------Initalize Settings --------------------------
     self->state.total_hru_count = getSettings(self->state.config_path, "SimulationSettings", "total_hru_count", 
 		self->state.total_hru_count).value_or(-1);
     if (self->state.total_hru_count == -1) {
@@ -27,7 +27,25 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
     if (self->state.num_hru_per_batch == -1) {
         aout(self) << "ERROR: With num_hru_per_batch - CHECK Summa_Actors_Settings.json\n";
     }
-
+    // --------------------------Initalize Settings --------------------------
+
+    // -------------------------- Initalize CSV ------------------------------
+    std::ofstream csv_output;
+    self->state.csv_output_name = "Batch_Results.csv";
+    csv_output.open(self->state.csv_output_name, std::ios_base::out);
+    csv_output << 
+        "Batch_ID,"  <<
+        "Start_HRU," <<
+        "Num_HRU,"   << 
+        "Hostname,"  <<
+        "Run_Time,"  <<
+        "Read_Time," <<
+        "Write_Time,"<<
+        "Status\n";
+    csv_output.close();
+    // -------------------------- Initalize CSV ------------------------------
+
+    // -------------------------- Assemble Batches ---------------------------
     aout(self) << "Assembling HRUs into Batches\n";
     if (assembleBatches(self) == -1) {
         aout(self) << "ERROR: assembleBatches\n";
@@ -38,6 +56,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
             self->state.batch_list[i].printBatchInfo();
         }
     }
+    // -------------------------- Assemble Batches ---------------------------
 
     return {
         [=](connect_to_server, actor client, std::string hostname) {
@@ -46,14 +65,24 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
             int client_id = self->state.client_list.size(); // So we can lookup the client in O(1) time 
             self->state.client_list.push_back(Client(client_id, client, hostname));
 
-            std::optional<Batch> batch_to_send = getUnsolvedBatch(self);
-            if (batch_to_send.has_value()) {
-                Batch verified_batch = batch_to_send.value();
-                verified_batch.assignedBatch(hostname, client);
-                self->send(client, batch_v, client_id, batch_to_send->getBatchID(), batch_to_send->getStartHRU(), 
-                    batch_to_send->getNumHRU(), self->state.config_path);
+            std::optional<int> batch_id = getUnsolvedBatchID(self);
+            if (batch_id.has_value()) {
+                // update the batch in the batch list with the host and actor_ref
+                self->state.batch_list[batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client);
+                
+                int start_hru = self->state.batch_list[batch_id.value()].getStartHRU();
+                int num_hru = self->state.batch_list[batch_id.value()].getNumHRU();
+
+                self->send(client, 
+                    batch_v, 
+                    client_id, 
+                    batch_id.value(), 
+                    start_hru, 
+                    num_hru, 
+                    self->state.config_path);
 
             } else {
+
                 aout(self) << "We Are Done - Telling Clients to exit \n";
                 for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) {
                     self->send(self->state.client_list[i].getActor(), time_to_exit_v);
@@ -67,6 +96,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
             double total_read_duration, double total_write_duration) {
             
             self->state.batch_list[batch_id].solvedBatch(total_duration, total_read_duration, total_write_duration);
+            self->state.batch_list[batch_id].writeBatchToFile(self->state.csv_output_name);
             self->state.batches_solved++;
             self->state.batches_remaining = self->state.batch_list.size() - self->state.batches_solved;
 
@@ -81,12 +111,22 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
             aout(self) << "****************************************\n";
 
             // Find a new batch
-            std::optional<Batch> batch_to_send = getUnsolvedBatch(self);
-            if (batch_to_send.has_value()) {
-                Batch verified_batch = batch_to_send.value();
-                verified_batch.assignedBatch(self->state.client_list[client_id].getHostname(), client);
-                self->send(client, batch_v, client_id, verified_batch.getBatchID(), verified_batch.getStartHRU(), 
-                    verified_batch.getNumHRU(), self->state.config_path);
+            std::optional<int> new_batch_id = getUnsolvedBatchID(self);
+             if (new_batch_id.has_value()) {
+                // update the batch in the batch list with the host and actor_ref
+                self->state.batch_list[new_batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client);
+            
+                int start_hru = self->state.batch_list[new_batch_id.value()].getStartHRU();
+                int num_hru = self->state.batch_list[new_batch_id.value()].getNumHRU();
+
+                self->send(client, 
+                    batch_v, 
+                    client_id, 
+                    new_batch_id.value(), 
+                    start_hru, 
+                    num_hru, 
+                    self->state.config_path);
+
             } else {
                 aout(self) << "We Are Done - Telling Clients to exit \n";
                 for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) {
@@ -121,17 +161,14 @@ int assembleBatches(stateful_actor<summa_server_state>* self) {
     return 0;
 }
 
-std::optional<Batch> getUnsolvedBatch(stateful_actor<summa_server_state>* self) {
-
+std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self) {
     // Find the first unassigned batch
     for (std::vector<int>::size_type i = 0; i < self->state.batch_list.size(); i++) {
         if (self->state.batch_list[i].getBatchStatus() == unassigned) {
-            return self->state.batch_list[i];
+            return i;
         }
     }
-
     return {};
-
 }
 
 } // end namespace
-- 
GitLab