From 737f286231256739ae7efc6ff7c2e86af80a58d1 Mon Sep 17 00:00:00 2001
From: Kyle Klenk <kyle.c.klenk@gmail.com>
Date: Fri, 20 May 2022 16:54:04 -0600
Subject: [PATCH] fixed restarting of GRU

---
 build/makefile                                | 18 +++---
 build/source/actors/FileAccessActor.h         | 10 +++-
 build/source/actors/HRUActor.h                |  6 +-
 build/source/actors/JobActor.h                |  3 +
 build/source/actors/OutputManager.h           | 59 ++++++++++++++++---
 build/source/actors/messageAtoms.h            |  1 +
 .../file_access_actor/cppwrap_fileAccess.f90  |  8 +++
 .../fileAccess_subroutine_wrappers.h          |  2 +
 build/source/netcdf/writeOutput.f90           | 39 ++++++++----
 .../OutputVerification/compareOutput.py       | 14 ++---
 10 files changed, 118 insertions(+), 42 deletions(-)

diff --git a/build/makefile b/build/makefile
index 3b8270d..e622bef 100644
--- a/build/makefile
+++ b/build/makefile
@@ -15,17 +15,17 @@ ACTORS_INCLUDES = -I/usr/include -I/usr/local/include
 ACTORS_LIBRARIES = -L/home/linuxbrew.linuxbrew/lib -L/home/kklenk/Summa-Actors/build -lcaf_core -lcaf_io -lsumma -lopenblas -lnetcdff
 
 
-# # Production runs
-# FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC
-# FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
-# FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
-# FLAGS_ACTORS = -O3
+# Production runs
+FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC
+FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
+FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
+FLAGS_ACTORS = -O3
 
 # # Debug runs
-FLAGS_NOAH = -pg -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC
-FLAGS_COMM = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
-FLAGS_SUMMA = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
-FLAGS_ACTORS = -pg -g -O0 -Wall
+# FLAGS_NOAH = -pg -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC
+# FLAGS_COMM = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
+# FLAGS_SUMMA = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
+# FLAGS_ACTORS = -pg -g -O0 -Wall
 
 
 #========================================================================
diff --git a/build/source/actors/FileAccessActor.h b/build/source/actors/FileAccessActor.h
index 83a9fb4..0fe741b 100644
--- a/build/source/actors/FileAccessActor.h
+++ b/build/source/actors/FileAccessActor.h
@@ -132,7 +132,6 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU
             // update the list in Fortran
             updateFailed(&indxGRU);
 
-
             listIndex = self->state.output_manager->decrementMaxSize(indxGRU);
           
             // Check if this list is now full
@@ -141,6 +140,15 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU
             }
         },
 
+        /**
+         * Message from JobActor
+         * OutputManager needs to be adjusted so the failed HRUs can run again
+         */
+        [=](restart_failures) {
+            resetFailedArray();
+            self->state.output_manager->restartFailures();
+        },
+
         [=](deallocate_structures) {
             aout(self) << "Deallocating Structure" << std::endl;
             FileAccessActor_DeallocateStructures(self->state.handle_forcFileInfo, self->state.handle_ncid);
diff --git a/build/source/actors/HRUActor.h b/build/source/actors/HRUActor.h
index 672d8e5..12ea480 100644
--- a/build/source/actors/HRUActor.h
+++ b/build/source/actors/HRUActor.h
@@ -133,9 +133,9 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU,
                 self->state.outputStep += 1;
                 self->state.forcingStep += 1;
 
-                if (self->state.timestep == 450 && self->state.indxGRU == 5) {
-                    err = 20;
-                }
+                // if (self->state.timestep == 450 && self->state.indxGRU == 5) {
+                //     err = 20;
+                // }
 
                 keepRunning = check_HRU(self, err); // check if we are done, need to write
 
diff --git a/build/source/actors/JobActor.h b/build/source/actors/JobActor.h
index 13c6ca7..d2087dd 100644
--- a/build/source/actors/JobActor.h
+++ b/build/source/actors/JobActor.h
@@ -282,6 +282,9 @@ void runGRUs(stateful_actor<job_state>* self) {
 }
 
 void restartFailures(stateful_actor<job_state>* self) {
+    // Need to let the file_access_actor know so it can set up the new output Manager
+    self->send(self->state.file_access_actor, restart_failures_v);
+
     self->state.numGRU = self->state.numGRUFailed;
     self->state.numGRUFailed = 0;
     self->state.numGRUDone = 0;
diff --git a/build/source/actors/OutputManager.h b/build/source/actors/OutputManager.h
index b2f7f3c..5faa5f1 100644
--- a/build/source/actors/OutputManager.h
+++ b/build/source/actors/OutputManager.h
@@ -134,7 +134,10 @@ class OutputManager {
 
         int numVectors;
         int avgSizeOfActorList;
+        bool runningFailures;
         std::vector<ActorRefList*> list;
+        std::vector<int> failedHRU;
+        std::vector<int> failureReRun; // index used so we can add failedHRUs if they fail a second time
 
 
 
@@ -144,6 +147,7 @@ class OutputManager {
             this->numVectors = numVectors;
             int sizeOfOneVector = totalNumActors / numVectors;
             this->avgSizeOfActorList = sizeOfOneVector;
+            this->runningFailures = false;
             // Create the first n-1 vectors with the same size 
             for (int i = 0; i < numVectors - 1; i++) {
                 auto refList = new ActorRefList(sizeOfOneVector);
@@ -166,13 +170,29 @@ class OutputManager {
          * @return int The list index that actor is added to.
          */
         int addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
-            // Index has to be subtracted by 1 because Fortran array starts at 1
-            int listIndex = (index - 1) / this->avgSizeOfActorList;
-            if (listIndex > this->numVectors - 1) {
-                listIndex =  this->numVectors - 1;
+            int listIndex;
+            if (this->runningFailures) {
+                // find the index of the structure this HRU is in
+                auto it = find(this->failureReRun.begin(), this->failureReRun.end(), index);
+
+                if (it != this->failureReRun.end()) {
+                    listIndex = it - this->failureReRun.begin();
+                } else {
+                    throw "Element Not Found in failureReRun list";
+                }
+
+                this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
+
+            } else {
+                // Index has to be subtracted by 1 because Fortran array starts at 1
+                listIndex = (index - 1) / this->avgSizeOfActorList;
+                if (listIndex > this->numVectors - 1) {
+                    listIndex =  this->numVectors - 1;
+                }
+
+                this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
             }
 
-            this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
             return listIndex;
         }
 
@@ -208,16 +228,18 @@ class OutputManager {
             this->list[listIndex]->removeFailed(actorRef);
 
             return listIndex;
-
         }
 
         /**
-         *
+         * Decrease the size of the list
+         * Add this GRU to the failed list
          */ 
+        int decrementMaxSize(int indexHRU) {
+            
+            this->failedHRU.push_back(indexHRU);
 
-        int decrementMaxSize(int indexGRU) {
             // Find the list this actor is on
-            int listIndex = (indexGRU - 1) / this->avgSizeOfActorList;
+            int listIndex = (indexHRU - 1) / this->avgSizeOfActorList;
             if (listIndex > this->numVectors - 1) {
                 listIndex =  this->numVectors - 1;
             }
@@ -226,6 +248,21 @@ class OutputManager {
             return listIndex;
         }
 
+        void restartFailures() {
+            this->list.clear();
+            this->numVectors = this->failedHRU.size();
+            for (unsigned int i = 0; i < this->failedHRU.size(); i++) {
+                auto refList = new ActorRefList(1);
+                this->list.push_back(refList);
+            }
+
+            this->failureReRun = this->failedHRU;
+            this->failedHRU.clear();
+
+            this->runningFailures = true;
+
+        }
+
         /**
          * Get the number of steps to write from the correct listIndex
          */
@@ -260,6 +297,10 @@ class OutputManager {
             return this->list[listIndex]->getMaxIndex();
         }
 
+        void addFailed(int indxHRU) {
+            this->failedHRU.push_back(indxHRU);
+        }
+
 };
 
 #endif 
\ No newline at end of file
diff --git a/build/source/actors/messageAtoms.h b/build/source/actors/messageAtoms.h
index d845c79..d8a3c4f 100644
--- a/build/source/actors/messageAtoms.h
+++ b/build/source/actors/messageAtoms.h
@@ -32,6 +32,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
     CAF_ADD_ATOM(summa, reset_outputCounter)
     CAF_ADD_ATOM(summa, read_and_write)
     CAF_ADD_ATOM(summa, write_param)
+    CAF_ADD_ATOM(summa, restart_failures)
     // HRU Actor
     CAF_ADD_ATOM(summa, run_hru)
     CAF_ADD_ATOM(summa, start_hru)
diff --git a/build/source/interface/file_access_actor/cppwrap_fileAccess.f90 b/build/source/interface/file_access_actor/cppwrap_fileAccess.f90
index 1485c65..18f6d2e 100644
--- a/build/source/interface/file_access_actor/cppwrap_fileAccess.f90
+++ b/build/source/interface/file_access_actor/cppwrap_fileAccess.f90
@@ -144,6 +144,14 @@ subroutine updateFailed(indxHRU) bind(C, name="updateFailed")
   failedHRUs(indxHRU) = .true.
 end subroutine
 
+subroutine resetFailedArray() bind(C, name="resetFailedArray")
+  USE globalData,only:failedHRUs
+  implicit none
+
+  failedHRUs(:) = .false.
+
+end subroutine
+
 
 subroutine resetOutputCounter(indxGRU) bind(C, name="resetOutputCounter")
   USE globalData,only:outputTimeStep
diff --git a/build/source/interface/file_access_actor/fileAccess_subroutine_wrappers.h b/build/source/interface/file_access_actor/fileAccess_subroutine_wrappers.h
index 15fe94c..795afc4 100644
--- a/build/source/interface/file_access_actor/fileAccess_subroutine_wrappers.h
+++ b/build/source/interface/file_access_actor/fileAccess_subroutine_wrappers.h
@@ -12,6 +12,8 @@ extern "C" {
 
   void updateFailed(int* indxHRU);
 
+  void resetFailedArray();
+
   void resetOutputCounter(int* indxGRU);
   
   void mDecisions_C(int* numSteps, int* err);
diff --git a/build/source/netcdf/writeOutput.f90 b/build/source/netcdf/writeOutput.f90
index 0a8929a..7ce8d7d 100644
--- a/build/source/netcdf/writeOutput.f90
+++ b/build/source/netcdf/writeOutput.f90
@@ -226,12 +226,16 @@ subroutine writeData(ncid,outputTimestep,outputTimestepUpdate,maxLayers,indxGRU,
         call netcdf_err(err,message); if (err/=0) return
         
         ! make sure the HRU we are using has not failed
-        do iGRU = minGRU, maxGRU
-          if(.not.failedHRUs(iGRU))then
-            verifiedGRUIndex = iGRU
-            exit
-          endif
-        end do
+        if (minGRU == maxGRU)then
+          verifiedGRUIndex = minGRU
+        else 
+          do iGRU = minGRU, maxGRU
+            if(.not.failedHRUs(iGRU))then
+              verifiedGRUIndex = iGRU
+              exit
+            endif
+          end do  
+        endif
 
         do iStep = 1, nSteps
           ! check if we want this timestep
@@ -257,17 +261,26 @@ subroutine writeData(ncid,outputTimestep,outputTimestepUpdate,maxLayers,indxGRU,
           if(meta(iVar)%varType==iLookVarType%scalarv) then
             select type(stat)
               class is (gru_hru_time_doubleVec)
-                gruCounter = 0
-                do iGRU = minGRU, maxGRU
-                  stepCounter = 0
-                  gruCounter = gruCounter + 1
+                if (minGRU == maxGRU)then
+                  gruCounter = 1
                   do iStep = 1, nSteps
                     if(.not.outputStructure(1)%finalizeStats(1)%gru(verifiedGRUIndex)%hru(1)%tim(iStep)%dat(iFreq)) cycle
                     stepCounter = stepCounter + 1
-                    realVec(gruCounter, stepCounter) = stat%gru(iGRU)%hru(1)%var(map(iVar))%tim(iStep)%dat(iFreq)
+                    realVec(gruCounter, stepCounter) = stat%gru(verifiedGRUIndex)%hru(1)%var(map(iVar))%tim(iStep)%dat(iFreq)
                   end do ! iStep
-                end do ! iGRU
-
+                else 
+                  gruCounter = 0
+                  do iGRU = minGRU, maxGRU
+                    stepCounter = 0
+                    gruCounter = gruCounter + 1
+                    do iStep = 1, nSteps
+                      if(.not.outputStructure(1)%finalizeStats(1)%gru(verifiedGRUIndex)%hru(1)%tim(iStep)%dat(iFreq)) cycle
+                      stepCounter = stepCounter + 1
+                      realVec(gruCounter, stepCounter) = stat%gru(iGRU)%hru(1)%var(map(iVar))%tim(iStep)%dat(iFreq)
+                    end do ! iStep
+                  end do ! iGRU  
+                endif
+                
                 err = nf90_put_var(ncid%var(iFreq),meta(iVar)%ncVarID(iFreq),realVec(1:gruCounter, 1:stepCounter),start=(/minGRU,outputTimestep(iFreq)/),count=(/numGRU,stepCounter/))
                 if (outputTimeStepUpdate(iFreq) /= stepCounter ) then
                   print*, "ERROR Missmatch in Steps - stat doubleVec"
diff --git a/utils/netcdf/OutputVerification/compareOutput.py b/utils/netcdf/OutputVerification/compareOutput.py
index 5031ad1..fb7b0ce 100644
--- a/utils/netcdf/OutputVerification/compareOutput.py
+++ b/utils/netcdf/OutputVerification/compareOutput.py
@@ -3,7 +3,7 @@ from os.path import isfile, join
 from pathlib import Path
 import xarray as xr 
 
-numHRU = 10
+numHRU = 1
 
 time = 'time'
 scalarSWE = 'scalarSWE'
@@ -28,8 +28,8 @@ varList = [time, scalarSWE, scalarCanopyWat, scalarAquiferStorage, scalarTotalSo
     scalarTotalET, scalarTotalRunoff, scalarNetRadiation]
 
 filename = "out.txt"
-originalPath = Path('/u1/kck540/output/SummaActors/Apr-29-2022/benchData/SummaActorsGRU1-10_timestep.nc')
-actorsPath = Path('/u1/kck540/output/SummaActors/Apr-29-2022/SummaActorsGRU1-10_timestep.nc')
+originalPath = Path('/gladwell/kck540/output/SummaOriginal/failingHRUCheck/SummaOriginal_G079506-079510_day.nc')
+actorsPath = Path('/gladwell/kck540/output/SummaActors/FaillingHRUCheck/SummaActorsGRU79500-10_day.nc')
 
 originalDataset = xr.open_dataset(originalPath)
 actorsDataset = xr.open_dataset(actorsPath)
@@ -37,11 +37,11 @@ actorsDataset = xr.open_dataset(actorsPath)
 allHRUsOriginal = []
 allHRUsActors = []
 
-for ihru in range(0, numHRU):
-  allHRUsOriginal.append(originalDataset.isel(hru=ihru).copy())
+# for ihru in range(0, numHRU):
+allHRUsOriginal.append(originalDataset.isel(hru=2).copy())
 
-for ihru in range(0, numHRU):
-  allHRUsActors.append(actorsDataset.isel(hru=ihru).copy())
+# for ihru in range(6, numHRU):
+allHRUsActors.append(actorsDataset.isel(hru=7).copy())
 
 file = open(filename, "w")
 for i in range(0, numHRU):
-- 
GitLab