diff --git a/build/includes/file_access_actor/output_container.hpp b/build/includes/file_access_actor/output_container.hpp index c02b58f353dc8b4410d7367716fa79dc2c109f20..468eedb22f36ee3dce4bc83a4dd0204d8297e3bf 100644 --- a/build/includes/file_access_actor/output_container.hpp +++ b/build/includes/file_access_actor/output_container.hpp @@ -78,7 +78,8 @@ struct hru_output_info { struct output_partition { int start_gru; - int num_gru; + int num_gru; + int num_active_gru; int num_timesteps; int simulation_timesteps_remaining; int grus_ready_to_write; @@ -109,38 +110,9 @@ void updateNumTimeForPartition(std::shared_ptr<output_partition> &output_partiti void resetReadyToWrite(std::shared_ptr<output_partition> &output_partition); - - - - - - - - - -// This class holds the output for the HRUs as a buffer so -// we can write more data at once -class Output_Container { - private: - // Matrix charactieristics - int max_steps; // maximum number of steps we can hold for an HRU before writing - int max_hrus; // maximum number of hrus we can hold for the structure - - std::vector<std::vector<hru_output_handles>> hru_output_handles_vector; // Pointers to HRU output data - - public: - Output_Container(int max_hrus, int max_steps); - ~Output_Container(); - - // insertes output from an HRU into hru_output_handles - void insertOutput(int hru_index, hru_output_handles hru_output); - - bool isFull(int hru_index); - - // returns the matrix of hru_outputs for writing - std::vector<std::vector<hru_output_handles>> getAllHRUOutput(); - - void clearAll(); - - -}; \ No newline at end of file +/* + * Reduce the number of GRUs the partition is waiting on to write to file by 1 + * Check if the partition is ready to write to file and return the partition index if it is +*/ +std::optional<int> updatePartitionWithFailedHRU(std::vector<std::shared_ptr<output_partition>>& output_partitions, + int local_gru_index); diff --git a/build/includes/job_actor/GRUinfo.hpp b/build/includes/job_actor/GRUinfo.hpp index 04c73c7a3ea2f768781868b061e39cc382aa2a0a..99eab9f9a6d3e67437a5936b4186ab8420d137e9 100644 --- a/build/includes/job_actor/GRUinfo.hpp +++ b/build/includes/job_actor/GRUinfo.hpp @@ -47,6 +47,8 @@ class GRU { // Getters int getGlobalGRUIndex(); + int getLocalGRUIndex(); + caf::actor getGRUActor(); double getRunTime(); double getInitDuration(); @@ -66,6 +68,9 @@ class GRU { void setWriteOutputDuration(double write_output_duration); void setSuccess(); + void setFailed(); + + void decrementAttemptsLeft(); }; diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index bc717402e7bad630503bb2f0b25b33b9c02802c0..3793b6cb878820225d398b532d10d8a57f45d1f2 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -43,7 +43,6 @@ struct job_state { // Timing Variables TimingInfo job_timing; - std::string hostname; // Output File Names for Timings @@ -64,8 +63,9 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings, actor parent); -void initCsvOutputFile(stateful_actor<job_state>* self); - +/* + * Start all of the GRU actors and set up their container class +*/ void initGRUs(stateful_actor<job_state>* self); /** @@ -73,9 +73,6 @@ void initGRUs(stateful_actor<job_state>* self); */ std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, std::vector<GRU*> &gru_list); -// Initalize the GRU objects and their actors -void runGRUs(stateful_actor<job_state>* self); - -void restartFailures(stateful_actor<job_state>* self); +void handleGRUError(stateful_actor<job_state>* self, const error& err, caf::actor src); } // end namespace \ No newline at end of file diff --git a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp index c94ba58a97d43186cf0e1cb028cc1105b018d098..4de5dd3bcba84fdf86c67f874059167c1dad63f1 100644 --- a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp @@ -180,22 +180,30 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr self->state.file_access_timing.updateEndPoint("write_duration"); }, - [=](run_failure, int indxGRU) { - int listIndex; - - // update the list in Fortran - updateFailed(&indxGRU); + [=](run_failure, int local_gru_index) { + + std::optional<int> partition_index = updatePartitionWithFailedHRU(self->state.output_partitions, local_gru_index); + + if (partition_index.has_value() && self->state.output_partitions[partition_index.value()]->num_gru < 0) { + self->state.file_access_timing.updateStartPoint("write_duration"); + + // We have a partition that is ready to write + int max_gru = self->state.output_partitions[partition_index.value()]->start_gru + self->state.output_partitions[partition_index.value()]->num_gru -1; + writeOutput(self->state.handle_ncid, &self->state.output_partitions[partition_index.value()]->num_timesteps, + &self->state.output_partitions[partition_index.value()]->start_gru, &max_gru, &self->state.err); + + updateSimulationTimestepsRemaining(self->state.output_partitions[partition_index.value()]); + updateNumTimeForPartition(self->state.output_partitions[partition_index.value()]); + resetReadyToWrite(self->state.output_partitions[partition_index.value()]); + for (auto hru_output_info : self->state.output_partitions[partition_index.value()]->hru_info_and_data) { + self->send(hru_output_info->hru_actor, num_steps_before_write_v, self->state.output_partitions[partition_index.value()]->num_timesteps); + self->send(hru_output_info->hru_actor, run_hru_v); + } + self->state.file_access_timing.updateEndPoint("write_duration"); + } }, - [=](done_hru, caf::actor hru_actor, int index_gru, int index_hru) { - aout(self) << "HRU: " << index_hru << " is done" << "\n"; - - }, - - [=](restart_failures) { - resetFailedArray(); - }, [=](deallocate_structures, std::vector<serializable_netcdf_gru_actor_info> &netcdf_gru_info) { int num_gru = netcdf_gru_info.size(); diff --git a/build/source/actors/file_access_actor/cpp_code/output_container.cpp b/build/source/actors/file_access_actor/cpp_code/output_container.cpp index 04ba29f02c8f0a08afd8f128dc35bf9dd1f8b4f0..b594243c2d7a2c300428b54217fe3850461d62c5 100644 --- a/build/source/actors/file_access_actor/cpp_code/output_container.cpp +++ b/build/source/actors/file_access_actor/cpp_code/output_container.cpp @@ -11,6 +11,7 @@ void initArrayOfOuputPartitions(std::vector<std::shared_ptr<output_partition>>& output_partitions.push_back(std::make_shared<output_partition>()); output_partitions[i]->start_gru = start_gru_counter; output_partitions[i]->num_gru = num_gru_per_partition; + output_partitions[i]->num_active_gru = num_gru_per_partition; output_partitions[i]->num_timesteps = num_timesteps; output_partitions[i]->simulation_timesteps_remaining = simulation_timesteps_remaining; output_partitions[i]->grus_ready_to_write = 0; @@ -24,6 +25,7 @@ void initArrayOfOuputPartitions(std::vector<std::shared_ptr<output_partition>>& output_partitions.push_back(std::make_shared<output_partition>()); output_partitions[num_partitions - 1]->start_gru = start_gru_counter; output_partitions[num_partitions - 1]->num_gru = num_gru_run_domain - start_gru_counter + 1; + output_partitions[num_partitions - 1]->num_active_gru = num_gru_run_domain - start_gru_counter + 1; output_partitions[num_partitions - 1]->num_timesteps = num_timesteps; output_partitions[num_partitions - 1]->simulation_timesteps_remaining = simulation_timesteps_remaining; output_partitions[num_partitions - 1]->grus_ready_to_write = 0; @@ -45,7 +47,7 @@ std::optional<int> addReadyToWriteHRU(std::vector<std::shared_ptr<output_partiti output_partitions[partition_index]->hru_info_and_data[gru_index_in_partition]->ready_to_write = true; output_partitions[partition_index]->grus_ready_to_write += 1; // If all grus are ready to write then return the partition index - if (output_partitions[partition_index]->grus_ready_to_write == output_partitions[partition_index]->num_gru) { + if (output_partitions[partition_index]->grus_ready_to_write == output_partitions[partition_index]->num_active_gru) { return partition_index; } else { @@ -85,67 +87,19 @@ void updateNumTimeForPartition(std::shared_ptr<output_partition> &output_partiti } } +std::optional<int> updatePartitionWithFailedHRU(std::vector<std::shared_ptr<output_partition>>& output_partitions, + int local_gru_index) { + int partition_index = findPatritionIndex(output_partitions[0]->num_gru, local_gru_index, output_partitions.size()); + output_partitions[partition_index]->num_active_gru -= 1; - - - - -Output_Container::Output_Container(int max_hrus, int max_steps) { - this->max_hrus = max_hrus; - this->max_steps = max_steps; - for (int i = 0; i < max_hrus; i++) { - std::vector<hru_output_handles> hru_output_handles; - this->hru_output_handles_vector.push_back(hru_output_handles); - } - -} - -Output_Container::~Output_Container(){}; - -void Output_Container::insertOutput(int hru_index, hru_output_handles hru_output) { - // adjust hru_index to be 0 based - hru_index = hru_index - 1; - try { - if (hru_index < 0 || hru_index >= this->max_hrus) - throw "HRU index out of bounds"; - - if (this->hru_output_handles_vector[hru_index].size() < this->max_steps) - this->hru_output_handles_vector[hru_index].push_back(hru_output); - else - throw "HRU output buffer full"; - - } catch (const char* msg) { - std::cerr << msg << std::endl; + // Check if the partition is now ready to write + if (output_partitions[partition_index]->grus_ready_to_write == output_partitions[partition_index]->num_active_gru) { + return partition_index; } -} - - -bool Output_Container::isFull(int hru_index) { - // adjust hru_index to be 0 based - hru_index = hru_index - 1; - try { - if (hru_index < 0 || hru_index >= this->max_hrus) - throw "HRU index out of bounds"; - - if (this->hru_output_handles_vector[hru_index].size() == this->max_steps) - return true; - else - return false; - - } catch (const char* msg) { - std::cerr << msg << std::endl; + else { + return {}; } - return false; -} - -std::vector<std::vector<hru_output_handles>> Output_Container::getAllHRUOutput() { - return this->hru_output_handles_vector; -} -void Output_Container::clearAll() { - for (int i = 0; i < this->max_hrus; i++) { - this->hru_output_handles_vector[i].clear(); - } } diff --git a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp index 1065163e7f695f13a3b14818b0e42e7ad4bf5bbe..07b6b0bb53af34d0482b4f56e22be14a5235f3d3 100644 --- a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp +++ b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp @@ -151,7 +151,7 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, if (self->state.timestep == 543 && self->state.indxGRU == 2) { - self->send(self->state.parent, hru_error::run_physics_unhandleable); + self->send(self->state.parent, hru_error::run_physics_unhandleable, self); self->quit(); return; } diff --git a/build/source/actors/job_actor/GRUinfo.cpp b/build/source/actors/job_actor/GRUinfo.cpp index 1b4631d884fbfb613c438769f36334fcdbb509e2..9360e0de5e1619db3f837439874bb4907a3fb60b 100644 --- a/build/source/actors/job_actor/GRUinfo.cpp +++ b/build/source/actors/job_actor/GRUinfo.cpp @@ -20,6 +20,14 @@ int GRU::getGlobalGRUIndex() { return this->global_gru_index; } +int GRU::getLocalGRUIndex() { + return this->local_gru_index; +} + +caf::actor GRU::getGRUActor() { + return this->gru_actor; +} + double GRU::getRunTime() { return this->run_time; } @@ -69,6 +77,13 @@ void GRU::setSuccess() { this->state = gru_state::succeeded; } +void GRU::setFailed() { + this->state = gru_state::failed; +} + +void GRU::decrementAttemptsLeft() { + this->attempts_left--; +} diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index 31c82fd6403d7117cf7acc0d16e37cd33c77d6e5..1e991f1906d3a3eb1ed6595cfb1c56bedcdb91c2 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -17,29 +17,12 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings, caf::actor parent) { - // Set the error handlers self->set_down_handler([=](const down_msg& dm) { aout(self) << "\n\n ********** DOWN HANDLER ********** \n"; aout(self) << "Lost Connection With A Connected Actor\n"; aout(self) << "Reason: " << to_string(dm.reason) << "\n"; }); - self->set_error_handler([=](const error& err) { - aout(self) << "\n\n ********** ERROR HANDLER ********** \n"; - - switch(err.category()) { - case type_id_v<hru_error>: - aout(self) << "HRU Error: " << to_string(err) << "\n"; - break; - case type_id_v<file_access_error>: - aout(self) << "File Access Error: " << to_string(err) << "\n"; - break; - default: - aout(self) << "Unknown Error: " << to_string(err) << "\n"; - break; - } - }); - self->set_exit_handler([=](const exit_msg& em) { aout(self) << "\n\n ********** EXIT HANDLER ********** \n"; aout(self) << "Exit Reason: " << to_string(em.reason) << "\n"; @@ -140,7 +123,7 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, if (self->state.gru_container.num_gru_done >= self->state.gru_container.num_gru_in_run_domain) { // Check for failures - if(self->state.gru_container.num_gru_failed == 0) { + if(self->state.gru_container.num_gru_failed == 0 || self->state.max_run_attempts == 1) { //TODO: RENAME DEALLOCATE_STURCTURES this is more of a finalize std::vector<serializable_netcdf_gru_actor_info> netcdf_gru_info = getGruNetcdfInfo( self->state.max_run_attempts, @@ -149,84 +132,53 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, } else { // TODO: Handle failures + } } + }, - } - // aout(self) << "\nDone - GRU:" << self->state.gru_list[indx_gru - 1]->getRefGRU() - // << " - IndexInJob = " << indx_gru << "\n"; - - // self->state.gru_list[indx_gru - 1]->doneRun(total_duration, init_duration, forcing_duration, - // run_physics_duration, write_output_duration); - - // if (self->state.job_actor_settings.output_csv) { - // self->state.gru_list[indx_gru - 1]->writeSuccess(self->state.success_output_file, self->state.hostname); - // } - - // self->state.num_gru_done++; - - // // Check if we are done - // if (self->state.num_gru_done >= self->state.num_gru) { - // self->state.num_gru_done = 0; // just in case there were failures - - // if (self->state.num_gru_failed == 0) { - // self->send(self->state.file_access_actor, deallocate_structures_v); - // } else { - // restartFailures(self); - // } - // } - // }, - - // [=](run_failure, caf::actor actorRef, int indx_gru, int err) { - - // aout(self) << "GRU:" << self->state.gru_list[indx_gru - 1]->getRefGRU() - // << "indx_gru = " << indx_gru << "Failed \n" - // << "Will have to wait until all GRUs are done before it can be re-tried\n"; - - // self->state.num_gru_failed++; - // self->state.gru_list[indx_gru - 1]->updateFailed(); + [=](const error& err, caf::actor src) { + aout(self) << "\n\n ********** ERROR HANDLER \n"; + switch(err.category()) { + case type_id_v<hru_error>: + aout(self) << "HRU Error: " << to_string(err) << "\n"; + handleGRUError(self, err, src); + break; + case type_id_v<file_access_error>: + aout(self) << "File Access Error: " << to_string(err) << "\n"; + break; + default: + aout(self) << "Unknown Error: " << to_string(err) << "\n"; + break; + } + }, - // // Let the file_access_actor know this actor failed - // // self->send(self->state.file_access_actor, run_failure_v, indx_gru); - // // check if we are the last hru to complete - // if (self->state.num_gru_done + self->state.num_gru_failed >= self->state.num_gru) { - // // restartFailures(self); - // self->quit(); - // } - // }, - - // [=](done_init_gru) { - // aout(self) << "GRU is Initialized\n"; - // self->quit(); - // return; - // }, - - // [=](file_access_actor_done, double read_duration, double write_duration) { - // int err = 0; - // // Delete GRUs - // for (auto GRU : self->state.gru_list) { - // delete GRU; - // } - // self->state.gru_list.clear(); + [=](file_access_actor_done, double read_duration, double write_duration) { + int err = 0; + // Delete GRUs + for (auto GRU : self->state.gru_container.gru_list) { + delete GRU; + } + self->state.gru_container.gru_list.clear(); - // self->state.job_timing.updateEndPoint("total_duration"); + self->state.job_timing.updateEndPoint("total_duration"); - // aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n"; - // aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n"; - // aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n"; - // aout(self) << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n\n"; + aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n"; + aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n"; + aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n"; + aout(self) << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n\n"; - // deallocateJobActor(&err); - // // Tell Parent we are done - // self->send(self->state.parent, - // done_job_v, - // self->state.num_gru_failed, - // self->state.job_timing.getDuration("total_duration").value_or(-1.0), - // read_duration, write_duration); - // self->quit(); - // }, + deallocateJobActor(&err); + // Tell Parent we are done + self->send(self->state.parent, + done_job_v, + self->state.num_gru_failed, + self->state.job_timing.getDuration("total_duration").value_or(-1.0), + read_duration, write_duration); + self->quit(); + }, // [=](file_access_actor_err, std::string function) { // aout(self) << "Failure in File Access Actor in function: " << function << "\n"; @@ -244,28 +196,6 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, }; } -// void initCsvOutputFile(stateful_actor<job_state>* self) { -// std::string success = "/Success"; // allows us to build the string -// if (self->state.job_actor_settings.output_csv) { -// std::ofstream file; -// self->state.success_output_file = self->state.job_actor_settings.csv_path += success += -// std::to_string(self->state.start_gru) += ".csv"; -// aout(self) << "Success Output File: " << self->state.success_output_file << "\n"; -// file.open(self->state.success_output_file, std::ios_base::out); -// file << -// "hostname," << -// "GRU," << -// "totalDuration," << -// "initDuration," << -// "forcingDuration," << -// "runPhysicsDuration," << -// "writeOutputDuration," << -// "dt_init," << -// "numAttemtps\n"; -// file.close(); -// } -// } - void initGRUs(stateful_actor<job_state>* self) { for(int i = 0; i < self->state.gru_container.num_gru_in_run_domain; i++) { // Spawn the GRU Actor @@ -286,24 +216,6 @@ void initGRUs(stateful_actor<job_state>* self) { self->state.dt_init_start_factor, self->state.max_run_attempts)); } - - - - - - // for(int i = 0; i < self->state.num_gru; i++) { - // int start_gru = self->state.gru_list.size() + self->state.start_gru; - // int index_gru = self->state.gru_list.size() + 1; // Fortran reference starts at 1 - // auto gru = self->spawn(hru_actor, - // start_gru, - // index_gru, - // self->state.hru_actor_settings, - // self->state.file_access_actor, - // self); - // self->state.gru_list.push_back(new GRUinfo(start_gru, index_gru, gru, - // self->state.dt_init_start_factor, self->state.max_run_attempts)); - // } - } std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, std::vector<GRU*> &gru_list) { @@ -326,45 +238,40 @@ std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_att return gru_netcdf_info; } +void handleGRUError(stateful_actor<job_state>* self, const error& err, caf::actor src) { + aout(self) << "Handling HRU Error: " << to_string(err) << "\n"; + + // Find the GRU that failed + for(auto GRU : self->state.gru_container.gru_list) { + if (GRU->getGRUActor() == src) { + GRU->setFailed(); + GRU->decrementAttemptsLeft(); + self->state.gru_container.num_gru_done++; + self->state.num_gru_failed++; + self->send(self->state.file_access_actor, run_failure_v, GRU->getLocalGRUIndex()); + + // Check if we have finished all active GRUs + if (self->state.gru_container.num_gru_done == self->state.gru_container.num_gru_in_run_domain) { + // Check for failures + if(self->state.gru_container.num_gru_failed == 0 || self->state.max_run_attempts == 1) { + //TODO: RENAME DEALLOCATE_STURCTURES this is more of a finalize + std::vector<serializable_netcdf_gru_actor_info> netcdf_gru_info = getGruNetcdfInfo( + self->state.max_run_attempts, + self->state.gru_container.gru_list); + self->send(self->state.file_access_actor, deallocate_structures_v, netcdf_gru_info); + + } else { + // TODO: Handle failures + + } + } + break; + } + } -// void runGRUs(stateful_actor<job_state>* self) { -// for(auto gru : self->state.gru_list) { -// if(!gru->isCompleted() && !gru->isFailed()) { -// self->send(gru->getActor(), start_hru_v); -// } -// } -// } - -// void restartFailures(stateful_actor<job_state>* self) { -// // Need to let the file_access_actor know so it can set up the new output Manager -// self->send(self->state.file_access_actor, restart_failures_v); - -// self->state.num_gru = self->state.num_gru_failed; -// self->state.num_gru_failed = 0; -// self->state.num_gru_done = 0; - - -// for(auto gru : self->state.gru_list) { -// if (gru->isFailed() && !gru->isMaxAttemptsReached()) { -// gru->updateFailed(); -// gru->updateDt_init(); -// auto newGRU = self->spawn(hru_actor, gru->getRefGRU(), gru->getIndxGRU(), -// self->state.hru_actor_settings, self->state.file_access_actor, -// self); -// gru->updateGRU(newGRU); -// gru->updateCurrentAttempt(); -// self->send(gru->getActor(), dt_init_factor_v, gru->getDt_init()); -// } else { -// // Max attempts reached, so we are done with this GRU -// self->state.gru_list[gru->getIndxGRU() - 1]->doneRun(-1, -1, -1, -1, -1); -// if (self->state.job_actor_settings.output_csv) { -// self->state.gru_list[gru->getIndxGRU() - 1]->writeSuccess(self->state.success_output_file, self->state.hostname); -// } -// self->state.num_gru_done++; -// } -// } -// } + +} } // End Namespace caf