Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • gwu479/Summa-Actors
  • numerical_simulations_lab/actors/Summa-Actors
2 results
Show changes
Showing
with 2478 additions and 1771 deletions
#pragma once
#include "caf/all.hpp"
#include <iostream>
#include <fstream>
/*
* Determine the state of the GRU
*/
enum class gru_state {
running,
failed,
succeeded
};
int is_success(const gru_state& state);
/**
* Class that holds information about the running GRUs. This class is held by the job actor
* The GRU/HRU actors that carry out the simulation are held by the GRU class
*/
class GRU {
private:
int global_gru_index; // The index of the GRU in the netcdf file
int local_gru_index; // The index of the GRU within this job
caf::actor gru_actor; // The actor for the GRU
// Modifyable Parameters
int dt_init_factor; // The initial dt for the GRU
double rel_tol; // The relative tolerance for the GRU
double abs_tol; // The absolute tolerance for the GRU
// Status Information
int attempts_left; // The number of attempts left for the GRU to succeed
gru_state state; // The state of the GRU
// Timing Information
double run_time = 0.0; // The total time to run the GRU
double init_duration = 0.0; // The time to initialize the GRU
double forcing_duration = 0.0; // The time to read the forcing data
double run_physics_duration = 0.0; // The time to run the physics
double write_output_duration = 0.0; // The time to write the output
public:
// Constructor
GRU(int global_gru_index, int local_gru_index, caf::actor gru_actor, int dt_init_factor,
double rel_tol, double abs_tol, int max_attempts);
// Deconstructor
~GRU();
// Getters
int getGlobalGRUIndex();
int getLocalGRUIndex();
caf::actor getGRUActor();
double getRunTime();
double getInitDuration();
double getForcingDuration();
double getRunPhysicsDuration();
double getWriteOutputDuration();
double getRelTol();
double getAbsTol();
double getAttemptsLeft();
gru_state getStatus();
bool isFailed();
// Setters
void setRunTime(double run_time);
void setInitDuration(double init_duration);
void setForcingDuration(double forcing_duration);
void setRunPhysicsDuration(double run_physics_duration);
void setWriteOutputDuration(double write_output_duration);
void setRelTol(double rel_tol);
void setAbsTol(double abs_tol);
void setSuccess();
void setFailed();
void setRunning();
void decrementAttemptsLeft();
void setGRUActor(caf::actor gru_actor);
};
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "GRU.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include "global.hpp"
#include "json.hpp"
#include "hru_actor.hpp"
#include "message_atoms.hpp"
#include "file_access_actor.hpp"
#include <unistd.h>
#include <limits.h>
/*********************************************
* Job Actor Fortran Functions
*********************************************/
extern "C" {
void job_init_fortran(char const* file_manager, int* start_gru_index, int* num_gru, int* num_hru, int* err);
void deallocateJobActor(int* err);
}
/*********************************************
* Job Actor state variables
*********************************************/
namespace caf {
using chrono_time = std::chrono::time_point<std::chrono::system_clock>;
// Holds information about the GRUs
struct GRU_Container {
std::vector<GRU*> gru_list;
chrono_time gru_start_time; // Vector of start times for each GRU
int num_gru_done = 0;
int num_gru_failed = 0; // number of grus that are waiting to be restarted
int num_gru_in_run_domain = 0; // number of grus we are currently solving for
int run_attempts_left = 1; // current run attempt for all grus
};
struct job_state {
// Actor References
caf::actor file_access_actor; // actor reference for the file_access_actor
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int start_gru; // Starting GRU for this job
int num_gru; // Number of GRUs for this job
int num_hru;
int max_run_attempts = 1; // Max number of attempts to solve a GRU
GRU_Container gru_container;
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int num_gru_done = 0; // The number of GRUs that have completed
int num_gru_failed = 0; // Number of GRUs that have failed
// Timing Variables
TimingInfo job_timing;
std::string hostname;
// settings for all child actors (save in case we need to recover)
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
/** The Job Actor */
behavior job_actor(stateful_actor<job_state>* self,
int start_gru, int num_gru,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings,
actor parent);
/*********************************************
* Functions for the Job Actor
*********************************************/
/** Get the information for the GRUs that will be written to the netcdf file */
std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts,
std::vector<GRU*> &gru_list);
void handleGRUError(stateful_actor<job_state>* self, caf::actor src);
} // end namespace
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <string>
class Batch {
private:
int batch_id_;
int start_hru_;
int num_hru_;
double run_time_;
double read_time_;
double write_time_;
bool assigned_to_actor_;
bool solved_;
public:
Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1);
// Getters
int getBatchID();
int getStartHRU();
int getNumHRU();
double getRunTime();
double getReadTime();
double getWriteTime();
bool isAssigned();
bool isSolved();
std::string getBatchInfoString();
// Setters
void updateRunTime(double run_time);
void updateReadTime(double read_time);
void updateWriteTime(double write_time);
void updateAssigned(bool boolean);
void updateSolved(bool boolean);
void printBatchInfo();
void writeBatchToFile(std::string csv_output, std::string hostname);
std::string toString();
void assignToActor(std::string hostname, caf::actor assigned_actor);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch& batch) {
return inspector.object(batch).fields(
inspector.field("batch_id", batch.batch_id_),
inspector.field("start_hru", batch.start_hru_),
inspector.field("num_hru", batch.num_hru_),
inspector.field("run_time", batch.run_time_),
inspector.field("read_time", batch.read_time_),
inspector.field("write_time", batch.write_time_),
inspector.field("assigned_to_actor", batch.assigned_to_actor_),
inspector.field("solved", batch.solved_));
}
};
\ No newline at end of file
#include "caf/all.hpp"
#pragma once
#include "client.hpp"
class Batch_Container {
private:
int start_hru_;
int total_hru_count_;
int num_hru_per_batch_;
int batches_remaining_;
std::vector<Batch> batch_list_;
// Assemble the total number of HRUs given by the user into batches.
void assembleBatches();
public:
// Creating the batch_manager will also create the batches
// with the two parameters that are passed in.
Batch_Container(int start_hru = 1, int total_hru_count = 0,
int num_hru_per_batch = 0);
// returns the size of the batch list
int getBatchesRemaining();
int getTotalBatches();
// Find an unsolved batch, set it to assigned and return it.
std::optional<Batch> getUnsolvedBatch();
// Update the batch status to solved and write the output to a file.
void updateBatch_success(Batch successful_batch, std::string output_csv, std::string hostname);
// Update the batch status but do not write the output to a file.
void updateBatch_success(Batch successful_batch);
// Update batch by id
void updateBatch_success(int batch_id, double run_time, double read_time,
double write_time);
// Update the batch to assigned = true
void setBatchAssigned(Batch batch);
// Update the batch to assigned = false
void setBatchUnassigned(Batch batch);
// Check if there are batches left to solve
bool hasUnsolvedBatches();
// TODO: Needs implementation
void updateBatch_failure(Batch failed_batch);
std::string getAllBatchInfoString();
double getTotalReadTime();
double getTotalWriteTime();
/**
* A client has found to be disconnected. Unassign all batches
* that were assigned to the disconnected client. The client id
* is passed in as a parameter
*/
void updatedBatch_disconnectedClient(int client_id);
/**
* Create the csv file for the completed batches.
*/
void inititalizeCSVOutput(std::string csv_output_name);
/**
* @brief Print the batches from the batch list
*
*/
void printBatches();
std::string getBatchesAsString();
/**
* @brief Find the batch with the batch_id parameter
* update the batches assigned actor member variable to false
*
*/
void updateBatchStatus_LostClient(int batch_id);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch_Container& batch_container) {
return inspector.object(batch_container).fields(
inspector.field("total_hru_count", batch_container.total_hru_count_),
inspector.field("num_hru_per_batch", batch_container.num_hru_per_batch_),
inspector.field("batches_remaining", batch_container.batches_remaining_),
inspector.field("batch_list", batch_container.batch_list_));
}
};
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <optional>
#include "batch.hpp"
class Client {
private:
caf::actor client_actor;
std::string hostname;
int id;
int batches_solved;
std::optional<Batch> current_batch;
public:
Client(int id = -1, caf::actor client_actor = nullptr, std::string hostname = "");
// ####################################################################
// Getters
// ####################################################################
caf::actor getActor();
int getID();
std::string getHostname();
std::optional<Batch> getBatch();
// ####################################################################
// Setters
// ####################################################################
void setBatch(std::optional<Batch> batch);
// ####################################################################
// Methods
// ####################################################################
std::string toString();
// Serialization so CAF can send an object of this class to another actor
template <class Inspector>
friend bool inspect(Inspector& inspector, Client& client) {
return inspector.object(client).fields(
inspector.field("client_actor",client.client_actor),
inspector.field("hostname",client.hostname),
inspector.field("id",client.id),
inspector.field("batches_solved",client.batches_solved),
inspector.field("current_batch",client.current_batch));
}
};
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <vector>
#include "batch.hpp"
#include "client.hpp"
#include <optional>
class Client_Container {
private:
std::vector<Client> client_list;
int id_counter;
public:
Client_Container();
// ####################################################################
// Getters
// ####################################################################
int getNumClients();
std::vector<Client> getClientList();
std::optional<Client> getClient(caf::actor_addr client_ref);
// ####################################################################
// Setters
// ####################################################################
void setBatchForClient(caf::actor client_ref, std::optional<Batch> batch);
// ####################################################################
// Methods
// ####################################################################
// add a new client to the client_list
void addClient(caf::actor client_actor, std::string hostname);
// remove a client from the client_list
void removeClient(Client client);
// return a client that is not solving a batch or return an empty optional
std::optional<Client> getIdleClient();
// Check if the client list is empty
bool isEmpty();
// pops client at the end of the list
Client removeClient_fromBack();
// return printable string
std::string toString();
template <class Inspector>
friend bool inspect(Inspector& inspector, Client_Container& client_container) {
return inspector.object(client_container).fields(
inspector.field("client_list", client_container.client_list),
inspector.field("id_counter", client_container.id_counter));
}
};
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include "batch_container.hpp"
#include <chrono>
#include <string>
#include <vector>
namespace caf {
struct job_timing_info {
std::vector<double> job_duration;
std::vector<double> job_read_duration;
std::vector<double> job_write_duration;
};
struct summa_actor_state {
// Timing Information For Summa-Actor
TimingInfo summa_actor_timing;
struct job_timing_info timing_info_for_jobs;
// Program Parameters
int startGRU; // starting GRU for the simulation
int numGRU; // number of GRUs to compute
int fileGRU; // number of GRUs in the file
std::string configPath; // path to the fileManager.txt file
int numFailed = 0; // Number of jobs that have failed
caf::actor currentJob; // Reference to the current job actor
caf::actor parent;
// Batches
Batch_Container batch_container;
int current_batch_id;
// settings for all child actors (save in case we need to recover)
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
behavior summa_actor(stateful_actor<summa_actor_state>* self,
int startGRU, int numGRU,
Summa_Actor_Settings summa_actor_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings, actor parent);
void spawnJob(stateful_actor<summa_actor_state>* self);
} // namespace caf
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "summa_server.hpp"
#include <string>
#include <unistd.h>
#include <limits.h>
namespace caf {
// Inital behaviour that waits to connect to the lead server
behavior summa_backup_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings);
// Function that is called ot connect to the lead server
void connecting_backup(stateful_actor<summa_server_state>* self, const std::string& host, uint16_t port);
behavior summa_backup_server(stateful_actor<summa_server_state>* self, const actor& server_actor);
}
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "settings_functions.hpp"
#include "batch.hpp"
#include "summa_actor.hpp"
#include "message_atoms.hpp"
#include <string>
#include <optional>
#include <unistd.h>
#include <limits.h>
namespace caf {
struct summa_client_state {
strong_actor_ptr current_server = nullptr;
actor current_server_actor;
std::vector<strong_actor_ptr> servers;
std::string hostname;
actor summa_actor_ref;
uint16_t port;
int batch_id;
int client_id; // id held by server
bool running = false; // initalized to false - flipped to true when client returns behavior summa_client
// tuple is the actor ref and hostname of the backup server
std::vector<std::tuple<caf::actor, std::string>> backup_servers_list;
Batch current_batch;
bool saved_batch = false;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
behavior summa_client_init(stateful_actor<summa_client_state>* self);
behavior summa_client(stateful_actor<summa_client_state>* self);
void connecting(stateful_actor<summa_client_state>*, const std::string& host, uint16_t port);
void findLeadServer(stateful_actor<summa_client_state>* self, strong_actor_ptr serv);
}
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "batch.hpp"
#include "batch_container.hpp"
#include "client.hpp"
#include "client_container.hpp"
#include "settings_functions.hpp"
#include "global.hpp"
#include "message_atoms.hpp"
#include <string>
#include <optional>
#include <thread>
#include <chrono>
#include <iostream>
namespace caf {
struct summa_server_state {
strong_actor_ptr current_server; // if server is a backup then this will be set to the lead server
actor current_server_actor;
std::string hostname;
std::string csv_file_path;
std::string csv_output_name = "/batch_results.csv";
Client_Container client_container;
Batch_Container batch_container;
// Actor Reference, Hostname
std::vector<std::tuple<caf::actor, std::string>> backup_servers_list;
// Settings Structures
Distributed_Settings distributed_settings;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
// Summa Server setup behaviour - initializes the state for the server
behavior summa_server_init(stateful_actor<summa_server_state>* self,
Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings);
// Summa Server behaviour - handles messages from clients
behavior summa_server(stateful_actor<summa_server_state>* self);
// Summa Server backup behaviour - handles the exit messages for clients
behavior summa_server_exit(stateful_actor<summa_server_state>* self);
// Creates the csv file that holds the results of the batches
void initializeCSVOutput(std::string csv_output_path);
// Send all connected actors the updated backup servers list
void sendAllBackupServersList(stateful_actor<summa_server_state>* self);
// Look for the lost backup server in the backup servers list and remove it
void findAndRemoveLostBackupServer(stateful_actor<summa_server_state>* self, actor_addr lost_backup_server);
// Check for an idle client to send the failed or next batch we find that is not assigned
void checkForIdleClients(stateful_actor<summa_server_state>* self);
void notifyBackupServersOfRemovedClient(stateful_actor<summa_server_state>* self, Client client);
// Finds the batch the lost client was working on and reassigns it to another client if available
// If no client is available then the batch is added back to the list to be reassigned later
void resolveLostClient(stateful_actor<summa_server_state>* self, Client client);
// Removes the backup server from the list of backup servers
// All connected actors are then notified of the change
void resolveLostBackupServer(stateful_actor<summa_server_state>* self, const down_msg& dm);
// Convience function to keep code clean - just does what you think it does
void printRemainingBatches(stateful_actor<summa_server_state>* self);
} // namespace caf
#### parent directory of the 'build' directory ####
# F_MASTER =
#### fortran compiler ####
# FC =
#### C++ compiler ####
# CC =
#### Includes AND Libraries ####
# INCLUDES =
# LIBRARIES =
# ACTORS_INCLUDES =
# ACTORS_LIBRARIES =
# Production runs
FLAGS_NOAH = -O3 -ffree-form -ffree-line-length-none -fmax-errors=0 -fPIC
FLAGS_COMM = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
FLAGS_SUMMA = -O3 -ffree-line-length-none -fmax-errors=0 -fPIC
FLAGS_ACTORS = -O3
# # Debug runs
# FLAGS_NOAH = -pg -g -O0 -ffree-form -ffree-line-length-none -fmax-errors=0 -fbacktrace -Wno-unused -Wno-unused-dummy-argument -fPIC
# FLAGS_COMM = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
# FLAGS_SUMMA = -pg -g -O0 -Wall -ffree-line-length-none -fmax-errors=0 -fbacktrace -fcheck=bounds -fPIC
# FLAGS_ACTORS = -pg -g -O0 -Wall
#========================================================================
# PART 1: Define directory paths
#========================================================================
# Core directory that contains source code
F_KORE_DIR = $(F_MASTER)/build/source
# Location of the compiled modules
MOD_PATH = $(F_MASTER)/build
# Define the directory for the executables
EXE_PATH = $(F_MASTER)/bin
#========================================================================
# PART 2: Assemble all of the SUMMA sub-routines
#========================================================================
# Define directories
DRIVER_DIR = $(F_KORE_DIR)/driver
HOOKUP_DIR = $(F_KORE_DIR)/hookup
NETCDF_DIR = $(F_KORE_DIR)/netcdf
DSHARE_DIR = $(F_KORE_DIR)/dshare
NUMREC_DIR = $(F_KORE_DIR)/numrec
NOAHMP_DIR = $(F_KORE_DIR)/noah-mp
ENGINE_DIR = $(F_KORE_DIR)/engine
ACTORS_DIR = $(F_KORE_DIR)/actors
JOB_ACTOR_DIR = $(ACTORS_DIR)/job_actor
FILE_ACCESS_DIR = $(ACTORS_DIR)/file_access_actor
HRU_ACTOR_DIR = $(ACTORS_DIR)/hru_actor
# utilities
SUMMA_NRUTIL= \
nrtype.f90 \
f2008funcs.f90 \
nr_utility.f90
NRUTIL = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_NRUTIL))
# Numerical recipes procedures
# NOTE: all numerical recipes procedures are now replaced with free versions
SUMMA_NRPROC= \
expIntegral.f90 \
spline_int.f90
NRPROC = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_NRPROC))
# Hook-up modules (set files and directory paths)
SUMMA_HOOKUP= \
ascii_util.f90 \
summaActors_FileManager.f90
HOOKUP = $(patsubst %, $(HOOKUP_DIR)/%, $(SUMMA_HOOKUP))
# Data modules
SUMMA_DATAMS= \
multiconst.f90 \
var_lookup.f90 \
data_types.f90 \
globalData.f90 \
flxMapping.f90 \
get_ixname.f90 \
popMetadat.f90 \
outpt_stat.f90
DATAMS = $(patsubst %, $(DSHARE_DIR)/%, $(SUMMA_DATAMS))
# utility modules
SUMMA_UTILMS= \
time_utils.f90 \
mDecisions.f90 \
snow_utils.f90 \
soil_utils.f90 \
updatState.f90 \
matrixOper.f90
UTILMS = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_UTILMS))
# Model guts
SUMMA_MODGUT= \
MODGUT = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_MODGUT))
# Solver
SUMMA_SOLVER= \
vegPhenlgy.f90 \
diagn_evar.f90 \
stomResist.f90 \
groundwatr.f90 \
vegSWavRad.f90 \
vegNrgFlux.f90 \
ssdNrgFlux.f90 \
vegLiqFlux.f90 \
snowLiqFlx.f90 \
soilLiqFlx.f90 \
bigAquifer.f90 \
computFlux.f90 \
computResid.f90 \
computJacob.f90 \
eval8summa.f90 \
summaSolve.f90 \
systemSolv.f90 \
varSubstep.f90 \
opSplittin.f90 \
coupled_em.f90
SOLVER = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_SOLVER))
# Interface code for Fortran-C++
SUMMA_INTERFACE= \
cppwrap_datatypes.f90 \
cppwrap_auxiliary.f90 \
cppwrap_metadata.f90 \
INTERFACE = $(patsubst %, $(ACTORS_DIR)/global/%, $(SUMMA_INTERFACE))
SUMMA_FILEACCESS_INTERFACE = \
initOutputStruc.f90 \
deallocateOutputStruc.f90 \
cppwrap_fileAccess.f90
FILEACCESS_INTERFACE = $(patsubst %, $(FILE_ACCESS_DIR)/%, $(SUMMA_FILEACCESS_INTERFACE))
SUMMA_JOB_INTERFACE = \
cppwrap_job.f90
JOB_INTERFACE = $(patsubst %, $(JOB_ACTOR_DIR)/%, $(SUMMA_JOB_INTERFACE))
SUMMA_HRU_INTERFACE = \
cppwrap_hru.f90
HRU_INTERFACE = $(patsubst %, $(HRU_ACTOR_DIR)/%, $(SUMMA_HRU_INTERFACE))
# Define routines for SUMMA preliminaries
SUMMA_PRELIM= \
conv_funcs.f90 \
sunGeomtry.f90 \
convE2Temp.f90 \
allocspaceActors.f90 \
alloc_file_access.f90\
checkStruc.f90 \
childStruc.f90 \
ffile_info.f90 \
read_attribute.f90 \
read_pinit.f90 \
pOverwrite.f90 \
read_paramActors.f90 \
paramCheck.f90 \
check_icondActors.f90 \
# allocspace.f90
PRELIM = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_PRELIM))
SUMMA_NOAHMP= \
module_model_constants.F \
module_sf_noahutl.F \
module_sf_noahlsm.F \
module_sf_noahmplsm.F
NOAHMP = $(patsubst %, $(NOAHMP_DIR)/%, $(SUMMA_NOAHMP))
# Define routines for the SUMMA model runs
SUMMA_MODRUN = \
indexState.f90 \
getVectorz.f90 \
updateVars.f90 \
var_derive.f90 \
read_forcingActors.f90 \
access_forcing.f90\
access_write.f90 \
derivforce.f90 \
snowAlbedo.f90 \
canopySnow.f90 \
tempAdjust.f90 \
snwCompact.f90 \
layerMerge.f90 \
layerDivide.f90 \
volicePack.f90 \
qTimeDelay.f90
MODRUN = $(patsubst %, $(ENGINE_DIR)/%, $(SUMMA_MODRUN))
# Define NetCDF routines
# OutputStrucWrite is not a netcdf subroutine and should be
# moved
SUMMA_NETCDF = \
netcdf_util.f90 \
def_output.f90 \
outputStrucWrite.f90 \
writeOutput.f90 \
read_icondActors.f90
NETCDF = $(patsubst %, $(NETCDF_DIR)/%, $(SUMMA_NETCDF))
# ... stitch together common programs
COMM_ALL = $(NRUTIL) $(NRPROC) $(HOOKUP) $(DATAMS) $(UTILMS)
# ... stitch together SUMMA programs
SUMMA_ALL = $(NETCDF) $(PRELIM) $(MODRUN) $(SOLVER)
# Define the driver routine
SUMMA_DRIVER= \
summaActors_type.f90 \
summaActors_util.f90 \
summaActors_globalData.f90 \
summaActors_init.f90 \
SummaActors_setup.f90 \
summaActors_restart.f90 \
summaActors_forcing.f90 \
SummaActors_modelRun.f90 \
summaActors_alarms.f90 \
summaActors_wOutputStruc.f90
DRIVER = $(patsubst %, $(DRIVER_DIR)/%, $(SUMMA_DRIVER))
ACTORC = $(F_KORE_DIR)/actors/main.cc
ACTOR_TEST = $(F_KORE_DIR)/testing/testing_main.cc
#========================================================================
# PART 3: compilation
#======================================================================
# Compile
all: lib main
lib: compile_noah compile_comm compile_summa link clean
main: actors actorsLink actorsClean
test: actors_test actors_testLink actorsClean
# compile Noah-MP routines
compile_noah:
$(FC) $(FLAGS_NOAH) -c $(NRUTIL) $(NOAHMP)
# compile common routines
compile_comm:
$(FC) $(FLAGS_COMM) -c $(COMM_ALL) $(INCLUDES)
# compile SUMMA routines
compile_summa:
$(FC) $(FLAGS_SUMMA) -c $(SUMMA_ALL) $(DRIVER) $(INTERFACE) $(JOB_INTERFACE) $(FILEACCESS_INTERFACE) $(HRU_INTERFACE) $(INCLUDES)
# generate library
link:
$(FC) -shared *.o -o libsumma.so
mv libsumma.so $(F_MASTER)/bin
# compile the c++ portion of SummaActors
actors:
$(CC) $(FLAGS_ACTORS) -c $(ACTORC) -std=c++17 $(ACTORS_INCLUDES)
actorsLink:
$(CC) -o summaMain *.o $(ACTORS_LIBRARIES)
mv summaMain $(F_MASTER)/bin
#### COMPILE TESTING FILES ####
actors_test:
$(CC) $(FLAGS_ACTORS) -c $(ACTOR_TEST) -std=c++17 $(ACTORS_INCLUDES)
actors_testLink:
$(CC) -o summaTest *.o $(ACTORS_LIBRARIES)
actorsClean:
rm *.o
# Remove object files
clean:
rm -f *.o *.mod soil_veg_gen_parm__genmod.f90
clean_lib:
rm *.so
#ifndef FILEACCESS_H_
#define FILEACCESS_H_
#include "caf/all.hpp"
#include "../global/fortran_dataTypes.h"
#include "../global/messageAtoms.h"
#include "../global/global.h"
#include "../global/json.hpp"
#include "fileAccess_subroutine_wrappers.h"
#include "OutputManager.h"
#include <vector>
#include <chrono>
class forcingFile {
private:
int fileID; // which file are we relative the forcing file list saved in fortran
int numSteps; // the number of steps in this forcing file
bool isLoaded; // is this file actually loaded in to RAM yet.
public:
forcingFile(int fileID) {
this->fileID = fileID;
this->numSteps = 0;
this->isLoaded = false;
}
int getNumSteps() {
return this->numSteps;
}
bool isFileLoaded() {
return this->isLoaded;
}
void updateIsLoaded() {
this->isLoaded = true;
}
void updateNumSteps(int numSteps) {
this->numSteps = numSteps;
this->isLoaded = true;
}
};
struct file_access_state {
// Variables set on Spwan
caf::actor parent;
int startGRU;
int numGRU;
void *handle_forcFileInfo = new_handle_file_info(); // Handle for the forcing file information
void *handle_ncid = new_handle_var_i(); // output file ids
OutputManager *output_manager;
int num_vectors_in_output_manager;
int num_steps;
int outputStrucSize;
int stepsInCurrentFile;
int numFiles;
int filesLoaded;
int err = 0;
std::vector<forcingFile> forcFileList; // list of steps in file
std::vector<bool> outputFileInitHRU;
std::chrono::time_point<std::chrono::system_clock> readStart;
std::chrono::time_point<std::chrono::system_clock> readEnd;
double readDuration = 0.0;
std::chrono::time_point<std::chrono::system_clock> writeStart;
std::chrono::time_point<std::chrono::system_clock> writeEnd;
double writeDuration = 0.0;
};
#endif
\ No newline at end of file
#ifndef FILEACCESSACTOR_H_
#define FILEACCESSACTOR_H_
#include "FileAccess.h"
using namespace caf;
using json = nlohmann::json;
void initalizeFileAccessActor(stateful_actor<file_access_state>* self);
int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU, int numStepsToWrite, int returnMessage, caf::actor actorRef);
int readForcing(stateful_actor<file_access_state>* self, int currentFile);
int write(stateful_actor<file_access_state>* self, int listIndex);
int parseSettings(stateful_actor<file_access_state>* self, std::string configPath);
behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU,
int outputStrucSize, std::string configPath, actor parent) {
// Set File_Access_Actor variables
self->state.parent = parent;
self->state.numGRU = numGRU;
self->state.startGRU = startGRU;
self->state.outputStrucSize = outputStrucSize;
// Get Settings from configuration file
if (parseSettings(self, configPath) == -1) {
aout(self) << "Error with JSON Settings File!!!\n";
self->quit();
} else {
aout(self) << "\nSETTINGS FOR FILE_ACCESS_ACTOR\n" <<
"Number of Vectors in Output Structure = " << self->state.num_vectors_in_output_manager << "\n";
}
initalizeFileAccessActor(self);
return {
[=](initalize_outputStructure) {
aout(self) << "Initalizing Output Structure" << std::endl;
Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize,
&self->state.numGRU, &self->state.err);
},
[=](write_param, int indxGRU, int indxHRU) {
int err;
err = 0;
Write_HRU_Param(self->state.handle_ncid, &indxGRU, &indxHRU, &err);
if (err != 0) {
aout(self) << "ERROR: Write_HRU_PARAM -- For HRU = " << indxHRU << "\n";
}
},
[=](access_forcing, int currentFile, caf::actor refToRespondTo) {
// aout(self) << "Received Current FIle = " << currentFile << std::endl;
if (currentFile <= self->state.numFiles) {
if(self->state.forcFileList[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1
// aout(self) << "ForcingFile Already Loaded \n";
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
} else {
self->state.readStart = std::chrono::high_resolution_clock::now();
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
}
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
// Check if we have loaded all forcing files
if(self->state.filesLoaded <= self->state.numFiles) {
self->send(self, access_forcing_internal_v, currentFile + 1);
}
self->send(refToRespondTo, run_hru_v,
self->state.forcFileList[currentFile - 1].getNumSteps());
}
} else {
aout(self) << currentFile << " is larger than expected" << std::endl;
}
},
[=](access_forcing_internal, int currentFile) {
if (self->state.filesLoaded <= self->state.numFiles &&
currentFile <= self->state.numFiles) {
// aout(self) << "Loading in background, File:" << currentFile << "\n";
if (self->state.forcFileList[currentFile - 1].isFileLoaded()) {
aout(self) << "File Loaded when shouldn't be \n";
}
self->state.readStart = std::chrono::high_resolution_clock::now();
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
aout(self) << "ERROR: Reading Forcing" << std::endl;
}
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
self->send(self, access_forcing_internal_v, currentFile + 1);
} else {
aout(self) << "All Forcing Files Loaded \n";
}
},
[=](write_output, int indxGRU, int indxHRU, int numStepsToWrite,
caf::actor refToRespondTo) {
int err;
int returnMessage = 9999;
err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, returnMessage, refToRespondTo);
if (err != 0) {
aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
}
},
[=](read_and_write, int indxGRU, int indxHRU, int numStepsToWrite, int currentFile,
caf::actor refToRespondTo) {
int err;
err = readForcing(self, currentFile);
if (err != 0)
aout(self) << "\nERROR: FILE_ACCESS_ACTOR - READING_FORCING FAILED\n";
err = writeOutput(self, indxGRU, indxHRU, numStepsToWrite, currentFile, refToRespondTo);
if (err != 0)
aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n";
},
[=](run_failure, int indxGRU) {
int listIndex;
// update the list in Fortran
updateFailed(&indxGRU);
listIndex = self->state.output_manager->decrementMaxSize(indxGRU);
// Check if this list is now full
if(self->state.output_manager->isFull(listIndex)) {
write(self, listIndex);
}
},
/**
* Message from JobActor
* OutputManager needs to be adjusted so the failed HRUs can run again
*/
[=](restart_failures) {
resetFailedArray();
self->state.output_manager->restartFailures();
},
[=](deallocate_structures) {
aout(self) << "Deallocating Structure" << std::endl;
FileAccessActor_DeallocateStructures(self->state.handle_forcFileInfo, self->state.handle_ncid);
self->state.readDuration = self->state.readDuration / 1000; // Convert to milliseconds
self->state.readDuration = self->state.readDuration / 1000; // Convert to seconds
self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds
self->state.writeDuration = self->state.writeDuration / 1000; // Convert to milliseconds
self->send(self->state.parent, file_access_actor_done_v, self->state.readDuration,
self->state.writeDuration);
self->quit();
},
[=](reset_outputCounter, int indxGRU) {
resetOutputCounter(&indxGRU);
}
};
}
void initalizeFileAccessActor(stateful_actor<file_access_state>* self) {
int indx = 1;
int err = 0;
// aout(self) << "Set Up the forcing file" << std::endl;
ffile_info_C(&indx, self->state.handle_forcFileInfo, &self->state.numFiles, &err);
if (err != 0) {
aout(self) << "Error: ffile_info_C - File_Access_Actor \n";
std::string function = "ffile_info_C";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
mDecisions_C(&self->state.num_steps, &err);
if (err != 0) {
aout(self) << "Error: mDecisions - FileAccess Actor \n";
std::string function = "mDecisions_C";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
read_pinit_C(&err);
if (err != 0) {
aout(self) << "ERROR: read_pinit_C\n";
std::string function = "read_pinit_C";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
read_vegitationTables(&err);
if (err != 0) {
aout(self) << "ERROR: read_vegitationTables\n";
std::string function = "read_vegitationTables";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
initFailedHRUTracker(&self->state.numGRU);
Create_Output_File(self->state.handle_ncid, &self->state.numGRU, &self->state.startGRU, &err);
if (err != 0) {
aout(self) << "ERROR: Create_OutputFile\n";
std::string function = "Create_Output_File";
self->send(self->state.parent, file_access_actor_err_v, function);
self->quit();
return;
}
// Initalize the output Structure
aout(self) << "Initalizing Output Structure" << std::endl;
Init_OutputStruct(self->state.handle_forcFileInfo, &self->state.outputStrucSize,
&self->state.numGRU, &self->state.err);
// Initalize the output manager
self->state.output_manager = new OutputManager(self->state.num_vectors_in_output_manager, self->state.numGRU);
self->send(self->state.parent, done_file_access_actor_init_v);
// initalize the forcingFile array
self->state.filesLoaded = 0;
for (int i = 1; i <= self->state.numFiles; i++) {
self->state.forcFileList.push_back(forcingFile(i));
}
}
int write(stateful_actor<file_access_state>* self, int listIndex) {
int err = 0;
int minGRU = self->state.output_manager->getMinIndex(listIndex);
int maxGRU = self->state.output_manager->getMaxIndex(listIndex);
int numStepsToWrite = self->state.output_manager->getNumStepsToWrite(listIndex);
FileAccessActor_WriteOutput(self->state.handle_ncid,
&numStepsToWrite, &minGRU,
&maxGRU, &err);
// Pop The actors and send them the correct continue message
while(!self->state.output_manager->isEmpty(listIndex)) {
std::tuple<caf::actor, int> actor = self->state.output_manager->popActor(listIndex);
if (get<1>(actor) == 9999) {
self->send(get<0>(actor), done_write_v);
} else {
self->send(get<0>(actor), run_hru_v,
self->state.forcFileList[get<1>(actor) - 1].getNumSteps());
}
}
return 0;
}
int writeOutput(stateful_actor<file_access_state>* self, int indxGRU, int indxHRU,
int numStepsToWrite, int returnMessage, caf::actor actorRef) {
self->state.writeStart = std::chrono::high_resolution_clock::now();
if (debug) {
aout(self) << "Recieved Write Request From GRU: " << indxGRU << "\n";
}
int err = 0;
int listIndex = self->state.output_manager->addActor(actorRef, indxGRU, returnMessage, numStepsToWrite);
if (self->state.output_manager->isFull(listIndex)) {
if (debug) {
aout(self) << "List with Index " << listIndex << " is full and ready to write\n";
aout(self) << "Minimum GRU Index = " << self->state.output_manager->getMinIndex(listIndex) << "\n";
aout(self) << "Maximum GRU Index = " << self->state.output_manager->getMaxIndex(listIndex) << "\n";
}
err = write(self, listIndex);
} else {
if (debug) {
aout(self) << "List with Index " << listIndex << " is not full yet waiting to write\n";
aout(self) << "Size of list is " << self->state.output_manager->getSize(listIndex) << "\n";
}
}
self->state.writeEnd = std::chrono::high_resolution_clock::now();
self->state.writeDuration += calculateTime(self->state.writeStart, self->state.writeEnd);
return err;
}
int readForcing(stateful_actor<file_access_state>* self, int currentFile) {
// Check if we have already loaded this file
if(self->state.forcFileList[currentFile -1].isFileLoaded()) {
if (debug)
aout(self) << "ForcingFile Already Loaded \n";
return 0;
} else {
// File Needs to be loaded
self->state.readStart = std::chrono::high_resolution_clock::now();
// Load the file
FileAccessActor_ReadForcing(self->state.handle_forcFileInfo, &currentFile,
&self->state.stepsInCurrentFile, &self->state.startGRU,
&self->state.numGRU, &self->state.err);
if (self->state.err != 0) {
if (debug)
aout(self) << "ERROR: FileAccessActor_ReadForcing\n" <<
"currentFile = " << currentFile << "\n" << "number of steps = "
<< self->state.stepsInCurrentFile << "\n";
return -1;
} else {
self->state.filesLoaded += 1;
self->state.forcFileList[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);
self->state.readEnd = std::chrono::high_resolution_clock::now();
self->state.readDuration += calculateTime(self->state.readStart, self->state.readEnd);
return 0;
}
}
}
int parseSettings(stateful_actor<file_access_state>* self, std::string configPath) {
json settings;
std::string SummaActorsSettigs = "/Summa_Actors_Settings.json";
std::ifstream settings_file(configPath + SummaActorsSettigs);
settings_file >> settings;
settings_file.close();
if (settings.find("FileAccessActor") != settings.end()) {
json FileAccessActorConfig = settings["FileAccessActor"];
// Find the File Manager Path
if (FileAccessActorConfig.find("num_vectors_in_output_manager") != FileAccessActorConfig.end()) {
self->state.num_vectors_in_output_manager = FileAccessActorConfig["num_vectors_in_output_manager"];
} else {
aout(self) << "Error Finding FileManagerPath - Exiting as this is needed\n";
return -1;
}
return 0;
} else {
aout(self) << "Error Finding JobActor in JSON file - Exiting as there is no path for the fileManger\n";
return -1;
}
}
#endif
\ No newline at end of file
#ifndef OutputManager_H_
#define OutputManager_H_
#include "caf/all.hpp"
#include <vector>
#include <algorithm>
/**
* @brief Basic Container class to hold actor references. This has a size component for checking when it is full.
*
*/
class ActorRefList {
private:
int numStepsToWrite; // We can save this value here so that we know how many steps to write
int currentSize;
unsigned int maxSize;
int minIndex = -1; // minimum index of the actor being stored on this list
int maxIndex = 0; // maximum index of the actor being stored on this list
std::vector<std::tuple<caf::actor, int>> list;
public:
// Constructor
ActorRefList(int maxSize){
this->currentSize = 0;
this->maxSize = maxSize;
}
// Deconstructor
~ActorRefList(){};
int getMaxIndex() {
return this->maxIndex;
}
int getMinIndex() {
return this->minIndex;
}
int getCurrentSize() {
return this->currentSize;
}
int getMaxSize() {
return this->maxSize;
}
int getNumStepsToWrite() {
return this->numStepsToWrite;
}
bool isFull() {
return list.size() == this->maxSize;
}
/**
* Adds An Actor and its return message as a tuple to this->list
* actor - the actor ref of the actor being added to this->list
* returnMessage - Either 9999 (place holder and specifies to send a done_write_v message) or
* this is the current forcingFileList index that allows the file_access actor to know the number
* of steps the HRU actor that needs to compute
*/
void addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
if (this->isFull()) {
throw "List is full, cannot add actor to this list";
}
if (index > this->maxIndex) {
this->maxIndex = index;
}
if (index < this->minIndex || this->minIndex < 0) {
this->minIndex = index;
}
this->numStepsToWrite = numStepsToWrite;
this->currentSize++;
list.push_back(std::make_tuple(actor, returnMessage));
}
/**
* Return a tuple of an actor and its returnMessage.
* The return message is 9999 or the index of the forcingFile it needs to acces
*/
std::tuple<caf::actor,int> popActor() {
if (list.empty()) {
throw "List is empty, nothing to pop";
}
auto actor = list.back();
list.pop_back();
this->currentSize--;
return actor;
}
bool isEmpty() {
return list.empty();
}
/**
* When an actor fails we need to decrement the count
* so that this list becomes full when there is a failure
*
* indexHRU - index of the HRU causing the error
*/
void decrementMaxSize() {
this->maxSize--;
}
/**
* Remove the failed HRU from the list
*
*/
void removeFailed(caf::actor actorRef) {
bool found = false;
for(std::vector<std::tuple<caf::actor, int>>::iterator it = this->list.begin(); it != this->list.end(); it++) {
if (std::get<0>(*it) == actorRef) {
found = true;
this->list.erase(it);
this->currentSize--; this->maxSize--;
break;
}
}
if (!found) {
throw "Element To Remove Not Found";
}
}
};
/**
* @brief Class that manages which structure actors are held on
*
*/
class OutputManager {
private:
int numVectors;
int avgSizeOfActorList;
bool runningFailures;
std::vector<ActorRefList*> list;
std::vector<int> failedHRU;
std::vector<int> failureReRun; // index used so we can add failedHRUs if they fail a second time
public:
// Constructor
OutputManager(int numVectors, int totalNumActors){
this->numVectors = numVectors;
int sizeOfOneVector = totalNumActors / numVectors;
this->avgSizeOfActorList = sizeOfOneVector;
this->runningFailures = false;
// Create the first n-1 vectors with the same size
for (int i = 0; i < numVectors - 1; i++) {
auto refList = new ActorRefList(sizeOfOneVector);
totalNumActors = totalNumActors - sizeOfOneVector;
list.push_back(refList);
}
// Create the last vector with size however many actors are left
auto refList = new ActorRefList(totalNumActors);
list.push_back(refList);
}
// Deconstructor
~OutputManager(){};
/**
* @brief Adds an actor to its respective list
*
* @param actor Actor reference
* @param index Actor Index
* @param returnMessage Forcing File index or 9999
* @return int The list index that actor is added to.
*/
int addActor(caf::actor actor, int index, int returnMessage, int numStepsToWrite) {
int listIndex;
if (this->runningFailures) {
// find the index of the structure this HRU is in
auto it = find(this->failureReRun.begin(), this->failureReRun.end(), index);
if (it != this->failureReRun.end()) {
listIndex = it - this->failureReRun.begin();
} else {
throw "Element Not Found in failureReRun list";
}
this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
} else {
// Index has to be subtracted by 1 because Fortran array starts at 1
listIndex = (index - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
}
this->list[listIndex]->addActor(actor, index, returnMessage, numStepsToWrite);
}
return listIndex;
}
/**
* Remove tuple from list[index]
*
*/
std::tuple<caf::actor,int> popActor(int index) {
if (index > this->numVectors - 1 || index < 0) {
throw "List Index Out Of Range";
} else if (this->list[index]->isEmpty()) {
throw "List is Empty, Nothing to pop";
}
return this->list[index]->popActor();
}
/** When a failure occurs an actor most likley will not already be on this list
* This method may and probably should not be used. Although needing to remove a
* specific element from a list may be needed.
* Remove the failed actor from the list
* Return the index of the list we removed the actor from
* This is so we can check if it is full
*/
int removeFailed(caf::actor actorRef, int index) {
// Find the list this actor is on
int listIndex = (index - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
}
this->list[listIndex]->removeFailed(actorRef);
return listIndex;
}
/**
* Decrease the size of the list
* Add this GRU to the failed list
*/
int decrementMaxSize(int indexHRU) {
this->failedHRU.push_back(indexHRU);
// Find the list this actor is on
int listIndex = (indexHRU - 1) / this->avgSizeOfActorList;
if (listIndex > this->numVectors - 1) {
listIndex = this->numVectors - 1;
}
this->list[listIndex]->decrementMaxSize();
return listIndex;
}
void restartFailures() {
this->list.clear();
this->numVectors = this->failedHRU.size();
for (unsigned int i = 0; i < this->failedHRU.size(); i++) {
auto refList = new ActorRefList(1);
this->list.push_back(refList);
}
this->failureReRun = this->failedHRU;
this->failedHRU.clear();
this->runningFailures = true;
}
/**
* Get the number of steps to write from the correct listIndex
*/
int getNumStepsToWrite(int listIndex) {
return this->list[listIndex]->getNumStepsToWrite();
}
bool isFull(int listIndex) {
if (listIndex > this->numVectors - 1) {
throw "List Index Out Of Range";
}
return this->list[listIndex]->isFull();
}
bool isEmpty(int listIndex) {
return this->list[listIndex]->isEmpty();
}
int getSize(int listIndex) {
if (listIndex > this->numVectors - 1) {
throw "List Index Out Of Range";
}
return this->list[listIndex]->getCurrentSize();
}
int getMinIndex(int listIndex) {
return this->list[listIndex]->getMinIndex();
}
int getMaxIndex(int listIndex) {
return this->list[listIndex]->getMaxIndex();
}
void addFailed(int indxHRU) {
this->failedHRU.push_back(indxHRU);
}
};
#endif
\ No newline at end of file
module summaActors_deallocateOuptutStruct
USE nrtype
implicit none
public::deallocateOutputStruc
contains
subroutine deallocateOutputStruc(err)
USE globalData,only:outputStructure
implicit none
integer(i4b), intent(inout) :: err
err = 0
! Time
call deallocateData_output(outputStructure(1)%timeStruct(1)); deallocate(outputStructure(1)%timeStruct)
! Forc
call deallocateData_output(outputStructure(1)%forcStat(1)); deallocate(outputStructure(1)%forcStat)
call deallocateData_output(outputStructure(1)%forcStruct(1)); deallocate(outputStructure(1)%forcStruct)
! prog
call deallocateData_output(outputStructure(1)%progStat(1)); deallocate(outputStructure(1)%progStat)
call deallocateData_output(outputStructure(1)%progStruct(1)); deallocate(outputStructure(1)%progStruct)
! diag
call deallocateData_output(outputStructure(1)%diagStat(1)); deallocate(outputStructure(1)%diagStat)
call deallocateData_output(outputStructure(1)%diagStruct(1)); deallocate(outputStructure(1)%diagStruct)
! flux
call deallocateData_output(outputStructure(1)%fluxStat(1)); deallocate(outputStructure(1)%fluxStat)
call deallocateData_output(outputStructure(1)%fluxStruct(1)); deallocate(outputStructure(1)%fluxStruct)
! indx
call deallocateData_output(outputStructure(1)%indxStat(1)); deallocate(outputStructure(1)%indxStat)
call deallocateData_output(outputStructure(1)%indxStruct(1)); deallocate(outputStructure(1)%indxStruct)
! bvar
call deallocateData_output(outputStructure(1)%bvarStat(1)); deallocate(outputStructure(1)%bvarStat)
call deallocateData_output(outputStructure(1)%bvarStruct(1)); deallocate(outputStructure(1)%bvarStruct)
! id
call deallocateData_output(outputStructure(1)%idStruct(1)); deallocate(outputStructure(1)%idStruct)
! attr
call deallocateData_output(outputStructure(1)%attrStruct(1)); deallocate(outputStructure(1)%attrStruct)
! type
call deallocateData_output(outputStructure(1)%typeStruct(1)); deallocate(outputStructure(1)%typeStruct)
! mpar
call deallocateData_output(outputStructure(1)%mparStruct(1)); deallocate(outputStructure(1)%mparStruct)
! bpar
call deallocateData_output(outputStructure(1)%bparStruct(1)); deallocate(outputStructure(1)%bparStruct)
! finalize stats
call deallocateData_output(outputStructure(1)%finalizeStats(1)); deallocate(outputStructure(1)%finalizeStats)
end subroutine deallocateOutputStruc
subroutine deallocateData_output(dataStruct)
USE data_types,only:gru_hru_time_doubleVec, &
gru_hru_time_intVec, &
gru_hru_time_flagVec, &
gru_hru_time_int, &
gru_hru_int, &
gru_hru_time_int8, &
gru_hru_time_double, &
gru_hru_double, &
gru_double
implicit none
class(*),intent(inout) :: dataStruct
! local variables
integer(i4b) :: iGRU
integer(i4b) :: iHRU
integer(i4b) :: iVar
integer(i4b) :: iTim
select type(dataStruct)
class is (gru_hru_time_doubleVec)
do iGRU = 1, size(dataStruct%gru(:))
do iHRU = 1, size(dataStruct%gru(iGRU)%hru(:))
do iVar = 1, size(dataStruct%gru(iGRU)%hru(iHRU)%var(:))
do iTim = 1, size(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim(:))
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim(iTim)%dat)
end do ! Time
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim)
end do ! var
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var)
end do ! hru
deallocate(dataStruct%gru(iGRU)%hru)
end do ! gru
deallocate(dataStruct%gru)
class is (gru_hru_time_intVec)
do iGRU = 1, size(dataStruct%gru(:))
do iHRU = 1, size(dataStruct%gru(iGRU)%hru(:))
do iVar = 1, size(dataStruct%gru(iGRU)%hru(iHRU)%var(:))
do iTim = 1, size(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim(:))
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim(iTim)%dat)
end do ! Time
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim)
end do ! var
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var)
end do ! hru
deallocate(dataStruct%gru(iGRU)%hru)
end do ! gru
deallocate(dataStruct%gru)
class is (gru_hru_time_flagVec)
do iGRU = 1, size(dataStruct%gru(:))
do iHRU = 1, size(dataStruct%gru(iGRU)%hru(:))
do iTim = 1, size(dataStruct%gru(iGRU)%hru(iHRU)%tim(:))
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%tim(iTim)%dat)
end do ! Time
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%tim)
end do ! hru
deallocate(dataStruct%gru(iGRU)%hru)
end do ! gru
deallocate(dataStruct%gru)
class is (gru_hru_time_int)
do iGRU = 1, size(dataStruct%gru(:))
do iHRU = 1, size(dataStruct%gru(iGRU)%hru(:))
do iVar = 1, size(dataStruct%gru(iGRU)%hru(iHRU)%var(:))
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim)
end do ! var
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var)
end do ! hru
deallocate(dataStruct%gru(iGRU)%hru)
end do ! gru
deallocate(dataStruct%gru)
class is (gru_hru_int)
do iGRU = 1, size(dataStruct%gru(:))
do iHRU = 1, size(dataStruct%gru(iGRU)%hru(:))
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var)
end do ! hru
deallocate(dataStruct%gru(iGRU)%hru)
end do ! gru
deallocate(dataStruct%gru)
class is (gru_hru_time_int8)
do iGRU = 1, size(dataStruct%gru(:))
do iHRU = 1, size(dataStruct%gru(iGRU)%hru(:))
do iVar = 1, size(dataStruct%gru(iGRU)%hru(iHRU)%var(:))
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim)
end do ! var
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var)
end do ! hru
deallocate(dataStruct%gru(iGRU)%hru)
end do ! gru
deallocate(dataStruct%gru)
class is (gru_hru_time_double)
do iGRU = 1, size(dataStruct%gru(:))
do iHRU = 1, size(dataStruct%gru(iGRU)%hru(:))
do iVar = 1, size(dataStruct%gru(iGRU)%hru(iHRU)%var(:))
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var(iVar)%tim)
end do ! var
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var)
end do ! hru
deallocate(dataStruct%gru(iGRU)%hru)
end do ! gru
deallocate(dataStruct%gru)
class is (gru_hru_double)
do iGRU = 1, size(dataStruct%gru(:))
do iHRU = 1, size(dataStruct%gru(iGRU)%hru(:))
deallocate(dataStruct%gru(iGRU)%hru(iHRU)%var)
end do ! hru
deallocate(dataStruct%gru(iGRU)%hru)
end do ! gru
deallocate(dataStruct%gru)
class is (gru_double)
do iGRU = 1, size(dataStruct%gru(:))
deallocate(dataStruct%gru(iGRU)%var)
end do ! gru
deallocate(dataStruct%gru)
end select
end subroutine
end module
\ No newline at end of file
......@@ -18,8 +18,10 @@
! You should have received a copy of the GNU General Public License
! along with this program. If not, see <http://www.gnu.org/licenses/>.
module def_output_module
USE data_types,only:var_i
module def_output_actors_module
USE data_types,only:var_i
USE actor_data_types,only:netcdf_gru_actor_info
USE netcdf
USE netcdf_util_module,only:netcdf_err ! netcdf error handling function
USE netcdf_util_module,only:nc_file_close ! close NetCDF files
......@@ -70,119 +72,167 @@ contains
! **********************************************************************************************************
! public subroutine def_output: define model output file
! **********************************************************************************************************
subroutine def_output(summaVersion,buildTime,gitBranch,gitHash,nGRU,nHRU,nSoil,infile,ncid_c,err,message)
USE globalData,only:structInfo ! information on the data structures
USE globalData,only:forc_meta,attr_meta,type_meta ! metaData structures
USE globalData,only:prog_meta,diag_meta,flux_meta,deriv_meta ! metaData structures
USE globalData,only:mpar_meta,indx_meta ! metaData structures
USE globalData,only:bpar_meta,bvar_meta,time_meta ! metaData structures
USE globalData,only:model_decisions ! model decisions
USE globalData,only:outFreq ! output frequencies
USE globalData,only:fname
! USE globalData,only:ncid
USE var_lookup,only:maxVarFreq ! # of available output frequencies
USE get_ixname_module,only:get_freqName ! get name of frequency from frequency index
! declare dummy variables
character(*),intent(in) :: summaVersion ! SUMMA version
character(*),intent(in) :: buildTime ! build time
character(*),intent(in) :: gitBranch ! git branch
character(*),intent(in) :: gitHash ! git hash
integer(i4b),intent(in) :: nGRU ! number of GRUs
integer(i4b),intent(in) :: nHRU ! number of HRUs
integer(i4b),intent(in) :: nSoil ! number of soil layers in the first HRU (used to define fixed length dimensions)
character(*),intent(in) :: infile ! file suffix
type(var_i),intent(inout) :: ncid_c ! id of output file
integer(i4b),intent(out) :: err ! error code
character(*),intent(out) :: message ! error message
! local variables
integer(i4b) :: ivar ! loop through model decisions
integer(i4b) :: iFreq ! loop through output frequencies
integer(i4b) :: iStruct ! loop through structure types
character(len=32) :: fstring ! string to hold model output freuqnecy
character(len=256) :: cmessage ! temporary error message
! initialize errors
err=0; message="def_output/"
! close files if already open
do iFreq=1,maxvarFreq
if (ncid_c%var(iFreq)/=integerMissing) then
call nc_file_close(ncid_c%var(iFreq),err,cmessage)
if(err/=0)then; message=trim(message)//trim(cmessage); return; end if
subroutine def_output(ncid,startGRU,nGRU,nHRU,actor_info,err,message)
USE globalData,only:structInfo ! information on the data structures
USE globalData,only:forc_meta,attr_meta,type_meta ! metaData structures
USE globalData,only:prog_meta,diag_meta,flux_meta,deriv_meta ! metaData structures
USE globalData,only:mpar_meta,indx_meta ! metaData structures
USE globalData,only:bpar_meta,bvar_meta,time_meta ! metaData structures
USE globalData,only:model_decisions ! model decisions
USE globalData,only:outFreq ! output frequencies
! Some global variabels required in the writing process
USE globalData,only:nHRUrun
USE globalData,only:nGRUrun
USE globalData,only:gru_struc
USE globalData,only:fileout
! modules that are not globalData
USE var_lookup,only:maxVarFreq ! # of available output frequencies
USE get_ixname_module,only:get_freqName ! get name of frequency from frequency index
USE summaFileManager,only:OUTPUT_PATH,OUTPUT_PREFIX ! define output file
USE globalData,only:outputTimeStep ! output time step
! ---------------------------------------------------------------------------------------
! * Dummy Variables
! ---------------------------------------------------------------------------------------
type(var_i),pointer :: ncid ! id of output file
integer(i4b),intent(in) :: startGRU ! startGRU for the entire job (for file creation)
integer(i4b),intent(in) :: nGRU ! number of GRUs
integer(i4b),intent(in) :: nHRU ! number of HRUs
type(netcdf_gru_actor_info),intent(out):: actor_info ! netcdf actor information
character(*),intent(out) :: message ! error message
integer(i4b),intent(out) :: err ! error code
! ---------------------------------------------------------------------------------------
! * Local Subroutine Variables
! ---------------------------------------------------------------------------------------
integer(i4b) :: ivar ! loop through model decisions
integer(i4b) :: iFreq ! loop through output frequencies
integer(i4b) :: iStruct ! loop through structure types
character(len=32) :: fstring ! string to hold model output freuqnecy
character(len=256) :: cmessage ! temporary error message
integer(i4b) :: iGRU
character(LEN=256) :: startGRUString ! String Variable to convert startGRU
character(LEN=256) :: numGRUString ! String Varaible to convert numGRU
character(len=1024) :: fname ! temporary filename
! initialize errors
err=0; message="def_output/"
! allocate space for the output file ID array
if (.not.allocated(ncid%var))then
allocate(ncid%var(maxVarFreq))
ncid%var(:) = integerMissing
endif
end do
! initialize netcdf file id
! ncid(:) = integerMissing
! create initial file
! each file will have a master name with a frequency appended at the end:
! e.g., xxxxxxxxx_timestep.nc (for output at every model timestep)
! e.g., xxxxxxxxx_monthly.nc (for monthly model output)
do iFreq=1,maxvarFreq
! skip frequencies that are not needed
if(.not.outFreq(iFreq)) cycle
! create file
fstring = get_freqName(iFreq)
fname = trim(infile)//'_'//trim(fstring)//'.nc'
call ini_create(nGRU,nHRU,nSoil,trim(fname),ncid_c%var(iFreq),err,cmessage)
if(err/=0)then; message=trim(message)//trim(cmessage); return; end if
! print*,'Created output file: '//trim(fname)
! define SUMMA version
do iVar=1,4
! write attributes
if(iVar==1) call put_attrib(ncid_c%var(iFreq),'summaVersion', summaVersion, err, cmessage) ! SUMMA version
if(iVar==2) call put_attrib(ncid_c%var(iFreq),'buildTime' , buildTime , err, cmessage) ! build time
if(iVar==3) call put_attrib(ncid_c%var(iFreq),'gitBranch' , gitBranch , err, cmessage) ! git branch
if(iVar==4) call put_attrib(ncid_c%var(iFreq),'gitHash' , gitHash , err, cmessage) ! git hash
! check errors
if(err/=0)then; message=trim(message)//trim(cmessage); return; end if
end do
! define model decisions
do iVar = 1,size(model_decisions)
if(model_decisions(iVar)%iDecision.ne.integerMissing)then
call put_attrib(ncid_c%var(iFreq),model_decisions(iVar)%cOption,model_decisions(iVar)%cDecision,err,cmessage)
if(err/=0)then; message=trim(message)//trim(cmessage); return; end if
end if
end do
! define variables
do iStruct = 1,size(structInfo)
select case (trim(structInfo(iStruct)%structName))
case('attr' ); call def_variab(ncid_c%var(iFreq),iFreq,needHRU, noTime,attr_meta, outputPrecision, err,cmessage) ! local attributes HRU
case('type' ); call def_variab(ncid_c%var(iFreq),iFreq,needHRU, noTime,type_meta, nf90_int, err,cmessage) ! local classification
case('mpar' ); call def_variab(ncid_c%var(iFreq),iFreq,needHRU, noTime,mpar_meta, outputPrecision, err,cmessage) ! model parameters
case('bpar' ); call def_variab(ncid_c%var(iFreq),iFreq,needGRU, noTime,bpar_meta, outputPrecision, err,cmessage) ! basin-average param
case('indx' ); call def_variab(ncid_c%var(iFreq),iFreq,needHRU,needTime,indx_meta, nf90_int, err,cmessage) ! model variables
case('deriv'); call def_variab(ncid_c%var(iFreq),iFreq,needHRU,needTime,deriv_meta,outputPrecision, err,cmessage) ! model derivatives
case('time' ); call def_variab(ncid_c%var(iFreq),iFreq, noHRU,needTime,time_meta, nf90_int, err,cmessage) ! model derivatives
case('forc' ); call def_variab(ncid_c%var(iFreq),iFreq,needHRU,needTime,forc_meta, outputPrecision, err,cmessage) ! model forcing data
case('prog' ); call def_variab(ncid_c%var(iFreq),iFreq,needHRU,needTime,prog_meta, outputPrecision, err,cmessage) ! model prognostics
case('diag' ); call def_variab(ncid_c%var(iFreq),iFreq,needHRU,needTime,diag_meta, outputPrecision, err,cmessage) ! model diagnostic variables
case('flux' ); call def_variab(ncid_c%var(iFreq),iFreq,needHRU,needTime,flux_meta, outputPrecision, err,cmessage) ! model fluxes
case('bvar' ); call def_variab(ncid_c%var(iFreq),iFreq,needGRU,needTime,bvar_meta, outputPrecision, err,cmessage) ! basin-average variables
case('id' ); cycle ! ids -- see write_hru_info()
case default; err=20; message=trim(message)//'unable to identify lookup structure';
end select
! error handling
if(err/=0)then;err=20;message=trim(message)//trim(cmessage)//'[structure = '//trim(structInfo(iStruct)%structName);return;end if
end do ! iStruct
! write HRU dimension and ID for each output file
call write_hru_info(ncid_c%var(iFreq), err, cmessage); if(err/=0) then; message=trim(message)//trim(cmessage); return; end if
end do
! Set the global variable for the number of HRU and GRU in run
nGRUrun = nGRU
nHRUrun = nGRU
! create the name of the new files
write(unit=startGRUString,fmt=*)startGRU
write(unit=numGRUString,fmt=*) nGRU
fileout = trim(OUTPUT_PATH)//trim(OUTPUT_PREFIX)//"GRU"&
//trim(adjustl(startGRUString))//"-"//trim(adjustl(numGRUString))
! close files if already open
do iFreq=1,maxvarFreq
if (ncid%var(iFreq)/=integerMissing) then
call nc_file_close(ncid%var(iFreq),err,cmessage)
if(err/=0)then
message=trim(message)//trim(cmessage)
print*, message
return
end if
endif
end do
! create initial file
! each file will have a master name with a frequency appended at the end:
! e.g., xxxxxxxxx_timestep.nc (for output at every model timestep)
! e.g., xxxxxxxxx_monthly.nc (for monthly model output)
do iFreq=1,maxvarFreq
! skip frequencies that are not needed
if(.not.outFreq(iFreq)) cycle
! create file
fstring = get_freqName(iFreq)
fname = trim(fileout)//'_'//trim(fstring)//'.nc'
call ini_create(nGRU,nHRU,gru_struc(1)%hruInfo(1)%nSoil,trim(fname),ncid%var(iFreq),err,cmessage)
if(err/=0)then; message=trim(message)//trim(cmessage); print*, message; return; end if
! define model decisions
do iVar = 1,size(model_decisions)
if(model_decisions(iVar)%iDecision.ne.integerMissing)then
call put_attrib(ncid%var(iFreq),model_decisions(iVar)%cOption,model_decisions(iVar)%cDecision,err,cmessage)
if(err/=0)then
message=trim(message)//trim(cmessage)
print*, message
return
end if
end if
end do
! define variables
do iStruct = 1,size(structInfo)
select case (trim(structInfo(iStruct)%structName))
case('attr' ); call def_variab(ncid%var(iFreq),iFreq,needHRU, noTime,attr_meta, outputPrecision, err,cmessage) ! local attributes HRU
case('type' ); call def_variab(ncid%var(iFreq),iFreq,needHRU, noTime,type_meta, nf90_int, err,cmessage) ! local classification
case('mpar' ); call def_variab(ncid%var(iFreq),iFreq,needHRU, noTime,mpar_meta, outputPrecision, err,cmessage) ! model parameters
case('bpar' ); call def_variab(ncid%var(iFreq),iFreq,needGRU, noTime,bpar_meta, outputPrecision, err,cmessage) ! basin-average param
case('indx' ); call def_variab(ncid%var(iFreq),iFreq,needHRU,needTime,indx_meta, nf90_int, err,cmessage) ! model variables
case('deriv' ); call def_variab(ncid%var(iFreq),iFreq,needHRU,needTime,deriv_meta,outputPrecision, err,cmessage) ! model derivatives
case('time' ); call def_variab(ncid%var(iFreq),iFreq, noHRU,needTime,time_meta, nf90_int, err,cmessage) ! model derivatives
case('forc' ); call def_variab(ncid%var(iFreq),iFreq,needHRU,needTime,forc_meta, outputPrecision, err,cmessage) ! model forcing data
case('prog' ); call def_variab(ncid%var(iFreq),iFreq,needHRU,needTime,prog_meta, outputPrecision, err,cmessage) ! model prognostics
case('diag' ); call def_variab(ncid%var(iFreq),iFreq,needHRU,needTime,diag_meta, outputPrecision, err,cmessage) ! model diagnostic variables
case('flux' ); call def_variab(ncid%var(iFreq),iFreq,needHRU,needTime,flux_meta, outputPrecision, err,cmessage) ! model fluxes
case('bvar' ); call def_variab(ncid%var(iFreq),iFreq,needGRU,needTime,bvar_meta, outputPrecision, err,cmessage) ! basin-average variables
case('id' ); cycle
case('lookup'); cycle ! ids -- see write_hru_info()
case default; err=20; message=trim(message)//'unable to identify lookup structure';
end select
! error handling
if(err/=0)then
err=20
message=trim(message)//trim(cmessage)//'[structure = '//trim(structInfo(iStruct)%structName)
print*, message
return
end if
end do ! iStruct
! write HRU dimension and ID for each output file
call write_hru_info(ncid%var(iFreq), err, cmessage)
if(err/=0) then
message=trim(message)//trim(cmessage)
print*, message
return
end if
! define timing variables for actors code
! TODO: Add attributes to these variables
err = nf90_def_var(ncid%var(iFreq),"run_time",outputPrecision,(/gru_DimID/),actor_info%run_time_var_id)
err = nf90_def_var(ncid%var(iFreq),"init_duration",outputPrecision,(/gru_DimID/),actor_info%init_duration_var_id)
err = nf90_def_var(ncid%var(iFreq),"forcing_duration",outputPrecision,(/gru_DimID/),actor_info%forcing_duration_var_id)
err = nf90_def_var(ncid%var(iFreq),"run_physics_duration",outputPrecision,(/gru_DimID/),actor_info%run_physics_duration_var_id)
err = nf90_def_var(ncid%var(iFreq),"write_output_duration",outputPrecision,(/gru_DimID/),actor_info%write_output_duration_var_id)
err = nf90_def_var(ncid%var(iFreq),"successful",nf90_int,(/gru_DimID/),actor_info%state_var_id)
err = nf90_def_var(ncid%var(iFreq),"num_attempts",nf90_int,(/gru_DimID/),actor_info%num_attempts_var_id)
err = nf90_def_var(ncid%var(iFreq),"rel_tol",outputPrecision,(/gru_DimID/),actor_info%rel_tol_var_id)
err = nf90_def_var(ncid%var(iFreq),"abs_tol",outputPrecision,(/gru_DimID/),actor_info%abs_tol_var_id)
if(err/=0) then; message=trim(message)//trim(cmessage); print*, message; return; end if
end do
end subroutine def_output
! **********************************************************************************************************
! private subroutine ini_create: initial create
! **********************************************************************************************************
subroutine ini_create(nGRU,nHRU,nSoil,infile,ncid,err,message)
! **********************************************************************************************************
! private subroutine ini_create: initial create
! **********************************************************************************************************
subroutine ini_create(nGRU,nHRU,nSoil,infile,ncid,err,message)
! variables to define number of steps per file (total number of time steps, step length, etc.)
USE multiconst,only:secprday ! number of seconds per day
! model decisions
......@@ -486,4 +536,4 @@ end subroutine def_output
end subroutine
end module def_output_module
end module def_output_actors_module