#pragma once
#include "caf/all.hpp"
#include <iostream>
#include <fstream>
* Determine the state of the GRU
enum class gru_state {
int is_success(const gru_state& state);
* Class that holds information about the running GRUs. This class is held by the job actor
* The GRU/HRU actors that carry out the simulation are held by the GRU class
class GRU {
int global_gru_index; // The index of the GRU in the netcdf file
int local_gru_index; // The index of the GRU within this job
caf::actor gru_actor; // The actor for the GRU
// Modifyable Parameters
int dt_init_factor; // The initial dt for the GRU
double rel_tol; // The relative tolerance for the GRU
double abs_tol; // The absolute tolerance for the GRU
// Status Information
int attempts_left; // The number of attempts left for the GRU to succeed
gru_state state; // The state of the GRU
// Timing Information
double run_time = 0.0; // The total time to run the GRU
double init_duration = 0.0; // The time to initialize the GRU
double forcing_duration = 0.0; // The time to read the forcing data
double run_physics_duration = 0.0; // The time to run the physics
double write_output_duration = 0.0; // The time to write the output
// Constructor
GRU(int global_gru_index, int local_gru_index, caf::actor gru_actor, int dt_init_factor,
double rel_tol, double abs_tol, int max_attempts);
// Deconstructor
// Getters
int getGlobalGRUIndex();
int getLocalGRUIndex();
caf::actor getGRUActor();
double getRunTime();
double getInitDuration();
double getForcingDuration();
double getRunPhysicsDuration();
double getWriteOutputDuration();
double getRelTol();
double getAbsTol();
double getAttemptsLeft();
gru_state getStatus();
bool isFailed();
// Setters
void setRunTime(double run_time);
void setInitDuration(double init_duration);
void setForcingDuration(double forcing_duration);
void setRunPhysicsDuration(double run_physics_duration);
void setWriteOutputDuration(double write_output_duration);
void setRelTol(double rel_tol);
void setAbsTol(double abs_tol);
void setSuccess();
void setFailed();
void setRunning();
void decrementAttemptsLeft();
void setGRUActor(caf::actor gru_actor);
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "GRU.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include "global.hpp"
#include "json.hpp"
#include "hru_actor.hpp"
#include "message_atoms.hpp"
#include "file_access_actor.hpp"
#include <unistd.h>
#include <limits.h>
* Job Actor Fortran Functions
extern "C" {
void job_init_fortran(char const* file_manager, int* start_gru_index, int* num_gru, int* num_hru, int* err);
void deallocateJobActor(int* err);
* Job Actor state variables
namespace caf {
using chrono_time = std::chrono::time_point<std::chrono::system_clock>;
// Holds information about the GRUs
struct GRU_Container {
std::vector<GRU*> gru_list;
chrono_time gru_start_time; // Vector of start times for each GRU
int num_gru_done = 0;
int num_gru_failed = 0; // number of grus that are waiting to be restarted
int num_gru_in_run_domain = 0; // number of grus we are currently solving for
int run_attempts_left = 1; // current run attempt for all grus
struct job_state {
// Actor References
caf::actor file_access_actor; // actor reference for the file_access_actor
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int start_gru; // Starting GRU for this job
int num_gru; // Number of GRUs for this job
int num_hru;
int max_run_attempts = 1; // Max number of attempts to solve a GRU
GRU_Container gru_container;
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int num_gru_done = 0; // The number of GRUs that have completed
int num_gru_failed = 0; // Number of GRUs that have failed
// Timing Variables
TimingInfo job_timing;
std::string hostname;
// settings for all child actors (save in case we need to recover)
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
/** The Job Actor */
behavior job_actor(stateful_actor<job_state>* self,
int start_gru, int num_gru,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings,
actor parent);
* Functions for the Job Actor
/** Get the information for the GRUs that will be written to the netcdf file */
std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts,
std::vector<GRU*> &gru_list);
void handleGRUError(stateful_actor<job_state>* self, caf::actor src);
} // end namespace
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <string>
class Batch {
int batch_id_;
int start_hru_;
int num_hru_;
double run_time_;
double read_time_;
double write_time_;
bool assigned_to_actor_;
bool solved_;
Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1);
// Getters
int getBatchID();
int getStartHRU();
int getNumHRU();
double getRunTime();
double getReadTime();
double getWriteTime();
bool isAssigned();
bool isSolved();
std::string getBatchInfoString();
// Setters
void updateRunTime(double run_time);
void updateReadTime(double read_time);
void updateWriteTime(double write_time);
void updateAssigned(bool boolean);
void updateSolved(bool boolean);
void printBatchInfo();
void writeBatchToFile(std::string csv_output, std::string hostname);
std::string toString();
void assignToActor(std::string hostname, caf::actor assigned_actor);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch& batch) {
return inspector.object(batch).fields(
inspector.field("batch_id", batch.batch_id_),
inspector.field("start_hru", batch.start_hru_),
inspector.field("num_hru", batch.num_hru_),
inspector.field("run_time", batch.run_time_),
inspector.field("read_time", batch.read_time_),
inspector.field("write_time", batch.write_time_),
inspector.field("assigned_to_actor", batch.assigned_to_actor_),
inspector.field("solved", batch.solved_));
\ No newline at end of file
#include "caf/all.hpp"
#pragma once
#include "client.hpp"
class Batch_Container {
int start_hru_;
int total_hru_count_;
int num_hru_per_batch_;
int batches_remaining_;
std::vector<Batch> batch_list_;
// Assemble the total number of HRUs given by the user into batches.
void assembleBatches();
// Creating the batch_manager will also create the batches
// with the two parameters that are passed in.
Batch_Container(int start_hru = 1, int total_hru_count = 0,
int num_hru_per_batch = 0);
// returns the size of the batch list
int getBatchesRemaining();
int getTotalBatches();
// Find an unsolved batch, set it to assigned and return it.
std::optional<Batch> getUnsolvedBatch();
// Update the batch status to solved and write the output to a file.
void updateBatch_success(Batch successful_batch, std::string output_csv, std::string hostname);
// Update the batch status but do not write the output to a file.
void updateBatch_success(Batch successful_batch);
// Update batch by id
void updateBatch_success(int batch_id, double run_time, double read_time,
double write_time);
// Update the batch to assigned = true
void setBatchAssigned(Batch batch);
// Update the batch to assigned = false
void setBatchUnassigned(Batch batch);
// Check if there are batches left to solve
bool hasUnsolvedBatches();
// TODO: Needs implementation
void updateBatch_failure(Batch failed_batch);
std::string getAllBatchInfoString();
double getTotalReadTime();
double getTotalWriteTime();
* A client has found to be disconnected. Unassign all batches
* that were assigned to the disconnected client. The client id
* is passed in as a parameter
void updatedBatch_disconnectedClient(int client_id);
* Create the csv file for the completed batches.
void inititalizeCSVOutput(std::string csv_output_name);
* @brief Print the batches from the batch list
void printBatches();
std::string getBatchesAsString();
* @brief Find the batch with the batch_id parameter
* update the batches assigned actor member variable to false
void updateBatchStatus_LostClient(int batch_id);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch_Container& batch_container) {
return inspector.object(batch_container).fields(
inspector.field("total_hru_count", batch_container.total_hru_count_),
inspector.field("num_hru_per_batch", batch_container.num_hru_per_batch_),
inspector.field("batches_remaining", batch_container.batches_remaining_),
inspector.field("batch_list", batch_container.batch_list_));
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <optional>
#include "batch.hpp"
class Client {
caf::actor client_actor;
std::string hostname;
int id;
int batches_solved;
std::optional<Batch> current_batch;
Client(int id = -1, caf::actor client_actor = nullptr, std::string hostname = "");
// ####################################################################
// Getters
// ####################################################################
caf::actor getActor();
int getID();
std::string getHostname();
std::optional<Batch> getBatch();
// ####################################################################
// Setters
// ####################################################################
void setBatch(std::optional<Batch> batch);
// ####################################################################
// Methods
// ####################################################################
std::string toString();
// Serialization so CAF can send an object of this class to another actor
template <class Inspector>
friend bool inspect(Inspector& inspector, Client& client) {
return inspector.object(client).fields(
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <vector>
#include "batch.hpp"
#include "client.hpp"
#include <optional>
class Client_Container {
std::vector<Client> client_list;
int id_counter;
// ####################################################################
// Getters
// ####################################################################
int getNumClients();
std::vector<Client> getClientList();
std::optional<Client> getClient(caf::actor_addr client_ref);
// ####################################################################
// Setters
// ####################################################################
void setBatchForClient(caf::actor client_ref, std::optional<Batch> batch);
// ####################################################################
// Methods
// ####################################################################
// add a new client to the client_list
void addClient(caf::actor client_actor, std::string hostname);
// remove a client from the client_list
void removeClient(Client client);
// return a client that is not solving a batch or return an empty optional
std::optional<Client> getIdleClient();
// Check if the client list is empty
bool isEmpty();
// pops client at the end of the list
Client removeClient_fromBack();
// return printable string
std::string toString();
template <class Inspector>
friend bool inspect(Inspector& inspector, Client_Container& client_container) {
return inspector.object(client_container).fields(
inspector.field("client_list", client_container.client_list),
inspector.field("id_counter", client_container.id_counter));
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include "batch_container.hpp"
#include <chrono>
#include <string>
#include <vector>
namespace caf {
struct job_timing_info {
std::vector<double> job_duration;
std::vector<double> job_read_duration;
std::vector<double> job_write_duration;
struct summa_actor_state {
// Timing Information For Summa-Actor
TimingInfo summa_actor_timing;
struct job_timing_info timing_info_for_jobs;
// Program Parameters
int startGRU; // starting GRU for the simulation
int numGRU; // number of GRUs to compute
int fileGRU; // number of GRUs in the file
std::string configPath; // path to the fileManager.txt file
int numFailed = 0; // Number of jobs that have failed
caf::actor currentJob; // Reference to the current job actor
caf::actor parent;
// Batches
Batch_Container batch_container;
int current_batch_id;
// settings for all child actors (save in case we need to recover)
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
behavior summa_actor(stateful_actor<summa_actor_state>* self,
int startGRU, int numGRU,
Summa_Actor_Settings summa_actor_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings, actor parent);
void spawnJob(stateful_actor<summa_actor_state>* self);
} // namespace caf
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "summa_server.hpp"
#include <string>
#include <unistd.h>
#include <limits.h>
namespace caf {
// Inital behaviour that waits to connect to the lead server
behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings);
// Function that is called ot connect to the lead server
void connecting_backup(stateful_actor<summa_server_state>* self, const std::string& host, uint16_t port);
behavior summa_backup_server(stateful_actor<summa_server_state>* self, const actor& server_actor);
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "settings_functions.hpp"
#include "batch.hpp"
#include "summa_actor.hpp"
#include "message_atoms.hpp"
#include <string>
#include <optional>
#include <unistd.h>
#include <limits.h>
namespace caf {
struct summa_client_state {
strong_actor_ptr current_server = nullptr;
actor current_server_actor;
std::vector<strong_actor_ptr> servers;
std::string hostname;
actor summa_actor_ref;
uint16_t port;
int batch_id;
int client_id; // id held by server
bool running = false; // initalized to false - flipped to true when client returns behavior summa_client
// tuple is the actor ref and hostname of the backup server
std::vector<std::tuple<caf::actor, std::string>> backup_servers_list;
Batch current_batch;
bool saved_batch = false;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
behavior summa_client_init(stateful_actor<summa_client_state>* self);
behavior summa_client(stateful_actor<summa_client_state>* self);
void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port);
void findLeadServer(stateful_actor<summa_client_state>* self, strong_actor_ptr serv);
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "batch.hpp"
#include "batch_container.hpp"
#include "client.hpp"
#include "client_container.hpp"
#include "settings_functions.hpp"
#include "global.hpp"
#include "message_atoms.hpp"
#include <string>
#include <optional>
#include <thread>
#include <chrono>
#include <iostream>
namespace caf {
struct summa_server_state {
strong_actor_ptr current_server; // if server is a backup then this will be set to the lead server
actor current_server_actor;
std::string hostname;
std::string csv_file_path;
std::string csv_output_name = "/batch_results.csv";
Client_Container client_container;
Batch_Container batch_container;
// Actor Reference, Hostname
std::vector<std::tuple<caf::actor, std::string>> backup_servers_list;
// Settings Structures
Distributed_Settings distributed_settings;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
// Summa Server setup behaviour - initializes the state for the server
behavior summa_server_init(stateful_actor<summa_server_state>* self,
Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings);
// Summa Server behaviour - handles messages from clients
behavior summa_server(stateful_actor<summa_server_state>* self);
// Summa Server backup behaviour - handles the exit messages for clients
behavior summa_server_exit(stateful_actor<summa_server_state>* self);
// Creates the csv file that holds the results of the batches
void initializeCSVOutput(std::string csv_output_path);
// Send all connected actors the updated backup servers list
void sendAllBackupServersList(stateful_actor<summa_server_state>* self);
// Look for the lost backup server in the backup servers list and remove it
void findAndRemoveLostBackupServer(stateful_actor<summa_server_state>* self, actor_addr lost_backup_server);
// Check for an idle client to send the failed or next batch we find that is not assigned
void checkForIdleClients(stateful_actor<summa_server_state>* self);
void notifyBackupServersOfRemovedClient(stateful_actor<summa_server_state>* self, Client client);
// Finds the batch the lost client was working on and reassigns it to another client if available
// If no client is available then the batch is added back to the list to be reassigned later
void resolveLostClient(stateful_actor<summa_server_state>* self, Client client);
// Removes the backup server from the list of backup servers
// All connected actors are then notified of the change
void resolveLostBackupServer(stateful_actor<summa_server_state>* self, const down_msg& dm);
// Convience function to keep code clean - just does what you think it does
void printRemainingBatches(stateful_actor<summa_server_state>* self);
} // namespace caf
#### parent directory of the 'build' directory ####
# F_MASTER = /home/kklenk/Summa-Actors
# #### fortran compiler ####
# FC = gfortran
# #### C++ compiler ####
# CC=g++
# #### Includes AND Libraries ####
# INCLUDES = -I/usr/include
# LIBRARIES = -L/usr/lib -lnetcdff -lnetcdf -lopenblas
# ACTORS_INCLUDES = -I/usr/include -I/usr/local/include
# ACTORS_LIBRARIES = -L/home/linuxbrew.linuxbrew/lib -L/home/kklenk/Summa-Actors/build -lcaf_core -lcaf_io -lsumma -lopenblas -lnetcdff
# Production runs
FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC
FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
# # Debug runs
# FLAGS_NOAH = -pg -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC
# FLAGS_COMM = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
# FLAGS_SUMMA = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
# FLAGS_ACTORS = -pg -g -O0 -Wall
# PART 1: Define directory paths
# Core directory that contains source code
F_KORE_DIR = $(F_MASTER)/build/source
# Location of the compiled modules
MOD_PATH = $(F_MASTER)/build
# Define the directory for the executables
# PART 2: Assemble all of the SUMMA sub-routines
# Define directories
NOAHMP_DIR = $(F_KORE_DIR)/noah-mp
FILE_ACCESS_DIR = $(INTERFACE_DIR)/file_access_actor
# utilities
nrtype.f90 \
f2008funcs.f90 \
NRUTIL = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_NRUTIL))
# Numerical recipes procedures
# NOTE: all numerical recipes procedures are now replaced with free versions
expIntegral.f90 \
NRPROC = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_NRPROC))
# Hook-up modules (set files and directory paths)
ascii_util.f90 \
HOOKUP = $(patsubst %, $(HOOKUP_DIR)/%, $(SUMMA_HOOKUP))
# Data modules
multiconst.f90 \
var_lookup.f90 \
data_types.f90 \
globalData.f90 \
flxMapping.f90 \
get_ixname.f90 \
popMetadat.f90 \
DATAMS = $(patsubst %, $(DSHARE_DIR)/%, $(SUMMA_DATAMS))
# utility modules
time_utils.f90 \
mDecisions.f90 \
snow_utils.f90 \
soil_utils.f90 \
updatState.f90 \
UTILMS = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_UTILMS))
# Model guts
MODGUT = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_MODGUT))
# Solver
vegPhenlgy.f90 \
diagn_evar.f90 \
stomResist.f90 \
groundwatr.f90 \
vegSWavRad.f90 \
vegNrgFlux.f90 \
ssdNrgFlux.f90 \
vegLiqFlux.f90 \
snowLiqFlx.f90 \
soilLiqFlx.f90 \
bigAquifer.f90 \
computFlux.f90 \
computResid.f90 \
computJacob.f90 \
eval8summa.f90 \
summaSolve.f90 \
systemSolv.f90 \
varSubstep.f90 \
opSplittin.f90 \
SOLVER = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_SOLVER))
# Interface code for Fortran-C++
cppwrap_datatypes.f90 \
cppwrap_auxiliary.f90 \
cppwrap_metadata.f90 \
initOutputStruc.f90 \
deallocateOutputStruc.f90 \
# Define routines for SUMMA preliminaries
conv_funcs.f90 \
sunGeomtry.f90 \
convE2Temp.f90 \
allocspaceActors.f90 \
checkStruc.f90 \
childStruc.f90 \
ffile_info.f90 \
read_attribute.f90 \
read_pinit.f90 \
pOverwrite.f90 \
read_paramActors.f90 \
paramCheck.f90 \
check_icondActors.f90 \
# allocspace.f90
PRELIM = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_PRELIM))
module_model_constants.F \
module_sf_noahutl.F \
module_sf_noahlsm.F \
NOAHMP = $(patsubst %, $(NOAHMP_DIR)/%, $(SUMMA_NOAHMP))
# Define routines for the SUMMA model runs
indexState.f90 \
getVectorz.f90 \
updateVars.f90 \
var_derive.f90 \
read_forcingActors.f90 \
access_write.f90 \
derivforce.f90 \
snowAlbedo.f90 \
canopySnow.f90 \
tempAdjust.f90 \
snwCompact.f90 \
layerMerge.f90 \
layerDivide.f90 \
volicePack.f90 \
MODRUN = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_MODRUN))
# Define NetCDF routines
# OutputStrucWrite is not a netcdf subroutine and should be
# moved
netcdf_util.f90 \
def_output.f90 \
outputStrucWrite.f90 \
writeOutput.f90 \
NETCDF = $(patsubst %, $(NETCDF_DIR)/%, $(SUMMA_NETCDF))
# ... stitch together common programs
# ... stitch together SUMMA programs
# Define the driver routine
summaActors_type.f90 \
summaActors_util.f90 \
summaActors_globalData.f90 \
summaActors_init.f90 \
SummaActors_setup.f90 \
summaActors_restart.f90 \
summaActors_forcing.f90 \
SummaActors_modelRun.f90 \
summaActors_alarms.f90 \
DRIVER = $(patsubst %, $(DRIVER_DIR)/%, $(SUMMA_DRIVER))
ACTORC = $(F_KORE_DIR)/actors/
ACTOR_TEST = $(F_KORE_DIR)/testing/
# PART 3: compilation
# Compile
all: lib main
lib: compile_noah compile_comm compile_summa link clean
main: actors actorsLink actorsClean
test: actors_test actors_testLink actorsClean
# compile Noah-MP routines
# compile common routines
# compile SUMMA routines
# generate library
$(FC) -shared *.o -o
# compile the c++ portion of SummaActors
$(CC) -o summaMain *.o $(ACTORS_LIBRARIES)
$(CC) -o summaTest *.o $(ACTORS_LIBRARIES)
rm *.o
# Remove object files
rm -f *.o *.mod soil_veg_gen_parm__genmod.f90
rm *.so
#include "../interface/fortran_dataTypes.h"
#include "../interface/file_access_actor/fileAccess_subroutine_wrappers.h"
#include "caf/all.hpp"
#include "messageAtoms.h"
#include "OutputManager.h"
#include <vector>
#include <chrono>
#include "global.h"
class forcingFile {
int fileID; // which file are we relative the forcing file list saved in fortran
int numSteps; // the number of steps in this forcing file
bool isLoaded; // is this file actually loaded in to RAM yet.
forcingFile(int fileID) {
this->fileID = fileID;
this->numSteps = 0;
this->isLoaded = false;
int getNumSteps() {
return this->numSteps;
bool isFileLoaded() {
return this->isLoaded;
void updateIsLoaded() {
this->isLoaded = true;
void updateNumSteps(int numSteps) {
this->numSteps = numSteps;
this->isLoaded = true;
struct file_access_state {
// Variables set on Spwan
caf::actor parent;
int startGRU;
int numGRU;
void *handle_forcFileInfo = new_handle_file_info(); // Handle for the forcing file information
void *handle_ncid = new_handle_var_i(); // output file ids
OutputManager *output_manager;
int num_vectors_in_output_manager = 8;
int num_steps;
int outputStrucSize;
int stepsInCurrentFile;
int numFiles;
int filesLoaded;
int err = 0;
std::vector<forcingFile> forcFileList; // list of steps in file
std::vector<bool> outputFileInitHRU;
std::chrono::time_point<std::chrono::system_clock> readStart;
std::chrono::time_point<std::chrono::system_clock> readEnd;
double readDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> writeStart;
std::chrono::time_point<std::chrono::system_clock> writeEnd;
double writeDuration = 0.0;
\ No newline at end of file
#include "FileAccess.h"
using namespace caf;
void initalizeFileAccessActor(stateful_actor<file_access_state>* self);
int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, int numStepsToWrite, int returnMessage, caf::actor actorRef);
int readForcing(stateful_actor<file_access_state>* self, int currentFile);
int write(stateful_actor<file_access_state>* self, int listIndex);
behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU,
int outputStrucSize, actor parent) {
// Set File_Access_Actor variables
self->state.parent = parent;
self->state.numGRU = numGRU;
self->state.startGRU = startGRU;
self->state.outputStrucSize = outputStrucSize;
aout(self) << "\nFile Access Actor Started\n";
return {
[=](initalize_outputStructure) {
aout(self) << "Initalizing Output Structure" << std::endl;
Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize,
&self->state.numGRU, &self->state.err);
[=](write_param, int indxGRU, int indxHRU) {
int err;
err = 0;
Write_HRU_Param(self->state.handle_ncid, &indxGRU, &indxHRU, &err);
if (err != 0) {
aout(self) << "ERROR: Write_HRU_PARAM -- For HRU = " << indxHRU << "\n";
[=](access_forcing, int currentFile, caf::actor refToRespondTo) {
// aout(self) << "Received Current FIle = " << currentFile << std::endl;
if (currentFile <= self->state.numFiles) {
if(self->state.forcFileList[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1
// aout(self) << "ForcingFile Already Loaded \n";
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
} else {
self->state.readStart = std::chrono::high_resolution_clock::now();
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
// Check if we have loaded all forcing files
if(self->state.filesLoaded <= self->state.numFiles) {
self->send(self, access_forcing_internal_v, currentFile + 1);
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
} else {
aout(self) << currentFile << " is larger than expected" << std::endl;
[=](access_forcing_internal, int currentFile) {
if (self->state.filesLoaded <= self->state.numFiles &&
currentFile <= self->state.numFiles) {
// aout(self) << "Loading in background, File:" << currentFile << "\n";
if (self->state.forcFileList[currentFile - 1].isFileLoaded()) {
aout(self) << "File Loaded when shouldn't be \n";
self->state.readStart = std::chrono::high_resolution_clock::now();
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
self->send(self, access_forcing_internal_v, currentFile + 1);
} else {
aout(self) << "All Forcing Files Loaded \n";
[=](write_output, int indxGRU, int indxHRU, int numStepsToWrite,
caf::actor refToRespondTo) {
int err;
int returnMessage = 9999;
err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, returnMessage, refToRespondTo);
if (err != 0) {
aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
[=](read_and_write, int indxGRU, int indxHRU, int numStepsToWrite, int currentFile,
caf::actor refToRespondTo) {
int err;
err = readForcing(self, currentFile);
if (err != 0)
err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, currentFile, refToRespondTo);
if (err != 0)
aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
[=](run_failure, int indxGRU) {
int listIndex;
// update the list in Fortran
listIndex = self->state.output_manager->decrementMaxSize(indxGRU);
// Check if this list is now full
if(self->state.output_manager->isFull(listIndex)) {
write(self, listIndex);
* Message from JobActor
* OutputManager needs to be adjusted so the failed HRUs can run again
[=](restart_failures) {
[=](deallocate_structures) {
aout(self) << "Deallocating Structure" << std::endl;
FileAccessActor_DeallocateStructures(self->state.handle_forcFileInfo, self->state.handle_ncid);
self->state.readDuration = self->state.readDuration / 1000; // Convert to milliseconds
self->state.readDuration = self->state.readDuration / 1000; // Convert to seconds
self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds
self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds
self->send(self->state.parent, file_access_actor_done_v, self->state.readDuration,
[=](reset_outputCounter, int indxGRU) {
void initalizeFileAccessActor(stateful_actor<file_access_state>* self) {
int indx = 1;
int err = 0;
// aout(self) << "Set Up the forcing file" << std::endl;
ffile_info_C(&indx, self->state.handle_forcFileInfo, &self->state.numFiles, &err);
if (err != 0) {
aout(self) << "Error: ffile_info_C - File_Access_Actor \n";
std::string function = "ffile_info_C";
self->send(self->state.parent, file_access_actor_err_v, function);
mDecisions_C(&self->state.num_steps, &err);
if (err != 0) {
aout(self) << "Error: mDecisions - FileAccess Actor \n";
std::string function = "mDecisions_C";
self->send(self->state.parent, file_access_actor_err_v, function);
if (err != 0) {
aout(self) << "ERROR: read_pinit_C\n";
std::string function = "read_pinit_C";
self->send(self->state.parent, file_access_actor_err_v, function);
if (err != 0) {
aout(self) << "ERROR: read_vegitationTables\n";
std::string function = "read_vegitationTables";
self->send(self->state.parent, file_access_actor_err_v, function);
Create_Output_File(self->state.handle_ncid, &self->state.numGRU, &self->state.startGRU, &err);
if (err != 0) {
aout(self) << "ERROR: Create_OutputFile\n";
std::string function = "Create_Output_File";
self->send(self->state.parent, file_access_actor_err_v, function);
// Initalize the output Structure
aout(self) << "Initalizing Output Structure" << std::endl;
Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize,
&self->state.numGRU, &self->state.err);
// Initalize the output manager
self->state.output_manager = new OutputManager(self->state.num_vectors_in_output_manager, self->state.numGRU);
self->send(self->state.parent, done_file_access_actor_init_v);
// initalize the forcingFile array
self->state.filesLoaded = 0;
for (int i = 1; i <= self->state.numFiles; i++) {
int write(stateful_actor<file_access_state>* self, int listIndex) {
int err = 0;
int minGRU = self->state.output_manager->getMinIndex(listIndex);
int maxGRU = self->state.output_manager->getMaxIndex(listIndex);
int numStepsToWrite = self->state.output_manager->getNumStepsToWrite(listIndex);
&numStepsToWrite, &minGRU,
&maxGRU, &err);
// Pop The actors and send them the correct continue message
while(!self->state.output_manager->isEmpty(listIndex)) {
std::tuple<caf::actor, int> actor = self->state.output_manager->popActor(listIndex);
if (get<1>(actor) == 9999) {
self->send(get<0>(actor), done_write_v);
} else {
self->send(get<0>(actor), run_hru_v,
self->state.forcFileList[get<1>(actor) - 1].getNumSteps());
return 0;
int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU,
int numStepsToWrite, int returnMessage, caf::actor actorRef) {
self->state.writeStart = std::chrono::high_resolution_clock::now();
if (debug) {
aout(self) << "Recieved Write Request From GRU: " << indxGRU << "\n";
int err = 0;
int listIndex = self->state.output_manager->addActor(actorRef, indxGRU, returnMessage, numStepsToWrite);
if (self->state.output_manager->isFull(listIndex)) {
if (debug) {
aout(self) << "List with Index " << listIndex << " is full and ready to write\n";
aout(self) << "Minimum GRU Index = " << self->state.output_manager->getMinIndex(listIndex) << "\n";
aout(self) << "Maximum GRU Index = " << self->state.output_manager->getMaxIndex(listIndex) << "\n";
err = write(self, listIndex);
} else {
if (debug) {
aout(self) << "List with Index " << listIndex << " is not full yet waiting to write\n";
aout(self) << "Size of list is " << self->state.output_manager->getSize(listIndex) << "\n";
self->state.writeEnd = std::chrono::high_resolution_clock::now();
self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd);
return err;
int readForcing(stateful_actor<file_access_state>* self, int currentFile) {
// Check if we have already loaded this file
if(self->state.forcFileList[currentFile -1].isFileLoaded()) {
if (debug)
aout(self) << "ForcingFile Already Loaded \n";
return 0;
} else {
// File Needs to be loaded
self->state.readStart = std::chrono::high_resolution_clock::now();
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
if (debug)
aout(self) << "ERROR: FileAccessActor_ReadForcing\n" <<
"currentFile = " << currentFile << "\n" << "number of steps = "
<< self->state.stepsInCurrentFile << "\n";
return -1;
} else {
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
return 0;
\ No newline at end of file
#ifndef GRUinfo_H_
#define GRUinfo_H_
#include "caf/all.hpp"
#include <iostream>
#include <fstream>
#include "Job.h"
class GRUinfo {
int refGRU; // This will be the same as the refGRU
int indxGRU;
caf::actor GRU;
// Variable to update
int dt_init;
// Completed Information
int currentAttempt;
int maxAttempts;
bool completed;
bool failed;
// Timing information for the GRU
double runTime;
double initDuration;
double forcingDuration;
double runPhysicsDuration;
double writeOutputDuration;
// Constructor
GRUinfo(int refGRU, int indxGRU, caf::actor gru, int dt_init, int maxAttempts) {
this->refGRU = refGRU;
this->indxGRU = indxGRU;
this->GRU = gru;
this->dt_init = dt_init;
this->currentAttempt = 1;
this->maxAttempts = maxAttempts;
this->completed = false;
this->failed = false;
// Deconstructor
// Getters
int getRefGRU() {
return this->refGRU;
int getIndxGRU() {
return this->indxGRU;
int getDt_init() {
return this->dt_init;
caf::actor getActor() {
return GRU;
// Setters
void updateGRU(caf::actor gru) {
this->GRU = gru;
void updateFailed() {
if (this->failed) {
this->failed = false;
} else {
this->failed = true;
void updateCompletedToTrue(){
this->completed = true;
void updateDt_init() {
this->dt_init = this->dt_init * 2;
void updateCurrentAttempt() {
// Methods that return Booleans
bool isMaxAttemptsReached() {
return this->maxAttempts <= this->currentAttempt;
bool isFailed() {
return this->failed;
bool isCompleted() {
return this->completed;
void doneRun(double runTime, double initDuration, double forcingDuration,
double runPhysicsDuration, double writeOutputDuration) {
this->runTime = runTime;
this->initDuration = initDuration;
this->forcingDuration = forcingDuration;
this->runPhysicsDuration = runPhysicsDuration;
this->writeOutputDuration = writeOutputDuration;
// Methods for writing statistics to a file
void writeSuccess(std::string fileName) {
std::ofstream file;, std::ios_base::app);
file << this->refGRU << ","
<< this->runTime << ","
<< this->initDuration << ","
<< this->forcingDuration << ","
<< this->runPhysicsDuration << ","
<< this->writeOutputDuration << ","
<< this->dt_init << ","
<< this->currentAttempt << "\n";
void writeFail(std::string fileName) {
std::ofstream file;, std::ios_base::app);
file << this->refGRU << ","
<< this->dt_init << ","
<< this->currentAttempt << "\n";
void printOutput() {
std::cout << "\nGRU = " << this->refGRU << "\n" <<
"RunTime = " << this->runTime << "\n" <<
"initDuration = " << this->initDuration << "\n" <<
"forcingDuration = " << this->forcingDuration << "\n" <<
"runPhysicsDuration = " << this->runPhysicsDuration << "\n" <<
"writeOutputDuration = " << this->writeOutputDuration << "\n\n";
\ No newline at end of file
#ifndef HRU_H_
#define HRU_H_
#include "caf/all.hpp"
#include "../interface/fortran_dataTypes.h"
#include "../interface/hru_actor/hru_subroutine_wrappers.h"
#include "messageAtoms.h"
#include <fstream>
#include <string>
#include <typeinfo>
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <chrono>
#include <iostream>
#include "json.hpp"
#include "global.h"
using namespace caf;
struct hru_state {
// Actor References
caf::actor file_access_actor;
caf::actor parent;
// Info about which HRU we are and our indexes
// into global structures in Fortran
int indxHRU; // index for hru part of derived types in FORTRAN
int indxGRU; // index for gru part of derived types in FORTRAN
int refGRU; // The actual ID of the GRU we are
// Variables for output/forcing structures
int outputStrucSize;
int outputStep;
int stepsInCurrentFFile;
int forcingFileStep;
int currentForcingFile = 1;
// statistics structures
void *handle_forcStat = new_handle_var_dlength(); // model forcing data
void *handle_progStat = new_handle_var_dlength(); // model prognostic (state) variables
void *handle_diagStat = new_handle_var_dlength(); // model diagnostic variables
void *handle_fluxStat = new_handle_var_dlength(); // model fluxes
void *handle_indxStat = new_handle_var_dlength(); // model indices
void *handle_bvarStat = new_handle_var_dlength(); // basin-average variables
// primary data structures (scalars)
void *handle_timeStruct = new_handle_var_i(); // model time data
void *handle_forcStruct = new_handle_var_d(); // model forcing data
void *handle_attrStruct = new_handle_var_d(); // local attributes for each HRU
void *handle_typeStruct = new_handle_var_i(); // local classification of soil veg etc. for each HRU
void *handle_idStruct = new_handle_var_i8();
// primary data structures (variable length vectors)
void *handle_indxStruct = new_handle_var_ilength(); // model indices
void *handle_mparStruct = new_handle_var_dlength(); // model parameters
void *handle_progStruct = new_handle_var_dlength(); // model prognostic (state) variables
void *handle_diagStruct = new_handle_var_dlength(); // model diagnostic variables
void *handle_fluxStruct = new_handle_var_dlength(); // model fluxes
// basin-average structures
void *handle_bparStruct = new_handle_var_d(); // basin-average parameters
void *handle_bvarStruct = new_handle_var_dlength(); // basin-average variables
// ancillary data structures
void *handle_dparStruct = new_handle_var_d(); // default model parameters
// Local hru data
void *handle_ncid = new_handle_var_i(); // output file ids
void *handle_statCounter = new_handle_var_i();
void *handle_outputTimeStep = new_handle_var_i();
void *handle_resetStats = new_handle_flagVec();
void *handle_finalizeStats = new_handle_flagVec();
void *handle_oldTime = new_handle_var_i();
void *handle_refTime = new_handle_var_i();
void *handle_finshTime = new_handle_var_i();
void *handle_startTime = new_handle_var_i();
// Misc Variables
int timestep = 1; // Current Timestep of HRU simulation
int computeVegFlux; // flag to indicate if we are computing fluxes over vegetation
double dt_init; // used to initialize the length of the sub-step for each HRU
double upArea; // area upslope of each HRU
int num_steps = 0; // number of time steps
int forcingStep; // index of current time step in current forcing file
int iFile; // index of current forcing file from forcing file list
int dt_init_factor = 1; // factor of dt_init (coupled_em)
bool printOutput;
int outputFrequency;
// Julian Day variables
double fracJulDay;
double tmZoneOffsetFracDay;
int yearLength;
int err = 0; // error conotrol
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration = 0.0;
std::chrono::time_point<std::chrono::system_clock> initStart;
std::chrono::time_point<std::chrono::system_clock> initEnd;
double initDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> forcingStart;
std::chrono::time_point<std::chrono::system_clock> forcingEnd;
double forcingDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> runPhysicsStart;
std::chrono::time_point<std::chrono::system_clock> runPhysicsEnd;
double runPhysicsDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> writeOutputStart;
std::chrono::time_point<std::chrono::system_clock> writeOutputEnd;
double writeOutputDuration = 0.0;
* @brief Get the settings from the settings JSON file
* @param self Actor State
* @param configPath Path to the directory that contains the settings file
void parseSettings(stateful_actor<hru_state>* self, std::string configPath);
Function to initalize the HRU for running
void Initialize_HRU(stateful_actor<hru_state>* self);
Function runs all of the hru time_steps
int Run_HRU(stateful_actor<hru_state>* self);
bool check_HRU(stateful_actor<hru_state>* self, int err);
void initalizeTimeVars(stateful_actor<hru_state>* self);
void finalizeTimeVars(stateful_actor<hru_state>* self);
void deallocateHRUStructures(stateful_actor<hru_state>* self);
void printOutput(stateful_actor<hru_state>* self);
\ No newline at end of file
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "string.h"
#include <unistd.h>
#include <vector>
#include "FileAccessActor.h"
#include "../interface/job_actor/job_subroutine_wrappers.h"
#include "HRUActor.h"
#include <chrono>
#include "messageAtoms.h"
#include "GRUinfo.h"
#include <iostream>
#include <fstream>
#include <sys/stat.h>
#include "json.hpp"
#include "global.h"
struct job_state {
// Actor References
caf::actor file_access_actor; // actor reference for the file_access_actor
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int startGRU; // Starting GRU for this job
int numGRU; // Number of GRUs for this job
std::string configPath;
std::string fileManager; // Path of the fileManager.txt file
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int maxRunAttempts = 3; // Max number of attemtps to solve a GRU
std::vector<GRUinfo*> GRUList; // List of all GRUs under this job actor
int numGRUDone = 0; // The number of GRUs that have completed
int GRUInit = 0; // Number of GRUs initalized
int err = 0; // Error Code
int numGRUFailed = 0; // Number of GRUs that have failed
int outputStrucSize;
// Timing Variables
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration;
// Output File Names for Timings
bool outputCSV;
std::string csvOut;
std::string csvPath;
std::string successOutputFile;
std::string failedOutputFile = "failedHRU";
std::string fileAccessActorStats = "fileAccessActor.csv";
int parseSettings(stateful_actor<job_state>* self, std::string configPath);
void initJob(stateful_actor<job_state>* self);
void initalizeGRU(stateful_actor<job_state>* self);
void runGRUs(stateful_actor<job_state>* self);
void restartFailures(stateful_actor<job_state>* self);
\ No newline at end of file
#include "Job.h"
using namespace caf;
using json = nlohmann::json;
* @brief First Actor that is spawned that is not the Coordinator Actor.
* @param self
* @return behavior
behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
std::string configPath, int outputStrucSize, caf::actor parent) {
self->state.start = std::chrono::high_resolution_clock::now();
// Set Job Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.configPath = configPath;
self->state.parent = parent;
self->state.outputStrucSize = outputStrucSize;
if (parseSettings(self, configPath) == -1) {
aout(self) << "ERROR WITH JSON SETTINGS FILE!!!\n";
} else {
aout(self) << "\nSETTINGS FOR JOB_ACTOR\n" <<
"File Manager Path = " << self->state.fileManager << "\n" <<
"outputCSV = " << self->state.outputCSV << "\n";
if (self->state.outputCSV) {
aout(self) << "csvPath = " << self->state.csvPath << "\n";
// Initalize global variables
// Spawn the file_access_actor. This will return the number of forcing files we are working with
self->state.file_access_actor = self->spawn(file_access_actor, self->state.startGRU, self->state.numGRU,
self->state.outputStrucSize, self);
aout(self) << "Job Actor Initalized \n";
return {
// *******************************************************************************************
// *********************************** INTERFACE WITH HRU ************************************
// *******************************************************************************************
[=](init_hru) {
[=](done_init_hru) {
if (debug) {
aout(self) << "Done Init\n";
// aout(self) << "Done init\n";
if (self->state.GRUInit < self->state.numGRU) {
self->send(self, init_hru_v);
} else {
aout(self) << "All GRUs are initalized\n";
self->state.GRUInit = 0; // reset counter in case we have failures
[=](done_hru, int indxGRU, double totalDuration, double initDuration,
double forcingDuration, double runPhysicsDuration, double writeOutputDuration) {
aout(self) << "GRU:" << self->state.GRUList[indxGRU - 1]->getRefGRU()
<< "indxGRU = " << indxGRU << "Done \n";
self->state.GRUList[indxGRU - 1]->doneRun(totalDuration, initDuration, forcingDuration,
runPhysicsDuration, writeOutputDuration);
if (self->state.outputCSV) {
self->state.GRUList[indxGRU - 1]->writeSuccess(self->state.successOutputFile);
// Check if we are done
if (self->state.numGRUDone >= self->state.numGRU) {
self->state.numGRUDone = 0; // just in case there were failures
if (self->state.numGRUFailed == 0) {
self->send(self->state.file_access_actor, deallocate_structures_v);
} else {
[=](run_failure, caf::actor actorRef, int indxGRU, int err) {
aout(self) << "GRU:" << self->state.GRUList[indxGRU - 1]->getRefGRU()
<< "indxGRU = " << indxGRU << "Failed \n"
<< "Will have to wait until all GRUs are done before it can be re-tried\n";
self->state.GRUList[indxGRU - 1]->updateFailed();
// Let the file_access_actor know this actor failed
self->send(self->state.file_access_actor, run_failure_v, indxGRU);
// check if we are the last hru to complete
if (self->state.numGRUDone >= self->state.numGRU) {
// *******************************************************************************************
// ******************************* END INTERFACE WITH HRU ************************************
// *******************************************************************************************
// *******************************************************************************************
// ****************************** INTERFACE WITH FileAccessActor *****************************
// *******************************************************************************************
[=](done_file_access_actor_init) {
// Init GRU Actors and the Output Structure
// self->send(self->state.file_access_actor, initalize_outputStructure_v);
self->send(self, init_hru_v);
[=](file_access_actor_done, double readDuration, double writeDuration) {
int err = 0;
if (debug) {
aout(self) << "\n********************************\n";
aout(self) << "Outputing Timing Info for HRUs\n";
for(auto gru : self->state.GRUList) {
aout(self) << "********************************\n";
// Delete GRUs
for (auto GRU : self->state.GRUList) {
delete GRU;
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration = calculateTime(self->state.start, self->state.end);
self->state.duration = self->state.duration / 1000; // Convert to milliseconds
aout(self) << "\nTotal Job Duration:\n";
aout(self) << " " << self->state.duration / 1000 << " Seconds\n";
aout(self) << " " << (self->state.duration / 1000) / 60 << " Minutes\n";
aout(self) << " " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n";
aout(self) << "\nReading Duration:\n";
aout(self) << " " << readDuration << " Seconds\n";
aout(self) << "\nWriting Duration:\n";
aout(self) << " " << writeDuration << " Seconds\n\n";
// Tell Parent we are done
self->send(self->state.parent, done_job_v, self->state.numGRUFailed);
[=](file_access_actor_err, std::string function) {
aout(self) << "Failure in File Access Actor in function" << function << "\n";
aout(self) << "Letting Parent Know we are quitting\n";
self->send(self->state.parent, err_v);
// *******************************************************************************************
// ************************** END INTERFACE WITH FileAccessActor *****************************
// *******************************************************************************************
int parseSettings(stateful_actor<job_state>* self, std::string configPath) {
json settings;
std::string SummaActorsSettigs = "/Summa_Actors_Settings.json";
std::ifstream settings_file(configPath + SummaActorsSettigs);
settings_file >> settings;
if (settings.find("JobActor") != settings.end()) {
json JobActorConfig = settings["JobActor"];
// Find the File Manager Path
if (JobActorConfig.find("FileManagerPath") != JobActorConfig.end()) {
self->state.fileManager = JobActorConfig["FileManagerPath"];
} else {
aout(self) << "Error Finding FileManagerPath - Exiting as this is needed\n";
return -1;
// Find if we want to outputCSV
if (JobActorConfig.find("outputCSV") != JobActorConfig.end()) {
self->state.outputCSV = JobActorConfig["outputCSV"];
} else {
aout(self) << "Error Finding outputCSV in JSON file - Reverting to Default Value\n";
self->state.outputCSV = false;
// Output Path of CSV
if (self->state.outputCSV) {
if (JobActorConfig.find("csvPath") != JobActorConfig.end()) {
self->state.csvPath = JobActorConfig["csvPath"];
} else {
aout(self) << "Error Finding csvPath in JSON file = Reverting to Default Value \n";
self->state.outputCSV = false; // we just choose not to output a csv
return 0;
} else {
aout(self) << "Error Finding JobActor in JSON file - Exiting as there is no path for the fileManger\n";
return -1;
void initJob(stateful_actor<job_state>* self) {
std::string success = "Success"; // allows us to build the string
if (self->state.outputCSV) {
std::ofstream file;
self->state.successOutputFile = self->state.csvPath += success +=
std::to_string(self->state.startGRU) += ".csv";>state.successOutputFile, std::ios_base::out);
file << "GRU" << "," << "totalDuration" << "," << "initDuration" << "," <<
"forcingDuration" << "," << "runPhysicsDuration" << "," << "writeOutputDuration" <<
"," << "dt_init" << "," << "numAttemtps" << "\n";
int totalGRUs = 0;
int totalHRUs = 0;
int numHRUs = 0;
int err = 0;
// aout(self) << "Initalizing Globals \n";
if (err != 0) {
aout(self) << "Error: initGlobals" << std::endl;
void initalizeGRU(stateful_actor<job_state>* self) {
int startGRU = self->state.GRUList.size() + self->state.startGRU;
int indexGRU = self->state.GRUList.size() + 1; // Fortran reference starts at 1
auto gru = self->spawn(hru_actor, startGRU, indexGRU,
self->state.configPath, self->state.file_access_actor,
self->state.outputStrucSize, self);
self->state.GRUList.push_back(new GRUinfo(startGRU, indexGRU, gru,
self->state.dt_init_start_factor, self->state.maxRunAttempts));
void runGRUs(stateful_actor<job_state>* self) {
for(auto gru : self->state.GRUList) {
if(!gru->isCompleted() && !gru->isFailed()) {
self->send(gru->getActor(), start_hru_v);
void restartFailures(stateful_actor<job_state>* self) {
// Need to let the file_access_actor know so it can set up the new output Manager
self->send(self->state.file_access_actor, restart_failures_v);
self->state.numGRU = self->state.numGRUFailed;
self->state.numGRUFailed = 0;
self->state.numGRUDone = 0;
for(auto gru : self->state.GRUList) {
if (gru->isFailed() && !gru->isMaxAttemptsReached()) {
self->send(self->state.file_access_actor, reset_outputCounter_v, gru->getIndxGRU());
auto newGRU = self->spawn(hru_actor, gru->getRefGRU(), gru->getIndxGRU(),
self->state.outputStrucSize, self);
self->send(gru->getActor(), dt_init_factor_v, gru->getDt_init());
#ifndef OutputManager_H_
#define OutputManager_H_
#include "caf/all.hpp"
#include <vector>
#include <algorithm>
* @brief Basic Container class to hold actor references. This has a size component for checking when it is full.
class ActorRefList {
int numStepsToWrite; // We can save this value here so that we know how many steps to write
int currentSize;
unsigned int maxSize;
int minIndex = -1; // minimum index of the actor being stored on this list
int maxIndex = 0; // maximum index of the actor being stored on this list
std::vector<std::tuple<caf::actor, int>> list;
// Constructor
ActorRefList(int maxSize){
this->currentSize = 0;
this->maxSize = maxSize;
// Deconstructor
int getMaxIndex() {
return this->maxIndex;
int getMinIndex() {
return this->minIndex;
int getCurrentSize() {
return this->currentSize;
int getMaxSize() {
return this->maxSize;
int getNumStepsToWrite() {
return this->numStepsToWrite;
bool isFull() {
return list.size() == this->maxSize;
* Adds An Actor and its return message as a tuple to this->list
* actor - the actor ref of the actor being added to this->list
* returnMessage - Either 9999 (place holder and specifies to send a done_write_v message) or
* this is the current forcingFileList index that allows the file_access actor to know the number
* of steps the HRU actor that needs to compute
void addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
if (this->isFull()) {
throw "List is full, cannot add actor to this list";
if (index > this->maxIndex) {
this->maxIndex = index;
if (index < this->minIndex || this->minIndex < 0) {
this->minIndex = index;
this->numStepsToWrite = numStepsToWrite;
list.push_back(std::make_tuple(actor, returnMessage));
* Return a tuple of an actor and its returnMessage.
* The return message is 9999 or the index of the forcingFile it needs to acces
std::tuple<caf::actor,int> popActor() {
if (list.empty()) {
throw "List is empty, nothing to pop";
auto actor = list.back();
return actor;
bool isEmpty() {
return list.empty();
* When an actor fails we need to decrement the count
* so that this list becomes full when there is a failure
* indexHRU - index of the HRU causing the error
void decrementMaxSize() {
* Remove the failed HRU from the list
void removeFailed(caf::actor actorRef) {
bool found = false;
for(std::vector<std::tuple<caf::actor, int>>::iterator it = this->list.begin(); it != this->list.end(); it++) {
if (std::get<0>(*it) == actorRef) {
found = true;
this->currentSize--; this->maxSize--;
if (!found) {
throw "Element To Remove Not Found";
* @brief Class that manages which structure actors are held on
class OutputManager {
int numVectors;
int avgSizeOfActorList;
bool runningFailures;
std::vector<ActorRefList*> list;
std::vector<int> failedHRU;
std::vector<int> failureReRun; // index used so we can add failedHRUs if they fail a second time
// Constructor
OutputManager(int numVectors, int totalNumActors){
this->numVectors = numVectors;
int sizeOfOneVector = totalNumActors / numVectors;
this->avgSizeOfActorList = sizeOfOneVector;
this->runningFailures = false;
// Create the first n-1 vectors with the same size
for (int i = 0; i < numVectors - 1; i++) {
auto refList = new ActorRefList(sizeOfOneVector);
totalNumActors = totalNumActors - sizeOfOneVector;
// Create the last vector with size however many actors are left
auto refList = new ActorRefList(totalNumActors);
// Deconstructor
* @brief Adds an actor to its respective list
* @param actor Actor reference
* @param index Actor Index
* @param returnMessage Forcing File index or 9999
* @return int The list index that actor is added to.
int addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
int listIndex;
if (this->runningFailures) {
// find the index of the structure this HRU is in
auto it = find(this->failureReRun.begin(), this->failureReRun.end(), index);
if (it != this->failureReRun.end()) {
listIndex = it - this->failureReRun.begin();
} else {
throw "Element Not Found in failureReRun list";
this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
} else {
// Index has to be subtracted by 1 because Fortran array starts at 1
listIndex = (index - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
return listIndex;
* Remove tuple from list[index]
std::tuple<caf::actor,int> popActor(int index) {
if (index > this->numVectors - 1 || index < 0) {
throw "List Index Out Of Range";
} else if (this->list[index]->isEmpty()) {
throw "List is Empty, Nothing to pop";
return this->list[index]->popActor();
/** When a failure occurs an actor most likley will not already be on this list
* This method may and probably should not be used. Although needing to remove a
* specific element from a list may be needed.
* Remove the failed actor from the list
* Return the index of the list we removed the actor from
* This is so we can check if it is full
int removeFailed(caf::actor actorRef, int index) {
// Find the list this actor is on
int listIndex = (index - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
return listIndex;
* Decrease the size of the list
* Add this GRU to the failed list
int decrementMaxSize(int indexHRU) {
// Find the list this actor is on
int listIndex = (indexHRU - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
return listIndex;
void restartFailures() {
this->numVectors = this->failedHRU.size();
for (unsigned int i = 0; i < this->failedHRU.size(); i++) {
auto refList = new ActorRefList(1);
this->failureReRun = this->failedHRU;
this->runningFailures = true;
* Get the number of steps to write from the correct listIndex
int getNumStepsToWrite(int listIndex) {
return this->list[listIndex]->getNumStepsToWrite();
bool isFull(int listIndex) {
if (listIndex > this->numVectors - 1) {
throw "List Index Out Of Range";
return this->list[listIndex]->isFull();
bool isEmpty(int listIndex) {
return this->list[listIndex]->isEmpty();
int getSize(int listIndex) {
if (listIndex > this->numVectors - 1) {
throw "List Index Out Of Range";
return this->list[listIndex]->getCurrentSize();
int getMinIndex(int listIndex) {
return this->list[listIndex]->getMinIndex();
int getMaxIndex(int listIndex) {
return this->list[listIndex]->getMaxIndex();
void addFailed(int indxHRU) {
\ No newline at end of file
#include "SummaManager.h"
using namespace caf;
using json = nlohmann::json;
* Top Level Actor for Summa. This actor recieves the number of GRUs to compute from main and
* divides them into jobs that compute one at a time.
* @param startGRU - starting GRU for the simulation
* @param numGRU - total number of GRUs to compute
* @param configPath - location of file information for SUMMA
* @return behavior
behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string configPath) {
self->state.start = std::chrono::high_resolution_clock::now();
// Set Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.configPath = configPath;
parseSettings(self, configPath);
aout(self) << "SETTINGS FOR SUMMA_ACTOR\n";
aout(self) << "Output Structure Size = " << self->state.outputStrucSize << "\n";
aout(self) << "Max GRUs Per Job = " << self->state.maxGRUPerJob << "\n";
// Create the job_actor and start SUMMA
return {
[=](done_job, int numFailed) {
self->state.numFailed += numFailed;
aout(self) << "Job Done\n";
if (self->state.numGRU <= 0) {
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration = calculateTime(self->state.start, self->state.end);
self->state.duration = self->state.duration / 1000; // Convert to milliseconds
aout(self) << "Total Program Duration:\n";
aout(self) << " " << self->state.duration / 1000 << " Seconds\n";
aout(self) << " " << (self->state.duration / 1000) / 60 << " Minutes\n";
aout(self) << " " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n";
aout(self) << "Program Finished \n";
} else {
// spawn a new job
[=](err) {
aout(self) << "Unrecoverable Error: Attempting To Fail Gracefully\n";
void spawnJob(stateful_actor<summa_manager>* self) {
// Ensure we do not start a job with too many GRUs
if (self->state.numGRU > self->state.maxGRUPerJob) {
// spawn the job actor
aout(self) << "\n Starting Job with startGRU = " << self->state.startGRU << "\n";
self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.maxGRUPerJob,
self->state.configPath, self->state.outputStrucSize, self);
// Update GRU count
self->state.numGRU = self->state.numGRU - self->state.maxGRUPerJob;
self->state.startGRU = self->state.startGRU + self->state.maxGRUPerJob;
} else {
self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.numGRU,
self->state.configPath, self->state.outputStrucSize, self);
self->state.numGRU = 0;
void parseSettings(stateful_actor<summa_manager>* self, std::string configPath) {
json settings;
std::string SummaActorsSettings = "/Summa_Actors_Settings.json";
std::ifstream settings_file(configPath + SummaActorsSettings);
settings_file >> settings;
if (settings.find("SummaActor") != settings.end()) {
json SummaActorConfig = settings["SummaActor"];
// Find the desired OutputStrucSize
if (SummaActorConfig.find("OuputStructureSize") != SummaActorConfig.end()) {
self->state.outputStrucSize = SummaActorConfig["OuputStructureSize"];
} else {
aout(self) << "Error Finding OutputStructureSize in JOSN - Reverting to default value\n";
self->state.outputStrucSize = 250;
// Find the desired maxGRUPerJob size
if (SummaActorConfig.find("maxGRUPerJob") != SummaActorConfig.end()) {
self->state.maxGRUPerJob = SummaActorConfig["maxGRUPerJob"];
} else {
aout(self) << "Error Finding maxGRUPerJob in JOSN - Reverting to default value\n";
self->state.maxGRUPerJob = 500;
} else {
aout(self) << "Error Finding SummaActor in JSON - Reverting to default values\n";
self->state.outputStrucSize = 250;
self->state.maxGRUPerJob = 500;
\ No newline at end of file