diff --git a/Distributed-Documentation.md b/Distributed-Documentation.md index 4f5fb78dda33ca2346061e8c740f4610788098af..1cc65d69eaea93c9d0e46c26e3b6f0e130dc8e96 100644 --- a/Distributed-Documentation.md +++ b/Distributed-Documentation.md @@ -7,7 +7,11 @@ To use distributed mode. Set the "distributed-mode" setting to true in the Summa ## SUMMA-Server Run the server with: - - summaMain -s -c /path/to/config + - summaMain -s -c /path/to/config/Summa_Actors_Settings.json + +The server gets its settings from Summa_Actors_Settings.json where the user can configure how many HRUs to compute and how many HRUs should be inside a batch. ## SUMMA-Client Run the cleint with + - summaMain -c /path/to/config/Summa_Actors_Settings.json + The client needs the Distributed_Settings from the Summa_Actors_Settings.json file. The rest are set by the server. \ No newline at end of file diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index 6b8132db829c7b598f410b34a466b90afaa54387..37f171dc19ede45e3300bfda1097f259a060097d 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -2,6 +2,7 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" +#include "settings_functions.hpp" #include <string> #include <optional> @@ -15,6 +16,11 @@ struct summa_client_state { actor summa_actor_ref; int batch_id; int client_id; // id held by server + + Summa_Actor_Settings summa_actor_settings; + File_Access_Actor_Settings file_access_actor_settings; + Job_Actor_Settings job_actor_settings; + HRU_Actor_Settings hru_actor_settings; }; behavior summa_client(stateful_actor<summa_client_state>* self, std::optional<std::string> config_path); behavior unconnected(stateful_actor<summa_client_state>*); diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index 2a082b36038d4a972c88bafaefbb3c60f2592cc1..69282b1b49ee9d91c2bf92dd4ad32e1a1e92e42a 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -3,6 +3,7 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" #include "batch_manager.hpp" +#include "settings_functions.hpp" #include "client.hpp" #include <string> #include <optional> @@ -11,9 +12,7 @@ namespace caf { struct summa_server_state { - int total_hru_count; int num_clients; - int num_hru_per_batch; int batches_remaining = 0; int batches_solved = 0; std::string config_path; @@ -22,9 +21,20 @@ struct summa_server_state { std::vector<Batch> failed_batches; std::vector<Client> client_list; std::string csv_output_name; + + + Distributed_Settings distributed_settings; + Summa_Actor_Settings summa_actor_settings; + File_Access_Actor_Settings file_access_actor_settings; + Job_Actor_Settings job_actor_settings; + HRU_Actor_Settings hru_actor_settings; + }; -behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path); +behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, + Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings); int assembleBatches(stateful_actor<summa_server_state>* self); std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self); +void initializeCSVOutput(std::string csv_output_name); } \ No newline at end of file diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index 02da6f25416f21d78ef08c2aa2bf98eea90ccd3b..20bfd59e2f63e502e9802308f2d22ca006153b5f 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -37,41 +37,39 @@ class config : public actor_system_config { } }; -void run_client(actor_system& system, const config& cfg) { +void run_client(actor_system& system, const config& cfg, Distributed_Settings distributed_settings) { scoped_actor self{system}; - std::string host; - uint16_t port; aout(self) << "Starting SUMMA-Client in Distributed Mode\n"; - host = getSettings(cfg.config_file, "DistributedSettings", "host", host).value_or(""); - port = getSettings(cfg.config_file, "DistributedSettings", "port", port).value_or(-1); - if (host == "" || port == -1) { + if (distributed_settings.hostname == "" || distributed_settings.port == -1) { aout(self) << "ERROR: run_client() host and port - CHECK SETTINGS FILE\n"; return; } std::optional<std::string> path = cfg.config_file; auto c = system.spawn(summa_client, path); - if (!host.empty() && port > 0) { - anon_send(c, connect_atom_v, host, port); + if (!distributed_settings.hostname.empty() && distributed_settings.port > 0) { + anon_send(c, connect_atom_v, distributed_settings.hostname , (uint16_t) distributed_settings.port ); } else { aout(self) << "No Server Config" << std::endl; } } -void run_server(actor_system& system, const config& cfg) { +void run_server(actor_system& system, const config& cfg, Distributed_Settings distributed_settings, + Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { scoped_actor self{system}; - uint16_t port; + int err; - port = getSettings(cfg.config_file, "DistributedSettings", "port", port).value_or(-1); - if (port == -1) { + if (distributed_settings.port == -1) { aout(self) << "ERROR: run_server() port - CHECK SETTINGS FILE\n"; return; } - auto server = system.spawn(summa_server, cfg.config_file); - aout(self) << "Attempting to publish summa_server_actor on port " << port << std::endl; - auto is_port = io::publish(server, port); + auto server = system.spawn(summa_server, distributed_settings, + summa_actor_settings, file_access_actor_settings, job_actor_settings, hru_actor_settings); + aout(self) << "Attempting to publish summa_server_actor on port " << distributed_settings.port << std::endl; + auto is_port = io::publish(server, distributed_settings.port); if (!is_port) { std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << "\n"; return; @@ -80,22 +78,21 @@ void run_server(actor_system& system, const config& cfg) { } - - void caf_main(actor_system& sys, const config& cfg) { scoped_actor self{sys}; int err; + Distributed_Settings distributed_settings; Summa_Actor_Settings summa_actor_settings; File_Access_Actor_Settings file_access_actor_settings; Job_Actor_Settings job_actor_settings; HRU_Actor_Settings hru_actor_settings; err = read_settings_from_json(cfg.config_file, - distributed_settings, - summa_actor_settings, - file_access_actor_settings, - job_actor_settings, - hru_actor_settings); + distributed_settings, + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); aout(self) << "Printing Settings For SUMMA Simulation\n"; @@ -105,8 +102,12 @@ void caf_main(actor_system& sys, const config& cfg) { if (distributed_settings.distributed_mode) { // only command line arguments needed are config_file and server-mode - auto system = cfg.server_mode ? run_server : run_client; - system(sys, cfg); + if (cfg.server_mode) { + run_server(sys, cfg, distributed_settings, summa_actor_settings, + file_access_actor_settings, job_actor_settings, hru_actor_settings); + } else { + run_client(sys, cfg, distributed_settings); + } } else { // Configure command line arguments @@ -127,7 +128,7 @@ void caf_main(actor_system& sys, const config& cfg) { "fileManger is set with the \"-c\" option\n"; aout(self) << "EXAMPLE: ./summaMain -g 1 -n 10 -c location/of/config \n"; return; - } + } auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, summa_actor_settings, file_access_actor_settings, job_actor_settings, hru_actor_settings, self); diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index f22ae2fa8867436e10c369a34b1a7776a8e0a772..89606b354c05196a9dc790b7b2d35ca8f82ddffd 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -73,6 +73,21 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a self->send(server_actor, connect_to_server_v, self, self->state.hostname); return { + // Response from the server on successful connection + [=](connect_to_server, int client_id, Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { + + aout(self) << "Successfully Connected to Server Actor \n"; + + self->state.client_id = client_id; + self->state.summa_actor_settings = summa_actor_settings; + self->state.file_access_actor_settings = file_access_actor_settings; + self->state.job_actor_settings = job_actor_settings; + self->state.hru_actor_settings = hru_actor_settings; + + }, + + [=](batch, int client_id, int batch_id, int start_hru, int num_hru, std::string config_path) { aout(self) << "\nReceived batch to compute" << "\n"; aout(self) << "BatchID = " << batch_id << "\n"; @@ -83,11 +98,14 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a self->state.batch_id = batch_id; - // self->state.summa_actor_ref = self->spawn(summa_actor, - // start_hru, - // num_hru, - // self->state.config_path.value_or(config_path), - // self); + self->state.summa_actor_ref = self->spawn(summa_actor, + start_hru, + num_hru, + self->state.summa_actor_settings, + self->state.file_access_actor_settings, + self->state.job_actor_settings, + self->state.hru_actor_settings, + self); }, diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index d6f8039e9876a656059602dcfc06669e531a2cf8..abd617682072a2cddcf8800f9398230f5c83365b 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -10,37 +10,20 @@ namespace caf { -behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) { +behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, + Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { + aout(self) << "Summa Server has Started \n"; - self->state.config_path = config_path; - // --------------------------Initalize Settings -------------------------- - self->state.total_hru_count = getSettings(self->state.config_path, "SimulationSettings", "total_hru_count", - self->state.total_hru_count).value_or(-1); - if (self->state.total_hru_count == -1) { - aout(self) << "ERROR: With total_hru_count - CHECK Summa_Actors_Settings.json\n"; - } - self->state.num_hru_per_batch = getSettings(self->state.config_path, "SimulationSettings", "num_hru_per_batch", - self->state.num_hru_per_batch).value_or(-1); - if (self->state.num_hru_per_batch == -1) { - aout(self) << "ERROR: With num_hru_per_batch - CHECK Summa_Actors_Settings.json\n"; - } - // --------------------------Initalize Settings -------------------------- + self->state.distributed_settings = distributed_settings; + self->state.summa_actor_settings = summa_actor_settings; + self->state.file_access_actor_settings = file_access_actor_settings; + self->state.job_actor_settings = job_actor_settings; + self->state.hru_actor_settings = hru_actor_settings; + - // -------------------------- Initalize CSV ------------------------------ - std::ofstream csv_output; self->state.csv_output_name = "Batch_Results.csv"; - csv_output.open(self->state.csv_output_name, std::ios_base::out); - csv_output << - "Batch_ID," << - "Start_HRU," << - "Num_HRU," << - "Hostname," << - "Run_Time," << - "Read_Time," << - "Write_Time,"<< - "Status\n"; - csv_output.close(); - // -------------------------- Initalize CSV ------------------------------ + initializeCSVOutput(self->state.csv_output_name); aout(self) << "Assembling HRUs into Batches\n"; if (assembleBatches(self) == -1) { @@ -60,6 +43,12 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf int client_id = self->state.client_list.size(); // So we can lookup the client in O(1) time self->state.client_list.push_back(Client(client_id, client, hostname)); + // Tell client they are connected + self->send(client, connect_to_server_v, client_id, self->state.summa_actor_settings, + self->state.file_access_actor_settings, self->state.job_actor_settings, self->state.hru_actor_settings); + + + std::optional<int> batch_id = getUnsolvedBatchID(self); if (batch_id.has_value()) { // update the batch in the batch list with the host and actor_ref @@ -141,21 +130,21 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf int assembleBatches(stateful_actor<summa_server_state>* self) { - int remaining_hru_to_batch = self->state.total_hru_count; - int count_index = 0; // this is like the offset for slurm bash scripts + int remaining_hru_to_batch = self->state.distributed_settings.total_hru_count; + int count_index = 0; int start_hru = 1; while(remaining_hru_to_batch > 0) { - if (self->state.num_hru_per_batch > remaining_hru_to_batch) { + if (self->state.distributed_settings.num_hru_per_batch > remaining_hru_to_batch) { self->state.batch_list.push_back(Batch(count_index, start_hru, remaining_hru_to_batch)); remaining_hru_to_batch = 0; } else { self->state.batch_list.push_back(Batch(count_index, start_hru, - self->state.num_hru_per_batch)); + self->state.distributed_settings.num_hru_per_batch)); - remaining_hru_to_batch -= self->state.num_hru_per_batch; - start_hru += self->state.num_hru_per_batch; + remaining_hru_to_batch -= self->state.distributed_settings.num_hru_per_batch; + start_hru += self->state.distributed_settings.num_hru_per_batch; count_index += 1; } } @@ -172,4 +161,19 @@ std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self) return {}; } +void initializeCSVOutput(std::string csv_output_name) { + std::ofstream csv_output; + csv_output.open(csv_output_name, std::ios_base::out); + csv_output << + "Batch_ID," << + "Start_HRU," << + "Num_HRU," << + "Hostname," << + "Run_Time," << + "Read_Time," << + "Write_Time,"<< + "Status\n"; + csv_output.close(); +} + } // end namespace