Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • gwu479/Summa-Actors
  • numerical_simulations_lab/actors/Summa-Actors
2 results
Show changes
Showing
with 23441 additions and 4 deletions
#pragma once
#include <vector>
class Forcing_File_Info {
private:
int file_ID;
int num_steps;
bool is_loaded;
public:
Forcing_File_Info(int file_ID);
int getNumSteps();
bool isFileLoaded();
void updateIsLoaded();
void updateNumSteps(int num_steps);
};
struct Forcing_Info {
int num_vars;
int num_timesteps;
std::vector<int> index_forc_var;
std::vector<int> ncid_var;
};
\ No newline at end of file
#pragma once
#include "caf/actor.hpp"
#include <optional>
#include <cmath>
#include "fortran_data_types.hpp"
#include <vector>
#include <iostream>
/*
* This class manages a portion of the HRUs in the model.
* All HRUs are grouped into partitions/objects of this class.
*/
class Output_Partition {
private:
int start_local_gru_index; // The index of the first GRU in the partition
int end_local_gru_index; // The index of the last GRU in the partition
int num_local_grus; // The number of GRUs in the partition
int num_active_grus; // The number of GRUs that have not failed
int num_timesteps_simulation; // The number of timesteps in the simulation
int num_stored_timesteps; // The number of timesteps held within the partition
bool write_params = true; // Flag to write the parameters to the output file (only performed once)
std::vector<caf::actor> ready_to_write_list;
std::vector<int> failed_gru_index_list; // The list of GRUs that have failed
public:
Output_Partition(int start_local_gru_index, int num_local_grus, int num_timesteps_simulation, int num_timesteps_buffer);
~Output_Partition();
// Set the GRU to ready to write
void setGRUReadyToWrite(caf::actor gru_actor);
// Check if all GRUs are ready to write
bool isReadyToWrite();
// Get the max index of the GRUs in the partition
int getMaxGRUIndex();
// Get the number of timesteps stored in the partition
int getNumStoredTimesteps();
// Get the start gru index
int getStartGRUIndex();
// Update the number of timesteps remaining in the simulation
void updateTimeSteps();
// Get the list of GRUs that have written so we can send them the next set of timesteps
std::vector<caf::actor> getReadyToWriteList();
// Reset the list of GRUs that are ready to write
void resetReadyToWriteList();
// Add a GRU index to the list of failed GRUs
void addFailedGRUIndex(int local_gru_index);
std::vector<int> getFailedGRUIndexList();
int getNumActiveGRUs();
int getNumLocalGRUs();
int getRemainingTimesteps();
bool isWriteParams();
};
/*
* This class is used to store informaiton about when
* HRUs are ready to write. This class does not store
* the data of the HRUs only the information about if
* HRUs are ready to write and which HRUs should be
* written to the output file.
*/
class Output_Container {
private:
int num_partitions; // The number of partitions in the model
int num_grus_per_partition; // The average number of GRUs per partition
int num_grus; // The number of GRUs in the model
int num_timesteps_simulation; // The number of timesteps in the simulation
int num_stored_timesteps; // The number of timesteps a partion can hold before needing to write
bool rerunning_failed_hrus = false;
std::vector<Output_Partition*> output_partitions; // This is the main output partition
std::vector<int> failed_gru_index_list; // The list of GRUs that have failed
// Private Method
public:
Output_Container(int num_partitions, int num_grus, int num_timesteps_simulation, int num_timesteps_buffer);
~Output_Container();
int findPartition(int local_gru_index);
int getNumPartitions();
// The output container needs to be restructured when rerunning the failed GRUs.
void reconstruct();
Output_Partition* getOutputPartition(int local_gru_index);
std::vector<int> getFailedGRUIndexList();
};
#pragma once
// Setters
void set_flagVec(std::vector<int>& arr_i, void* handle);
void set_var_i(std::vector<int>& arr_i, void* handle);
void set_var_d(std::vector<double> &arr_d, void* handle);
void set_var_i8(std::vector<long int>& arr_i, void* handle);
void set_i8length(std::vector<long int> &arr_i8length, void* handle);
void set_ilength(std::vector<int> &arr_ilength, void* handle);
void set_dlength(std::vector<double> &arr_dlength, void* handle);
void set_var_flagVec(std::vector<std::vector<int> > &mat, void* handle);
void set_var_ilength(std::vector<std::vector<int> > &mat, void* handle);
void set_var_i8length(std::vector<std::vector<long int> > &mat, void* handle);
void set_var_dlength(std::vector<std::vector<double> > &mat, void *handle);
// Getters
std::vector<int> get_flagVec(void* handle);
std::vector<int> get_var_i(void* handle);
std::vector<double> get_var_d(void* handle);
std::vector<long int> get_var_i8(void* handle);
std::vector<long int> get_i8length(void* handle);
std::vector<int> get_ilength(void* handle);
std::vector<double> get_dlength(void* handle);
std::vector<std::vector<int> > get_var_flagVec(void* handle);
std::vector<std::vector<int> > get_var_ilength(void* handle);
std::vector<std::vector<long int> > get_var_i8length(void* handle);
std::vector<std::vector<double> > get_var_dlength(void* handle);
std::vector<double> get_attr_struct(void* handle);
std::vector<int> get_type_struct(void* handle);
std::vector<std::vector<double>> get_mpar_struct_array(void* handle);
std::vector<double> get_bpar_struct(void* handle);
\ No newline at end of file
#ifndef FORTRAN_DATATYPES_H_
#define FORTRAN_DATATYPES_H_
#pragma once
extern "C" {
// flagVec
......@@ -15,6 +14,8 @@ extern "C" {
void set_data_var_i(void* handle, const int* array, int size);
void get_size_data_var_i(void* handle, int* size);
void get_data_var_i(void* handle, int* array);
void get_size_data_typeStruct(void* handle, int* size);
void get_data_typeStruct(void* handle, int* array);
// var_i8
void* new_handle_var_i8();
......@@ -29,6 +30,10 @@ extern "C" {
void set_data_var_d(void* handle, const double* array, int size);
void get_size_data_var_d(void* handle, int* size);
void get_data_var_d(void* handle, double* array);
void get_data_attrStruct(void* handle, double* array);
void get_size_data_attrStruct(void* handle, int* size);
void get_data_bparStruct(void* handle, double* array);
void get_size_data_bparStruct(void* handle, int* size);
// ilength
void* new_handle_ilength();
......@@ -51,6 +56,7 @@ extern "C" {
void get_size_data_dlength(void* handle, int* size);
void get_data_dlength(void* handle, double* array);
// var_flagVec
void* new_handle_var_flagVec();
void delete_handle_var_flagVec(void* handle);
......@@ -82,6 +88,9 @@ extern "C" {
void get_size_var_dlength(void* handle, int* num_var);
void get_size_data_var_dlength(void* handle, int* num_var, int* num_dat);
void get_data_var_dlength(void* handle, double* array);
void get_size_var_mparStruct(void* handle, int* num_var);
void get_size_data_mparStruct(void* handle, int* num_var, int* num_dat);
void get_data_mparStruct(void* handle, double* array);
// var_dlength_array
void* new_handle_dlength_array();
......@@ -97,6 +106,12 @@ extern "C" {
void* new_handle_file_info();
void delete_handle_file_info(void* handle);
}
// zLookup
void* new_handle_z_lookup();
void* delete_handle_z_lookup(void* handle);
// hru_type
void* new_handle_hru_type();
void delete_handle_hru_type(void* handle);
#endif
\ No newline at end of file
}
\ No newline at end of file
#pragma once
#include <chrono>
/**
* Return the time between to time points
*/
double calculateTime(std::chrono::time_point<std::chrono::system_clock> start,
std::chrono::time_point<std::chrono::system_clock> end);
struct serializable_netcdf_gru_actor_info {
double run_time;
double init_duration;
double forcing_duration;
double run_physics_duration;
double write_output_duration;
int successful; // 0 = false, 1 = true
int num_attempts;
double rel_tol;
double abs_tol;
};
template<class Inspector>
bool inspect(Inspector& f, serializable_netcdf_gru_actor_info& x) {
return f.object(x).fields(f.field("run_time", x.run_time),
f.field("init_duration", x.init_duration),
f.field("forcing_duration", x.forcing_duration),
f.field("run_physics_duration", x.run_physics_duration),
f.field("write_output_duration", x.write_output_duration),
f.field("successful", x.successful),
f.field("num_attempts", x.num_attempts),
f.field("rel_tol", x.rel_tol),
f.field("abs_tol", x.abs_tol));
}
This diff is collapsed.
#pragma once
#include "batch.hpp"
#include "batch_container.hpp"
#include "client.hpp"
#include "client_container.hpp"
#include <vector>
#include "settings_functions.hpp"
#include "global.hpp"
#include "caf/all.hpp"
enum class hru_error : uint8_t {
run_physics_unhandleable = 1,
run_physics_infeasible_state = 2,
};
enum class file_access_error : uint8_t {
writing_error = 1,
unhandleable_error = 2,
mDecisions_error = 100,
};
// HRU Errors
std::string to_string(hru_error err);
bool from_string(caf::string_view in, hru_error& out);
bool from_integer(uint8_t in, hru_error& out);
template<class Inspector>
bool inspect(Inspector& f, hru_error& x) {
return caf::default_enum_inspect(f, x);
}
// File Access Actor
std::string to_string(file_access_error err);
bool from_string(caf::string_view in, file_access_error& out);
bool from_integer(uint8_t in, file_access_error& out);
template<class Inspector>
bool inspect(Inspector& f, file_access_error& x) {
return caf::default_enum_inspect(f, x);
}
CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
// Sender: job_actor
// Reciever: summa_actor
// Summary: job_actor finished job
CAF_ADD_ATOM(summa, done_job)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, err)
CAF_ADD_ATOM(summa, err_atom)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, init_gru)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, done_init_gru)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, init_hru)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, done_init_hru)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, done_write)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, done_hru)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, run_failure)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, access_forcing)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, access_forcing_internal)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, write_output)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, deallocate_structures)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, write_param)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, restart_failures)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, get_attributes_params)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, run_hru)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, start_hru)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, dt_init_factor)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, connect_to_server)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, connect_as_backup)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, compute_batch)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, done_batch)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, time_to_exit)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, update_with_current_state)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, new_assigned_batch)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, no_more_batches)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, update_backup_server_list)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, client_removed)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, new_client)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, is_lead_server)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, yes)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, no)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, new_forcing_file)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, num_steps_before_write)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, get_num_output_steps)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, finalize)
// Struct Types
CAF_ADD_TYPE_ID(summa, (Distributed_Settings))
CAF_ADD_TYPE_ID(summa, (Summa_Actor_Settings))
CAF_ADD_TYPE_ID(summa, (File_Access_Actor_Settings))
CAF_ADD_TYPE_ID(summa, (Job_Actor_Settings))
CAF_ADD_TYPE_ID(summa, (HRU_Actor_Settings))
CAF_ADD_TYPE_ID(summa, (serializable_netcdf_gru_actor_info))
// Class Types
CAF_ADD_TYPE_ID(summa, (Client))
CAF_ADD_TYPE_ID(summa, (Client_Container))
CAF_ADD_TYPE_ID(summa, (Batch))
CAF_ADD_TYPE_ID(summa, (Batch_Container))
CAF_ADD_TYPE_ID(summa, (std::vector<std::vector<double>>))
CAF_ADD_TYPE_ID(summa, (std::vector<std::vector<int>>))
CAF_ADD_TYPE_ID(summa, (std::vector<int>))
CAF_ADD_TYPE_ID(summa, (std::vector<double>))
CAF_ADD_TYPE_ID(summa, (std::vector<long int>))
CAF_ADD_TYPE_ID(summa, (std::vector<std::tuple<caf::actor, std::string>>))
CAF_ADD_TYPE_ID(summa, (std::vector<serializable_netcdf_gru_actor_info>))
// GRU Parameter/Attribute Vectors
CAF_ADD_TYPE_ID(summa, (std::tuple<std::vector<double>,
std::vector<int>,
std::vector<long int>,
std::vector<double>,
std::vector<double>,
std::vector<std::vector<double>>>))
// File_Access_Actor Read/Write times
CAF_ADD_TYPE_ID(summa, (std::tuple<double, double>))
CAF_ADD_TYPE_ID(summa, (std::optional<caf::strong_actor_ptr>))
// error types
CAF_ADD_TYPE_ID(summa, (hru_error))
CAF_ADD_TYPE_ID(summa, (file_access_error))
CAF_END_TYPE_ID_BLOCK(summa)
CAF_ERROR_CODE_ENUM(hru_error)
CAF_ERROR_CODE_ENUM(file_access_error)
\ No newline at end of file
#pragma once
#include <string>
#include <vector>
#include <thread>
#include <optional>
#include "json.hpp"
#include <bits/stdc++.h>
#include <sys/stat.h>
using json = nlohmann::json;
struct File_Access_Actor_Settings;
struct Job_Actor_Settings;
struct HRU_Actor_Settings;
// ####################################################################
// Distributed Settings
// ####################################################################
struct Distributed_Settings;
struct Distributed_Settings {
bool distributed_mode; // flag for starting summa in distributed mode
std::vector<std::string> servers_list; // the hostname of the server actor
int port; // the port number of the server actor
int total_hru_count;
int num_hru_per_batch;
};
template<class Inspector>
bool inspect(Inspector& inspector, Distributed_Settings& distributed_settings) {
return inspector.object(distributed_settings).fields(
inspector.field("distributed_mode", distributed_settings.distributed_mode),
inspector.field("servers_list", distributed_settings.servers_list),
inspector.field("port", distributed_settings.port),
inspector.field("total_hru_count", distributed_settings.total_hru_count),
inspector.field("num_hru_per_batch",distributed_settings.num_hru_per_batch));
}
Distributed_Settings readDistributedSettings(std::string json_settings_file);
// ####################################################################
// SUMMA Actor Settings
// ####################################################################
struct Summa_Actor_Settings;
struct Summa_Actor_Settings {
int max_gru_per_job;
};
template<class Inspector>
bool inspect(Inspector& inspector, Summa_Actor_Settings& summa_actor_settings) {
return inspector.object(summa_actor_settings).fields(
inspector.field("max_gru_per_job", summa_actor_settings.max_gru_per_job));
}
Summa_Actor_Settings readSummaActorSettings(std::string json_settings_file);
// ####################################################################
// File Access Actor Settings
// ####################################################################
struct File_Access_Actor_Settings {
int num_partitions_in_output_buffer;
int num_timesteps_in_output_buffer;
};
template<class Inspector>
bool inspect(Inspector& inspector, File_Access_Actor_Settings& file_access_actor_settings) {
return inspector.object(file_access_actor_settings).fields(
inspector.field("num_partitions_in_output_buffer",
file_access_actor_settings.num_partitions_in_output_buffer),
inspector.field("num_timesteps_in_output_buffer",
file_access_actor_settings.num_timesteps_in_output_buffer));
}
File_Access_Actor_Settings readFileAccessActorSettings(std::string json_settings_file);
// ####################################################################
// Job Actor Settings
// ####################################################################
struct Job_Actor_Settings {
std::string file_manager_path;
int max_run_attempts; // maximum number of times to attempt to run each HRU in a job
};
template<class Inspector>
bool inspect(Inspector& inspector, Job_Actor_Settings& job_actor_settings) {
return inspector.object(job_actor_settings).fields(
inspector.field("file_manager_path", job_actor_settings.file_manager_path),
inspector.field("max_run_attempts", job_actor_settings.max_run_attempts));
}
Job_Actor_Settings readJobActorSettings(std::string json_settings_file);
// ####################################################################
// HRU Actor Settings
// ####################################################################
struct HRU_Actor_Settings {
bool print_output;
int output_frequency;
int dt_init_factor; // factor to multiply the initial timestep by
double rel_tol;
double abs_tol;
double relTolTempCas;
double absTolTempCas;
double relTolTempVeg;
double absTolTempVeg;
double relTolWatVeg;
double absTolWatVeg;
double relTolTempSoilSnow;
double absTolTempSoilSnow;
double relTolWatSnow;
double absTolWatSnow;
double relTolMatric;
double absTolMatric;
double relTolAquifr;
double absTolAquifr;
};
template<class Inspector>
bool inspect(Inspector& inspector, HRU_Actor_Settings& hru_actor_settings) {
return inspector.object(hru_actor_settings).fields(
inspector.field("print_output", hru_actor_settings.print_output),
inspector.field("output_frequency", hru_actor_settings.output_frequency),
inspector.field("dt_init_factor", hru_actor_settings.dt_init_factor),
inspector.field("rel_tol", hru_actor_settings.rel_tol),
inspector.field("abs_tol", hru_actor_settings.abs_tol));
}
HRU_Actor_Settings readHRUActorSettings(std::string json_settings_file);
// ####################################################################
// Non Actor Specific Settings
// ####################################################################
int checkFileExists(std::string file_path);
/**
* @brief Get the Settings from json
* Template function that can be used with retrieving any singular type from the settings file
*/
template <typename T>
std::optional<T> getSettings(std::string json_settings_file, std::string key_1, std::string key_2,
T return_value) {
json settings;
std::ifstream settings_file(json_settings_file);
if (!settings_file.good()) return {};
settings_file >> settings;
settings_file.close();
// find first key
try {
if (settings.find(key_1) != settings.end()) {
json key_1_settings = settings[key_1];
// find value behind second key
if (key_1_settings.find(key_2) != key_1_settings.end()) {
return key_1_settings[key_2];
} else
return {};
} else {
return {}; // return none in the optional (error value)
}
} catch (json::exception& e) {
std::cout << e.what() << "\n";
std::cout << key_1 << "\n";
std::cout << key_2 << "\n";
return {};
}
}
// Get settings from settings file in array format
std::optional<std::vector<std::string>> getSettingsArray(std::string json_settings_file, std::string key_1, std::string key_2);
// Check the settings - Print them out to stdout
void check_settings_from_json(Distributed_Settings &distributed_settings,
Summa_Actor_Settings &summa_actor_settings, File_Access_Actor_Settings &file_access_actor_settings,
Job_Actor_Settings &job_actor_settings, HRU_Actor_Settings &hru_actor_settings);
// Output a default configuration file
void generate_config_file();
\ No newline at end of file
#pragma once
#include <chrono>
#include <optional>
#include <vector>
#include <string>
using chrono_time = std::chrono::time_point<std::chrono::system_clock>;
/**
* Class to manage timing information. This allows the user to add an arbitrary amount of timing variables.
* The timing variables are accessed through their named string and will keep a running duration of the amount
* of time spent through multiple calls to updateStartPoint and updateEndPoint
*/
class TimingInfo {
private:
std::vector<std::optional<chrono_time>> start;
std::vector<std::optional<chrono_time>> end;
std::vector<double> duration;
std::vector<std::string> name_of_time_point; // the name you want for the time point (ie. reading, writing, duration)
int num_time_points;
std::optional<double> calculateDuration(int index);
std::optional<int> getIndex(std::string time_point_name);
public:
TimingInfo();
~TimingInfo();
void addTimePoint(std::string time_point_name);
void updateStartPoint(std::string time_point_name);
void updateEndPoint(std::string time_point_name);
std::optional<double> getDuration(std::string time_point_name); // returns duration in seconds
};
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include "fortran_data_types.hpp"
#include "timing_info.hpp"
#include "global.hpp"
#include <vector>
#include <array>
#include <chrono>
#include <string>
#include "var_lookup.hpp"
namespace caf{
struct gru_state {
// GRU Initalization
int ref_gru;
int indx_gru;
std::string config_path;
caf::actor file_access_actor;
int output_struc_size;
caf::actor parent;
// HRU information
std::vector<caf::actor> hru_list;
int num_hrus;
int hrus_complete = 0;
// Global Data
int nTimeDelay = 2000; // number of hours in the time delay histogram (default: ~1 season = 24*365/4)
struct iLookVarType var_type_lookup;
// Data Structure local to the GRU
int num_bpar_vars; // number of variables in the fortran structure for bpar_struct
std::vector<double> bpar_struct;
std::vector<int> bpar_struct_var_type_list; // The types to the variable at each element in bpar_struct
int num_bvar_vars; // number of variables in the fortran structure for bvar_struct
std::vector<std::vector<double>> bvar_struct;
std::vector<int> bvar_struct_var_type_list;
int num_var_types;
std::vector<int> i_look_var_type_list; // The types to the variable at each element in bvar_struct
};
behavior gru_actor(stateful_actor<gru_state>* self,
int ref_gru,
int indx_gru,
std::string config_path,
caf::actor file_access_actor,
caf::actor parent);
}
\ No newline at end of file
#pragma once
extern "C" {
void getVarSizes(int* num_var_types, int* num_bpar_vars, int* num_bvar_vars);
void initVarType(void* var_type_lookup);
void fillVarTypeLists( int* num_bpar_vars, int* num_bvar_vars,
void* bpar_struct_var_type_list, void* bvar_struct_var_type_list, int* err);
int getNumHRU(int* indx_gru);
}
\ No newline at end of file
#pragma once
#pragma once
#include "caf/all.hpp"
#include "fortran_data_types.hpp"
#include "auxilary.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include <string>
#include "message_atoms.hpp"
#include "global.hpp"
/*********************************************
* HRU Actor Fortran Functions
*********************************************/
extern "C" {
// Initialize HRU data_structures
void initHRU(int* indxGRU, int* num_steps, void* hru_data, int* err);
void setupHRUParam( int* indxGRU, int* indxHRU, void* hru_data, double* upArea, int* err);
// Setup summa_readRestart File if this option has been chosen
void summa_readRestart(int* indxGRU, int* indxHRU, void* hru_data, double* dtInit, int* err);
// Run the model for one timestep
void RunPhysics(int* id, int* stepIndex, void* hru_data, double* dt, int* dt_int_factor, int* err);
void hru_writeOutput(int* index_hru, int* index_gru, int* timestep, int* output_step, void* hru_data, int* err);
void setTimeZoneOffset(int* iFile, void* hru_data, int* err);
void HRU_readForcing(int* index_gru, int* iStep, int* iRead, int* iFile, void* hru_data, int* err);
void get_sundials_tolerances(void* hru_data, double* relTol, double* absTol);
void set_sundials_tolerances(void* hru_data, double* relTol, double* absTol);
void setIDATolerances(void* hru_data, double* relTolTempCas, double* absTolTempCas, double* relTolTempVeg,
double* absTolTempVeg, double* relTolWatVeg, double* absTolWatVeg, double* relTolTempSoilSnow,
double* absTolTempSoilSnow, double* relTolWatSnow, double* absTolWatSnow, double* relTolMatric,
double* absTolMatric, double* relTolAquifr, double* absTolAquifr);
}
/*********************************************
* HRU Actor state variables
*********************************************/
namespace caf {
struct hru_state {
// Actor References
caf::actor file_access_actor;
caf::actor parent;
// Info about which HRU we are and our indexes
// into global structures in Fortran
int indxHRU; // index for hru part of derived types in FORTRAN
int indxGRU; // index for gru part of derived types in FORTRAN
int refGRU; // The actual ID of the GRU we are
// Variables for forcing structures
int stepsInCurrentFFile; // number of time steps in current forcing file
int num_steps_until_write; // number of time steps until we pause for FA_Actor to write
// HRU data structures (formerly summa_type)
void *hru_data = new_handle_hru_type();
// Misc Variables
int timestep = 1; // Current Timestep of HRU simulation
int forcingStep = 1; // index of current time step in current forcing file
int num_steps = 0; // number of time steps
int iFile = 1; // index of current forcing file from forcing file list
int dt_init_factor = 1; // factor of dt_init (coupled_em)
int output_structure_step_index = 1; // index of current time step in output structure
double dt_init; // used to initialize the length of the sub-step for each HRU
double upArea; // area upslope of each HRU
int err = 0;
// Sundials variables
double rtol = -9999; // -9999 uses default
double atol = -9999; // -9999 uses default
// Settings
HRU_Actor_Settings hru_actor_settings;
~hru_state() {
delete_handle_hru_type(hru_data);
}
};
behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU,
HRU_Actor_Settings hru_actor_settings, caf::actor file_access_actor,
caf::actor parent);
/*********************************************
* Functions for the HRU Actor
*********************************************/
/** Function to initalize the HRU for running */
void Initialize_HRU(stateful_actor<hru_state>* self);
/** Function runs all of the hru time_steps */
int Run_HRU(stateful_actor<hru_state>* self);
}
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <iostream>
#include <fstream>
/*
* Determine the state of the GRU
*/
enum class gru_state {
running,
failed,
succeeded
};
int is_success(const gru_state& state);
/**
* Class that holds information about the running GRUs. This class is held by the job actor
* The GRU/HRU actors that carry out the simulation are held by the GRU class
*/
class GRU {
private:
int global_gru_index; // The index of the GRU in the netcdf file
int local_gru_index; // The index of the GRU within this job
caf::actor gru_actor; // The actor for the GRU
// Modifyable Parameters
int dt_init_factor; // The initial dt for the GRU
double rel_tol; // The relative tolerance for the GRU
double abs_tol; // The absolute tolerance for the GRU
// Status Information
int attempts_left; // The number of attempts left for the GRU to succeed
gru_state state; // The state of the GRU
// Timing Information
double run_time = 0.0; // The total time to run the GRU
double init_duration = 0.0; // The time to initialize the GRU
double forcing_duration = 0.0; // The time to read the forcing data
double run_physics_duration = 0.0; // The time to run the physics
double write_output_duration = 0.0; // The time to write the output
public:
// Constructor
GRU(int global_gru_index, int local_gru_index, caf::actor gru_actor, int dt_init_factor,
double rel_tol, double abs_tol, int max_attempts);
// Deconstructor
~GRU();
// Getters
int getGlobalGRUIndex();
int getLocalGRUIndex();
caf::actor getGRUActor();
double getRunTime();
double getInitDuration();
double getForcingDuration();
double getRunPhysicsDuration();
double getWriteOutputDuration();
double getRelTol();
double getAbsTol();
double getAttemptsLeft();
gru_state getStatus();
bool isFailed();
// Setters
void setRunTime(double run_time);
void setInitDuration(double init_duration);
void setForcingDuration(double forcing_duration);
void setRunPhysicsDuration(double run_physics_duration);
void setWriteOutputDuration(double write_output_duration);
void setRelTol(double rel_tol);
void setAbsTol(double abs_tol);
void setSuccess();
void setFailed();
void setRunning();
void decrementAttemptsLeft();
void setGRUActor(caf::actor gru_actor);
};
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "GRU.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include "global.hpp"
#include "json.hpp"
#include "hru_actor.hpp"
#include "message_atoms.hpp"
#include "file_access_actor.hpp"
#include <unistd.h>
#include <limits.h>
/*********************************************
* Job Actor Fortran Functions
*********************************************/
extern "C" {
void job_init_fortran(char const* file_manager, int* start_gru_index, int* num_gru, int* num_hru, int* err);
void deallocateJobActor(int* err);
}
/*********************************************
* Job Actor state variables
*********************************************/
namespace caf {
using chrono_time = std::chrono::time_point<std::chrono::system_clock>;
// Holds information about the GRUs
struct GRU_Container {
std::vector<GRU*> gru_list;
chrono_time gru_start_time; // Vector of start times for each GRU
int num_gru_done = 0;
int num_gru_failed = 0; // number of grus that are waiting to be restarted
int num_gru_in_run_domain = 0; // number of grus we are currently solving for
int run_attempts_left = 1; // current run attempt for all grus
};
struct job_state {
// Actor References
caf::actor file_access_actor; // actor reference for the file_access_actor
caf::actor parent; // actor reference to the top-level SummaActor
// Job Parameters
int start_gru; // Starting GRU for this job
int num_gru; // Number of GRUs for this job
int num_hru;
int max_run_attempts = 1; // Max number of attempts to solve a GRU
GRU_Container gru_container;
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int num_gru_done = 0; // The number of GRUs that have completed
int num_gru_failed = 0; // Number of GRUs that have failed
// Timing Variables
TimingInfo job_timing;
std::string hostname;
// settings for all child actors (save in case we need to recover)
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
/** The Job Actor */
behavior job_actor(stateful_actor<job_state>* self,
int start_gru, int num_gru,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings,
actor parent);
/*********************************************
* Functions for the Job Actor
*********************************************/
/** Get the information for the GRUs that will be written to the netcdf file */
std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts,
std::vector<GRU*> &gru_list);
void handleGRUError(stateful_actor<job_state>* self, caf::actor src);
} // end namespace
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <string>
class Batch {
private:
int batch_id_;
int start_hru_;
int num_hru_;
double run_time_;
double read_time_;
double write_time_;
bool assigned_to_actor_;
bool solved_;
public:
Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1);
// Getters
int getBatchID();
int getStartHRU();
int getNumHRU();
double getRunTime();
double getReadTime();
double getWriteTime();
bool isAssigned();
bool isSolved();
std::string getBatchInfoString();
// Setters
void updateRunTime(double run_time);
void updateReadTime(double read_time);
void updateWriteTime(double write_time);
void updateAssigned(bool boolean);
void updateSolved(bool boolean);
void printBatchInfo();
void writeBatchToFile(std::string csv_output, std::string hostname);
std::string toString();
void assignToActor(std::string hostname, caf::actor assigned_actor);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch& batch) {
return inspector.object(batch).fields(
inspector.field("batch_id", batch.batch_id_),
inspector.field("start_hru", batch.start_hru_),
inspector.field("num_hru", batch.num_hru_),
inspector.field("run_time", batch.run_time_),
inspector.field("read_time", batch.read_time_),
inspector.field("write_time", batch.write_time_),
inspector.field("assigned_to_actor", batch.assigned_to_actor_),
inspector.field("solved", batch.solved_));
}
};
\ No newline at end of file
#include "caf/all.hpp"
#pragma once
#include "client.hpp"
class Batch_Container {
private:
int start_hru_;
int total_hru_count_;
int num_hru_per_batch_;
int batches_remaining_;
std::vector<Batch> batch_list_;
// Assemble the total number of HRUs given by the user into batches.
void assembleBatches();
public:
// Creating the batch_manager will also create the batches
// with the two parameters that are passed in.
Batch_Container(int start_hru = 1, int total_hru_count = 0,
int num_hru_per_batch = 0);
// returns the size of the batch list
int getBatchesRemaining();
int getTotalBatches();
// Find an unsolved batch, set it to assigned and return it.
std::optional<Batch> getUnsolvedBatch();
// Update the batch status to solved and write the output to a file.
void updateBatch_success(Batch successful_batch, std::string output_csv, std::string hostname);
// Update the batch status but do not write the output to a file.
void updateBatch_success(Batch successful_batch);
// Update batch by id
void updateBatch_success(int batch_id, double run_time, double read_time,
double write_time);
// Update the batch to assigned = true
void setBatchAssigned(Batch batch);
// Update the batch to assigned = false
void setBatchUnassigned(Batch batch);
// Check if there are batches left to solve
bool hasUnsolvedBatches();
// TODO: Needs implementation
void updateBatch_failure(Batch failed_batch);
std::string getAllBatchInfoString();
double getTotalReadTime();
double getTotalWriteTime();
/**
* A client has found to be disconnected. Unassign all batches
* that were assigned to the disconnected client. The client id
* is passed in as a parameter
*/
void updatedBatch_disconnectedClient(int client_id);
/**
* Create the csv file for the completed batches.
*/
void inititalizeCSVOutput(std::string csv_output_name);
/**
* @brief Print the batches from the batch list
*
*/
void printBatches();
std::string getBatchesAsString();
/**
* @brief Find the batch with the batch_id parameter
* update the batches assigned actor member variable to false
*
*/
void updateBatchStatus_LostClient(int batch_id);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch_Container& batch_container) {
return inspector.object(batch_container).fields(
inspector.field("total_hru_count", batch_container.total_hru_count_),
inspector.field("num_hru_per_batch", batch_container.num_hru_per_batch_),
inspector.field("batches_remaining", batch_container.batches_remaining_),
inspector.field("batch_list", batch_container.batch_list_));
}
};
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <optional>
#include "batch.hpp"
class Client {
private:
caf::actor client_actor;
std::string hostname;
int id;
int batches_solved;
std::optional<Batch> current_batch;
public:
Client(int id = -1, caf::actor client_actor = nullptr, std::string hostname = "");
// ####################################################################
// Getters
// ####################################################################
caf::actor getActor();
int getID();
std::string getHostname();
std::optional<Batch> getBatch();
// ####################################################################
// Setters
// ####################################################################
void setBatch(std::optional<Batch> batch);
// ####################################################################
// Methods
// ####################################################################
std::string toString();
// Serialization so CAF can send an object of this class to another actor
template <class Inspector>
friend bool inspect(Inspector& inspector, Client& client) {
return inspector.object(client).fields(
inspector.field("client_actor",client.client_actor),
inspector.field("hostname",client.hostname),
inspector.field("id",client.id),
inspector.field("batches_solved",client.batches_solved),
inspector.field("current_batch",client.current_batch));
}
};
\ No newline at end of file
#pragma once
#include "caf/all.hpp"
#include <vector>
#include "batch.hpp"
#include "client.hpp"
#include <optional>
class Client_Container {
private:
std::vector<Client> client_list;
int id_counter;
public:
Client_Container();
// ####################################################################
// Getters
// ####################################################################
int getNumClients();
std::vector<Client> getClientList();
std::optional<Client> getClient(caf::actor_addr client_ref);
// ####################################################################
// Setters
// ####################################################################
void setBatchForClient(caf::actor client_ref, std::optional<Batch> batch);
// ####################################################################
// Methods
// ####################################################################
// add a new client to the client_list
void addClient(caf::actor client_actor, std::string hostname);
// remove a client from the client_list
void removeClient(Client client);
// return a client that is not solving a batch or return an empty optional
std::optional<Client> getIdleClient();
// Check if the client list is empty
bool isEmpty();
// pops client at the end of the list
Client removeClient_fromBack();
// return printable string
std::string toString();
template <class Inspector>
friend bool inspect(Inspector& inspector, Client_Container& client_container) {
return inspector.object(client_container).fields(
inspector.field("client_list", client_container.client_list),
inspector.field("id_counter", client_container.id_counter));
}
};
#pragma once
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "timing_info.hpp"
#include "settings_functions.hpp"
#include "batch_container.hpp"
#include <chrono>
#include <string>
#include <vector>
namespace caf {
struct job_timing_info {
std::vector<double> job_duration;
std::vector<double> job_read_duration;
std::vector<double> job_write_duration;
};
struct summa_actor_state {
// Timing Information For Summa-Actor
TimingInfo summa_actor_timing;
struct job_timing_info timing_info_for_jobs;
// Program Parameters
int startGRU; // starting GRU for the simulation
int numGRU; // number of GRUs to compute
int fileGRU; // number of GRUs in the file
std::string configPath; // path to the fileManager.txt file
int numFailed = 0; // Number of jobs that have failed
caf::actor currentJob; // Reference to the current job actor
caf::actor parent;
// Batches
Batch_Container batch_container;
int current_batch_id;
// settings for all child actors (save in case we need to recover)
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
HRU_Actor_Settings hru_actor_settings;
};
behavior summa_actor(stateful_actor<summa_actor_state>* self,
int startGRU, int numGRU,
Summa_Actor_Settings summa_actor_settings,
File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings,
HRU_Actor_Settings hru_actor_settings, actor parent);
void spawnJob(stateful_actor<summa_actor_state>* self);
} // namespace caf
\ No newline at end of file