diff --git a/build/includes/global/global.hpp b/build/includes/global/global.hpp index 08104ec274048b9240e65ef977561aaeabba4516..4ded34993623d76f5a2da6538eef1cb52bf9392e 100644 --- a/build/includes/global/global.hpp +++ b/build/includes/global/global.hpp @@ -2,8 +2,6 @@ #include <chrono> -extern bool debug; - /** * Return the time between to time points */ diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index 8a680b93a4365b47ba97c01acbe76e8b8540f995..459511413cf17253a2d8a51587d0e79f4920852c 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -13,7 +13,6 @@ namespace caf { struct summa_client_state { strong_actor_ptr current_server; std::string hostname; - std::optional<std::string> config_path; actor summa_actor_ref; int batch_id; int client_id; // id held by server @@ -26,7 +25,7 @@ struct summa_client_state { HRU_Actor_Settings hru_actor_settings; }; -behavior summa_client(stateful_actor<summa_client_state>* self, std::optional<std::string> config_path); +behavior summa_client(stateful_actor<summa_client_state>* self); behavior unconnected(stateful_actor<summa_client_state>*); void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port); behavior running(stateful_actor<summa_client_state>*, const actor& summa_server); diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index 31a2c4a71c5caf17b29f0d2e0f420a3cb13b215d..e93f449e988a30141b31c9ccb19a75dab347da7a 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -14,6 +14,7 @@ namespace caf { struct summa_server_state { actor backup_server = nullptr; actor backup_server2 = nullptr; + strong_actor_ptr current_server; // if server is a backup then this will be set to the lead server int num_clients; int batches_remaining = 0; int batches_solved = 0; @@ -51,4 +52,6 @@ int assembleBatches(stateful_actor<summa_server_state>* self); std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self); void initializeCSVOutput(std::string csv_output_path, std::string csv_output_name); + +void connecting(stateful_actor<summa_server_state>*, const std::string& host, uint16_t port); } \ No newline at end of file diff --git a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp index 02f4956f7ab65aafcde3a49d602bbbe8985213d1..513aa63c84b43ca015defde96736cc59c633a7d2 100644 --- a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp @@ -11,8 +11,6 @@ using json = nlohmann::json; -bool debug; - namespace caf { diff --git a/build/source/actors/global/settings_functions.cpp b/build/source/actors/global/settings_functions.cpp index 4079ef4a353eb1eebe595639aa15df93b9f3c7c0..9dd85d0eb6e292a5422e24d588435ede8dde4473 100644 --- a/build/source/actors/global/settings_functions.cpp +++ b/build/source/actors/global/settings_functions.cpp @@ -35,9 +35,6 @@ std::optional<std::vector<std::string>> getSettingsArray(std::string json_settin } - - - int read_settings_from_json(std::string json_settings_file, Distributed_Settings &distributed_settings, Summa_Actor_Settings &summa_actor_settings, diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index 10cdcd036da60d34b4f89bed0ec9ff87e5e8e77f..7071542334d95d420d81fd9412296b13038e645b 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -13,17 +13,12 @@ using json = nlohmann::json; namespace caf { -/** - * @brief First Actor that is spawned that is not the Coordinator Actor. - * - * @param self - * @return behavior - */ +// First Actor that is spawned that is not the Coordinator Actor. behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings, caf::actor parent) { - // Timinig Information + // Timing Information self->state.job_timing = TimingInfo(); self->state.job_timing.addTimePoint("total_duration"); self->state.job_timing.updateStartPoint("total_duration"); @@ -82,9 +77,6 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, }, [=](done_init_hru) { - if (debug) { - aout(self) << "Done Init\n"; - } self->state.gru_init++; if (self->state.gru_init >= self->state.num_gru) { @@ -151,15 +143,6 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, [=](file_access_actor_done, double read_duration, double write_duration) { int err = 0; - if (debug) { - aout(self) << "\n********************************\n"; - aout(self) << "Outputing Timing Info for HRUs\n"; - - for(auto gru : self->state.gru_list) { - gru->printOutput(); - } - aout(self) << "********************************\n"; - } // Delete GRUs for (auto GRU : self->state.gru_list) { delete GRU; diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index c4ab5caf09548143b71a58e4f0a35511febff4fc..5f2eda2485aa8a9a83de3059c5a7c906e4d30328 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -24,7 +24,7 @@ class config : public actor_system_config { int startGRU = -1; int countGRU = -1; std::string config_file = ""; - bool debugMode = false; + bool backup_server = false; bool server_mode = false; config() { @@ -32,11 +32,29 @@ class config : public actor_system_config { .add(startGRU, "gru,g", "Starting GRU Index") .add(countGRU, "numGRU,n", "Total Number of GRUs") .add(config_file, "config,c", "Path name of the config directory") - .add(debugMode, "debug-mode,b", "enable debug mode") + .add(backup_server, "backup-server,b", "flag to denote if the server starting is a backup server") .add(server_mode, "server-mode,s", "enable server mode"); } }; +void publish_server(caf::actor actor_to_publish, int port_number) { + std::cout << "Attempting to publish summa_server_actor on port " << port_number << std::endl; + auto is_port = io::publish(actor_to_publish, port_number); + if (!is_port) { + std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << "\n"; + return; + } + std::cout << "Successfully Published summa_server_actor on port " << *is_port << "\n"; +} + +void connect_client(caf::actor client_to_connect, std::string host_to_connect_to, int port_number) { + if (!host_to_connect_to.empty() && port_number > 0) { + anon_send(client_to_connect, connect_atom_v, host_to_connect_to, (uint16_t) port_number ); + } else { + std::cerr << "No Server Config" << std::endl; + } +} + void run_client(actor_system& system, const config& cfg, Distributed_Settings distributed_settings) { scoped_actor self{system}; @@ -46,13 +64,8 @@ void run_client(actor_system& system, const config& cfg, Distributed_Settings di aout(self) << "ERROR: run_client() host and port - CHECK SETTINGS FILE\n"; return; } - std::optional<std::string> path = cfg.config_file; - auto c = system.spawn(summa_client, path); - if (!distributed_settings.hostname.empty() && distributed_settings.port > 0) { - anon_send(c, connect_atom_v, distributed_settings.hostname , (uint16_t) distributed_settings.port ); - } else { - aout(self) << "No Server Config" << std::endl; - } + auto client = system.spawn(summa_client); + connect_client(client, distributed_settings.hostname, distributed_settings.port); } @@ -66,19 +79,24 @@ void run_server(actor_system& system, const config& cfg, Distributed_Settings di aout(self) << "ERROR: run_server() port - CHECK SETTINGS FILE\n"; return; } + auto server = system.spawn(summa_server, distributed_settings, - summa_actor_settings, - file_access_actor_settings, - job_actor_settings, - hru_actor_settings); - - aout(self) << "Attempting to publish summa_server_actor on port " << distributed_settings.port << std::endl; - auto is_port = io::publish(server, distributed_settings.port); - if (!is_port) { - std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << "\n"; - return; + summa_actor_settings, + file_access_actor_settings, + job_actor_settings, + hru_actor_settings); + + // Check if we have are the backup server + if (cfg.backup_server) { + publish_server(server, distributed_settings.port); + + connect_client(server, distributed_settings.hostname, distributed_settings.port); + + + } else { + publish_server(server, distributed_settings.port); } - aout(self) << "Successfully Published summa_server_actor on port " << *is_port << "\n"; + } @@ -97,7 +115,9 @@ void caf_main(actor_system& sys, const config& cfg) { file_access_actor_settings, job_actor_settings, hru_actor_settings); - + if (err != 0) { + return; + } aout(self) << "Printing Settings For SUMMA Simulation\n"; check_settings_from_json(distributed_settings, diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index c0a9f5d1831e18e6e68f7f06b58009c022c86b9a..e89d72c925655417013f8ff6ad5d9cc012cc01b4 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -11,8 +11,7 @@ namespace caf { -behavior summa_client(stateful_actor<summa_client_state>* self, std::optional<std::string> config_path) { - self->state.config_path = config_path; +behavior summa_client(stateful_actor<summa_client_state>* self) { self->set_down_handler([=](const down_msg& dm){ if(dm.source == self->state.current_server) { diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 0be313aff16675e3ec5dcc4614bf5f92ed87d7cf..717788a24dd90a00810d23084c924e20a1e225fb 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -51,6 +51,11 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett start_health_check_v, self, self->state.distributed_settings.heartbeat_interval); return { + // For when a backup server attempts to connect to the main server + [=] (connect_atom, const std::string& host, uint16_t port) { + connecting(self, host, port); + }, + // A message from a client requesting to connect [=](connect_to_server, actor client_actor, std::string hostname) { @@ -188,6 +193,35 @@ void initializeCSVOutput(std::string csv_output_path, std::string csv_output_nam csv_output.close(); } +void connecting(stateful_actor<summa_server_state>* self, const std::string& host, uint16_t port) { + self->state.current_server = nullptr; + + auto mm = self->system().middleman().actor_handle(); + self->request(mm, infinite, connect_atom_v, host, port) + .await( + [=](const node_id&, strong_actor_ptr serv, + const std::set<std::string>& ifs) { + if (!serv) { + aout(self) << R"(*** no server found at ")" << host << R"(":)" << port + << std::endl; + return; + } + if (!ifs.empty()) { + aout(self) << R"(*** typed actor found at ")" << host << R"(":)" + << port << ", but expected an untyped actor " << std::endl; + return; + } + aout(self) << "*** successfully connected to server" << std::endl; + self->state.current_server = serv; + auto hdl = actor_cast<actor>(serv); + self->monitor(hdl); + }, + [=](const error& err) { + aout(self) << R"(*** cannot connect to ")" << host << R"(":)" << port + << " => " << to_string(err) << std::endl; + }); +} + behavior client_health_check_reminder(event_based_actor* self) {