From bd211a2d82acd816051d63fa7aaa14d2bc244cd6 Mon Sep 17 00:00:00 2001 From: KyleKlenk <kyle.c.klenk@gmail.com> Date: Tue, 15 Nov 2022 10:12:30 -0600 Subject: [PATCH] added array of backups to summa_actors_configuration file and tests for verifying it works --- build/includes/global/message_atoms.hpp | 2 +- build/includes/summa_actor/batch/batch.hpp | 65 ++++++ .../batch_container.hpp} | 71 +----- build/includes/summa_actor/client/client.hpp | 38 ++++ .../client_container.hpp} | 98 ++------ .../summa_actor/summa_backup_server.hpp | 7 +- build/includes/summa_actor/summa_client.hpp | 7 +- build/includes/summa_actor/summa_server.hpp | 16 +- build/makefile_sundials | 26 ++- build/source/actors/main.cpp | 10 +- .../source/actors/summa_actor/batch/batch.cpp | 100 +++++++++ .../summa_actor/batch/batch_container.cpp | 82 +++++++ .../actors/summa_actor/batch_manager.cpp | 184 --------------- build/source/actors/summa_actor/client.cpp | 209 ------------------ .../actors/summa_actor/client/client.cpp | 46 ++++ .../summa_actor/client/client_container.cpp | 132 +++++++++++ .../summa_actor/summa_backup_server.cpp | 47 ++-- .../actors/summa_actor/summa_client.cpp | 22 +- .../actors/summa_actor/summa_server.cpp | 128 ++++------- 19 files changed, 590 insertions(+), 700 deletions(-) create mode 100644 build/includes/summa_actor/batch/batch.hpp rename build/includes/summa_actor/{batch_manager.hpp => batch/batch_container.hpp} (56%) create mode 100644 build/includes/summa_actor/client/client.hpp rename build/includes/summa_actor/{client.hpp => client/client_container.hpp} (55%) create mode 100644 build/source/actors/summa_actor/batch/batch.cpp create mode 100644 build/source/actors/summa_actor/batch/batch_container.cpp delete mode 100644 build/source/actors/summa_actor/batch_manager.cpp delete mode 100644 build/source/actors/summa_actor/client.cpp create mode 100644 build/source/actors/summa_actor/client/client.cpp create mode 100644 build/source/actors/summa_actor/client/client_container.cpp diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index bc1f341..98b0744 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -1,6 +1,6 @@ #pragma once -#include "../summa_actor/batch_manager.hpp" +#include "batch/batch.hpp" #include <vector> #include "settings_functions.hpp" diff --git a/build/includes/summa_actor/batch/batch.hpp b/build/includes/summa_actor/batch/batch.hpp new file mode 100644 index 0000000..4854145 --- /dev/null +++ b/build/includes/summa_actor/batch/batch.hpp @@ -0,0 +1,65 @@ +#pragma once +#include "caf/all.hpp" +#include "client/client.hpp" +#include <string> + +class Client; + +class Batch { + private: + int batch_id; + int start_hru; + int num_hru; + + double run_time; + double read_time; + double write_time; + + bool assigned_to_actor; + std::string hostname; + caf::actor assigned_actor; + + Client* assigned_client; + + public: + Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1); + + // Getters + int getBatchID(); + int getStartHRU(); + int getNumHRU(); + double getRunTime(); + double getReadTime(); + double getWriteTime(); + bool getBatchStatus(); + + // Setters + void updateRunTime(double run_time); + void updateReadTime(double read_time); + void updateWriteTime(double write_time); + void updateAssignedActor(bool boolean); + + void printBatchInfo(); + void writeBatchToFile(std::string csv_output); + + std::string toString(); + + void assignToActor(std::string hostname, caf::actor assigned_actor); + + void assignBatch(Client *client); + + + template <class Inspector> + friend bool inspect(Inspector& inspector, Batch& batch) { + return inspector.object(batch).fields( + inspector.field("batch_id", batch.batch_id), + inspector.field("start_hru", batch.start_hru), + inspector.field("num_hru", batch.num_hru), + inspector.field("run_time", batch.run_time), + inspector.field("read_time", batch.read_time), + inspector.field("write_time", batch.write_time), + inspector.field("status", batch.assigned_to_actor), + inspector.field("hostname", batch.hostname), + inspector.field("assigned_actor", batch.assigned_actor)); + } +}; \ No newline at end of file diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch/batch_container.hpp similarity index 56% rename from build/includes/summa_actor/batch_manager.hpp rename to build/includes/summa_actor/batch/batch_container.hpp index 1c70512..4f970ea 100644 --- a/build/includes/summa_actor/batch_manager.hpp +++ b/build/includes/summa_actor/batch/batch_container.hpp @@ -1,70 +1,7 @@ #pragma once #include "caf/all.hpp" -#include <vector> -#include <string> -#include <sstream> -#include <optional> +#include "client/client.hpp" -class Batch; - -class Batch { - private: - int batch_id; - int start_hru; - int num_hru; - - double run_time; - double read_time; - double write_time; - - bool assigned_to_actor; - std::string hostname; - caf::actor assigned_actor; - - public: - Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1); - - // Getters - int getBatchID(); - int getStartHRU(); - int getNumHRU(); - double getRunTime(); - double getReadTime(); - double getWriteTime(); - bool getBatchStatus(); - - // Setters - void updateRunTime(double run_time); - void updateReadTime(double read_time); - void updateWriteTime(double write_time); - void updateAssignedActor(bool boolean); - - void printBatchInfo(); - void writeBatchToFile(std::string csv_output); - - std::string toString(); - /** - * @brief Mark batch as assigned to an actor - * Update the assigned_to_actor to True and - * update the hostname and assigned_actor instance variables - */ - void assignToActor(std::string hostname, caf::actor assigned_actor); - - - template <class Inspector> - friend bool inspect(Inspector& inspector, Batch& batch) { - return inspector.object(batch).fields( - inspector.field("batch_id", batch.batch_id), - inspector.field("start_hru", batch.start_hru), - inspector.field("num_hru", batch.num_hru), - inspector.field("run_time", batch.run_time), - inspector.field("read_time", batch.read_time), - inspector.field("write_time", batch.write_time), - inspector.field("status", batch.assigned_to_actor), - inspector.field("hostname", batch.hostname), - inspector.field("assigned_actor", batch.assigned_actor)); - } -}; class Batch_Container { private: @@ -97,7 +34,7 @@ class Batch_Container { * are added to the client for the servers awareness * The batch is then returned by this method and sent to the respective client */ - std::optional<Batch> assignBatch(std::string hostname, caf::actor actor_ref); + std::optional<Batch> assignBatch(Client *client); /** * On a successful batch we take the batch given to us by the client @@ -155,8 +92,4 @@ class Batch_Container { */ std::optional<int> findBatch(int batch_id); - - - - }; \ No newline at end of file diff --git a/build/includes/summa_actor/client/client.hpp b/build/includes/summa_actor/client/client.hpp new file mode 100644 index 0000000..fdd7437 --- /dev/null +++ b/build/includes/summa_actor/client/client.hpp @@ -0,0 +1,38 @@ +#pragma once +#include "caf/all.hpp" +#include <optional> +#include "batch/batch.hpp" + + +class Batch; + +class Client { + private: + caf::actor client_actor; + std::string hostname; + + int id; + int batches_solved; + bool connected; + + std::optional<Batch*> current_batch; + + + public: + Client(int id, caf::actor client_actor, std::string hostname); + // #################################################################### + // Getters + // #################################################################### + caf::actor getActor(); + int getID(); + std::string getHostname(); + // #################################################################### + // Setters + // #################################################################### + void setBatch(Batch *batch); + // #################################################################### + // Methods + // #################################################################### + std::string toString(); + +}; \ No newline at end of file diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client/client_container.hpp similarity index 55% rename from build/includes/summa_actor/client.hpp rename to build/includes/summa_actor/client/client_container.hpp index f046636..b4e2eca 100644 --- a/build/includes/summa_actor/client.hpp +++ b/build/includes/summa_actor/client/client_container.hpp @@ -1,89 +1,23 @@ #pragma once - #include "caf/all.hpp" -// #include "summa_server.hpp" -#include "batch_manager.hpp" #include <vector> -#include <sstream> - - -class Client { - private: - // Identifying Characteristics - caf::actor client_actor; - std::string hostname; - - int id; - int batches_solved; - bool connected; - bool assigned_batch; - int current_batch_id; +#include "batch/batch.hpp" +#include "client/client.hpp" - int lost_potential_indicator = 0; // value to indicate the Potential that a client is lost. - // The greater the lost_Potential_indicator the greater chances the client has been lost. - - - public: - Client(int id, caf::actor client_actor, std::string hostname); - // #################################################################### - // Getters - // #################################################################### - caf::actor getActor(); - int getLostPotentialIndicator(); - int getID(); - int getCurrentBatchID(); - std::string getHostname(); - bool getAssignedBatch(); - // #################################################################### - // Setters - // #################################################################### - void updateCurrentBatchID(int batch_id); - void setAssignedBatch(bool boolean); - - // methods - /** - * @brief Increments the lost_likely_hood indicator variable - * this is done everytime a client is sent a heartbeat message - * - * checks if the client is likely lost or not - */ - void incrementLostPotential(); - - /** - * @brief Decrement the lost_likley_hood indicator variables - * this is done everytime a client sends a heartbeat message back - * to the server - */ - void decrementLostPotential(); - - /** - * Check if the clients lost_potential_indicator is over a certain - * threshold - */ - bool isLost(int threshold); - - - std::string toString(); - -}; class Client_Container { private: - int num_clients = 0; - int lost_client_threshold; // value to determine if client is lost - std::vector<Client> connected_client_list; - std::vector<Client> lost_client_list; - + std::vector<Client> client_list; + int id_counter; public: - Client_Container(int lost_node_threshold); + Client_Container(); // #################################################################### // Getters // #################################################################### int getNumClients(); int getClientID(caf::actor client_actor); - Client getClient(int index); std::vector<Client> getConnectedClientList(); std::vector<Client> getLostClientList(); @@ -91,8 +25,10 @@ class Client_Container { // Setters // #################################################################### void setAssignedBatch(int client_id, bool boolean); - - // Methods + void setBatchForClient(caf::actor client_ref, Batch *batch); + // #################################################################### + // Methods + // #################################################################### /** * @brief add a client to the client vector * increment the number of clients @@ -111,14 +47,6 @@ class Client_Container { */ void updateCurrentBatch(int client_id, int batch_id); - /** - * @brief Decrement the lost_likley_hood indicator variables - * this is done everytime a client sends a heartbeat message back - * to the server - */ - void decrementLostPotential(int client_id); - - void incrementLostPotential(int client_id); /** * @brief Removes a client from the back of the list @@ -163,6 +91,9 @@ class Client_Container { std::optional<Client> findIdleClient(); + // find a client by its actor ref + Client getClient(caf::actor_addr client_ref); + /** Function that checks for lost clients Returns true if clients are lost and false if they are none @@ -172,7 +103,7 @@ class Client_Container { /** * Transfer all lost batches */ - void reconcileLostBatches(Batch_Container* batch_container); + // void reconcileLostBatches(Batch_Container* batch_container); std::string connectedClientsToString(); @@ -183,4 +114,5 @@ class Client_Container { * Sends all connected clients a heartbeat message */ // void sendAllClientsHeartbeat(stateful_actor<summa_server_state>* self); -}; \ No newline at end of file +}; + diff --git a/build/includes/summa_actor/summa_backup_server.hpp b/build/includes/summa_actor/summa_backup_server.hpp index be2949c..e4e3887 100644 --- a/build/includes/summa_actor/summa_backup_server.hpp +++ b/build/includes/summa_actor/summa_backup_server.hpp @@ -8,13 +8,12 @@ namespace caf { // Inital behaviour that waits to connect to the lead server -behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, +behavior summa_backup_server(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings); // Function that is called ot connect to the lead server -void connecting(stateful_actor<summa_server_state>* self, const std::string& host, uint16_t port); +void connecting_backup(stateful_actor<summa_server_state>* self, const std::string& host, uint16_t port); -// The behaviour of the backup server -behavior summa_backup_server(stateful_actor<summa_server_state>* self, const actor& summa_server); +behavior test(stateful_actor<summa_server_state>* self, const actor& server_actor); } \ No newline at end of file diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index 4595114..b66fae8 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -3,10 +3,13 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" #include "settings_functions.hpp" -#include "batch_manager.hpp" - +#include "batch/batch.hpp" +#include "summa_actor.hpp" +#include "message_atoms.hpp" #include <string> #include <optional> +#include <unistd.h> +#include <limits.h> namespace caf { diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index b61d19a..9b24821 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -2,11 +2,18 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" -#include "batch_manager.hpp" +#include "batch/batch.hpp" +#include "batch/batch_container.hpp" +#include "client/client.hpp" +#include "client/client_container.hpp" #include "settings_functions.hpp" -#include "client.hpp" +#include "global.hpp" +#include "message_atoms.hpp" #include <string> #include <optional> +#include <thread> +#include <chrono> +#include <iostream> namespace caf { @@ -40,10 +47,13 @@ struct summa_server_state { }; -behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, + +behavior summa_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings); +behavior summa_server(stateful_actor<summa_server_state>* self); + void sendClientsHeartbeat(stateful_actor<summa_server_state>* self); behavior client_health_check_reminder(event_based_actor* self); diff --git a/build/makefile_sundials b/build/makefile_sundials index a7b4f56..84e5021 100644 --- a/build/makefile_sundials +++ b/build/makefile_sundials @@ -286,8 +286,15 @@ SUMMA_ACTOR = $(SOURCE_DIR)/summa_actor/summa_actor.cpp SUMMA_CLIENT = $(SOURCE_DIR)/summa_actor/summa_client.cpp SUMMA_SERVER = $(SOURCE_DIR)/summa_actor/summa_server.cpp SUMMA_BACKUP_SERVER = $(SOURCE_DIR)/summa_actor/summa_backup_server.cpp -BATCH_MANGER = $(SOURCE_DIR)/summa_actor/batch_manager.cpp -CLIENT_MANAGER = $(SOURCE_DIR)/summa_actor/client.cpp + +BATCH = $(SOURCE_DIR)/summa_actor/batch/batch.cpp +BATCH_CONTAINER = $(SOURCE_DIR)/summa_actor/batch/batch_container.cpp + +CLIENT = $(SOURCE_DIR)/summa_actor/client/client.cpp +CLIENT_CONTAINER = $(SOURCE_DIR)/summa_actor/client/client_container.cpp + +CLIENT_BATCH = $(SOURCE_DIR)/summa_actor/batch_client.cpp +CLIENT_BATCH_CONTAINERS = $(SOURCE_DIR)/summa_actor/batch_client_containers.cpp GRU_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/gru_actor GRU_ACTOR = $(SOURCE_DIR)/gru_actor/gru_actor.cpp @@ -355,31 +362,32 @@ clean_fortran: ################################################ COMPILE SUMMA-C++ ################################################ ################################################################################################################### compile_globals: - $(CC) $(FLAGS_ACTORS) -c $(GLOBAL) $(TIMEINFO) $(AUXILARY) $(SETTINGS_FILES) $(GLOBAL_INCLUDES) + $(CC) $(FLAGS_ACTORS) -c $(GLOBAL) $(TIMEINFO) $(AUXILARY) $(SETTINGS_FILES) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) compile_gru_actor: - $(CC) $(FLAGS_ACTORS) -c $(GRU_ACTOR) $(HRU_ACTOR_INCLUDES) $(GRU_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) + $(CC) $(FLAGS_ACTORS) -c $(GRU_ACTOR) $(HRU_ACTOR_INCLUDES) $(GRU_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) $(SUMMA_ACTOR_INCLUDES) compile_hru_actor: - $(CC) $(FLAGS_ACTORS) -c $(HRU_ACTOR) $(HRU_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) + $(CC) $(FLAGS_ACTORS) -c $(HRU_ACTOR) $(HRU_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) $(SUMMA_ACTOR_INCLUDES) compile_file_access_actor: $(CC) $(FLAGS_ACTORS) -c $(FILE_ACCESS_ACTOR) $(FORCING_FILE_INFO) $(OUTPUT_MANAGER) \ - $(FILE_ACCESS_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) + $(FILE_ACCESS_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) $(SUMMA_ACTOR_INCLUDES) compile_job_actor: $(CC) $(FLAGS_ACTORS) -c $(JOB_ACTOR) $(GRUinfo) $(JOB_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) \ - $(FILE_ACCESS_ACTOR_INCLUDES) $(HRU_ACTOR_INCLUDES) $(GRU_ACTOR_INCLUDES) + $(FILE_ACCESS_ACTOR_INCLUDES) $(HRU_ACTOR_INCLUDES) $(GRU_ACTOR_INCLUDES) $(SUMMA_ACTOR_INCLUDES) compile_summa_actor: $(CC) $(FLAGS_ACTORS) -c $(SUMMA_ACTOR) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) $(ACTORS_INCLUDES) \ - $(JOB_ACTOR_INCLUDES) + $(JOB_ACTOR_INCLUDES) $(SUMMA_ACTOR_INCLUDES) compile_summa_client: $(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) compile_summa_server: - $(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(SUMMA_BACKUP_SERVER) $(BATCH_MANGER) $(CLIENT_MANAGER) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) + $(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(SUMMA_BACKUP_SERVER) $(BATCH) $(CLIENT) $(BATCH_CONTAINER) $(CLIENT_CONTAINER) \ + $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) compile_main: $(CC) $(FLAGS_ACTORS) -c $(MAIN) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) $(JOB_ACTOR_INCLUDES) diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index a80025e..4200d23 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -12,7 +12,6 @@ #include <unistd.h> #include <iostream> #include "json.hpp" -#include "batch_manager.hpp" #include <optional> using namespace caf; @@ -50,7 +49,8 @@ void publish_server(caf::actor actor_to_publish, int port_number) { void connect_client(caf::actor client_to_connect, std::string host_to_connect_to, int port_number) { if (!host_to_connect_to.empty() && port_number > 0) { - anon_send(client_to_connect, connect_atom_v, host_to_connect_to, (uint16_t) port_number ); + uint16_t port = 4444; + anon_send(client_to_connect, connect_atom_v, host_to_connect_to, (uint16_t) port ); } else { std::cerr << "No Server Config" << std::endl; } @@ -85,15 +85,15 @@ void run_server(actor_system& system, const config& cfg, Distributed_Settings di // Check if we have are the backup server if (cfg.backup_server) { - auto server = system.spawn(summa_backup_server_init, + auto server = system.spawn(summa_backup_server, distributed_settings,summa_actor_settings,file_access_actor_settings, job_actor_settings,hru_actor_settings); publish_server(server, distributed_settings.port); connect_client(server, distributed_settings.hostname, distributed_settings.port); - self->send(server, connect_as_backup_v); + // self->send(server, connect_as_backup_v); } else { - auto server = system.spawn(summa_server, distributed_settings, + auto server = system.spawn(summa_server_init, distributed_settings, summa_actor_settings, file_access_actor_settings, job_actor_settings, diff --git a/build/source/actors/summa_actor/batch/batch.cpp b/build/source/actors/summa_actor/batch/batch.cpp new file mode 100644 index 0000000..f70cc4b --- /dev/null +++ b/build/source/actors/summa_actor/batch/batch.cpp @@ -0,0 +1,100 @@ +#include "batch/batch.hpp" + +Batch::Batch(int batch_id, int start_hru, int num_hru){ + this->batch_id = batch_id; + this->start_hru = start_hru; + this->num_hru = num_hru; + this->assigned_to_actor = false; +} + +// Getters +int Batch::getBatchID() { + return this->batch_id; +} + +int Batch::getStartHRU() { + return this->start_hru; +} + +int Batch::getNumHRU() { + return this->num_hru; +} + +bool Batch::getBatchStatus() { + return this->assigned_to_actor; +} + +double Batch::getRunTime() { + return this->run_time; +} + +double Batch::getReadTime() { + return this->read_time; +} + +double Batch::getWriteTime() { + return this->write_time; +} + +// Setters +void Batch::updateRunTime(double run_time) { + this->run_time = run_time; +} + +void Batch::updateReadTime(double read_time) { + this->read_time = read_time; +} + +void Batch::updateWriteTime(double write_time) { + this->write_time = write_time; +} + +void Batch::updateAssignedActor(bool boolean) { + this->assigned_to_actor = boolean; +} + +// general methods +void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) { + this->hostname = hostname; + this->assigned_actor = assigned_actor; + this->assigned_to_actor = true; +} + +void Batch::assignBatch(Client *client) { + this->assigned_client = client; +} + +void Batch::printBatchInfo() { + std::cout << "batch_id: " << this->batch_id << "\n"; + std::cout << "start_hru: " << this->start_hru << "\n"; + std::cout << "num_hru: " << this->num_hru << "\n"; +} + +std::string Batch::toString() { + std::stringstream out_string; + + out_string << "batch_id: " << this->batch_id << "\n" << + "start_hru: " << this->start_hru << "\n" << + "num_hru: " << this->num_hru << "\n" << + "run_time: " << this->run_time << "\n" << + "read_time: " << this->read_time << "\n" << + "write_time: " << this->write_time << "\n" << + "assigned_to_actor: " << this->assigned_to_actor << "\n" << + "hostname: " << this->hostname << "\n"; + + return out_string.str(); +} + +void Batch::writeBatchToFile(std::string file_name) { + std::ofstream output_file; + output_file.open(file_name, std::ios_base::app); + output_file << + this->batch_id << "," << + this->start_hru << "," << + this->num_hru << "," << + this->hostname << "," << + this->run_time << "," << + this->read_time << "," << + this->write_time << "\n"; + output_file.close(); +} \ No newline at end of file diff --git a/build/source/actors/summa_actor/batch/batch_container.cpp b/build/source/actors/summa_actor/batch/batch_container.cpp new file mode 100644 index 0000000..57f3224 --- /dev/null +++ b/build/source/actors/summa_actor/batch/batch_container.cpp @@ -0,0 +1,82 @@ +#include "batch/batch_container.hpp" + +Batch_Container::Batch_Container(int total_hru_count, int num_hru_per_batch) { + this->total_hru_count = total_hru_count; + this->num_hru_per_batch = num_hru_per_batch; + + this->assembleBatches(this->total_hru_count, this->num_hru_per_batch); +} + +int Batch_Container::getBatchesRemaining() { + return this->batch_list.size(); +} + +void Batch_Container::assembleBatches(int total_hru_count, int num_hru_per_batch) { + int remaining_hru_to_batch = total_hru_count; + int batch_id = 0; + int start_hru = 1; + + while(remaining_hru_to_batch > 0) { + if (num_hru_per_batch > remaining_hru_to_batch) { + this->batch_list.push_back(Batch(batch_id, start_hru, remaining_hru_to_batch)); + remaining_hru_to_batch = 0; + } else { + this->batch_list.push_back(Batch(batch_id, start_hru, num_hru_per_batch)); + + remaining_hru_to_batch -= num_hru_per_batch; + start_hru += num_hru_per_batch; + batch_id += 1; + } + } +} + +void Batch_Container::printBatches() { + for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { + this->batch_list[i].printBatchInfo(); + } +} + +void Batch_Container::updateBatchStatus_LostClient(int batch_id) { + std::optional<int> index = this->findBatch(batch_id); + if (index.has_value()) { + this->batch_list[index.value()].updateAssignedActor(false); + } else { + throw "updateBatchStatus_LostClient - Could not find batch with id"; + } +} + + +std::optional<Batch> Batch_Container::assignBatch(Client *client) { + + for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { + if (!this->batch_list[i].getBatchStatus()) { + this->batch_list[i].assignBatch(client); + return this->batch_list[i]; + } + } + return {}; +} + +void Batch_Container::updateBatch_success(Batch successful_batch, std::string output_csv) { + this->solved_batches.push_back(successful_batch); + + successful_batch.writeBatchToFile(output_csv); + + std::optional<int> index_to_remove = this->findBatch(successful_batch.getBatchID()); + if (index_to_remove.has_value()) { + this->batch_list.erase(this->batch_list.begin() + index_to_remove.value()); + } else { + throw "No element in BatchList Matches the succesful_batch"; + } +} + +std::optional<int> Batch_Container::findBatch(int batch_id) { + + for(std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { + if (this->batch_list[i].getBatchID() == batch_id) { + return i; + } + } + + return {}; +} \ No newline at end of file diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp deleted file mode 100644 index 9709b4e..0000000 --- a/build/source/actors/summa_actor/batch_manager.cpp +++ /dev/null @@ -1,184 +0,0 @@ -#include "batch_manager.hpp" - - -Batch_Container::Batch_Container(int total_hru_count, int num_hru_per_batch) { - this->total_hru_count = total_hru_count; - this->num_hru_per_batch = num_hru_per_batch; - - this->assembleBatches(this->total_hru_count, this->num_hru_per_batch); -} - -int Batch_Container::getBatchesRemaining() { - return this->batch_list.size(); -} - -void Batch_Container::assembleBatches(int total_hru_count, int num_hru_per_batch) { - int remaining_hru_to_batch = total_hru_count; - int batch_id = 0; - int start_hru = 1; - - while(remaining_hru_to_batch > 0) { - if (num_hru_per_batch > remaining_hru_to_batch) { - this->batch_list.push_back(Batch(batch_id, start_hru, remaining_hru_to_batch)); - remaining_hru_to_batch = 0; - } else { - this->batch_list.push_back(Batch(batch_id, start_hru, num_hru_per_batch)); - - remaining_hru_to_batch -= num_hru_per_batch; - start_hru += num_hru_per_batch; - batch_id += 1; - } - } -} - -void Batch_Container::printBatches() { - for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { - this->batch_list[i].printBatchInfo(); - } -} - -void Batch_Container::updateBatchStatus_LostClient(int batch_id) { - std::optional<int> index = this->findBatch(batch_id); - if (index.has_value()) { - this->batch_list[index.value()].updateAssignedActor(false); - } else { - throw "updateBatchStatus_LostClient - Could not find batch with id"; - } -} - - -std::optional<Batch> Batch_Container::assignBatch(std::string hostname, caf::actor actor_ref) { - - for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { - if (!this->batch_list[i].getBatchStatus()) { - this->batch_list[i].assignToActor(hostname, actor_ref); - return this->batch_list[i]; - } - } - return {}; -} - -void Batch_Container::updateBatch_success(Batch successful_batch, std::string output_csv) { - this->solved_batches.push_back(successful_batch); - - successful_batch.writeBatchToFile(output_csv); - - std::optional<int> index_to_remove = this->findBatch(successful_batch.getBatchID()); - if (index_to_remove.has_value()) { - this->batch_list.erase(this->batch_list.begin() + index_to_remove.value()); - } else { - throw "No element in BatchList Matches the succesful_batch"; - } -} - -std::optional<int> Batch_Container::findBatch(int batch_id) { - - for(std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { - if (this->batch_list[i].getBatchID() == batch_id) { - return i; - } - } - - return {}; -} - -// ************************** -// Batch Class -// ************************** - -Batch::Batch(int batch_id, int start_hru, int num_hru){ - this->batch_id = batch_id; - this->start_hru = start_hru; - this->num_hru = num_hru; - this->assigned_to_actor = false; -} - -// Getters -int Batch::getBatchID() { - return this->batch_id; -} - -int Batch::getStartHRU() { - return this->start_hru; -} - -int Batch::getNumHRU() { - return this->num_hru; -} - -bool Batch::getBatchStatus() { - return this->assigned_to_actor; -} - -double Batch::getRunTime() { - return this->run_time; -} - -double Batch::getReadTime() { - return this->read_time; -} - -double Batch::getWriteTime() { - return this->write_time; -} - -// Setters -void Batch::updateRunTime(double run_time) { - this->run_time = run_time; -} - -void Batch::updateReadTime(double read_time) { - this->read_time = read_time; -} - -void Batch::updateWriteTime(double write_time) { - this->write_time = write_time; -} - -void Batch::updateAssignedActor(bool boolean) { - this->assigned_to_actor = boolean; -} - -// general methods -void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) { - this->hostname = hostname; - this->assigned_actor = assigned_actor; - this->assigned_to_actor = true; -} - -void Batch::printBatchInfo() { - std::cout << "batch_id: " << this->batch_id << "\n"; - std::cout << "start_hru: " << this->start_hru << "\n"; - std::cout << "num_hru: " << this->num_hru << "\n"; -} - -std::string Batch::toString() { - std::stringstream out_string; - - out_string << "batch_id: " << this->batch_id << "\n" << - "start_hru: " << this->start_hru << "\n" << - "num_hru: " << this->num_hru << "\n" << - "run_time: " << this->run_time << "\n" << - "read_time: " << this->read_time << "\n" << - "write_time: " << this->write_time << "\n" << - "assigned_to_actor: " << this->assigned_to_actor << "\n" << - "hostname: " << this->hostname << "\n"; - - return out_string.str(); -} - -void Batch::writeBatchToFile(std::string file_name) { - std::ofstream output_file; - output_file.open(file_name, std::ios_base::app); - output_file << - this->batch_id << "," << - this->start_hru << "," << - this->num_hru << "," << - this->hostname << "," << - this->run_time << "," << - this->read_time << "," << - this->write_time << "\n"; - output_file.close(); -} - - diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp deleted file mode 100644 index b3af062..0000000 --- a/build/source/actors/summa_actor/client.cpp +++ /dev/null @@ -1,209 +0,0 @@ -#include "caf/all.hpp" -#include "client.hpp" - - -Client::Client(int id, caf::actor client_actor, std::string hostname) { - this->id = id; - this->client_actor = client_actor; - this->hostname = hostname; - this->connected = true; - this->assigned_batch = false; -} - -// Getters -caf::actor Client::getActor() { - return this->client_actor; -} -int Client::getLostPotentialIndicator() { - return this->lost_potential_indicator; -} -int Client::getID() { - return this->id; -} -int Client::getCurrentBatchID() { - return this->current_batch_id; -} -std::string Client::getHostname() { - return this->hostname; -} -bool Client::getAssignedBatch() { - return this->assigned_batch; -} -// Setters -void Client::updateCurrentBatchID(int batch_id) { - this->current_batch_id = batch_id; - this->assigned_batch = true; -} - -void Client::setAssignedBatch(bool boolean) { - this->assigned_batch = boolean; -} - -// Methods -void Client::incrementLostPotential() { - this->lost_potential_indicator++; -} - -void Client::decrementLostPotential() { - this->lost_potential_indicator--; -} - -bool Client::isLost(int threshold) { - return lost_potential_indicator > threshold; -} - -std::string Client::toString() { - std::stringstream out_string; - - out_string << "hostname: " << this->hostname << "\n" << - "id: " << this->id << "\n" << - "batches_solved: " << this->batches_solved << "\n" << - "connected: " << this->connected << "\n" << - "assigned_batch: " << this->assigned_batch << "\n" << - "current_batch_id: " << this->current_batch_id << "\n" << - "lost_potential_indicator: " << this->lost_potential_indicator << "\n"; - - return out_string.str(); -} - -//////////////////////////////////////////////// - - - -Client_Container::Client_Container(int lost_node_threshold) { - this->lost_client_threshold = lost_node_threshold; -} - -void Client_Container::addClient(caf::actor client_actor, std::string hostname) { - int client_id = this->num_clients; - - this->connected_client_list.push_back( - Client{client_id, client_actor, hostname}); - - this->num_clients++; - -} - -void Client_Container::setAssignedBatch(int client_id, bool boolean) { - int index = findClientByID(client_id); - this->connected_client_list[index].setAssignedBatch(boolean); -} - -int Client_Container::getNumClients() { - return this->num_clients; -} - -Client Client_Container::getClient(int index) { - if (index > this->num_clients) { - throw "Trying to access a client outside of the connected_client_list"; - } - - return this->connected_client_list[index]; -} - -std::vector<Client> Client_Container::getConnectedClientList() { - return this->connected_client_list; -} - -void Client_Container::decrementLostPotential(int client_id) { - int index = findClientByID(client_id); - this->connected_client_list[index].decrementLostPotential(); -} - -void Client_Container::incrementLostPotential(int client_id) { - int index = findClientByID(client_id); - this->connected_client_list[index].incrementLostPotential(); -} - - -int Client_Container::getClientID(caf::actor client_actor) { - for (int i = 0; i < num_clients; i++) { - if (client_actor == this->connected_client_list[i].getActor()){ - return this->connected_client_list[i].getID(); - } - } - return -1; -} - -std::string Client_Container::getHostname_ByClientID(int client_id) { - return this->connected_client_list[client_id].getHostname(); -} - -bool Client_Container::isEmpty() { - return this->connected_client_list.empty(); -} - -Client Client_Container::removeClient_fromBack() { - Client client = this->connected_client_list.back(); - this->connected_client_list.pop_back(); - return client; -} - -void Client_Container::updateCurrentBatch(int client_id, int batch_id) { - int index = findClientByID(client_id); - this->connected_client_list[index].updateCurrentBatchID(batch_id);; -} - -int Client_Container::findClientByID(int client_id) { - for(int i = 0; i < this->num_clients; i++) { - if (client_id == this->connected_client_list[i].getID()){ - return i; - } - } - throw "Cannot Find Client"; -} - - -std::optional<Client> Client_Container::findIdleClient() { - for(int i = 0; i < this->num_clients; i++) { - if (!this->connected_client_list[i].getAssignedBatch()) { - return this->connected_client_list[i]; - } - } - - return {}; -} - -bool Client_Container::checkForLostClients() { - bool return_val = false; - for(auto client = begin(this->connected_client_list); client != end(this->connected_client_list); ++client) { - client->incrementLostPotential(); - if (client->isLost(this->lost_client_threshold)) { - this->lost_client_list.push_back(*client); - this->connected_client_list.erase(client); - - return_val = true; - } - } - return return_val; -} - - -void Client_Container::reconcileLostBatches(Batch_Container* batch_container) { - for(auto client = begin(this->lost_client_list); client != end(this->lost_client_list); ++client) { - batch_container->updateBatchStatus_LostClient(client->getCurrentBatchID()); - } - - this->lost_client_list.clear(); -} - -std::string Client_Container::connectedClientsToString() { - std::stringstream out_string; - for(auto client = begin(this->connected_client_list); client != end(this->connected_client_list); ++client) { - out_string << client->toString() << "\n"; - } - return out_string.str(); -} - -std::string Client_Container::lostClientsToString() { - std::stringstream out_string; - for(auto client = begin(this->lost_client_list); client != end(this->lost_client_list); ++client) { - out_string << client->toString() << "\n"; - } - return out_string.str(); -} - - - - - diff --git a/build/source/actors/summa_actor/client/client.cpp b/build/source/actors/summa_actor/client/client.cpp new file mode 100644 index 0000000..fb2e8e6 --- /dev/null +++ b/build/source/actors/summa_actor/client/client.cpp @@ -0,0 +1,46 @@ +#include "client/client.hpp" + + +Client::Client(int id, caf::actor client_actor, std::string hostname) { + this->id = id; + this->client_actor = client_actor; + this->hostname = hostname; + this->connected = true; + this->current_batch = {}; +} + +// #################################################################### +// Getters +// #################################################################### +caf::actor Client::getActor() { + return this->client_actor; +} + +int Client::getID() { + return this->id; +} + +std::string Client::getHostname() { + return this->hostname; +} + +// #################################################################### +// Setters +// #################################################################### +void Client::setBatch(Batch *batch) { + this->current_batch = batch; +} + +// #################################################################### +// Methods +// #################################################################### +std::string Client::toString() { + std::stringstream out_string; + + out_string << "hostname: " << this->hostname << "\n" << + "id: " << this->id << "\n" << + "batches_solved: " << this->batches_solved << "\n" << + "connected: " << this->connected << "\n"; + + return out_string.str(); +} \ No newline at end of file diff --git a/build/source/actors/summa_actor/client/client_container.cpp b/build/source/actors/summa_actor/client/client_container.cpp new file mode 100644 index 0000000..74c6219 --- /dev/null +++ b/build/source/actors/summa_actor/client/client_container.cpp @@ -0,0 +1,132 @@ +#include "client/client_container.hpp" + + +Client_Container::Client_Container() { + this->id_counter = 0; +} +// #################################################################### +// Getters +// #################################################################### +// #################################################################### +// Setters +// #################################################################### + +// #################################################################### +// Methods +// #################################################################### +void Client_Container::addClient(caf::actor client_actor, std::string hostname) { + int client_id = this->id_counter; + this->id_counter++; + + this->client_list.push_back( + Client{client_id, client_actor, hostname}); +} + +void Client_Container::setBatchForClient(caf::actor client_ref, Batch *batch) { + for(auto client = begin(this->client_list); client != end(this->client_list); ++client) { + if (client_ref == client->getActor()) { + client->setBatch(batch); + break; + } + } +} + +// void Client_Container::setAssignedBatch(int client_id, bool boolean) { +// int index = findClientByID(client_id); +// this->client_list[index].setAssignedBatch(boolean); +// } + +int Client_Container::getNumClients() { + return this->client_list.size(); +} + +// Client Client_Container::getClient(int index) { +// if (index > this->num_clients) { +// throw "Trying to access a client outside of the client_list"; +// } + +// return this->client_list[index]; +// } + +std::vector<Client> Client_Container::getConnectedClientList() { + return this->client_list; +} + + + +// int Client_Container::getClientID(caf::actor client_actor) { +// for (int i = 0; i < num_clients; i++) { +// if (client_actor == this->client_list[i].getActor()){ +// return this->client_list[i].getID(); +// } +// } +// return -1; +// } + +std::string Client_Container::getHostname_ByClientID(int client_id) { + return this->client_list[client_id].getHostname(); +} + +bool Client_Container::isEmpty() { + return this->client_list.empty(); +} + +Client Client_Container::removeClient_fromBack() { + Client client = this->client_list.back(); + this->client_list.pop_back(); + return client; +} + + +// std::optional<Client> Client_Container::findIdleClient() { +// for(int i = 0; i < this->client_list; i++) { +// if (!this->client_list[i].getAssignedBatch()) { +// return this->client_list[i]; +// } +// } + +// return {}; +// } + +Client Client_Container::getClient(caf::actor_addr client_ref) { + std::cout << "Looking for client\n"; + for(auto client = begin(this->client_list); client != end(this->client_list); ++client) { + if(client_ref == client->getActor()) { + std::cout << "Found Client\n"; + return *client; + } + } + + throw "ERROR -- Client Not Found"; +} + +// bool Client_Container::checkForLostClients() { +// bool return_val = false; +// for(auto client = begin(this->client_list); client != end(this->client_list); ++client) { +// client->incrementLostPotential(); +// if (client->isLost(this->lost_client_threshold)) { +// this->lost_client_list.push_back(*client); +// this->client_list.erase(client); + +// return_val = true; +// } +// } +// return return_val; +// } + + +// void Client_Container::reconcileLostBatches(Batch_Container* batch_container) { +// for(auto client = begin(this->lost_client_list); client != end(this->lost_client_list); ++client) { +// batch_container->updateBatchStatus_LostClient(client->getCurrentBatchID()); +// } + +// this->lost_client_list.clear(); +// } + +std::string Client_Container::connectedClientsToString() { + std::stringstream out_string; + for(auto client = begin(this->client_list); client != end(this->client_list); ++client) { + out_string << client->toString() << "\n"; + } + return out_string.str(); +} diff --git a/build/source/actors/summa_actor/summa_backup_server.cpp b/build/source/actors/summa_actor/summa_backup_server.cpp index 45d4388..3880195 100644 --- a/build/source/actors/summa_actor/summa_backup_server.cpp +++ b/build/source/actors/summa_actor/summa_backup_server.cpp @@ -7,7 +7,7 @@ #include <thread> namespace caf { -behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, +behavior summa_backup_server(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { aout(self) << "Backup Server Started\n"; @@ -18,21 +18,22 @@ behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Dist self->state.job_actor_settings = job_actor_settings; self->state.hru_actor_settings = hru_actor_settings; - // self->set_down_handler([=](const down_msg& dm){ - // if(dm.source == self->state.current_server) { - // aout(self) << "*** Lost Connection to Server" << std::endl; - // uint16_t port = 4444; - // std::string host = "a0449745d77d"; - // connecting(self, host, port); - // } - // }); + self->set_down_handler([=](const down_msg& dm){ + if(dm.source == self->state.current_server) { + // aout(self) << "*** Lost Connection to Server" << std::endl; + // uint16_t port = 4444; + // std::string host = "a0449745d77d"; + // connecting(self, host, port); + } + }); return { // Called by main to init the process [=](connect_atom, const std::string& host, uint16_t port) { - connecting(self, host, port); + connecting_backup(self, host, port); }, [=] (connect_as_backup) { - aout(self) << "Received Message to connect to lead in beha server\n"; + aout(self) << "Received Message to connect to lead to server\n"; + self->send(self->state.current_server_actor, connect_as_backup_v, self); } // [=]() }; @@ -60,8 +61,11 @@ void connecting_backup(stateful_actor<summa_server_state>* self, const std::stri aout(self) << "*** successfully connected to server" << std::endl; self->state.current_server = serv; auto hdl = actor_cast<actor>(serv); + self->state.current_server_actor = hdl; self->monitor(hdl); - self->become(summa_backup_server(self, hdl)); + aout(self) << "Should become test\n"; + self->become(test(self, hdl)); + }, [=](const error& err) { aout(self) << R"(*** cannot connect to ")" << host << R"(":)" << port @@ -69,22 +73,17 @@ void connecting_backup(stateful_actor<summa_server_state>* self, const std::stri }); } - -behavior summa_backup_server(stateful_actor<summa_server_state>* self, const actor& summa_server) { - aout(self) << "Backup Server is now Running\n"; +behavior test(stateful_actor<summa_server_state>* self, const actor& server_actor) { + aout(self) << "We are the test behaviour\n"; + self->send(server_actor, connect_as_backup_v, self); return { + [=] (connect_as_backup) { - aout(self) << "Received Message to connect to lead server\n"; + aout(self) << "Received Message to connect to lead to server\n"; + self->send(self->state.current_server_actor, connect_as_backup_v, self); } - - - - }; -} - - - +} } \ No newline at end of file diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index e89d72c..cb7164a 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -1,14 +1,4 @@ -#include "caf/all.hpp" -#include "caf/io/all.hpp" #include "summa_client.hpp" -#include "summa_actor.hpp" -#include "message_atoms.hpp" -#include "batch_manager.hpp" -#include <optional> -#include <unistd.h> -#include <limits.h> - - namespace caf { behavior summa_client(stateful_actor<summa_client_state>* self) { @@ -73,12 +63,10 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a self->send(server_actor, connect_to_server_v, self, self->state.hostname); return { // Response from the server on successful connection - [=](connect_to_server, int client_id, Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, + [=](connect_to_server, Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { aout(self) << "Successfully Connected to Server Actor \n"; - aout(self) << "Recieved ID of " << client_id << "\n"; - self->state.client_id = client_id; self->state.summa_actor_settings = summa_actor_settings; self->state.file_access_actor_settings = file_access_actor_settings; self->state.job_actor_settings = job_actor_settings; @@ -118,18 +106,12 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a if(self->state.current_server == nullptr) { aout(self) << "Maybe We Should not Send this\n"; } else { - self->send(server_actor, done_batch_v, self, self->state.client_id, self->state.current_batch); + self->send(server_actor, done_batch_v, self, self->state.current_batch); } }, - [=](heartbeat) { - aout(self) << "Received Heartbeat \n"; - - self->send(server_actor, heartbeat_v, self->state.client_id); - }, - [=](time_to_exit) { aout(self) << "Client Exiting\n"; self->quit(); diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 1d958dc..773a828 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -1,33 +1,20 @@ -#include "caf/all.hpp" -#include "caf/io/all.hpp" -#include <string> -#include "batch_manager.hpp" #include "summa_server.hpp" -#include "message_atoms.hpp" -#include "global.hpp" -#include <optional> -#include <iostream> -#include <thread> -#include <chrono> - namespace caf { -behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, +behavior summa_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { - + aout(self) << "Summa Server has Started \n"; - self->set_down_handler([=](const down_msg& dm) { - - if (dm.source == self->state.backup_server) { - aout(self) << "Lost Client 1\n"; - } - - if (dm.source == self->state.backup_server2) { - aout(self) << "Lost Client 2\n"; - } + aout(self) << "Lost A Client\n"; + Client client = self->state.client_container->getClient(dm.source); + aout(self) << "Lost Client: " << client.getID() << "\n"; + // Batch batch = client.getCurrentBatch(); + // std::optional<Client> idle_client = findIdleClient + // if (idle_client.has_value) then send client batch + // else mark batch as not done }); self->state.distributed_settings = distributed_settings; @@ -36,7 +23,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett self->state.job_actor_settings = job_actor_settings; self->state.hru_actor_settings = hru_actor_settings; - self->state.client_container = new Client_Container(self->state.distributed_settings.lost_node_threshold); + self->state.client_container = new Client_Container(); self->state.batch_container = new Batch_Container( self->state.distributed_settings.total_hru_count, self->state.distributed_settings.num_hru_per_batch); @@ -46,39 +33,37 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett initializeCSVOutput(self->state.job_actor_settings.csv_path, self->state.csv_output_name); // Start the heartbeat actor after a client has connected - self->state.health_check_reminder_actor = self->spawn(client_health_check_reminder); - self->send(self->state.health_check_reminder_actor, - start_health_check_v, self, self->state.distributed_settings.heartbeat_interval); + // self->state.health_check_reminder_actor = self->spawn(client_health_check_reminder); + // self->send(self->state.health_check_reminder_actor, + // start_health_check_v, self, self->state.distributed_settings.heartbeat_interval); - return { - // For when a backup server attempts to connect to the main server - // [=] (connect_atom, const std::string& host, uint16_t port) { - // int err; - // connecting(self, host, port); + return summa_server(self); + +} + +behavior summa_server(stateful_actor<summa_server_state>* self) { - // }, + aout(self) << "Server is Running \n"; - // [=](connect_to_server) { - // self->state.current_server_actor = actor_cast<actor>(self->state.current_server); - // self->send(self->state.current_server_actor, connect_as_backup_v, self); - // }, + return { // A message from a client requesting to connect [=](connect_to_server, actor client_actor, std::string hostname) { - + aout(self) << "Actor trying to connect with hostname " << hostname << "\n"; self->state.client_container->addClient(client_actor, hostname); - + self->monitor(client_actor); // Tell client they are connected - self->send(client_actor, connect_to_server_v, self->state.client_container->getClientID(client_actor), + self->send(client_actor, connect_to_server_v, self->state.summa_actor_settings, self->state.file_access_actor_settings, self->state.job_actor_settings, self->state.hru_actor_settings); + + + Client client = self->state.client_container->getClient(client_actor.address()); - std::optional<Batch> batch = self->state.batch_container->assignBatch(hostname, client_actor); + std::optional<Batch> batch = self->state.batch_container->assignBatch(&client); if (batch.has_value()) { - self->state.client_container->updateCurrentBatch( - self->state.client_container->getClientID(client_actor), - batch.value().getBatchID()); + self->state.client_container->setBatchForClient(client_actor, &batch.value()); self->send(client_actor, batch.value()); } else { aout(self) << "no more batches left to assign\n"; @@ -90,19 +75,22 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett [=](connect_as_backup, actor backup_server) { aout(self) << "Received Connection Request From a backup server\n"; - }, + }, - [=](done_batch, actor client_actor, int client_id, Batch& batch) { + [=](done_batch, actor client_actor, Batch& batch) { aout(self) << "Received Completed Batch From Client\n"; - + aout(self) << batch.toString() << "\n\n"; + Client client = self->state.client_container->getClient(client_actor.address()); + self->state.batch_container->updateBatch_success(batch, self->state.csv_output_name); - aout(self) << "******************\n" << "Batches Remaining: " << - self->state.batch_container->getBatchesRemaining() << "\n******************\n\n"; + aout(self) << "******************\n" + << "Batches Remaining: " << + self->state.batch_container->getBatchesRemaining() << + "\n******************\n\n"; - std::optional<Batch> new_batch = self->state.batch_container->assignBatch( - self->state.client_container->getHostname_ByClientID(client_id), client_actor); + std::optional<Batch> new_batch = self->state.batch_container->assignBatch(&client); if (new_batch.has_value()) { @@ -114,8 +102,8 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett aout(self) << "no more batches left to assign\n"; aout(self) << "Keeping Client connected because other clients could Fail\n"; - - self->state.client_container->setAssignedBatch(client_id, false); + self->state.client_container->setBatchForClient(client_actor, {}); + // self->state.client_container->setAssignedBatch(client_id, false); } else { aout(self) << "Telling Clients To Exit\n"; @@ -130,41 +118,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett self->quit(); } } - }, - - // check for lost clients, send all connected a message - [=](check_on_clients) { - // Loop Through All Clients To see if any are lost - if (self->state.client_container->checkForLostClients()) { - aout(self) << "Client Is Lost\n"; - self->state.client_container->reconcileLostBatches(self->state.batch_container); - aout(self) << "Reconciled Batches\n"; - std::optional<Client> client = self->state.client_container->findIdleClient(); - if(client.has_value()) { - aout(self) << "getting new Batches\n"; - std::optional<Batch> new_batch = self->state.batch_container->assignBatch( - self->state.client_container->getHostname_ByClientID(client.value().getID()), - client.value().getActor()); - aout(self) << "Got New BATCH\n"; - if (new_batch.has_value()) { - aout(self) << "sending new Batches\n"; - - self->send(client.value().getActor(), new_batch.value()); - - } - } - } - sendClientsHeartbeat(self); - - self->send(self->state.health_check_reminder_actor, - start_health_check_v, self, self->state.distributed_settings.heartbeat_interval); - }, - - // Received heartbeat from client - [=](heartbeat, int client_id) { - aout(self) << "Received HeartBeat From: " << client_id << "\n"; - self->state.client_container->decrementLostPotential(client_id); - }, + }, }; } -- GitLab