diff --git a/build/includes/hru_actor/hru_batch_actor.hpp b/build/includes/hru_actor/hru_batch_actor.hpp new file mode 100644 index 0000000000000000000000000000000000000000..af3f3b31cf5e361bc5d1dc831b1dd1cc69be74eb --- /dev/null +++ b/build/includes/hru_actor/hru_batch_actor.hpp @@ -0,0 +1,22 @@ +#pragma once +#include "caf/all.hpp" +#include "hru_actor.hpp" + +namespace caf { + +struct hru_batch_state { + // Actor References + caf::actor file_access_actor; + caf::actor parent; + + std::vector<caf::actor> hru_actors; + int num_done = 0; +}; + +behavior hru_batch_actor(stateful_actor<hru_batch_state>* self, + int start_gru_local, int start_gru_global, int num_gru, + HRU_Actor_Settings hru_actor_settings, + caf::actor file_access_actor, caf::actor parent); + + +} // namespace caf \ No newline at end of file diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index 5ee14cd1f32374c32da89c1d662ccbbda0ccc643..ba69073e7b86ad9e292dd34a2cf9f58d489e65c7 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -7,6 +7,7 @@ #include "global.hpp" #include "json.hpp" #include "hru_actor.hpp" +#include "hru_batch_actor.hpp" #include "message_atoms.hpp" #include "file_access_actor.hpp" #include <unistd.h> @@ -89,6 +90,11 @@ behavior job_actor(stateful_actor<job_state>* self, /********************************************* * Functions for the Job Actor *********************************************/ +// Spawn HRU Actors Individually +void spawnHRUActors(stateful_actor<job_state>* self, bool normal_mode); +// Spawn HRU Batch Actors +void spawnHRUBatches(stateful_actor<job_state>* self); + /** Get the information for the GRUs that will be written to the netcdf file */ std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, diff --git a/build/includes/summa_actor/summa_actor.hpp b/build/includes/summa_actor/summa_actor.hpp index 36457e1e7c47659c764593dc59aae4c8b01e5e66..8a736afdb93dd527f6781dec1f41c40781c0f366 100644 --- a/build/includes/summa_actor/summa_actor.hpp +++ b/build/includes/summa_actor/summa_actor.hpp @@ -13,47 +13,53 @@ namespace caf { struct job_timing_info { - std::vector<double> job_duration; - std::vector<double> job_read_duration; - std::vector<double> job_write_duration; + std::vector<double> job_duration; + std::vector<double> job_read_duration; + std::vector<double> job_write_duration; }; struct summa_actor_state { - // Timing Information For Summa-Actor - TimingInfo summa_actor_timing; - struct job_timing_info timing_info_for_jobs; - - // Program Parameters - int startGRU; // starting GRU for the simulation - int numGRU; // number of GRUs to compute - int fileGRU; // number of GRUs in the file - std::string configPath; // path to the fileManager.txt file - int numFailed = 0; // Number of jobs that have failed - caf::actor currentJob; // Reference to the current job actor - caf::actor parent; - - // Batches - Batch_Container batch_container; - int current_batch_id; - - - // settings for all child actors (save in case we need to recover) - Summa_Actor_Settings summa_actor_settings; - File_Access_Actor_Settings file_access_actor_settings; - Job_Actor_Settings job_actor_settings; - HRU_Actor_Settings hru_actor_settings; + // Timing Information For Summa-Actor + TimingInfo summa_actor_timing; + struct job_timing_info timing_info_for_jobs; + + // Program Parameters + int startGRU; // starting GRU for the simulation + int numGRU; // number of GRUs to compute + int fileGRU; // number of GRUs in the file + std::string configPath; // path to the fileManager.txt file + int numFailed = 0; // Number of jobs that have failed + caf::actor currentJob; // Reference to the current job actor + caf::actor parent; + + // Batches + Batch_Container batch_container; + int current_batch_id; + + + // settings for all child actors (save in case we need to recover) + Summa_Actor_Settings summa_actor_settings; + File_Access_Actor_Settings file_access_actor_settings; + Job_Actor_Settings job_actor_settings; + HRU_Actor_Settings hru_actor_settings; }; + behavior summa_actor(stateful_actor<summa_actor_state>* self, - int startGRU, int numGRU, - Summa_Actor_Settings summa_actor_settings, - File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, - HRU_Actor_Settings hru_actor_settings, actor parent); + int startGRU, int numGRU, + Summa_Actor_Settings summa_actor_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings, actor parent); void spawnJob(stateful_actor<summa_actor_state>* self); +} // namespace caf +// Helper Function to extract a string from the line of a file that +// is enclosed in quotes +std::string extractEnclosed(const std::string& line); +// Gets the number of GRUs from the attribute file +int getNumGRUInFile(const std::string &file_manager); -} // namespace caf \ No newline at end of file diff --git a/build/source/actors/hru_actor/hru_batch_actor.cpp b/build/source/actors/hru_actor/hru_batch_actor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9cde3fd7c08a9afe95f84daf90ac11c228892367 --- /dev/null +++ b/build/source/actors/hru_actor/hru_batch_actor.cpp @@ -0,0 +1,55 @@ +#include "hru_batch_actor.hpp" + +namespace caf { + +behavior hru_batch_actor(stateful_actor<hru_batch_state>* self, + int start_gru_local, int start_gru_global, int num_gru, + HRU_Actor_Settings hru_actor_settings, + caf::actor file_access_actor, caf::actor parent) { + // aout(self) << "HRU Batch Actor Started\n" + // << "\tStart GRU Local: " << start_gru_local << "\n" + // << "\tStart GRU Global: " << start_gru_global << "\n" + // << "\tNum GRU: " << num_gru << "\n"; + + self->state.file_access_actor = file_access_actor; + self->state.parent = parent; + + + for (int i = 0; i < num_gru; i++) { + self->state.hru_actors.push_back( + self->spawn(hru_actor, start_gru_global + i, start_gru_local + i, + hru_actor_settings, file_access_actor, self) + ); + } + + + + return { + [=](update_timeZoneOffset, int iFile) { + // aout(self) << "HRU Batch Actor - Update Time Zone Offset\n"; + for (auto& hru_actor : self->state.hru_actors) { + self->send(hru_actor, update_timeZoneOffset_v, iFile); + } + }, + + [=](update_hru, int timestep, int forcingstep) { + // aout(self) << "HRU Batch Actor - Update HRU\n"; + for (auto& hru_actor : self->state.hru_actors) { + self->send(hru_actor, update_hru_v, timestep, forcingstep); + } + }, + [=](done_update) { + // aout(self) << "HRU Batch Actor - Done Update\n"; + self->state.num_done++; + if (self->state.num_done == self->state.hru_actors.size()) { + self->send(self->state.parent, done_update_v); + self->state.num_done = 0; + } + } + }; + +} + + + +} \ No newline at end of file diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index 412e85430cafe98908c2ebbabd1383820bd96606..12208132c680b164a60631df75d1538138f9ba39 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -3,6 +3,8 @@ using json = nlohmann::json; using chrono_time = std::chrono::time_point<std::chrono::system_clock>; +bool batching = true; +int batch_size = 10; namespace caf { @@ -91,24 +93,12 @@ behavior job_actor(stateful_actor<job_state>* self, auto& gru_container = self->state.gru_container; - // Spawn GRUs - for (int i = 0; i < gru_container.num_gru_in_run_domain; i++) { - auto global_gru_index = gru_container.gru_list.size() - + self->state.start_gru; - auto local_gru_index = gru_container.gru_list.size() + 1; - - auto gru = self->spawn(hru_actor,global_gru_index, local_gru_index, - self->state.hru_actor_settings, - self->state.file_access_actor, self); - - // Create the GRU object (Job uses this to keep track of GRU status) - gru_container.gru_list.push_back(new GRU(global_gru_index, - local_gru_index, gru, - self->state.dt_init_start_factor, - self->state.hru_actor_settings.rel_tol, - self->state.hru_actor_settings.abs_tol, - self->state.max_run_attempts)); - } + // Spawn HRUs in batches or individually + if (batching) + spawnHRUBatches(self); + else + spawnHRUActors(self, false); + aout(self) << "GRUs Initialized\n"; self->send(self->state.file_access_actor, access_forcing_v, @@ -118,7 +108,7 @@ behavior job_actor(stateful_actor<job_state>* self, // # Normal Mode // ##################################################### aout(self) << "Job_Actor: Normal Mode\n"; - self->send(self, init_normal_mode_v); + spawnHRUActors(self, true); } }, @@ -149,7 +139,8 @@ behavior job_actor(stateful_actor<job_state>* self, [=](done_update){ self->state.num_gru_done_timestep++; - if (self->state.num_gru_done_timestep >= self->state.num_gru) { + if (self->state.num_gru_done_timestep >= + self->state.gru_container.gru_list.size()) { aout(self) << "Job_Actor: Done Update for timestep:" << self->state.timestep << "\n"; // write the output @@ -203,38 +194,6 @@ behavior job_actor(stateful_actor<job_state>* self, // ##################################################### // # Normal Mode Start // ##################################################### - [=](init_normal_mode) { - auto& gru_container = self->state.gru_container; - - gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); - gru_container.run_attempts_left = self->state.max_run_attempts; - gru_container.run_attempts_left--; - - - // Spawn the GRUs - for(int i = 0; i < gru_container.num_gru_in_run_domain; i++) { - auto global_gru_index = gru_container.gru_list.size() - + self->state.start_gru; - auto local_gru_index = gru_container.gru_list.size() + 1; - - auto gru = self->spawn(hru_actor, global_gru_index, local_gru_index, - self->state.hru_actor_settings, - self->state.file_access_actor, self); - - // Create the GRU object (Job uses this to keep track of GRU status) - gru_container.gru_list.push_back( - new GRU(global_gru_index, local_gru_index, gru, - self->state.dt_init_start_factor, - self->state.hru_actor_settings.rel_tol, - self->state.hru_actor_settings.abs_tol, - self->state.max_run_attempts)); - - // Start the HRU_Actor to compute all timesteps asynchonously - self->send(gru, update_hru_async_v); - } - }, // end init_gru - - [=](done_hru, int local_gru_index) { auto& gru_container = self->state.gru_container; using namespace std::chrono; @@ -271,141 +230,205 @@ behavior job_actor(stateful_actor<job_state>* self, }, - [=](restart_failures) { - aout(self) << "Job_Actor: Restarting GRUs that Failed\n"; + [=](restart_failures) { + aout(self) << "Job_Actor: Restarting GRUs that Failed\n"; - self->state.gru_container.num_gru_done = 0; - self->state.gru_container.num_gru_in_run_domain = self->state.gru_container.num_gru_failed; - self->state.gru_container.num_gru_failed = 0; + self->state.gru_container.num_gru_done = 0; + self->state.gru_container.num_gru_in_run_domain = self->state.gru_container.num_gru_failed; + self->state.gru_container.num_gru_failed = 0; - self->send(self->state.file_access_actor, restart_failures_v); // notify file_access_actor + self->send(self->state.file_access_actor, restart_failures_v); // notify file_access_actor - // Set Sundials tolerance or decrease timestep length - if (self->state.hru_actor_settings.rel_tol > 0 && - self->state.hru_actor_settings.abs_tol > 0) { - self->state.hru_actor_settings.rel_tol /= 10; - self->state.hru_actor_settings.abs_tol /= 10; - } else { - self->state.hru_actor_settings.dt_init_factor *= 2; - } + // Set Sundials tolerance or decrease timestep length + if (self->state.hru_actor_settings.rel_tol > 0 && + self->state.hru_actor_settings.abs_tol > 0) { + self->state.hru_actor_settings.rel_tol /= 10; + self->state.hru_actor_settings.abs_tol /= 10; + } else { + self->state.hru_actor_settings.dt_init_factor *= 2; + } - for(auto GRU : self->state.gru_container.gru_list) { - if(GRU->isFailed()) { - GRU->setRunning(); - GRU->decrementAttemptsLeft(); - auto global_gru_index = GRU->getGlobalGRUIndex(); - auto local_gru_index = GRU->getLocalGRUIndex(); - auto gru_actor = self->spawn(hru_actor, - global_gru_index, - local_gru_index, - self->state.hru_actor_settings, - self->state.file_access_actor, - self); - self->state.gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor); - } + for(auto GRU : self->state.gru_container.gru_list) { + if(GRU->isFailed()) { + GRU->setRunning(); + GRU->decrementAttemptsLeft(); + auto global_gru_index = GRU->getGlobalGRUIndex(); + auto local_gru_index = GRU->getLocalGRUIndex(); + auto gru_actor = self->spawn(hru_actor, + global_gru_index, + local_gru_index, + self->state.hru_actor_settings, + self->state.file_access_actor, + self); + self->state.gru_container.gru_list[local_gru_index-1]->setGRUActor(gru_actor); } - }, + } + }, - [=](finalize) { - std::vector<serializable_netcdf_gru_actor_info> - netcdf_gru_info = getGruNetcdfInfo( - self->state.max_run_attempts,self->state.gru_container.gru_list); - - - self->state.num_gru_failed = std::count_if(netcdf_gru_info.begin(), - netcdf_gru_info.end(), [](auto& gru_info) { - return !gru_info.successful; - }); - - self->request(self->state.file_access_actor, infinite, finalize_v).await( - [=](std::tuple<double, double> read_write_duration) { - int err = 0; - for (auto GRU : self->state.gru_container.gru_list) { - delete GRU; - } - self->state.gru_container.gru_list.clear(); - - self->state.job_timing.updateEndPoint("total_duration"); - - aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n" - << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n" - << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n" - << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" - << "________________________________________________________________________\n\n"; - - deallocateJobActor(&err); - - // Tell Parent we are done - self->send(self->state.parent, - done_job_v, - self->state.num_gru_failed, - self->state.job_timing.getDuration("total_duration").value_or(-1.0), - std::get<0>(read_write_duration), - std::get<1>(read_write_duration)); - self->quit(); - - }); - }, - - // Handle Sundials Error - [=](err_atom, caf::actor src, double rtol, double atol) { - self->state.hru_actor_settings.rel_tol = rtol; - self->state.hru_actor_settings.abs_tol = atol; - handleGRUError(self, src); - }, - - [=](const error& err, caf::actor src) { + [=](finalize) { + std::vector<serializable_netcdf_gru_actor_info> + netcdf_gru_info = getGruNetcdfInfo( + self->state.max_run_attempts,self->state.gru_container.gru_list); - aout(self) << "\n\n ********** ERROR HANDLER \n"; + + self->state.num_gru_failed = std::count_if(netcdf_gru_info.begin(), + netcdf_gru_info.end(), [](auto& gru_info) { + return !gru_info.successful; + }); + + self->request(self->state.file_access_actor, infinite, finalize_v).await( + [=](std::tuple<double, double> read_write_duration) { + int err = 0; + for (auto GRU : self->state.gru_container.gru_list) { + delete GRU; + } + self->state.gru_container.gru_list.clear(); + + self->state.job_timing.updateEndPoint("total_duration"); + + aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n" + << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n" + << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n" + << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" + << "________________________________________________________________________\n\n"; + + deallocateJobActor(&err); + + // Tell Parent we are done + self->send(self->state.parent, + done_job_v, + self->state.num_gru_failed, + self->state.job_timing.getDuration("total_duration").value_or(-1.0), + std::get<0>(read_write_duration), + std::get<1>(read_write_duration)); + self->quit(); + + }); + }, + + // Handle Sundials Error + [=](err_atom, caf::actor src, double rtol, double atol) { + self->state.hru_actor_settings.rel_tol = rtol; + self->state.hru_actor_settings.abs_tol = atol; + handleGRUError(self, src); + }, + + [=](const error& err, caf::actor src) { + + aout(self) << "\n\n ********** ERROR HANDLER \n"; + + switch(err.category()) { - switch(err.category()) { - - case type_id_v<hru_error>: - aout(self) << "HRU Error: " << to_string(err) << "\n"; - handleGRUError(self, src); - - break; - case type_id_v<file_access_error>: - if (err == file_access_error::mDecisions_error) { - aout(self) << "Check mDecisions File For Correctness"; - } else { - aout(self) << "File Access Error: " << to_string(err) << "No Handling Implemented\n"; - } - for (auto GRU : self->state.gru_container.gru_list) { - self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); - } - self->quit(); - break; - default: - aout(self) << "Unknown Error: " << to_string(err) << "\n"; - break; - } - }, + case type_id_v<hru_error>: + aout(self) << "HRU Error: " << to_string(err) << "\n"; + handleGRUError(self, src); + + break; + case type_id_v<file_access_error>: + if (err == file_access_error::mDecisions_error) { + aout(self) << "Check mDecisions File For Correctness"; + } else { + aout(self) << "File Access Error: " << to_string(err) << "No Handling Implemented\n"; + } + for (auto GRU : self->state.gru_container.gru_list) { + self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown); + } + self->quit(); + break; + default: + aout(self) << "Unknown Error: " << to_string(err) << "\n"; + break; + } + }, }; } -std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, std::vector<GRU*> &gru_list) { - std::vector<serializable_netcdf_gru_actor_info> gru_netcdf_info; - for(auto gru : gru_list) { +void spawnHRUActors(stateful_actor<job_state>* self, bool normal_mode) { + auto& gru_container = self->state.gru_container; + gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); + gru_container.run_attempts_left = self->state.max_run_attempts; + gru_container.run_attempts_left--; + + for (int i = 0; i < gru_container.num_gru_in_run_domain; i++) { + auto global_gru_index = gru_container.gru_list.size() + + self->state.start_gru; + auto local_gru_index = gru_container.gru_list.size() + 1; + + auto gru = self->spawn(hru_actor, global_gru_index, local_gru_index, + self->state.hru_actor_settings, + self->state.file_access_actor, self); + + // Create the GRU object (Job uses this to keep track of GRU status) + gru_container.gru_list.push_back(new GRU(global_gru_index, + local_gru_index, gru, + self->state.dt_init_start_factor, + self->state.hru_actor_settings.rel_tol, + self->state.hru_actor_settings.abs_tol, + self->state.max_run_attempts)); + + if (normal_mode) self->send(gru, update_hru_async_v); + } + +} - serializable_netcdf_gru_actor_info gru_info; - gru_info.run_time = gru->getRunTime(); - gru_info.init_duration = gru->getInitDuration(); - gru_info.forcing_duration = gru->getForcingDuration(); - gru_info.run_physics_duration = gru->getRunPhysicsDuration(); - gru_info.write_output_duration = gru->getWriteOutputDuration(); - - gru_info.num_attempts = max_run_attempts - gru->getAttemptsLeft() + 1; - gru_info.successful = is_success(gru->getStatus()); - gru_info.rel_tol = gru->getRelTol(); - gru_info.abs_tol = gru->getAbsTol(); +void spawnHRUBatches(stateful_actor<job_state>* self) { + auto& gru_container = self->state.gru_container; + gru_container.gru_start_time = std::chrono::high_resolution_clock::now(); + gru_container.run_attempts_left = self->state.max_run_attempts; + gru_container.run_attempts_left--; + + int remaining_hru_to_batch = gru_container.num_gru_in_run_domain; + int start_hru_global = self->state.start_gru; + int start_hru_local = 1; + + while (remaining_hru_to_batch > 0) { + int current_batch_size = std::min(batch_size, remaining_hru_to_batch); + auto gru_batch = self->spawn(hru_batch_actor, start_hru_local, + start_hru_global, current_batch_size, + self->state.hru_actor_settings, + self->state.file_access_actor, self); - gru_netcdf_info.push_back(gru_info); + gru_container.gru_list.push_back(new GRU(start_hru_global, + start_hru_local, gru_batch, + self->state.dt_init_start_factor, + self->state.hru_actor_settings.rel_tol, + self->state.hru_actor_settings.abs_tol, + self->state.max_run_attempts)); - } - return gru_netcdf_info; + remaining_hru_to_batch -= current_batch_size; + start_hru_local += current_batch_size; + start_hru_global += current_batch_size; + } +} + + + + + + +std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo( + int max_run_attempts, std::vector<GRU*> &gru_list) { + std::vector<serializable_netcdf_gru_actor_info> gru_netcdf_info; + + for(auto gru : gru_list) { + serializable_netcdf_gru_actor_info gru_info; + gru_info.run_time = gru->getRunTime(); + gru_info.init_duration = gru->getInitDuration(); + gru_info.forcing_duration = gru->getForcingDuration(); + gru_info.run_physics_duration = gru->getRunPhysicsDuration(); + gru_info.write_output_duration = gru->getWriteOutputDuration(); + + gru_info.num_attempts = max_run_attempts - gru->getAttemptsLeft() + 1; + gru_info.successful = is_success(gru->getStatus()); + gru_info.rel_tol = gru->getRelTol(); + gru_info.abs_tol = gru->getAbsTol(); + + gru_netcdf_info.push_back(gru_info); + } + + return gru_netcdf_info; } diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp index 532780871278746cd3591515fba9252894f49dea..9e378d6791da9ca63ece13d85dd3dcc7b1878580 100644 --- a/build/source/actors/summa_actor/summa_actor.cpp +++ b/build/source/actors/summa_actor/summa_actor.cpp @@ -12,55 +12,6 @@ #include <netcdf.h> using json = nlohmann::json; -// Helper function to extract the information from the file_manager -std::string extractEnclosed(const std::string& line) { - std::size_t first_quote = line.find_first_of("'"); - std::size_t last_quote = line.find_last_of("'"); - if (first_quote != std::string::npos && last_quote != std::string::npos - && first_quote < last_quote) { - return line.substr(first_quote + 1, last_quote - first_quote - 1); - } - return ""; -} - -// Check the number of GRUs in the attribute file -int getNumGRUInFile(const std::string &file_manager) { - std::ifstream file(file_manager); - std::string attributeFile, settingPath; - if (!file.is_open()) - return -1; - - std::string line; - while (std::getline(file, line)) { - if (line.compare(0, 13, "attributeFile") == 0) - attributeFile = extractEnclosed(line); - if (line.compare(0, 12, "settingsPath") == 0) - settingPath = extractEnclosed(line); - } - - file.close(); - - size_t fileGRU = -1; - int ncid, gru_dim; - if (attributeFile.empty() || settingPath.empty()) - return fileGRU; - - std::string combined = settingPath + attributeFile; - - if (NC_NOERR != nc_open(combined.c_str(), NC_NOWRITE, &ncid)) - return fileGRU; - if (NC_NOERR != nc_inq_dimid(ncid, "gru", &gru_dim)) { - nc_close(ncid); - return -1; - } - if (NC_NOERR != nc_inq_dimlen(ncid, gru_dim, &fileGRU)) { - nc_close(ncid); - return -1; - } - nc_close(ncid); - return fileGRU; -} - namespace caf { behavior summa_actor(stateful_actor<summa_actor_state>* self, @@ -100,17 +51,18 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self, self->state.numGRU, self->state.summa_actor_settings.max_gru_per_job); - aout(self) << "Starting SUMMA With " << - self->state.batch_container.getBatchesRemaining() << " Batches\n"; - aout(self) << "###################################################\n" - << self->state.batch_container.getBatchesAsString() - << "###################################################\n"; + aout(self) << "Starting SUMMA With " + << self->state.batch_container.getBatchesRemaining() + << " Batches\n" + << "###################################################\n" + << self->state.batch_container.getBatchesAsString() + << "###################################################\n"; std::optional<Batch> batch = self->state.batch_container.getUnsolvedBatch(); if (!batch.has_value()) { aout(self) << "ERROR--Summa_Actor: No Batches To Solve\n"; - self->quit(); + self->quit(); return {}; } self->state.current_batch_id = batch->getBatchID(); aout(self) << "Starting Batch " << self->state.current_batch_id + 1 << "\n"; @@ -189,3 +141,54 @@ void spawnJob(stateful_actor<summa_actor_state>* self) { } } // end namespace + + +std::string extractEnclosed(const std::string& line) { + std::size_t first_quote = line.find_first_of("'"); + std::size_t last_quote = line.find_last_of("'"); + if (first_quote != std::string::npos && last_quote != std::string::npos + && first_quote < last_quote) { + return line.substr(first_quote + 1, last_quote - first_quote - 1); + } + return ""; +} + +int getNumGRUInFile(const std::string &file_manager) { + std::ifstream file(file_manager); + std::string attributeFile, settingPath; + if (!file.is_open()) + return -1; + + std::string line; + while (std::getline(file, line)) { + if (line.compare(0, 13, "attributeFile") == 0) + attributeFile = extractEnclosed(line); + if (line.compare(0, 12, "settingsPath") == 0) + settingPath = extractEnclosed(line); + } + + file.close(); + + size_t fileGRU = -1; + int ncid, gru_dim; + if (attributeFile.empty() || settingPath.empty()) + return fileGRU; + + std::string combined = settingPath + attributeFile; + + if (NC_NOERR != nc_open(combined.c_str(), NC_NOWRITE, &ncid)) + return fileGRU; + if (NC_NOERR != nc_inq_dimid(ncid, "gru", &gru_dim)) { + nc_close(ncid); + return -1; + } + if (NC_NOERR != nc_inq_dimlen(ncid, gru_dim, &fileGRU)) { + nc_close(ncid); + return -1; + } + nc_close(ncid); + return fileGRU; +} + + + diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index 6f11d6cfecec9089e9a52b2ab4e3be1bcb2193d8..f2b086e76e17bbf6092ac73154aba6111b78d7e8 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -1,44 +1,32 @@ #include "summa_client.hpp" namespace caf { -// behavior summa_client_init(stateful_actor<summa_client_state>* self) { -// // aout(self) << "Client Has Started Successfully" << std::endl; - -// char host[HOST_NAME_MAX]; -// gethostname(host, HOST_NAME_MAX); -// self->state.hostname = host; -// self->set_down_handler([=](const down_msg& dm){ -// if(dm.source == self->state.current_server) { -// aout(self) << "*** Lost Connection to Server" << std::endl; -// self->state.current_server = nullptr; -// // try to connect to new server -// if (self->state.backup_servers_list.size() > 0) { -// aout(self) << "Trying to connect to backup server\n"; -// std::this_thread::sleep_for(std::chrono::seconds(3)); -// // TODO: Not obvious where the code goes from here. -// connecting(self, std::get<1>(self->state.backup_servers_list[0]), -// self->state.port); - -// } else { -// aout(self) << "No backup servers available" << std::endl; -// } -// } -// }); - -// return { -// [=] (connect_atom, const std::string& host, uint16_t port) { -// aout(self) << "Received a connect request while not running\n"; -// connecting(self, host, port); -// } -// }; -// } - - behavior summa_client(stateful_actor<summa_client_state>* self, Distributed_Settings distributed_settings) { - self->state.running = true; self->state.distributed_settings = distributed_settings; + char host[HOST_NAME_MAX]; + gethostname(host, HOST_NAME_MAX); + self->state.hostname = host; + + self->set_down_handler([=](const down_msg& dm){ + if(dm.source == self->state.current_server) { + aout(self) << "*** Lost Connection to Server" << std::endl; + self->state.current_server = nullptr; + // try to connect to new server + if (self->state.backup_servers_list.size() > 0) { + aout(self) << "Trying to connect to backup server\n"; + std::this_thread::sleep_for(std::chrono::seconds(3)); + // TODO: Not obvious where the code goes from here. + // connecting(self, std::get<1>(self->state.backup_servers_list[0]), + // self->state.port); + + } else { + aout(self) << "No backup servers available" << std::endl; + } + } + }); + for (auto host : distributed_settings.servers_list) { auto server = self->system().middleman().remote_actor(host, distributed_settings.port);