Skip to content
Snippets Groups Projects
Commit 81ae134b authored by Kyle Klenk's avatar Kyle Klenk
Browse files

made job_actor vars clearer and match the rest of the file.

readDimension is now called from job_actor
parent 51ec0448
No related branches found
No related tags found
No related merge requests found
......@@ -11,40 +11,39 @@ struct job_state {
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int startGRU; // Starting GRU for this job
int numGRU; // Number of GRUs for this job
std::string configPath;
int start_gru; // Starting GRU for this job
int num_gru; // Number of GRUs for this job
int num_hru;
std::string config_path;
std::string fileManager; // Path of the fileManager.txt file
std::string file_manager; // Path of the fileManager.txt file
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int maxRunAttempts = 3; // Max number of attemtps to solve a GRU
std::vector<GRUinfo*> GRUList; // List of all GRUs under this job actor
int numGRUDone = 0; // The number of GRUs that have completed
int GRUInit = 0; // Number of GRUs initalized
int max_run_attempts = 3; // Max number of attemtps to solve a GRU
std::vector<GRUinfo*> gru_list; // List of all GRUs under this job actor
int num_gru_done = 0; // The number of GRUs that have completed
int gru_init = 0; // Number of GRUs initalized
int err = 0; // Error Code
int numGRUFailed = 0; // Number of GRUs that have failed
int outputStrucSize;
int num_gru_failed = 0; // Number of GRUs that have failed
int output_struct_size;
// Timing Variables
TimingInfo job_timing;
// Output File Names for Timings
bool outputCSV;
std::string csvOut;
std::string csvPath;
std::string successOutputFile;
std::string failedOutputFile = "failedHRU";
std::string fileAccessActorStats = "fileAccessActor.csv";
bool output_csv;
std::string csv_out;
std::string csv_path;
std::string success_output_file;
std::string failed_output_file = "failedHRU";
std::string file_access_actor_stats = "fileAccessActor.csv";
};
behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
std::string configPath, int outputStrucSize, actor parent);
int parseSettings(stateful_actor<job_state>* self, std::string configPath);
behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru,
std::string config_path, int output_struct_size, actor parent);
void initJob(stateful_actor<job_state>* self);
......
......@@ -8,6 +8,8 @@ extern "C" {
void defineGlobalData(int* start_gru_index, int* err);
void readDimension(int* num_gru, int* num_hru, int* start_gru_index, int* err);
void cleanUpJobActor(int* err);
}
\ No newline at end of file
......@@ -80,15 +80,15 @@ subroutine initGlobals(file_manager, totalGRUs, totalHRUs, numGRUs, numHRUs, sta
! *****************************************************************************
! *** read the number of GRUs and HRUs
! *****************************************************************************
attrFile = trim(SETTINGS_PATH)//trim(LOCAL_ATTRIBUTES)
call read_dimension(trim(attrFile),fileGRU,fileHRU,numGRUs,numHRUs,startGRUIndex,err,cmessage)
if(err/=0)then
message=trim(message)//trim(cmessage)
print*, cmessage
return
endif
totalGRUs = fileGRU
totalHRUs = fileHRU
! attrFile = trim(SETTINGS_PATH)//trim(LOCAL_ATTRIBUTES)
! call read_dimension(trim(attrFile),fileGRU,fileHRU,numGRUs,numHRUs,startGRUIndex,err,cmessage)
! if(err/=0)then
! message=trim(message)//trim(cmessage)
! print*, cmessage
! return
! endif
! totalGRUs = fileGRU
! totalHRUs = fileHRU
! *****************************************************************************
! *** read the number of snow and soil layers
......
......@@ -19,8 +19,8 @@ namespace caf {
* @param self
* @return behavior
*/
behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
std::string configPath, int outputStrucSize, caf::actor parent) {
behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru,
std::string config_path, int output_struct_size, caf::actor parent) {
// Timinig Information
self->state.job_timing = TimingInfo();
self->state.job_timing.addTimePoint("total_duration");
......@@ -28,48 +28,58 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
// Set Job Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.configPath = configPath;
self->state.start_gru = start_gru;
self->state.num_gru = num_gru;
self->state.config_path = config_path;
self->state.parent = parent;
self->state.outputStrucSize = outputStrucSize;
self->state.output_struct_size = output_struct_size;
// Get All Settings
self->state.fileManager = getSettings(self->state.configPath, "JobActor", "FileManagerPath",
self->state.fileManager).value_or("");
self->state.outputCSV = getSettings(self->state.configPath, "JobActor", "outputCSV",
self->state.outputCSV).value_or(false);
if (self->state.outputCSV) {
self->state.csvPath = getSettings(self->state.configPath, "JobActor", "csvPath",
self->state.csvPath).value_or("");
if (self->state.csvPath == ""){ // check if we found the value if not set outputCSV to false
self->state.outputCSV = false;
self->state.file_manager = getSettings(self->state.config_path, "JobActor", "FileManagerPath",
self->state.file_manager).value_or("");
if(self->state.file_manager == "") {
aout(self) << "ERROR: Job_Actor - getSettings() - file_manager_path\n";
self->quit();
return {}; // Failure
}
self->state.output_csv = getSettings(self->state.config_path, "JobActor", "outputCSV",
self->state.output_csv).value_or(false);
if (self->state.output_csv) {
self->state.csv_path = getSettings(self->state.config_path, "JobActor", "csvPath",
self->state.csv_path).value_or("");
if (self->state.csv_path == ""){ // check if we found the value if not set output_csv to false
self->state.output_csv = false;
}
}
// Print Settings
aout(self) << "\nSETTINGS FOR JOB_ACTOR\n" <<
"File Manager Path = " << self->state.fileManager << "\n" <<
"outputCSV = " << self->state.outputCSV << "\n";
if (self->state.outputCSV) {
aout(self) << "csvPath = " << self->state.csvPath << "\n";
"File Manager Path = " << self->state.file_manager << "\n" <<
"output_csv = " << self->state.output_csv << "\n";
if (self->state.output_csv) {
aout(self) << "csv_path = " << self->state.csv_path << "\n";
}
// Initalize global variables
int err = 0;
setTimesDirsAndFiles(self->state.fileManager.c_str(), &err);
setTimesDirsAndFiles(self->state.file_manager.c_str(), &err);
if (err != 0) {
aout(self) << "ERROR: Job_Actor - setTimesDirsAndFiles\n";
}
defineGlobalData(&self->state.startGRU, &err);
defineGlobalData(&self->state.start_gru, &err);
if (err != 0) {
aout(self) << "ERROR: Job_Actor - defineGlobalData\n";
}
readDimension(&self->state.num_gru, &self->state.num_hru, &self->state.start_gru, &err);
if (err != 0) {
aout(self) << "ERROR: Job_Actor - readDimension\n";
}
initJob(self);
// Spawn the file_access_actor. This will return the number of forcing files we are working with
self->state.file_access_actor = self->spawn(file_access_actor, self->state.startGRU, self->state.numGRU,
self->state.outputStrucSize, self->state.configPath, self);
self->state.file_access_actor = self->spawn(file_access_actor, self->state.start_gru, self->state.num_gru,
self->state.output_struct_size, self->state.config_path, self);
aout(self) << "Job Actor Initalized \n";
......@@ -85,35 +95,35 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
aout(self) << "Done Init\n";
}
// aout(self) << "Done init\n";
self->state.GRUInit++;
if (self->state.GRUInit < self->state.numGRU) {
self->state.gru_init++;
if (self->state.gru_init < self->state.num_gru) {
self->send(self, init_hru_v);
} else {
aout(self) << "All GRUs are initalized\n";
self->state.GRUInit = 0; // reset counter in case we have failures
self->state.gru_init = 0; // reset counter in case we have failures
runGRUs(self);
}
},
[=](done_hru, int indx_gru, double total_duration, double init_duration,
double forcing_duration, double run_physics_duration, double write_output_duration) {
aout(self) << "\nDone - GRU:" << self->state.GRUList[indx_gru - 1]->getRefGRU()
aout(self) << "\nDone - GRU:" << self->state.gru_list[indx_gru - 1]->getRefGRU()
<< " - IndexInJob = " << indx_gru << "\n";
self->state.GRUList[indx_gru - 1]->doneRun(total_duration, init_duration, forcing_duration,
self->state.gru_list[indx_gru - 1]->doneRun(total_duration, init_duration, forcing_duration,
run_physics_duration, write_output_duration);
if (self->state.outputCSV) {
self->state.GRUList[indx_gru - 1]->writeSuccess(self->state.successOutputFile);
if (self->state.output_csv) {
self->state.gru_list[indx_gru - 1]->writeSuccess(self->state.success_output_file);
}
self->state.numGRUDone++;
self->state.num_gru_done++;
// Check if we are done
if (self->state.numGRUDone >= self->state.numGRU) {
self->state.numGRUDone = 0; // just in case there were failures
if (self->state.num_gru_done >= self->state.num_gru) {
self->state.num_gru_done = 0; // just in case there were failures
if (self->state.numGRUFailed == 0) {
if (self->state.num_gru_failed == 0) {
self->send(self->state.file_access_actor, deallocate_structures_v);
} else {
restartFailures(self);
......@@ -121,20 +131,20 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
}
},
[=](run_failure, caf::actor actorRef, int indxGRU, int err) {
aout(self) << "GRU:" << self->state.GRUList[indxGRU - 1]->getRefGRU()
<< "indxGRU = " << indxGRU << "Failed \n"
[=](run_failure, caf::actor actorRef, int indx_gru, int err) {
aout(self) << "GRU:" << self->state.gru_list[indx_gru - 1]->getRefGRU()
<< "indx_gru = " << indx_gru << "Failed \n"
<< "Will have to wait until all GRUs are done before it can be re-tried\n";
self->state.numGRUFailed++;
self->state.numGRUDone++;
self->state.GRUList[indxGRU - 1]->updateFailed();
self->state.num_gru_failed++;
self->state.num_gru_done++;
self->state.gru_list[indx_gru - 1]->updateFailed();
// Let the file_access_actor know this actor failed
self->send(self->state.file_access_actor, run_failure_v, indxGRU);
self->send(self->state.file_access_actor, run_failure_v, indx_gru);
// check if we are the last hru to complete
if (self->state.numGRUDone >= self->state.numGRU) {
if (self->state.num_gru_done >= self->state.num_gru) {
restartFailures(self);
}
},
......@@ -143,8 +153,8 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
// Init GRU Actors and the Output Structure
self->send(self, init_hru_v);
// auto gru = self->spawn(gru_actor, 1, 1,
// self->state.configPath,
// self->state.outputStrucSize, self);
// self->state.config_path,
// self->state.output_struct_size, self);
// self->send(gru, init_gru_v);
},
......@@ -154,16 +164,16 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
aout(self) << "\n********************************\n";
aout(self) << "Outputing Timing Info for HRUs\n";
for(auto gru : self->state.GRUList) {
for(auto gru : self->state.gru_list) {
gru->printOutput();
}
aout(self) << "********************************\n";
}
// Delete GRUs
for (auto GRU : self->state.GRUList) {
for (auto GRU : self->state.gru_list) {
delete GRU;
}
self->state.GRUList.clear();
self->state.gru_list.clear();
self->state.job_timing.updateEndPoint("total_duration");
......@@ -177,7 +187,7 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
// Tell Parent we are done
self->send(self->state.parent,
done_job_v,
self->state.numGRUFailed,
self->state.num_gru_failed,
self->state.job_timing.getDuration("total_duration").value_or(-1.0),
read_duration, write_duration);
self->quit();
......@@ -188,8 +198,8 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
if (function == "def_output") {
aout(self) << "Error with the output file, will try creating it agian\n";
std::this_thread::sleep_for(std::chrono::seconds(5));
self->state.file_access_actor = self->spawn(file_access_actor, self->state.startGRU, self->state.numGRU,
self->state.outputStrucSize, self->state.configPath, self);
self->state.file_access_actor = self->spawn(file_access_actor, self->state.start_gru, self->state.num_gru,
self->state.output_struct_size, self->state.config_path, self);
} else {
aout(self) << "Letting Parent Know we are quitting\n";
self->send(self->state.parent, err_v);
......@@ -207,11 +217,11 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
void initJob(stateful_actor<job_state>* self) {
std::string success = "Success"; // allows us to build the string
if (self->state.outputCSV) {
if (self->state.output_csv) {
std::ofstream file;
self->state.successOutputFile = self->state.csvPath += success +=
std::to_string(self->state.startGRU) += ".csv";
file.open(self->state.successOutputFile, std::ios_base::out);
self->state.success_output_file = self->state.csv_path += success +=
std::to_string(self->state.start_gru) += ".csv";
file.open(self->state.success_output_file, std::ios_base::out);
file <<
"GRU," <<
"totalDuration," <<
......@@ -230,12 +240,12 @@ void initJob(stateful_actor<job_state>* self) {
int err = 0;
// aout(self) << "Initalizing Globals \n";
initGlobals(self->state.fileManager.c_str(),
initGlobals(self->state.file_manager.c_str(),
&totalGRUs,
&totalHRUs,
&self->state.numGRU,
&self->state.num_gru,
&numHRUs,
&self->state.startGRU,
&self->state.start_gru,
&err);
if (err != 0) {
aout(self) << "Error: initGlobals" << std::endl;
......@@ -244,17 +254,17 @@ void initJob(stateful_actor<job_state>* self) {
}
void initalizeGRU(stateful_actor<job_state>* self) {
int startGRU = self->state.GRUList.size() + self->state.startGRU;
int indexGRU = self->state.GRUList.size() + 1; // Fortran reference starts at 1
auto gru = self->spawn(hru_actor, startGRU, indexGRU,
self->state.configPath, self->state.file_access_actor,
self->state.outputStrucSize, self);
self->state.GRUList.push_back(new GRUinfo(startGRU, indexGRU, gru,
self->state.dt_init_start_factor, self->state.maxRunAttempts));
int start_gru = self->state.gru_list.size() + self->state.start_gru;
int index_gru = self->state.gru_list.size() + 1; // Fortran reference starts at 1
auto gru = self->spawn(hru_actor, start_gru, index_gru,
self->state.config_path, self->state.file_access_actor,
self->state.output_struct_size, self);
self->state.gru_list.push_back(new GRUinfo(start_gru, index_gru, gru,
self->state.dt_init_start_factor, self->state.max_run_attempts));
}
void runGRUs(stateful_actor<job_state>* self) {
for(auto gru : self->state.GRUList) {
for(auto gru : self->state.gru_list) {
if(!gru->isCompleted() && !gru->isFailed()) {
self->send(gru->getActor(), start_hru_v);
}
......@@ -265,17 +275,17 @@ void restartFailures(stateful_actor<job_state>* self) {
// Need to let the file_access_actor know so it can set up the new output Manager
self->send(self->state.file_access_actor, restart_failures_v);
self->state.numGRU = self->state.numGRUFailed;
self->state.numGRUFailed = 0;
self->state.numGRUDone = 0;
for(auto gru : self->state.GRUList) {
self->state.num_gru = self->state.num_gru_failed;
self->state.num_gru_failed = 0;
self->state.num_gru_done = 0;
for(auto gru : self->state.gru_list) {
if (gru->isFailed() && !gru->isMaxAttemptsReached()) {
gru->updateFailed();
self->send(self->state.file_access_actor, reset_outputCounter_v, gru->getIndxGRU());
gru->updateDt_init();
auto newGRU = self->spawn(hru_actor, gru->getRefGRU(), gru->getIndxGRU(),
self->state.configPath,self->state.file_access_actor,
self->state.outputStrucSize, self);
self->state.config_path,self->state.file_access_actor,
self->state.output_struct_size, self);
gru->updateGRU(newGRU);
gru->updateCurrentAttempt();
self->send(gru->getActor(), dt_init_factor_v, gru->getDt_init());
......
......@@ -19,6 +19,7 @@
! along with this program. If not, see <http://www.gnu.org/licenses/>.
module read_attribute_module
USE, intrinsic :: iso_c_binding
USE nrtype
implicit none
private
......@@ -29,7 +30,8 @@ contains
! ************************************************************************************************
! public subroutine read_dimension: read HRU and GRU dimension information on local attributes
! ************************************************************************************************
subroutine read_dimension(attrFile,fileGRU,fileHRU,numGRUs,numHRUs,startGRU,err,message)
subroutine read_dimension(numGRUs,numHRUs,startGRU,err) bind(C, name="readDimension")
USE netcdf
USE netcdf_util_module,only:nc_file_open ! open netcdf file
USE netcdf_util_module,only:nc_file_close ! close netcdf file
......@@ -37,24 +39,30 @@ subroutine read_dimension(attrFile,fileGRU,fileHRU,numGRUs,numHRUs,startGRU,err,
! provide access to global data
USE globalData,only:gru_struc ! gru->hru mapping structure
USE globalData,only:index_map ! hru->gru mapping structure
! file paths for attribute file
USE summaActors_FileManager,only:SETTINGS_PATH ! define path to settings files (e.g., parameters, soil and veg. tables)
USE summaActors_FileManager,only:LOCAL_ATTRIBUTES ! name of model initial attributes file
implicit none
! Dummy Variables
character(*),intent(in) :: attrFile ! name of attributed file
integer(i4b),intent(out) :: fileGRU ! number of GRUs in the input file
integer(i4b),intent(out) :: fileHRU ! number of HRUs in the input file
integer(i4b),intent(in) :: numGRUs ! number of GRUs for the run domain
integer(i4b),intent(out) :: numHRUs ! number of HRUs for the run domain (value filled in this subroutine)
integer(i4b),intent(in) :: startGRU ! Index of the starting GRU
integer(i4b),intent(out) :: err ! error code
character(*),intent(out) :: message ! error message
! Local Variables
character(len=256) :: attrFile ! name of attributed file
integer(i4b) :: fileGRU ! number of GRUs in the input file
integer(i4b) :: fileHRU ! number of HRUs in the input file
integer(i4b) :: iHRU ! HRU couinting index
integer(i4b) :: iGRU ! GRU loop index
integer(8),allocatable :: gru_id(:),hru_id(:)! read gru/hru IDs in from attributes file
integer(8),allocatable :: hru2gru_id(:) ! read hru->gru mapping in from attributes file
integer(i4b),allocatable :: hru_ix(:) ! hru index for search
character(len=256) :: message ! error message
! define variables for NetCDF file operation
integer(i4b) :: ncID ! NetCDF file ID
......@@ -64,6 +72,8 @@ subroutine read_dimension(attrFile,fileGRU,fileHRU,numGRUs,numHRUs,startGRU,err,
character(len=256) :: cmessage ! error message for downwind routine
err=0; message="read_dimension/"
attrFile = trim(SETTINGS_PATH)//trim(LOCAL_ATTRIBUTES)
! open nc file
call nc_file_open(trim(attrFile),nf90_noWrite,ncID,err,cmessage)
if(err/=0)then; message=trim(message)//trim(cmessage); return; end if
......
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=06 - ss=04.983
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=40 - ss=37.839
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=06 - ss=14.868
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=40 - ss=47.915
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=06 - ss=27.048
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=41 - ss=00.336
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=06 - ss=39.863
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=41 - ss=13.096
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=07 - ss=14.172
Run start time on system: ccyy=2022 - mm=08 - dd=18 - hh=19 - mi=41 - ss=47.568
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment