Skip to content
Snippets Groups Projects
Commit 57a2ea7f authored by KyleKlenk's avatar KyleKlenk
Browse files

Patch for issue with backup server taking over for server actor:

 - When the server goes down and the backup takes over, a client going down would not be noticed by the backup server.
 - The problem was that the backup server was never monitoring the clients. This is now fixed.
parent cf03c8d7
No related branches found
No related tags found
No related merge requests found
#!/bin/bash
#SBATCH --cpus-per-task=1
#SBATCH --time=01:00:00
#SBATCH --mem=4G
#SBATCH --cpus-per-task=32
#SBATCH --time=02:00:00
#SBATCH --mem=32G
#SBATCH --job-name=Summa-Actors
#SBATCH --mail-user=kyle.klenk@usask.ca
#SBATCH --mail-type=ALL
......@@ -9,9 +9,9 @@
#SBATCH --account=hpc_c_giws_clark
#SBATCH --array=0-9
offset=$SLURM_ARRAY_TASK_ID
startGRU=$((offset*100+1))
/globalhome/kck540/HPC/Summa-Projects/Summa-Actors/build/summa/bin/summa.exe -g $startGRU 100 -m /project/gwf/gwf_cmt/kck540/domain_NorthAmerica/summa_actors_input/OutputTesting/fileManager_SummaOriginal.txt
# offset=$SLURM_ARRAY_TASK_ID
# startGRU=$((offset*100+1))
# /globalhome/kck540/HPC/Summa-Projects/Summa-Actors/build/summa/bin/summa.exe -g $startGRU 100 -m /project/gwf/gwf_cmt/kck540/domain_NorthAmerica/summa_actors_input/OutputTesting/fileManager_SummaOriginal.txt
# then
# /globalhome/kck540/HPC/Summa-Projects/Summa-Actors/bin/summa_actors -g 1 -n 1000 -c /project/gwf/gwf_cmt/kck540/domain_NorthAmerica/summa_actors_input/Summa_Actors_Settings.json
/globalhome/kck540/HPC/Summa-Projects/Summa-Actors/bin/summa_actors -g 1 -n 10000 -c /project/gwf/gwf_cmt/kck540/domain_NorthAmerica/summa_actors_input/Summa_Actors_Settings.json
......@@ -69,7 +69,16 @@ void checkForIdleClients(stateful_actor<summa_server_state>* self);
void notifyBackupServersOfRemovedClient(stateful_actor<summa_server_state>* self, Client client);
// Finds the batch the lost client was working on and reassigns it to another client if available
// If no client is available then the batch is added back to the list to be reassigned later
void resolveLostClient(stateful_actor<summa_server_state>* self, Client client);
// Removes the backup server from the list of backup servers
// All connected actors are then notified of the change
void resolveLostBackupServer(stateful_actor<summa_server_state>* self, const down_msg& dm);
// Convenience function to keep code clean - just does what you think it does
void printRemainingBatches(stateful_actor<summa_server_state>* self);
} // namespace caf
......@@ -26,16 +26,26 @@ behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Dist
self->set_down_handler([=](const down_msg& dm){
if(dm.source == self->state.current_server) {
aout(self) << "*** Lost Connection to Server\n";
// check if we should become the new server
if (std::get<0>(self->state.backup_servers_list[0]) == self) {
aout(self) << "*** Becoming New Server\n";
self->state.backup_servers_list.erase(self->state.backup_servers_list.begin());
self->become(summa_server(self));
if (self->state.current_server_actor == self) {
aout(self) << "Lost Connection With A Connected Actor\n";
std::optional<Client> client = self->state.client_container.getClient(dm.source);
if (client.has_value()) {
resolveLostClient(self, client.value());
} else {
aout(self) << "Still A backup - but need to connect to new server\n";
connecting_backup(self, std::get<1>(self->state.backup_servers_list[0]), (uint16_t) self->state.distributed_settings.port);
resolveLostBackupServer(self, dm);
}
} else {
if(dm.source == self->state.current_server) {
aout(self) << "*** Lost Connection to Server\n";
// check if we should become the new server
if (std::get<0>(self->state.backup_servers_list[0]) == self) {
aout(self) << "*** Becoming New Server\n";
self->state.backup_servers_list.erase(self->state.backup_servers_list.begin());
self->become(summa_server(self));
} else {
aout(self) << "Still A backup - but need to connect to new server\n";
connecting_backup(self, std::get<1>(self->state.backup_servers_list[0]), (uint16_t) self->state.distributed_settings.port);
}
}
}
});
......@@ -112,6 +122,7 @@ behavior summa_backup_server(stateful_actor<summa_server_state>* self, const act
// New Client has been received
[=](new_client, caf::actor client_actor, std::string hostname) {
aout(self) << "Received a new client from the lead server\n";
self->monitor(client_actor);
self->state.client_container.addClient(client_actor, hostname);
},
......
......@@ -32,6 +32,7 @@ behavior summa_client_init(stateful_actor<summa_client_state>* self) {
behavior summa_client(stateful_actor<summa_client_state>* self) {
self->state.running = true;
self->send(self->state.current_server_actor, connect_to_server_v, self, self->state.hostname);
return {
......
......@@ -11,17 +11,9 @@ behavior summa_server_init(stateful_actor<summa_server_state>* self, Distributed
aout(self) << "Lost Connection With A Connected Actor\n";
std::optional<Client> client = self->state.client_container.getClient(dm.source);
if (client.has_value()) {
aout(self) << "Lost Client: " << client.value().getHostname() << "\n";
std::optional<Batch> batch = client.value().getBatch();
self->state.batch_container.setBatchUnassigned(batch.value());
self->state.client_container.removeClient(client.value());
notifyBackupServersOfRemovedClient(self, client.value());
checkForIdleClients(self);
resolveLostClient(self, client.value());
} else {
aout(self) << "Lost Backup Server\n";
findAndRemoveLostBackupServer(self, dm.source);
sendAllBackupServersList(self);
resolveLostBackupServer(self, dm);
}
});
......@@ -46,7 +38,7 @@ behavior summa_server_init(stateful_actor<summa_server_state>* self, Distributed
}
behavior summa_server(stateful_actor<summa_server_state>* self) {
self->state.current_server_actor = self;
aout(self) << "Server is Running \n";
return {
......@@ -225,6 +217,8 @@ void checkForIdleClients(stateful_actor<summa_server_state>* self) {
self->send(backup_server_actor, new_assigned_batch_v, client.value().getActor(), new_batch.value());
}
}
} else {
aout(self) << "No idle clients found, batch will be added to the back of the list\n";
}
}
......@@ -235,4 +229,19 @@ void notifyBackupServersOfRemovedClient(stateful_actor<summa_server_state>* self
}
}
/**
 * Handles a client that has gone down.
 *
 * Logs the lost client's hostname, marks the batch it held (if any) as
 * unassigned, removes the client from the client container, tells the backup
 * servers about the removal and finally calls checkForIdleClients so the
 * freed batch can be handed out again.
 *
 * @param self   The server actor whose state holds the batch and client containers.
 * @param client The client that was lost (taken by value).
 */
void resolveLostClient(stateful_actor<summa_server_state>* self, Client client) {
aout(self) << "Lost Client: " << client.getHostname() << "\n";
std::optional<Batch> batch = client.getBatch();
// getBatch() returns an optional; calling value() on an empty optional throws
// std::bad_optional_access, which would skip the client removal and the
// backup-server notification below. Only release a batch when one exists.
// NOTE(review): presumably the optional is empty for a client that was idle
// when it went down - confirm against Client::getBatch.
if (batch.has_value()) {
self->state.batch_container.setBatchUnassigned(batch.value());
}
self->state.client_container.removeClient(client);
notifyBackupServersOfRemovedClient(self, client);
checkForIdleClients(self);
}
/**
 * Handles the loss of a backup server.
 *
 * Logs the event, removes the source of the down message from the list of
 * backup servers and then sends out the updated backup server list.
 *
 * @param self The server actor whose state holds the backup server list.
 * @param dm   The down message; its source identifies the lost backup server.
 */
void resolveLostBackupServer(stateful_actor<summa_server_state>* self, const down_msg& dm) {
aout(self) << "Lost Backup Server\n";

// Name the address of the actor that went down before handing it on.
const auto& lost_backup_addr = dm.source;
findAndRemoveLostBackupServer(self, lost_backup_addr);

// Send out the backup list now that the lost entry is gone.
sendAllBackupServersList(self);
}
} // end namespace
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment