From a6952856c1761aa787cc56b1700442fbca9e790a Mon Sep 17 00:00:00 2001 From: Kyle <kyle.c.klenk@gmail.com> Date: Tue, 29 Nov 2022 00:06:27 +0000 Subject: [PATCH] Can get 1 HRU managed by the file access actor. An HRU will wait until it is told it can write again. --- .../file_access_actor/file_access_actor.hpp | 1 + .../file_access_actor/output_container.hpp | 6 ++ build/includes/global/message_atoms.hpp | 12 ++++ build/includes/hru_actor/hru_actor.hpp | 1 + .../cpp_code/file_access_actor.cpp | 43 +++++++++----- .../cpp_code/output_container.cpp | 7 +++ .../actors/hru_actor/cpp_code/hru_actor.cpp | 59 +++++++++++-------- 7 files changed, 89 insertions(+), 40 deletions(-) diff --git a/build/includes/file_access_actor/file_access_actor.hpp b/build/includes/file_access_actor/file_access_actor.hpp index 7e5b5e8..d88a772 100644 --- a/build/includes/file_access_actor/file_access_actor.hpp +++ b/build/includes/file_access_actor/file_access_actor.hpp @@ -30,6 +30,7 @@ struct file_access_state { int numFiles; int filesLoaded; int err; + int num_output_steps; Output_Container *output_container; diff --git a/build/includes/file_access_actor/output_container.hpp b/build/includes/file_access_actor/output_container.hpp index 9c40bbd..4521c1c 100644 --- a/build/includes/file_access_actor/output_container.hpp +++ b/build/includes/file_access_actor/output_container.hpp @@ -1,11 +1,15 @@ #pragma once +#include "caf/actor.hpp" #include "fortran_data_types.hpp" #include <vector> #include <iostream> struct hru_output_handles { + caf::actor hru_actor; + int index_hru; + int index_gru; // Statistic Structures void* handle_forc_stat = new_handle_var_dlength(); void* handle_prog_stat = new_handle_var_dlength(); @@ -57,5 +61,7 @@ class Output_Container { // returns the matrix of hru_outputs for writing std::vector<std::vector<hru_output_handles>> getAllHRUOutput(); + void clearAll(); + }; \ No newline at end of file diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index 9f50765..35c2c8f 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -152,6 +152,18 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) // Reciever: // Summary: CAF_ADD_ATOM(summa, no) + // Sender: + // Reciever: + // Summary: + CAF_ADD_ATOM(summa, new_forcing_file) + // Sender: + // Reciever: + // Summary: + CAF_ADD_ATOM(summa, num_steps_before_write) + // Sender: + // Reciever: + // Summary: + CAF_ADD_ATOM(summa, get_num_output_steps) // Struct Types CAF_ADD_TYPE_ID(summa, (Distributed_Settings)) diff --git a/build/includes/hru_actor/hru_actor.hpp b/build/includes/hru_actor/hru_actor.hpp index 9f4b381..d4fedec 100644 --- a/build/includes/hru_actor/hru_actor.hpp +++ b/build/includes/hru_actor/hru_actor.hpp @@ -28,6 +28,7 @@ struct hru_state { int forcingFileStep; int currentForcingFile = 1; + int num_steps_until_write; // statistics structures void *handle_forcStat = new_handle_var_dlength(); // model forcing data diff --git a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp index 7cf2f2d..ac05767 100644 --- a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp @@ -31,9 +31,8 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr self->state.handle_ncid = new_handle_var_i(); self->state.err = 0; - int max_steps = 120; - self->state.output_container = new Output_Container(num_gru, max_steps); - + self->state.num_output_steps = 60; + self->state.output_container = new Output_Container(num_gru, self->state.num_output_steps); initalizeFileAccessActor(self); @@ -74,7 +73,9 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr [=](access_forcing, int currentFile, caf::actor refToRespondTo) { if (currentFile <= self->state.numFiles) { if(self->state.forcing_file_list[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1 - self->send(refToRespondTo, run_hru_v, + // Send the HRU actor the new forcing file + // then tell it to get back to running + self->send(refToRespondTo, new_forcing_file_v, self->state.forcing_file_list[currentFile - 1].getNumSteps(), currentFile); @@ -97,7 +98,9 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr self->send(self, access_forcing_internal_v, currentFile + 1); } - self->send(refToRespondTo, run_hru_v, + // Send the HRU actor the new forcing file + // then tell it to get back to running + self->send(refToRespondTo, new_forcing_file_v, self->state.forcing_file_list[currentFile - 1].getNumSteps(), currentFile); } @@ -149,7 +152,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr }, - [=](write_output, int index_gru, int index_hru, + [=] (get_num_output_steps, caf::actor hru) { + self->send(hru, num_steps_before_write_v, self->state.num_output_steps); + }, + + [=](write_output, int index_gru, int index_hru, caf::actor hru_actor, // statistic structures std::vector<std::vector<double>> forc_stat, std::vector<std::vector<double>> prog_stat, std::vector<std::vector<double>> diag_stat, std::vector<std::vector<double>> flux_stat, std::vector<std::vector<double>> indx_stat, std::vector<std::vector<double>> bvar_stat, @@ -180,6 +187,10 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr hru_output_handles hru_output; int err = 0; + // hru information + hru_output.hru_actor = hru_actor; + hru_output.index_gru = index_gru; + hru_output.index_hru = index_hru; // statistic structures set_var_dlength(forc_stat, hru_output.handle_forc_stat); set_var_dlength(prog_stat, hru_output.handle_prog_stat); @@ -211,12 +222,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr self->state.output_container->insertOutput(index_gru, hru_output); - aout(self) << "Is container full: " << self->state.output_container->isFull(index_gru) << "\n"; - if (self->state.output_container->isFull(index_gru)) { aout(self) << "Writing output for GRU: " << index_gru << "\n"; std::vector<std::vector<hru_output_handles>> hru_output = self->state.output_container->getAllHRUOutput(); + for (int i = 0; i < hru_output[0].size(); i++) { @@ -247,13 +257,13 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr &err); } - } - - - - - + // reset the hrus counter + // for all hrus that just wrote, reset the counter + self->state.output_container->clearAll(); + self->send(hru_actor, num_steps_before_write_v, self->state.num_output_steps); + self->send(hru_actor, run_hru_v); + } self->state.file_access_timing.updateEndPoint("write_duration"); @@ -271,6 +281,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr }, + [=](done_hru, caf::actor hru_actor, int index_gru, int index_hru) { + aout(self) << "HRU: " << index_hru << " is done" << "\n"; + + }, + /** * Message from JobActor * OutputManager needs to be adjusted so the failed HRUs can run again diff --git a/build/source/actors/file_access_actor/cpp_code/output_container.cpp b/build/source/actors/file_access_actor/cpp_code/output_container.cpp index f447bfe..0e8b421 100644 --- a/build/source/actors/file_access_actor/cpp_code/output_container.cpp +++ b/build/source/actors/file_access_actor/cpp_code/output_container.cpp @@ -53,3 +53,10 @@ std::vector<std::vector<hru_output_handles>> Output_Container::getAllHRUOutput() return this->hru_output_handles_vector; } + +void Output_Container::clearAll() { + for (int i = 0; i < this->max_hrus; i++) { + this->hru_output_handles_vector[i].clear(); + } +} + diff --git a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp index 340e648..02a5389 100644 --- a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp +++ b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp @@ -53,6 +53,8 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, aout(self) << "NumSteps = " << self->state.num_steps << std::endl; + self->send(self->state.file_access_actor, get_num_output_steps_v, self); + // Get attributes self->send(self->state.file_access_actor, get_attributes_params_v, self->state.indxGRU, self); @@ -99,34 +101,25 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, Initialize_HRU(self); self->send(self, start_hru_v); + }, + [=](num_steps_before_write, int num_steps) { + aout(self) << "NumSteps = " << num_steps << std::endl; + self->state.num_steps_until_write = num_steps; }, - - // The file_access_actor sends the HRU a message with - // the number of steps it can run based on the forcing data - [=](run_hru, int stepsInCurrentFFile, int iFile) { + // Run HRU for a number of timesteps + [=](run_hru) { self->state.hru_timing.updateStartPoint("total_duration"); - int err = 0; - self->state.stepsInCurrentFFile = stepsInCurrentFFile; - self->state.iFile = iFile; - self->state.forcingStep = 1; - - setTimeZoneOffset(&self->state.iFile, - &self->state.tmZoneOffsetFracDay, &err); - // We need to get the first timestep that we are on - if (self->state.timestep == 1 ) { + if (self->state.timestep == 1) { getFirstTimestep(&self->state.iFile, &self->state.forcingStep, &err); if (self->state.forcingStep == -1) { aout(self) << "HRU - Wrong starting forcing file\n";} } - - for (self->state.forcingStep; - self->state.forcingStep <= self->state.stepsInCurrentFFile; - self->state.forcingStep++) { - + while(self->state.num_steps_until_write > 0) { + self->state.num_steps_until_write--; err = Run_HRU(self); // Simulate a Timestep getAndSendOutput(self); @@ -134,25 +127,36 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, // Update counters for the fortran side updateCounters(self->state.handle_timeStruct, self->state.handle_statCounter, self->state.handle_outputTimeStep, self->state.handle_resetStats, self->state.handle_oldTime, self->state.handle_finalizeStats); - // model timestep - self->state.timestep += 1; - + self->state.timestep++; + // forcing step + self->state.forcingStep++; // HRU has finished if (self->state.timestep > self->state.num_steps) { self->send(self, done_hru_v); break; } - } - // get more forcing data from the file_access_actor - if (self->state.timestep < self->state.num_steps) { - self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile + 1, self); + if (self->state.forcingStep > self->state.stepsInCurrentFFile) { + self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile+1, self); + break; + } } - + self->state.hru_timing.updateEndPoint("total_duration"); }, + + [=](new_forcing_file, int num_forcing_steps_in_iFile, int iFile) { + int err; + self->state.hru_timing.updateStartPoint("total_duration"); + self->state.iFile = iFile; + self->state.stepsInCurrentFFile = num_forcing_steps_in_iFile; + setTimeZoneOffset(&self->state.iFile, &self->state.tmZoneOffsetFracDay, &err); + self->send(self, run_hru_v); + }, + + [=](done_hru) { // Tell our parent we are done, convert all timings to seconds @@ -163,6 +167,8 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, aout(self) << "Run Physics Duration = " << self->state.hru_timing.getDuration("run_physics_duration").value_or(-1.0) << " Seconds\n"; aout(self) << "Write Output Duration = " << self->state.hru_timing.getDuration("write_output_duration").value_or(-1.0) << " Seconds\n\n"; + self->send(self->state.file_access_actor, done_hru_v, self->state.indxGRU, self->state.indxHRU); + self->send(self->state.parent, done_hru_v, self->state.indxGRU, @@ -375,6 +381,7 @@ void getAndSendOutput(stateful_actor<hru_state>* self) { self->send(self->state.file_access_actor, write_output_v, self->state.indxGRU, self->state.indxHRU, + self, // statistic structures forc_stat_array, prog_stat_array, -- GitLab