diff --git a/build/source/actors/JobActor.h b/build/source/actors/JobActor.h index 387f4f4a36e311a68e1bf1010f6fd3a3c1cd24e5..aa7c1db67ef013aab689e481792ea429cbe14698 100644 --- a/build/source/actors/JobActor.h +++ b/build/source/actors/JobActor.h @@ -43,16 +43,20 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU, aout(self) << "Job Actor Initalized \n"; return { - [=](done_file_access_actor_init) { - // Init GRU Actors and the Output Structure - self->send(self->state.file_access_actor, initalize_outputStructure_v); - self->send(self, init_hru_v); - }, - + // ******************************************************************************************* + // *********************************** INTERFACE WITH HRU ************************************ + // ******************************************************************************************* + + /** + * + */ [=](init_hru) { initalizeGRU(self); }, + /** + * + */ [=](done_init_hru) { if (debug) { aout(self) << "Done Init\n"; @@ -68,13 +72,6 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU, } }, - /** - * Message from HRUActor, HRU is done the current forcing file but is not - * done its simulation and needs the next file - * indxGRU - Index into the actor array so we know which HRU this is. 
- * NOTE: Naming of GRU and HRU is confusing as the plan is to further seperate - * NOTE: For NA_Domain GRU is used as that is how we index the forcing file - */ [=](done_hru, int indxGRU, double totalDuration, double initDuration, double forcingDuration, double runPhysicsDuration, double writeOutputDuration) { @@ -102,6 +99,38 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU, } }, + + [=](run_failure, int indxGRU, int err) { + aout(self) << "GRU:" << self->state.GRUList[indxGRU - 1]->getRefGRU() + << "indxGRU = " << indxGRU << "Failed \n" + << "Will have to wait until all GRUs are done before it can be re-tried\n"; + + self->state.numGRUFailed++; + self->state.numGRUDone++; + self->state.GRUList[indxGRU - 1]->updateFailed(); + + // check if we are the last hru to complete + if (self->state.numGRUDone >= self->state.numGRU) { + restartFailures(self); + } + }, + + // ******************************************************************************************* + // ******************************* END INTERFACE WITH HRU ************************************ + // ******************************************************************************************* + + // ******************************************************************************************* + // ****************************** INTERFACE WITH FileAccessActor ***************************** + // ******************************************************************************************* + /** + * + */ + [=](done_file_access_actor_init) { + // Init GRU Actors and the Output Structure + self->send(self->state.file_access_actor, initalize_outputStructure_v); + self->send(self, init_hru_v); + }, + [=](file_access_actor_done, double readDuration, double writeDuration) { int err = 0; if (debug) { @@ -136,21 +165,9 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU, self->send(self->state.parent, done_job_v, self->state.numGRUFailed); self->quit(); }, - - 
[=](run_failure, int indxGRU, int err) { - aout(self) << "GRU:" << self->state.GRUList[indxGRU - 1]->getRefGRU() - << "indxGRU = " << indxGRU << "Failed \n" - << "Will have to wait until all GRUs are done before it can be re-tried\n"; - - self->state.numGRUFailed++; - self->state.numGRUDone++; - self->state.GRUList[indxGRU - 1]->updateFailed(); - - // check if we are the last hru to complete - if (self->state.numGRUDone >= self->state.numGRU) { - restartFailures(self); - } - }, + // ******************************************************************************************* + // ************************** END INTERFACE WITH FileAccessActor ***************************** + // ******************************************************************************************* }; } diff --git a/config/Summa_Actors_Settings.json b/config/Summa_Actors_Settings.json index db2de25c631bc777e903f7ba7e1dfc9b81be1e08..debfbefda6457857621ee1facdbcf0050df4e26f 100644 --- a/config/Summa_Actors_Settings.json +++ b/config/Summa_Actors_Settings.json @@ -5,7 +5,7 @@ "job-name": "SummaActors", "account": "rpp-kshook", "numHRUs": 517315, - "maxNumberOfJobs": 1000, + "maxNumberOfJobs": 500, "maxGRUsPerSubmission": 1000, "executablePath": "/home/kklenk/SummaProjects/Summa-Actors/bin/summaMain" }, @@ -37,7 +37,7 @@ "SummaActor": { "OuputStructureSize": 250, - "maxGRUPerJob": 250 + "maxGRUPerJob": 500 }, "JobActor": { diff --git a/config/caf-application.conf b/config/caf-application.conf index bf14c0d38da0ec824c4e63696a7a32ad87f53b0b..d78c42846f982eea18a66e2d6c77263a63b69e19 100644 --- a/config/caf-application.conf +++ b/config/caf-application.conf @@ -1,6 +1,5 @@ -caf { - # Parameters selecting a default scheduler. 
+caf { scheduler { - max-threads = 1 - } + max-threads = 8 + } } \ No newline at end of file diff --git a/config/configuration.py b/config/configuration.py index b4297685a21c40edb518c3a9ba1ce86ff12187a8..d3a475e9314ab503b4259917c6030a41a4accb9d 100644 --- a/config/configuration.py +++ b/config/configuration.py @@ -1,3 +1,4 @@ +from distutils.command.config import config import json import os from os.path import exists @@ -15,6 +16,9 @@ def create_init_config(): Job_Actor_Settings = ["FileManagerPath", "outputCSV", "csvPath"] HRU_Actor_Settings = ["printOutput", "outputFrequency"] +""" +Function that creates the paths for the slurm output and the netCDF data +""" def create_output_path(outputPath): print("The output path exists, now seperating this run by today's date") today = date.today() @@ -52,8 +56,18 @@ def create_file_manager(): fileManager.write(key + " \'{}\'\n".format(value)) fileManager.close() print("File Manager for this job has been created") + return outputSlurm +def create_caf_config(numCPUs): + caf_config_name = "caf-application.conf" + caf_config = open(caf_config_name, "w") + caf_config.write("caf {{ \n scheduler {{\n max-threads = {}\n }}\n}}".format(numCPUs)) + caf_config.close() + + caf_config_path = os.getcwd() + caf_config_path += caf_config_name + return caf_config_path """ Function to create the a list of the jobs will run @@ -65,10 +79,60 @@ def create_job_list(): json_file.close() numberOfTasks = SummaSettings["JobSubmissionParams"]["numHRUs"] - maxGRU = SummaSettings["JobSubmissionParams"]["maxGRUsPerSubmission"] + GRUPerJob = SummaSettings["JobSubmissionParams"]["maxGRUsPerSubmission"] numCPUs = SummaSettings["JobSubmissionParams"]["cpus-per-task"] print(numberOfTasks) - print(maxGRU) + print(GRUPerJob) + print(numCPUs) + + # we need to get the full path of the summa binary + os.chdir("../build") + summaPath = os.getcwd() + summaPath += "/summaMain" + os.chdir("../config") + config_dir = os.getcwd() + caf_config_path = 
create_caf_config(numCPUs) + + + # we want to assemble the job list + job_list = open("job_list.txt", "w") + gruStart = 1 + jobCount = 0 + while gruStart < numberOfTasks: + if (numberOfTasks - gruStart < GRUPerJob): + job_list.write("{} -g {} -n {} -c {} --config-file={}\n".format(summaPath,\ + gruStart, numberOfTasks - gruStart, config_dir, caf_config_path)) + else: + job_list.write("{} -g {} -n {} -c {} --config-file={}\n".format(summaPath,\ + gruStart, GRUPerJob, config_dir, caf_config_path)) + gruStart += GRUPerJob + jobCount += 1 + + return jobCount + + +def create_sbatch_file(jobCount, outputSlurm): + json_file = open("Summa_Actors_Settings.json") + SummaSettings = json.load(json_file) + json_file.close() + + numCPUs = SummaSettings["JobSubmissionParams"]["cpus-per-task"] + memory = SummaSettings["JobSubmissionParams"]["memory"] + jobName = SummaSettings["JobSubmissionParams"]["job-name"] + account = SummaSettings["JobSubmissionParams"]["account"] + + + sbatch = open("run_summa.sh", "w") + sbatch.write("#!/bin/bash\n") + sbatch.write("#SBATCH --cpus-per-task={}\n".format(numCPUs)) + sbatch.write("#SBATCH --time=24:00:00\n") + sbatch.write("#SBATCH --mem={}\n".format(memory)) + sbatch.write("#SBATCH --job-name={}\n".format(jobName)) + sbatch.write("#SBATCH --account={}\n".format(account)) + sbatch.write("#SBATCH --output={}\n".format(outputSlurm)) + sbatch.write("#SBATCH --array=1-{}\n\n".format(jobCount)) + sbatch.write("LINE=$(sed -n \"$SLURM_ARRAY_TASK_ID\"p {})\n".format(os.getcwd()+"/job_list.txt")) + @@ -83,8 +147,10 @@ def init_run(): Summa_Settings_Path = './Summa_Actors_Settings.json' if exists('./Summa_Actors_Settings.json'): print("File Exists, What do we do next") - create_file_manager() - create_job_list() + outputSlurm = create_file_manager() + jobCount = create_job_list() + create_sbatch_file(jobCount, outputSlurm) + else: print("File Does not Exist and we need to create it") create_init_config() diff --git a/config/fileManager.txt 
b/config/fileManager.txt index 06fb4061d6d6fe26bde76f665ebab5ce84a44b25..622b4ae15a62f54461d74cced3a3b789b459a127 100644 --- a/config/fileManager.txt +++ b/config/fileManager.txt @@ -4,7 +4,7 @@ simEndTime '2019-12-31 23:00' tmZoneInfo 'utcTime' settingsPath '/project/6008034/kklenk/settings/SummaActorsSettings/' forcingPath '/project/6008034/kklenk/forcingChunked/' -outputPath '/home/kklenk/projects/rpp-kshook/kklenk/SummaActorsOutput/Apr-18-2022/netcdf/' +outputPath '/home/kklenk/projects/rpp-kshook/kklenk/SummaActorsOutput/Apr-22-2022/netcdf/' forcingFreq 'month' forcingStart '1979-01-01' decisionsFile 'modelDecisions.txt' diff --git a/utils/netcdf/ChunkingScripts/NA_Domain_Chunking.py b/utils/netcdf/ChunkingScripts/NA_Domain_Chunking.py new file mode 100644 index 0000000000000000000000000000000000000000..a3d40c5868e808706b09b6aa895ec8d317ab037d --- /dev/null +++ b/utils/netcdf/ChunkingScripts/NA_Domain_Chunking.py @@ -0,0 +1,51 @@ +import subprocess +import sys + +# nccopy -c spechum:744,1000 -c airtemp:744,1000 -c pptrate:744,1000 -c SWRadAtm:744,1000 -c LWRadAtm:744,1000 -c airpres:744,1000 +# -c windspd:744,1000 NorthAmerica_remapped_1979-01-01-00-00-00.nc NorthAmerica_remapped_1979-01-01-00-00-00-chunked.nc +def chunkCommand(timesteps, infile, outfile): + bashCommand = subprocess.run(["nccopy", "-c", "spechum:{},1000".format(timesteps), "-c", "airtemp:{},1000".format(timesteps), \ + "-c", "pptrate:{},1000".format(timesteps), "-c", "SWRadAtm:{},1000".format(timesteps), "-c", "LWRadAtm:{},1000".format(timesteps), \ + "-c", "airpres:{},1000".format(timesteps), "-c", "windspd:{},1000".format(timesteps), "{}".format(infile), "{}".format(outfile)]) + print("Exit Code = %d" % bashCommand.returncode) + + +def checkTimeSteps(year, month): + if month == 1 or month == 3 or month == 5 or month == 7 or month == 8 or \ + month == 10 or month == 12: + return str(744) + elif month == 2: + if year % 4 == 0: + return str(696) + else: + return str(672) + elif month == 4 or 
month == 6 or month == 9 or month == 11: + return str(720) + + +def chunkYear(year): + year = sys.argv[1] + month = 1 + year = int(year) + while month != 13: + infile = "/project/6008034/kklenk/forcing/NorthAmerica_remapped_{}-{monthS}-01-00-00-00.nc".format(str(year), monthS=(str(0)+str(month)) if month < 10 else str(month)) + outfile = "/home/kklenk/scratch/corruptionTest/NorthAmerica_remapped_{}-{monthS}-01-00-00-00-chunked.nc".format(str(year), monthS=(str(0)+str(month)) if month < 10 else str(month)) + + timesteps = checkTimeSteps(year, month) + + print(infile) + print(outfile) + + chunkCommand(timesteps, infile, outfile) + month += 1 + +def chunkSpecificFile(year, month): + infile = "/project/6008034/kklenk/forcing/NorthAmerica_remapped_{}-{monthS}-01-00-00-00.nc".format(str(year), monthS=(str(0)+str(month)) if month < 10 else str(month)) + outfile = "/home/kklenk/scratch/forcingData/NorthAmerica_remapped_{}-{monthS}-01-00-00-00-chunked.nc".format(str(year), monthS=(str(0)+str(month)) if month < 10 else str(month)) + + timesteps = checkTimeSteps(year, month) + print(infile) + print(outfile) + chunkCommand(timesteps, infile, outfile) + +chunkSpecificFile(1983, 5) \ No newline at end of file diff --git a/utils/netcdf/ChunkingScripts/NA_Domain_Chunking_Script.sh b/utils/netcdf/ChunkingScripts/NA_Domain_Chunking_Script.sh new file mode 100644 index 0000000000000000000000000000000000000000..7ac6cdbc66e44e978ee99b7dd379348556b59167 --- /dev/null +++ b/utils/netcdf/ChunkingScripts/NA_Domain_Chunking_Script.sh @@ -0,0 +1,25 @@ +#!/bin/bash +#SBATCH --cpus-per-task=1 +#SBATCH --time=1:15:00 +#SBATCH --mem=20G +#SBATCH --job-name=Forcing_dataConversion +#SBATCH --mail-user=kyle.klenk@usask.ca +#SBATCH --mail-type=ALL +#SBATCH --output=/home/kklenk/scratch/SummaActorsOutput/slurm/forcingdata-%A_%a.out +#SBATCH --account=def-spiteri_cpu + +# ---------------------------------------------------------------------------------------------- +# RUN WITH: +# sbatch 
--array=1-[number of jobs] [script name] + sbatch --array=0-100 run_all.sh + # ---------------------------------------------------------------------------------------------- + + + +YEAR=1979 + +offset=$SLURM_ARRAY_TASK_ID + +start=$(( YEAR + offset )) + +python3 /project/6008034/kklenk/NA_Domain_Chunking.py ${start} \ No newline at end of file diff --git a/utils/netcdf/checkOutput.py b/utils/netcdf/OutputVerification/checkOutput.py similarity index 100% rename from utils/netcdf/checkOutput.py rename to utils/netcdf/OutputVerification/checkOutput.py diff --git a/utils/netcdf/checkbit4bit.py b/utils/netcdf/OutputVerification/checkbit4bit.py similarity index 100% rename from utils/netcdf/checkbit4bit.py rename to utils/netcdf/OutputVerification/checkbit4bit.py diff --git a/utils/netcdf/compareOutput.py b/utils/netcdf/OutputVerification/compareOutput.py similarity index 100% rename from utils/netcdf/compareOutput.py rename to utils/netcdf/OutputVerification/compareOutput.py diff --git a/utils/netcdf/resourageUsage.py b/utils/netcdf/StatisticsScripts/resourageUsage.py similarity index 100% rename from utils/netcdf/resourageUsage.py rename to utils/netcdf/StatisticsScripts/resourageUsage.py diff --git a/utils/netcdf/mergeNetcdf.py b/utils/netcdf/etc/mergeNetcdf.py similarity index 100% rename from utils/netcdf/mergeNetcdf.py rename to utils/netcdf/etc/mergeNetcdf.py