diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index a33361f46991a20c3a381f0ea0426d1368d55a99..639adbab1f8425127047d2b96203b6616cca67f8 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -14,32 +14,33 @@ namespace caf { struct summa_client_state { - strong_actor_ptr current_server = nullptr; - actor current_server_actor; - std::vector<strong_actor_ptr> servers; - - std::string hostname; - actor summa_actor_ref; - uint16_t port; - int batch_id; - int client_id; // id held by server - bool running = false; // initalized to false - flipped to true when client returns behavior summa_client - - - // tuple is the actor ref and hostname of the backup server - std::vector<std::tuple<caf::actor, std::string>> backup_servers_list; - - Batch current_batch; - bool saved_batch = false; - - Summa_Actor_Settings summa_actor_settings; - File_Access_Actor_Settings file_access_actor_settings; - Job_Actor_Settings job_actor_settings; - HRU_Actor_Settings hru_actor_settings; + strong_actor_ptr current_server = nullptr; + actor current_server_actor; + std::vector<strong_actor_ptr> servers; + + std::string hostname; + actor summa_actor_ref; + uint16_t port; + int batch_id; + int client_id; // id held by server + bool running = false; // initalized to false - flipped to true when client returns behavior summa_client + + + // tuple is the actor ref and hostname of the backup server + std::vector<std::tuple<caf::actor, std::string>> backup_servers_list; + + Batch current_batch; + bool saved_batch = false; + + Distributed_Settings distributed_settings; + + Summa_Actor_Settings summa_actor_settings; + File_Access_Actor_Settings file_access_actor_settings; + Job_Actor_Settings job_actor_settings; + HRU_Actor_Settings hru_actor_settings; }; -behavior summa_client_init(stateful_actor<summa_client_state>* self); -behavior summa_client(stateful_actor<summa_client_state>* self); +behavior summa_client(stateful_actor<summa_client_state>* self, Distributed_Settings Distributed_Settings); void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port); diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index 314d62111ffc156dc6765db9dd1118423c886fce..74739387b748d8099fbdd2833a8bd6eeb995a51f 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -19,41 +19,45 @@ namespace caf { struct summa_server_state { - strong_actor_ptr current_server; // if server is a backup then this will be set to the lead server - actor current_server_actor; - - std::string hostname; - - std::string csv_file_path; - - std::string csv_output_name = "/batch_results.csv"; - - - Client_Container client_container; - Batch_Container batch_container; - - // Actor Reference, Hostname - std::vector<std::tuple<caf::actor, std::string>> backup_servers_list; - - // Settings Structures - Distributed_Settings distributed_settings; - Summa_Actor_Settings summa_actor_settings; - File_Access_Actor_Settings file_access_actor_settings; - Job_Actor_Settings job_actor_settings; - HRU_Actor_Settings hru_actor_settings; + // Server reference -- set if this is a backup server + strong_actor_ptr current_server; + actor current_server_actor; + + // Our hostnme + std::string hostname; + + // Output CSV file + std::string csv_file_path; + std::string csv_output_name = "/batch_results.csv"; + + // Containers + Client_Container client_container; + Batch_Container batch_container; + // Actor Reference, Hostname + std::vector<std::tuple<caf::actor, std::string>> backup_servers_list; + + // Settings Structures + Distributed_Settings distributed_settings; + Summa_Actor_Settings summa_actor_settings; + File_Access_Actor_Settings file_access_actor_settings; + Job_Actor_Settings job_actor_settings; + HRU_Actor_Settings hru_actor_settings; + + // Timing vars + using chrono_time = std::chrono::time_point<std::chrono::system_clock>; + chrono_time start_time; + chrono_time end_time; + bool started_simulation = false; }; -// Summa Server setup behaviour - initializes the state for the server -behavior summa_server_init(stateful_actor<summa_server_state>* self, - Distributed_Settings distributed_settings, - Summa_Actor_Settings summa_actor_settings, - File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, - HRU_Actor_Settings hru_actor_settings); - // Summa Server behaviour - handles messages from clients -behavior summa_server(stateful_actor<summa_server_state>* self); +behavior summa_server(stateful_actor<summa_server_state>* self, + Distributed_Settings distributed_settings, + Summa_Actor_Settings summa_actor_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings); // Summa Server backup behaviour - handles the exit messages for clients behavior summa_server_exit(stateful_actor<summa_server_state>* self); @@ -65,7 +69,8 @@ void initializeCSVOutput(std::string csv_output_path); void sendAllBackupServersList(stateful_actor<summa_server_state>* self); // Look for the lost backup server in the backup servers list and remove it -void findAndRemoveLostBackupServer(stateful_actor<summa_server_state>* self, actor_addr lost_backup_server); +void findAndRemoveLostBackupServer(stateful_actor<summa_server_state>* self, + actor_addr lost_backup_server); // Check for an idle client to send the failed or next batch we find that is not assigned void checkForIdleClients(stateful_actor<summa_server_state>* self); diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index 22a338b5c93a5214eb1cbc967a7694f8cd142fec..412e85430cafe98908c2ebbabd1383820bd96606 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -153,22 +153,22 @@ behavior job_actor(stateful_actor<job_state>* self, aout(self) << "Job_Actor: Done Update for timestep:" << self->state.timestep << "\n"; // write the output - // int steps_to_write = 1; - // int start_gru = 1; - // self->request(self->state.file_access_actor, caf::infinite, - // write_output_v, steps_to_write, start_gru, self->state.num_gru).await( - // [=](int err) { - // if (err != 0) { - // aout(self) << "Job_Actor: Error Writing Output\n"; - // for (auto GRU : self->state.gru_container.gru_list) - // self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); + int steps_to_write = 1; + int start_gru = 1; + self->request(self->state.file_access_actor, caf::infinite, + write_output_v, steps_to_write, start_gru, self->state.num_gru).await( + [=](int err) { + if (err != 0) { + aout(self) << "Job_Actor: Error Writing Output\n"; + for (auto GRU : self->state.gru_container.gru_list) + self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); - // self->send_exit(self->state.file_access_actor, - // exit_reason::user_shutdown); - // self->quit(); - // } - // // else { aout(self) << "Job_Actor: Done Writing Output\n"; } - // }); + self->send_exit(self->state.file_access_actor, + exit_reason::user_shutdown); + self->quit(); + } + // else { aout(self) << "Job_Actor: Done Writing Output\n"; } + }); self->state.timestep++; self->state.forcingStep++; diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index fa841f58eae571787847fe4791d5f6ab338e3438..7762e2809e7d865b44c151733096e7b430108fa4 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -65,19 +65,6 @@ class config : public actor_system_config { } }; -void publish_server(caf::actor actor_to_publish, int port_number) { - std::cout << "Attempting to publish summa_server_actor on port " - << port_number << std::endl; - auto is_port = io::publish(actor_to_publish, port_number); - if (!is_port) { - std::cerr << "********PUBLISH FAILED*******" - << to_string(is_port.error()) << "\n"; - return; - } - std::cout << "Successfully Published summa_server_actor on port " - << *is_port << "\n"; -} - void connect_client(caf::actor client_to_connect, std::string host_to_connect_to, int port_number) { if (!host_to_connect_to.empty() && port_number > 0) { uint16_t port = 4444; @@ -93,10 +80,12 @@ void run_client(actor_system& system, const config& cfg, Distributed_Settings di aout(self) << "Starting SUMMA-Client in Distributed Mode\n"; - auto client = system.spawn(summa_client_init); - for (auto host : distributed_settings.servers_list) { - connect_client(client, host, distributed_settings.port); - } + auto client = system.spawn(summa_client, distributed_settings); + + // Connect to the servers + // for (auto host : distributed_settings.servers_list) { + // connect_client(client, host, distributed_settings.port); + // } } @@ -123,19 +112,17 @@ void run_server(actor_system& system, const config& cfg, job_actor_settings, hru_actor_settings); - publish_server(server, distributed_settings.port); - connect_client(server, distributed_settings.servers_list[0], distributed_settings.port); + // publish_server(server, distributed_settings.port); + // connect_client(server, distributed_settings.servers_list[0], distributed_settings.port); } else { aout(self) << "\n\n*****Starting SUMMA-Server*****\n\n"; - auto server = system.spawn(summa_server_init, - distributed_settings, - summa_actor_settings, - file_access_actor_settings, - job_actor_settings, - hru_actor_settings); - - publish_server(server, distributed_settings.port); + auto server = system.spawn(summa_server, + distributed_settings, + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); } } @@ -146,17 +133,17 @@ void caf_main(actor_system& sys, const config& cfg) { int err; if (cfg.generate_config) { - std::cout << "Generating Config File" << std::endl; - generate_config_file(); - return; + std::cout << "Generating Config File" << std::endl; + generate_config_file(); + return; } // Check if the master file was if not check if the config file was specified if (!std::filesystem::exists((std::filesystem::path) cfg.master_file)) { if (!std::filesystem::exists((std::filesystem::path) cfg.config_file)) { aout(self) << "\n\n**** Config (-c) or Master File (-m) " - << "Does Not Exist or Not Specified!! ****\n\n" - << command_line_help << std::endl; + << "Does Not Exist or Not Specified!! ****\n\n" + << command_line_help << std::endl; return; } } diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp index 2327f784cb0d0fbcec9eec0fea0b0c57a270cf11..532780871278746cd3591515fba9252894f49dea 100644 --- a/build/source/actors/summa_actor/summa_actor.cpp +++ b/build/source/actors/summa_actor/summa_actor.cpp @@ -143,28 +143,28 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self, if (self->state.batch_container.hasUnsolvedBatches()) { spawnJob(self); } else { - aout(self) << "All Batches Finished\n"; - aout(self) << self->state.batch_container.getAllBatchInfoString(); + aout(self) << "All Batches Finished\n" + << self->state.batch_container.getAllBatchInfoString(); self->state.summa_actor_timing.updateEndPoint("total_duration"); + + double total_dur_sec = self->state.summa_actor_timing.getDuration( + "total_duration").value_or(-1.0); + double total_dur_min = total_dur_sec / 60; + double total_dur_hr = total_dur_min / 60; + double read_dur_sec = self->state.batch_container.getTotalReadTime(); + double write_dur_sec = self->state.batch_container.getTotalWriteTime(); aout(self) << "\n________________SUMMA INFO________________\n" - << "Total Duration = " - << self->state.summa_actor_timing.getDuration( - "total_duration").value_or(-1.0) << " Seconds\n" - << "Total Duration = " - << self->state.summa_actor_timing.getDuration( - "total_duration").value_or(-1.0) / 60 << " Minutes\n" - << "Total Duration = " - << (self->state.summa_actor_timing.getDuration( - "total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" - << "Total Read Duration = " - << self->state.batch_container.getTotalReadTime() << "Seconds\n" - << "Total Write Duration = " - << self->state.batch_container.getTotalWriteTime() << "Seconds\n" - << "Num Failed = " << self->state.numFailed << "\n" - << "___________________Program Finished__________________\n"; + << "Total Duration = " << total_dur_sec << " Seconds\n" + << "Total Duration = " << total_dur_min << " Minutes\n" + << "Total Duration = " << total_dur_hr << " Hours\n" + << "Total Read Duration = " << read_dur_sec << "Seconds\n" + << "Total Write Duration = " << write_dur_sec << "Seconds\n" + << "Num Failed = " << self->state.numFailed << "\n" + << "___________________Program Finished__________________\n"; - self->quit(); + self->send(self->state.parent, done_batch_v, total_dur_sec, + read_dur_sec, write_dur_sec); } }, diff --git a/build/source/actors/summa_actor/summa_backup_server.cpp b/build/source/actors/summa_actor/summa_backup_server.cpp index ac5df402748494a957657ecc0e4716811bf34b54..93ee75988438ea9a1ec41b0198d34c439b54500f 100644 --- a/build/source/actors/summa_actor/summa_backup_server.cpp +++ b/build/source/actors/summa_actor/summa_backup_server.cpp @@ -38,7 +38,12 @@ behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Dist if (std::get<0>(self->state.backup_servers_list[0]) == self) { aout(self) << "*** Becoming New Server\n"; self->state.backup_servers_list.erase(self->state.backup_servers_list.begin()); - self->become(summa_server(self)); + self->become(summa_server(self, + self->state.distributed_settings, + self->state.summa_actor_settings, + self->state.file_access_actor_settings, + self->state.job_actor_settings, + self->state.hru_actor_settings)); } else { aout(self) << "Still A backup - but need to connect to new server\n"; connecting_backup(self, std::get<1>(self->state.backup_servers_list[0]), (uint16_t) self->state.distributed_settings.port); diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index 46d58af649c03be5043e1151648a71cab2e66028..6f11d6cfecec9089e9a52b2ab4e3be1bcb2193d8 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -1,40 +1,59 @@ #include "summa_client.hpp" namespace caf { -behavior summa_client_init(stateful_actor<summa_client_state>* self) { - aout(self) << "Client Has Started Successfully" << std::endl; - - char host[HOST_NAME_MAX]; - gethostname(host, HOST_NAME_MAX); - self->state.hostname = host; - self->set_down_handler([=](const down_msg& dm){ - if(dm.source == self->state.current_server) { - aout(self) << "*** Lost Connection to Server" << std::endl; - self->state.current_server = nullptr; - // try to connect to new server - if (self->state.backup_servers_list.size() > 0) { - aout(self) << "Trying to connect to backup server" << std::endl; - std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep to give the server time to start - connecting(self, std::get<1>(self->state.backup_servers_list[0]), self->state.port); - - } else { - aout(self) << "No backup servers available" << std::endl; - } - } - }); - - return { - [=] (connect_atom, const std::string& host, uint16_t port) { - connecting(self, host, port); - } - }; -} +// behavior summa_client_init(stateful_actor<summa_client_state>* self) { +// // aout(self) << "Client Has Started Successfully" << std::endl; + +// char host[HOST_NAME_MAX]; +// gethostname(host, HOST_NAME_MAX); +// self->state.hostname = host; +// self->set_down_handler([=](const down_msg& dm){ +// if(dm.source == self->state.current_server) { +// aout(self) << "*** Lost Connection to Server" << std::endl; +// self->state.current_server = nullptr; +// // try to connect to new server +// if (self->state.backup_servers_list.size() > 0) { +// aout(self) << "Trying to connect to backup server\n"; +// std::this_thread::sleep_for(std::chrono::seconds(3)); +// // TODO: Not obvious where the code goes from here. +// connecting(self, std::get<1>(self->state.backup_servers_list[0]), +// self->state.port); + +// } else { +// aout(self) << "No backup servers available" << std::endl; +// } +// } +// }); + +// return { +// [=] (connect_atom, const std::string& host, uint16_t port) { +// aout(self) << "Received a connect request while not running\n"; +// connecting(self, host, port); +// } +// }; +// } -behavior summa_client(stateful_actor<summa_client_state>* self) { - - self->state.running = true; - self->send(self->state.current_server_actor, connect_to_server_v, self, self->state.hostname); +behavior summa_client(stateful_actor<summa_client_state>* self, + Distributed_Settings distributed_settings) { + + self->state.running = true; + self->state.distributed_settings = distributed_settings; + for (auto host : distributed_settings.servers_list) { + auto server = self->system().middleman().remote_actor(host, + distributed_settings.port); + if (!server) { + aout(self) << "Failed To Connect To Server\n"; + return {}; + } + aout(self) << "Connected to Server\n"; + // self->state.servers.push_back(server); + self->state.current_server_actor = *server; + self->state.current_server = actor_cast<strong_actor_ptr>(*server); + } + + self->send(self->state.current_server_actor, connect_to_server_v, self, + self->state.hostname); return { // Response from the server on successful connection [=](connect_to_server, @@ -128,39 +147,4 @@ behavior summa_client(stateful_actor<summa_client_state>* self) { }; } - -void connecting(stateful_actor<summa_client_state>* self, const std::string& host, uint16_t port) { - self->state.current_server = nullptr; - self->state.port = port; - auto mm = self->system().middleman().actor_handle(); - self->request(mm, infinite, connect_atom_v, host, port) - .await( - [=](const node_id&, strong_actor_ptr serv, - const std::set<std::string>& ifs) { - if (!serv) { - aout(self) << R"(*** no server found at ")" << host << R"(":)" << port - << std::endl; - return; - } - if (!ifs.empty()) { - aout(self) << R"(*** typed actor found at ")" << host << R"(":)" - << port << ", but expected an untyped actor " << std::endl; - return; - } - aout(self) << "*** successfully connected to server" << std::endl; - self->state.servers.push_back(serv); - auto hdl = actor_cast<actor>(serv); - self->send(hdl, is_lead_server_v, self); - if (!self->state.running) { - self->become(summa_client(self)); - } - - }, - [=](const error& err) { - aout(self) << R"(*** cannot connect to ")" << host << R"(":)" << port - << " => " << to_string(err) << std::endl; - self->become(summa_client_init(self)); - }); -} - } \ No newline at end of file diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 4961f269ce758b7811718cc5d2f2a96a98ccef72..26c6bd31f23396c9903dc2a5851e96fb2947e718 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -2,14 +2,15 @@ namespace caf { -behavior summa_server_init(stateful_actor<summa_server_state>* self, - Distributed_Settings distributed_settings, - Summa_Actor_Settings summa_actor_settings, - File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, - HRU_Actor_Settings hru_actor_settings) { - aout(self) << "Summa Server has Started \n"; - +behavior summa_server(stateful_actor<summa_server_state>* self, + Distributed_Settings distributed_settings, + Summa_Actor_Settings summa_actor_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings) { + + aout(self) << "Summa-Server-Actor Started\n"; + self->set_down_handler([=](const down_msg& dm) { aout(self) << "\n\n ********** DOWN HANDLER ********** \n" << "Lost Connection With A Connected Actor\n"; @@ -20,7 +21,7 @@ behavior summa_server_init(stateful_actor<summa_server_state>* self, else resolveLostBackupServer(self, dm); }); - + self->state.distributed_settings = distributed_settings; self->state.summa_actor_settings = summa_actor_settings; self->state.file_access_actor_settings = file_access_actor_settings; @@ -28,229 +29,256 @@ behavior summa_server_init(stateful_actor<summa_server_state>* self, self->state.hru_actor_settings = hru_actor_settings; self->state.client_container = Client_Container(); + // TODO: Batch Container should have start gru passed to it self->state.batch_container = Batch_Container( + 1, self->state.distributed_settings.total_hru_count, self->state.distributed_settings.num_hru_per_batch); - - // self->state.batch_container.printBatches(); - return summa_server(self); -} -behavior summa_server(stateful_actor<summa_server_state>* self) { + // Publish the server actor + auto is_published = self->system().middleman().publish(self, + self->state.distributed_settings.port); + if (!is_published) { + aout(self) << "Failed to publish actor\n"; + self->quit(); + return {}; + } self->state.current_server_actor = self; aout(self) << "Server is Running \n"; return { [=] (is_lead_server, caf::actor client_actor) { - self->send(client_actor, is_lead_server_v, true, self); + self->send(client_actor, is_lead_server_v, true, self); }, // A message from a client requesting to connect [=](connect_to_server, actor client_actor, std::string hostname) { - aout(self) << "\nActor trying to connect with hostname " << hostname << "\n"; - // Check if the client is already connected - std::optional<Client> client = self->state.client_container.getClient(client_actor.address()); - if (client.has_value()) { - aout(self) << "Client is already connected\n"; - aout(self) << "Updating " << hostname << " with current backup servers\n"; - self->send(client.value().getActor(), update_backup_server_list_v, self->state.backup_servers_list); - std::optional<Batch> batch = client.value().getBatch(); - if (batch.has_value()) { - return; - } - } else { - self->state.client_container.addClient(client_actor, hostname); - self->monitor(client_actor); - // Tell client they are connected - self->send(client_actor, connect_to_server_v, - self->state.summa_actor_settings, - self->state.file_access_actor_settings, - self->state.job_actor_settings, - self->state.hru_actor_settings, - self->state.backup_servers_list); + aout(self) << "\nActor trying to connect with hostname " + << hostname << "\n"; + + // Check if the simulation has started (first-actor connected) + if (!self->state.started_simulation) { + self->state.started_simulation = true; + self->state.start_time = std::chrono::system_clock::now(); + } + + + // Check if the client is already connected + std::optional<Client> client = + self->state.client_container.getClient(client_actor.address()); + + if (client.has_value()) { + aout(self) << "Client is already connected\n"; + aout(self) << "Updating " << hostname << " with current backup servers\n"; + self->send(client.value().getActor(), update_backup_server_list_v, + self->state.backup_servers_list); + std::optional<Batch> batch = client.value().getBatch(); + if (batch.has_value()) + return; + } else { + self->state.client_container.addClient(client_actor, hostname); + self->monitor(client_actor); + // Tell client they are connected + self->send(client_actor, connect_to_server_v, + self->state.summa_actor_settings, + self->state.file_access_actor_settings, + self->state.job_actor_settings, + self->state.hru_actor_settings, + self->state.backup_servers_list); - std::optional<Batch> batch = self->state.batch_container.getUnsolvedBatch(); - if (batch.has_value()) { - self->state.client_container.setBatchForClient(client_actor, batch); - aout(self) << "SENDING: " << batch.value().toString() << "\n"; - self->send(client_actor, batch.value()); - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, new_client_v, client_actor, hostname); - self->send(backup_server_actor, new_assigned_batch_v, client_actor, batch.value()); - } - } else { - aout(self) << "No batches left to assign - Waiting for All Clients to finish\n"; - // Let Backup Servers know that a new client has connected - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, new_client_v, client_actor, hostname); - } - } - } + std::optional<Batch> batch = self->state.batch_container.getUnsolvedBatch(); + if (batch.has_value()) { + self->state.client_container.setBatchForClient(client_actor, batch); + aout(self) << "SENDING: " << batch.value().toString() << "\n"; + self->send(client_actor, batch.value()); + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, new_client_v, client_actor, hostname); + self->send(backup_server_actor, new_assigned_batch_v, client_actor, batch.value()); + } + } else { + aout(self) << "No batches left to assign - Waiting for All Clients to finish\n"; + // Let Backup Servers know that a new client has connected + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, new_client_v, client_actor, + hostname); + } + } + } }, [=](connect_as_backup, actor backup_server, std::string hostname) { - aout(self) << "\nReceived Connection Request From a backup server " << hostname << "\n"; - self->monitor(backup_server); - // Check if the backup server is already connected - auto backup_server_iterator = find(self->state.backup_servers_list.begin(), self->state.backup_servers_list.end(), std::make_tuple(backup_server, hostname)); + aout(self) << "\nReceived Connection Request From a backup server " << hostname << "\n"; + self->monitor(backup_server); + // Check if the backup server is already connected + auto backup_server_iterator = find(self->state.backup_servers_list.begin(), self->state.backup_servers_list.end(), std::make_tuple(backup_server, hostname)); - if (backup_server_iterator != self->state.backup_servers_list.end()) { - aout(self) << "Backup Server is already connected\n"; - } else { - aout(self) << "Adding Backup Server to list\n"; - self->state.backup_servers_list.push_back(std::make_tuple(backup_server, hostname)); - } + if (backup_server_iterator != self->state.backup_servers_list.end()) { + aout(self) << "Backup Server is already connected\n"; + } else { + aout(self) << "Adding Backup Server to list\n"; + self->state.backup_servers_list.push_back(std::make_tuple(backup_server, hostname)); + } - self->send(backup_server, connect_as_backup_v); // confirm connection with sender - // Now we need to send the backup actor our current state - self->send(backup_server, update_with_current_state_v, self->state.batch_container, self->state.client_container); - sendAllBackupServersList(self); + self->send(backup_server, connect_as_backup_v); // confirm connection with sender + // Now we need to send the backup actor our current state + self->send(backup_server, update_with_current_state_v, self->state.batch_container, self->state.client_container); + sendAllBackupServersList(self); }, [=](done_batch, actor client_actor, Batch& batch) { - aout(self) << "\nReceived Completed Batch From Client\n"; - aout(self) << batch.toString() << "\n\n";\ - Client client = self->state.client_container.getClient(client_actor.address()).value(); + aout(self) << "\nReceived Completed Batch From Client\n"; + aout(self) << batch.toString() << "\n\n";\ + Client client = self->state.client_container.getClient(client_actor.address()).value(); - self->state.batch_container.updateBatch_success(batch, self->state.csv_file_path, client.getHostname()); - printRemainingBatches(self); + self->state.batch_container.updateBatch_success(batch, self->state.csv_file_path, client.getHostname()); + printRemainingBatches(self); - std::optional<Batch> new_batch = self->state.batch_container.getUnsolvedBatch(); + std::optional<Batch> new_batch = self->state.batch_container.getUnsolvedBatch(); - if (new_batch.has_value()) { - // send clients new batch and update backup servers - self->state.client_container.setBatchForClient(client_actor, new_batch); - self->send(client_actor, new_batch.value()); - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, done_batch_v, client_actor, batch); - self->send(backup_server_actor, new_assigned_batch_v, client_actor, new_batch.value()); - } - } else { - // We may be done - if (!self->state.batch_container.hasUnsolvedBatches()) { - // We are done - self->become(summa_server_exit(self)); - return; - } + if (new_batch.has_value()) { + // send clients new batch and update backup servers + self->state.client_container.setBatchForClient(client_actor, new_batch); + self->send(client_actor, new_batch.value()); + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, done_batch_v, client_actor, batch); + self->send(backup_server_actor, new_assigned_batch_v, client_actor, new_batch.value()); + } + } else { + // We may be done + if (!self->state.batch_container.hasUnsolvedBatches()) { + // We are done + self->become(summa_server_exit(self)); + return; + } - // No Batches left to assign but waiting for all clients to finish - aout(self) << "No batches left to assign - Waiting for All Clients to finish\n"; - self->state.client_container.setBatchForClient(client_actor, {}); - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, done_batch_v, client_actor, batch); - self->send(backup_server_actor, no_more_batches_v, client_actor); - } + // No Batches left to assign but waiting for all clients to finish + aout(self) << "No batches left to assign - Waiting for All Clients to finish\n"; + self->state.client_container.setBatchForClient(client_actor, {}); + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, done_batch_v, client_actor, batch); + self->send(backup_server_actor, no_more_batches_v, client_actor); } + } }, - }; + }; } behavior summa_server_exit(stateful_actor<summa_server_state>* self) { - aout(self) << "SUMMA Simulation is complete\n"; - aout(self) << "Telling Clients to Exit\n"; - while(!self->state.client_container.isEmpty()) { - Client client = self->state.client_container.removeClient_fromBack(); - caf::actor client_actor = client.getActor(); - self->send(client_actor, time_to_exit_v); - } - aout(self) << "Telling Backup Servers to Exit\n"; - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, time_to_exit_v); - } - self->quit(); - return {}; + aout(self) << "SUMMA Simulation is complete\n"; + aout(self) << "Telling Clients to Exit\n"; + while(!self->state.client_container.isEmpty()) { + Client client = self->state.client_container.removeClient_fromBack(); + caf::actor client_actor = client.getActor(); + self->send(client_actor, time_to_exit_v); + } + aout(self) << "Telling Backup Servers to Exit\n"; + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, time_to_exit_v); + } + + // Print timing + self->state.end_time = std::chrono::system_clock::now(); + std::chrono::duration<double> elapsed_seconds = + self->state.end_time - self->state.start_time; + + aout(self) << "Elapsed Time: " << elapsed_seconds.count() << "s\n"; + self->quit(); + return {}; } void initializeCSVOutput(std::string csv_output_path) { - std::ofstream csv_output; - csv_output.open(csv_output_path, std::ios_base::out); - csv_output << - "Batch_ID," << - "Start_HRU," << - "Num_HRU," << - "Hostname," << - "Run_Time," << - "Read_Time," << - "Write_Time\n"; - csv_output.close(); + std::ofstream csv_output; + csv_output.open(csv_output_path, std::ios_base::out); + csv_output << + "Batch_ID," << + "Start_HRU," << + "Num_HRU," << + "Hostname," << + "Run_Time," << + "Read_Time," << + "Write_Time\n"; + csv_output.close(); } void printRemainingBatches(stateful_actor<summa_server_state>* self) { - aout(self) << "******************\n" << "Batches Remaining: " << - self->state.batch_container.getBatchesRemaining() << - "\n******************\n\n"; + aout(self) << "******************\n Batches Remaining: " + << self->state.batch_container.getBatchesRemaining() + << "\n******************\n\n"; } void sendAllBackupServersList(stateful_actor<summa_server_state>* self) { - std::vector<Client> clients = self->state.client_container.getClientList(); - for (Client client : clients) { - self->send(client.getActor(), update_backup_server_list_v, self->state.backup_servers_list); - } - - for(std::tuple<actor, std::string> backup_server : self->state.backup_servers_list) { - self->send(std::get<0>(backup_server), update_backup_server_list_v, self->state.backup_servers_list); - } + std::vector<Client> clients = self->state.client_container.getClientList(); + for (Client client : clients) { + self->send(client.getActor(), update_backup_server_list_v, + self->state.backup_servers_list); + } + + for(std::tuple<actor, std::string> backup_server : self->state.backup_servers_list) { + self->send(std::get<0>(backup_server), update_backup_server_list_v, + self->state.backup_servers_list); + } } void findAndRemoveLostBackupServer(stateful_actor<summa_server_state>* self, actor_addr lost_backup_server) { - for (int i = 0; i < self->state.backup_servers_list.size(); i++) { - if (std::get<0>(self->state.backup_servers_list[i]) == lost_backup_server) { - aout(self) << "Removed backup server with hostname: " << - std::get<1>(self->state.backup_servers_list[i]) << "\n"; - self->state.backup_servers_list.erase(self->state.backup_servers_list.begin() + i); - break; - } + for (int i = 0; i < self->state.backup_servers_list.size(); i++) { + if (std::get<0>(self->state.backup_servers_list[i]) == lost_backup_server) { + aout(self) << "Removed backup server with hostname: " + << std::get<1>(self->state.backup_servers_list[i]) << "\n"; + self->state.backup_servers_list.erase( + self->state.backup_servers_list.begin() + i); + break; } + } } void checkForIdleClients(stateful_actor<summa_server_state>* self) { - aout(self) << "Looking for an idle Client\n"; - std::optional<Client> client = self->state.client_container.getIdleClient(); - if (client.has_value()) { - aout(self) << "Found an idle Client\n"; - std::optional<Batch> new_batch = self->state.batch_container.getUnsolvedBatch(); - if (new_batch.has_value()) { - // send clients new batch and update backup servers - aout(self) << "Found a batch to assign\n"; - self->state.client_container.setBatchForClient(client.value().getActor(), new_batch); - self->send(client.value().getActor(), new_batch.value()); - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, new_assigned_batch_v, client.value().getActor(), new_batch.value()); - } - } - } else { - aout(self) << "No idle clients found, batch will be added to the back of the list\n"; + aout(self) << "Looking for an idle Client\n"; + std::optional<Client> client = self->state.client_container.getIdleClient(); + if (client.has_value()) { + aout(self) << "Found an idle Client\n"; + std::optional<Batch> new_batch = self->state.batch_container.getUnsolvedBatch(); + if (new_batch.has_value()) { + // send clients new batch and update backup servers + aout(self) << "Found a batch to assign\n"; + self->state.client_container.setBatchForClient(client.value().getActor(), new_batch); + self->send(client.value().getActor(), new_batch.value()); + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, new_assigned_batch_v, client.value().getActor(), new_batch.value()); + } } + } else { + aout(self) << "No idle clients found, batch will be added to the back of the list\n"; + } } void notifyBackupServersOfRemovedClient(stateful_actor<summa_server_state>* self, Client client) { - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, client_removed_v, client); - } + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, client_removed_v, client); + } } void resolveLostClient(stateful_actor<summa_server_state>* self, Client client) { - aout(self) << "Lost Client: " << client.getHostname() << "\n"; - std::optional<Batch> batch = client.getBatch(); - self->state.batch_container.setBatchUnassigned(batch.value()); - self->state.client_container.removeClient(client); - notifyBackupServersOfRemovedClient(self, client); - checkForIdleClients(self); + aout(self) << "Lost Client: " << client.getHostname() << "\n"; + std::optional<Batch> batch = client.getBatch(); + self->state.batch_container.setBatchUnassigned(batch.value()); + self->state.client_container.removeClient(client); + notifyBackupServersOfRemovedClient(self, client); + checkForIdleClients(self); } void resolveLostBackupServer(stateful_actor<summa_server_state>* self, const down_msg& dm) { - aout(self) << "Lost Backup Server\n"; - findAndRemoveLostBackupServer(self, dm.source); - sendAllBackupServersList(self); + aout(self) << "Lost Backup Server\n"; + findAndRemoveLostBackupServer(self, dm.source); + sendAllBackupServersList(self); } } // end namespace