From 07f5b1092f621e1b166a235bd98836e7646fc097 Mon Sep 17 00:00:00 2001 From: Kyle Klenk <kyle.c.klenk@gmail.com> Date: Wed, 14 Feb 2024 04:48:46 +0000 Subject: [PATCH] Refactor getSettingsArray and job_actor*** --- .../file_access_actor/file_access_actor.cpp | 46 ++-- .../actors/global/settings_functions.cpp | 62 ++--- build/source/actors/job_actor/job_actor.cpp | 37 ++- build/source/actors/main.cpp | 106 ++++---- .../actors/summa_actor/summa_server.cpp | 254 +++++++++--------- 5 files changed, 258 insertions(+), 247 deletions(-) diff --git a/build/source/actors/file_access_actor/file_access_actor.cpp b/build/source/actors/file_access_actor/file_access_actor.cpp index 390601a..406aef7 100644 --- a/build/source/actors/file_access_actor/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/file_access_actor.cpp @@ -160,14 +160,15 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, // Write message from the job actor [=](write_output, int steps_to_write, int start_gru, int max_gru) { - + self->state.file_access_timing.updateStartPoint("write_duration"); + writeOutput_fortran(self->state.handle_ncid, &steps_to_write, &start_gru, &max_gru, &self->state.write_params_flag, &self->state.err); if (self->state.write_params_flag) self->state.write_params_flag = false; - + self->state.file_access_timing.updateEndPoint("write_duration"); return self->state.err; }, @@ -208,28 +209,27 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, } -void writeOutput(stateful_actor<file_access_state>* self, Output_Partition* partition) { - - int num_timesteps_to_write = partition->getNumStoredTimesteps(); - int start_gru = partition->getStartGRUIndex(); - int max_gru = partition->getMaxGRUIndex(); - bool write_param_flag = partition->isWriteParams(); - - writeOutput_fortran(self->state.handle_ncid, &num_timesteps_to_write, - &start_gru, &max_gru, &write_param_flag, &self->state.err); - - partition->updateTimeSteps(); - - int num_steps_before_next_write = partition->getNumStoredTimesteps(); - - std::vector<caf::actor> hrus_to_update = partition->getReadyToWriteList(); - - for (int i = 0; i < hrus_to_update.size(); i++) { - self->send(hrus_to_update[i], num_steps_before_write_v, num_steps_before_next_write); - self->send(hrus_to_update[i], run_hru_v); - } +void writeOutput(stateful_actor<file_access_state>* self, Output_Partition* partition) { + int num_timesteps_to_write = partition->getNumStoredTimesteps(); + int start_gru = partition->getStartGRUIndex(); + int max_gru = partition->getMaxGRUIndex(); + bool write_param_flag = partition->isWriteParams(); + + writeOutput_fortran(self->state.handle_ncid, &num_timesteps_to_write, + &start_gru, &max_gru, &write_param_flag, &self->state.err); + + partition->updateTimeSteps(); + + int num_steps_before_next_write = partition->getNumStoredTimesteps(); + + std::vector<caf::actor> hrus_to_update = partition->getReadyToWriteList(); + + for (int i = 0; i < hrus_to_update.size(); i++) { + self->send(hrus_to_update[i], num_steps_before_write_v, num_steps_before_next_write); + self->send(hrus_to_update[i], run_hru_v); + } - partition->resetReadyToWriteList(); + partition->resetReadyToWriteList(); } } // end namespace \ No newline at end of file diff --git a/build/source/actors/global/settings_functions.cpp b/build/source/actors/global/settings_functions.cpp index e669be8..2d3f4d3 100644 --- a/build/source/actors/global/settings_functions.cpp +++ b/build/source/actors/global/settings_functions.cpp @@ -10,37 +10,39 @@ int default_dt_init_factor = 1; -std::optional<std::vector<std::string>> getSettingsArray(std::string json_settings_file, std::string key_1, std::string key_2) { - json settings; - std::ifstream settings_file(json_settings_file); - if (!settings_file.good()) return {}; // return none in the optional - settings_file >> settings; - settings_file.close(); - std::vector<std::string> return_vector; - - // find first key - try { - if (settings.find(key_1) != settings.end()) { - json key_1_settings = settings[key_1]; - - // find value behind second key - if (key_1_settings.find(key_2) != key_1_settings.end()) { - for(auto& host : key_1_settings[key_2]) { - return_vector.push_back(host["hostname"]); - } - return return_vector; - } else - return {}; - - } else { - return {}; // return none in the optional (error value) - } - } catch (json::exception& e) { - std::cout << e.what() << "\n"; - std::cout << key_1 << "\n"; - std::cout << key_2 << "\n"; +std::optional<std::vector<std::string>> getSettingsArray( + std::string json_settings_file, std::string key_1, std::string key_2) { + json settings; + std::ifstream settings_file(json_settings_file); + if (!settings_file.good()) return {}; // return none in the optional + settings_file >> settings; + settings_file.close(); + std::vector<std::string> return_vector; + + // find first key + try { + if (settings.find(key_1) != settings.end()) { + json key_1_settings = settings[key_1]; + + // find value behind second key + if (key_1_settings.find(key_2) != key_1_settings.end()) { + for(auto& host : key_1_settings[key_2]) + return_vector.push_back(host["hostname"]); + + return return_vector; + } + else return {}; - } + + } + else + return {}; // return none in the optional (error value) + } catch (json::exception& e) { + std::cout << e.what() << "\n"; + std::cout << key_1 << "\n"; + std::cout << key_2 << "\n"; + return {}; + } } Distributed_Settings readDistributedSettings(std::string json_settings_file) { diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index 46c419a..22a338b 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -153,23 +153,22 @@ behavior job_actor(stateful_actor<job_state>* self, aout(self) << "Job_Actor: Done Update for timestep:" << self->state.timestep << "\n"; // write the output - int steps_to_write = 1; - int start_gru = 1; - - self->request(self->state.file_access_actor, caf::infinite, - write_output_v, steps_to_write, start_gru, self->state.num_gru).await( - [=](int err) { - if (err != 0) { - aout(self) << "Job_Actor: Error Writing Output\n"; - for (auto GRU : self->state.gru_container.gru_list) - self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); + // int steps_to_write = 1; + // int start_gru = 1; + // self->request(self->state.file_access_actor, caf::infinite, + // write_output_v, steps_to_write, start_gru, self->state.num_gru).await( + // [=](int err) { + // if (err != 0) { + // aout(self) << "Job_Actor: Error Writing Output\n"; + // for (auto GRU : self->state.gru_container.gru_list) + // self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); - self->send_exit(self->state.file_access_actor, - exit_reason::user_shutdown); - self->quit(); - } - // else { aout(self) << "Job_Actor: Done Writing Output\n"; } - }); + // self->send_exit(self->state.file_access_actor, + // exit_reason::user_shutdown); + // self->quit(); + // } + // // else { aout(self) << "Job_Actor: Done Writing Output\n"; } + // }); self->state.timestep++; self->state.forcingStep++; @@ -177,11 +176,7 @@ behavior job_actor(stateful_actor<job_state>* self, // Check if we are done the simulation if (self->state.timestep > self->state.num_steps) { aout(self) << "Job_Actor: Done Job\n"; - for (auto GRU : self->state.gru_container.gru_list) { - self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); - } - self->send_exit(self->state.file_access_actor, exit_reason::user_shutdown); - self->quit(); + self->send(self, finalize_v); } // Check if we need another forcing file else if (self->state.forcingStep > self->state.stepsInCurrentFFile) { diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index 43369cd..fa841f5 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -66,13 +66,16 @@ class config : public actor_system_config { }; void publish_server(caf::actor actor_to_publish, int port_number) { - std::cout << "Attempting to publish summa_server_actor on port " << port_number << std::endl; - auto is_port = io::publish(actor_to_publish, port_number); - if (!is_port) { - std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << "\n"; - return; - } - std::cout << "Successfully Published summa_server_actor on port " << *is_port << "\n"; + std::cout << "Attempting to publish summa_server_actor on port " + << port_number << std::endl; + auto is_port = io::publish(actor_to_publish, port_number); + if (!is_port) { + std::cerr << "********PUBLISH FAILED*******" + << to_string(is_port.error()) << "\n"; + return; + } + std::cout << "Successfully Published summa_server_actor on port " + << *is_port << "\n"; } void connect_client(caf::actor client_to_connect, std::string host_to_connect_to, int port_number) { @@ -97,39 +100,43 @@ void run_client(actor_system& system, const config& cfg, Distributed_Settings di } -void run_server(actor_system& system, const config& cfg, Distributed_Settings distributed_settings, - Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { - scoped_actor self{system}; - int err; +void run_server(actor_system& system, const config& cfg, + Distributed_Settings distributed_settings, + Summa_Actor_Settings summa_actor_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings) { + scoped_actor self{system}; + int err; - if (distributed_settings.port == -1) { - aout(self) << "ERROR: run_server() port - CHECK SETTINGS FILE\n"; - return; - } + if (distributed_settings.port == -1) { + aout(self) << "ERROR: run_server() port - CHECK SETTINGS FILE\n"; + return; + } - // Check if we have are the backup server - if (cfg.backup_server) { - auto server = system.spawn(summa_backup_server_init, - distributed_settings, - summa_actor_settings, - file_access_actor_settings, - job_actor_settings, - hru_actor_settings); - - publish_server(server, distributed_settings.port); - connect_client(server, distributed_settings.servers_list[0], distributed_settings.port); - - } else { - auto server = system.spawn(summa_server_init, - distributed_settings, - summa_actor_settings, - file_access_actor_settings, - job_actor_settings, - hru_actor_settings); - - publish_server(server, distributed_settings.port); - } + // Check if we have are the backup server + if (cfg.backup_server) { + auto server = system.spawn(summa_backup_server_init, + distributed_settings, + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); + + publish_server(server, distributed_settings.port); + connect_client(server, distributed_settings.servers_list[0], distributed_settings.port); + + } else { + aout(self) << "\n\n*****Starting SUMMA-Server*****\n\n"; + auto server = system.spawn(summa_server_init, + distributed_settings, + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); + + publish_server(server, distributed_settings.port); + } } @@ -171,11 +178,11 @@ void caf_main(actor_system& sys, const config& cfg) { if (distributed_settings.distributed_mode) { // only command line arguments needed are config_file and server-mode if (cfg.server_mode) { - run_server(sys, cfg, distributed_settings, summa_actor_settings, - file_access_actor_settings, job_actor_settings, - hru_actor_settings); + run_server(sys, cfg, distributed_settings, summa_actor_settings, + file_access_actor_settings, job_actor_settings, + hru_actor_settings); } else { - run_client(sys,cfg, distributed_settings); + run_client(sys,cfg, distributed_settings); } } else { @@ -208,11 +215,15 @@ int main(int argc, char** argv) { // Find -g and insert -t after it if (*it == "-g" && std::next(it) != args.end()) { - auto count_gru = std::find_if(std::next(it), args.end(), [](const std::string& arg) { - return std::isdigit(arg.front()); + auto count_gru = std::find_if(std::next(it), args.end(), + [](const std::string& arg) { + return std::isdigit(arg.front()); }); if (count_gru != args.end()) { - args.insert(std::next(count_gru), "-t"); + args.insert(std::next(count_gru), "-t"); + } else { + std::cerr << "Error: -g requires a countGRU argument" << std::endl; + return 1; } break; } @@ -230,8 +241,9 @@ int main(int argc, char** argv) { } argc = args.size(); - exec_main_init_meta_objects<id_block::summa, io::middleman>(); + exec_main_init_meta_objects<io::middleman, id_block::summa>(); caf::core::init_global_meta_objects(); - return exec_main<>(caf_main, argc, argv2); + return exec_main<io::middleman, id_block::summa>(caf_main, argc, argv2); } + diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 37adb0d..4961f26 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -2,149 +2,151 @@ namespace caf { -behavior summa_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, - Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { - - aout(self) << "Summa Server has Started \n"; - self->set_down_handler([=](const down_msg& dm) { - aout(self) << "\n\n ********** DOWN HANDLER ********** \n"; - aout(self) << "Lost Connection With A Connected Actor\n"; - std::optional<Client> client = self->state.client_container.getClient(dm.source); - if (client.has_value()) { - resolveLostClient(self, client.value()); - } else { - resolveLostBackupServer(self, dm); - } - }); - - self->state.distributed_settings = distributed_settings; - self->state.summa_actor_settings = summa_actor_settings; - self->state.file_access_actor_settings = file_access_actor_settings; - self->state.job_actor_settings = job_actor_settings; - self->state.hru_actor_settings = hru_actor_settings; - - self->state.client_container = Client_Container(); - self->state.batch_container = Batch_Container( - self->state.distributed_settings.total_hru_count, - self->state.distributed_settings.num_hru_per_batch); +behavior summa_server_init(stateful_actor<summa_server_state>* self, + Distributed_Settings distributed_settings, + Summa_Actor_Settings summa_actor_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings) { + aout(self) << "Summa Server has Started \n"; + + self->set_down_handler([=](const down_msg& dm) { + aout(self) << "\n\n ********** DOWN HANDLER ********** \n" + << "Lost Connection With A Connected Actor\n"; + std::optional<Client> client = + self->state.client_container.getClient(dm.source); + if (client.has_value()) + resolveLostClient(self, client.value()); + else + resolveLostBackupServer(self, dm); + }); - // self->state.batch_container.printBatches(); - return summa_server(self); + self->state.distributed_settings = distributed_settings; + self->state.summa_actor_settings = summa_actor_settings; + self->state.file_access_actor_settings = file_access_actor_settings; + self->state.job_actor_settings = job_actor_settings; + self->state.hru_actor_settings = hru_actor_settings; + + self->state.client_container = Client_Container(); + self->state.batch_container = Batch_Container( + self->state.distributed_settings.total_hru_count, + self->state.distributed_settings.num_hru_per_batch); + + // self->state.batch_container.printBatches(); + return summa_server(self); } behavior summa_server(stateful_actor<summa_server_state>* self) { - self->state.current_server_actor = self; - aout(self) << "Server is Running \n"; - - return { - - [=] (is_lead_server, caf::actor client_actor) { - self->send(client_actor, is_lead_server_v, true, self); - }, - - // A message from a client requesting to connect - [=](connect_to_server, actor client_actor, std::string hostname) { - aout(self) << "\nActor trying to connect with hostname " << hostname << "\n"; - // Check if the client is already connected - std::optional<Client> client = self->state.client_container.getClient(client_actor.address()); - if (client.has_value()) { - aout(self) << "Client is already connected\n"; - aout(self) << "Updating " << hostname << " with current backup servers\n"; - self->send(client.value().getActor(), update_backup_server_list_v, self->state.backup_servers_list); - std::optional<Batch> batch = client.value().getBatch(); - if (batch.has_value()) { - return; - } - } else { - self->state.client_container.addClient(client_actor, hostname); - self->monitor(client_actor); - // Tell client they are connected - self->send(client_actor, connect_to_server_v, - self->state.summa_actor_settings, - self->state.file_access_actor_settings, - self->state.job_actor_settings, - self->state.hru_actor_settings, - self->state.backup_servers_list); - - std::optional<Batch> batch = self->state.batch_container.getUnsolvedBatch(); - if (batch.has_value()) { - self->state.client_container.setBatchForClient(client_actor, batch); - aout(self) << "SENDING: " << batch.value().toString() << "\n"; - self->send(client_actor, batch.value()); - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, new_client_v, client_actor, hostname); - self->send(backup_server_actor, new_assigned_batch_v, client_actor, batch.value()); - } - } else { - aout(self) << "No batches left to assign - Waiting for All Clients to finish\n"; - // Let Backup Servers know that a new client has connected - for (auto& backup_server : self->state.backup_servers_list) { - caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, new_client_v, client_actor, hostname); - } - } - } - }, - - [=](connect_as_backup, actor backup_server, std::string hostname) { - aout(self) << "\nReceived Connection Request From a backup server " << hostname << "\n"; - self->monitor(backup_server); - // Check if the backup server is already connected - auto backup_server_iterator = find(self->state.backup_servers_list.begin(), self->state.backup_servers_list.end(), std::make_tuple(backup_server, hostname)); + self->state.current_server_actor = self; + aout(self) << "Server is Running \n"; + + return { + [=] (is_lead_server, caf::actor client_actor) { + self->send(client_actor, is_lead_server_v, true, self); + }, - if (backup_server_iterator != self->state.backup_servers_list.end()) { - aout(self) << "Backup Server is already connected\n"; - } else { - aout(self) << "Adding Backup Server to list\n"; - self->state.backup_servers_list.push_back(std::make_tuple(backup_server, hostname)); + // A message from a client requesting to connect + [=](connect_to_server, actor client_actor, std::string hostname) { + aout(self) << "\nActor trying to connect with hostname " << hostname << "\n"; + // Check if the client is already connected + std::optional<Client> client = self->state.client_container.getClient(client_actor.address()); + if (client.has_value()) { + aout(self) << "Client is already connected\n"; + aout(self) << "Updating " << hostname << " with current backup servers\n"; + self->send(client.value().getActor(), update_backup_server_list_v, self->state.backup_servers_list); + std::optional<Batch> batch = client.value().getBatch(); + if (batch.has_value()) { + return; } - - self->send(backup_server, connect_as_backup_v); // confirm connection with sender - // Now we need to send the backup actor our current state - self->send(backup_server, update_with_current_state_v, self->state.batch_container, self->state.client_container); - sendAllBackupServersList(self); - }, - - [=](done_batch, actor client_actor, Batch& batch) { - aout(self) << "\nReceived Completed Batch From Client\n"; - aout(self) << batch.toString() << "\n\n";\ - Client client = self->state.client_container.getClient(client_actor.address()).value(); - - self->state.batch_container.updateBatch_success(batch, self->state.csv_file_path, client.getHostname()); - printRemainingBatches(self); - - std::optional<Batch> new_batch = self->state.batch_container.getUnsolvedBatch(); + } else { + self->state.client_container.addClient(client_actor, hostname); + self->monitor(client_actor); + // Tell client they are connected + self->send(client_actor, connect_to_server_v, + self->state.summa_actor_settings, + self->state.file_access_actor_settings, + self->state.job_actor_settings, + self->state.hru_actor_settings, + self->state.backup_servers_list); - if (new_batch.has_value()) { - // send clients new batch and update backup servers - self->state.client_container.setBatchForClient(client_actor, new_batch); - self->send(client_actor, new_batch.value()); + std::optional<Batch> batch = self->state.batch_container.getUnsolvedBatch(); + if (batch.has_value()) { + self->state.client_container.setBatchForClient(client_actor, batch); + aout(self) << "SENDING: " << batch.value().toString() << "\n"; + self->send(client_actor, batch.value()); for (auto& backup_server : self->state.backup_servers_list) { caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, done_batch_v, client_actor, batch); - self->send(backup_server_actor, new_assigned_batch_v, client_actor, new_batch.value()); + self->send(backup_server_actor, new_client_v, client_actor, hostname); + self->send(backup_server_actor, new_assigned_batch_v, client_actor, batch.value()); } } else { - // We may be done - if (!self->state.batch_container.hasUnsolvedBatches()) { - // We are done - self->become(summa_server_exit(self)); - return; - } - - // No Batches left to assign but waiting for all clients to finish aout(self) << "No batches left to assign - Waiting for All Clients to finish\n"; - self->state.client_container.setBatchForClient(client_actor, {}); + // Let Backup Servers know that a new client has connected for (auto& backup_server : self->state.backup_servers_list) { caf::actor backup_server_actor = std::get<0>(backup_server); - self->send(backup_server_actor, done_batch_v, client_actor, batch); - self->send(backup_server_actor, no_more_batches_v, client_actor); + self->send(backup_server_actor, new_client_v, client_actor, hostname); } } - }, + } + }, + + [=](connect_as_backup, actor backup_server, std::string hostname) { + aout(self) << "\nReceived Connection Request From a backup server " << hostname << "\n"; + self->monitor(backup_server); + // Check if the backup server is already connected + auto backup_server_iterator = find(self->state.backup_servers_list.begin(), self->state.backup_servers_list.end(), std::make_tuple(backup_server, hostname)); + + if (backup_server_iterator != self->state.backup_servers_list.end()) { + aout(self) << "Backup Server is already connected\n"; + } else { + aout(self) << "Adding Backup Server to list\n"; + self->state.backup_servers_list.push_back(std::make_tuple(backup_server, hostname)); + } + + self->send(backup_server, connect_as_backup_v); // confirm connection with sender + // Now we need to send the backup actor our current state + self->send(backup_server, update_with_current_state_v, self->state.batch_container, self->state.client_container); + sendAllBackupServersList(self); + }, + + [=](done_batch, actor client_actor, Batch& batch) { + aout(self) << "\nReceived Completed Batch From Client\n"; + aout(self) << batch.toString() << "\n\n";\ + Client client = self->state.client_container.getClient(client_actor.address()).value(); + + self->state.batch_container.updateBatch_success(batch, self->state.csv_file_path, client.getHostname()); + printRemainingBatches(self); + + std::optional<Batch> new_batch = self->state.batch_container.getUnsolvedBatch(); + + if (new_batch.has_value()) { + // send clients new batch and update backup servers + self->state.client_container.setBatchForClient(client_actor, new_batch); + self->send(client_actor, new_batch.value()); + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, done_batch_v, client_actor, batch); + self->send(backup_server_actor, new_assigned_batch_v, client_actor, new_batch.value()); + } + } else { + // We may be done + if (!self->state.batch_container.hasUnsolvedBatches()) { + // We are done + self->become(summa_server_exit(self)); + return; + } + + // No Batches left to assign but waiting for all clients to finish + aout(self) << "No batches left to assign - Waiting for All Clients to finish\n"; + self->state.client_container.setBatchForClient(client_actor, {}); + for (auto& backup_server : self->state.backup_servers_list) { + caf::actor backup_server_actor = std::get<0>(backup_server); + self->send(backup_server_actor, done_batch_v, client_actor, batch); + self->send(backup_server_actor, no_more_batches_v, client_actor); + } + } + }, }; } -- GitLab