Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • gwu479/Summa-Actors
  • numerical_simulations_lab/actors/Summa-Actors
2 results
Show changes
Showing
with 2511 additions and 1683 deletions
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "summa_server.hpp"
#include <string>
#include <unistd.h>
#include <limits.h>
namespace caf {
// Inital behaviour that waits to connect to the lead server
behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings);
// Function that is called ot connect to the lead server
void connecting_backup(stateful_actor<summa_server_state>* self, const std::string& host, uint16_t port);
behavior summa_backup_server(stateful_actor<summa_server_state>* self, const actor& server_actor);
}
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "settings_functions.hpp"
#include "batch.hpp"
#include "summa_actor.hpp"
#include "message_atoms.hpp"
#include <string>
#include <optional>
#include <unistd.h>
#include <limits.h>
namespace caf {
struct summa_client_state {
strong_actor_ptr current_server = nullptr;
actor current_server_actor;
std::vector<strong_actor_ptr> servers;
std::string hostname;
actor summa_actor_ref;
uint16_t port;
int batch_id;
int client_id; // id held by server
bool running = false; // initalized to false - flipped to true when client returns behavior summa_client
// tuple is the actor ref and hostname of the backup server
std::vector<std::tuple<caf::actor, std::string>> backup_servers_list;
Batch current_batch;
bool saved_batch = false;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
behavior summa_client_init(stateful_actor<summa_client_state>* self);
behavior summa_client(stateful_actor<summa_client_state>* self);
void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port);
void findLeadServer(stateful_actor<summa_client_state>* self, strong_actor_ptr serv);
}
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "batch.hpp"
#include "batch_container.hpp"
#include "client.hpp"
#include "client_container.hpp"
#include "settings_functions.hpp"
#include "global.hpp"
#include "message_atoms.hpp"
#include <string>
#include <optional>
#include <thread>
#include <chrono>
#include <iostream>
namespace caf {
struct summa_server_state {
strong_actor_ptr current_server; // if server is a backup then this will be set to the lead server
actor current_server_actor;
std::string hostname;
std::string csv_file_path;
std::string csv_output_name = "/batch_results.csv";
Client_Container client_container;
Batch_Container batch_container;
// Actor Reference, Hostname
std::vector<std::tuple<caf::actor, std::string>> backup_servers_list;
// Settings Structures
Distributed_Settings distributed_settings;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
// Summa Server setup behaviour - initializes the state for the server
behavior summa_server_init(stateful_actor<summa_server_state>* self,
Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings);
// Summa Server behaviour - handles messages from clients
behavior summa_server(stateful_actor<summa_server_state>* self);
// Summa Server backup behaviour - handles the exit messages for clients
behavior summa_server_exit(stateful_actor<summa_server_state>* self);
// Creates the csv file that holds the results of the batches
void initializeCSVOutput(std::string csv_output_path);
// Send all connected actors the updated backup servers list
void sendAllBackupServersList(stateful_actor<summa_server_state>* self);
// Look for the lost backup server in the backup servers list and remove it
void findAndRemoveLostBackupServer(stateful_actor<summa_server_state>* self, actor_addr lost_backup_server);
// Check for an idle client to send the failed or next batch we find that is not assigned
void checkForIdleClients(stateful_actor<summa_server_state>* self);
void notifyBackupServersOfRemovedClient(stateful_actor<summa_server_state>* self, Client client);
// Finds the batch the lost client was working on and reassigns it to another client if available
// If no client is available then the batch is added back to the list to be reassigned later
void resolveLostClient(stateful_actor<summa_server_state>* self, Client client);
// Removes the backup server from the list of backup servers
// All connected actors are then notified of the change
void resolveLostBackupServer(stateful_actor<summa_server_state>* self, const down_msg& dm);
// Convience function to keep code clean - just does what you think it does
void printRemainingBatches(stateful_actor<summa_server_state>* self);
} // namespace caf
#### parent directory of the 'build' directory ####
# F_MASTER =
#### fortran compiler ####
# FC =
#### C++ compiler ####
# CC=g++
#### Includes AND Libraries ####
# INCLUDES =
# LIBRARIES =
# ACTORS_INCLUDES =
# ACTORS_LIBRARIES =
# gfortran compiler flags
ifeq "$(FC)" "gfortran"
# Production runs
FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC
FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
FLAGS_ACTORS = -O3
# # Debug runs
# FLAGS_NOAH = -p -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC
# FLAGS_COMM = -p -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
# FLAGS_SUMMA = -p -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
# FLAGS_ACTORS = -g -O0 -Wall
endif
# ifort compiler flags
ifeq "$(FC)" "ifort"
# define compiler flags
# FLAGS_NOAH = -O3 -autodouble -warn nounused -noerror_limit -FR -auto -fltconsistency -fPIC
# FLAGS_COMM = -O3 -FR -auto -warn nounused -fltconsistency -fpe0 -fPIC
# FLAGS_SUMMA = -O3 -FR -auto -warn nounused -fltconsistency -fpe0 -fPIC
# debug runs
FLAGS_NOAH = -O3 -g -autodouble -warn nounused -noerror_limit -FR -auto -fltconsistency -fPIC
FLAGS_COMM = -O3 -g -FR -auto -warn nounused -fltconsistency -fpe0 -fPIC
FLAGS_SUMMA = -O3 -g -FR -auto -warn nounused -fltconsistency -fpe0 -fPIC
endif
#========================================================================
# PART 1: Define directory paths
#========================================================================
# Core directory that contains source code
F_KORE_DIR = $(F_MASTER)/build/source
# Location of the compiled modules
MOD_PATH = $(F_MASTER)/build
# Define the directory for the executables
EXE_PATH = $(F_MASTER)/bin
#========================================================================
# PART 2: Assemble all of the SUMMA sub-routines
#========================================================================
# Define directories
DRIVER_DIR = $(F_KORE_DIR)/driver
HOOKUP_DIR = $(F_KORE_DIR)/hookup
NETCDF_DIR = $(F_KORE_DIR)/netcdf
DSHARE_DIR = $(F_KORE_DIR)/dshare
NUMREC_DIR = $(F_KORE_DIR)/numrec
NOAHMP_DIR = $(F_KORE_DIR)/noah-mp
ENGINE_DIR = $(F_KORE_DIR)/engine
INTERFACE_DIR = $(F_KORE_DIR)/interface
JOB_ACTOR_DIR = $(INTERFACE_DIR)/job_actor
FILE_ACCESS_DIR = $(INTERFACE_DIR)/file_access_actor
HRU_ACTOR_DIR = $(INTERFACE_DIR)/hru_actor
# utilities
SUMMA_NRUTIL= \
nrtype.f90 \
f2008funcs.f90 \
nr_utility.f90
NRUTIL = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_NRUTIL))
# Numerical recipes procedures
# NOTE: all numerical recipes procedures are now replaced with free versions
SUMMA_NRPROC= \
expIntegral.f90 \
spline_int.f90
NRPROC = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_NRPROC))
# Hook-up modules (set files and directory paths)
SUMMA_HOOKUP= \
ascii_util.f90 \
summaActors_FileManager.f90
HOOKUP = $(patsubst %, $(HOOKUP_DIR)/%, $(SUMMA_HOOKUP))
# Data modules
SUMMA_DATAMS= \
multiconst.f90 \
var_lookup.f90 \
data_types.f90 \
globalData.f90 \
flxMapping.f90 \
get_ixname.f90 \
popMetadat.f90 \
outpt_stat.f90
DATAMS = $(patsubst %, $(DSHARE_DIR)/%, $(SUMMA_DATAMS))
# utility modules
SUMMA_UTILMS= \
time_utils.f90 \
mDecisions.f90 \
snow_utils.f90 \
soil_utils.f90 \
updatState.f90 \
matrixOper.f90
UTILMS = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_UTILMS))
# Model guts
SUMMA_MODGUT= \
MODGUT = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_MODGUT))
# Solver
SUMMA_SOLVER= \
vegPhenlgy.f90 \
diagn_evar.f90 \
stomResist.f90 \
groundwatr.f90 \
vegSWavRad.f90 \
vegNrgFlux.f90 \
ssdNrgFlux.f90 \
vegLiqFlux.f90 \
snowLiqFlx.f90 \
soilLiqFlx.f90 \
bigAquifer.f90 \
computFlux.f90 \
computResid.f90 \
computJacob.f90 \
eval8summa.f90 \
summaSolve.f90 \
systemSolv.f90 \
varSubstep.f90 \
opSplittin.f90 \
coupled_em.f90
SOLVER = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_SOLVER))
# Interface code for Fortran-C++
SUMMA_INTERFACE= \
cppwrap_datatypes.f90 \
cppwrap_auxiliary.f90 \
cppwrap_metadata.f90 \
INTERFACE = $(patsubst %, $(INTERFACE_DIR)/%, $(SUMMA_INTERFACE))
SUMMA_FILEACCESS_INTERFACE = \
initOutputStruc.f90 \
deallocateOutputStruc.f90 \
cppwrap_fileAccess.f90
FILEACCESS_INTERFACE = $(patsubst %, $(FILE_ACCESS_DIR)/%, $(SUMMA_FILEACCESS_INTERFACE))
SUMMA_JOB_INTERFACE = \
cppwrap_job.f90
JOB_INTERFACE = $(patsubst %, $(JOB_ACTOR_DIR)/%, $(SUMMA_JOB_INTERFACE))
SUMMA_HRU_INTERFACE = \
cppwrap_hru.f90
HRU_INTERFACE = $(patsubst %, $(HRU_ACTOR_DIR)/%, $(SUMMA_HRU_INTERFACE))
# Define routines for SUMMA preliminaries
SUMMA_PRELIM= \
conv_funcs.f90 \
sunGeomtry.f90 \
convE2Temp.f90 \
allocspaceActors.f90 \
alloc_file_access.f90\
checkStruc.f90 \
childStruc.f90 \
ffile_info.f90 \
read_attribute.f90 \
read_pinit.f90 \
pOverwrite.f90 \
read_paramActors.f90 \
paramCheck.f90 \
check_icondActors.f90 \
# allocspace.f90
PRELIM = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_PRELIM))
SUMMA_NOAHMP= \
module_model_constants.F \
module_sf_noahutl.F \
module_sf_noahlsm.F \
module_sf_noahmplsm.F
NOAHMP = $(patsubst %, $(NOAHMP_DIR)/%, $(SUMMA_NOAHMP))
# Define routines for the SUMMA model runs
SUMMA_MODRUN = \
indexState.f90 \
getVectorz.f90 \
updateVars.f90 \
var_derive.f90 \
read_forcingActors.f90 \
access_forcing.f90\
access_write.f90 \
derivforce.f90 \
snowAlbedo.f90 \
canopySnow.f90 \
tempAdjust.f90 \
snwCompact.f90 \
layerMerge.f90 \
layerDivide.f90 \
volicePack.f90 \
qTimeDelay.f90
MODRUN = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_MODRUN))
# Define NetCDF routines
# OutputStrucWrite is not a netcdf subroutine and should be
# moved
SUMMA_NETCDF = \
netcdf_util.f90 \
def_output.f90 \
outputStrucWrite.f90 \
writeOutput.f90 \
read_icondActors.f90
NETCDF = $(patsubst %, $(NETCDF_DIR)/%, $(SUMMA_NETCDF))
# ... stitch together common programs
COMM_ALL = $(NRUTIL) $(NRPROC) $(HOOKUP) $(DATAMS) $(UTILMS)
# ... stitch together SUMMA programs
SUMMA_ALL = $(NETCDF) $(PRELIM) $(MODRUN) $(SOLVER)
# Define the driver routine
SUMMA_DRIVER= \
summaActors_type.f90 \
summaActors_util.f90 \
summaActors_globalData.f90 \
summaActors_init.f90 \
SummaActors_setup.f90 \
summaActors_restart.f90 \
summaActors_forcing.f90 \
SummaActors_modelRun.f90 \
summaActors_alarms.f90 \
summaActors_wOutputStruc.f90
DRIVER = $(patsubst %, $(DRIVER_DIR)/%, $(SUMMA_DRIVER))
ACTORC = $(F_KORE_DIR)/actors/main.cc
ACTOR_TEST = $(F_KORE_DIR)/testing/testing_main.cc
#========================================================================
# PART 3: compilation
#======================================================================
# Compile
all: lib main
lib: compile_noah compile_comm compile_summa link clean
main: actors actorsLink actorsClean
test: actors_test actors_testLink actorsClean
# compile Noah-MP routines
compile_noah:
$(FC) $(FLAGS_NOAH) -c $(NRUTIL) $(NOAHMP)
# compile common routines
compile_comm:
$(FC) $(FLAGS_COMM) -c $(COMM_ALL) $(INCLUDES)
# compile SUMMA routines
compile_summa:
$(FC) $(FLAGS_SUMMA) -c $(SUMMA_ALL) $(DRIVER) $(INTERFACE) $(JOB_INTERFACE) $(FILEACCESS_INTERFACE) $(HRU_INTERFACE) $(INCLUDES)
# generate library
link:
$(FC) -shared *.o -o libsumma.so
# compile the c++ portion of SummaActors
actors:
$(CC) $(FLAGS_ACTORS) -c $(ACTORC) -std=c++17 $(ACTORS_INCLUDES)
actorsLink:
$(CC) -o summaMain *.o $(ACTORS_LIBRARIES)
actors_test:
$(CC) $(FLAGS_ACTORS) -c $(ACTOR_TEST) -std=c++17 $(ACTORS_INCLUDES)
actors_testLink:
$(CC) -o summaTest *.o $(ACTORS_LIBRARIES)
actorsClean:
rm *.o
# Remove object files
clean:
rm -f *.o *.mod soil_veg_gen_parm__genmod.f90
clean_lib:
rm *.so
#ifndef FILEACCESS_H_
#define FILEACCESS_H_
#include "../interface/fortran_dataTypes.h"
#include "../interface/file_access_actor/fileAccess_subroutine_wrappers.h"
#include "caf/all.hpp"
#include "messageAtoms.h"
#include <vector>
class forcingFile {
private:
int fileID; // which file are we relative the forcing file list saved in fortran
int numSteps; // the number of steps in this forcing file
bool isLoaded; // is this file actually loaded in to RAM yet.
public:
forcingFile(int fileID) {
this->fileID = fileID;
this->numSteps = 0;
this->isLoaded = false;
}
int getNumSteps() {
return this->numSteps;
}
bool isFileLoaded() {
return this->isLoaded;
}
void updateIsLoaded() {
this->isLoaded = true;
}
void updateNumSteps(int numSteps) {
this->numSteps = numSteps;
this->isLoaded = true;
}
};
struct file_access_state {
// Variables set on Spwan
caf::actor parent;
int startGRU;
int numGRU;
void *handle_forcFileInfo = new_handle_file_info(); // Handle for the forcing file information
void *handle_ncid = new_handle_var_i(); // output file ids
bool outputFileExists = false;
int num_steps;
int outputStrucSize;
int stepsInCurrentFile;
int numFiles;
int filesLoaded;
int err = 0;
std::vector<forcingFile> forcFileList; // list of steps in file
std::vector<bool> outputFileInitHRU;
};
#endif
\ No newline at end of file
#ifndef FILEACCESSACTOR_H_
#define FILEACCESSACTOR_H_
#include "FileAccess.h"
using namespace caf;
void initalizeFileAccessActor(stateful_actor<file_access_state>* self);
behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU,
int outputStrucSize, actor parent) {
// Set File_Access_Actor variables
self->state.parent = parent;
self->state.numGRU = numGRU;
self->state.startGRU = startGRU;
self->state.outputStrucSize = outputStrucSize;
initalizeFileAccessActor(self);
return {
[=](initalize_outputStrucure) {
Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize,
&self->state.numGRU, &self->state.err);
},
[=](access_forcing, int currentFile, caf::actor refToRespondTo) {
// aout(self) << "Received Current FIle = " << currentFile << std::endl;
if (currentFile <= self->state.numFiles) {
if(self->state.forcFileList[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1
// aout(self) << "ForcingFile Already Loaded \n";
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
} else {
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
}
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
// Check if we have loaded all forcing files
if(self->state.filesLoaded <= self->state.numFiles) {
self->send(self, access_forcing_internal_v, currentFile + 1);
}
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
}
} else {
aout(self) << currentFile << "is larger than expected" << std::endl;
}
},
[=](access_forcing_internal, int currentFile) {
if (self->state.filesLoaded <= self->state.numFiles &&
currentFile <= self->state.numFiles) {
// aout(self) << "Loading in background, File:" << currentFile << "\n";
if (self->state.forcFileList[currentFile - 1].isFileLoaded()) {
aout(self) << "File Loaded when shouldn't be \n";
}
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
}
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->send(self, access_forcing_internal_v, currentFile + 1);
} else {
aout(self) << "All Forcing Files Loaded \n";
}
},
[=](write_output, int indxGRU, int indxHRU, int numStepsToWrite,
caf::actor refToRespondTo) {
int err = 0;
bool hruInit = self->state.outputFileInitHRU[indxGRU - 1];
FileAccessActor_WriteOutput(self->state.handle_ncid, &self->state.outputFileExists,
&numStepsToWrite, &self->state.startGRU, &self->state.numGRU,
&hruInit, &indxGRU, &indxHRU, &err);
if (err != 0) {
aout(self) << "ERROR: Writing Output" << std::endl;
}
self->state.outputFileInitHRU[indxGRU - 1] = true; //
self->send(refToRespondTo, done_write_v);
},
[=](deallocate_structures) {
aout(self) << "Deallocating Structure" << std::endl;
FileAccessActor_DeallocateStructures(self->state.handle_forcFileInfo, self->state.handle_ncid);
self->send(self->state.parent, deallocate_structures_v);
self->quit();
},
[=](reset_outputCounter, int indxGRU) {
resetOutputCounter(&indxGRU);
}
};
}
void initalizeFileAccessActor(stateful_actor<file_access_state>* self) {
int indx = 1;
int err = 0;
// aout(self) << "Set Up the forcing file" << std::endl;
ffile_info_C(&indx, self->state.handle_forcFileInfo, &self->state.numFiles, &err);
if (err != 0) {
aout(self) << "Error: ffile_info_C - HRU = " << indx <<
" - indxGRU = " << indx << " - refGRU = " << std::endl;
self->quit();
}
mDecisions_C(&self->state.num_steps, &err);
if (err != 0) {
aout(self) << "Error: mDecisions - FileAccess Actor " << std::endl;
self->quit();
}
read_pinit_C(&err);
read_vegitationTables(&err);
// initalize vector for knowing if HRU output has init'd
for(int i = 0; i < self->state.numGRU; i++) {
self->state.outputFileInitHRU.push_back(false);
}
self->send(self->state.parent, done_file_access_actor_init_v);
// initalize the forcingFile array
self->state.filesLoaded = 0;
for (int i = 1; i <= self->state.numFiles; i++) {
self->state.forcFileList.push_back(forcingFile(i));
}
}
#endif
\ No newline at end of file
#ifndef GRUinfo_H_
#define GRUinfo_H_
#include "caf/all.hpp"
#include <iostream>
#include <fstream>
class GRUinfo {
private:
int refGRU; // This will be the same as the refGRU
int indxGRU;
caf::actor GRU;
// Variable to update
int dt_init;
// Completed Information
int currentAttempt;
int maxAttempts;
bool completed;
bool failed;
// Timing information for the GRU
double runTime;
double initDuration;
double forcingDuration;
double runPhysicsDuration;
double writeOutputDuration;
public:
// Constructor
GRUinfo(int refGRU, int indxGRU, caf::actor gru, int dt_init, int maxAttempts) {
this->refGRU = refGRU;
this->indxGRU = indxGRU;
this->GRU = gru;
this->dt_init = dt_init;
this->currentAttempt = 1;
this->maxAttempts = maxAttempts;
this->completed = false;
this->failed = false;
}
// Deconstructor
~GRUinfo(){};
// Getters
int getRefGRU() {
return this->refGRU;
}
int getIndxGRU() {
return this->indxGRU;
}
int getDt_init() {
return this->dt_init;
}
caf::actor getActor() {
return GRU;
}
// Setters
void updateGRU(caf::actor gru) {
this->GRU = gru;
}
void updateFailed() {
if (this->failed) {
this->failed = false;
} else {
this->failed = true;
}
}
void updateCompletedToTrue(){
this->completed = true;
}
void updateDt_init() {
this->dt_init = this->dt_init * 2;
}
void updateCurrentAttempt() {
this->currentAttempt++;
}
// Methods that return Booleans
bool isMaxAttemptsReached() {
return this->maxAttempts <= this->currentAttempt;
}
bool isFailed() {
return this->failed;
}
bool isCompleted() {
return this->completed;
}
void doneRun(double runTime, double initDuration, double forcingDuration,
double runPhysicsDuration, double writeOutputDuration) {
this->runTime = runTime;
this->initDuration = initDuration;
this->forcingDuration = forcingDuration;
this->runPhysicsDuration = runPhysicsDuration;
this->writeOutputDuration = writeOutputDuration;
}
// Methods for writing statistics to a file
void writeSuccess(std::string fileName) {
std::ofstream file;
file.open(fileName, std::ios_base::app);
file << this->refGRU << ","
<< this->runTime << ","
<< this->initDuration << ","
<< this->forcingDuration << ","
<< this->runPhysicsDuration << ","
<< this->writeOutputDuration << ","
<< this->dt_init << ","
<< this->currentAttempt << "\n";
file.close();
}
void writeFail(std::string fileName) {
std::ofstream file;
file.open(fileName, std::ios_base::app);
file << this->refGRU << ","
<< this->dt_init << ","
<< this->currentAttempt << "\n";
file.close();
}
};
#endif
\ No newline at end of file
#ifndef HRU_H_
#define HRU_H_
#include "caf/all.hpp"
#include "../interface/fortran_dataTypes.h"
#include "../interface/hru_actor/hru_subroutine_wrappers.h"
#include "messageAtoms.h"
#include <fstream>
#include <string>
#include <typeinfo>
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <chrono>
#include <iostream>
using namespace caf;
struct hru_state {
// Actor References
caf::actor file_access_actor;
caf::actor parent;
// Info about which HRU we are and our indexes
// into global structures in Fortran
int indxHRU; // index for hru part of derived types in FORTRAN
int indxGRU; // index for gru part of derived types in FORTRAN
int refGRU; // The actual ID of the GRU we are
// Variables for output/forcing structures
int outputStrucSize;
int outputStep;
int stepsInCurrentFFile;
int forcingFileStep;
int currentForcingFile = 1;
// statistics structures
void *handle_forcStat = new_handle_var_dlength(); // model forcing data
void *handle_progStat = new_handle_var_dlength(); // model prognostic (state) variables
void *handle_diagStat = new_handle_var_dlength(); // model diagnostic variables
void *handle_fluxStat = new_handle_var_dlength(); // model fluxes
void *handle_indxStat = new_handle_var_dlength(); // model indices
void *handle_bvarStat = new_handle_var_dlength(); // basin-average variables
// primary data structures (scalars)
void *handle_timeStruct = new_handle_var_i(); // model time data
void *handle_forcStruct = new_handle_var_d(); // model forcing data
void *handle_attrStruct = new_handle_var_d(); // local attributes for each HRU
void *handle_typeStruct = new_handle_var_i(); // local classification of soil veg etc. for each HRU
void *handle_idStruct = new_handle_var_i8();
// primary data structures (variable length vectors)
void *handle_indxStruct = new_handle_var_ilength(); // model indices
void *handle_mparStruct = new_handle_var_dlength(); // model parameters
void *handle_progStruct = new_handle_var_dlength(); // model prognostic (state) variables
void *handle_diagStruct = new_handle_var_dlength(); // model diagnostic variables
void *handle_fluxStruct = new_handle_var_dlength(); // model fluxes
// basin-average structures
void *handle_bparStruct = new_handle_var_d(); // basin-average parameters
void *handle_bvarStruct = new_handle_var_dlength(); // basin-average variables
// ancillary data structures
void *handle_dparStruct = new_handle_var_d(); // default model parameters
// Local hru data
void *handle_ncid = new_handle_var_i(); // output file ids
void *handle_statCounter = new_handle_var_i();
void *handle_outputTimeStep = new_handle_var_i();
void *handle_resetStats = new_handle_flagVec();
void *handle_finalizeStats = new_handle_flagVec();
void *handle_oldTime = new_handle_var_i();
void *handle_refTime = new_handle_var_i();
void *handle_finshTime = new_handle_var_i();
void *handle_startTime = new_handle_var_i();
// Misc Variables
int timestep = 1; // Current Timestep of HRU simulation
int computeVegFlux; // flag to indicate if we are computing fluxes over vegetation
double dt_init; // used to initialize the length of the sub-step for each HRU
double upArea; // area upslope of each HRU
int num_steps = 0; // number of time steps
int forcingStep = 1; // index of current time step in current forcing file
int iFile = 1; // index of current forcing file from forcing file list
int dt_init_factor = 1; // factor of dt_init (coupled_em)
// Julian Day variables
double fracJulDay;
double tmZoneOffsetFracDay;
int yearLength;
int err = 0; // error conotrol
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration;
std::chrono::time_point<std::chrono::system_clock> initStart;
std::chrono::time_point<std::chrono::system_clock> initEnd;
double initDuration;
std::chrono::time_point<std::chrono::system_clock> forcingStart;
std::chrono::time_point<std::chrono::system_clock> forcingEnd;
std::chrono::duration<double> forcingDuration;
std::chrono::time_point<std::chrono::system_clock> runPhysicsStart;
std::chrono::time_point<std::chrono::system_clock> runPhysicsEnd;
std::chrono::duration<double> runPhysicsDuration;
std::chrono::time_point<std::chrono::system_clock> writeOutputStart;
std::chrono::time_point<std::chrono::system_clock> writeOutputEnd;
std::chrono::duration<double> writeOutputDuration;
};
/**
Function to initalize the HRU for running
*/
void Initialize_HRU(stateful_actor<hru_state>* self);
/**
Function runs all of the hru time_steps
*/
int Run_HRU(stateful_actor<hru_state>* self);
void initalizeTimeVars(stateful_actor<hru_state>* self);
void finalizeTimeVars(stateful_actor<hru_state>* self);
void deallocateHRUStructures(stateful_actor<hru_state>* self);
#endif
\ No newline at end of file
#ifndef HRUActor_H_
#define HRUActor_H_
#include "HRU.h"
/**
* @brief HRU Actor is reponsible for carrying out the computation component of SUMMA
*
* @param self The Actor Ref
* @param refGRU The GRU we are computing in reference to the forcingFile
* @param indxGRU The GRU we are computing's index in gru_struc
* @param parent
* @return behavior
*/
behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU,
caf::actor file_access_actor, int outputStrucSize, caf::actor parent) {
// Timing Information
self->state.start = std::chrono::high_resolution_clock::now();
self->state.duration = 0.0;
self->state.initDuration = 0.0;
// Actor References
self->state.file_access_actor = file_access_actor;
self->state.parent = parent;
// Indexes into global structures
self->state.indxHRU = 1;
self->state.indxGRU = indxGRU;
self->state.refGRU = refGRU;
// OutputStructure Size (how many timesteps we can compute before we need to write)
self->state.outputStrucSize = outputStrucSize;
// init counters
self->state.timestep = 1;
self->state.outputStep = 1;
Initialize_HRU(self);
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration += std::chrono::duration_cast<std::chrono::seconds>
(self->state.end - self->state.start).count();
self->send(self->state.parent, done_init_hru_v);
return {
// Starts the HRU and tells it to ask for data from the file_access_actor
[=](start_hru) {
self->state.start = std::chrono::high_resolution_clock::now();
self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile, self);
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration += std::chrono::duration_cast<std::chrono::seconds>
(self->state.end - self->state.start).count();
},
// [=](file_information, int outputStrucSize, int stepsInCurrentFFile) {
// self->state.outputStrucSize = outputStrucSize;
// self->state.stepsInCurrentFFile = stepsInCurrentFFile;
// self->state.outputStep = 1;
// self->send(self, run_hru_v, self->state.stepsInCurrentFFile);
// },
[=](done_write) {
self->state.start = std::chrono::high_resolution_clock::now();
// We receive a done_write message so we ensure that
// stepsInCurrentFFile remains unchanged
if (self->state.timestep >= self->state.num_steps) {
finalizeTimeVars(self);
double forcingDuration = std::chrono::duration_cast<std::chrono::seconds>(self->state.forcingDuration).count();
double runPhysicsDuration = std::chrono::duration_cast<std::chrono::seconds>(self->state.runPhysicsDuration).count();
double writeOutputDuration = std::chrono::duration_cast<std::chrono::seconds>(self->state.writeOutputDuration).count();
// Tell our parent we are done
self->send(self->state.parent,
done_hru_v,
self->state.indxGRU,
self->state.duration,
self->state.initDuration,
forcingDuration,
runPhysicsDuration,
writeOutputDuration);
deallocateHRUStructures(self);
self->quit();
return;
}
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration += std::chrono::duration_cast<std::chrono::seconds>
(self->state.end - self->state.start).count();
self->send(self, run_hru_v, self->state.stepsInCurrentFFile);
},
[=](run_hru, int stepsInCurrentFFile) {
// aout(self) << "Running HRU" << std::endl;
self->state.start = std::chrono::high_resolution_clock::now();
self->state.stepsInCurrentFFile = stepsInCurrentFFile;
int err = 0;
while( err == 0 ) {
err = Run_HRU(self);
if (err != 0) {
// RUN FAILURE!!! Notify Parent
self->send(self->state.parent, run_failure_v, self->state.indxGRU, err);
self->quit();
return;
};
// Check if HRU is done computing
if (self->state.timestep >= self->state.num_steps) {
// write out data
// aout(self) << "Sending Final Write" << std::endl;
self->send(self->state.file_access_actor, write_output_v,
self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self);
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration += std::chrono::duration_cast<std::chrono::seconds>
(self->state.end - self->state.start).count();
return;
}
self->state.timestep += 1;
// check if we need more forcing information
if (self->state.forcingStep > self->state.stepsInCurrentFFile) {
// aout(self) << "Asking for more forcing data" << std::endl;
self->send(self->state.file_access_actor, access_forcing_v, self->state.iFile + 1, self);
break;
}
// check if we need to write our output
if (self->state.outputStep >= self->state.outputStrucSize) {
// aout(self) << "Sending Write" << std::endl;
self->send(self->state.file_access_actor, write_output_v,
self->state.indxGRU, self->state.indxHRU, self->state.outputStep, self);
self->state.outputStep = 1;
break;
}
self->state.outputStep += 1; // value to monitor how full the output structure is
}
// HRU is done looping over timesteps in the current file
// update total timing, don't want to include waiting time
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration += std::chrono::duration_cast<std::chrono::seconds>
(self->state.end - self->state.start).count();
},
[=](dt_init_factor, int dt_init_factor) {
aout(self) << "Recieved New dt_init_factor to attempt on next run \n";
self->state.dt_init_factor = dt_init_factor;
},
};
/*********************************************************************************************************
*********************************** END ACTOR MESSAGE HANDLERS ******************************************
*********************************************************************************************************/
}
void Initialize_HRU(stateful_actor<hru_state>* self) {
self->state.initStart = std::chrono::high_resolution_clock::now();
// aout(self) << "Initalizing HRU" << std::endl;
// aout(self) << "Entering Initalize \n";
Initialize(&self->state.indxGRU,
&self->state.num_steps,
self->state.handle_forcStat,
self->state.handle_progStat,
self->state.handle_diagStat,
self->state.handle_fluxStat,
self->state.handle_indxStat,
self->state.handle_bvarStat,
self->state.handle_timeStruct,
self->state.handle_forcStruct,
self->state.handle_attrStruct,
self->state.handle_typeStruct,
self->state.handle_idStruct,
self->state.handle_indxStruct,
self->state.handle_mparStruct,
self->state.handle_progStruct,
self->state.handle_diagStruct,
self->state.handle_fluxStruct,
self->state.handle_bparStruct,
self->state.handle_bvarStruct,
self->state.handle_dparStruct,
self->state.handle_startTime,
self->state.handle_finshTime,
self->state.handle_refTime,
self->state.handle_oldTime, &self->state.err);
if (self->state.err != 0) {
aout(self) << "Error: Initialize - HRU = " << self->state.indxHRU <<
" - indxGRU = " << self->state.indxGRU << " - refGRU = "<< self->state.refGRU << std::endl;
aout(self) << "Error = " << self->state.err << "\n";
self->quit();
return;
}
// aout(self) << "Setup Param" << std::endl;
SetupParam(&self->state.indxGRU,
&self->state.indxHRU,
self->state.handle_attrStruct,
self->state.handle_typeStruct,
self->state.handle_idStruct,
self->state.handle_mparStruct,
self->state.handle_bparStruct,
self->state.handle_bvarStruct,
self->state.handle_dparStruct,
self->state.handle_startTime,
self->state.handle_oldTime,
&self->state.upArea, &self->state.err);
if (self->state.err != 0) {
aout(self) << "Error: SetupParam - HRU = " << self->state.indxHRU <<
" - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << std::endl;
self->quit();
return;
}
// aout(self) << "Restart" << std::endl;
Restart(&self->state.indxGRU,
&self->state.indxHRU,
self->state.handle_indxStruct,
self->state.handle_mparStruct,
self->state.handle_progStruct,
self->state.handle_diagStruct,
self->state.handle_fluxStruct,
self->state.handle_bvarStruct,
&self->state.dt_init, &self->state.err);
if (self->state.err != 0) {
aout(self) << "Error: Restart - HRU = " << self->state.indxHRU <<
" - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU << std::endl;
self->quit();
return;
}
// aout(self) << self->state.refGRU << " - Done Init" << std::endl;
self->state.initEnd = std::chrono::high_resolution_clock::now();
self->state.initDuration = std::chrono::duration_cast<std::chrono::seconds>
(self->state.initEnd - self->state.initStart).count();
}
/*
** RETURNS 0 on success and -1 on failure
*/
int Run_HRU(stateful_actor<hru_state>* self) {
// Housekeeping of the timing variables
if(self->state.timestep == 1) {
initalizeTimeVars(self);
}
/**********************************************************************
** READ FORCING
**********************************************************************/
self->state.forcingStart = std::chrono::high_resolution_clock::now();
Forcing(&self->state.indxGRU,
&self->state.timestep,
self->state.handle_timeStruct,
self->state.handle_forcStruct,
&self->state.iFile,
&self->state.forcingStep,
&self->state.fracJulDay,
&self->state.tmZoneOffsetFracDay,
&self->state.yearLength,
&self->state.err);
if (self->state.err != 0) {
aout(self) << "Error: Forcing - HRU = " << self->state.indxHRU <<
" - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU <<
" - Timestep = " << self->state.timestep << std::endl;
return 10;
}
self->state.forcingEnd = std::chrono::high_resolution_clock::now();
self->state.forcingDuration += self->state.forcingEnd - self->state.forcingStart;
if (self->state.timestep % 50000 == 0) {
aout(self) << self->state.refGRU << ":Accumulated Forcing Time = " <<
self->state.forcingDuration << std::endl;
aout(self) << self->state.refGRU << " - Running Timestep = " << self->state.timestep << std::endl;
aout(self) << self->state.refGRU << ":Accumulated Run Physics Time = " <<
self->state.runPhysicsDuration << std::endl;
aout(self) << self->state.refGRU << ":Accumulated WriteOutput = "
<< self->state.writeOutputDuration << std::endl;
}
/**********************************************************************
** RUN_PHYSICS
**********************************************************************/
self->state.runPhysicsStart = std::chrono::high_resolution_clock::now();
self->state.err = 0;
RunPhysics(&self->state.indxHRU,
&self->state.timestep,
self->state.handle_timeStruct,
self->state.handle_forcStruct,
self->state.handle_attrStruct,
self->state.handle_typeStruct,
self->state.handle_indxStruct,
self->state.handle_mparStruct,
self->state.handle_progStruct,
self->state.handle_diagStruct,
self->state.handle_fluxStruct,
self->state.handle_bvarStruct,
&self->state.fracJulDay,
&self->state.tmZoneOffsetFracDay,
&self->state.yearLength,
&self->state.computeVegFlux,
&self->state.dt_init,
&self->state.dt_init_factor,
&self->state.err);
if (self->state.err != 0) {
aout(self) << "Error: RunPhysics - HRU = " << self->state.indxHRU <<
" - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU <<
" - Timestep = " << self->state.timestep << std::endl;
return 20;
}
self->state.runPhysicsEnd = std::chrono::high_resolution_clock::now();
self->state.runPhysicsDuration += self->state.runPhysicsEnd - self->state.runPhysicsStart;
/**********************************************************************
** WRITE_OUTPUT
**********************************************************************/
self->state.writeOutputStart = std::chrono::high_resolution_clock::now();
WriteOutput(&self->state.indxHRU,
&self->state.indxGRU,
&self->state.timestep,
self->state.handle_forcStat,
self->state.handle_progStat,
self->state.handle_diagStat,
self->state.handle_fluxStat,
self->state.handle_indxStat,
self->state.handle_bvarStat,
self->state.handle_timeStruct,
self->state.handle_forcStruct,
self->state.handle_attrStruct,
self->state.handle_typeStruct,
self->state.handle_indxStruct,
self->state.handle_mparStruct,
self->state.handle_progStruct,
self->state.handle_diagStruct,
self->state.handle_fluxStruct,
self->state.handle_bparStruct,
self->state.handle_bvarStruct,
self->state.handle_statCounter,
self->state.handle_outputTimeStep,
self->state.handle_resetStats,
self->state.handle_finalizeStats,
self->state.handle_finshTime,
self->state.handle_oldTime,
&self->state.outputStep,
&self->state.forcingStep,
&self->state.err);
if (self->state.err != 0) {
aout(self) << "Error: WriteOutput - HRU = " << self->state.indxHRU <<
" - indxGRU = " << self->state.indxGRU << " - refGRU = " << self->state.refGRU <<
" - Timestep = " << self->state.timestep << std::endl;
return 30;
}
self->state.writeOutputEnd = std::chrono::high_resolution_clock::now();
self->state.writeOutputDuration += self->state.writeOutputEnd - self->state.writeOutputStart;
return 0;
}
void initalizeTimeVars(stateful_actor<hru_state>* self) {
self->state.forcingStart = std::chrono::high_resolution_clock::now();
self->state.forcingEnd = std::chrono::high_resolution_clock::now();
self->state.forcingDuration = self->state.forcingEnd - self->state.forcingStart;
self->state.runPhysicsStart = std::chrono::high_resolution_clock::now();
self->state.runPhysicsEnd = std::chrono::high_resolution_clock::now();
self->state.runPhysicsDuration = self->state.runPhysicsEnd - self->state.runPhysicsStart;
self->state.writeOutputStart = std::chrono::high_resolution_clock::now();
self->state.writeOutputEnd = std::chrono::high_resolution_clock::now();
self->state.writeOutputDuration = self->state.writeOutputEnd - self->state.writeOutputStart;
}
void finalizeTimeVars(stateful_actor<hru_state>* self) {
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration += std::chrono::duration_cast<std::chrono::seconds>
(self->state.end - self->state.start).count();
// // Output the timing information
// aout(self) << "DONE:" << self->state.refGRU << ":duration = " << self->state.duration << std::endl;
// aout(self) << "DONE:" << self->state.refGRU << ":initDuration = " << self->state.initDuration << std::endl;
// aout(self) << "DONE:" << self->state.refGRU << ":forcingDuration = " << self->state.forcingDuration.count() << std::endl;
// aout(self) << "DONE:" << self->state.refGRU << ":runPhysicsDuration = " << self->state.runPhysicsDuration.count() << std::endl;
// aout(self) << "DONE:" << self->state.refGRU << ":writeOutputDuration = " << self->state.writeOutputDuration.count() << std::endl;
}
void deallocateHRUStructures(stateful_actor<hru_state>* self) {
DeallocateStructures(self->state.handle_forcStat,
self->state.handle_progStat,
self->state.handle_diagStat,
self->state.handle_fluxStat,
self->state.handle_indxStat,
self->state.handle_bvarStat,
self->state.handle_timeStruct,
self->state.handle_forcStruct,
self->state.handle_attrStruct,
self->state.handle_typeStruct,
self->state.handle_idStruct,
self->state.handle_indxStruct,
self->state.handle_mparStruct,
self->state.handle_progStruct,
self->state.handle_diagStruct,
self->state.handle_fluxStruct,
self->state.handle_bparStruct,
self->state.handle_bvarStruct,
self->state.handle_dparStruct,
self->state.handle_startTime,
self->state.handle_finshTime,
self->state.handle_refTime,
self->state.handle_oldTime,
self->state.handle_ncid,
self->state.handle_statCounter,
self->state.handle_outputTimeStep,
self->state.handle_resetStats,
self->state.handle_finalizeStats,
&self->state.err);
}
#endif
\ No newline at end of file
#ifndef SUMMACLIENT_H_
#define SUMMACLIENT_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "string.h"
#include <unistd.h>
#include <vector>
#include "FileAccessActor.h"
#include "../interface/job_actor/job_subroutine_wrappers.h"
#include "HRUActor.h"
#include <chrono>
#include "messageAtoms.h"
#include "GRUinfo.h"
#include <iostream>
#include <fstream>
#include <sys/stat.h>
struct job_state {
// Actor References
caf::actor file_access_actor; // actor reference for the file_access_actor
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int startGRU; // Starting GRU for this job
int numGRU; // Number of GRUs for this job
std::string fileManager; // Path of the fileManager.txt file
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int maxRunAttempts = 3; // Max number of attemtps to solve a GRU
std::vector<GRUinfo*> GRUList; // List of all GRUs under this job actor
int numGRUDone = 0; // The number of GRUs that have completed
int GRUInit = 0; // Number of GRUs initalized
int err = 0; // Error Code
int numGRUFailed = 0; // Number of GRUs that have failed
int outputStrucSize;
// Timing Variables
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration;
// Output File Names for Timings
std::string successOutputFile = "/scratch/gwf/gwf_cmt/kck540/summaActors/csvFiles/SuccessHRU";
std::string csvOut;
std::string failedOutputFile = "failedHRU";
std::string fileAccessActorStats = "fileAccessActor.csv";
};
void initJob(stateful_actor<job_state>* self);
void initalizeGRU(stateful_actor<job_state>* self);
void runGRUs(stateful_actor<job_state>* self);
void restartFailures(stateful_actor<job_state>* self);
#endif
\ No newline at end of file
#ifndef SUMMACLIENTACTOR_H_
#define SUMMACLIENTACTOR_H_
#include "Job.h"
using namespace caf;
/**
* @brief First Actor that is spawned that is not the Coordinator Actor.
*
* @param self
* @return behavior
*/
behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
std::string fileManager, int outputStrucSize, std::string csvOut, caf::actor parent) {
self->state.start = std::chrono::high_resolution_clock::now();
// Set Job Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.fileManager = fileManager;
self->state.parent = parent;
self->state.outputStrucSize = outputStrucSize;
self->state.csvOut = csvOut;
// Initalize global variables
initJob(self);
// Spawn the file_access_actor. This will return the number of forcing files we are working with
self->state.file_access_actor = self->spawn(file_access_actor, self->state.startGRU, self->state.numGRU,
self->state.outputStrucSize, self);
aout(self) << "Job Actor Initalized \n";
return {
[=](done_file_access_actor_init) {
// Init GRU Actors and the Output Structure
self->send(self->state.file_access_actor, initalize_outputStrucure_v);
self->send(self, init_hru_v);
},
[=](init_hru) {
initalizeGRU(self);
},
[=](done_init_hru) {
// aout(self) << "Done init\n";
self->state.GRUInit++;
if (self->state.GRUInit < self->state.numGRU) {
self->send(self, init_hru_v);
} else {
aout(self) << "All GRUs are initalized\n";
self->state.GRUInit = 0; // reset counter in case we have failures
runGRUs(self);
}
},
/**
* Message from HRUActor, HRU is done the current forcing file but is not
* done its simulation and needs the next file
* indxGRU - Index into the actor array so we know which HRU this is.
* NOTE: Naming of GRU and HRU is confusing as the plan is to further seperate
* NOTE: For NA_Domain GRU is used as that is how we index the forcing file
*/
[=](done_hru, int indxGRU, double totalDuration, double initDuration,
double forcingDuration, double runPhysicsDuration, double writeOutputDuration) {
aout(self) << "GRU " << indxGRU << " Done\n";
self->state.GRUList[indxGRU - 1]->doneRun(totalDuration, initDuration, forcingDuration,
runPhysicsDuration, writeOutputDuration);
self->state.GRUList[indxGRU - 1]->writeSuccess(self->state.successOutputFile);
self->state.numGRUDone++;
// Check if we are done
if (self->state.numGRUDone >= self->state.numGRU) {
self->state.numGRUDone = 0; // just in case there were failures
if (self->state.numGRUFailed == 0) {
self->send(self->state.file_access_actor, deallocate_structures_v);
} else {
restartFailures(self);
}
}
},
[=](deallocate_structures) {
// Calculate Run-Time
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration = std::chrono::duration_cast<std::chrono::seconds>
(self->state.end - self->state.start).count();
aout(self) << "Total Job Duration:";
aout(self) << " " << self->state.duration << " Seconds\n";
aout(self) << " " << self->state.duration / 60 << " Minutes\n";
aout(self) << " " << (self->state.duration / 60) / 60 << " Hours\n";
// Delete GRUs
for (auto GRU : self->state.GRUList) {
delete GRU;
}
self->state.GRUList.clear();
// Tell Parent we are done
self->send(self->state.parent, done_job_v, self->state.numGRUFailed);
self->quit();
},
[=](run_failure, int indxGRU, int err) {
aout(self) << "GRU:" << indxGRU << "Failed \n" <<
"Will have to wait until all GRUs are done before it can be re-tried\n";
self->state.numGRUFailed++;
self->state.numGRUDone++;
self->state.GRUList[indxGRU - 1]->updateFailed();
// check if we are the last hru to complete
if (self->state.numGRUDone >= self->state.numGRU) {
restartFailures(self);
}
},
};
}
void initJob(stateful_actor<job_state>* self) {
std::ofstream file;
self->state.successOutputFile += std::to_string(self->state.startGRU) += self->state.csvOut +=".csv";
file.open(self->state.successOutputFile, std::ios_base::out);
file << "GRU" << "," << "totalDuration" << "," << "initDuration" << "," <<
"forcingDuration" << "," << "runPhysicsDuration" << "," << "writeOutputDuration" <<
"," << "dt_init" << "," << "numAttemtps" << "\n";
file.close();
int totalGRUs = 0;
int totalHRUs = 0;
int numHRUs = 0;
int err = 0;
// aout(self) << "Initalizing Globals \n";
initGlobals(self->state.fileManager.c_str(),
&totalGRUs,
&totalHRUs,
&self->state.numGRU,
&numHRUs,
&self->state.startGRU,
&err);
if (err != 0) {
aout(self) << "Error: initGlobals" << std::endl;
self->quit();
}
}
void initalizeGRU(stateful_actor<job_state>* self) {
int startGRU = self->state.GRUList.size() + self->state.startGRU;
int indexGRU = self->state.GRUList.size() + 1; // Fortran reference starts at 1
auto gru = self->spawn(hru_actor, startGRU, indexGRU, self->state.file_access_actor,
self->state.outputStrucSize, self);
self->state.GRUList.push_back(new GRUinfo(startGRU, indexGRU, gru,
self->state.dt_init_start_factor, self->state.maxRunAttempts));
}
void runGRUs(stateful_actor<job_state>* self) {
for(auto gru : self->state.GRUList) {
if(!gru->isCompleted() && !gru->isFailed()) {
self->send(gru->getActor(), start_hru_v);
}
}
}
void restartFailures(stateful_actor<job_state>* self) {
self->state.numGRU = self->state.numGRUFailed;
self->state.numGRUFailed = 0;
self->state.numGRUDone = 0;
for(auto gru : self->state.GRUList) {
if (gru->isFailed() && !gru->isMaxAttemptsReached()) {
gru->updateFailed();
self->send(self->state.file_access_actor, reset_outputCounter_v, gru->getIndxGRU());
gru->updateDt_init();
auto newGRU = self->spawn(hru_actor, gru->getRefGRU(), gru->getIndxGRU(), self->state.file_access_actor,
self->state.outputStrucSize, self);
gru->updateGRU(newGRU);
gru->updateCurrentAttempt();
self->send(gru->getActor(), dt_init_factor_v, gru->getDt_init());
} else {
aout(self) << "We are done \n";
}
}
}
#endif
#ifndef SUMMAACTOR_H_
#define SUMMAACTOR_H_
#include "SummaManager.h"
using namespace caf;
/**
* Top Level Actor for Summa. This actor recieves the number of GRUs to compute from main and
* divides them into jobs that compute one at a time.
*
* @param startGRU - starting GRU for the simulation
* @param numGRU - total number of GRUs to compute
* @param fileManager - location of file information for SUMMA
* @return behavior
*/
behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string fileManager, std::string csvOut) {
self->state.start = std::chrono::high_resolution_clock::now();
// Set Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.fileManager = fileManager;
self->state.csvOut = csvOut;
// Create the job_actor and start SUMMA
spawnJob(self);
return {
[=](done_job, int numFailed) {
self->state.numFailed += numFailed;
aout(self) << "Job Done\n";
if (self->state.numGRU <= 0) {
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration = std::chrono::duration_cast<std::chrono::seconds>
(self->state.end - self->state.start).count();
aout(self) << "Total Program Duration:";
aout(self) << " " << self->state.duration << " Seconds\n";
aout(self) << " " << self->state.duration / 60 << " Minutes\n";
aout(self) << " " << (self->state.duration / 60) / 60 << " Hours\n";
aout(self) << "Program Finished \n";
} else {
// spawn a new job
spawnJob(self);
}
},
};
}
void spawnJob(stateful_actor<summa_manager>* self) {
// Ensure we do not start a job with too many GRUs
if (self->state.numGRU > self->state.maxGRUPerJob) {
// spawn the job actor
self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.maxGRUPerJob,
self->state.fileManager, self->state.outputStrucSize, self->state.csvOut, self);
// Update GRU count
self->state.numGRU = self->state.numGRU - self->state.maxGRUPerJob;
self->state.startGRU = self->state.startGRU + self->state.maxGRUPerJob;
} else {
self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.numGRU,
self->state.fileManager, self->state.outputStrucSize, self->state.csvOut, self);
self->state.numGRU = 0;
}
}
#endif
\ No newline at end of file
#ifndef SUMMAMANGER_H_
#define SUMMAMANGER_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "JobActor.h"
#include <iostream>
#include <chrono>
#include <string>
struct summa_manager {
// Timing Information
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration;
// Program Parameters
int startGRU; // starting GRU for the simulation
int numGRU; // number of GRUs to compute
std::string fileManager;// path to the fileManager.txt file
// Information about the jobs
int maxGRUPerJob = 500; // maximum number of GRUs a job can compute at once
int numFailed = 0; // Number of jobs that have failed
int outputStrucSize = 500;
std::string csvOut;
caf::actor currentJob; // Reference to the current job actor
};
/**
* @brief Function to spawn a job actor
*/
void spawnJob(stateful_actor<summa_manager>* self);
#endif
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
#include "forcing_file_info.hpp"
Forcing_File_Info::Forcing_File_Info(int file_ID) {
this->file_ID = file_ID;
this->num_steps = 0;
this->is_loaded = false;
}
int Forcing_File_Info::getNumSteps() {
return this->num_steps;
}
bool Forcing_File_Info::isFileLoaded() {
return this->is_loaded;
}
void Forcing_File_Info::updateIsLoaded() {
this->is_loaded = true;
}
void Forcing_File_Info::updateNumSteps(int num_steps) {
this->num_steps = num_steps;
this->is_loaded = true;
}
This diff is collapsed.