diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client.hpp index 18e8fa9bc83fdb84d342a7b6b1cd33fbe5f48479..68d6f6cadaba943e11f1461607b5b06ae67ca75f 100644 --- a/build/includes/summa_actor/client.hpp +++ b/build/includes/summa_actor/client.hpp @@ -12,12 +12,14 @@ class Client { bool connected; caf::actor client_actor; std::string hostname; - Batch* current_batch; + int current_batch_id; public: Client(int id, caf::actor client_actor, std::string hostname); + void updateCurrentBatchID(int batch_id); + caf::actor getActor(); int getID(); @@ -50,18 +52,61 @@ class Client_Container { */ void addClient(caf::actor client_actor, std::string hostname); + /** + * @brief Update the current batch id the client is working on + * + * @param client_id The id of the client we want to update the batch for + * @param batch_id The id of the batch + */ + void updateCurrentBatch(int client_id, int batch_id); + + /** + * @brief Get the number of connected clients + * + * @return int + */ int getNumClients(); - int getClientID(caf::actor); + /** + * @brief Get the Client ID of a cleint from its actor ref + * + * @param cleint_actor + * @return int + */ + int getClientID(caf::actor client_actor); + /** + * @brief Get a client from the client list + * This is used when we need to get all of the + * clients but we do not want to remove them + * from the client_list; + * @param index + * @return Client + */ Client getClient(int index); + /** + * @brief Removes a client from the back of the list + * Used when we are finished and want to pop the clients + * off the list to send them an exit message + * @return Client + */ Client removeClient_fromBack(); + /** + * @brief Get the Hostname of a client by their ClientID + * + * @param client_id + * @return std::string + */ std::string getHostname_ByClientID(int client_id); + /** + * @brief Check if the client list is empty + * + * @return true + * @return false + */ bool isEmpty(); - void sendHeartbeats(); - }; \ No newline at end of file diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp index 9aa4cdd9ab4d11615afaed9404e497c0dba6cd61..220676dd1e16afd0191ee6c8279426fac1b2d534 100644 --- a/build/source/actors/summa_actor/client.cpp +++ b/build/source/actors/summa_actor/client.cpp @@ -14,6 +14,10 @@ caf::actor Client::getActor() { return this->client_actor; } +void Client::updateCurrentBatchID(int batch_id) { + this->current_batch_id = batch_id; +} + int Client::getID() { return this->id; } @@ -71,5 +75,13 @@ Client Client_Container::removeClient_fromBack() { return client; } +void Client_Container::updateCurrentBatch(int client_id, int batch_id) { + for (int i = 0; i < num_clients; i++) { + if (client_id == this->client_list[i].getID()){ + this->client_list[i].updateCurrentBatchID(batch_id); + } + } +} + diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 0f398ec6aac8988e701d2e1bb25fdd99238ac1ed..b30a03be550c809353c370fef1c4d68db97885fa 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -31,6 +31,11 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett self->state.batch_container->printBatches(); + // Start the heartbeat actor after a client has connected + self->state.health_check_reminder_actor = self->spawn(cleint_health_check_reminder); + self->send(self->state.health_check_reminder_actor, + start_health_check_v, self, self->state.heartbeat_interval); + return { /** * @brief A message from a client requresting to connect @@ -51,17 +56,15 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett std::optional<Batch> batch = self->state.batch_container->assignBatch(hostname, client_actor); if (batch.has_value()) { + self->state.client_container->updateCurrentBatch( + self->state.client_container->getClientID(client_actor), + batch.value().getBatchID()); self->send(client_actor, batch.value()); } else { aout(self) << "no more batches left to assign\n"; aout(self) << "we are not done yet. Clients could Fail\n"; } - - // Start the heartbeat actor after a client has connected - self->state.health_check_reminder_actor = self->spawn(cleint_health_check_reminder); - self->send(self->state.health_check_reminder_actor, - start_health_check_v, self, self->state.heartbeat_interval); - + }, [=](done_batch, actor client_actor, int client_id, Batch& batch) { @@ -107,10 +110,13 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett Client client = self->state.client_container->getClient(i); self->send(client.getActor(), heartbeat_v); } + self->send(self->state.health_check_reminder_actor, + start_health_check_v, self, self->state.heartbeat_interval); }, [=](heartbeat, int client_id) { aout(self) << "Received HeartBeat From: " << client_id << "\n"; + }, }; }