diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index ad70432e2ba8a13dd3ca25ed03c4e35c51536334..3559dee5d619798ff097854b047bed8d1197a6df 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -49,6 +49,9 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) // Server Actor CAF_ADD_ATOM(summa, done_batch) CAF_ADD_ATOM(summa, time_to_exit) + CAF_ADD_ATOM(summa, start_health_check) + CAF_ADD_ATOM(summa, check_on_clients) + CAF_ADD_ATOM(summa, heartbeat) // Struct Types CAF_ADD_TYPE_ID(summa, (Distributed_Settings)) diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client.hpp index fc413a59c376fe36cb4b2edd1c1afb2e21917e3f..18e8fa9bc83fdb84d342a7b6b1cd33fbe5f48479 100644 --- a/build/includes/summa_actor/client.hpp +++ b/build/includes/summa_actor/client.hpp @@ -50,12 +50,18 @@ class Client_Container { */ void addClient(caf::actor client_actor, std::string hostname); + int getNumClients(); + int getClientID(caf::actor); + Client getClient(int index); + Client removeClient_fromBack(); std::string getHostname_ByClientID(int client_id); bool isEmpty(); + void sendHeartbeats(); + }; \ No newline at end of file diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index 7bbfd2080b9b45eb9bebe29c914bdee2c1aba9ce..bd2f66bd6f23cd63863a8cf651b9e0c538b79e31 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -25,6 +25,9 @@ struct summa_server_state { Client_Container *client_container; Batch_Container *batch_container; + int heartbeat_interval = 20; + caf::actor health_check_reminder_actor; + Distributed_Settings distributed_settings; Summa_Actor_Settings summa_actor_settings; File_Access_Actor_Settings file_access_actor_settings; @@ -36,7 +39,7 @@ struct summa_server_state { behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings); - +behavior cleint_health_check_reminder(event_based_actor* self); int assembleBatches(stateful_actor<summa_server_state>* self); std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self); void initializeCSVOutput(std::string csv_output_name); diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp index ac926c7a925184ddc51d6ad9710c2c6e71f87f20..9aa4cdd9ab4d11615afaed9404e497c0dba6cd61 100644 --- a/build/source/actors/summa_actor/client.cpp +++ b/build/source/actors/summa_actor/client.cpp @@ -35,6 +35,18 @@ void Client_Container::addClient(caf::actor client_actor, std::string hostname) } +int Client_Container::getNumClients() { + return this->num_clients; +} + +Client Client_Container::getClient(int index) { + if (index > this->num_clients) { + throw "Trying to access a client outside of the client_list"; + } + + return this->client_list[index]; +} + int Client_Container::getClientID(caf::actor client_actor) { for (int i = 0; i < num_clients; i++) { @@ -58,3 +70,6 @@ Client Client_Container::removeClient_fromBack() { this->client_list.pop_back(); return client; } + + + diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index 41da453e4afb7230a8b253539e3fdf243ce56ae3..05db5347540c50ccc3c9f77a43506baa5aeaa27e 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -119,6 +119,12 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a self->send(server_actor, done_batch_v, self, self->state.client_id, self->state.current_batch); }, + [=](heartbeat) { + aout(self) << "Received Heartbeat \n"; + + self->send(server_actor, heartbeat_v, self->state.client_id); + }, + [=](time_to_exit) { aout(self) << "Client Exiting\n"; self->quit(); diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 7130168166352bf8795624256b717be545fd6c6c..0f398ec6aac8988e701d2e1bb25fdd99238ac1ed 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -6,6 +6,9 @@ #include "message_atoms.hpp" #include "global.hpp" #include <optional> +#include <iostream> +#include <thread> +#include <chrono> namespace caf { @@ -53,6 +56,11 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett aout(self) << "no more batches left to assign\n"; aout(self) << "we are not done yet. Clients could Fail\n"; } + + // Start the heartbeat actor after a client has connected + self->state.health_check_reminder_actor = self->spawn(cleint_health_check_reminder); + self->send(self->state.health_check_reminder_actor, + start_health_check_v, self, self->state.heartbeat_interval); }, @@ -92,7 +100,18 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett self->quit(); } } - } + }, + + [=](check_on_clients) { + for (int i = 0; i < self->state.client_container->getNumClients(); i++) { + Client client = self->state.client_container->getClient(i); + self->send(client.getActor(), heartbeat_v); + } + }, + + [=](heartbeat, int client_id) { + aout(self) << "Received HeartBeat From: " << client_id << "\n"; + }, }; } @@ -111,4 +130,17 @@ void initializeCSVOutput(std::string csv_output_name) { csv_output.close(); } + + +behavior cleint_health_check_reminder(event_based_actor* self) { + return { + + [=](start_health_check, caf::actor summa_server, int sleep_duration) { + std::this_thread::sleep_for(std::chrono::seconds(sleep_duration)); + self->send(summa_server, check_on_clients_v); + }, + }; + +} + } // end namespace