From 409370d4d26041cf69809dd8d4890b5b736c1217 Mon Sep 17 00:00:00 2001 From: KyleKlenk <kyle.c.klenk@gmail.com> Date: Mon, 4 Apr 2022 13:51:14 -0600 Subject: [PATCH] Fixed writing of extra timesteps on copernicus --- build/source/actors/FileAccessActor.h | 156 +++++++++++------- build/source/actors/HRU.h | 6 +- build/source/actors/HRUActor.h | 147 +++++++++++------ build/source/actors/messageAtoms.h | 1 + .../driver/summaActors_wOutputStruc.f90 | 8 +- .../interface/hru_actor/cppwrap_hru.f90 | 3 - .../hru_actor/hru_subroutine_wrappers.h | 2 +- build/source/interface/subroutine_wrappers.h | 32 ---- config/Summa_Actors_Settings.json | 6 +- 9 files changed, 209 insertions(+), 152 deletions(-) delete mode 100644 build/source/interface/subroutine_wrappers.h diff --git a/build/source/actors/FileAccessActor.h b/build/source/actors/FileAccessActor.h index 00f2294..57701b6 100644 --- a/build/source/actors/FileAccessActor.h +++ b/build/source/actors/FileAccessActor.h @@ -5,6 +5,9 @@ using namespace caf; void initalizeFileAccessActor(stateful_actor<file_access_state>* self); +int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, int numStepsToWrite); +int readForcing(stateful_actor<file_access_state>* self, int currentFile); + behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU, int outputStrucSize, actor parent) { @@ -22,61 +25,35 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU }, [=](access_forcing, int currentFile, caf::actor refToRespondTo) { - // aout(self) << "Received Current FIle = " << currentFile << std::endl; - if (currentFile <= self->state.numFiles) { - if(self->state.forcFileList[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1 - // aout(self) << "ForcingFile Already Loaded \n"; - self->send(refToRespondTo, run_hru_v, - self->state.forcFileList[currentFile - 1].getNumSteps()); - - } else { - self->state.readStart = std::chrono::high_resolution_clock::now(); - - // Load the file - FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, ¤tFile, - &self->state.stepsInCurrentFile, &self->state.startGRU, - &self->state.numGRU, &self->state.err); - if (self->state.err != 0) { - aout(self) << "ERROR: Reading Forcing" << std::endl; - } - self->state.filesLoaded += 1; - self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); - - self->state.readEnd = std::chrono::high_resolution_clock::now(); - self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); - // Check if we have loaded all forcing files - if(self->state.filesLoaded <= self->state.numFiles) { - self->send(self, access_forcing_internal_v, currentFile + 1); - } - - self->send(refToRespondTo, run_hru_v, - self->state.forcFileList[currentFile - 1].getNumSteps()); - } + int err; + // Check if this file is greater than what we expect + if (currentFile > self->state.numFiles) { + aout(self) << "\nERROR: FILE_ACCESS_ACTOR - Current File is Larger than the Number of Files\n"; } else { - aout(self) << currentFile << "is larger than expected" << std::endl; + + err = readForcing(self, currentFile); + if (err != 0) + aout(self) << "\nERROR: FILE_ACCESS_ACTOR - READING_FORCING FAILED\n"; + } - + + // Check if we have loaded all forcing files, if no read more data + if(self->state.filesLoaded <= self->state.numFiles) { + self->send(self, access_forcing_internal_v, currentFile + 1); + } + + // Respond to HRU + self->send(refToRespondTo, run_hru_v, + self->state.forcFileList[currentFile - 1].getNumSteps()); + }, [=](access_forcing_internal, int currentFile) { + if (self->state.filesLoaded <= self->state.numFiles && currentFile <= self->state.numFiles) { - // aout(self) << "Loading in background, File:" << currentFile << "\n"; - if (self->state.forcFileList[currentFile - 1].isFileLoaded()) { - aout(self) << "File Loaded when shouldn't be \n"; - } - self->state.readStart = std::chrono::high_resolution_clock::now(); - FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, ¤tFile, - &self->state.stepsInCurrentFile, &self->state.startGRU, - &self->state.numGRU, &self->state.err); - if (self->state.err != 0) { - aout(self) << "ERROR: Reading Forcing" << std::endl; - } - self->state.filesLoaded += 1; - self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); - self->state.readEnd = std::chrono::high_resolution_clock::now(); - self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); + readForcing(self, currentFile); self->send(self, access_forcing_internal_v, currentFile + 1); } else { @@ -88,19 +65,33 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU [=](write_output, int indxGRU, int indxHRU, int numStepsToWrite, caf::actor refToRespondTo) { - int err = 0; - bool hruInit = self->state.outputFileInitHRU[indxGRU - 1]; - self->state.writeStart = std::chrono::high_resolution_clock::now(); - FileAccessActor_WriteOutput(self->state.handle_ncid, &self->state.outputFileExists, - &numStepsToWrite, &self->state.startGRU, &self->state.numGRU, - &hruInit, &indxGRU, &indxHRU, &err); + int err; + + err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite); if (err != 0) { - aout(self) << "ERROR: Writing Output" << std::endl; + aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n"; + } else { + self->send(refToRespondTo, done_write_v); } - self->state.outputFileInitHRU[indxGRU - 1] = true; // - self->state.writeEnd = std::chrono::high_resolution_clock::now(); - self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd); - self->send(refToRespondTo, done_write_v); + + + }, + + [=](read_and_write, int indxGRU, int indxHRU, int numStepsToWrite, int currentFile, + caf::actor refToRespondTo) { + int err; + + err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite); + if (err != 0) + aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n"; + + err = readForcing(self, currentFile); + if (err != 0) + aout(self) << "\nERROR: FILE_ACCESS_ACTOR - READING_FORCING FAILED\n"; + + // Respond to HRU + self->send(refToRespondTo, run_hru_v, + self->state.forcFileList[currentFile - 1].getNumSteps()); }, @@ -160,6 +151,55 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { } } +int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, + int numStepsToWrite) { + + int err = 0; + bool hruInit = self->state.outputFileInitHRU[indxGRU - 1]; + self->state.writeStart = std::chrono::high_resolution_clock::now(); + FileAccessActor_WriteOutput(self->state.handle_ncid, &self->state.outputFileExists, + &numStepsToWrite, &self->state.startGRU, &self->state.numGRU, + &hruInit, &indxGRU, &indxHRU, &err); + self->state.outputFileInitHRU[indxGRU - 1] = true; + self->state.writeEnd = std::chrono::high_resolution_clock::now(); + self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd); + return err; +} + +int readForcing(stateful_actor<file_access_state>* self, int currentFile) { + // Check if we have already loaded this file + if(self->state.forcFileList[currentFile -1].isFileLoaded()) { + if (debug) + aout(self) << "ForcingFile Already Loaded \n"; + return 0; + + } else { + + // File Needs to be loaded + self->state.readStart = std::chrono::high_resolution_clock::now(); + + // Load the file + FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, ¤tFile, + &self->state.stepsInCurrentFile, &self->state.startGRU, + &self->state.numGRU, &self->state.err); + + if (self->state.err != 0) { + if (debug) + aout(self) << "ERROR: FileAccessActor_ReadForcing\n" << + "currentFile = " << currentFile << "\n" << "number of steps = " + << self->state.stepsInCurrentFile << "\n"; + return -1; + } else { + self->state.filesLoaded += 1; + self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile); + + self->state.readEnd = std::chrono::high_resolution_clock::now(); + self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd); + return 0; + } + } + +} #endif \ No newline at end of file diff --git a/build/source/actors/HRU.h b/build/source/actors/HRU.h index 3541663..73c1a96 100644 --- a/build/source/actors/HRU.h +++ b/build/source/actors/HRU.h @@ -76,8 +76,8 @@ struct hru_state { double dt_init; // used to initialize the length of the sub-step for each HRU double upArea; // area upslope of each HRU int num_steps = 0; // number of time steps - int forcingStep = 1; // index of current time step in current forcing file - int iFile = 1; // index of current forcing file from forcing file list + int forcingStep; // index of current time step in current forcing file + int iFile; // index of current forcing file from forcing file list int dt_init_factor = 1; // factor of dt_init (coupled_em) bool printOutput; int outputFrequency; @@ -125,6 +125,8 @@ void Initialize_HRU(stateful_actor<hru_state>* self); */ int Run_HRU(stateful_actor<hru_state>* self); +bool check_HRU(stateful_actor<hru_state>* self, int err); + void initalizeTimeVars(stateful_actor<hru_state>* self); void finalizeTimeVars(stateful_actor<hru_state>* self); diff --git a/build/source/actors/HRUActor.h b/build/source/actors/HRUActor.h index 9ef3e0a..ca94bad 100644 --- a/build/source/actors/HRUActor.h +++ b/build/source/actors/HRUActor.h @@ -35,8 +35,10 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, self->state.outputStrucSize = outputStrucSize; // initialize counters - self->state.timestep = 1; - self->state.outputStep = 1; + self->state.timestep = 1; // Timestep of total simulation + self->state.outputStep = 1; // Index of the output structure + self->state.forcingStep = 1; // Index into the forcing file + self->state.iFile = 1; // Get the settings for the HRU parseSettings(self, configPath); @@ -97,56 +99,22 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, }, [=](run_hru, int stepsInCurrentFFile) { - // aout(self) << "Running HRU" << std::endl; self->state.start = std::chrono::high_resolution_clock::now(); - - self->state.stepsInCurrentFFile = stepsInCurrentFFile; + bool keepRunning = true; int err = 0; + self->state.stepsInCurrentFFile = stepsInCurrentFFile; + + while( keepRunning ) { + + err = Run_HRU(self); // Simulate a Timestep - while( err == 0 ) { - - // Check if we need to write - call is here because when we ask for more forcing we may need to write - if (self->state.outputStep >= self->state.outputStrucSize) { - if(debug) - aout(self) << "Sending Write, outputStep = " << self->state.outputStep << std::endl; - - self->send(self->state.file_access_actor, write_output_v, - self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self); - self->state.outputStep = 1; - break; - } - - err = Run_HRU(self); - if (err != 0) { - // RUN FAILURE!!! Notify Parent - self->send(self->state.parent, run_failure_v, self->state.indxGRU, err); - self->quit(); - return; - }; - - // Check if HRU is done computing - if (self->state.timestep >= self->state.num_steps) { - if (debug) - aout(self) << "Sending Final Write, outputStep = " << self->state.outputStep << std::endl; - self->send(self->state.file_access_actor, write_output_v, - self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self); - - self->state.end = std::chrono::high_resolution_clock::now(); - self->state.duration += calculateTime(self->state.start, self->state.end); - - return; - } - // update Timing Variables + // update Timings self->state.timestep += 1; self->state.outputStep += 1; + self->state.forcingStep += 1; + + keepRunning = check_HRU(self, err); - // check if we need more forcing information - if (self->state.forcingStep > self->state.stepsInCurrentFFile) { - if (debug) - aout(self) << "Asking for more forcing data, outputStep =" << self->state.outputStep << std::endl; - self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile + 1, self); - break; - } } self->state.end = std::chrono::high_resolution_clock::now(); @@ -375,7 +343,6 @@ int Run_HRU(stateful_actor<hru_state>* self) { self->state.handle_finshTime, self->state.handle_oldTime, &self->state.outputStep, - &self->state.forcingStep, &self->state.err); if (self->state.err != 0) { aout(self) << "Error: WriteOutput - HRU = " << self->state.indxHRU << @@ -389,6 +356,92 @@ int Run_HRU(stateful_actor<hru_state>* self) { return 0; } +bool check_HRU(stateful_actor<hru_state>* self, int err) { + + if (err != 0) { + // check for error + + self->send(self->state.parent, run_failure_v, self->state.indxGRU, err); + self->quit(); + return false; + + } else if (self->state.timestep > self->state.num_steps) { + // check if simulation is finished + self->state.outputStep -= 1; // prevents segfault + + if (debug) + aout(self) << "Sending Final Write" << + "forcingStep = " << self->state.forcingStep << "\n" << + "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << + "timeStep = " << self->state.timestep << "\n" << + "outputStep = " << self->state.outputStep << "\n"; + + self->send(self->state.file_access_actor, write_output_v, + self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self); + + self->state.end = std::chrono::high_resolution_clock::now(); + self->state.duration += calculateTime(self->state.start, self->state.end); + + return false; + + } else if (self->state.outputStep > self->state.outputStrucSize && + self->state.forcingStep > self->state.stepsInCurrentFFile) { + // Special case where we need both reading and writing + self->state.outputStep -= 1; // prevents segfault + + if (debug) + aout(self) << "Need to read forcing and write to outputstruc\n" << + "forcingStep = " << self->state.forcingStep << "\n" << + "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << + "timeStep = " << self->state.timestep << "\n" << + "outputStep = " << self->state.outputStep << "\n"; + + + self->send(self->state.file_access_actor, read_and_write_v, self->state.indxGRU, + self->state.indxHRU, self->state.outputStep, self->state.iFile + 1, self); + self->state.outputStep = 1; + + return false; + + } else if (self->state.outputStep > self->state.outputStrucSize) { + // check if we need to clear the output struc + self->state.outputStep -= 1; + + if (debug) + aout(self) << "Sending Write \n" << + "forcingStep = " << self->state.forcingStep << "\n" << + "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << + "timeStep = " << self->state.timestep << "\n" << + "outputStep = " << self->state.outputStep << "\n"; + + + self->send(self->state.file_access_actor, write_output_v, + self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self); + self->state.outputStep = 1; + + return false; + + } else if (self->state.forcingStep > self->state.stepsInCurrentFFile) { + // we need more forcing data + + if (debug) + aout(self) << "Asking for more forcing data\n" << + "forcingStep = " << self->state.forcingStep << "\n" << + "stepsInCurrentFFile = " << self->state.stepsInCurrentFFile << "\n" << + "timeStep = " << self->state.timestep << "\n" << + "outputStep = " << self->state.outputStep << "\n"; + + self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile + 1, self); + + return false; + + } else { + if (debug) + aout(self) << "Continuing\n"; + return true; + } +} + void deallocateHRUStructures(stateful_actor<hru_state>* self) { DeallocateStructures(self->state.handle_forcStat, diff --git a/build/source/actors/messageAtoms.h b/build/source/actors/messageAtoms.h index 0cc22d4..7b25bf6 100644 --- a/build/source/actors/messageAtoms.h +++ b/build/source/actors/messageAtoms.h @@ -28,6 +28,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) CAF_ADD_ATOM(summa, update_completed) CAF_ADD_ATOM(summa, update_failed) CAF_ADD_ATOM(summa, reset_outputCounter) + CAF_ADD_ATOM(summa, read_and_write) // HRU Actor CAF_ADD_ATOM(summa, run_hru) CAF_ADD_ATOM(summa, start_hru) diff --git a/build/source/driver/summaActors_wOutputStruc.f90 b/build/source/driver/summaActors_wOutputStruc.f90 index 011f845..5802231 100644 --- a/build/source/driver/summaActors_wOutputStruc.f90 +++ b/build/source/driver/summaActors_wOutputStruc.f90 @@ -89,7 +89,6 @@ subroutine summaActors_writeToOutputStruc(& finshTime, & ! x%var(:) -- end time for the model simulation oldTime, & ! x%var(:) -- time for the previous model time step outputStep, & ! index into the output Struc - forcingStep, & ! index of current time step in current forcing file ! run time variables err, message) USE nrtype @@ -153,7 +152,6 @@ subroutine summaActors_writeToOutputStruc(& type(var_i),intent(inout) :: finshTime ! end time for the model simulation type(var_i),intent(inout) :: oldTime ! integer(i4b),intent(in) :: outputStep ! index into the outputStructure - integer(i4b),intent(inout) :: forcingStep ! index of current time step in current forcing file ! run time variables integer(i4b),intent(out) :: err character(*),intent(out) :: message @@ -177,7 +175,8 @@ subroutine summaActors_writeToOutputStruc(& err=0; message='summa_manageOutputFiles/' ! identify the start of the writing call date_and_time(values=startWrite) - + ! print*, "HRU WRite timestep = ", modelTimeStep + ! print*, "OutputStep = ", outputStep ! ! initialize the statistics flags if(modelTimeStep==1)then @@ -317,9 +316,6 @@ subroutine summaActors_writeToOutputStruc(& if(finalizeStats%dat(iFreq)) outputTimeStep%var(iFreq) = outputTimeStep%var(iFreq) + 1 end do - ! increment forcingStep - forcingStep=forcingStep+1 - ! if finalized stats, then reset stats on the next time step resetStats%dat(:) = finalizeStats%dat(:) diff --git a/build/source/interface/hru_actor/cppwrap_hru.f90 b/build/source/interface/hru_actor/cppwrap_hru.f90 index a3a43d9..b6909fb 100644 --- a/build/source/interface/hru_actor/cppwrap_hru.f90 +++ b/build/source/interface/hru_actor/cppwrap_hru.f90 @@ -570,7 +570,6 @@ subroutine WriteOutput(& handle_finshTime, & ! end time for the model simulation handle_oldTime, & ! time for the previous model time step outputStep, & - forcingStep, & ! run time variables err) bind(C, name='WriteOutput') @@ -610,7 +609,6 @@ subroutine WriteOutput(& type(c_ptr), intent(in), value :: handle_finshTime ! end time for the model simulation type(c_ptr), intent(in), value :: handle_oldTime ! time for the previous model time step integer(c_int), intent(in) :: outputStep - integer(c_int), intent(inout) :: forcingStep ! index of current time step in current forcing file ! run time variables integer(c_int) :: err @@ -701,7 +699,6 @@ subroutine WriteOutput(& finshTime, & ! x%var(:) -- end time for the model simulation oldTime, & ! x%var(:) -- time for the previous model time step outputStep, & - forcingStep, & ! index of current time step in current forcing file ! run time variables err, message) diff --git a/build/source/interface/hru_actor/hru_subroutine_wrappers.h b/build/source/interface/hru_actor/hru_subroutine_wrappers.h index 6a5b30c..4ff9cef 100644 --- a/build/source/interface/hru_actor/hru_subroutine_wrappers.h +++ b/build/source/interface/hru_actor/hru_subroutine_wrappers.h @@ -76,7 +76,7 @@ extern "C" { void* bparStruct, void* bvarStruct, // local vars void* statCounter, void* outputTimeStep, void* resetStats, void* finalizeStats, - void* finshTime, void* oldTime, int* outputStep, int* forcingStep, int* err); + void* finshTime, void* oldTime, int* outputStep, int* err); void DeallocateStructures( void* handle_forcStat, void* handle_progStat, void* handle_diagStat, void* handle_fluxStat, diff --git a/build/source/interface/subroutine_wrappers.h b/build/source/interface/subroutine_wrappers.h deleted file mode 100644 index 19fe00a..0000000 --- a/build/source/interface/subroutine_wrappers.h +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef SUBROUTINE_WRAPPERS_H_ -#define SUBROUTINE_WRAPPERS_H_ - -// extern "C" { -// void getNumHRUGRU(int* numGRU, int* numHRU, char const *str1); - - -// //void initGlobals(char const*str1, int* numGRU, int* numHRU, int* err, void* downSlopeStruct); - -// void mDecisions_init(int* err); - -// void getNumHRUsInGRU(int* indxGRU, int* numHRUs); - -// void initDownSlope(int* indxGRU, int* numHRUs, int* err); - -// void endTimeStepDownSlope(void* handle_downSlopeStruct); - - - - - - - - - - - -// void UpdateDownSlope(int* indxHRU, void* handle_downSlopeStruct, void* handle_fluxStruct); - - - -// # endif \ No newline at end of file diff --git a/config/Summa_Actors_Settings.json b/config/Summa_Actors_Settings.json index 5acfcc6..5011998 100644 --- a/config/Summa_Actors_Settings.json +++ b/config/Summa_Actors_Settings.json @@ -5,13 +5,13 @@ }, "JobActor": { - "FileManagerPath": , + "FileManagerPath": "", "outputCSV": false, - "csvPath": + "csvPath": "" }, "HRUActor": { "printOutput": true, - "outputFrequency": 1000 + "outputFrequency": 100000 } } \ No newline at end of file -- GitLab