From d2145637adb0cf6b59d1d572ae5dfc5eb456e4ce Mon Sep 17 00:00:00 2001 From: KyleKlenk <kyle.c.klenk@gmail.com> Date: Wed, 16 Nov 2022 13:42:24 -0600 Subject: [PATCH] added backup_client as a tuple. The tuple holds the actor address and the hostname so actors can connect when a server goes down --- .../summa_actor/summa_backup_server.hpp | 3 +++ build/includes/summa_actor/summa_client.hpp | 9 ++++++--- build/includes/summa_actor/summa_server.hpp | 9 +++++++-- build/source/actors/main.cpp | 2 +- .../actors/summa_actor/summa_backup_server.cpp | 13 +++++-------- .../source/actors/summa_actor/summa_client.cpp | 17 ++++++----------- .../source/actors/summa_actor/summa_server.cpp | 17 ++++++++++------- 7 files changed, 38 insertions(+), 32 deletions(-) diff --git a/build/includes/summa_actor/summa_backup_server.hpp b/build/includes/summa_actor/summa_backup_server.hpp index cae7580..bec1514 100644 --- a/build/includes/summa_actor/summa_backup_server.hpp +++ b/build/includes/summa_actor/summa_backup_server.hpp @@ -4,6 +4,9 @@ #include "caf/io/all.hpp" #include "summa_server.hpp" #include <string> +#include <unistd.h> +#include <limits.h> + namespace caf { diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index b66fae8..308c0ad 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -17,9 +17,13 @@ struct summa_client_state { strong_actor_ptr current_server; std::string hostname; actor summa_actor_ref; + uint16_t port; int batch_id; int client_id; // id held by server + // tuple is the actor ref and hostname of the backup server + std::vector<std::tuple<caf::actor, std::string>> backup_servers; + Batch current_batch; Summa_Actor_Settings summa_actor_settings; @@ -28,9 +32,8 @@ struct summa_client_state { HRU_Actor_Settings hru_actor_settings; }; -behavior summa_client(stateful_actor<summa_client_state>* self); -behavior unconnected(stateful_actor<summa_client_state>*); +behavior summa_client_init(stateful_actor<summa_client_state>* self); void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port); -behavior running(stateful_actor<summa_client_state>*, const actor& summa_server); +behavior summa_client(stateful_actor<summa_client_state>*, const actor& summa_server); } \ No newline at end of file diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index ab35a9a..c72ec2b 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -23,12 +23,17 @@ struct summa_server_state { actor backup_server2 = nullptr; strong_actor_ptr current_server; // if server is a backup then this will be set to the lead server actor current_server_actor; - + + std::string hostname; + + std::string csv_output_name = "/batch_results.csv"; Client_Container *client_container; Batch_Container *batch_container; - std::vector<caf::actor> backup_servers_list; + + // Actor Reference, Hostname + std::vector<std::tuple<caf::actor, std::string>> backup_servers_list; // Settings Structures Distributed_Settings distributed_settings; diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index 61e13b3..6de6292 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -66,7 +66,7 @@ void run_client(actor_system& system, const config& cfg, Distributed_Settings di aout(self) << "ERROR: run_client() host and port - CHECK SETTINGS FILE\n"; return; } - auto client = system.spawn(summa_client); + auto client = system.spawn(summa_client_init); connect_client(client, distributed_settings.hostname, distributed_settings.port); } diff --git a/build/source/actors/summa_actor/summa_backup_server.cpp b/build/source/actors/summa_actor/summa_backup_server.cpp index fed439b..5fc7071 100644 --- a/build/source/actors/summa_actor/summa_backup_server.cpp +++ b/build/source/actors/summa_actor/summa_backup_server.cpp @@ -11,7 +11,10 @@ behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Dist Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { aout(self) << "Backup Server Started\n"; - + char host[HOST_NAME_MAX]; + gethostname(host, HOST_NAME_MAX); + self->state.hostname = host; + self->state.distributed_settings = distributed_settings; self->state.summa_actor_settings = summa_actor_settings; self->state.file_access_actor_settings = file_access_actor_settings; @@ -31,11 +34,6 @@ behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Dist [=](connect_atom, const std::string& host, uint16_t port) { connecting_backup(self, host, port); }, - [=] (connect_as_backup) { - aout(self) << "Received Message to connect to lead to server\n"; - self->send(self->state.current_server_actor, connect_as_backup_v, self); - } - // [=]() }; } @@ -63,7 +61,6 @@ void connecting_backup(stateful_actor<summa_server_state>* self, const std::stri auto hdl = actor_cast<actor>(serv); self->state.current_server_actor = hdl; self->monitor(hdl); - aout(self) << "Should become test\n"; self->become(summa_backup_server(self, hdl)); }, @@ -75,7 +72,7 @@ void connecting_backup(stateful_actor<summa_server_state>* self, const std::stri behavior summa_backup_server(stateful_actor<summa_server_state>* self, const actor& server_actor) { aout(self) << "We are the test behaviour\n"; - self->send(server_actor, connect_as_backup_v, self); + self->send(server_actor, connect_as_backup_v, self, self->state.hostname); return { diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index cb7164a..ae642e6 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -1,7 +1,7 @@ #include "summa_client.hpp" namespace caf { -behavior summa_client(stateful_actor<summa_client_state>* self) { +behavior summa_client_init(stateful_actor<summa_client_state>* self) { self->set_down_handler([=](const down_msg& dm){ if(dm.source == self->state.current_server) { @@ -10,16 +10,11 @@ behavior summa_client(stateful_actor<summa_client_state>* self) { // self->become(unconnected(self)); } }); - return unconnected(self); -} -/** - * Attempt to connect to the server - */ -behavior unconnected(stateful_actor<summa_client_state>* self) { + return { [=] (connect_atom, const std::string& host, uint16_t port) { connecting(self, host, port); - }, + } }; } @@ -45,16 +40,16 @@ void connecting(stateful_actor<summa_client_state>* self, const std::string& hos self->state.current_server = serv; auto hdl = actor_cast<actor>(serv); self->monitor(hdl); - self->become(running(self, hdl)); + self->become(summa_client(self, hdl)); }, [=](const error& err) { aout(self) << R"(*** cannot connect to ")" << host << R"(":)" << port << " => " << to_string(err) << std::endl; - self->become(unconnected(self)); + self->become(summa_client_init(self)); }); } -behavior running(stateful_actor<summa_client_state>* self, const actor& server_actor) { +behavior summa_client(stateful_actor<summa_client_state>* self, const actor& server_actor) { char host[HOST_NAME_MAX]; aout(self) << "Client Has Started Successfully" << std::endl; gethostname(host, HOST_NAME_MAX); diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index c0a1cac..c857643 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -70,10 +70,10 @@ behavior summa_server(stateful_actor<summa_server_state>* self) { }, - [=](connect_as_backup, actor backup_server) { + [=](connect_as_backup, actor backup_server, std::string hostname) { aout(self) << "Received Connection Request From a backup server\n"; self->monitor(backup_server); - self->state.backup_servers_list.push_back(backup_server); + self->state.backup_servers_list.push_back(std::make_tuple(backup_server, hostname)); self->send(backup_server, connect_as_backup_v); // confirm connection with sender // Now we need to send the backup actor our current state self->send(backup_server, update_with_current_state_v, *self->state.batch_container, *self->state.client_container); @@ -93,8 +93,9 @@ behavior summa_server(stateful_actor<summa_server_state>* self) { self->state.client_container->setBatchForClient(client_actor, new_batch.value()); self->send(client_actor, new_batch.value()); for (auto& backup_server : self->state.backup_servers_list) { - self->send(backup_server, done_batch_v, client_actor, batch); - self->send(backup_server, new_assigned_batch_v, client_actor, new_batch.value()); + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, done_batch_v, client_actor, batch); + self->send(backup_server_actor, new_assigned_batch_v, client_actor, new_batch.value()); } } else { // We may be done @@ -107,8 +108,9 @@ behavior summa_server(stateful_actor<summa_server_state>* self) { aout(self) << "No batches left to assign - Waiting for All Clients to finish\n"; self->state.client_container->setBatchForClient(client_actor, {}); for (auto& backup_server : self->state.backup_servers_list) { - self->send(backup_server, done_batch_v, client_actor, batch); - self->send(backup_server, no_more_batches_v, client_actor); + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, done_batch_v, client_actor, batch); + self->send(backup_server_actor, no_more_batches_v, client_actor); } } }, @@ -125,7 +127,8 @@ behavior summa_server_exit(stateful_actor<summa_server_state>* self) { } aout(self) << "Telling Backup Servers to Exit\n"; for (auto& backup_server : self->state.backup_servers_list) { - self->send(backup_server, time_to_exit_v); + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, time_to_exit_v); } self->quit(); return {}; -- GitLab