Skip to content
Snippets Groups Projects
Commit a33a35c0 authored by KyleKlenk's avatar KyleKlenk
Browse files

Got preliminary heartbeating started.

Client can be sent a heartbeat and then send  a message back to the server
parent de3e25ea
No related branches found
No related tags found
No related merge requests found
......@@ -49,6 +49,9 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
// Server Actor
CAF_ADD_ATOM(summa, done_batch)
CAF_ADD_ATOM(summa, time_to_exit)
CAF_ADD_ATOM(summa, start_health_check)
CAF_ADD_ATOM(summa, check_on_clients)
CAF_ADD_ATOM(summa, heartbeat)
// Struct Types
CAF_ADD_TYPE_ID(summa, (Distributed_Settings))
......
......@@ -50,12 +50,18 @@ class Client_Container {
*/
void addClient(caf::actor client_actor, std::string hostname);
int getNumClients();
int getClientID(caf::actor);
Client getClient(int index);
Client removeClient_fromBack();
std::string getHostname_ByClientID(int client_id);
bool isEmpty();
void sendHeartbeats();
};
\ No newline at end of file
......@@ -25,6 +25,9 @@ struct summa_server_state {
Client_Container *client_container;
Batch_Container *batch_container;
int heartbeat_interval = 20;
caf::actor health_check_reminder_actor;
Distributed_Settings distributed_settings;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
......@@ -36,7 +39,7 @@ struct summa_server_state {
behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings);
behavior cleint_health_check_reminder(event_based_actor* self);
int assembleBatches(stateful_actor<summa_server_state>* self);
std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self);
void initializeCSVOutput(std::string csv_output_name);
......
......@@ -35,6 +35,18 @@ void Client_Container::addClient(caf::actor client_actor, std::string hostname)
}
int Client_Container::getNumClients() {
return this->num_clients;
}
Client Client_Container::getClient(int index) {
if (index > this->num_clients) {
throw "Trying to access a client outside of the client_list";
}
return this->client_list[index];
}
int Client_Container::getClientID(caf::actor client_actor) {
for (int i = 0; i < num_clients; i++) {
......@@ -58,3 +70,6 @@ Client Client_Container::removeClient_fromBack() {
this->client_list.pop_back();
return client;
}
......@@ -119,6 +119,12 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
self->send(server_actor, done_batch_v, self, self->state.client_id, self->state.current_batch);
},
[=](heartbeat) {
aout(self) << "Received Heartbeat \n";
self->send(server_actor, heartbeat_v, self->state.client_id);
},
[=](time_to_exit) {
aout(self) << "Client Exiting\n";
self->quit();
......
......@@ -6,6 +6,9 @@
#include "message_atoms.hpp"
#include "global.hpp"
#include <optional>
#include <iostream>
#include <thread>
#include <chrono>
namespace caf {
......@@ -53,6 +56,11 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
aout(self) << "no more batches left to assign\n";
aout(self) << "we are not done yet. Clients could Fail\n";
}
// Start the heartbeat actor after a client has connected
self->state.health_check_reminder_actor = self->spawn(cleint_health_check_reminder);
self->send(self->state.health_check_reminder_actor,
start_health_check_v, self, self->state.heartbeat_interval);
},
......@@ -92,7 +100,18 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
self->quit();
}
}
}
},
[=](check_on_clients) {
for (int i = 0; i < self->state.client_container->getNumClients(); i++) {
Client client = self->state.client_container->getClient(i);
self->send(client.getActor(), heartbeat_v);
}
},
[=](heartbeat, int client_id) {
aout(self) << "Received HeartBeat From: " << client_id << "\n";
},
};
}
......@@ -111,4 +130,17 @@ void initializeCSVOutput(std::string csv_output_name) {
csv_output.close();
}
behavior cleint_health_check_reminder(event_based_actor* self) {
return {
[=](start_health_check, caf::actor summa_server, int sleep_duration) {
std::this_thread::sleep_for(std::chrono::seconds(sleep_duration));
self->send(summa_server, check_on_clients_v);
},
};
}
} // end namespace
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment