diff --git a/build/includes/file_access_actor/file_access_actor.hpp b/build/includes/file_access_actor/file_access_actor.hpp index 7e5b5e8a6afb57c7784afa1c0f83ba4fc45a4618..d88a7725bf81036e288e8ab60da7993615374a14 100644 --- a/build/includes/file_access_actor/file_access_actor.hpp +++ b/build/includes/file_access_actor/file_access_actor.hpp @@ -30,6 +30,7 @@ struct file_access_state { int numFiles; int filesLoaded; int err; + int num_output_steps; Output_Container *output_container; diff --git a/build/includes/file_access_actor/output_container.hpp b/build/includes/file_access_actor/output_container.hpp index 9c40bbde7b7eed03799d5db3bfbc13767180f333..4521c1cd79ed158cbdbf8e47c6827cf538a4299c 100644 --- a/build/includes/file_access_actor/output_container.hpp +++ b/build/includes/file_access_actor/output_container.hpp @@ -1,11 +1,15 @@ #pragma once +#include "caf/actor.hpp" #include "fortran_data_types.hpp" #include <vector> #include <iostream> struct hru_output_handles { + caf::actor hru_actor; + int index_hru; + int index_gru; // Statistic Structures void* handle_forc_stat = new_handle_var_dlength(); void* handle_prog_stat = new_handle_var_dlength(); @@ -57,5 +61,7 @@ class Output_Container { // returns the matrix of hru_outputs for writing std::vector<std::vector<hru_output_handles>> getAllHRUOutput(); + void clearAll(); + }; \ No newline at end of file diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index 9f50765e7dd0b2ebb47dba9f4e06199eecf16a13..35c2c8f1e5543f6a3cafab102b1dd8adc81fd837 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -152,6 +152,18 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) // Reciever: // Summary: CAF_ADD_ATOM(summa, no) + // Sender: + // Reciever: + // Summary: + CAF_ADD_ATOM(summa, new_forcing_file) + // Sender: + // Reciever: + // Summary: + CAF_ADD_ATOM(summa, num_steps_before_write) + // Sender: + // Reciever: + // Summary: + CAF_ADD_ATOM(summa, get_num_output_steps) // Struct Types CAF_ADD_TYPE_ID(summa, (Distributed_Settings)) diff --git a/build/includes/hru_actor/hru_actor.hpp b/build/includes/hru_actor/hru_actor.hpp index 9f4b3813afe6e673453a921beb509086dc63a8a0..d4fedec6f95886455f4b3825d15e3a98c29594bb 100644 --- a/build/includes/hru_actor/hru_actor.hpp +++ b/build/includes/hru_actor/hru_actor.hpp @@ -28,6 +28,7 @@ struct hru_state { int forcingFileStep; int currentForcingFile = 1; + int num_steps_until_write; // statistics structures void *handle_forcStat = new_handle_var_dlength(); // model forcing data diff --git a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp index 7cf2f2d3c063e135c9c4d1fafd13fb0720f3030c..ac0576716296abb4ed1415945065f5b6672ef7a5 100644 --- a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp @@ -31,9 +31,8 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr self->state.handle_ncid = new_handle_var_i(); self->state.err = 0; - int max_steps = 120; - self->state.output_container = new Output_Container(num_gru, max_steps); - + self->state.num_output_steps = 60; + self->state.output_container = new Output_Container(num_gru, self->state.num_output_steps); initalizeFileAccessActor(self); @@ -74,7 +73,9 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr [=](access_forcing, int currentFile, caf::actor refToRespondTo) { if (currentFile <= self->state.numFiles) { if(self->state.forcing_file_list[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1 - self->send(refToRespondTo, run_hru_v, + // Send the HRU actor the new forcing file + // then tell it to get back to running + self->send(refToRespondTo, new_forcing_file_v, self->state.forcing_file_list[currentFile - 1].getNumSteps(), currentFile); @@ -97,7 +98,9 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr self->send(self, access_forcing_internal_v, currentFile + 1); } - self->send(refToRespondTo, run_hru_v, + // Send the HRU actor the new forcing file + // then tell it to get back to running + self->send(refToRespondTo, new_forcing_file_v, self->state.forcing_file_list[currentFile - 1].getNumSteps(), currentFile); } @@ -149,7 +152,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr }, - [=](write_output, int index_gru, int index_hru, + [=] (get_num_output_steps, caf::actor hru) { + self->send(hru, num_steps_before_write_v, self->state.num_output_steps); + }, + + [=](write_output, int index_gru, int index_hru, caf::actor hru_actor, // statistic structures std::vector<std::vector<double>> forc_stat, std::vector<std::vector<double>> prog_stat, std::vector<std::vector<double>> diag_stat, std::vector<std::vector<double>> flux_stat, std::vector<std::vector<double>> indx_stat, std::vector<std::vector<double>> bvar_stat, @@ -180,6 +187,10 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr hru_output_handles hru_output; int err = 0; + // hru information + hru_output.hru_actor = hru_actor; + hru_output.index_gru = index_gru; + hru_output.index_hru = index_hru; // statistic structures set_var_dlength(forc_stat, hru_output.handle_forc_stat); set_var_dlength(prog_stat, hru_output.handle_prog_stat); @@ -211,12 +222,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr self->state.output_container->insertOutput(index_gru, hru_output); - aout(self) << "Is container full: " << self->state.output_container->isFull(index_gru) << "\n"; - if (self->state.output_container->isFull(index_gru)) { aout(self) << "Writing output for GRU: " << index_gru << "\n"; std::vector<std::vector<hru_output_handles>> hru_output = self->state.output_container->getAllHRUOutput(); + for (int i = 0; i < hru_output[0].size(); i++) { @@ -247,13 +257,13 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr &err); } - } - - - - - + // reset the hrus counter + // for all hrus that just wrote, reset the counter + self->state.output_container->clearAll(); + self->send(hru_actor, num_steps_before_write_v, self->state.num_output_steps); + self->send(hru_actor, run_hru_v); + } self->state.file_access_timing.updateEndPoint("write_duration"); @@ -271,6 +281,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr }, + [=](done_hru, caf::actor hru_actor, int index_gru, int index_hru) { + aout(self) << "HRU: " << index_hru << " is done" << "\n"; + + }, + /** * Message from JobActor * OutputManager needs to be adjusted so the failed HRUs can run again diff --git a/build/source/actors/file_access_actor/cpp_code/output_container.cpp b/build/source/actors/file_access_actor/cpp_code/output_container.cpp index f447bfefce6c32198af596059df5e269b73c6e4e..0e8b421c2e0c3b5517cfc2551e459f3dd7ce85c7 100644 --- a/build/source/actors/file_access_actor/cpp_code/output_container.cpp +++ b/build/source/actors/file_access_actor/cpp_code/output_container.cpp @@ -53,3 +53,10 @@ std::vector<std::vector<hru_output_handles>> Output_Container::getAllHRUOutput() return this->hru_output_handles_vector; } + +void Output_Container::clearAll() { + for (int i = 0; i < this->max_hrus; i++) { + this->hru_output_handles_vector[i].clear(); + } +} + diff --git a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp index 340e64828f879882b7ee6e70069b28c931656e51..02a5389e26f197e6dabb7d9201abf37db95426c0 100644 --- a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp +++ b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp @@ -53,6 +53,8 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, aout(self) << "NumSteps = " << self->state.num_steps << std::endl; + self->send(self->state.file_access_actor, get_num_output_steps_v, self); + // Get attributes self->send(self->state.file_access_actor, get_attributes_params_v, self->state.indxGRU, self); @@ -99,34 +101,25 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, Initialize_HRU(self); self->send(self, start_hru_v); + }, + [=](num_steps_before_write, int num_steps) { + aout(self) << "NumSteps = " << num_steps << std::endl; + self->state.num_steps_until_write = num_steps; }, - - // The file_access_actor sends the HRU a message with - // the number of steps it can run based on the forcing data - [=](run_hru, int stepsInCurrentFFile, int iFile) { + // Run HRU for a number of timesteps + [=](run_hru) { self->state.hru_timing.updateStartPoint("total_duration"); - int err = 0; - self->state.stepsInCurrentFFile = stepsInCurrentFFile; - self->state.iFile = iFile; - self->state.forcingStep = 1; - - setTimeZoneOffset(&self->state.iFile, - &self->state.tmZoneOffsetFracDay, &err); - // We need to get the first timestep that we are on - if (self->state.timestep == 1 ) { + if (self->state.timestep == 1) { getFirstTimestep(&self->state.iFile, &self->state.forcingStep, &err); if (self->state.forcingStep == -1) { aout(self) << "HRU - Wrong starting forcing file\n";} } - - for (self->state.forcingStep; - self->state.forcingStep <= self->state.stepsInCurrentFFile; - self->state.forcingStep++) { - + while(self->state.num_steps_until_write > 0) { + self->state.num_steps_until_write--; err = Run_HRU(self); // Simulate a Timestep getAndSendOutput(self); @@ -134,25 +127,36 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, // Update counters for the fortran side updateCounters(self->state.handle_timeStruct, self->state.handle_statCounter, self->state.handle_outputTimeStep, self->state.handle_resetStats, self->state.handle_oldTime, self->state.handle_finalizeStats); - // model timestep - self->state.timestep += 1; - + self->state.timestep++; + // forcing step + self->state.forcingStep++; // HRU has finished if (self->state.timestep > self->state.num_steps) { self->send(self, done_hru_v); break; } - } - // get more forcing data from the file_access_actor - if (self->state.timestep < self->state.num_steps) { - self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile + 1, self); + if (self->state.forcingStep > self->state.stepsInCurrentFFile) { + self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile+1, self); + break; + } } - + self->state.hru_timing.updateEndPoint("total_duration"); }, + + [=](new_forcing_file, int num_forcing_steps_in_iFile, int iFile) { + int err; + self->state.hru_timing.updateStartPoint("total_duration"); + self->state.iFile = iFile; + self->state.stepsInCurrentFFile = num_forcing_steps_in_iFile; + setTimeZoneOffset(&self->state.iFile, &self->state.tmZoneOffsetFracDay, &err); + self->send(self, run_hru_v); + }, + + [=](done_hru) { // Tell our parent we are done, convert all timings to seconds @@ -163,6 +167,8 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, aout(self) << "Run Physics Duration = " << self->state.hru_timing.getDuration("run_physics_duration").value_or(-1.0) << " Seconds\n"; aout(self) << "Write Output Duration = " << self->state.hru_timing.getDuration("write_output_duration").value_or(-1.0) << " Seconds\n\n"; + self->send(self->state.file_access_actor, done_hru_v, self->state.indxGRU, self->state.indxHRU); + self->send(self->state.parent, done_hru_v, self->state.indxGRU, @@ -375,6 +381,7 @@ void getAndSendOutput(stateful_actor<hru_state>* self) { self->send(self->state.file_access_actor, write_output_v, self->state.indxGRU, self->state.indxHRU, + self, // statistic structures forc_stat_array, prog_stat_array,