From 7cff2666929e5f83895b280af9f92e5c7fec1df4 Mon Sep 17 00:00:00 2001
From: KyleKlenk <kyle.c.klenk@gmail.com>
Date: Fri, 24 Jun 2022 17:22:38 -0600
Subject: [PATCH] Client can solve multiple batches of HRUs

---
 build/includes/global/message_atoms.hpp       |  4 ++
 build/includes/summa_actor/batch_manager.hpp  | 31 +++++++++++--
 build/includes/summa_actor/client.hpp         |  7 ++-
 build/includes/summa_actor/summa_actor.hpp    |  3 +-
 build/includes/summa_actor/summa_client.hpp   |  3 ++
 build/includes/summa_actor/summa_server.hpp   |  6 ++-
 build/makefile                                |  3 +-
 build/source/actors/main.cpp                  |  6 +--
 .../actors/summa_actor/batch_manager.cpp      | 31 ++++++++++++-
 build/source/actors/summa_actor/client.cpp    | 15 +++++++
 .../source/actors/summa_actor/summa_actor.cpp | 10 ++++-
 .../actors/summa_actor/summa_client.cpp       | 18 +++++++-
 .../actors/summa_actor/summa_server.cpp       | 44 ++++++++++++++++++-
 13 files changed, 163 insertions(+), 18 deletions(-)

diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp
index bad2da8..b28f3c0 100644
--- a/build/includes/global/message_atoms.hpp
+++ b/build/includes/global/message_atoms.hpp
@@ -39,5 +39,9 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
     CAF_ADD_ATOM(summa, dt_init_factor)
     // Client Actor
     CAF_ADD_ATOM(summa, connect_to_server)
+    CAF_ADD_ATOM(summa, batch)
+    // Server Actor
+    CAF_ADD_ATOM(summa, done_batch)
+    CAF_ADD_ATOM(summa, time_to_exit)
 
 CAF_END_TYPE_ID_BLOCK(summa)
\ No newline at end of file
diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp
index 30ae38f..83c6056 100644
--- a/build/includes/summa_actor/batch_manager.hpp
+++ b/build/includes/summa_actor/batch_manager.hpp
@@ -3,15 +3,23 @@
 #include <vector>
 
 
+enum batch_status {
+    unassigned,
+    assigned,
+    solved,
+    failed
+};
+
 class Batch {
     private:
         int batch_id;
         int start_hru;
         int num_hru;
-        int run_time;
-        int read_time;
-        int write_time;
+        double run_time;
+        double read_time;
+        double write_time;
         caf::actor assigned_actor;
+        batch_status status;
 
 
     public:
@@ -19,9 +27,26 @@ class Batch {
 
         void printBatchInfo();
 
+        batch_status getBatchStatus();
+
+        int getBatchID();
+
+        int getStartHRU();
+
+        int getNumHRU();
+
+        void solvedBatch();
+
+        void assignedBatch();
+
+        void updateRunTime(double run_time);
+
 };
 
 
+
+
+
 class Batch_Manager {
     private:
         std::vector<Batch> batch_list;
diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client.hpp
index 358a18c..310fcb4 100644
--- a/build/includes/summa_actor/client.hpp
+++ b/build/includes/summa_actor/client.hpp
@@ -1,8 +1,7 @@
 #pragma once
 
 #include "caf/all.hpp"
-
-
+#include "batch_manager.hpp"
 
 
 class Client {
@@ -12,8 +11,12 @@ class Client {
         bool connected;
         caf::actor client_actor;
         std::string host_name;
+        Batch* current_batch;
 
 
     public:
+        Client(int id, caf::actor client_actor, std::string host_name);
+
+        caf::actor getActor();
 
 };
\ No newline at end of file
diff --git a/build/includes/summa_actor/summa_actor.hpp b/build/includes/summa_actor/summa_actor.hpp
index b86256f..91859eb 100644
--- a/build/includes/summa_actor/summa_actor.hpp
+++ b/build/includes/summa_actor/summa_actor.hpp
@@ -25,10 +25,11 @@ struct summa_actor_state {
     int outputStrucSize; 
 
     caf::actor currentJob;  // Reference to the current job actor
+    caf::actor parent;
 
 };
 
-behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath);
+behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath, actor parent);
 
 void spawnJob(stateful_actor<summa_actor_state>* self);
 
diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp
index b244c69..f18a25d 100644
--- a/build/includes/summa_actor/summa_client.hpp
+++ b/build/includes/summa_actor/summa_client.hpp
@@ -10,6 +10,9 @@ namespace caf {
 struct summa_client_state {
     strong_actor_ptr current_server;
     std::string hostname;
+    std::string config_path;
+    actor summa_actor_ref;
+    int batch_id;
 };
 behavior summa_client(stateful_actor<summa_client_state>* self);
 behavior unconnected(stateful_actor<summa_client_state>*);
diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp
index ee4d85c..8653e0c 100644
--- a/build/includes/summa_actor/summa_server.hpp
+++ b/build/includes/summa_actor/summa_server.hpp
@@ -3,6 +3,7 @@
 #include "caf/all.hpp"
 #include "caf/io/all.hpp"
 #include "batch_manager.hpp"
+#include "client.hpp"
 #include <string>
 
 
@@ -11,13 +12,16 @@ namespace caf {
 struct summa_server_state {
     int total_hru_count;
     int num_clients;
-    int num_hru_per_batch;   
+    int num_hru_per_batch;
+    std::string config_path;
     std::vector<Batch*> batch_list;
     std::vector<Batch> solved_batches;
     std::vector<Batch> failed_batches;
+    std::vector<Client*> client_list;
 };
 
 behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path);
 int parseSettings(stateful_actor<summa_server_state>* self, std::string config_path);
 int assembleBatches(stateful_actor<summa_server_state>* self);
+Batch* getUnsolvedBatch(stateful_actor<summa_server_state>* self);
 }
\ No newline at end of file
diff --git a/build/makefile b/build/makefile
index b61ce06..f8b31cd 100644
--- a/build/makefile
+++ b/build/makefile
@@ -255,6 +255,7 @@ SUMMA_CLIENT = $(SOURCE_DIR)/summa_actor/summa_client.cpp
 SUMMA_SERVER = $(SOURCE_DIR)/summa_actor/summa_server.cpp
 
 BATCH_MANGER = $(SOURCE_DIR)/summa_actor/batch_manager.cpp
+CLIENT_MANAGER = $(SOURCE_DIR)/summa_actor/client.cpp
 
 JOB_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/job_actor
 JOB_ACTOR = $(SOURCE_DIR)/job_actor/job_actor.cpp
@@ -340,7 +341,7 @@ compile_summa_client:
 	$(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES)
 
 compile_summa_server:
-	$(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(BATCH_MANGER) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES)
+	$(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(BATCH_MANGER) $(CLIENT_MANAGER) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES)
 
 compile_main:
 	$(CC) $(FLAGS_ACTORS) -c $(MAIN) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) $(JOB_ACTOR_INCLUDES)
diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp
index 3c4643f..9aca725 100644
--- a/build/source/actors/main.cpp
+++ b/build/source/actors/main.cpp
@@ -22,7 +22,7 @@ class config : public actor_system_config {
         std::string configPath = "";
         bool debugMode = false;
         uint16_t port = 4444;
-        std::string host = "localhost";
+        std::string host = "cnic-giws-cpu-19001-02";
         bool server_mode = false;
         bool distributed = false;
     
@@ -52,7 +52,7 @@ void run_client(actor_system& system, const config& cfg) {
 
     } else {
         aout(self) << "Starting SUMMA in non-distributed mode \n"; 
-        auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath);
+        auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath, self);
     }
    
 }
@@ -105,7 +105,7 @@ void caf_main(actor_system& sys, const config& cfg) {
         auto system = cfg.server_mode ? run_server : run_client;
         system(sys, cfg);
     } else {
-        auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath);
+        auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath, self);
     }
     // start SUMMA
     // auto system = cfg.server_mode ? run_server : run_client;
diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp
index 495a0f0..b3d8576 100644
--- a/build/source/actors/summa_actor/batch_manager.cpp
+++ b/build/source/actors/summa_actor/batch_manager.cpp
@@ -6,10 +6,39 @@ Batch::Batch(int batch_id, int start_hru, int num_hru) {
     this->batch_id = batch_id;
     this->start_hru = start_hru;
     this->num_hru = num_hru;
+    this->status = unassigned;
 }
 
 void Batch::printBatchInfo() {
     std::cout << "batch_id: " << this->batch_id << "\n";
     std::cout << "start_hru: " << this->start_hru << "\n";
     std::cout << "num_hru: " << this->num_hru << "\n";
-}
\ No newline at end of file
+}
+
+batch_status Batch::getBatchStatus() {
+    return this->status;
+}
+
+int Batch::getBatchID() {
+    return this->batch_id;
+}
+
+int Batch::getStartHRU() {
+    return this->start_hru;
+}
+
+int Batch::getNumHRU() {
+    return this->num_hru;
+}
+
+void Batch::solvedBatch() {
+    this->status = solved;
+}
+
+void Batch::assignedBatch() {
+    this->status = assigned;
+}
+
+void Batch::updateRunTime(double run_time) {
+    this->run_time = run_time;
+}
diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp
index e69de29..078f316 100644
--- a/build/source/actors/summa_actor/client.cpp
+++ b/build/source/actors/summa_actor/client.cpp
@@ -0,0 +1,15 @@
+#include "caf/all.hpp"
+#include "client.hpp"
+
+
+Client::Client(int id, caf::actor client_actor, std::string host_name) {
+    this->id = id;
+    this->client_actor = client_actor;
+    this->host_name = host_name;
+    this->connected = true;
+}
+
+
+caf::actor Client::getActor() {
+    return this->client_actor;
+}
\ No newline at end of file
diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp
index 6984ef3..570872c 100644
--- a/build/source/actors/summa_actor/summa_actor.cpp
+++ b/build/source/actors/summa_actor/summa_actor.cpp
@@ -14,12 +14,13 @@ using json = nlohmann::json;
 
 namespace caf {
 
-behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath) {
+behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath, actor parent) {
  	self->state.start = std::chrono::high_resolution_clock::now();
 	// Set Variables
 	self->state.startGRU = startGRU;
 	self->state.numGRU = numGRU;
 	self->state.configPath = configPath;
+	self->state.parent = parent;
 
 	parseSettings(self, configPath);
 	aout(self) << "SETTINGS FOR SUMMA_ACTOR\n";
@@ -45,9 +46,14 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int
             	aout(self) << "     " << self->state.duration / 1000  << " Seconds\n";
             	aout(self) << "     " << (self->state.duration / 1000) / 60  << " Minutes\n";
             	aout(self) << "     " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n";
-
 				aout(self) << "Program Finished \n";
 
+			
+				self->send(self->state.parent, done_batch_v, self->state.duration);
+			
+
+
+
 			} else {
 				// spawn a new job
 				spawnJob(self);
diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp
index 2ec1f4a..ab2f7ff 100644
--- a/build/source/actors/summa_actor/summa_client.cpp
+++ b/build/source/actors/summa_actor/summa_client.cpp
@@ -2,6 +2,7 @@
 #include "caf/io/all.hpp"
 
 #include "summa_client.hpp"
+#include "summa_actor.hpp"
 #include "message_atoms.hpp"
 
 #include <unistd.h>
@@ -70,9 +71,22 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
 
     self->send(server_actor, connect_to_server_v, self, self->state.hostname);
     return {
-        [=](std::string test) {
-            aout(self) << test << std::endl;
+        [=](batch, int batch_id, int start_hru, int num_hru, std::string config_path) {
+            aout(self) << "Received batch to compute" << std::endl;
+            self->state.batch_id = batch_id;
+            self->state.summa_actor_ref = self->spawn(summa_actor, start_hru, num_hru, config_path, self);
+        },
+
+        [=](done_batch, double duration) {
+            aout(self) << "summa_actor has finished, sending message to the server for another batch\n";
+            self->send(server_actor, done_batch_v, self, duration, self->state.batch_id);
+        },
+
+        [=](time_to_exit) {
+            aout(self) << "Client Exiting\n";
+            self->quit();
         }
+        
     };
 }
 }
\ No newline at end of file
diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp
index 137b6d5..9542b92 100644
--- a/build/source/actors/summa_actor/summa_server.cpp
+++ b/build/source/actors/summa_actor/summa_server.cpp
@@ -14,7 +14,7 @@ namespace caf {
 
 behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) {
     aout(self) << "Summa Server has Started \n";
-
+    self->state.config_path = config_path;
     if (parseSettings(self, config_path) == -1) {
         aout(self) << "ERROR WITH JSON SETTINGS FILE!!\n";
         aout(self) << "Summa_Server_Actor Exiting\n";
@@ -40,8 +40,35 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
 
     return {
         [=](connect_to_server, actor client, std::string hostname) {
-            aout(self) << "Actor Trying to connect with hostname " << hostname << "\n"; 
+            aout(self) << "Actor trying to connect with hostname " << hostname << "\n";
+            int client_id = self->state.client_list.size(); // So we can lookup the client in O(1) time 
+            self->state.client_list.push_back(new Client(client_id, client, hostname));
+            Batch *batch_to_send = getUnsolvedBatch(self);
+
+            batch_to_send->assignedBatch();
+
+            self->send(client, batch_v, batch_to_send->getBatchID(), batch_to_send->getStartHRU(), 
+                batch_to_send->getNumHRU(), self->state.config_path);
         },
+
+        [=](done_batch, actor client, double duration, int batch_id) {
+            aout(self) << "Client has Solved Batch " << batch_id << "\n";
+            self->state.batch_list[batch_id]->solvedBatch();
+
+            Batch *batch_to_send = getUnsolvedBatch(self);
+
+            if (batch_to_send == NULL) {
+                aout(self) << "We Are Done - Telling Clients to exit \n";
+                for (int i = 0; i < self->state.client_list.size(); i++) {
+                    self->send(self->state.client_list[i]->getActor(), time_to_exit_v);
+                }
+
+            } else {
+                self->send(client, batch_v, batch_to_send->getBatchID(), batch_to_send->getStartHRU(), 
+                    batch_to_send->getNumHRU(), self->state.config_path);
+            }
+
+        }
     };
 }
 
@@ -102,4 +129,17 @@ int assembleBatches(stateful_actor<summa_server_state>* self) {
     return 0;
 }
 
+Batch* getUnsolvedBatch(stateful_actor<summa_server_state>* self) {
+
+    // Find the first unassigned batch
+    for (int i = 0; i < self->state.batch_list.size(); i++) {
+        if (self->state.batch_list[i]->getBatchStatus() == unassigned) {
+            return self->state.batch_list[i];
+        }
+    }
+
+    return NULL;
+
+}
+
 } // end namespace
-- 
GitLab