From d19891f99a7681c7902b52d4f1ddb3df7482f92c Mon Sep 17 00:00:00 2001
From: KyleKlenk <kyle.c.klenk@gmail.com>
Date: Thu, 19 May 2022 09:37:51 -0600
Subject: [PATCH] Testing one write block - all HRUs can finish while the
 failing one waits.

---
 build/makefile                        | 16 +++----
 build/source/actors/FileAccess.h      |  2 +-
 build/source/actors/FileAccessActor.h | 66 ++++++++++++++++-----------
 build/source/actors/HRUActor.h        |  6 ++-
 build/source/actors/JobActor.h        |  5 +-
 build/source/actors/OutputManager.h   | 60 +++++++++++++++++++++---
 build/source/testing/testing_main.cc  | 12 ++---
 7 files changed, 117 insertions(+), 50 deletions(-)

diff --git a/build/makefile b/build/makefile
index 46d5f5c..ab14e95 100644
--- a/build/makefile
+++ b/build/makefile
@@ -16,16 +16,16 @@
 
 
 # Production runs
-# FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC
-# FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
-# FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
-# FLAGS_ACTORS = -O3
+FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC
+FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
+FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
+FLAGS_ACTORS = -O3
 
 # # Debug runs
-FLAGS_NOAH = -pg -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC
-FLAGS_COMM = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
-FLAGS_SUMMA = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
-FLAGS_ACTORS = -pg -g -O0 -Wall
+# FLAGS_NOAH = -pg -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC
+# FLAGS_COMM = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
+# FLAGS_SUMMA = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
+# FLAGS_ACTORS = -pg -g -O0 -Wall
 
 
 #========================================================================
diff --git a/build/source/actors/FileAccess.h b/build/source/actors/FileAccess.h
index 063dc3d..7e6669e 100644
--- a/build/source/actors/FileAccess.h
+++ b/build/source/actors/FileAccess.h
@@ -50,7 +50,7 @@ struct file_access_state {
     void *handle_forcFileInfo = new_handle_file_info(); // Handle for the forcing file information
     void *handle_ncid = new_handle_var_i();               // output file ids
     OutputManager *output_manager;
-    int num_vectors_in_output_manager = 5;
+    int num_vectors_in_output_manager = 2;
     int num_steps;
     int outputStrucSize;
     int stepsInCurrentFile;
diff --git a/build/source/actors/FileAccessActor.h b/build/source/actors/FileAccessActor.h
index 6f30888..35e0ef9 100644
--- a/build/source/actors/FileAccessActor.h
+++ b/build/source/actors/FileAccessActor.h
@@ -7,6 +7,7 @@ using namespace caf;
 void initalizeFileAccessActor(stateful_actor<file_access_state>* self);
 int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, int numStepsToWrite, int returnMessage, caf::actor actorRef);
 int readForcing(stateful_actor<file_access_state>* self, int currentFile);
+int write(stateful_actor<file_access_state>* self, int listIndex);
 
 behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU, 
     int outputStrucSize, actor parent) {
@@ -67,7 +68,7 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU
                         self->state.forcFileList[currentFile - 1].getNumSteps());
                 }
             } else {
-                aout(self) << currentFile << "is larger than expected" << std::endl;
+                aout(self) << currentFile << " is larger than expected" << std::endl;
             }
             
         },
@@ -125,6 +126,17 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU
                 aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
         },
 
+        [=](run_failure, int indxGRU) {
+            int listIndex;
+
+            listIndex = self->state.output_manager->decrementMaxSize(indxGRU);
+          
+            // Check if this list is now full
+            if(self->state.output_manager->isFull(listIndex)) {
+                write(self, listIndex);
+            }
+        },
+
         [=](deallocate_structures) {
             aout(self) << "Deallocating Structure" << std::endl;
             FileAccessActor_DeallocateStructures(self->state.handle_forcFileInfo, self->state.handle_ncid);
@@ -212,16 +224,39 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) {
     }
 }
 
+int write(stateful_actor<file_access_state>* self, int listIndex) {
+    int err = 0;
+    int minGRU = self->state.output_manager->getMinIndex(listIndex);
+    int maxGRU = self->state.output_manager->getMaxIndex(listIndex);
+    int numStepsToWrite = self->state.output_manager->getNumStepsToWrite(listIndex);
+    FileAccessActor_WriteOutput(self->state.handle_ncid,
+        &numStepsToWrite, &minGRU, 
+        &maxGRU, &err);
+        
+    // Pop The actors and send them the correct continue message
+    while(!self->state.output_manager->isEmpty(listIndex)) {
+        std::tuple<caf::actor, int> actor = self->state.output_manager->popActor(listIndex);
+        if (get<1>(actor) == 9999) {
+            
+            self->send(get<0>(actor), done_write_v);
+
+        }  else {
+            self->send(get<0>(actor), run_hru_v, 
+                self->state.forcFileList[get<1>(actor) - 1].getNumSteps());
+        }
+    }
+
+    return 0;
+}
+
 int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, 
     int numStepsToWrite, int returnMessage, caf::actor actorRef) {
     self->state.writeStart = std::chrono::high_resolution_clock::now();
     if (debug) {
         aout(self) << "Recieved Write Request From GRU: " << indxGRU << "\n";
     }
-
-    
     int err = 0;
-    int listIndex = self->state.output_manager->addActor(actorRef, indxGRU, returnMessage);
+    int listIndex = self->state.output_manager->addActor(actorRef, indxGRU, returnMessage, numStepsToWrite);
     if (self->state.output_manager->isFull(listIndex)) {
         if (debug) {
             aout(self) << "List with Index " << listIndex << " is full and ready to write\n";
@@ -229,25 +264,7 @@ int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHR
             aout(self) << "Maximum GRU Index = " << self->state.output_manager->getMaxIndex(listIndex) << "\n";
         }
 
-        int minGRU = self->state.output_manager->getMinIndex(listIndex);
-        int maxGRU = self->state.output_manager->getMaxIndex(listIndex);
-        FileAccessActor_WriteOutput(self->state.handle_ncid,
-            &numStepsToWrite, &minGRU, 
-            &maxGRU, &err);
-        
-        // Pop The actors and send them the correct continue message
-        while(!self->state.output_manager->isEmpty(listIndex)) {
-            std::tuple<caf::actor, int> actor = self->state.output_manager->popActor(listIndex);
-            if (get<1>(actor) == 9999) {
-                
-                self->send(get<0>(actor), done_write_v);
-
-            }  else {
-                self->send(get<0>(actor), run_hru_v, 
-                    self->state.forcFileList[get<1>(actor) - 1].getNumSteps());
-            }
-        }
-        
+       err = write(self, listIndex);
 
     } else {
         if (debug) {
@@ -257,9 +274,6 @@ int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHR
     }
     
    
-
-
-
     self->state.writeEnd = std::chrono::high_resolution_clock::now();
     self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd);
 
diff --git a/build/source/actors/HRUActor.h b/build/source/actors/HRUActor.h
index a0dff80..8e5db7d 100644
--- a/build/source/actors/HRUActor.h
+++ b/build/source/actors/HRUActor.h
@@ -133,6 +133,10 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU,
                 self->state.outputStep += 1;
                 self->state.forcingStep += 1;
 
+                if (self->state.indxGRU == 3 && self->state.timestep < 355) {
+                    err = 20;
+                } 
+
                 keepRunning = check_HRU(self, err); // check if we are done, need to write
 
             }
@@ -380,7 +384,7 @@ bool check_HRU(stateful_actor<hru_state>* self, int err) {
     if (err != 0) { 
         // check for error
         
-        self->send(self->state.parent, run_failure_v, self->state.indxGRU, err);
+        self->send(self->state.parent, run_failure_v, self, self->state.indxGRU, err);
         self->quit();
         return false;
     
diff --git a/build/source/actors/JobActor.h b/build/source/actors/JobActor.h
index 8dc2b6c..13c6ca7 100644
--- a/build/source/actors/JobActor.h
+++ b/build/source/actors/JobActor.h
@@ -100,7 +100,7 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
         },
 
 
-        [=](run_failure, int indxGRU, int err) {
+        [=](run_failure, caf::actor actorRef, int indxGRU, int err) {
             aout(self) << "GRU:" << self->state.GRUList[indxGRU - 1]->getRefGRU()
                 << "indxGRU = " << indxGRU << "Failed \n"
                 << "Will have to wait until all GRUs are done before it can be re-tried\n";
@@ -109,6 +109,9 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
             self->state.numGRUDone++;
             self->state.GRUList[indxGRU - 1]->updateFailed();
 
+            // Let the file_access_actor know this actor failed
+            self->send(self->state.file_access_actor, run_failure_v, indxGRU);
+
             // check if we are the last hru to complete
             if (self->state.numGRUDone >= self->state.numGRU) {
                 restartFailures(self);
diff --git a/build/source/actors/OutputManager.h b/build/source/actors/OutputManager.h
index ebb2119..b2f7f3c 100644
--- a/build/source/actors/OutputManager.h
+++ b/build/source/actors/OutputManager.h
@@ -10,8 +10,9 @@
  */
 class ActorRefList {
     private:
+        int numStepsToWrite; // We can save this value here so that we know how many steps to write
         int currentSize;
-        int maxSize;
+        unsigned int maxSize;
         int minIndex = -1; // minimum index of the actor being stored on this list
         int maxIndex = 0; // maximum index of the actor being stored on this list
         std::vector<std::tuple<caf::actor, int>> list;
@@ -41,6 +42,10 @@ class ActorRefList {
         int getMaxSize() {
             return this->maxSize;
         }
+
+        int getNumStepsToWrite() {
+            return this->numStepsToWrite;
+        }
         
         bool isFull() {
             return list.size() == this->maxSize;
@@ -53,7 +58,7 @@ class ActorRefList {
         * this is the current forcingFileList index that allows the file_access actor to know the number 
         * of steps the HRU actor that needs to compute 
         */
-        void addActor(caf::actor actor, int index, int returnMessage) {
+        void addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
             if (this->isFull()) {
                 throw "List is full, cannot add actor to this list";
             }
@@ -63,7 +68,7 @@ class ActorRefList {
             if (index < this->minIndex || this->minIndex < 0) {
                 this->minIndex = index;
             }
-
+            this->numStepsToWrite = numStepsToWrite;
             this->currentSize++;
             list.push_back(std::make_tuple(actor, returnMessage));
         }
@@ -87,6 +92,17 @@ class ActorRefList {
             return list.empty();
         }
 
+
+        /**
+         * When an actor fails we need to decrement the count
+         * so that this list becomes full when there is a failure
+         * 
+         * indexHRU - index of the HRU causing the error
+         */
+        void decrementMaxSize() {
+            this->maxSize--;
+        }
+
         /**
         * Remove the failed HRU from the list
         *
@@ -149,14 +165,14 @@ class OutputManager {
          * @param returnMessage Forcing File index or 9999
          * @return int The list index that actor is added to.
          */
-        int addActor(caf::actor actor, int index, int returnMessage) {
+        int addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
             // Index has to be subtracted by 1 because Fortran array starts at 1
             int listIndex = (index - 1) / this->avgSizeOfActorList;
             if (listIndex > this->numVectors - 1) {
                 listIndex =  this->numVectors - 1;
             }
 
-            this->list[listIndex]->addActor(actor, index, returnMessage);
+            this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
             return listIndex;
         }
 
@@ -175,7 +191,14 @@ class OutputManager {
         }
 
 
-        void removeFailed(caf::actor actorRef, int index) {
+        /** When a failure occurs an actor most likley will not already be on this list
+         * This method may and probably should not be used. Although needing to remove a
+         * specific element from a list may be needed.
+         * Remove the failed actor from the list
+         * Return the index of the list we removed the actor from
+         * This is so we can check if it is full
+         */
+        int removeFailed(caf::actor actorRef, int index) {
             // Find the list this actor is on
             int listIndex = (index - 1) / this->avgSizeOfActorList;
             if (listIndex > this->numVectors - 1) {
@@ -184,6 +207,31 @@ class OutputManager {
             
             this->list[listIndex]->removeFailed(actorRef);
 
+            return listIndex;
+
+        }
+
+        /**
+         *
+         */ 
+
+        int decrementMaxSize(int indexGRU) {
+            // Find the list this actor is on
+            int listIndex = (indexGRU - 1) / this->avgSizeOfActorList;
+            if (listIndex > this->numVectors - 1) {
+                listIndex =  this->numVectors - 1;
+            }
+
+            this->list[listIndex]->decrementMaxSize();
+            return listIndex;
+        }
+
+        /**
+         * Get the number of steps to write from the correct listIndex
+         */
+        int getNumStepsToWrite(int listIndex) {
+
+            return this->list[listIndex]->getNumStepsToWrite();
         }
 
         bool isFull(int listIndex) {
diff --git a/build/source/testing/testing_main.cc b/build/source/testing/testing_main.cc
index d693fab..6aa8e89 100644
--- a/build/source/testing/testing_main.cc
+++ b/build/source/testing/testing_main.cc
@@ -486,12 +486,15 @@ void testOutputManager(caf::actor_system& sys) {
 
     // Testing Remove Failed
     aout(self) << "testing Remove Failed from Output Structure \n";
-    OM2->removeFailed(a1, 1);
+    int vec;
+    vec = OM2->removeFailed(a1, 1);
+    IS_TRUE(vec == 0);
     IS_TRUE(OM2->getSize(0) == 2);
     IS_TRUE(OM2->getSize(1) == 3);
     IS_TRUE(OM2->getSize(2) == 4);
     IS_TRUE(OM2->isFull(0));
-    OM2->removeFailed(a5, 5);
+    vec = OM2->removeFailed(a5, 5);
+    IS_TRUE(vec == 1);
     IS_TRUE(OM2->getSize(0) == 2);
     IS_TRUE(OM2->getSize(1) == 2);
     IS_TRUE(OM2->getSize(2) == 4);
@@ -508,11 +511,6 @@ void testOutputManager(caf::actor_system& sys) {
     OM2->addActor(a2, 2, 2);
     OM2->addActor(a3, 3, 3);
     IS_TRUE(OM2->isFull(0));
-
-
-
-
-
 }
 
 
-- 
GitLab