Skip to content
Snippets Groups Projects
Commit 1dea976f authored by KyleKlenk's avatar KyleKlenk
Browse files

Can successfully handle lost node in preliminary attempt at hanlding failures.

parent 911989f2
No related branches found
No related tags found
No related merge requests found
...@@ -37,6 +37,7 @@ class Batch { ...@@ -37,6 +37,7 @@ class Batch {
void updateRunTime(double run_time); void updateRunTime(double run_time);
void updateReadTime(double read_time); void updateReadTime(double read_time);
void updateWriteTime(double write_time); void updateWriteTime(double write_time);
void updateAssignedActor(bool boolean);
void printBatchInfo(); void printBatchInfo();
void writeBatchToFile(std::string csv_output); void writeBatchToFile(std::string csv_output);
...@@ -130,6 +131,12 @@ class Batch_Container { ...@@ -130,6 +131,12 @@ class Batch_Container {
*/ */
void printBatches(); void printBatches();
/**
* @brief
*
*/
void updateBatchStatus_LostClient(int batch_id);
private: private:
......
...@@ -43,6 +43,11 @@ class Client { ...@@ -43,6 +43,11 @@ class Client {
*/ */
int getID(); int getID();
/**
* @brief Get the current_batch_id
*/
int getCurrentBatchID();
/** /**
* @brief Get the Hostname of the client * @brief Get the Hostname of the client
*/ */
...@@ -181,4 +186,6 @@ class Client_Container { ...@@ -181,4 +186,6 @@ class Client_Container {
* @return int * @return int
*/ */
int findClientByID(int client_id); int findClientByID(int client_id);
void removeLostClient(int index);
}; };
\ No newline at end of file
...@@ -37,6 +37,15 @@ void Batch_Container::printBatches() { ...@@ -37,6 +37,15 @@ void Batch_Container::printBatches() {
} }
} }
void Batch_Container::updateBatchStatus_LostClient(int batch_id) {
std::optional<int> index = this->findBatch(batch_id);
if (index.has_value()) {
this->batch_list[index.value()].updateAssignedActor(false);
} else {
throw "updateBatchStatus_LostClient - Could not find batch with id";
}
}
std::optional<Batch> Batch_Container::assignBatch(std::string hostname, caf::actor actor_ref) { std::optional<Batch> Batch_Container::assignBatch(std::string hostname, caf::actor actor_ref) {
...@@ -126,6 +135,11 @@ void Batch::updateWriteTime(double write_time) { ...@@ -126,6 +135,11 @@ void Batch::updateWriteTime(double write_time) {
this->write_time = write_time; this->write_time = write_time;
} }
void Batch::updateAssignedActor(bool boolean) {
this->assigned_to_actor = boolean;
}
// general methods
void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) { void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) {
this->hostname = hostname; this->hostname = hostname;
this->assigned_actor = assigned_actor; this->assigned_actor = assigned_actor;
......
...@@ -22,6 +22,10 @@ int Client::getID() { ...@@ -22,6 +22,10 @@ int Client::getID() {
return this->id; return this->id;
} }
int Client::getCurrentBatchID() {
return this->current_batch_id;
}
std::string Client::getHostname() { std::string Client::getHostname() {
return this->hostname; return this->hostname;
} }
...@@ -126,6 +130,11 @@ int Client_Container::findClientByID(int client_id) { ...@@ -126,6 +130,11 @@ int Client_Container::findClientByID(int client_id) {
throw "Cannot Find Client"; throw "Cannot Find Client";
} }
void Client_Container::removeLostClient(int index) {
this->client_list.erase(this->client_list.begin() + index);
this->num_clients--;
}
......
...@@ -67,6 +67,13 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett ...@@ -67,6 +67,13 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
}, },
/**
* @brief Construct a new [=] object
*
* @param client_actor
* @param client_id
* @param batch
*/
[=](done_batch, actor client_actor, int client_id, Batch& batch) { [=](done_batch, actor client_actor, int client_id, Batch& batch) {
aout(self) << "Recieved Completed Batch From Client\n"; aout(self) << "Recieved Completed Batch From Client\n";
...@@ -105,12 +112,21 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett ...@@ -105,12 +112,21 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
} }
}, },
/**
* @brief Construct a new [=] object
*
*/
[=](check_on_clients) { [=](check_on_clients) {
for (int i = 0; i < self->state.client_container->getNumClients(); i++) { for (int i = 0; i < self->state.client_container->getNumClients(); i++) {
Client client = self->state.client_container->getClient(i); Client client = self->state.client_container->getClient(i);
if(self->state.client_container->checkForLostClient(i)) { if(self->state.client_container->checkForLostClient(i)) {
// Client May Be Lost // Client May Be Lost
aout(self) << "Client " << client.getID() << " is considered lost\n"; aout(self) << "Client " << client.getID() << " is considered lost\n";
self->state.batch_container->updateBatchStatus_LostClient(client.getCurrentBatchID());
self->state.client_container->removeLostClient(i);
} else { } else {
self->send(client.getActor(), heartbeat_v); self->send(client.getActor(), heartbeat_v);
} }
...@@ -119,6 +135,11 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett ...@@ -119,6 +135,11 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
start_health_check_v, self, self->state.distributed_settings.heartbeat_interval); start_health_check_v, self, self->state.distributed_settings.heartbeat_interval);
}, },
/**
* @brief Construct a new [=] object
*
* @param client_id
*/
[=](heartbeat, int client_id) { [=](heartbeat, int client_id) {
aout(self) << "Received HeartBeat From: " << client_id << "\n"; aout(self) << "Received HeartBeat From: " << client_id << "\n";
self->state.client_container->decrementLostPotential(client_id); self->state.client_container->decrementLostPotential(client_id);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment