From eb086bc378e202386d757bc62c44fbe2f9c2eeb9 Mon Sep 17 00:00:00 2001
From: kck540 <kyle.klenk@usask.ca>
Date: Mon, 25 Apr 2022 15:22:19 -0400
Subject: [PATCH] Cleaned up Code Formatting, Sorted Scripts, Created
 Configuration Script for Submitting Jobs

---
 build/source/actors/JobActor.h                | 73 +++++++++++-------
 config/Summa_Actors_Settings.json             |  4 +-
 config/caf-application.conf                   |  7 +-
 config/configuration.py                       | 74 ++++++++++++++++++-
 config/fileManager.txt                        |  2 +-
 .../ChunkingScripts/NA_Domain_Chunking.py     | 51 +++++++++++++
 .../NA_Domain_Chunking_Script.sh              | 25 +++++++
 .../{ => OutputVerification}/checkOutput.py   |  0
 .../{ => OutputVerification}/checkbit4bit.py  |  0
 .../{ => OutputVerification}/compareOutput.py |  0
 .../{ => StatisticsScripts}/resourageUsage.py |  0
 utils/netcdf/{ => etc}/mergeNetcdf.py         |  0
 12 files changed, 197 insertions(+), 39 deletions(-)
 create mode 100644 utils/netcdf/ChunkingScripts/NA_Domain_Chunking.py
 create mode 100644 utils/netcdf/ChunkingScripts/NA_Domain_Chunking_Script.sh
 rename utils/netcdf/{ => OutputVerification}/checkOutput.py (100%)
 rename utils/netcdf/{ => OutputVerification}/checkbit4bit.py (100%)
 rename utils/netcdf/{ => OutputVerification}/compareOutput.py (100%)
 rename utils/netcdf/{ => StatisticsScripts}/resourageUsage.py (100%)
 rename utils/netcdf/{ => etc}/mergeNetcdf.py (100%)

diff --git a/build/source/actors/JobActor.h b/build/source/actors/JobActor.h
index 387f4f4..aa7c1db 100644
--- a/build/source/actors/JobActor.h
+++ b/build/source/actors/JobActor.h
@@ -43,16 +43,20 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
     aout(self) << "Job Actor Initalized \n";
 
     return {
-        [=](done_file_access_actor_init) {
-            // Init GRU Actors and the Output Structure
-            self->send(self->state.file_access_actor, initalize_outputStructure_v);
-            self->send(self, init_hru_v);
-        },
-
+    // *******************************************************************************************
+    // *********************************** INTERFACE WITH HRU ************************************
+    // *******************************************************************************************
+        
+        /**
+         * 
+         */
         [=](init_hru) {
             initalizeGRU(self);
         },
 
+        /**
+         * 
+         */
         [=](done_init_hru) {
             if (debug) {
                 aout(self) << "Done Init\n";
@@ -68,13 +72,6 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
             }
         },
 
-        /**
-         * Message from HRUActor, HRU is done the current forcing file but is not
-         * done its simulation and needs the next file
-         * indxGRU - Index into the actor array so we know which HRU this is.
-         * NOTE: Naming of GRU and HRU is confusing as the plan is to further seperate
-         * NOTE: For NA_Domain GRU is used as that is how we index the forcing file
-         */ 
 
         [=](done_hru, int indxGRU, double totalDuration, double initDuration, 
             double forcingDuration, double runPhysicsDuration, double writeOutputDuration) {
@@ -102,6 +99,38 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
             }
         },
 
+
+        [=](run_failure, int indxGRU, int err) {
+            aout(self) << "GRU:" << self->state.GRUList[indxGRU - 1]->getRefGRU()
+                << "indxGRU = " << indxGRU << "Failed \n"
+                << "Will have to wait until all GRUs are done before it can be re-tried\n";
+            
+            self->state.numGRUFailed++;
+            self->state.numGRUDone++;
+            self->state.GRUList[indxGRU - 1]->updateFailed();
+
+            // check if we are the last hru to complete
+            if (self->state.numGRUDone >= self->state.numGRU) {
+                restartFailures(self);
+            }
+        },
+
+    // *******************************************************************************************
+    // ******************************* END INTERFACE WITH HRU ************************************
+    // *******************************************************************************************
+
+    // *******************************************************************************************
+    // ****************************** INTERFACE WITH FileAccessActor *****************************
+    // *******************************************************************************************
+        /**
+         * 
+         */
+        [=](done_file_access_actor_init) {
+            // Init GRU Actors and the Output Structure
+            self->send(self->state.file_access_actor, initalize_outputStructure_v);
+            self->send(self, init_hru_v);
+        },
+
         [=](file_access_actor_done, double readDuration, double writeDuration) {
             int err = 0;
             if (debug) {
@@ -136,21 +165,9 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
             self->send(self->state.parent, done_job_v, self->state.numGRUFailed);
             self->quit();
         },
-
-        [=](run_failure, int indxGRU, int err) {
-            aout(self) << "GRU:" << self->state.GRUList[indxGRU - 1]->getRefGRU()
-                << "indxGRU = " << indxGRU << "Failed \n"
-                << "Will have to wait until all GRUs are done before it can be re-tried\n";
-            
-            self->state.numGRUFailed++;
-            self->state.numGRUDone++;
-            self->state.GRUList[indxGRU - 1]->updateFailed();
-
-            // check if we are the last hru to complete
-            if (self->state.numGRUDone >= self->state.numGRU) {
-                restartFailures(self);
-            }
-        },
+    // *******************************************************************************************
+    // ************************** END INTERFACE WITH FileAccessActor *****************************
+    // *******************************************************************************************
 
     };
 }
diff --git a/config/Summa_Actors_Settings.json b/config/Summa_Actors_Settings.json
index db2de25..debfbef 100644
--- a/config/Summa_Actors_Settings.json
+++ b/config/Summa_Actors_Settings.json
@@ -5,7 +5,7 @@
         "job-name": "SummaActors",
         "account": "rpp-kshook",
         "numHRUs": 517315,
-        "maxNumberOfJobs": 1000,
+        "maxNumberOfJobs": 500,
         "maxGRUsPerSubmission": 1000,
         "executablePath": "/home/kklenk/SummaProjects/Summa-Actors/bin/summaMain"
     },
@@ -37,7 +37,7 @@
 
     "SummaActor": {
         "OuputStructureSize": 250,
-        "maxGRUPerJob": 250
+        "maxGRUPerJob": 500
     },
     
     "JobActor": {
diff --git a/config/caf-application.conf b/config/caf-application.conf
index bf14c0d..d78c428 100644
--- a/config/caf-application.conf
+++ b/config/caf-application.conf
@@ -1,6 +1,5 @@
-caf {
-  # Parameters selecting a default scheduler.
+caf { 
   scheduler {
-    max-threads = 1
-  }
+   max-threads = 8
+    }
 }
\ No newline at end of file
diff --git a/config/configuration.py b/config/configuration.py
index b429768..d3a475e 100644
--- a/config/configuration.py
+++ b/config/configuration.py
@@ -1,3 +1,4 @@
+from distutils.command.config import config
 import json
 import os
 from os.path import exists
@@ -15,6 +16,9 @@ def create_init_config():
     Job_Actor_Settings = ["FileManagerPath", "outputCSV", "csvPath"]
     HRU_Actor_Settings = ["printOutput", "outputFrequency"]
 
+"""
+Function that creates the paths for the slurm output and the netCDF data
+"""
 def create_output_path(outputPath):
     print("The output path exists, now seperating this run by today's date")
     today = date.today()
@@ -52,8 +56,18 @@ def create_file_manager():
         fileManager.write(key + "    \'{}\'\n".format(value))
     fileManager.close()
     print("File Manager for this job has been created")
+    return outputSlurm
 
 
+def create_caf_config(numCPUs):
+    caf_config_name = "caf-application.conf"
+    caf_config = open(caf_config_name, "w")
+    caf_config.write("caf {{ \n  scheduler {{\n   max-threads = {}\n    }}\n}}".format(numCPUs))
+    caf_config.close()
+    
+    caf_config_path = os.getcwd()
+    caf_config_path += "/" + caf_config_name
+    return caf_config_path
 
 """
 Function to create the a list of the jobs will run
@@ -65,10 +79,60 @@ def create_job_list():
     json_file.close()
 
     numberOfTasks = SummaSettings["JobSubmissionParams"]["numHRUs"]
-    maxGRU = SummaSettings["JobSubmissionParams"]["maxGRUsPerSubmission"]
+    GRUPerJob = SummaSettings["JobSubmissionParams"]["maxGRUsPerSubmission"]
     numCPUs = SummaSettings["JobSubmissionParams"]["cpus-per-task"]
     print(numberOfTasks)
-    print(maxGRU)
+    print(GRUPerJob)
+    print(numCPUs)
+
+    # we need to get the full path of the summa binary
+    os.chdir("../build")
+    summaPath = os.getcwd()
+    summaPath += "/summaMain"
+    os.chdir("../config")
+    config_dir = os.getcwd()
+    caf_config_path = create_caf_config(numCPUs)
+
+
+    # we want to assemble the job list
+    job_list = open("job_list.txt", "w")
+    gruStart = 1
+    jobCount = 0
+    while gruStart < numberOfTasks:
+        if (numberOfTasks - gruStart < GRUPerJob):
+            job_list.write("{} -g {} -n {} -c {} --config-file={}\n".format(summaPath,\
+                gruStart, numberOfTasks - gruStart, config_dir, caf_config_path))
+        else:
+            job_list.write("{} -g {} -n {} -c {} --config-file={}\n".format(summaPath,\
+                gruStart, GRUPerJob, config_dir, caf_config_path))
+        gruStart += GRUPerJob
+        jobCount += 1
+    
+    return jobCount
+
+
+def create_sbatch_file(jobCount, outputSlurm):
+    json_file = open("Summa_Actors_Settings.json")
+    SummaSettings = json.load(json_file)
+    json_file.close()
+
+    numCPUs = SummaSettings["JobSubmissionParams"]["cpus-per-task"]
+    memory = SummaSettings["JobSubmissionParams"]["memory"]
+    jobName = SummaSettings["JobSubmissionParams"]["job-name"]
+    account = SummaSettings["JobSubmissionParams"]["account"]
+
+
+    sbatch = open("run_summa.sh", "w")
+    sbatch.write("#!/bin/bash\n")
+    sbatch.write("#SBATCH --cpus-per-task={}\n".format(numCPUs))
+    sbatch.write("#SBATCH --time=24:00:00\n")
+    sbatch.write("#SBATCH --mem={}\n".format(memory))
+    sbatch.write("#SBATCH --job-name={}\n".format(jobName))
+    sbatch.write("#SBATCH --account={}\n".format(account))
+    sbatch.write("#SBATCH --output={}\n".format(outputSlurm))
+    sbatch.write("#SBATCH --array=1-{}\n\n".format(jobCount))
+    sbatch.write("LINE=$(sed -n \"$SLURM_ARRAY_TASK_ID\"p {})\n".format(os.getcwd()+"/job_list.txt"))
+    
 
 
 
@@ -83,8 +147,10 @@ def init_run():
     Summa_Settings_Path = './Summa_Actors_Settings.json'
     if exists('./Summa_Actors_Settings.json'):
         print("File Exists, What do we do next")
-        create_file_manager()
-        create_job_list()
+        outputSlurm = create_file_manager()
+        jobCount = create_job_list()
+        create_sbatch_file(jobCount, outputSlurm)
+
     else:
         print("File Does not Exist and we need to create it")
         create_init_config()       
diff --git a/config/fileManager.txt b/config/fileManager.txt
index 06fb406..622b4ae 100644
--- a/config/fileManager.txt
+++ b/config/fileManager.txt
@@ -4,7 +4,7 @@ simEndTime    '2019-12-31 23:00'
 tmZoneInfo    'utcTime'
 settingsPath    '/project/6008034/kklenk/settings/SummaActorsSettings/'
 forcingPath    '/project/6008034/kklenk/forcingChunked/'
-outputPath    '/home/kklenk/projects/rpp-kshook/kklenk/SummaActorsOutput/Apr-18-2022/netcdf/'
+outputPath    '/home/kklenk/projects/rpp-kshook/kklenk/SummaActorsOutput/Apr-22-2022/netcdf/'
 forcingFreq    'month'
 forcingStart    '1979-01-01'
 decisionsFile    'modelDecisions.txt'
diff --git a/utils/netcdf/ChunkingScripts/NA_Domain_Chunking.py b/utils/netcdf/ChunkingScripts/NA_Domain_Chunking.py
new file mode 100644
index 0000000..a3d40c5
--- /dev/null
+++ b/utils/netcdf/ChunkingScripts/NA_Domain_Chunking.py
@@ -0,0 +1,51 @@
+import subprocess
+import sys
+
+# nccopy -c spechum:744,1000 -c airtemp:744,1000 -c pptrate:744,1000 -c SWRadAtm:744,1000 -c LWRadAtm:744,1000 -c airpres:744,1000 
+# -c windspd:744,1000 NorthAmerica_remapped_1979-01-01-00-00-00.nc NorthAmerica_remapped_1979-01-01-00-00-00-chunked.nc
+def chunkCommand(timesteps, infile, outfile):
+    bashCommand = subprocess.run(["nccopy", "-c", "spechum:{},1000".format(timesteps), "-c", "airtemp:{},1000".format(timesteps), \
+        "-c", "pptrate:{},1000".format(timesteps), "-c", "SWRadAtm:{},1000".format(timesteps), "-c", "LWRadAtm:{},1000".format(timesteps), \
+        "-c", "airpres:{},1000".format(timesteps), "-c", "windspd:{},1000".format(timesteps), "{}".format(infile), "{}".format(outfile)])
+    print("Exit Code = %d" % bashCommand.returncode)
+
+
+def checkTimeSteps(year, month):
+    if month == 1 or month == 3 or month == 5 or month == 7 or month == 8 or \
+        month == 10 or month == 12:
+        return str(744)
+    elif month == 2:
+        if year % 4 == 0:
+            return str(696)
+        else:
+            return str(672)
+    elif month == 4 or month == 6 or month == 9 or month == 11:
+        return str(720)
+
+
+def chunkYear(year):
+    year = sys.argv[1]
+    month = 1
+    year = int(year)
+    while month != 13:
+        infile = "/project/6008034/kklenk/forcing/NorthAmerica_remapped_{}-{monthS}-01-00-00-00.nc".format(str(year), monthS=(str(0)+str(month)) if month < 10 else str(month))
+        outfile = "/home/kklenk/scratch/corruptionTest/NorthAmerica_remapped_{}-{monthS}-01-00-00-00-chunked.nc".format(str(year), monthS=(str(0)+str(month)) if month < 10 else str(month))
+
+        timesteps = checkTimeSteps(year, month)
+
+        print(infile)
+        print(outfile)
+
+        chunkCommand(timesteps, infile, outfile)
+        month += 1
+
+def chunkSpecificFile(year, month):
+    infile = "/project/6008034/kklenk/forcing/NorthAmerica_remapped_{}-{monthS}-01-00-00-00.nc".format(str(year), monthS=(str(0)+str(month)) if month < 10 else str(month))
+    outfile = "/home/kklenk/scratch/forcingData/NorthAmerica_remapped_{}-{monthS}-01-00-00-00-chunked.nc".format(str(year), monthS=(str(0)+str(month)) if month < 10 else str(month))
+
+    timesteps = checkTimeSteps(year, month)
+    print(infile)
+    print(outfile)
+    chunkCommand(timesteps, infile, outfile)
+
+chunkSpecificFile(1983, 5)
\ No newline at end of file
diff --git a/utils/netcdf/ChunkingScripts/NA_Domain_Chunking_Script.sh b/utils/netcdf/ChunkingScripts/NA_Domain_Chunking_Script.sh
new file mode 100644
index 0000000..7ac6cdb
--- /dev/null
+++ b/utils/netcdf/ChunkingScripts/NA_Domain_Chunking_Script.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+#SBATCH --cpus-per-task=1
+#SBATCH --time=1:15:00
+#SBATCH --mem=20G
+#SBATCH --job-name=Forcing_dataConversion
+#SBATCH --mail-user=kyle.klenk@usask.ca
+#SBATCH --mail-type=ALL
+#SBATCH --output=/home/kklenk/scratch/SummaActorsOutput/slurm/forcingdata-%A_%a.out
+#SBATCH --account=def-spiteri_cpu
+
+# ----------------------------------------------------------------------------------------------
+# RUN WITH:
+# sbatch --array=1-[number of jobs] [script name]
+# sbatch --array=0-100 run_all.sh
+# ----------------------------------------------------------------------------------------------
+
+
+
+YEAR=1979
+
+offset=$SLURM_ARRAY_TASK_ID
+
+start=$(( YEAR + offset ))
+
+python3 /project/6008034/kklenk/NA_Domain_Chunking.py ${start}
\ No newline at end of file
diff --git a/utils/netcdf/checkOutput.py b/utils/netcdf/OutputVerification/checkOutput.py
similarity index 100%
rename from utils/netcdf/checkOutput.py
rename to utils/netcdf/OutputVerification/checkOutput.py
diff --git a/utils/netcdf/checkbit4bit.py b/utils/netcdf/OutputVerification/checkbit4bit.py
similarity index 100%
rename from utils/netcdf/checkbit4bit.py
rename to utils/netcdf/OutputVerification/checkbit4bit.py
diff --git a/utils/netcdf/compareOutput.py b/utils/netcdf/OutputVerification/compareOutput.py
similarity index 100%
rename from utils/netcdf/compareOutput.py
rename to utils/netcdf/OutputVerification/compareOutput.py
diff --git a/utils/netcdf/resourageUsage.py b/utils/netcdf/StatisticsScripts/resourageUsage.py
similarity index 100%
rename from utils/netcdf/resourageUsage.py
rename to utils/netcdf/StatisticsScripts/resourageUsage.py
diff --git a/utils/netcdf/mergeNetcdf.py b/utils/netcdf/etc/mergeNetcdf.py
similarity index 100%
rename from utils/netcdf/mergeNetcdf.py
rename to utils/netcdf/etc/mergeNetcdf.py
-- 
GitLab