Skip to content
Snippets Groups Projects
Commit 911989f2 authored by KyleKlenk's avatar KyleKlenk
Browse files

changed lost_likelihood_indicator to lost_potential_indicator

Added threshold and heartbeat interval as parameters that are passed into summa through json
parent e984289c
No related branches found
No related tags found
No related merge requests found
...@@ -15,6 +15,9 @@ struct Distributed_Settings { ...@@ -15,6 +15,9 @@ struct Distributed_Settings {
int port; // the port number of the server actor int port; // the port number of the server actor
int total_hru_count; int total_hru_count;
int num_hru_per_batch; int num_hru_per_batch;
int heartbeat_interval; // number of seconds between each heartbeat message
int lost_node_threshold; // the maximum value the lost_potentail_indicator value can be before
// we assume the node is lost
}; };
template<class Inspector> template<class Inspector>
...@@ -24,7 +27,9 @@ bool inspect(Inspector& inspector, Distributed_Settings& distributed_settings) { ...@@ -24,7 +27,9 @@ bool inspect(Inspector& inspector, Distributed_Settings& distributed_settings) {
inspector.field("hostname", distributed_settings.hostname), inspector.field("hostname", distributed_settings.hostname),
inspector.field("port", distributed_settings.port), inspector.field("port", distributed_settings.port),
inspector.field("total_hru_count", distributed_settings.total_hru_count), inspector.field("total_hru_count", distributed_settings.total_hru_count),
inspector.field("num_hru_per_batch",distributed_settings.num_hru_per_batch)); inspector.field("num_hru_per_batch",distributed_settings.num_hru_per_batch),
inspector.field("heartbeat_interval",distributed_settings.heartbeat_interval),
inspector.field("lost_node_threshold",distributed_settings.lost_node_threshold));
} }
......
...@@ -82,7 +82,7 @@ class Client { ...@@ -82,7 +82,7 @@ class Client {
class Client_Container { class Client_Container {
private: private:
int num_clients = 0; int num_clients = 0;
int lost_client_threshold = 3; // value to determine if client is lost int lost_client_threshold; // value to determine if client is lost
std::vector<Client> client_list; std::vector<Client> client_list;
...@@ -90,7 +90,7 @@ class Client_Container { ...@@ -90,7 +90,7 @@ class Client_Container {
/** /**
* @brief Construct a new Client_Container object * @brief Construct a new Client_Container object
*/ */
Client_Container(); Client_Container(int lost_node_threshold);
// Getters // Getters
/** /**
......
...@@ -25,7 +25,6 @@ struct summa_server_state { ...@@ -25,7 +25,6 @@ struct summa_server_state {
Client_Container *client_container; Client_Container *client_container;
Batch_Container *batch_container; Batch_Container *batch_container;
int heartbeat_interval = 20;
caf::actor health_check_reminder_actor; caf::actor health_check_reminder_actor;
Distributed_Settings distributed_settings; Distributed_Settings distributed_settings;
......
...@@ -26,6 +26,12 @@ int read_settings_from_json(std::string json_settings_file, ...@@ -26,6 +26,12 @@ int read_settings_from_json(std::string json_settings_file,
distributed_settings.num_hru_per_batch = getSettings(json_settings_file, parent_key, distributed_settings.num_hru_per_batch = getSettings(json_settings_file, parent_key,
"num_hru_per_batch", distributed_settings.num_hru_per_batch).value_or(-1); "num_hru_per_batch", distributed_settings.num_hru_per_batch).value_or(-1);
distributed_settings.heartbeat_interval = getSettings(json_settings_file, parent_key,
"heartbeat_interval", distributed_settings.heartbeat_interval).value_or(-1);
distributed_settings.lost_node_threshold = getSettings(json_settings_file, parent_key,
"lost_node_threshold", distributed_settings.lost_node_threshold).value_or(-1);
// read settings for summa actor // read settings for summa actor
parent_key = "Summa_Actor"; parent_key = "Summa_Actor";
...@@ -75,7 +81,9 @@ void check_settings_from_json(Distributed_Settings &distributed_settings, ...@@ -75,7 +81,9 @@ void check_settings_from_json(Distributed_Settings &distributed_settings,
std::cout << distributed_settings.hostname << "\n"; std::cout << distributed_settings.hostname << "\n";
std::cout << distributed_settings.port << "\n"; std::cout << distributed_settings.port << "\n";
std::cout << distributed_settings.total_hru_count << "\n"; std::cout << distributed_settings.total_hru_count << "\n";
std::cout << distributed_settings.num_hru_per_batch << "\n\n\n"; std::cout << distributed_settings.num_hru_per_batch << "\n";
std::cout << distributed_settings.heartbeat_interval << "\n";
std::cout << distributed_settings.lost_node_threshold << "\n\n\n";
std::cout << "************ SUMMA_ACTOR_SETTINGS ************\n"; std::cout << "************ SUMMA_ACTOR_SETTINGS ************\n";
std::cout << summa_actor_settings.max_gru_per_job << "\n\n\n"; std::cout << summa_actor_settings.max_gru_per_job << "\n\n\n";
......
...@@ -101,8 +101,10 @@ void caf_main(actor_system& sys, const config& cfg) { ...@@ -101,8 +101,10 @@ void caf_main(actor_system& sys, const config& cfg) {
aout(self) << "Printing Settings For SUMMA Simulation\n"; aout(self) << "Printing Settings For SUMMA Simulation\n";
check_settings_from_json(distributed_settings, check_settings_from_json(distributed_settings,
summa_actor_settings, file_access_actor_settings, job_actor_settings, summa_actor_settings,
hru_actor_settings); file_access_actor_settings,
job_actor_settings,
hru_actor_settings);
if (distributed_settings.distributed_mode) { if (distributed_settings.distributed_mode) {
// only command line arguments needed are config_file and server-mode // only command line arguments needed are config_file and server-mode
......
...@@ -47,7 +47,9 @@ void Client::decrementLostPotential() { ...@@ -47,7 +47,9 @@ void Client::decrementLostPotential() {
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
Client_Container::Client_Container() {} Client_Container::Client_Container(int lost_node_threshold) {
this->lost_client_threshold = lost_node_threshold;
}
void Client_Container::addClient(caf::actor client_actor, std::string hostname) { void Client_Container::addClient(caf::actor client_actor, std::string hostname) {
int client_id = this->num_clients; int client_id = this->num_clients;
......
...@@ -24,7 +24,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett ...@@ -24,7 +24,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
self->state.job_actor_settings = job_actor_settings; self->state.job_actor_settings = job_actor_settings;
self->state.hru_actor_settings = hru_actor_settings; self->state.hru_actor_settings = hru_actor_settings;
self->state.client_container = new Client_Container(); self->state.client_container = new Client_Container(self->state.distributed_settings.lost_node_threshold);
self->state.batch_container = new Batch_Container( self->state.batch_container = new Batch_Container(
self->state.distributed_settings.total_hru_count, self->state.distributed_settings.total_hru_count,
self->state.distributed_settings.num_hru_per_batch); self->state.distributed_settings.num_hru_per_batch);
...@@ -34,7 +34,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett ...@@ -34,7 +34,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
// Start the heartbeat actor after a client has connected // Start the heartbeat actor after a client has connected
self->state.health_check_reminder_actor = self->spawn(cleint_health_check_reminder); self->state.health_check_reminder_actor = self->spawn(cleint_health_check_reminder);
self->send(self->state.health_check_reminder_actor, self->send(self->state.health_check_reminder_actor,
start_health_check_v, self, self->state.heartbeat_interval); start_health_check_v, self, self->state.distributed_settings.heartbeat_interval);
return { return {
/** /**
...@@ -116,7 +116,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett ...@@ -116,7 +116,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
} }
} }
self->send(self->state.health_check_reminder_actor, self->send(self->state.health_check_reminder_actor,
start_health_check_v, self, self->state.heartbeat_interval); start_health_check_v, self, self->state.distributed_settings.heartbeat_interval);
}, },
[=](heartbeat, int client_id) { [=](heartbeat, int client_id) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment