Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • gwu479/Summa-Actors
  • numerical_simulations_lab/actors/Summa-Actors
2 results
Show changes
Showing
with 654 additions and 1907 deletions
#pragma once
#include "caf/all.hpp"
#include <iostream>
#include <fstream>
/*
* Determine the state of the GRU
*/
enum class gru_state {
running,
failed,
succeeded
};
int is_success(const gru_state& state);
/**
* Class that holds information about the running GRUs. This class is held by the job actor
* The GRU/HRU actors that carry out the simulation are held by the GRU class
*/
class GRU {
private:
int global_gru_index; // The index of the GRU in the netcdf file
int local_gru_index; // The index of the GRU within this job
caf::actor gru_actor; // The actor for the GRU
// Modifyable Parameters
int dt_init_factor; // The initial dt for the GRU
double rel_tol; // The relative tolerance for the GRU
double abs_tol; // The absolute tolerance for the GRU
// Status Information
int attempts_left; // The number of attempts left for the GRU to succeed
gru_state state; // The state of the GRU
// Timing Information
double run_time = 0.0; // The total time to run the GRU
double init_duration = 0.0; // The time to initialize the GRU
double forcing_duration = 0.0; // The time to read the forcing data
double run_physics_duration = 0.0; // The time to run the physics
double write_output_duration = 0.0; // The time to write the output
public:
// Constructor
GRU(int global_gru_index, int local_gru_index, caf::actor gru_actor, int dt_init_factor,
double rel_tol, double abs_tol, int max_attempts);
// Deconstructor
~GRU();
// Getters
int getGlobalGRUIndex();
int getLocalGRUIndex();
caf::actor getGRUActor();
double getRunTime();
double getInitDuration();
double getForcingDuration();
double getRunPhysicsDuration();
double getWriteOutputDuration();
double getRelTol();
double getAbsTol();
double getAttemptsLeft();
gru_state getStatus();
bool isFailed();
// Setters
void setRunTime(double run_time);
void setInitDuration(double init_duration);
void setForcingDuration(double forcing_duration);
void setRunPhysicsDuration(double run_physics_duration);
void setWriteOutputDuration(double write_output_duration);
void setRelTol(double rel_tol);
void setAbsTol(double abs_tol);
void setSuccess();
void setFailed();
void setRunning();
void decrementAttemptsLeft();
void setGRUActor(caf::actor gru_actor);
};
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "GRU.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include "global.hpp"
#include "json.hpp"
#include "hru_actor.hpp"
#include "message_atoms.hpp"
#include "file_access_actor.hpp"
#include <unistd.h>
#include <limits.h>
/*********************************************
* Job Actor Fortran Functions
*********************************************/
extern "C" {
void job_init_fortran(char const* file_manager, int* start_gru_index, int* num_gru, int* num_hru, int* err);
void deallocateJobActor(int* err);
}
/*********************************************
* Job Actor state variables
*********************************************/
namespace caf {
using chrono_time = std::chrono::time_point<std::chrono::system_clock>;
// Holds information about the GRUs
struct GRU_Container {
std::vector<GRU*> gru_list;
chrono_time gru_start_time; // Vector of start times for each GRU
int num_gru_done = 0;
int num_gru_failed = 0; // number of grus that are waiting to be restarted
int num_gru_in_run_domain = 0; // number of grus we are currently solving for
int run_attempts_left = 1; // current run attempt for all grus
};
struct job_state {
// Actor References
caf::actor file_access_actor; // actor reference for the file_access_actor
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int start_gru; // Starting GRU for this job
int num_gru; // Number of GRUs for this job
int num_hru;
int max_run_attempts = 1; // Max number of attempts to solve a GRU
GRU_Container gru_container;
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int num_gru_done = 0; // The number of GRUs that have completed
int num_gru_failed = 0; // Number of GRUs that have failed
// Timing Variables
TimingInfo job_timing;
std::string hostname;
// settings for all child actors (save in case we need to recover)
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
/** The Job Actor */
behavior job_actor(stateful_actor<job_state>* self,
int start_gru, int num_gru,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings,
actor parent);
/*********************************************
* Functions for the Job Actor
*********************************************/
/** Get the information for the GRUs that will be written to the netcdf file */
std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts,
std::vector<GRU*> &gru_list);
void handleGRUError(stateful_actor<job_state>* self, caf::actor src);
} // end namespace
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <string>
class Batch {
private:
int batch_id_;
int start_hru_;
int num_hru_;
double run_time_;
double read_time_;
double write_time_;
bool assigned_to_actor_;
bool solved_;
public:
Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1);
// Getters
int getBatchID();
int getStartHRU();
int getNumHRU();
double getRunTime();
double getReadTime();
double getWriteTime();
bool isAssigned();
bool isSolved();
std::string getBatchInfoString();
// Setters
void updateRunTime(double run_time);
void updateReadTime(double read_time);
void updateWriteTime(double write_time);
void updateAssigned(bool boolean);
void updateSolved(bool boolean);
void printBatchInfo();
void writeBatchToFile(std::string csv_output, std::string hostname);
std::string toString();
void assignToActor(std::string hostname, caf::actor assigned_actor);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch& batch) {
return inspector.object(batch).fields(
inspector.field("batch_id", batch.batch_id_),
inspector.field("start_hru", batch.start_hru_),
inspector.field("num_hru", batch.num_hru_),
inspector.field("run_time", batch.run_time_),
inspector.field("read_time", batch.read_time_),
inspector.field("write_time", batch.write_time_),
inspector.field("assigned_to_actor", batch.assigned_to_actor_),
inspector.field("solved", batch.solved_));
}
};
\ No newline at end of file
#include "caf/all.hpp"
#pragma once
#include "client.hpp"
class Batch_Container {
private:
int start_hru_;
int total_hru_count_;
int num_hru_per_batch_;
int batches_remaining_;
std::vector<Batch> batch_list_;
// Assemble the total number of HRUs given by the user into batches.
void assembleBatches();
public:
// Creating the batch_manager will also create the batches
// with the two parameters that are passed in.
Batch_Container(int start_hru = 1, int total_hru_count = 0,
int num_hru_per_batch = 0);
// returns the size of the batch list
int getBatchesRemaining();
int getTotalBatches();
// Find an unsolved batch, set it to assigned and return it.
std::optional<Batch> getUnsolvedBatch();
// Update the batch status to solved and write the output to a file.
void updateBatch_success(Batch successful_batch, std::string output_csv, std::string hostname);
// Update the batch status but do not write the output to a file.
void updateBatch_success(Batch successful_batch);
// Update batch by id
void updateBatch_success(int batch_id, double run_time, double read_time,
double write_time);
// Update the batch to assigned = true
void setBatchAssigned(Batch batch);
// Update the batch to assigned = false
void setBatchUnassigned(Batch batch);
// Check if there are batches left to solve
bool hasUnsolvedBatches();
// TODO: Needs implementation
void updateBatch_failure(Batch failed_batch);
std::string getAllBatchInfoString();
double getTotalReadTime();
double getTotalWriteTime();
/**
* A client has found to be disconnected. Unassign all batches
* that were assigned to the disconnected client. The client id
* is passed in as a parameter
*/
void updatedBatch_disconnectedClient(int client_id);
/**
* Create the csv file for the completed batches.
*/
void inititalizeCSVOutput(std::string csv_output_name);
/**
* @brief Print the batches from the batch list
*
*/
void printBatches();
std::string getBatchesAsString();
/**
* @brief Find the batch with the batch_id parameter
* update the batches assigned actor member variable to false
*
*/
void updateBatchStatus_LostClient(int batch_id);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch_Container& batch_container) {
return inspector.object(batch_container).fields(
inspector.field("total_hru_count", batch_container.total_hru_count_),
inspector.field("num_hru_per_batch", batch_container.num_hru_per_batch_),
inspector.field("batches_remaining", batch_container.batches_remaining_),
inspector.field("batch_list", batch_container.batch_list_));
}
};
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <optional>
#include "batch.hpp"
class Client {
private:
caf::actor client_actor;
std::string hostname;
int id;
int batches_solved;
std::optional<Batch> current_batch;
public:
Client(int id = -1, caf::actor client_actor = nullptr, std::string hostname = "");
// ####################################################################
// Getters
// ####################################################################
caf::actor getActor();
int getID();
std::string getHostname();
std::optional<Batch> getBatch();
// ####################################################################
// Setters
// ####################################################################
void setBatch(std::optional<Batch> batch);
// ####################################################################
// Methods
// ####################################################################
std::string toString();
// Serialization so CAF can send an object of this class to another actor
template <class Inspector>
friend bool inspect(Inspector& inspector, Client& client) {
return inspector.object(client).fields(
inspector.field("client_actor",client.client_actor),
inspector.field("hostname",client.hostname),
inspector.field("id",client.id),
inspector.field("batches_solved",client.batches_solved),
inspector.field("current_batch",client.current_batch));
}
};
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <vector>
#include "batch.hpp"
#include "client.hpp"
#include <optional>
class Client_Container {
private:
std::vector<Client> client_list;
int id_counter;
public:
Client_Container();
// ####################################################################
// Getters
// ####################################################################
int getNumClients();
std::vector<Client> getClientList();
std::optional<Client> getClient(caf::actor_addr client_ref);
// ####################################################################
// Setters
// ####################################################################
void setBatchForClient(caf::actor client_ref, std::optional<Batch> batch);
// ####################################################################
// Methods
// ####################################################################
// add a new client to the client_list
void addClient(caf::actor client_actor, std::string hostname);
// remove a client from the client_list
void removeClient(Client client);
// return a client that is not solving a batch or return an empty optional
std::optional<Client> getIdleClient();
// Check if the client list is empty
bool isEmpty();
// pops client at the end of the list
Client removeClient_fromBack();
// return printable string
std::string toString();
template <class Inspector>
friend bool inspect(Inspector& inspector, Client_Container& client_container) {
return inspector.object(client_container).fields(
inspector.field("client_list", client_container.client_list),
inspector.field("id_counter", client_container.id_counter));
}
};
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include "batch_container.hpp"
#include <chrono>
#include <string>
#include <vector>
namespace caf {
struct job_timing_info {
std::vector<double> job_duration;
std::vector<double> job_read_duration;
std::vector<double> job_write_duration;
};
struct summa_actor_state {
// Timing Information For Summa-Actor
TimingInfo summa_actor_timing;
struct job_timing_info timing_info_for_jobs;
// Program Parameters
int startGRU; // starting GRU for the simulation
int numGRU; // number of GRUs to compute
int fileGRU; // number of GRUs in the file
std::string configPath; // path to the fileManager.txt file
int numFailed = 0; // Number of jobs that have failed
caf::actor currentJob; // Reference to the current job actor
caf::actor parent;
// Batches
Batch_Container batch_container;
int current_batch_id;
// settings for all child actors (save in case we need to recover)
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
behavior summa_actor(stateful_actor<summa_actor_state>* self,
int startGRU, int numGRU,
Summa_Actor_Settings summa_actor_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings, actor parent);
void spawnJob(stateful_actor<summa_actor_state>* self);
} // namespace caf
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "summa_server.hpp"
#include <string>
#include <unistd.h>
#include <limits.h>
namespace caf {
// Inital behaviour that waits to connect to the lead server
behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings);
// Function that is called ot connect to the lead server
void connecting_backup(stateful_actor<summa_server_state>* self, const std::string& host, uint16_t port);
behavior summa_backup_server(stateful_actor<summa_server_state>* self, const actor& server_actor);
}
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "settings_functions.hpp"
#include "batch.hpp"
#include "summa_actor.hpp"
#include "message_atoms.hpp"
#include <string>
#include <optional>
#include <unistd.h>
#include <limits.h>
namespace caf {
struct summa_client_state {
strong_actor_ptr current_server = nullptr;
actor current_server_actor;
std::vector<strong_actor_ptr> servers;
std::string hostname;
actor summa_actor_ref;
uint16_t port;
int batch_id;
int client_id; // id held by server
bool running = false; // initalized to false - flipped to true when client returns behavior summa_client
// tuple is the actor ref and hostname of the backup server
std::vector<std::tuple<caf::actor, std::string>> backup_servers_list;
Batch current_batch;
bool saved_batch = false;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
behavior summa_client_init(stateful_actor<summa_client_state>* self);
behavior summa_client(stateful_actor<summa_client_state>* self);
void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port);
void findLeadServer(stateful_actor<summa_client_state>* self, strong_actor_ptr serv);
}
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "batch.hpp"
#include "batch_container.hpp"
#include "client.hpp"
#include "client_container.hpp"
#include "settings_functions.hpp"
#include "global.hpp"
#include "message_atoms.hpp"
#include <string>
#include <optional>
#include <thread>
#include <chrono>
#include <iostream>
namespace caf {
struct summa_server_state {
strong_actor_ptr current_server; // if server is a backup then this will be set to the lead server
actor current_server_actor;
std::string hostname;
std::string csv_file_path;
std::string csv_output_name = "/batch_results.csv";
Client_Container client_container;
Batch_Container batch_container;
// Actor Reference, Hostname
std::vector<std::tuple<caf::actor, std::string>> backup_servers_list;
// Settings Structures
Distributed_Settings distributed_settings;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
// Summa Server setup behaviour - initializes the state for the server
behavior summa_server_init(stateful_actor<summa_server_state>* self,
Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings);
// Summa Server behaviour - handles messages from clients
behavior summa_server(stateful_actor<summa_server_state>* self);
// Summa Server backup behaviour - handles the exit messages for clients
behavior summa_server_exit(stateful_actor<summa_server_state>* self);
// Creates the csv file that holds the results of the batches
void initializeCSVOutput(std::string csv_output_path);
// Send all connected actors the updated backup servers list
void sendAllBackupServersList(stateful_actor<summa_server_state>* self);
// Look for the lost backup server in the backup servers list and remove it
void findAndRemoveLostBackupServer(stateful_actor<summa_server_state>* self, actor_addr lost_backup_server);
// Check for an idle client to send the failed or next batch we find that is not assigned
void checkForIdleClients(stateful_actor<summa_server_state>* self);
void notifyBackupServersOfRemovedClient(stateful_actor<summa_server_state>* self, Client client);
// Finds the batch the lost client was working on and reassigns it to another client if available
// If no client is available then the batch is added back to the list to be reassigned later
void resolveLostClient(stateful_actor<summa_server_state>* self, Client client);
// Removes the backup server from the list of backup servers
// All connected actors are then notified of the change
void resolveLostBackupServer(stateful_actor<summa_server_state>* self, const down_msg& dm);
// Convience function to keep code clean - just does what you think it does
void printRemainingBatches(stateful_actor<summa_server_state>* self);
} // namespace caf
#### parent directory of the 'build' directory ####
# F_MASTER =
#### fortran compiler ####
# FC =
#### C++ compiler ####
# CC=g++
#### Includes AND Libraries ####
# INCLUDES =
# LIBRARIES =
# ACTORS_INCLUDES =
# ACTORS_LIBRARIES =
# gfortran compiler flags
ifeq "$(FC)" "gfortran"
# Production runs
# FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC
# FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
# FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
# FLAGS_ACTORS = -O3
# # Debug runs
FLAGS_NOAH = -p -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC
FLAGS_COMM = -p -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
FLAGS_SUMMA = -p -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
FLAGS_ACTORS = -g -O0 -Wall
endif
# ifort compiler flags
ifeq "$(FC)" "ifort"
# define compiler flags
# FLAGS_NOAH = -O3 -autodouble -warn nounused -noerror_limit -FR -auto -fltconsistency -fPIC
# FLAGS_COMM = -O3 -FR -auto -warn nounused -fltconsistency -fpe0 -fPIC
# FLAGS_SUMMA = -O3 -FR -auto -warn nounused -fltconsistency -fpe0 -fPIC
# debug runs
FLAGS_NOAH = -O3 -g -autodouble -warn nounused -noerror_limit -FR -auto -fltconsistency -fPIC
FLAGS_COMM = -O3 -g -FR -auto -warn nounused -fltconsistency -fpe0 -fPIC
FLAGS_SUMMA = -O3 -g -FR -auto -warn nounused -fltconsistency -fpe0 -fPIC
endif
#========================================================================
# PART 1: Define directory paths
#========================================================================
# Core directory that contains source code
F_KORE_DIR = $(F_MASTER)/build/source
# Location of the compiled modules
MOD_PATH = $(F_MASTER)/build
# Define the directory for the executables
EXE_PATH = $(F_MASTER)/bin
#========================================================================
# PART 2: Assemble all of the SUMMA sub-routines
#========================================================================
# Define directories
DRIVER_DIR = $(F_KORE_DIR)/driver
HOOKUP_DIR = $(F_KORE_DIR)/hookup
NETCDF_DIR = $(F_KORE_DIR)/netcdf
DSHARE_DIR = $(F_KORE_DIR)/dshare
NUMREC_DIR = $(F_KORE_DIR)/numrec
NOAHMP_DIR = $(F_KORE_DIR)/noah-mp
ENGINE_DIR = $(F_KORE_DIR)/engine
INTERFACE_DIR = $(F_KORE_DIR)/interface
JOB_ACTOR_DIR = $(INTERFACE_DIR)/job_actor
FILE_ACCESS_DIR = $(INTERFACE_DIR)/file_access_actor
HRU_ACTOR_DIR = $(INTERFACE_DIR)/hru_actor
# utilities
SUMMA_NRUTIL= \
nrtype.f90 \
f2008funcs.f90 \
nr_utility.f90
NRUTIL = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_NRUTIL))
# Numerical recipes procedures
# NOTE: all numerical recipes procedures are now replaced with free versions
SUMMA_NRPROC= \
expIntegral.f90 \
spline_int.f90
NRPROC = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_NRPROC))
# Hook-up modules (set files and directory paths)
SUMMA_HOOKUP= \
ascii_util.f90 \
summaActors_FileManager.f90
HOOKUP = $(patsubst %, $(HOOKUP_DIR)/%, $(SUMMA_HOOKUP))
# Data modules
SUMMA_DATAMS= \
multiconst.f90 \
var_lookup.f90 \
data_types.f90 \
globalData.f90 \
flxMapping.f90 \
get_ixname.f90 \
popMetadat.f90 \
outpt_stat.f90
DATAMS = $(patsubst %, $(DSHARE_DIR)/%, $(SUMMA_DATAMS))
# utility modules
SUMMA_UTILMS= \
time_utils.f90 \
mDecisions.f90 \
snow_utils.f90 \
soil_utils.f90 \
updatState.f90 \
matrixOper.f90
UTILMS = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_UTILMS))
# Model guts
SUMMA_MODGUT= \
MODGUT = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_MODGUT))
# Solver
SUMMA_SOLVER= \
vegPhenlgy.f90 \
diagn_evar.f90 \
stomResist.f90 \
groundwatr.f90 \
vegSWavRad.f90 \
vegNrgFlux.f90 \
ssdNrgFlux.f90 \
vegLiqFlux.f90 \
snowLiqFlx.f90 \
soilLiqFlx.f90 \
bigAquifer.f90 \
computFlux.f90 \
computResid.f90 \
computJacob.f90 \
eval8summa.f90 \
summaSolve.f90 \
systemSolv.f90 \
varSubstep.f90 \
opSplittin.f90 \
coupled_em.f90
SOLVER = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_SOLVER))
# Interface code for Fortran-C++
SUMMA_INTERFACE= \
cppwrap_datatypes.f90 \
cppwrap_auxiliary.f90 \
cppwrap_metadata.f90 \
INTERFACE = $(patsubst %, $(INTERFACE_DIR)/%, $(SUMMA_INTERFACE))
SUMMA_FILEACCESS_INTERFACE = \
initOutputStruc.f90 \
deallocateOutputStruc.f90 \
cppwrap_fileAccess.f90
FILEACCESS_INTERFACE = $(patsubst %, $(FILE_ACCESS_DIR)/%, $(SUMMA_FILEACCESS_INTERFACE))
SUMMA_JOB_INTERFACE = \
cppwrap_job.f90
JOB_INTERFACE = $(patsubst %, $(JOB_ACTOR_DIR)/%, $(SUMMA_JOB_INTERFACE))
SUMMA_HRU_INTERFACE = \
cppwrap_hru.f90
HRU_INTERFACE = $(patsubst %, $(HRU_ACTOR_DIR)/%, $(SUMMA_HRU_INTERFACE))
# Define routines for SUMMA preliminaries
SUMMA_PRELIM= \
conv_funcs.f90 \
sunGeomtry.f90 \
convE2Temp.f90 \
allocspaceActors.f90 \
alloc_file_access.f90\
checkStruc.f90 \
childStruc.f90 \
ffile_info.f90 \
read_attribute.f90 \
read_pinit.f90 \
pOverwrite.f90 \
read_paramActors.f90 \
paramCheck.f90 \
check_icondActors.f90 \
# allocspace.f90
PRELIM = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_PRELIM))
SUMMA_NOAHMP= \
module_model_constants.F \
module_sf_noahutl.F \
module_sf_noahlsm.F \
module_sf_noahmplsm.F
NOAHMP = $(patsubst %, $(NOAHMP_DIR)/%, $(SUMMA_NOAHMP))
# Define routines for the SUMMA model runs
SUMMA_MODRUN = \
indexState.f90 \
getVectorz.f90 \
updateVars.f90 \
var_derive.f90 \
read_forcingActors.f90 \
access_forcing.f90\
access_write.f90 \
derivforce.f90 \
snowAlbedo.f90 \
canopySnow.f90 \
tempAdjust.f90 \
snwCompact.f90 \
layerMerge.f90 \
layerDivide.f90 \
volicePack.f90 \
qTimeDelay.f90
MODRUN = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_MODRUN))
# Define NetCDF routines
# OutputStrucWrite is not a netcdf subroutine and should be
# moved
SUMMA_NETCDF = \
netcdf_util.f90 \
def_output.f90 \
outputStrucWrite.f90 \
writeOutput.f90 \
read_icondActors.f90
NETCDF = $(patsubst %, $(NETCDF_DIR)/%, $(SUMMA_NETCDF))
# ... stitch together common programs
COMM_ALL = $(NRUTIL) $(NRPROC) $(HOOKUP) $(DATAMS) $(UTILMS)
# ... stitch together SUMMA programs
SUMMA_ALL = $(NETCDF) $(PRELIM) $(MODRUN) $(SOLVER)
# Define the driver routine
SUMMA_DRIVER= \
summaActors_type.f90 \
summaActors_util.f90 \
summaActors_globalData.f90 \
summaActors_init.f90 \
SummaActors_setup.f90 \
summaActors_restart.f90 \
summaActors_forcing.f90 \
SummaActors_modelRun.f90 \
summaActors_alarms.f90 \
summaActors_wOutputStruc.f90
DRIVER = $(patsubst %, $(DRIVER_DIR)/%, $(SUMMA_DRIVER))
ACTORC = $(F_KORE_DIR)/actors/main.cc
ACTOR_TEST = $(F_KORE_DIR)/testing/testing_main.cc
#========================================================================
# PART 3: compilation
#======================================================================
# Compile
all: lib main
lib: compile_noah compile_comm compile_summa link clean
main: actors actorsLink actorsClean
test: actors_test actors_testLink actorsClean
# compile Noah-MP routines
compile_noah:
$(FC) $(FLAGS_NOAH) -c $(NRUTIL) $(NOAHMP)
# compile common routines
compile_comm:
$(FC) $(FLAGS_COMM) -c $(COMM_ALL) $(INCLUDES)
# compile SUMMA routines
compile_summa:
$(FC) $(FLAGS_SUMMA) -c $(SUMMA_ALL) $(DRIVER) $(INTERFACE) $(JOB_INTERFACE) $(FILEACCESS_INTERFACE) $(HRU_INTERFACE) $(INCLUDES)
# generate library
link:
$(FC) -shared *.o -o libsumma.so
# compile the c++ portion of SummaActors
actors:
$(CC) $(FLAGS_ACTORS) -c $(ACTORC) -std=c++17 $(ACTORS_INCLUDES)
actorsLink:
$(CC) -o summaMain *.o $(ACTORS_LIBRARIES)
actors_test:
$(CC) $(FLAGS_ACTORS) -c $(ACTOR_TEST) -std=c++17 $(ACTORS_INCLUDES)
actors_testLink:
$(CC) -o summaTest *.o $(ACTORS_LIBRARIES)
actorsClean:
rm *.o
# Remove object files
clean:
rm -f *.o *.mod soil_veg_gen_parm__genmod.f90
clean_lib:
rm *.so
#ifndef FILEACCESS_H_
#define FILEACCESS_H_
#include "../interface/fortran_dataTypes.h"
#include "../interface/file_access_actor/fileAccess_subroutine_wrappers.h"
#include "caf/all.hpp"
#include "messageAtoms.h"
#include <vector>
#include <chrono>
#include "global.h"
class forcingFile {
private:
int fileID; // which file are we relative the forcing file list saved in fortran
int numSteps; // the number of steps in this forcing file
bool isLoaded; // is this file actually loaded in to RAM yet.
public:
forcingFile(int fileID) {
this->fileID = fileID;
this->numSteps = 0;
this->isLoaded = false;
}
int getNumSteps() {
return this->numSteps;
}
bool isFileLoaded() {
return this->isLoaded;
}
void updateIsLoaded() {
this->isLoaded = true;
}
void updateNumSteps(int numSteps) {
this->numSteps = numSteps;
this->isLoaded = true;
}
};
struct file_access_state {
// Variables set on Spwan
caf::actor parent;
int startGRU;
int numGRU;
void *handle_forcFileInfo = new_handle_file_info(); // Handle for the forcing file information
void *handle_ncid = new_handle_var_i(); // output file ids
bool outputFileExists = false;
int num_steps;
int outputStrucSize;
int stepsInCurrentFile;
int numFiles;
int filesLoaded;
int err = 0;
std::vector<forcingFile> forcFileList; // list of steps in file
std::vector<bool> outputFileInitHRU;
std::chrono::time_point<std::chrono::system_clock> readStart;
std::chrono::time_point<std::chrono::system_clock> readEnd;
double readDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> writeStart;
std::chrono::time_point<std::chrono::system_clock> writeEnd;
double writeDuration = 0.0;
};
#endif
\ No newline at end of file
#ifndef FILEACCESSACTOR_H_
#define FILEACCESSACTOR_H_
#include "FileAccess.h"
using namespace caf;
void initalizeFileAccessActor(stateful_actor<file_access_state>* self);
int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, int numStepsToWrite);
int readForcing(stateful_actor<file_access_state>* self, int currentFile);
behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU,
int outputStrucSize, actor parent) {
// Set File_Access_Actor variables
self->state.parent = parent;
self->state.numGRU = numGRU;
self->state.startGRU = startGRU;
self->state.outputStrucSize = outputStrucSize;
initalizeFileAccessActor(self);
return {
[=](initalize_outputStrucure) {
Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize,
&self->state.numGRU, &self->state.err);
},
[=](write_param, int indxGRU, int indxHRU) {
int err;
err = 0;
Write_HRU_Param(self->state.handle_ncid, &indxGRU, &indxHRU, &err);
if (err != 0) {
aout(self) << "ERROR: Write_HRU_PARAM -- For HRU = " << indxHRU << "\n";
}
},
[=](access_forcing, int currentFile, caf::actor refToRespondTo) {
// aout(self) << "Received Current FIle = " << currentFile << std::endl;
if (currentFile <= self->state.numFiles) {
if(self->state.forcFileList[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1
// aout(self) << "ForcingFile Already Loaded \n";
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
} else {
self->state.readStart = std::chrono::high_resolution_clock::now();
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
}
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
// Check if we have loaded all forcing files
if(self->state.filesLoaded <= self->state.numFiles) {
self->send(self, access_forcing_internal_v, currentFile + 1);
}
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
}
} else {
aout(self) << currentFile << "is larger than expected" << std::endl;
}
},
[=](access_forcing_internal, int currentFile) {
if (self->state.filesLoaded <= self->state.numFiles &&
currentFile <= self->state.numFiles) {
// aout(self) << "Loading in background, File:" << currentFile << "\n";
if (self->state.forcFileList[currentFile - 1].isFileLoaded()) {
aout(self) << "File Loaded when shouldn't be \n";
}
self->state.readStart = std::chrono::high_resolution_clock::now();
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
}
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
self->send(self, access_forcing_internal_v, currentFile + 1);
} else {
aout(self) << "All Forcing Files Loaded \n";
}
},
[=](write_output, int indxGRU, int indxHRU, int numStepsToWrite,
caf::actor refToRespondTo) {
int err;
err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite);
if (err != 0) {
aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
} else {
self->send(refToRespondTo, done_write_v);
}
},
[=](read_and_write, int indxGRU, int indxHRU, int numStepsToWrite, int currentFile,
caf::actor refToRespondTo) {
int err;
err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite);
if (err != 0)
aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
err = readForcing(self, currentFile);
if (err != 0)
aout(self) << "\nERROR: FILE_ACCESS_ACTOR - READING_FORCING FAILED\n";
// Respond to HRU
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
},
[=](deallocate_structures) {
aout(self) << "Deallocating Structure" << std::endl;
FileAccessActor_DeallocateStructures(self->state.handle_forcFileInfo, self->state.handle_ncid);
self->state.readDuration = self->state.readDuration / 1000;
self->state.writeDuration = self->state.writeDuration / 1000;
self->send(self->state.parent, file_access_actor_done_v, self->state.readDuration,
self->state.writeDuration);
self->quit();
},
[=](reset_outputCounter, int indxGRU) {
resetOutputCounter(&indxGRU);
}
};
}
void initalizeFileAccessActor(stateful_actor<file_access_state>* self) {
int indx = 1;
int err = 0;
// aout(self) << "Set Up the forcing file" << std::endl;
ffile_info_C(&indx, self->state.handle_forcFileInfo, &self->state.numFiles, &err);
if (err != 0) {
aout(self) << "Error: ffile_info_C - HRU = " << indx <<
" - indxGRU = " << indx << " - refGRU = " << std::endl;
self->quit();
}
mDecisions_C(&self->state.num_steps, &err);
if (err != 0) {
aout(self) << "Error: mDecisions - FileAccess Actor " << std::endl;
self->quit();
}
read_pinit_C(&err);
if (err != 0) {
aout(self) << "ERROR: read_pinit_C\n";
}
read_vegitationTables(&err);
if (err != 0) {
aout(self) << "ERROR: read_vegitationTables\n";
}
Create_Output_File(self->state.handle_ncid, &self->state.numGRU, &self->state.startGRU, &err);
if (err != 0) {
aout(self) << "ERROR: Create_OutputFile\n";
}
// initalize vector for knowing if HRU output has init'd
for(int i = 0; i < self->state.numGRU; i++) {
self->state.outputFileInitHRU.push_back(false);
}
self->send(self->state.parent, done_file_access_actor_init_v);
// initalize the forcingFile array
self->state.filesLoaded = 0;
for (int i = 1; i <= self->state.numFiles; i++) {
self->state.forcFileList.push_back(forcingFile(i));
}
}
int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU,
int numStepsToWrite) {
self->state.writeStart = std::chrono::high_resolution_clock::now();
int err = 0;
FileAccessActor_WriteOutput(self->state.handle_ncid,
&numStepsToWrite, &indxGRU, &indxHRU, &err);
self->state.writeEnd = std::chrono::high_resolution_clock::now();
self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd);
return err;
}
int readForcing(stateful_actor<file_access_state>* self, int currentFile) {
// Check if we have already loaded this file
if(self->state.forcFileList[currentFile -1].isFileLoaded()) {
if (debug)
aout(self) << "ForcingFile Already Loaded \n";
return 0;
} else {
// File Needs to be loaded
self->state.readStart = std::chrono::high_resolution_clock::now();
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
if (debug)
aout(self) << "ERROR: FileAccessActor_ReadForcing\n" <<
"currentFile = " << currentFile << "\n" << "number of steps = "
<< self->state.stepsInCurrentFile << "\n";
return -1;
} else {
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
return 0;
}
}
}
#endif
\ No newline at end of file
#ifndef GRUinfo_H_
#define GRUinfo_H_
#include "caf/all.hpp"
#include <iostream>
#include <fstream>
#include "Job.h"
class GRUinfo {
private:
int refGRU; // This will be the same as the refGRU
int indxGRU;
caf::actor GRU;
// Variable to update
int dt_init;
// Completed Information
int currentAttempt;
int maxAttempts;
bool completed;
bool failed;
// Timing information for the GRU
double runTime;
double initDuration;
double forcingDuration;
double runPhysicsDuration;
double writeOutputDuration;
public:
// Constructor
GRUinfo(int refGRU, int indxGRU, caf::actor gru, int dt_init, int maxAttempts) {
this->refGRU = refGRU;
this->indxGRU = indxGRU;
this->GRU = gru;
this->dt_init = dt_init;
this->currentAttempt = 1;
this->maxAttempts = maxAttempts;
this->completed = false;
this->failed = false;
}
// Deconstructor
~GRUinfo(){};
// Getters
int getRefGRU() {
return this->refGRU;
}
int getIndxGRU() {
return this->indxGRU;
}
int getDt_init() {
return this->dt_init;
}
caf::actor getActor() {
return GRU;
}
// Setters
void updateGRU(caf::actor gru) {
this->GRU = gru;
}
void updateFailed() {
if (this->failed) {
this->failed = false;
} else {
this->failed = true;
}
}
void updateCompletedToTrue(){
this->completed = true;
}
void updateDt_init() {
this->dt_init = this->dt_init * 2;
}
void updateCurrentAttempt() {
this->currentAttempt++;
}
// Methods that return Booleans
bool isMaxAttemptsReached() {
return this->maxAttempts <= this->currentAttempt;
}
bool isFailed() {
return this->failed;
}
bool isCompleted() {
return this->completed;
}
void doneRun(double runTime, double initDuration, double forcingDuration,
double runPhysicsDuration, double writeOutputDuration) {
this->runTime = runTime;
this->initDuration = initDuration;
this->forcingDuration = forcingDuration;
this->runPhysicsDuration = runPhysicsDuration;
this->writeOutputDuration = writeOutputDuration;
}
// Methods for writing statistics to a file
void writeSuccess(std::string fileName) {
std::ofstream file;
file.open(fileName, std::ios_base::app);
file << this->refGRU << ","
<< this->runTime << ","
<< this->initDuration << ","
<< this->forcingDuration << ","
<< this->runPhysicsDuration << ","
<< this->writeOutputDuration << ","
<< this->dt_init << ","
<< this->currentAttempt << "\n";
file.close();
}
void writeFail(std::string fileName) {
std::ofstream file;
file.open(fileName, std::ios_base::app);
file << this->refGRU << ","
<< this->dt_init << ","
<< this->currentAttempt << "\n";
file.close();
}
void printOutput() {
std::cout << "\nGRU = " << this->refGRU << "\n" <<
"RunTime = " << this->runTime << "\n" <<
"initDuration = " << this->initDuration << "\n" <<
"forcingDuration = " << this->forcingDuration << "\n" <<
"runPhysicsDuration = " << this->runPhysicsDuration << "\n" <<
"writeOutputDuration = " << this->writeOutputDuration << "\n\n";
}
};
#endif
\ No newline at end of file
#ifndef HRU_H_
#define HRU_H_
#include "caf/all.hpp"
#include "../interface/fortran_dataTypes.h"
#include "../interface/hru_actor/hru_subroutine_wrappers.h"
#include "messageAtoms.h"
#include <fstream>
#include <string>
#include <typeinfo>
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <chrono>
#include <iostream>
#include "json.hpp"
#include "global.h"
using namespace caf;
struct hru_state {
// Actor References
caf::actor file_access_actor;
caf::actor parent;
// Info about which HRU we are and our indexes
// into global structures in Fortran
int indxHRU; // index for hru part of derived types in FORTRAN
int indxGRU; // index for gru part of derived types in FORTRAN
int refGRU; // The actual ID of the GRU we are
// Variables for output/forcing structures
int outputStrucSize;
int outputStep;
int stepsInCurrentFFile;
int forcingFileStep;
int currentForcingFile = 1;
// statistics structures
void *handle_forcStat = new_handle_var_dlength(); // model forcing data
void *handle_progStat = new_handle_var_dlength(); // model prognostic (state) variables
void *handle_diagStat = new_handle_var_dlength(); // model diagnostic variables
void *handle_fluxStat = new_handle_var_dlength(); // model fluxes
void *handle_indxStat = new_handle_var_dlength(); // model indices
void *handle_bvarStat = new_handle_var_dlength(); // basin-average variables
// primary data structures (scalars)
void *handle_timeStruct = new_handle_var_i(); // model time data
void *handle_forcStruct = new_handle_var_d(); // model forcing data
void *handle_attrStruct = new_handle_var_d(); // local attributes for each HRU
void *handle_typeStruct = new_handle_var_i(); // local classification of soil veg etc. for each HRU
void *handle_idStruct = new_handle_var_i8();
// primary data structures (variable length vectors)
void *handle_indxStruct = new_handle_var_ilength(); // model indices
void *handle_mparStruct = new_handle_var_dlength(); // model parameters
void *handle_progStruct = new_handle_var_dlength(); // model prognostic (state) variables
void *handle_diagStruct = new_handle_var_dlength(); // model diagnostic variables
void *handle_fluxStruct = new_handle_var_dlength(); // model fluxes
// basin-average structures
void *handle_bparStruct = new_handle_var_d(); // basin-average parameters
void *handle_bvarStruct = new_handle_var_dlength(); // basin-average variables
// ancillary data structures
void *handle_dparStruct = new_handle_var_d(); // default model parameters
// Local hru data
void *handle_ncid = new_handle_var_i(); // output file ids
void *handle_statCounter = new_handle_var_i();
void *handle_outputTimeStep = new_handle_var_i();
void *handle_resetStats = new_handle_flagVec();
void *handle_finalizeStats = new_handle_flagVec();
void *handle_oldTime = new_handle_var_i();
void *handle_refTime = new_handle_var_i();
void *handle_finshTime = new_handle_var_i();
void *handle_startTime = new_handle_var_i();
// Misc Variables
int timestep = 1; // Current Timestep of HRU simulation
int computeVegFlux; // flag to indicate if we are computing fluxes over vegetation
double dt_init; // used to initialize the length of the sub-step for each HRU
double upArea; // area upslope of each HRU
int num_steps = 0; // number of time steps
int forcingStep; // index of current time step in current forcing file
int iFile; // index of current forcing file from forcing file list
int dt_init_factor = 1; // factor of dt_init (coupled_em)
bool printOutput;
int outputFrequency;
// Julian Day variables
double fracJulDay;
double tmZoneOffsetFracDay;
int yearLength;
int err = 0; // error conotrol
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration = 0.0;
std::chrono::time_point<std::chrono::system_clock> initStart;
std::chrono::time_point<std::chrono::system_clock> initEnd;
double initDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> forcingStart;
std::chrono::time_point<std::chrono::system_clock> forcingEnd;
double forcingDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> runPhysicsStart;
std::chrono::time_point<std::chrono::system_clock> runPhysicsEnd;
double runPhysicsDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> writeOutputStart;
std::chrono::time_point<std::chrono::system_clock> writeOutputEnd;
double writeOutputDuration = 0.0;
};
/**
* @brief Get the settings from the settings JSON file
*
* @param self Actor State
* @param configPath Path to the directory that contains the settings file
*/
void parseSettings(stateful_actor<hru_state>* self, std::string configPath);
/**
Function to initalize the HRU for running
*/
void Initialize_HRU(stateful_actor<hru_state>* self);
/**
Function runs all of the hru time_steps
*/
int Run_HRU(stateful_actor<hru_state>* self);
bool check_HRU(stateful_actor<hru_state>* self, int err);
void initalizeTimeVars(stateful_actor<hru_state>* self);
void finalizeTimeVars(stateful_actor<hru_state>* self);
void deallocateHRUStructures(stateful_actor<hru_state>* self);
void printOutput(stateful_actor<hru_state>* self);
#endif
\ No newline at end of file
This diff is collapsed.
#ifndef SUMMACLIENT_H_
#define SUMMACLIENT_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "string.h"
#include <unistd.h>
#include <vector>
#include "FileAccessActor.h"
#include "../interface/job_actor/job_subroutine_wrappers.h"
#include "HRUActor.h"
#include <chrono>
#include "messageAtoms.h"
#include "GRUinfo.h"
#include <iostream>
#include <fstream>
#include <sys/stat.h>
#include "json.hpp"
#include "global.h"
struct job_state {
// Actor References
caf::actor file_access_actor; // actor reference for the file_access_actor
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int startGRU; // Starting GRU for this job
int numGRU; // Number of GRUs for this job
std::string configPath;
std::string fileManager; // Path of the fileManager.txt file
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int maxRunAttempts = 3; // Max number of attemtps to solve a GRU
std::vector<GRUinfo*> GRUList; // List of all GRUs under this job actor
int numGRUDone = 0; // The number of GRUs that have completed
int GRUInit = 0; // Number of GRUs initalized
int err = 0; // Error Code
int numGRUFailed = 0; // Number of GRUs that have failed
int outputStrucSize;
// Timing Variables
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration;
// Output File Names for Timings
bool outputCSV;
std::string csvOut;
std::string csvPath;
std::string successOutputFile;
std::string failedOutputFile = "failedHRU";
std::string fileAccessActorStats = "fileAccessActor.csv";
};
int parseSettings(stateful_actor<job_state>* self, std::string configPath);
void initJob(stateful_actor<job_state>* self);
void initalizeGRU(stateful_actor<job_state>* self);
void runGRUs(stateful_actor<job_state>* self);
void restartFailures(stateful_actor<job_state>* self);
#endif
\ No newline at end of file
#ifndef SUMMACLIENTACTOR_H_
#define SUMMACLIENTACTOR_H_
#include "Job.h"
using namespace caf;
using json = nlohmann::json;
/**
* @brief First Actor that is spawned that is not the Coordinator Actor.
*
* @param self
* @return behavior
*/
behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
std::string configPath, int outputStrucSize, caf::actor parent) {
self->state.start = std::chrono::high_resolution_clock::now();
// Set Job Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.configPath = configPath;
self->state.parent = parent;
self->state.outputStrucSize = outputStrucSize;
if (parseSettings(self, configPath) == -1) {
aout(self) << "ERROR WITH JSON SETTINGS FILE!!!\n";
self->quit();
} else {
aout(self) << "\nSETTINGS FOR JOB_ACTOR\n" <<
"File Manager Path = " << self->state.fileManager << "\n" <<
"outputCSV = " << self->state.outputCSV << "\n";
if (self->state.outputCSV) {
aout(self) << "csvPath = " << self->state.csvPath << "\n";
}
}
// Initalize global variables
initJob(self);
// Spawn the file_access_actor. This will return the number of forcing files we are working with
self->state.file_access_actor = self->spawn(file_access_actor, self->state.startGRU, self->state.numGRU,
self->state.outputStrucSize, self);
aout(self) << "Job Actor Initalized \n";
return {
[=](done_file_access_actor_init) {
// Init GRU Actors and the Output Structure
self->send(self->state.file_access_actor, initalize_outputStrucure_v);
self->send(self, init_hru_v);
},
[=](init_hru) {
initalizeGRU(self);
},
[=](done_init_hru) {
if (debug) {
aout(self) << "Done Init\n";
}
// aout(self) << "Done init\n";
self->state.GRUInit++;
if (self->state.GRUInit < self->state.numGRU) {
self->send(self, init_hru_v);
} else {
aout(self) << "All GRUs are initalized\n";
self->state.GRUInit = 0; // reset counter in case we have failures
runGRUs(self);
}
},
/**
* Message from HRUActor, HRU is done the current forcing file but is not
* done its simulation and needs the next file
* indxGRU - Index into the actor array so we know which HRU this is.
* NOTE: Naming of GRU and HRU is confusing as the plan is to further seperate
* NOTE: For NA_Domain GRU is used as that is how we index the forcing file
*/
[=](done_hru, int indxGRU, double totalDuration, double initDuration,
double forcingDuration, double runPhysicsDuration, double writeOutputDuration) {
aout(self) << "GRU " << indxGRU << " Done\n";
self->state.GRUList[indxGRU - 1]->doneRun(totalDuration, initDuration, forcingDuration,
runPhysicsDuration, writeOutputDuration);
if (self->state.outputCSV) {
self->state.GRUList[indxGRU - 1]->writeSuccess(self->state.successOutputFile);
}
self->state.numGRUDone++;
// Check if we are done
if (self->state.numGRUDone >= self->state.numGRU) {
self->state.numGRUDone = 0; // just in case there were failures
if (self->state.numGRUFailed == 0) {
self->send(self->state.file_access_actor, deallocate_structures_v);
} else {
restartFailures(self);
}
}
},
[=](file_access_actor_done, double readDuration, double writeDuration) {
int err = 0;
if (debug) {
aout(self) << "\n********************************\n";
aout(self) << "Outputing Timing Info for HRUs\n";
for(auto gru : self->state.GRUList) {
gru->printOutput();
}
aout(self) << "********************************\n";
}
// Delete GRUs
for (auto GRU : self->state.GRUList) {
delete GRU;
}
self->state.GRUList.clear();
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration = calculateTime(self->state.start, self->state.end);
aout(self) << "\nTotal Job Duration:\n";
aout(self) << " " << self->state.duration / 1000 << " Seconds\n";
aout(self) << " " << (self->state.duration / 1000) / 60 << " Minutes\n";
aout(self) << " " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n";
aout(self) << "\nReading Duration:\n";
aout(self) << " " << readDuration << " Seconds\n";
aout(self) << "\nWriting Duration:\n";
aout(self) << " " << writeDuration << " Seconds\n\n";
cleanUpJobActor(&err);
// Tell Parent we are done
self->send(self->state.parent, done_job_v, self->state.numGRUFailed);
self->quit();
},
[=](run_failure, int indxGRU, int err) {
aout(self) << "GRU:" << indxGRU << "Failed \n" <<
"Will have to wait until all GRUs are done before it can be re-tried\n";
self->state.numGRUFailed++;
self->state.numGRUDone++;
self->state.GRUList[indxGRU - 1]->updateFailed();
// check if we are the last hru to complete
if (self->state.numGRUDone >= self->state.numGRU) {
restartFailures(self);
}
},
};
}
int parseSettings(stateful_actor<job_state>* self, std::string configPath) {
json settings;
std::string SummaActorsSettigs = "/Summa_Actors_Settings.json";
std::ifstream settings_file(configPath + SummaActorsSettigs);
settings_file >> settings;
settings_file.close();
if (settings.find("JobActor") != settings.end()) {
json JobActorConfig = settings["JobActor"];
// Find the File Manager Path
if (JobActorConfig.find("FileManagerPath") != JobActorConfig.end()) {
self->state.fileManager = JobActorConfig["FileManagerPath"];
} else {
aout(self) << "Error Finding FileManagerPath - Exiting as this is needed\n";
return -1;
}
// Find if we want to outputCSV
if (JobActorConfig.find("outputCSV") != JobActorConfig.end()) {
self->state.outputCSV = JobActorConfig["outputCSV"];
} else {
aout(self) << "Error Finding outputCSV in JSON file - Reverting to Default Value\n";
self->state.outputCSV = false;
}
// Output Path of CSV
if (self->state.outputCSV) {
if (JobActorConfig.find("csvPath") != JobActorConfig.end()) {
self->state.csvPath = JobActorConfig["csvPath"];
} else {
aout(self) << "Error Finding csvPath in JSON file = Reverting to Default Value \n";
self->state.outputCSV = false; // we just choose not to output a csv
}
}
return 0;
} else {
aout(self) << "Error Finding JobActor in JSON file - Exiting as there is no path for the fileManger\n";
return -1;
}
}
void initJob(stateful_actor<job_state>* self) {
std::string success = "Success"; // allows us to build the string
if (self->state.outputCSV) {
std::ofstream file;
self->state.successOutputFile = self->state.csvPath += success +=
std::to_string(self->state.startGRU) += ".csv";
file.open(self->state.successOutputFile, std::ios_base::out);
file << "GRU" << "," << "totalDuration" << "," << "initDuration" << "," <<
"forcingDuration" << "," << "runPhysicsDuration" << "," << "writeOutputDuration" <<
"," << "dt_init" << "," << "numAttemtps" << "\n";
file.close();
}
int totalGRUs = 0;
int totalHRUs = 0;
int numHRUs = 0;
int err = 0;
// aout(self) << "Initalizing Globals \n";
initGlobals(self->state.fileManager.c_str(),
&totalGRUs,
&totalHRUs,
&self->state.numGRU,
&numHRUs,
&self->state.startGRU,
&err);
if (err != 0) {
aout(self) << "Error: initGlobals" << std::endl;
self->quit();
}
}
void initalizeGRU(stateful_actor<job_state>* self) {
int startGRU = self->state.GRUList.size() + self->state.startGRU;
int indexGRU = self->state.GRUList.size() + 1; // Fortran reference starts at 1
auto gru = self->spawn(hru_actor, startGRU, indexGRU,
self->state.configPath, self->state.file_access_actor,
self->state.outputStrucSize, self);
self->state.GRUList.push_back(new GRUinfo(startGRU, indexGRU, gru,
self->state.dt_init_start_factor, self->state.maxRunAttempts));
}
void runGRUs(stateful_actor<job_state>* self) {
for(auto gru : self->state.GRUList) {
if(!gru->isCompleted() && !gru->isFailed()) {
self->send(gru->getActor(), start_hru_v);
}
}
}
void restartFailures(stateful_actor<job_state>* self) {
self->state.numGRU = self->state.numGRUFailed;
self->state.numGRUFailed = 0;
self->state.numGRUDone = 0;
for(auto gru : self->state.GRUList) {
if (gru->isFailed() && !gru->isMaxAttemptsReached()) {
gru->updateFailed();
self->send(self->state.file_access_actor, reset_outputCounter_v, gru->getIndxGRU());
gru->updateDt_init();
auto newGRU = self->spawn(hru_actor, gru->getRefGRU(), gru->getIndxGRU(),
self->state.configPath,self->state.file_access_actor,
self->state.outputStrucSize, self);
gru->updateGRU(newGRU);
gru->updateCurrentAttempt();
self->send(gru->getActor(), dt_init_factor_v, gru->getDt_init());
} else {
aout(self) << "We are done \n";
}
}
}
#endif
#ifndef SUMMAACTOR_H_
#define SUMMAACTOR_H_
#include "SummaManager.h"
using namespace caf;
using json = nlohmann::json;
/**
* Top Level Actor for Summa. This actor recieves the number of GRUs to compute from main and
* divides them into jobs that compute one at a time.
*
* @param startGRU - starting GRU for the simulation
* @param numGRU - total number of GRUs to compute
* @param configPath - location of file information for SUMMA
* @return behavior
*/
behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string configPath) {
self->state.start = std::chrono::high_resolution_clock::now();
// Set Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.configPath = configPath;
parseSettings(self, configPath);
aout(self) << "SETTINGS FOR SUMMA_ACTOR\n";
aout(self) << "Output Structure Size = " << self->state.outputStrucSize << "\n";
aout(self) << "Max GRUs Per Job = " << self->state.maxGRUPerJob << "\n";
// Create the job_actor and start SUMMA
spawnJob(self);
return {
[=](done_job, int numFailed) {
self->state.numFailed += numFailed;
aout(self) << "Job Done\n";
if (self->state.numGRU <= 0) {
self->state.end = std::chrono::high_resolution_clock::now();
self->state.duration = calculateTime(self->state.start, self->state.end);
aout(self) << "Total Program Duration:\n";
aout(self) << " " << self->state.duration / 1000 << " Seconds\n";
aout(self) << " " << (self->state.duration / 1000) / 60 << " Minutes\n";
aout(self) << " " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n";
aout(self) << "Program Finished \n";
} else {
// spawn a new job
spawnJob(self);
}
},
};
}
void spawnJob(stateful_actor<summa_manager>* self) {
// Ensure we do not start a job with too many GRUs
if (self->state.numGRU > self->state.maxGRUPerJob) {
// spawn the job actor
self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.maxGRUPerJob,
self->state.configPath, self->state.outputStrucSize, self);
// Update GRU count
self->state.numGRU = self->state.numGRU - self->state.maxGRUPerJob;
self->state.startGRU = self->state.startGRU + self->state.maxGRUPerJob;
} else {
self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.numGRU,
self->state.configPath, self->state.outputStrucSize, self);
self->state.numGRU = 0;
}
}
void parseSettings(stateful_actor<summa_manager>* self, std::string configPath) {
json settings;
std::string SummaActorsSettings = "/Summa_Actors_Settings.json";
std::ifstream settings_file(configPath + SummaActorsSettings);
settings_file >> settings;
settings_file.close();
if (settings.find("SummaActor") != settings.end()) {
json SummaActorConfig = settings["SummaActor"];
// Find the desired OutputStrucSize
if (SummaActorConfig.find("OuputStrucureSize") != SummaActorConfig.end()) {
self->state.outputStrucSize = SummaActorConfig["OuputStrucureSize"];
} else {
aout(self) << "Error Finding OutputStructureSize in JOSN - Reverting to default value\n";
self->state.outputStrucSize = 250;
}
// Find the desired maxGRUPerJob size
if (SummaActorConfig.find("maxGRUPerJob") != SummaActorConfig.end()) {
self->state.maxGRUPerJob = SummaActorConfig["maxGRUPerJob"];
} else {
aout(self) << "Error Finding maxGRUPerJob in JOSN - Reverting to default value\n";
self->state.maxGRUPerJob = 500;
}
} else {
aout(self) << "Error Finding SummaActor in JSON - Reverting to default values\n";
self->state.outputStrucSize = 250;
self->state.maxGRUPerJob = 500;
}
}
#endif
\ No newline at end of file
#ifndef SUMMAMANGER_H_
#define SUMMAMANGER_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "JobActor.h"
#include <iostream>
#include <chrono>
#include <string>
#include "json.hpp"
#include <fstream>
#include "global.h"
struct summa_manager {
// Timing Information
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::time_point<std::chrono::system_clock> end;
double duration;
// Program Parameters
int startGRU; // starting GRU for the simulation
int numGRU; // number of GRUs to compute
std::string configPath;// path to the fileManager.txt file
// Information about the jobs
int numFailed = 0; // Number of jobs that have failed
// Values Set By Summa_Actors_Settings.json
int maxGRUPerJob; // maximum number of GRUs a job can compute at once
int outputStrucSize;
caf::actor currentJob; // Reference to the current job actor
};
/**
* @brief Function to spawn a job actor
*/
void spawnJob(stateful_actor<summa_manager>* self);
void parseSettings(stateful_actor<summa_manager>* self, std::string configPath);
#endif
\ No newline at end of file