diff --git a/.gitignore b/.gitignore index bc92fccd59ccc922552a25480224bd8ff1a122ee..6056d65901baa1cd5d8e8a3e8d73e602a6ba25e5 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,7 @@ bin/data_be.csv bin/data_kinsol.csv bin/submission_copern.sh bin/summa_be +bin/data_kinsol_state_vec.csv +bin/state.png +bin/plot_resuduial.py +bin/summa_sundials diff --git a/bin/summa_sundials b/bin/summa_sundials deleted file mode 100755 index 62e56e7c81cf78345b9c4507714a52af3db8e638..0000000000000000000000000000000000000000 Binary files a/bin/summa_sundials and /dev/null differ diff --git a/build/cmake/CMakeLists.txt b/build/cmake/CMakeLists.txt index 18891b63cf9144eeb66e53eeecee9f0fe79eca41..bdc73606c63d4ef19affc08fa21d3a7923330991 100644 --- a/build/cmake/CMakeLists.txt +++ b/build/cmake/CMakeLists.txt @@ -25,8 +25,8 @@ include(ida.cmake) ######### SET THE PATHS TO THE SUNDIALS LIBRARIES AND INCLUDE FILES ######### ############################################################################# # set(DIR_SUNDIALS "/globalhome/kck540/HPC/Libraries/sundials/instdir") -set(DIR_SUNDIALS "/home/kklenk/projects/rpp-kshook/CompHydCore/SummaSundials/sundials/sundials/instdir") -#set(DIR_SUNDIALS "/usr/local/sundials") +# set(DIR_SUNDIALS "/home/kklenk/projects/rpp-kshook/CompHydCore/SummaSundials/sundials/sundials/instdir") +set(DIR_SUNDIALS "/usr/local/sundials-6.3.0") ############################################################################# # Set default build type to Release diff --git a/build/cmake/kinsol.cmake b/build/cmake/kinsol.cmake index c9b5b72b99751423150c3ba27bc6594c995ed7ad..b3a0e35482d0becac637801e9a1eb0c7a50449ee 100644 --- a/build/cmake/kinsol.cmake +++ b/build/cmake/kinsol.cmake @@ -8,32 +8,23 @@ function(compile_with_kinsol PARENT_DIR, DIR_SUNDIALS) message("DIR_SUNDIALS: ${DIR_SUNDIALS}") # Building Summa with sundials on local machine - # link_directories(${DIR_SUNDIALS}/lib) - # set(CMAKE_BUILD_RPATH "${DIR_SUNDIALS}/lib:/usr/local/lib") - # set(SUMMA_INCLUDES - # "/usr/include" - # "${DIR_SUNDIALS}/include" - # "${DIR_SUNDIALS}/fortran" - # ${netCDF_INCLUDES} - # ${LAPACK_INCLUDES}) - link_directories(${DIR_SUNDIALS}/lib64) - set(CMAKE_BUILD_RPATH "${DIR_SUNDIALS}/lib64") - set(SUMMA_INCLUDES - "$ENV{EBROOTNETCDFMINFORTRAN}/include" + link_directories(${DIR_SUNDIALS}/lib) + set(CMAKE_BUILD_RPATH "${DIR_SUNDIALS}/lib:/usr/local/lib") + set(SUMMA_INCLUDES + "/usr/include" "${DIR_SUNDIALS}/include" "${DIR_SUNDIALS}/fortran" ${netCDF_INCLUDES} ${LAPACK_INCLUDES}) + # link_directories(${DIR_SUNDIALS}/lib64) + # set(CMAKE_BUILD_RPATH "${DIR_SUNDIALS}/lib64") + # set(SUMMA_INCLUDES + # "$ENV{EBROOTNETCDFMINFORTRAN}/include" + # "${DIR_SUNDIALS}/include" + # "${DIR_SUNDIALS}/fortran" + # ${netCDF_INCLUDES} + # ${LAPACK_INCLUDES}) - # set(SUMMA_LIBS - # -lsundials_fkinsol_mod - # -lsundials_fnvecserial_mod - # -lsundials_fsunmatrixdense_mod - # -lsundials_fsunlinsoldense_mod - # -lsundials_fsunnonlinsolnewton_mod - # -lnetcdff - # -lopenblas - # SUMMA_NOAHMP) set(SUMMA_LIBS -lsundials_fkinsol_mod -lsundials_fnvecserial_mod @@ -41,48 +32,44 @@ function(compile_with_kinsol PARENT_DIR, DIR_SUNDIALS) -lsundials_fsunlinsoldense_mod -lsundials_fsunnonlinsolnewton_mod -lnetcdff - ${netCDF_LIBRARIES} - ${LAPACK_LIBRARIES} + -lopenblas SUMMA_NOAHMP) - - # set(SUMMA_ACTORS_INCLUDES - # ${CAF_INCLUDES} - # "$ENV{EBROOTNETCDFMINFORTRAN}/include" - # ${LAPACK_INCLUDES} - # "${DIR_SUNDIALS}/include" - # "${PARENT_DIR}/build/includes/global" - # "${PARENT_DIR}/build/includes/summa_actor" - # "${PARENT_DIR}/build/includes/gru_actor" - # "${PARENT_DIR}/build/includes/job_actor" - # "${PARENT_DIR}/build/includes/file_access_actor" - # "${PARENT_DIR}/build/includes/hru_actor") - set(SUMMA_ACTORS_INCLUDES - ${CAF_INCLUDES} - "$ENV{EBROOTNETCDFMINFORTRAN}/include" - ${LAPACK_INCLUDES} - "${DIR_SUNDIALS}/include" - "${PARENT_DIR}/build/includes/global" - "${PARENT_DIR}/build/includes/summa_actor" - "${PARENT_DIR}/build/includes/gru_actor" - "${PARENT_DIR}/build/includes/job_actor" - "${PARENT_DIR}/build/includes/file_access_actor" - "${PARENT_DIR}/build/includes/hru_actor") - - # set(SUMMA_ACTORS_LIBS - # -lopenblas - # -lcaf_core - # -lcaf_io - # summa - # -lnetcdff + # set(SUMMA_LIBS # -lsundials_fkinsol_mod # -lsundials_fnvecserial_mod # -lsundials_fsunmatrixdense_mod # -lsundials_fsunlinsoldense_mod - # -lsundials_fsunnonlinsolnewton_mod) + # -lsundials_fsunnonlinsolnewton_mod + # -lnetcdff + # ${netCDF_LIBRARIES} + # ${LAPACK_LIBRARIES} + # SUMMA_NOAHMP) + + set(SUMMA_ACTORS_INCLUDES + ${CAF_INCLUDES} + "$ENV{EBROOTNETCDFMINFORTRAN}/include" + ${LAPACK_INCLUDES} + "${DIR_SUNDIALS}/include" + "${PARENT_DIR}/build/includes/global" + "${PARENT_DIR}/build/includes/summa_actor" + "${PARENT_DIR}/build/includes/gru_actor" + "${PARENT_DIR}/build/includes/job_actor" + "${PARENT_DIR}/build/includes/file_access_actor" + "${PARENT_DIR}/build/includes/hru_actor") + # set(SUMMA_ACTORS_INCLUDES + # ${CAF_INCLUDES} + # "$ENV{EBROOTNETCDFMINFORTRAN}/include" + # ${LAPACK_INCLUDES} + # "${DIR_SUNDIALS}/include" + # "${PARENT_DIR}/build/includes/global" + # "${PARENT_DIR}/build/includes/summa_actor" + # "${PARENT_DIR}/build/includes/gru_actor" + # "${PARENT_DIR}/build/includes/job_actor" + # "${PARENT_DIR}/build/includes/file_access_actor" + # "${PARENT_DIR}/build/includes/hru_actor") + set(SUMMA_ACTORS_LIBS - ${CAF_LIBRARIES} - ${netCDF_LIBRARIES} - ${LAPACK_LIBRARIES} + -lopenblas -lcaf_core -lcaf_io summa @@ -92,6 +79,19 @@ function(compile_with_kinsol PARENT_DIR, DIR_SUNDIALS) -lsundials_fsunmatrixdense_mod -lsundials_fsunlinsoldense_mod -lsundials_fsunnonlinsolnewton_mod) + # set(SUMMA_ACTORS_LIBS + # ${CAF_LIBRARIES} + # ${netCDF_LIBRARIES} + # ${LAPACK_LIBRARIES} + # -lcaf_core + # -lcaf_io + # summa + # -lnetcdff + # -lsundials_fkinsol_mod + # -lsundials_fnvecserial_mod + # -lsundials_fsunmatrixdense_mod + # -lsundials_fsunlinsoldense_mod + # -lsundials_fsunnonlinsolnewton_mod) set(ACTORS_DIR ${PARENT_DIR}/build/source/actors) diff --git a/build/includes/file_access_actor/file_access_actor.hpp b/build/includes/file_access_actor/file_access_actor.hpp index 1d0856fd4a304fe69321fe421c1cadd8149dbc98..7ddd19f8aa096e1a9a3e0aa072a6938537ef73ce 100644 --- a/build/includes/file_access_actor/file_access_actor.hpp +++ b/build/includes/file_access_actor/file_access_actor.hpp @@ -7,10 +7,23 @@ #include "settings_functions.hpp" #include "fortran_data_types.hpp" #include "auxilary.hpp" +#include "global.hpp" // class Output_Container; +struct netcdf_gru_actor_info { + int run_time_var_id; + int init_duration_var_id; + int forcing_duration_var_id; + int run_physics_duration_var_id; + int write_output_duration_var_id; + + int state_var_id; // The success of the GRU 1 = pass, 0 = fail + int num_attempts_var_id; +}; + + namespace caf { @@ -20,6 +33,8 @@ struct file_access_state { int start_gru; int num_gru; + netcdf_gru_actor_info gru_actor_stats; + // std::vector<hru_output_handles> vector_of_output_handles; void *handle_forcing_file_info; // Handle for the forcing file information diff --git a/build/includes/file_access_actor/file_access_actor_subroutine_wrappers.hpp b/build/includes/file_access_actor/file_access_actor_subroutine_wrappers.hpp index 1a268e5c9c545b11e5d68993842f645b3175e850..88ce743d0d42c5b748769ed231a59aaed3e6772f 100644 --- a/build/includes/file_access_actor/file_access_actor_subroutine_wrappers.hpp +++ b/build/includes/file_access_actor/file_access_actor_subroutine_wrappers.hpp @@ -1,4 +1,5 @@ #pragma once +#include "file_access_actor.hpp" extern "C" { // initalizeFileAccessActor @@ -7,7 +8,8 @@ extern "C" { void read_pinit_C(int* err); void read_vegitationTables(int* err); void initFailedHRUTracker(int* numGRU); - void def_output(void* handle_ncid, int* startGRU, int* numGRU, int* numHRU, int* err); + void def_output(void* handle_ncid, int* startGRU, int* numGRU, int* numHRU, + netcdf_gru_actor_info* actor_info, int* err); // OutputStructure and Output functions void initOutputStructure(void* handle_forcFileInfo, int* max_steps, int* numGRU, int* err); @@ -73,6 +75,11 @@ extern "C" { void writeTimeToNetCDF(void* handle_ncid, void* handle_finalize_stats, void* handle_output_timestep, void* handle_time_struct, int* err); + void WriteGRUStatistics(void* handle_ncid, netcdf_gru_actor_info* actor_info, + serializable_netcdf_gru_actor_info* gru_stats_vector, int* num_gru, int* err); + + + diff --git a/build/includes/global/global.hpp b/build/includes/global/global.hpp index 4ded34993623d76f5a2da6538eef1cb52bf9392e..4730894760407f87809ef64543ac5ce299d95ea8 100644 --- a/build/includes/global/global.hpp +++ b/build/includes/global/global.hpp @@ -8,5 +8,27 @@ double calculateTime(std::chrono::time_point<std::chrono::system_clock> start, std::chrono::time_point<std::chrono::system_clock> end); +struct serializable_netcdf_gru_actor_info { + double run_time; + double init_duration; + double forcing_duration; + double run_physics_duration; + double write_output_duration; + + int successful; // 0 = false, 1 = true + int num_attempts; +}; + +template<class Inspector> +bool inspect(Inspector& f, serializable_netcdf_gru_actor_info& x) { + return f.object(x).fields(f.field("run_time", x.run_time), + f.field("init_duration", x.init_duration), + f.field("forcing_duration", x.forcing_duration), + f.field("run_physics_duration", x.run_physics_duration), + f.field("write_output_duration", x.write_output_duration), + f.field("successful", x.successful), + f.field("num_attempts", x.num_attempts)); +} + diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index cc7bea341e3e5bca4801e9117e499e60f6e7f22d..7306095c077fbb3098adac1b822673a52bba0172 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -6,6 +6,7 @@ #include "client/client_container.hpp" #include <vector> #include "settings_functions.hpp" +#include "global.hpp" #include "caf/all.hpp" enum class hru_error : uint8_t { @@ -13,6 +14,11 @@ enum class hru_error : uint8_t { run_physics_infeasible_state = 2, }; +enum class file_access_error : uint8_t { + writing_error = 1, +}; + +// HRU Errors std::string to_string(hru_error err); bool from_string(caf::string_view in, hru_error& out); bool from_integer(uint8_t in, hru_error& out); @@ -21,6 +27,15 @@ bool inspect(Inspector& f, hru_error& x) { return caf::default_enum_inspect(f, x); } +// File Access Actor +std::string to_string(file_access_error err); +bool from_string(caf::string_view in, file_access_error& out); +bool from_integer(uint8_t in, file_access_error& out); +template<class Inspector> +bool inspect(Inspector& f, file_access_error& x) { + return caf::default_enum_inspect(f, x); +} + CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) // Sender: job_actor // Reciever: summa_actor @@ -185,6 +200,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) CAF_ADD_TYPE_ID(summa, (File_Access_Actor_Settings)) CAF_ADD_TYPE_ID(summa, (Job_Actor_Settings)) CAF_ADD_TYPE_ID(summa, (HRU_Actor_Settings)) + CAF_ADD_TYPE_ID(summa, (serializable_netcdf_gru_actor_info)) // Class Types CAF_ADD_TYPE_ID(summa, (Client)) @@ -198,13 +214,17 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) CAF_ADD_TYPE_ID(summa, (std::vector<double>)) CAF_ADD_TYPE_ID(summa, (std::vector<long int>)) CAF_ADD_TYPE_ID(summa, (std::vector<std::tuple<caf::actor, std::string>>)) + CAF_ADD_TYPE_ID(summa, (std::vector<serializable_netcdf_gru_actor_info>)) CAF_ADD_TYPE_ID(summa, (std::optional<caf::strong_actor_ptr>)) // error types CAF_ADD_TYPE_ID(summa, (hru_error)) + CAF_ADD_TYPE_ID(summa, (file_access_error)) + CAF_END_TYPE_ID_BLOCK(summa) -CAF_ERROR_CODE_ENUM(hru_error) \ No newline at end of file +CAF_ERROR_CODE_ENUM(hru_error) +CAF_ERROR_CODE_ENUM(file_access_error) \ No newline at end of file diff --git a/build/includes/job_actor/GRUinfo.hpp b/build/includes/job_actor/GRUinfo.hpp index 050d6a7fc23327da31dace9d25729ad0880b47e8..04c73c7a3ea2f768781868b061e39cc382aa2a0a 100644 --- a/build/includes/job_actor/GRUinfo.hpp +++ b/build/includes/job_actor/GRUinfo.hpp @@ -4,6 +4,73 @@ #include <iostream> #include <fstream> +/* + * Determine the state of the GRU +*/ +enum class gru_state { + running, + failed, + succeeded +}; + +auto success = [](const gru_state& state) -> int { + return(state == gru_state::succeeded) ? 1 : 0; +}; + +class GRU { + private: + int global_gru_index; // The index of the GRU in the netcdf file + int local_gru_index; // The index of the GRU within this job + caf::actor gru_actor; // The actor for the GRU + + // Modifyable Parameters + int dt_init_factor; // The initial dt for the GRU + + // Status Information + int attempts_left; // The number of attempts left for the GRU to succeed + gru_state state; // The state of the GRU + + // Timing Information + double run_time = 0.0; // The total time to run the GRU + double init_duration = 0.0; // The time to initialize the GRU + double forcing_duration = 0.0; // The time to read the forcing data + double run_physics_duration = 0.0; // The time to run the physics + double write_output_duration = 0.0; // The time to write the output + + + public: + // Constructor + GRU(int global_gru_index, int local_gru_index, caf::actor gru_actor, int dt_init_factor, int max_attempts); + + // Deconstructor + ~GRU(); + + // Getters + int getGlobalGRUIndex(); + + double getRunTime(); + double getInitDuration(); + double getForcingDuration(); + double getRunPhysicsDuration(); + double getWriteOutputDuration(); + + double getAttemptsLeft(); + gru_state getStatus(); + + + // Setters + void setRunTime(double run_time); + void setInitDuration(double init_duration); + void setForcingDuration(double forcing_duration); + void setRunPhysicsDuration(double run_physics_duration); + void setWriteOutputDuration(double write_output_duration); + + void setSuccess(); + +}; + + + class GRUinfo { private: int refGRU; // This will be the same as the refGRU diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index 7c377280d808d9fc6f977e05226ce41c488456fa..bc717402e7bad630503bb2f0b25b33b9c02802c0 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -6,8 +6,17 @@ #include "settings_functions.hpp" #include <unistd.h> #include <limits.h> +#include "global.hpp" namespace caf { + +struct GRU_Container { + std::vector<GRU*> gru_list; + int num_gru_done = 0; + int num_gru_failed = 0; // number of grus that are waiting to be restarted + int num_gru_in_run_domain = 0; // number of grus we are currently solving for +}; + struct job_state { // Actor References caf::actor file_access_actor; // actor reference for the file_access_actor @@ -17,22 +26,26 @@ struct job_state { int start_gru; // Starting GRU for this job int num_gru; // Number of GRUs for this job int num_hru; + int max_run_attempts = 1; // Max number of attemtps to solve a GRU + + + std::vector<GRU*> gru_list; + + GRU_Container gru_container; + // Variables for GRU monitoring int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em) - int max_run_attempts = 1; // Max number of attemtps to solve a GRU - std::vector<GRUinfo*> gru_list; // List of all GRUs under this job actor + // std::vector<GRUinfo*> gru_list; // List of all GRUs under this job actor int num_gru_done = 0; // The number of GRUs that have completed - int gru_init = 0; // Number of GRUs initalized - int err = 0; // Error Code int num_gru_failed = 0; // Number of GRUs that have failed // Timing Variables TimingInfo job_timing; + std::string hostname; - // Output File Names for Timings std::string success_output_file; std::string failed_output_file = "failedHRU"; @@ -45,14 +58,22 @@ struct job_state { }; + + behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings, actor parent); void initCsvOutputFile(stateful_actor<job_state>* self); -void initalizeGRU(stateful_actor<job_state>* self); +void initGRUs(stateful_actor<job_state>* self); + +/** + * Get the information for the GRUs that will be written to the netcdf file +*/ +std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, std::vector<GRU*> &gru_list); +// Initalize the GRU objects and their actors void runGRUs(stateful_actor<job_state>* self); void restartFailures(stateful_actor<job_state>* self); diff --git a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp index faff2ff30aba9d32abca1179ee0441fbb3bb3ad1..c94ba58a97d43186cf0e1cb028cc1105b018d098 100644 --- a/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/cpp_code/file_access_actor.cpp @@ -3,7 +3,6 @@ #include "file_access_actor_subroutine_wrappers.hpp" #include "fortran_data_types.hpp" #include "message_atoms.hpp" -#include "global.hpp" #include "json.hpp" #include "auxilary.hpp" @@ -36,6 +35,7 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr initalizeFileAccessActor(self); if (self->state.file_access_actor_settings.num_partitions_in_output_buffer > num_gru) { + // Prevents a division with a remainder self->state.file_access_actor_settings.num_partitions_in_output_buffer = num_gru; } @@ -197,7 +197,12 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gr resetFailedArray(); }, - [=](deallocate_structures) { + [=](deallocate_structures, std::vector<serializable_netcdf_gru_actor_info> &netcdf_gru_info) { + int num_gru = netcdf_gru_info.size(); + WriteGRUStatistics(self->state.handle_ncid, &self->state.gru_actor_stats, + netcdf_gru_info.data(), &num_gru, &self->state.err); + + aout(self) << "Deallocating Structure" << std::endl; FileAccessActor_DeallocateStructures(self->state.handle_forcing_file_info, self->state.handle_ncid); // deallocateOutputStructure(&self->state.err); @@ -267,7 +272,8 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { initFailedHRUTracker(&self->state.num_gru); - def_output(self->state.handle_ncid, &self->state.start_gru, &self->state.num_gru, &self->state.num_gru, &err); + def_output(self->state.handle_ncid, &self->state.start_gru, &self->state.num_gru, + &self->state.num_gru, &self->state.gru_actor_stats, &err); if (err != 0) { aout(self) << "ERROR: Create_OutputFile\n"; std::string function = "def_output"; @@ -305,7 +311,8 @@ void initalizeFileAccessActor(stateful_actor<file_access_state>* self) { // read in the inital conditions for the grus/hrus readInitConditions(self); - self->send(self->state.parent, done_file_access_actor_init_v); + // Inital Files Have Been Loaded - Send Message to Job_Actor to Start Simulation + self->send(self->state.parent, init_gru_v); // initalize the forcingFile array self->state.filesLoaded = 0; for (int i = 1; i <= self->state.numFiles; i++) { diff --git a/build/source/actors/file_access_actor/fortran_code/write_to_netcdf.f90 b/build/source/actors/file_access_actor/fortran_code/write_to_netcdf.f90 index 490d7120c859d43fc1667075b997c6a23fba2a59..6b3ce362234bd46055e191f58d5756f956ee3f48 100644 --- a/build/source/actors/file_access_actor/fortran_code/write_to_netcdf.f90 +++ b/build/source/actors/file_access_actor/fortran_code/write_to_netcdf.f90 @@ -11,7 +11,7 @@ public::writeParamToNetCDF public::writeDataToNetCDF public::writeBasinToNetCDF public::writeTimeToNetCDF - +public::writeGRUStatistics contains @@ -282,4 +282,62 @@ subroutine writeTimeToNetCDF(handle_ncid, handle_finalize_stats, handle_output_t end subroutine writeTimeToNetCDF +subroutine writeGRUStatistics(handle_ncid, & + gru_var_ids, & + gru_stats_vector, & + num_gru, & + err) bind(C, name="WriteGRUStatistics") + USE data_types,only:var_i,netcdf_gru_actor_info,serializable_netcdf_gru_actor_info + USE var_lookup, only: maxvarFreq ! number of output frequencies + USE netcdf + implicit none + ! Dummy Variables + type(c_ptr), intent(in), value :: handle_ncid + type(netcdf_gru_actor_info),intent(in) :: gru_var_ids + type(serializable_netcdf_gru_actor_info),intent(in) :: gru_stats_vector(num_gru) + integer(c_int), intent(in) :: num_gru + integer(c_int), intent(out) :: err + + ! Local Variables + type(var_i), pointer :: ncid + real(c_double), dimension(num_gru) :: run_time_array + real(c_double), dimension(num_gru) :: init_time_array + real(c_double), dimension(num_gru) :: forcing_time_array + real(c_double), dimension(num_gru) :: run_physics_time_array + real(c_double), dimension(num_gru) :: write_output_time_array + + integer(c_int), dimension(num_gru) :: successful_array + integer(c_int), dimension(num_gru) :: num_attempts_array + + integer(c_int) :: i + integer(c_int) :: iFreq + ! --------------------------------------------------------------------------------------- + ! * Convert From C++ to Fortran + call c_f_pointer(handle_ncid, ncid) + + ! Assemble fortran arrays + do i=1,num_gru + run_time_array(i) = gru_stats_vector(i)%run_time + init_time_array(i) = gru_stats_vector(i)%init_duration + forcing_time_array(i) = gru_stats_vector(i)%forcing_duration + run_physics_time_array(i) = gru_stats_vector(i)%run_physics_duration + write_output_time_array(i) = gru_stats_vector(i)%write_output_duration + successful_array(i) = gru_stats_vector(i)%successful + num_attempts_array(i) = gru_stats_vector(i)%num_attempts + end do + + ! Write to NetCDF + do iFreq=1, maxvarFreq + err = nf90_put_var(ncid%var(iFreq), gru_var_ids%run_time_var_id, run_time_array) + err = nf90_put_var(ncid%var(iFreq), gru_var_ids%init_duration_var_id, init_time_array) + err = nf90_put_var(ncid%var(iFreq), gru_var_ids%forcing_duration_var_id, forcing_time_array) + err = nf90_put_var(ncid%var(iFreq), gru_var_ids%run_physics_duration_var_id, run_physics_time_array) + err = nf90_put_var(ncid%var(iFreq), gru_var_ids%write_output_duration_var_id, write_output_time_array) + err = nf90_put_var(ncid%var(iFreq), gru_var_ids%state_var_id, successful_array) + err = nf90_put_var(ncid%var(iFreq), gru_var_ids%num_attempts_var_id, num_attempts_array) + end do + +end subroutine writeGRUStatistics + + end module write_to_netcdf_module \ No newline at end of file diff --git a/build/source/actors/global/message_atoms.cpp b/build/source/actors/global/message_atoms.cpp index 27ecd9aa34c1a832fe500409549dfef32559a4e5..25037a479af6b2b24744eac87975ba03d25c5a02 100644 --- a/build/source/actors/global/message_atoms.cpp +++ b/build/source/actors/global/message_atoms.cpp @@ -1,11 +1,7 @@ #include "caf/all.hpp" #include "message_atoms.hpp" -// enum class hru_error : uint8_t { -// run_physics_unhandleable = 1, -// run_physics_infeasible_state = 2, -// }; - +// HRU Errors std::string to_string(hru_error err) { switch(err) { case hru_error::run_physics_unhandleable: @@ -42,3 +38,30 @@ bool from_integer(uint8_t in, hru_error& out) { } } +// File Access Error +std::string to_string(file_access_error err) { + switch(err) { + case file_access_error::writing_error: + return "writing_error"; + default: + return "unknown"; + } +} + +bool from_string(caf::string_view in, file_access_error& out) { + if (in == "writing_error") { + out = file_access_error::writing_error; + return true; + } + return false; +} + +bool from_integer(uint8_t in, file_access_error& out) { + switch(in) { + case 1: + out = file_access_error::writing_error; + return true; + default: + return false; + } +} \ No newline at end of file diff --git a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp index 1c407e7ce8f1187dc47c85ad223d0c5f0e11c296..1065163e7f695f13a3b14818b0e42e7ad4bf5bbe 100644 --- a/build/source/actors/hru_actor/cpp_code/hru_actor.cpp +++ b/build/source/actors/hru_actor/cpp_code/hru_actor.cpp @@ -4,6 +4,7 @@ #include "message_atoms.hpp" #include "hru_actor_subroutine_wrappers.hpp" #include "serialize_data_structure.hpp" +#include <thread> namespace caf { @@ -36,6 +37,7 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, self->state.forcingStep = 1; self->state.output_structure_step_index = 1; self->state.iFile = 1; + // Get the settings for the HRU self->state.hru_actor_settings = hru_actor_settings; @@ -143,8 +145,15 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, self->send(self->state.parent, run_failure_v, self, self->state.indxGRU, err); // self->quit(hru_error::run_hru_unhandleable); // caf::exit_reason - // self->down_msg(hru_error::run_physics_unhandleable); + // self->down_msg(); + self->quit(); + } + + + if (self->state.timestep == 543 && self->state.indxGRU == 2) { + self->send(self->state.parent, hru_error::run_physics_unhandleable); self->quit(); + return; } writeHRUToOutputStructure(&self->state.indxHRU, &self->state.indxGRU, diff --git a/build/source/actors/job_actor/GRUinfo.cpp b/build/source/actors/job_actor/GRUinfo.cpp index e026b6dbd68fcf614d3ee68c788d732a8e235a2a..1b4631d884fbfb613c438769f36334fcdbb509e2 100644 --- a/build/source/actors/job_actor/GRUinfo.cpp +++ b/build/source/actors/job_actor/GRUinfo.cpp @@ -4,15 +4,82 @@ #include <fstream> -GRUinfo::GRUinfo(int refGRU, int indxGRU, caf::actor gru, int dt_init, int maxAttempts) { + +GRU::GRU(int global_gru_index, int local_gru_index, caf::actor gru_actor, int dt_init_factor, int max_attempt) { + this->global_gru_index = global_gru_index; + this->local_gru_index = local_gru_index; + this->gru_actor = gru_actor; + this->dt_init_factor = dt_init_factor; + this->attempts_left = max_attempt; + this->state = gru_state::running; +} +GRU::~GRU() {}; + +// Getters +int GRU::getGlobalGRUIndex() { + return this->global_gru_index; +} + +double GRU::getRunTime() { + return this->run_time; +} + +double GRU::getInitDuration() { + return this->init_duration; +} + +double GRU::getForcingDuration() { + return this->forcing_duration; +} + +double GRU::getRunPhysicsDuration() { + return this->run_physics_duration; +} + +double GRU::getWriteOutputDuration() { + return this->write_output_duration; +} + +double GRU::getAttemptsLeft() { + return this->attempts_left; +} + +gru_state GRU::getStatus() { + return this->state; +} + +// Setters +void GRU::setRunTime(double run_time) { + this->run_time = run_time; +} +void GRU::setInitDuration(double init_duration) { + this->init_duration = init_duration; +} +void GRU::setForcingDuration(double forcing_duration) { + this->forcing_duration = forcing_duration; +} +void GRU::setRunPhysicsDuration(double run_physics_duration) { + this->run_physics_duration = run_physics_duration; +} +void GRU::setWriteOutputDuration(double write_output_duration) { + this->write_output_duration = write_output_duration; +} + +void GRU::setSuccess() { + this->state = gru_state::succeeded; +} + + + + + +GRUinfo::GRUinfo(int refGRU, int indxGRU, caf::actor gru_actor, int dt_init_factor, int max_attempts) { this->refGRU = refGRU; this->indxGRU = indxGRU; - this->GRU = gru; - this->dt_init = dt_init; + this->GRU = gru_actor; + this->dt_init = dt_init_factor; this->currentAttempt = 1; - this->maxAttempts = maxAttempts; - this->completed = false; - this->failed = false; + } GRUinfo::~GRUinfo(){}; diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index e89f16337244c260ac222dfd0b97735f5f01b717..31c82fd6403d7117cf7acc0d16e37cd33c77d6e5 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -4,7 +4,6 @@ #include <chrono> #include <thread> #include "message_atoms.hpp" -#include "global.hpp" #include "job_actor_subroutine_wrappers.hpp" #include "hru_actor.hpp" #include "gru_actor.hpp" @@ -17,7 +16,8 @@ namespace caf { behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings, caf::actor parent) { - + + // Set the error handlers self->set_down_handler([=](const down_msg& dm) { aout(self) << "\n\n ********** DOWN HANDLER ********** \n"; aout(self) << "Lost Connection With A Connected Actor\n"; @@ -26,13 +26,25 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, self->set_error_handler([=](const error& err) { aout(self) << "\n\n ********** ERROR HANDLER ********** \n"; - aout(self) << "Error: " << to_string(err) << "\n"; + + switch(err.category()) { + case type_id_v<hru_error>: + aout(self) << "HRU Error: " << to_string(err) << "\n"; + break; + case type_id_v<file_access_error>: + aout(self) << "File Access Error: " << to_string(err) << "\n"; + break; + default: + aout(self) << "Unknown Error: " << to_string(err) << "\n"; + break; + } }); self->set_exit_handler([=](const exit_msg& em) { aout(self) << "\n\n ********** EXIT HANDLER ********** \n"; aout(self) << "Exit Reason: " << to_string(em.reason) << "\n"; }); + // Timing Information self->state.job_timing = TimingInfo(); @@ -49,6 +61,9 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, self->state.job_actor_settings = job_actor_settings; self->state.hru_actor_settings = hru_actor_settings; + // Init the GRU Container + self->state.gru_container.num_gru_in_run_domain = num_gru; + // hostname information that is useful for verifying that the job actor is running on the correct node char host[HOST_NAME_MAX]; gethostname(host, HOST_NAME_MAX); @@ -82,7 +97,7 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, return {}; // Failure } - initCsvOutputFile(self); + // initCsvOutputFile(self); // Spawn the file_access_actor. This will return the number of forcing files we are working with self->state.file_access_actor = self->spawn(file_access_actor, self->state.start_gru, self->state.num_gru, @@ -93,196 +108,263 @@ behavior job_actor(stateful_actor<job_state>* self, int start_gru, int num_gru, return { - [=](init_hru) { - initalizeGRU(self); + [=](init_gru) { + initGRUs(self); }, - [=](done_init_hru) { - - self->state.gru_init++; - if (self->state.gru_init >= self->state.num_gru) { - aout(self) << "All GRUs are initalized\n"; - self->state.gru_init = 0; // reset counter in case we have failures - runGRUs(self); + [=](done_hru, int local_gru_index, double total_duration, + double init_duration, double forcing_duration, + double run_physics_duration, double write_output_duration) { + + aout(self) << "\nJob_Actor: GRU Finished: \n" << + " global_gru_index = " << + self->state.gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() << "\n" << + " local_gru_index = " << local_gru_index << "\n" << + " total_duration = " << total_duration << "\n" << + " init_duration = " << init_duration << "\n" << + " forcing_duration = " << forcing_duration << "\n" << + " run_physics_duration = " << run_physics_duration << "\n" << + " write_output_duration = " << write_output_duration << "\n\n"; + // Update Timing + self->state.gru_container.gru_list[local_gru_index-1]->setRunTime(total_duration); + self->state.gru_container.gru_list[local_gru_index-1]->setInitDuration(init_duration); + self->state.gru_container.gru_list[local_gru_index-1]->setForcingDuration(forcing_duration); + self->state.gru_container.gru_list[local_gru_index-1]->setRunPhysicsDuration(run_physics_duration); + self->state.gru_container.gru_list[local_gru_index-1]->setWriteOutputDuration(write_output_duration); + + self->state.gru_container.gru_list[local_gru_index-1]->setSuccess(); + + self->state.gru_container.num_gru_done++; + + // Check if we have finished all active GRUs + if (self->state.gru_container.num_gru_done >= self->state.gru_container.num_gru_in_run_domain) { + + // Check for failures + if(self->state.gru_container.num_gru_failed == 0) { + //TODO: RENAME DEALLOCATE_STURCTURES this is more of a finalize + std::vector<serializable_netcdf_gru_actor_info> netcdf_gru_info = getGruNetcdfInfo( + self->state.max_run_attempts, + self->state.gru_container.gru_list); + self->send(self->state.file_access_actor, deallocate_structures_v, netcdf_gru_info); + + } else { + // TODO: Handle failures } - }, + } - [=](done_hru, int indx_gru, double total_duration, double init_duration, - double forcing_duration, double run_physics_duration, double write_output_duration) { - aout(self) << "\nDone - GRU:" << self->state.gru_list[indx_gru - 1]->getRefGRU() - << " - IndexInJob = " << indx_gru << "\n"; + } + // aout(self) << "\nDone - GRU:" << self->state.gru_list[indx_gru - 1]->getRefGRU() + // << " - IndexInJob = " << indx_gru << "\n"; - self->state.gru_list[indx_gru - 1]->doneRun(total_duration, init_duration, forcing_duration, - run_physics_duration, write_output_duration); + // self->state.gru_list[indx_gru - 1]->doneRun(total_duration, init_duration, forcing_duration, + // run_physics_duration, write_output_duration); - if (self->state.job_actor_settings.output_csv) { - self->state.gru_list[indx_gru - 1]->writeSuccess(self->state.success_output_file, self->state.hostname); - } + // if (self->state.job_actor_settings.output_csv) { + // self->state.gru_list[indx_gru - 1]->writeSuccess(self->state.success_output_file, self->state.hostname); + // } - self->state.num_gru_done++; + // self->state.num_gru_done++; - // Check if we are done - if (self->state.num_gru_done >= self->state.num_gru) { - self->state.num_gru_done = 0; // just in case there were failures + // // Check if we are done + // if (self->state.num_gru_done >= self->state.num_gru) { + // self->state.num_gru_done = 0; // just in case there were failures - if (self->state.num_gru_failed == 0) { - self->send(self->state.file_access_actor, deallocate_structures_v); - } else { - restartFailures(self); - } - } - }, + // if (self->state.num_gru_failed == 0) { + // self->send(self->state.file_access_actor, deallocate_structures_v); + // } else { + // restartFailures(self); + // } + // } + // }, - [=](run_failure, caf::actor actorRef, int indx_gru, int err) { + // [=](run_failure, caf::actor actorRef, int indx_gru, int err) { - aout(self) << "GRU:" << self->state.gru_list[indx_gru - 1]->getRefGRU() - << "indx_gru = " << indx_gru << "Failed \n" - << "Will have to wait until all GRUs are done before it can be re-tried\n"; + // aout(self) << "GRU:" << self->state.gru_list[indx_gru - 1]->getRefGRU() + // << "indx_gru = " << indx_gru << "Failed \n" + // << "Will have to wait until all GRUs are done before it can be re-tried\n"; - self->state.num_gru_failed++; - self->state.gru_list[indx_gru - 1]->updateFailed(); - - // Let the file_access_actor know this actor failed - self->send(self->state.file_access_actor, run_failure_v, indx_gru); - - // check if we are the last hru to complete - if (self->state.num_gru_done + self->state.num_gru_failed >= self->state.num_gru) { - // restartFailures(self); - self->quit(); - } - }, - - [=](done_file_access_actor_init) { - // Init HRU Actors and the Output Structure - self->send(self, init_hru_v); - }, - - [=](done_init_gru) { - aout(self) << "GRU is Initialized\n"; - self->quit(); - return; - }, - - [=](file_access_actor_done, double read_duration, double write_duration) { - int err = 0; - // Delete GRUs - for (auto GRU : self->state.gru_list) { - delete GRU; - } - self->state.gru_list.clear(); - - - self->state.job_timing.updateEndPoint("total_duration"); - - aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n"; - aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n"; - aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n"; - aout(self) << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n\n"; + // self->state.num_gru_failed++; + // self->state.gru_list[indx_gru - 1]->updateFailed(); + + // // Let the file_access_actor know this actor failed + // // self->send(self->state.file_access_actor, run_failure_v, indx_gru); + + // // check if we are the last hru to complete + // if (self->state.num_gru_done + self->state.num_gru_failed >= self->state.num_gru) { + // // restartFailures(self); + // self->quit(); + // } + // }, + + // [=](done_init_gru) { + // aout(self) << "GRU is Initialized\n"; + // self->quit(); + // return; + // }, + + // [=](file_access_actor_done, double read_duration, double write_duration) { + // int err = 0; + // // Delete GRUs + // for (auto GRU : self->state.gru_list) { + // delete GRU; + // } + // self->state.gru_list.clear(); + + + // self->state.job_timing.updateEndPoint("total_duration"); + + // aout(self) << "\n________________PRINTING JOB_ACTOR TIMING INFO RESULTS________________\n"; + // aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n"; + // aout(self) << "Total Duration = " << self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n"; + // aout(self) << "Total Duration = " << (self->state.job_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n\n"; + + // deallocateJobActor(&err); + // // Tell Parent we are done + // self->send(self->state.parent, + // done_job_v, + // self->state.num_gru_failed, + // self->state.job_timing.getDuration("total_duration").value_or(-1.0), + // read_duration, write_duration); + // self->quit(); + // }, + + // [=](file_access_actor_err, std::string function) { + // aout(self) << "Failure in File Access Actor in function: " << function << "\n"; + // if (function == "def_output") { + // aout(self) << "Error with the output file, will try creating it agian\n"; + // std::this_thread::sleep_for(std::chrono::seconds(5)); + // self->state.file_access_actor = self->spawn(file_access_actor, self->state.start_gru, self->state.num_gru, + // self->state.file_access_actor_settings, self); + // } else { + // aout(self) << "Letting Parent Know we are quitting\n"; + // self->send(self->state.parent, err_v); + // self->quit(); + // } + // } + }; +} - deallocateJobActor(&err); - // Tell Parent we are done - self->send(self->state.parent, - done_job_v, - self->state.num_gru_failed, - self->state.job_timing.getDuration("total_duration").value_or(-1.0), - read_duration, write_duration); - self->quit(); - }, +// void initCsvOutputFile(stateful_actor<job_state>* self) { +// std::string success = "/Success"; // allows us to build the string +// if (self->state.job_actor_settings.output_csv) { +// std::ofstream file; +// self->state.success_output_file = self->state.job_actor_settings.csv_path += success += +// std::to_string(self->state.start_gru) += ".csv"; +// aout(self) << "Success Output File: " << self->state.success_output_file << "\n"; +// file.open(self->state.success_output_file, std::ios_base::out); +// file << +// "hostname," << +// "GRU," << +// "totalDuration," << +// "initDuration," << +// "forcingDuration," << +// "runPhysicsDuration," << +// "writeOutputDuration," << +// "dt_init," << +// "numAttemtps\n"; +// file.close(); +// } +// } + +void initGRUs(stateful_actor<job_state>* self) { + for(int i = 0; i < self->state.gru_container.num_gru_in_run_domain; i++) { + // Spawn the GRU Actor + auto global_gru_index = self->state.gru_container.gru_list.size() + self->state.start_gru; + auto local_gru_index = self->state.gru_container.gru_list.size() + 1; // Fortran reference starts at 1 + auto gru = self->spawn(hru_actor, + global_gru_index, + local_gru_index, + self->state.hru_actor_settings, + self->state.file_access_actor, + self); + + // Create the GRU object + self->state.gru_container.gru_list.push_back( + new GRU(global_gru_index, + local_gru_index, + gru, + self->state.dt_init_start_factor, + self->state.max_run_attempts)); + } + + + + + + // for(int i = 0; i < self->state.num_gru; i++) { + // int start_gru = self->state.gru_list.size() + self->state.start_gru; + // int index_gru = self->state.gru_list.size() + 1; // Fortran reference starts at 1 + // auto gru = self->spawn(hru_actor, + // start_gru, + // index_gru, + // self->state.hru_actor_settings, + // self->state.file_access_actor, + // self); + // self->state.gru_list.push_back(new GRUinfo(start_gru, index_gru, gru, + // self->state.dt_init_start_factor, self->state.max_run_attempts)); + // } + +} - [=](file_access_actor_err, std::string function) { - aout(self) << "Failure in File Access Actor in function: " << function << "\n"; - if (function == "def_output") { - aout(self) << "Error with the output file, will try creating it agian\n"; - std::this_thread::sleep_for(std::chrono::seconds(5)); - self->state.file_access_actor = self->spawn(file_access_actor, self->state.start_gru, self->state.num_gru, - self->state.file_access_actor_settings, self); - } else { - aout(self) << "Letting Parent Know we are quitting\n"; - self->send(self->state.parent, err_v); - self->quit(); - } +std::vector<serializable_netcdf_gru_actor_info> getGruNetcdfInfo(int max_run_attempts, std::vector<GRU*> &gru_list) { + std::vector<serializable_netcdf_gru_actor_info> gru_netcdf_info; + for(auto gru : gru_list) { + serializable_netcdf_gru_actor_info gru_info; + gru_info.run_time = gru->getRunTime(); + gru_info.init_duration = gru->getInitDuration(); + gru_info.forcing_duration = gru->getForcingDuration(); + gru_info.run_physics_duration = gru->getRunPhysicsDuration(); + gru_info.write_output_duration = gru->getWriteOutputDuration(); + + gru_info.num_attempts = max_run_attempts - gru->getAttemptsLeft() + 1; + gru_info.successful = success(gru->getStatus()); - } - }; -} + gru_netcdf_info.push_back(gru_info); -void initCsvOutputFile(stateful_actor<job_state>* self) { - std::string success = "/Success"; // allows us to build the string - if (self->state.job_actor_settings.output_csv) { - std::ofstream file; - self->state.success_output_file = self->state.job_actor_settings.csv_path += success += - std::to_string(self->state.start_gru) += ".csv"; - aout(self) << "Success Output File: " << self->state.success_output_file << "\n"; - file.open(self->state.success_output_file, std::ios_base::out); - file << - "hostname," << - "GRU," << - "totalDuration," << - "initDuration," << - "forcingDuration," << - "runPhysicsDuration," << - "writeOutputDuration," << - "dt_init," << - "numAttemtps\n"; - file.close(); } + return gru_netcdf_info; } -void initalizeGRU(stateful_actor<job_state>* self) { - - for(int i = 0; i < self->state.num_gru; i++) { - int start_gru = self->state.gru_list.size() + self->state.start_gru; - int index_gru = self->state.gru_list.size() + 1; // Fortran reference starts at 1 - auto gru = self->spawn(hru_actor, - start_gru, - index_gru, - self->state.hru_actor_settings, - self->state.file_access_actor, - self); - self->state.gru_list.push_back(new GRUinfo(start_gru, index_gru, gru, - self->state.dt_init_start_factor, self->state.max_run_attempts)); - } - -} -void runGRUs(stateful_actor<job_state>* self) { - for(auto gru : self->state.gru_list) { - if(!gru->isCompleted() && !gru->isFailed()) { - self->send(gru->getActor(), start_hru_v); - } - } -} -void restartFailures(stateful_actor<job_state>* self) { - // Need to let the file_access_actor know so it can set up the new output Manager - self->send(self->state.file_access_actor, restart_failures_v); - - self->state.num_gru = self->state.num_gru_failed; - self->state.num_gru_failed = 0; - self->state.num_gru_done = 0; - - - for(auto gru : self->state.gru_list) { - if (gru->isFailed() && !gru->isMaxAttemptsReached()) { - gru->updateFailed(); - gru->updateDt_init(); - auto newGRU = self->spawn(hru_actor, gru->getRefGRU(), gru->getIndxGRU(), - self->state.hru_actor_settings, self->state.file_access_actor, - self); - gru->updateGRU(newGRU); - gru->updateCurrentAttempt(); - self->send(gru->getActor(), dt_init_factor_v, gru->getDt_init()); - } else { - // Max attempts reached, so we are done with this GRU - self->state.gru_list[gru->getIndxGRU() - 1]->doneRun(-1, -1, -1, -1, -1); - if (self->state.job_actor_settings.output_csv) { - self->state.gru_list[gru->getIndxGRU() - 1]->writeSuccess(self->state.success_output_file, self->state.hostname); - } - self->state.num_gru_done++; - } - } -} +// void runGRUs(stateful_actor<job_state>* self) { +// for(auto gru : self->state.gru_list) { +// if(!gru->isCompleted() && !gru->isFailed()) { +// self->send(gru->getActor(), start_hru_v); +// } +// } +// } + +// void restartFailures(stateful_actor<job_state>* self) { +// // Need to let the file_access_actor know so it can set up the new output Manager +// self->send(self->state.file_access_actor, restart_failures_v); + +// self->state.num_gru = self->state.num_gru_failed; +// self->state.num_gru_failed = 0; +// self->state.num_gru_done = 0; + + +// for(auto gru : self->state.gru_list) { +// if (gru->isFailed() && !gru->isMaxAttemptsReached()) { +// gru->updateFailed(); +// gru->updateDt_init(); +// auto newGRU = self->spawn(hru_actor, gru->getRefGRU(), gru->getIndxGRU(), +// self->state.hru_actor_settings, self->state.file_access_actor, +// self); +// gru->updateGRU(newGRU); +// gru->updateCurrentAttempt(); +// self->send(gru->getActor(), dt_init_factor_v, gru->getDt_init()); +// } else { +// // Max attempts reached, so we are done with this GRU +// self->state.gru_list[gru->getIndxGRU() - 1]->doneRun(-1, -1, -1, -1, -1); +// if (self->state.job_actor_settings.output_csv) { +// self->state.gru_list[gru->getIndxGRU() - 1]->writeSuccess(self->state.success_output_file, self->state.hostname); +// } +// self->state.num_gru_done++; +// } +// } +// } } // End Namespace caf diff --git a/build/source/dshare/data_types.f90 b/build/source/dshare/data_types.f90 index 200f1f60aee6624f034e27e06178bef85654cc2b..bdf57bc5195e31862f8663bc53e8ff6efc8d300e 100755 --- a/build/source/dshare/data_types.f90 +++ b/build/source/dshare/data_types.f90 @@ -19,6 +19,8 @@ ! along with this program. If not, see <http://www.gnu.org/licenses/>. MODULE data_types + USE, intrinsic :: iso_c_binding + ! used to define model data structures USE nrtype, integerMissing=>nr_integerMissing USE var_lookup,only:maxvarFreq @@ -74,6 +76,29 @@ MODULE data_types integer(i4b) :: nTimeSteps ! Number of Timesteps in the file end type var_forc + ! *********************************************************************************************************** + ! Define GRU_Actor var_id structure + ! *********************************************************************************************************** + type,public,bind(C) :: netcdf_gru_actor_info + integer(C_INT) :: run_time_var_id + integer(C_INT) :: init_duration_var_id + integer(C_INT) :: forcing_duration_var_id + integer(C_INT) :: run_physics_duration_var_id + integer(C_INT) :: write_output_duration_var_id + integer(C_INT) :: state_var_id + integer(C_INT) :: num_attempts_var_id + end type netcdf_gru_actor_info + + type,public,bind(C) :: serializable_netcdf_gru_actor_info + real(C_DOUBLE) :: run_time + real(C_DOUBLE) :: init_duration + real(C_DOUBLE) :: forcing_duration + real(C_DOUBLE) :: run_physics_duration + real(C_DOUBLE) :: write_output_duration + integer(C_INT) :: successful + integer(C_INT) :: num_attempts + end type serializable_netcdf_gru_actor_info + ! *********************************************************************************************************** ! Define metadata on model parameters ! *********************************************************************************************************** diff --git a/build/source/dshare/var_lookup.f90 b/build/source/dshare/var_lookup.f90 index 4ca54394705b8ca6c4caacf6d4d8981a94d43959..4b430ca5691e5483fe369c99c25884bf6004f3f2 100755 --- a/build/source/dshare/var_lookup.f90 +++ b/build/source/dshare/var_lookup.f90 @@ -438,6 +438,7 @@ MODULE var_lookup integer(i4b) :: scalarVolLatHt_fus = integerMissing ! volumetric latent heat of fusion (J m-3) ! number of function evaluations integer(i4b) :: numFluxCalls = integerMissing ! number of flux calls (-) + integer(i4b) :: wallClockTime = integerMissing ! wall clock time (s) endtype iLook_diag ! *********************************************************************************************************** @@ -822,7 +823,7 @@ MODULE var_lookup 51, 52, 53, 54, 55, 56, 57, 58, 59, 60,& 61, 62, 63, 64, 65, 66, 67, 68, 69, 70,& 71, 72, 73, 74, 75, 76, 77, 78, 79, 80,& - 81, 82, 83) + 81, 82, 83, 84) ! named variables: model fluxes type(iLook_flux), public,parameter :: iLookFLUX =iLook_flux ( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,& 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,& diff --git a/build/source/netcdf/def_output.f90 b/build/source/netcdf/def_output.f90 index 18d8485efca1277697cee02f479316c2a113aa7c..97f9328a9d542c816827ac8c416f1714601ea4af 100755 --- a/build/source/netcdf/def_output.f90 +++ b/build/source/netcdf/def_output.f90 @@ -21,7 +21,7 @@ module def_output_module USE, intrinsic :: iso_c_binding -USE data_types,only:var_i +USE data_types,only:var_i,netcdf_gru_actor_info USE netcdf USE netcdf_util_module,only:netcdf_err ! netcdf error handling function USE netcdf_util_module,only:nc_file_close ! close NetCDF files @@ -72,7 +72,7 @@ contains ! ********************************************************************************************************** ! public subroutine def_output: define model output file ! ********************************************************************************************************** -subroutine def_output(handle_ncid,startGRU,nGRU,nHRU,err) bind(C, name='def_output') +subroutine def_output(handle_ncid,startGRU,nGRU,nHRU,actor_info,err) bind(C, name='def_output') USE globalData,only:structInfo ! information on the data structures USE globalData,only:forc_meta,attr_meta,type_meta ! metaData structures USE globalData,only:prog_meta,diag_meta,flux_meta,deriv_meta ! metaData structures @@ -95,11 +95,12 @@ subroutine def_output(handle_ncid,startGRU,nGRU,nHRU,err) bind(C, name='def_outp ! --------------------------------------------------------------------------------------- ! * variables from C++ ! --------------------------------------------------------------------------------------- - type(c_ptr),intent(in), value :: handle_ncid ! ncid of the output file - integer(c_int),intent(in) :: startGRU ! startGRU for the entire job (for file creation) - integer(c_int),intent(in) :: nGRU ! number of GRUs - integer(c_int),intent(in) :: nHRU ! number of HRUs - integer(c_int),intent(out) :: err ! error code + type(c_ptr),intent(in), value :: handle_ncid ! ncid of the output file + integer(c_int),intent(in) :: startGRU ! startGRU for the entire job (for file creation) + integer(c_int),intent(in) :: nGRU ! number of GRUs + integer(c_int),intent(in) :: nHRU ! number of HRUs + type(netcdf_gru_actor_info),intent(out):: actor_info ! netcdf actor information + integer(c_int),intent(out) :: err ! error code ! --------------------------------------------------------------------------------------- ! * Fortran Variables For Conversion ! --------------------------------------------------------------------------------------- @@ -154,6 +155,9 @@ subroutine def_output(handle_ncid,startGRU,nGRU,nHRU,err) bind(C, name='def_outp endif end do + + + ! create initial file ! each file will have a master name with a frequency appended at the end: ! e.g., xxxxxxxxx_timestep.nc (for output at every model timestep) @@ -220,6 +224,21 @@ subroutine def_output(handle_ncid,startGRU,nGRU,nHRU,err) bind(C, name='def_outp return end if + ! define timing variables for actors code + ! TODO: Add attributes to these variables + err = nf90_def_var(ncid%var(iFreq),"run_time",outputPrecision,(/gru_DimID/),actor_info%run_time_var_id) + err = nf90_def_var(ncid%var(iFreq),"init_duration",outputPrecision,(/gru_DimID/),actor_info%init_duration_var_id) + err = nf90_def_var(ncid%var(iFreq),"forcing_duration",outputPrecision,(/gru_DimID/),actor_info%forcing_duration_var_id) + err = nf90_def_var(ncid%var(iFreq),"run_physics_duration",outputPrecision,(/gru_DimID/),actor_info%run_physics_duration_var_id) + err = nf90_def_var(ncid%var(iFreq),"write_output_duration",outputPrecision,(/gru_DimID/),actor_info%write_output_duration_var_id) + err = nf90_def_var(ncid%var(iFreq),"successful",nf90_int,(/gru_DimID/),actor_info%state_var_id) + err = nf90_def_var(ncid%var(iFreq),"num_attempts",nf90_int,(/gru_DimID/),actor_info%num_attempts_var_id) + if(err/=0) then + message=trim(message)//trim(cmessage) + print*, message + return + end if + end do end subroutine def_output