diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp index 684684b587f33b98ab8181fed22e253a7d7afcb7..11cb442a8beb7098058486384a42aceb5dea67d6 100644 --- a/build/includes/summa_actor/batch_manager.hpp +++ b/build/includes/summa_actor/batch_manager.hpp @@ -37,6 +37,7 @@ class Batch { void updateRunTime(double run_time); void updateReadTime(double read_time); void updateWriteTime(double write_time); + void updateAssignedActor(bool boolean); void printBatchInfo(); void writeBatchToFile(std::string csv_output); @@ -130,6 +131,12 @@ class Batch_Container { */ void printBatches(); + /** + * @brief + * + */ + void updateBatchStatus_LostClient(int batch_id); + private: diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client.hpp index 5883e9835d6b6ea22a5e82445e0d6b3aa630ea51..22081f50a6885ea383d9b4186af96370783c560a 100644 --- a/build/includes/summa_actor/client.hpp +++ b/build/includes/summa_actor/client.hpp @@ -43,6 +43,11 @@ class Client { */ int getID(); + /** + * @brief Get the current_batch_id + */ + int getCurrentBatchID(); + /** * @brief Get the Hostname of the client */ @@ -181,4 +186,6 @@ class Client_Container { * @return int */ int findClientByID(int client_id); + + void removeLostClient(int index); }; \ No newline at end of file diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp index 366b9cf37713b4eae8dd355ae0e1bb31676c80d2..9709b4ea2b8758c32d5ded4cf24e4be68aed02d8 100644 --- a/build/source/actors/summa_actor/batch_manager.cpp +++ b/build/source/actors/summa_actor/batch_manager.cpp @@ -37,6 +37,15 @@ void Batch_Container::printBatches() { } } +void Batch_Container::updateBatchStatus_LostClient(int batch_id) { + std::optional<int> index = this->findBatch(batch_id); + if (index.has_value()) { + this->batch_list[index.value()].updateAssignedActor(false); + } else { + throw "updateBatchStatus_LostClient - Could not find batch with id"; + } +} + std::optional<Batch> Batch_Container::assignBatch(std::string hostname, caf::actor actor_ref) { @@ -126,6 +135,11 @@ void Batch::updateWriteTime(double write_time) { this->write_time = write_time; } +void Batch::updateAssignedActor(bool boolean) { + this->assigned_to_actor = boolean; +} + +// general methods void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) { this->hostname = hostname; this->assigned_actor = assigned_actor; diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp index 34f81cf81faeaf24b1b42f20bc20bec3268e2cb7..8dd9c0888387d8f876e3da1a8606613e9b9764ce 100644 --- a/build/source/actors/summa_actor/client.cpp +++ b/build/source/actors/summa_actor/client.cpp @@ -22,6 +22,10 @@ int Client::getID() { return this->id; } +int Client::getCurrentBatchID() { + return this->current_batch_id; +} + std::string Client::getHostname() { return this->hostname; } @@ -126,6 +130,11 @@ int Client_Container::findClientByID(int client_id) { throw "Cannot Find Client"; } +void Client_Container::removeLostClient(int index) { + this->client_list.erase(this->client_list.begin() + index); + this->num_clients--; +} + diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index bab058610bcb4adee52ceb9fa086a5de420de554..3b4d886ae3440a49fece9b88535c887b5b342c58 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -67,6 +67,13 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett }, + /** + * @brief Construct a new [=] object + * + * @param client_actor + * @param client_id + * @param batch + */ [=](done_batch, actor client_actor, int client_id, Batch& batch) { aout(self) << "Recieved Completed Batch From Client\n"; @@ -105,12 +112,21 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett } }, + /** + * @brief Construct a new [=] object + * + */ [=](check_on_clients) { for (int i = 0; i < self->state.client_container->getNumClients(); i++) { Client client = self->state.client_container->getClient(i); if(self->state.client_container->checkForLostClient(i)) { // Client May Be Lost aout(self) << "Client " << client.getID() << " is considered lost\n"; + + self->state.batch_container->updateBatchStatus_LostClient(client.getCurrentBatchID()); + + self->state.client_container->removeLostClient(i); + } else { self->send(client.getActor(), heartbeat_v); } @@ -119,6 +135,11 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett start_health_check_v, self, self->state.distributed_settings.heartbeat_interval); }, + /** + * @brief Construct a new [=] object + * + * @param client_id + */ [=](heartbeat, int client_id) { aout(self) << "Received HeartBeat From: " << client_id << "\n"; self->state.client_container->decrementLostPotential(client_id);