diff --git a/bin/submit_summa_actors_32.sh b/bin/submit_summa_actors_32.sh deleted file mode 100644 index 91470b77f721ffbab839ea7aa648f383fea73cf3..0000000000000000000000000000000000000000 --- a/bin/submit_summa_actors_32.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -#SBATCH --nodes=1 -#SBATCH --exclusive -#SBATCH --time=1:00:00 -#SBATCH --mem=0 -#SBATCH --job-name=Summa-Actors -#SBATCH --output=/scratch/gwf/gwf_cmt/kck540/Summa-Sundials-Output/slurm/slurm-%A.out -#SBATCH --account=hpc_c_giws_clark - -module load StdEnv/2020 -module load gcc/9.3.0 -module load openblas/0.3.17 -module load netcdf-fortran/4.5.2 - -# for Actors -module load caf - -export LD_LIBRARY_PATH=/globalhome/kck540/HPC/Libraries/sundials/v6.6/instdir/lib64:$LD_LIBRARY_PATH - - -gru_max=12000 -gru_count=25 -max_job=3 -summa_exe=/globalhome/kck540/HPC/Summa-Projects/Summa-Actors/bin/summa_actors -# config_summa=/globalhome/kck540/HPC/Summa-Projects/Summa-Actors/bin/config.json -file_manager=/project/gwf/gwf_cmt/kck540/domain_NorthAmerica/Summa-Projects/input_data/summa_actors_input/fileManager.txt -config_summa=/globalhome/kck540/HPC/Summa-Projects/Summa-Actors/bin/Summa_Actors_Settings.json - -offset=$SLURM_ARRAY_TASK_ID -gru_start=$(( 1 + gru_count*offset )) -check=$(( $gru_start + $gru_count )) - -# Adust the number of grus for the last job -# if [ $check -gt $gru_max ] || [ $offset -eq $(( max_job-1 )) ] -# then -# gru_count=$(( gru_max-gru_start+1 )) -# fi - -$summa_exe -g $gru_start $gru_count -c $config_summa --caf.scheduler.max-threads=32 \ No newline at end of file diff --git a/build/CMakeLists.txt b/build/CMakeLists.txt index 49f844fb7eaa7048ef47fe7ba36b0e5029c1ca25..575348f0a413f9263a98b4f27758e41d31c0b854 100644 --- a/build/CMakeLists.txt +++ b/build/CMakeLists.txt @@ -62,7 +62,7 @@ else() "${PARENT_DIR}/build/includes/file_access_actor" "${PARENT_DIR}/build/includes/hru_actor") link_directories("/usr/local/lib") - set(LIB_ACTORS CAF::core CAF::io -lopenblas -lnetcdff) + set(LIB_ACTORS CAF::core CAF::io -lopenblas -lnetcdff -lnetcdf) endif() # Define directories that contains source code diff --git a/build/includes/summa_actor/batch.hpp b/build/includes/summa_actor/batch.hpp index 141f9ed57bcd50a6ad8cde54efef3b167757e4fb..0c3ab85dad32af55d93da77eac39397ce8de6fa1 100644 --- a/build/includes/summa_actor/batch.hpp +++ b/build/includes/summa_actor/batch.hpp @@ -3,54 +3,53 @@ #include <string> class Batch { - private: - int batch_id; - int start_hru; - int num_hru; + private: + int batch_id_; + int start_hru_; + int num_hru_; + double run_time_; + double read_time_; + double write_time_; + bool assigned_to_actor_; + bool solved_; - double run_time; - double read_time; - double write_time; + public: + Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1); - bool assigned_to_actor; - bool solved; + // Getters + int getBatchID(); + int getStartHRU(); + int getNumHRU(); + double getRunTime(); + double getReadTime(); + double getWriteTime(); + bool isAssigned(); + bool isSolved(); + std::string getBatchInfoString(); + // Setters + void updateRunTime(double run_time); + void updateReadTime(double read_time); + void updateWriteTime(double write_time); + void updateAssigned(bool boolean); + void updateSolved(bool boolean); + void printBatchInfo(); + void writeBatchToFile(std::string csv_output, std::string hostname); - public: - Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1); - - // Getters - int getBatchID(); - int getStartHRU(); - int getNumHRU(); - double getRunTime(); - double getReadTime(); - double getWriteTime(); - bool isAssigned(); - bool isSolved(); - // Setters - void updateRunTime(double run_time); - void updateReadTime(double read_time); - void updateWriteTime(double write_time); - void updateAssigned(bool boolean); - void updateSolved(bool boolean); - void printBatchInfo(); - void writeBatchToFile(std::string csv_output, std::string hostname); - - std::string toString(); + std::string toString(); - void assignToActor(std::string hostname, caf::actor assigned_actor); + void assignToActor(std::string hostname, caf::actor assigned_actor); - template <class Inspector> - friend bool inspect(Inspector& inspector, Batch& batch) { - return inspector.object(batch).fields( - inspector.field("batch_id", batch.batch_id), - inspector.field("start_hru", batch.start_hru), - inspector.field("num_hru", batch.num_hru), - inspector.field("run_time", batch.run_time), - inspector.field("read_time", batch.read_time), - inspector.field("write_time", batch.write_time), - inspector.field("assigned_to_actor", batch.assigned_to_actor), - inspector.field("solved", batch.solved)); - } + template <class Inspector> + friend bool inspect(Inspector& inspector, Batch& batch) { + return inspector.object(batch).fields( + inspector.field("batch_id", batch.batch_id_), + inspector.field("start_hru", batch.start_hru_), + inspector.field("num_hru", batch.num_hru_), + inspector.field("run_time", batch.run_time_), + inspector.field("read_time", batch.read_time_), + inspector.field("write_time", batch.write_time_), + inspector.field("assigned_to_actor", batch.assigned_to_actor_), + inspector.field("solved", batch.solved_)); + } }; \ No newline at end of file diff --git a/build/includes/summa_actor/batch_container.hpp b/build/includes/summa_actor/batch_container.hpp index 8a61429f08f957bd33e639063e4f9d26861b9575..f3c9571133c9a6983191657f2ef9ac4933462429 100644 --- a/build/includes/summa_actor/batch_container.hpp +++ b/build/includes/summa_actor/batch_container.hpp @@ -4,77 +4,90 @@ class Batch_Container { - private: - int total_hru_count; - int num_hru_per_batch; - int batches_remaining; - std::vector<Batch> batch_list; - - // Assemble the total number of HRUs given by the user into batches. - void assembleBatches(int total_hru_count, int num_hru_per_batch); + private: + int start_hru_; + int total_hru_count_; + int num_hru_per_batch_; + int batches_remaining_; + std::vector<Batch> batch_list_; - public: + // Assemble the total number of HRUs given by the user into batches. + void assembleBatches(); + + public: - // Creating the batch_manager will also create the batches - // with the two parameters that are passed in. - Batch_Container(int total_hru_count = 0, int num_hru_per_batch = 0); + // Creating the batch_manager will also create the batches + // with the two parameters that are passed in. + Batch_Container(int start_hru = 1, int total_hru_count = 0, + int num_hru_per_batch = 0); + + // returns the size of the batch list + int getBatchesRemaining(); + int getTotalBatches(); + + // Find an unsolved batch, set it to assigned and return it. + std::optional<Batch> getUnsolvedBatch(); + + // Update the batch status to solved and write the output to a file. + void updateBatch_success(Batch successful_batch, std::string output_csv, std::string hostname); + // Update the batch status but do not write the output to a file. + void updateBatch_success(Batch successful_batch); + // Update batch by id + void updateBatch_success(int batch_id, double run_time, double read_time, + double write_time); + + // Update the batch to assigned = true + void setBatchAssigned(Batch batch); + // Update the batch to assigned = false + void setBatchUnassigned(Batch batch); + + // Check if there are batches left to solve + bool hasUnsolvedBatches(); + + // TODO: Needs implementation + void updateBatch_failure(Batch failed_batch); + + std::string getAllBatchInfoString(); + + + double getTotalReadTime(); + double getTotalWriteTime(); + + + /** + * A client has found to be disconnected. Unassign all batches + * that were assigned to the disconnected client. The client id + * is passed in as a parameter + */ + void updatedBatch_disconnectedClient(int client_id); + + /** + * Create the csv file for the completed batches. + */ + void inititalizeCSVOutput(std::string csv_output_name); + + /** + * @brief Print the batches from the batch list + * + */ + void printBatches(); + std::string getBatchesAsString(); + + /** + * @brief Find the batch with the batch_id parameter + * update the batches assigned actor member variable to false + * + */ + void updateBatchStatus_LostClient(int batch_id); + + template <class Inspector> + friend bool inspect(Inspector& inspector, Batch_Container& batch_container) { + return inspector.object(batch_container).fields( + inspector.field("total_hru_count", batch_container.total_hru_count_), + inspector.field("num_hru_per_batch", batch_container.num_hru_per_batch_), + inspector.field("batches_remaining", batch_container.batches_remaining_), + inspector.field("batch_list", batch_container.batch_list_)); + } - // returns the size of the batch list - int getBatchesRemaining(); - - // Find an unsolved batch, set it to assigned and return it. - std::optional<Batch> getUnsolvedBatch(); - - // Update the batch status to solved and write the output to a file. - void updateBatch_success(Batch successful_batch, std::string output_csv, std::string hostname); - // Update the batch status but do not write the output to a file. - void updateBatch_success(Batch successful_batch); - - // Update the batch to assigned = true - void setBatchAssigned(Batch batch); - // Update the batch to assigned = false - void setBatchUnassigned(Batch batch); - - // Check if there are batches left to solve - bool hasUnsolvedBatches(); - - // TODO: Needs implementation - void updateBatch_failure(Batch failed_batch); - - - /** - * A client has found to be disconnected. Unassign all batches - * that were assigned to the disconnected client. The client id - * is passed in as a parameter - */ - void updatedBatch_disconnectedClient(int client_id); - - /** - * Create the csv file for the completed batches. - */ - void inititalizeCSVOutput(std::string csv_output_name); - - /** - * @brief Print the batches from the batch list - * - */ - void printBatches(); - - /** - * @brief Find the batch with the batch_id parameter - * update the batches assigned actor member variable to false - * - */ - void updateBatchStatus_LostClient(int batch_id); - - template <class Inspector> - friend bool inspect(Inspector& inspector, Batch_Container& batch_container) { - return inspector.object(batch_container).fields( - inspector.field("total_hru_count", batch_container.total_hru_count), - inspector.field("num_hru_per_batch", batch_container.num_hru_per_batch), - inspector.field("batches_remaining", batch_container.batches_remaining), - inspector.field("batch_list", batch_container.batch_list)); - } - }; \ No newline at end of file diff --git a/build/includes/summa_actor/summa_actor.hpp b/build/includes/summa_actor/summa_actor.hpp index 8d26e51750421c1e25ecd89b1eaac9095980344a..36457e1e7c47659c764593dc59aae4c8b01e5e66 100644 --- a/build/includes/summa_actor/summa_actor.hpp +++ b/build/includes/summa_actor/summa_actor.hpp @@ -4,7 +4,7 @@ #include "caf/io/all.hpp" #include "timing_info.hpp" #include "settings_functions.hpp" - +#include "batch_container.hpp" #include <chrono> #include <string> #include <vector> @@ -26,13 +26,16 @@ struct summa_actor_state { // Program Parameters int startGRU; // starting GRU for the simulation int numGRU; // number of GRUs to compute - std::string configPath;// path to the fileManager.txt file - // Information about the jobs + int fileGRU; // number of GRUs in the file + std::string configPath; // path to the fileManager.txt file int numFailed = 0; // Number of jobs that have failed - caf::actor currentJob; // Reference to the current job actor caf::actor parent; + // Batches + Batch_Container batch_container; + int current_batch_id; + // settings for all child actors (save in case we need to recover) Summa_Actor_Settings summa_actor_settings; @@ -41,9 +44,12 @@ struct summa_actor_state { HRU_Actor_Settings hru_actor_settings; }; -behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, - Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings, actor parent); +behavior summa_actor(stateful_actor<summa_actor_state>* self, + int startGRU, int numGRU, + Summa_Actor_Settings summa_actor_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings, actor parent); void spawnJob(stateful_actor<summa_actor_state>* self); diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index b15d631d07dac60e6259ecd7f1d37469fbe2d49c..314d62111ffc156dc6765db9dd1118423c886fce 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -45,9 +45,12 @@ struct summa_server_state { }; // Summa Server setup behaviour - initializes the state for the server -behavior summa_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings, - Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings); +behavior summa_server_init(stateful_actor<summa_server_state>* self, + Distributed_Settings distributed_settings, + Summa_Actor_Settings summa_actor_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings); // Summa Server behaviour - handles messages from clients behavior summa_server(stateful_actor<summa_server_state>* self); diff --git a/build/source/actors/global/settings_functions.cpp b/build/source/actors/global/settings_functions.cpp index fa7d3ecf827e1b4f5dbae4ca7737a849edd6b6b3..b8d1b44b4107de8a3c22e4f576d6c93a10545b8a 100644 --- a/build/source/actors/global/settings_functions.cpp +++ b/build/source/actors/global/settings_functions.cpp @@ -191,9 +191,12 @@ HRU_Actor_Settings readHRUActorSettings(std::string json_settings_file) { void check_settings_from_json(Distributed_Settings &distributed_settings, - Summa_Actor_Settings &summa_actor_settings, File_Access_Actor_Settings &file_access_actor_settings, - Job_Actor_Settings &job_actor_settings, HRU_Actor_Settings &hru_actor_settings) { - + Summa_Actor_Settings &summa_actor_settings, + File_Access_Actor_Settings &file_access_actor_settings, + Job_Actor_Settings &job_actor_settings, + HRU_Actor_Settings &hru_actor_settings) { + + if (distributed_settings.distributed_mode) { std::cout << "************ DISTRIBUTED_SETTINGS ************\n" << distributed_settings.distributed_mode << "\n"; for (auto& host : distributed_settings.servers_list) { @@ -201,17 +204,27 @@ void check_settings_from_json(Distributed_Settings &distributed_settings, } std::cout << distributed_settings.port << "\n" << distributed_settings.total_hru_count << "\n" - << distributed_settings.num_hru_per_batch << "\n" - << "************ SUMMA_ACTOR_SETTINGS ************\n" - << summa_actor_settings.max_gru_per_job << "\n\n\n" - << "************ FILE_ACCESS_ACTOR_SETTINGS ************\n" - << file_access_actor_settings.num_partitions_in_output_buffer << "\n" - << file_access_actor_settings.num_timesteps_in_output_buffer << "\n\n\n" - << "************ JOB_ACTOR_SETTINGS ************\n" - << job_actor_settings.file_manager_path << "\n" - << "************ HRU_ACTOR_SETTINGS ************\n" - << hru_actor_settings.print_output << "\n" - << hru_actor_settings.output_frequency << "\n"; + << distributed_settings.num_hru_per_batch << "\n"; + } + + std::cout << "************ SUMMA_ACTORS SETTINGS ************\n" + << "Max GRU per Job: " + << summa_actor_settings.max_gru_per_job << "\n" + << "Num Partitions in Output Buffer: " + << file_access_actor_settings.num_partitions_in_output_buffer << "\n" + << "Num Timesteps in Output Buffer: " + << file_access_actor_settings.num_timesteps_in_output_buffer << "\n" + << "File Manager Path: " + << job_actor_settings.file_manager_path << "\n" + << "Max Run Attempts Per GRU: " + << job_actor_settings.max_run_attempts << "\n" + << "Print GRU Timestep: " + << hru_actor_settings.print_output << "\n" + << "GRU Timestep Print Frequency: " + << hru_actor_settings.output_frequency << "\n" + << "GRU Initial Timestep Factor: " + << hru_actor_settings.dt_init_factor << "\n" + << "********************************************\n\n"; } diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index e8da6c96cb890e2f2475b675adaf8c0c1c1ac62d..2c7d619bf4f8983c3c15dfebc2e8b8a822f939ab 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -56,9 +56,9 @@ behavior job_actor(stateful_actor<job_state>* self, /* Calls: - - summa_SetTimesDirsAndFiles() - - summa_defineGlobalData() - - read_icond_nlayers() + - summa_SetTimesDirsAndFiles + - summa_defineGlobalData + - read_icond_nlayers - Allocates time structures */ job_init_fortran(self->state.job_actor_settings.file_manager_path.c_str(), @@ -118,12 +118,13 @@ behavior job_actor(stateful_actor<job_state>* self, chrono_time end_point = high_resolution_clock::now(); double total_duration = duration_cast<seconds>(end_point - gru_container.gru_start_time).count(); + gru_container.num_gru_done++; + + aout(self) << "GRU Finished: " << gru_container.num_gru_done << "/" + << gru_container.num_gru_in_run_domain << " -- " + << "GlobalGRU=" << gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() + << " -- LocalGRU=" << local_gru_index << "\n"; - aout(self) << "\nJob_Actor: GRU Finished: \n" - << " global_gru_index = " - << gru_container.gru_list[local_gru_index-1]->getGlobalGRUIndex() << "\n" - << " local_gru_index = " << local_gru_index << "\n" - << " total_duration = " << total_duration << "\n\n"; // Update Timing gru_container.gru_list[local_gru_index-1]->setRunTime(total_duration); gru_container.gru_list[local_gru_index-1]->setInitDuration(-1); @@ -133,7 +134,6 @@ behavior job_actor(stateful_actor<job_state>* self, gru_container.gru_list[local_gru_index-1]->setSuccess(); - gru_container.num_gru_done++; // Check if all GRUs are finished diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index debf0c7123201957760ae6f06c70e6dbcdad6bec..d198ea8a3e3c8e0ff80af72e8c0769ccd0b45bb0 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -161,8 +161,6 @@ void caf_main(actor_system& sys, const config& cfg) { job_actor_settings.file_manager_path = cfg.master_file; } - - aout(self) << "Printing Settings For SUMMA Simulation\n"; check_settings_from_json(distributed_settings, summa_actor_settings, file_access_actor_settings, diff --git a/build/source/actors/summa_actor/batch.cpp b/build/source/actors/summa_actor/batch.cpp index ae895c6d5ff454a8eaf652ea01f8734b405e963c..10f9935e6995f90bcc67997566b4667c947c09f8 100644 --- a/build/source/actors/summa_actor/batch.cpp +++ b/build/source/actors/summa_actor/batch.cpp @@ -1,102 +1,72 @@ #include "batch.hpp" Batch::Batch(int batch_id, int start_hru, int num_hru){ - this->batch_id = batch_id; - this->start_hru = start_hru; - this->num_hru = num_hru; - this->run_time = 0.0; - this->read_time = 0.0; - this->write_time = 0.0; - this->assigned_to_actor = false; - this->solved = false; + batch_id_ = batch_id; + start_hru_ = start_hru; + num_hru_ = num_hru; + run_time_ = 0.0; + read_time_ = 0.0; + write_time_ = 0.0; + assigned_to_actor_ = false; + solved_ = false; } // Getters -int Batch::getBatchID() { - return this->batch_id; -} - -int Batch::getStartHRU() { - return this->start_hru; -} - -int Batch::getNumHRU() { - return this->num_hru; -} - -double Batch::getRunTime() { - return this->run_time; -} - -double Batch::getReadTime() { - return this->read_time; -} - -double Batch::getWriteTime() { - return this->write_time; -} - -bool Batch::isAssigned() { - return this->assigned_to_actor; -} - -bool Batch::isSolved() { - return this->solved; +int Batch::getBatchID() { return batch_id_; } +int Batch::getStartHRU() { return start_hru_; } +int Batch::getNumHRU() { return num_hru_; } +double Batch::getRunTime() { return run_time_; } +double Batch::getReadTime() { return read_time_; } +double Batch::getWriteTime() { return write_time_; } +bool Batch::isAssigned() { return assigned_to_actor_; } +bool Batch::isSolved() { return solved_; } + +std::string Batch::getBatchInfoString() { + std::string out_string = "batch_id=" + std::to_string(batch_id_) + + " -- start_hru=" + std::to_string(start_hru_) + + " -- num_hru: " + std::to_string(num_hru_) + "\n"; + return out_string; } // Setters -void Batch::updateRunTime(double run_time) { - this->run_time = run_time; -} - -void Batch::updateReadTime(double read_time) { - this->read_time = read_time; -} - -void Batch::updateWriteTime(double write_time) { - this->write_time = write_time; -} +void Batch::updateRunTime(double run_time) { run_time_ = run_time; } +void Batch::updateReadTime(double read_time) { read_time_ = read_time; } +void Batch::updateWriteTime(double write_time) { write_time_ = write_time; } +void Batch::updateAssigned(bool boolean) { assigned_to_actor_ = boolean; } +void Batch::updateSolved(bool boolean) { solved_ = boolean; } -void Batch::updateAssigned(bool boolean) { - this->assigned_to_actor = boolean; +void Batch::printBatchInfo() { + std::cout << "batch_id=" << batch_id_ << " -- start_hru=" << start_hru_ + << " -- num_hru: " << num_hru_ << "\n"; } -void Batch::updateSolved(bool boolean) { - this->solved = boolean; -} -void Batch::printBatchInfo() { - std::cout << "batch_id: " << this->batch_id << "\n"; - std::cout << "start_hru: " << this->start_hru << "\n"; - std::cout << "num_hru: " << this->num_hru << "\n"; -} std::string Batch::toString() { - std::stringstream out_string; - - out_string << "batch_id: " << this->batch_id << "\n" << - "start_hru: " << this->start_hru << "\n" << - "num_hru: " << this->num_hru << "\n" << - "run_time: " << this->run_time << "\n" << - "read_time: " << this->read_time << "\n" << - "write_time: " << this->write_time << "\n" << - "assigned_to_actor: " << this->assigned_to_actor << "\n" << - "solved: " << this->solved << "\n"; + std::stringstream out_string; + out_string << "batch_id: " << batch_id_ << "\n" << + "start_hru: " << start_hru_ << "\n" << + "num_hru: " << num_hru_ << "\n" << + "run_time: " << run_time_ << "\n" << + "read_time: " << read_time_ << "\n" << + "write_time: " << write_time_ << "\n" << + // "assigned_to_actor: " << assigned_to_actor_ << "\n" << + "solved: " << solved_ << "\n"; - return out_string.str(); + return out_string.str(); } void Batch::writeBatchToFile(std::string file_name, std::string hostname) { std::ofstream output_file; output_file.open(file_name, std::ios_base::app); output_file << - this->batch_id << "," << - this->start_hru << "," << - this->num_hru << "," << + batch_id_ << "," << + start_hru_ << "," << + num_hru_ << "," << hostname << "," << - this->run_time << "," << - this->read_time << "," << - this->write_time << "\n"; + run_time_ << "," << + read_time_ << "," << + write_time_ << "\n"; output_file.close(); } \ No newline at end of file diff --git a/build/source/actors/summa_actor/batch_container.cpp b/build/source/actors/summa_actor/batch_container.cpp index b5ae235d4a3fb4c1409eac258d578ccef9bbec61..644a1a872dd2b077081e8a56b0706b83f8f0b9bf 100644 --- a/build/source/actors/summa_actor/batch_container.cpp +++ b/build/source/actors/summa_actor/batch_container.cpp @@ -1,77 +1,119 @@ #include "batch_container.hpp" -Batch_Container::Batch_Container(int total_hru_count, int num_hru_per_batch) { - this->total_hru_count = total_hru_count; - this->num_hru_per_batch = num_hru_per_batch; - this->assembleBatches(this->total_hru_count, this->num_hru_per_batch); - this->batches_remaining = this->batch_list.size(); // batch_list set in assemble batches +Batch_Container::Batch_Container(int start_hru, int total_hru_count, + int num_hru_per_batch) { + start_hru_ = start_hru; + total_hru_count_ = total_hru_count; + num_hru_per_batch_ = num_hru_per_batch; + assembleBatches(); + batches_remaining_ = batch_list_.size(); // batch_list set in assemble batches } -int Batch_Container::getBatchesRemaining() { - return this->batches_remaining; -} +int Batch_Container::getBatchesRemaining() { return batches_remaining_;} +int Batch_Container::getTotalBatches() { return batch_list_.size();} -void Batch_Container::assembleBatches(int total_hru_count, int num_hru_per_batch) { - int remaining_hru_to_batch = total_hru_count; - int batch_id = 0; - int start_hru = 1; - - while(remaining_hru_to_batch > 0) { - if (num_hru_per_batch > remaining_hru_to_batch) { - this->batch_list.push_back(Batch(batch_id, start_hru, remaining_hru_to_batch)); - remaining_hru_to_batch = 0; - } else { - this->batch_list.push_back(Batch(batch_id, start_hru, num_hru_per_batch)); - - remaining_hru_to_batch -= num_hru_per_batch; - start_hru += num_hru_per_batch; - batch_id += 1; - } - } +void Batch_Container::assembleBatches() { + int remaining_hru_to_batch = total_hru_count_; + int batch_id = 0; + int start_hru_local = start_hru_; + + while (remaining_hru_to_batch > 0) { + int current_batch_size = std::min(num_hru_per_batch_, remaining_hru_to_batch); + batch_list_.push_back(Batch(batch_id, start_hru_local, current_batch_size)); + + remaining_hru_to_batch -= current_batch_size; + start_hru_local += current_batch_size; + if (current_batch_size == num_hru_per_batch_) + batch_id += 1; + } } void Batch_Container::printBatches() { - for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { - this->batch_list[i].printBatchInfo(); - } + for (auto& batch : batch_list_) { + batch.printBatchInfo(); + } +} + +std::string Batch_Container::getBatchesAsString() { + std::string out_string = ""; + for (auto& batch : batch_list_) { + out_string += batch.getBatchInfoString(); + } + return out_string; } void Batch_Container::updateBatchStatus_LostClient(int batch_id) { - this->batch_list[batch_id].updateAssigned(false); + batch_list_[batch_id].updateAssigned(false); } std::optional<Batch> Batch_Container::getUnsolvedBatch() { - for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { - if (!this->batch_list[i].isAssigned() && !this->batch_list[i].isSolved()) { - this->batch_list[i].updateAssigned(true); - return this->batch_list[i]; - } + for (auto& batch : batch_list_) { + if (!batch.isAssigned() && !batch.isSolved()) { + batch.updateAssigned(true); + return batch; } - return {}; + } + return {}; } void Batch_Container::setBatchAssigned(Batch batch) { - this->batch_list[batch.getBatchID()].updateAssigned(true); + batch_list_[batch.getBatchID()].updateAssigned(true); } void Batch_Container::setBatchUnassigned(Batch batch) { - this->batch_list[batch.getBatchID()].updateAssigned(false); + batch_list_[batch.getBatchID()].updateAssigned(false); } -void Batch_Container::updateBatch_success(Batch successful_batch, std::string output_csv, std::string hostname) { - int batch_id = successful_batch.getBatchID(); - successful_batch.writeBatchToFile(output_csv, hostname); - this->batch_list[batch_id].updateSolved(true); - this->batches_remaining--; +void Batch_Container::updateBatch_success(Batch successful_batch, + std::string output_csv, + std::string hostname) { + successful_batch.writeBatchToFile(output_csv, hostname); + batch_list_[successful_batch.getBatchID()].updateSolved(true); + batches_remaining_--; } + void Batch_Container::updateBatch_success(int batch_id, double run_time, + double read_time, double write_time) { + batch_list_[batch_id].updateRunTime(run_time); + batch_list_[batch_id].updateReadTime(read_time); + batch_list_[batch_id].updateWriteTime(write_time); + batch_list_[batch_id].updateSolved(true); + batches_remaining_--; + } + void Batch_Container::updateBatch_success(Batch successful_batch) { - int batch_id = successful_batch.getBatchID(); - this->batch_list[batch_id].updateSolved(true); - this->batches_remaining--; + batch_list_[successful_batch.getBatchID()].updateSolved(true); + batches_remaining_--; } +bool Batch_Container::hasUnsolvedBatches() { return batches_remaining_ > 0;} + -bool Batch_Container::hasUnsolvedBatches() { - return this->batches_remaining > 0; +std::string Batch_Container::getAllBatchInfoString() { + std::string out_string = ""; + for (auto& batch : batch_list_) { + out_string += "_____________________________\n"; + out_string += batch.toString(); + out_string += "_____________________________\n"; + } + return out_string; } + +double Batch_Container::getTotalReadTime() { + double total_read_time = 0.0; + for (auto& batch : batch_list_) { + total_read_time += batch.getReadTime(); + } + return total_read_time; +} + +double Batch_Container::getTotalWriteTime() { + double total_write_time = 0.0; + for (auto& batch : batch_list_) { + total_write_time += batch.getWriteTime(); + } + return total_write_time; +} + + + diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp index 3af72090a65a2df4455c5f1f7d75cb2cd0c0cf3f..2327f784cb0d0fbcec9eec0fea0b0c57a270cf11 100644 --- a/build/source/actors/summa_actor/summa_actor.cpp +++ b/build/source/actors/summa_actor/summa_actor.cpp @@ -9,115 +9,183 @@ #include <chrono> #include <string> #include <fstream> +#include <netcdf.h> using json = nlohmann::json; +// Helper function to extract the information from the file_manager +std::string extractEnclosed(const std::string& line) { + std::size_t first_quote = line.find_first_of("'"); + std::size_t last_quote = line.find_last_of("'"); + if (first_quote != std::string::npos && last_quote != std::string::npos + && first_quote < last_quote) { + return line.substr(first_quote + 1, last_quote - first_quote - 1); + } + return ""; +} -namespace caf { +// Check the number of GRUs in the attribute file +int getNumGRUInFile(const std::string &file_manager) { + std::ifstream file(file_manager); + std::string attributeFile, settingPath; + if (!file.is_open()) + return -1; + + std::string line; + while (std::getline(file, line)) { + if (line.compare(0, 13, "attributeFile") == 0) + attributeFile = extractEnclosed(line); + if (line.compare(0, 12, "settingsPath") == 0) + settingPath = extractEnclosed(line); + } + + file.close(); + + size_t fileGRU = -1; + int ncid, gru_dim; + if (attributeFile.empty() || settingPath.empty()) + return fileGRU; + + std::string combined = settingPath + attributeFile; + + if (NC_NOERR != nc_open(combined.c_str(), NC_NOWRITE, &ncid)) + return fileGRU; + if (NC_NOERR != nc_inq_dimid(ncid, "gru", &gru_dim)) { + nc_close(ncid); + return -1; + } + if (NC_NOERR != nc_inq_dimlen(ncid, gru_dim, &fileGRU)) { + nc_close(ncid); + return -1; + } + nc_close(ncid); + return fileGRU; +} -behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, - Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings, actor parent) { - - // Set Timing Variables - self->state.summa_actor_timing = TimingInfo(); - self->state.summa_actor_timing.addTimePoint("total_duration"); - self->state.summa_actor_timing.updateStartPoint("total_duration"); - // Set Variables - self->state.startGRU = startGRU; - self->state.numGRU = numGRU; - self->state.parent = parent; - - self->state.summa_actor_settings = summa_actor_settings; - self->state.file_access_actor_settings = file_access_actor_settings; - self->state.job_actor_settings = job_actor_settings; - self->state.hru_actor_settings = hru_actor_settings; - - // Create the job_actor and start SUMMA - spawnJob(self); - - return { - [=](done_job, int numFailed, double job_duration, double read_duration, double write_duration) { - auto& timing_info = self->state.timing_info_for_jobs; - - - self->state.numFailed += numFailed; - - timing_info.job_duration.push_back(job_duration); - timing_info.job_read_duration.push_back(read_duration); - timing_info.job_write_duration.push_back(write_duration); - - if (self->state.numGRU <= 0) { - self->state.summa_actor_timing.updateEndPoint("total_duration"); - - - - for (size_t i = 0; i < timing_info.job_duration.size(); ++i) { - - - aout(self) << "\n________________Job " << i + 1 << " Info_____________\n" - << "Job Duration = " << timing_info.job_duration[i] << "\n" - << "Job Read Duration = " << timing_info.job_read_duration[i] << "\n" - << "Job Write Duration = " << timing_info.job_write_duration[i] << "\n" - << "_____________________________________________________\n\n"; - } - - - double total_read_duration = std::accumulate(timing_info.job_read_duration.begin(), - timing_info.job_read_duration.end(), - 0.0); - double total_write_duration = std::accumulate(timing_info.job_write_duration.begin(), - timing_info.job_write_duration.end(), - 0.0); - - aout(self) << "\n________________SUMMA INFO________________\n" - << "Total Duration = " << self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n" - << "Total Duration = " << self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n" - << "Total Duration = " << (self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" - << "Total Read Duration = " << total_read_duration << "Seconds\n" - << "Total Write Duration = " << total_write_duration << "Seconds\n" - << "Num Failed = " << self->state.numFailed << "\n" - << "___________________Program Finished__________________\n"; - - - - self->send(self->state.parent, done_batch_v, - self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0), - total_read_duration, - total_write_duration); - - } else { - spawnJob(self); - } - }, - - [=](err) { - aout(self) << "Unrecoverable Error: Attempting To Fail Gracefully\n"; - self->quit(); - } - }; + +namespace caf { +behavior summa_actor(stateful_actor<summa_actor_state>* self, + int startGRU, int numGRU, + Summa_Actor_Settings summa_actor_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings, actor parent) { + + // Set Timing Variables + self->state.summa_actor_timing = TimingInfo(); + self->state.summa_actor_timing.addTimePoint("total_duration"); + self->state.summa_actor_timing.updateStartPoint("total_duration"); + // Set Variables + self->state.startGRU = startGRU; + self->state.numGRU = numGRU; + self->state.parent = parent; + // Set Settings + self->state.summa_actor_settings = summa_actor_settings; + self->state.file_access_actor_settings = file_access_actor_settings; + self->state.job_actor_settings = job_actor_settings; + self->state.hru_actor_settings = hru_actor_settings; + // Double check the number of GRUs in the file + self->state.fileGRU = getNumGRUInFile(job_actor_settings.file_manager_path); + if (self->state.fileGRU == -1) + aout(self) << "***WARNING***: UNABLE TO VERIFY NUMBER OF GRUS" + << " - Job Actor MAY CRASH\n"; + + if (self->state.fileGRU > 0) { + // Fix the number of GRUs if it exceeds the number of GRUs in the file + if (self->state.startGRU + self->state.numGRU > self->state.fileGRU) { + self->state.numGRU = self->state.fileGRU - self->state.startGRU + 1; + } + } + // No else: if we cannot verify we try to run anyway + self->state.batch_container = Batch_Container(self->state.startGRU, + self->state.numGRU, + self->state.summa_actor_settings.max_gru_per_job); + + aout(self) << "Starting SUMMA With " << + self->state.batch_container.getBatchesRemaining() << " Batches\n"; + aout(self) << "###################################################\n" + << self->state.batch_container.getBatchesAsString() + << "###################################################\n"; + + std::optional<Batch> batch = + self->state.batch_container.getUnsolvedBatch(); + if (!batch.has_value()) { + aout(self) << "ERROR--Summa_Actor: No Batches To Solve\n"; + self->quit(); + } + self->state.current_batch_id = batch->getBatchID(); + aout(self) << "Starting Batch " << self->state.current_batch_id + 1 << "\n"; + auto batch_val = batch.value(); + self->state.currentJob = self->spawn(job_actor, batch->getStartHRU(), + batch->getNumHRU(), self->state.file_access_actor_settings, + self->state.job_actor_settings, self->state.hru_actor_settings, self); + + return { + + + [=](done_job, int numFailed, double job_duration, double read_duration, + double write_duration) { + + self->state.batch_container.updateBatch_success( + self->state.current_batch_id, job_duration, read_duration, + write_duration); + + aout(self) << "###########################################\n" + << "Job Finished: " + << self->state.batch_container.getTotalBatches() - + self->state.batch_container.getBatchesRemaining() + << "/" << self->state.batch_container.getTotalBatches() << "\n" + << "###########################################\n"; + + self->state.numFailed += numFailed; + + + + if (self->state.batch_container.hasUnsolvedBatches()) { + spawnJob(self); + } else { + aout(self) << "All Batches Finished\n"; + aout(self) << self->state.batch_container.getAllBatchInfoString(); + self->state.summa_actor_timing.updateEndPoint("total_duration"); + + aout(self) << "\n________________SUMMA INFO________________\n" + << "Total Duration = " + << self->state.summa_actor_timing.getDuration( + "total_duration").value_or(-1.0) << " Seconds\n" + << "Total Duration = " + << self->state.summa_actor_timing.getDuration( + "total_duration").value_or(-1.0) / 60 << " Minutes\n" + << "Total Duration = " + << (self->state.summa_actor_timing.getDuration( + "total_duration").value_or(-1.0) / 60) / 60 << " Hours\n" + << "Total Read Duration = " + << self->state.batch_container.getTotalReadTime() << "Seconds\n" + << "Total Write Duration = " + << self->state.batch_container.getTotalWriteTime() << "Seconds\n" + << "Num Failed = " << self->state.numFailed << "\n" + << "___________________Program Finished__________________\n"; + + self->quit(); + } + }, + + [=](err) { + aout(self) << "Unrecoverable Error: Attempting To Fail Gracefully\n"; + self->quit(); + } + }; } void spawnJob(stateful_actor<summa_actor_state>* self) { - // Ensure we do not start a job with too many GRUs - if (self->state.numGRU > self->state.summa_actor_settings.max_gru_per_job) { - // spawn the job actor - aout(self) << "\n Starting Job with startGRU = " << self->state.startGRU << "\n"; - self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.summa_actor_settings.max_gru_per_job, - self->state.file_access_actor_settings, self->state.job_actor_settings, - self->state.hru_actor_settings, self); - - // Update GRU count - self->state.numGRU = self->state.numGRU - self->state.summa_actor_settings.max_gru_per_job; - self->state.startGRU = self->state.startGRU + self->state.summa_actor_settings.max_gru_per_job; - - } else { - - self->state.currentJob = self->spawn(job_actor, self->state.startGRU, self->state.numGRU, - self->state.file_access_actor_settings, self->state.job_actor_settings, - self->state.hru_actor_settings, self); - self->state.numGRU = 0; - } + std::optional<Batch> batch = + self->state.batch_container.getUnsolvedBatch(); + self->state.current_batch_id = batch->getBatchID(); + aout(self) << "Starting Batch " << self->state.current_batch_id + 1 << "\n"; + auto batch_val = batch.value(); + self->state.currentJob = self->spawn(job_actor, batch->getStartHRU(), + batch->getNumHRU(), self->state.file_access_actor_settings, + self->state.job_actor_settings, self->state.hru_actor_settings, + self); } } // end namespace diff --git a/bin/Summa_Actors_Settings.json b/utils/sbatch/Summa_Actors_Settings.json similarity index 94% rename from bin/Summa_Actors_Settings.json rename to utils/sbatch/Summa_Actors_Settings.json index 193db1fdb8c108610b7ae494e244b8d96c0b5999..e7e7f532b62faf1d301bc91f2fca5c8eab708fae 100644 --- a/bin/Summa_Actors_Settings.json +++ b/utils/sbatch/Summa_Actors_Settings.json @@ -12,7 +12,7 @@ }, "File_Access_Actor": { - "num_partitions_in_output_buffer": 16, + "num_partitions_in_output_buffer": 32, "num_timesteps_in_output_buffer": 1000 }, diff --git a/utils/sbatch/submit_summa_actors_32.sh b/utils/sbatch/submit_summa_actors_32.sh new file mode 100644 index 0000000000000000000000000000000000000000..e8ae85356b2bbc43a5fb35667910d51ba6b0277b --- /dev/null +++ b/utils/sbatch/submit_summa_actors_32.sh @@ -0,0 +1,30 @@ +#!/bin/bash +#SBATCH --nodes=1 +#SBATCH --exclusive +#SBATCH --time=48:00:00 +#SBATCH --mem=0 +#SBATCH --job-name=Summa-Actors +#SBATCH --output=/scratch/gwf/gwf_cmt/kck540/Summa-Global-Actors-Output/Africa/slurm/slurm-%A_%a.out +#SBATCH --account=hpc_c_giws_clark + + +module load StdEnv/2020 +module load gcc/9.3.0 +module load openblas/0.3.17 +module load netcdf-fortran/4.5.2 + +# for Actors +module load caf + +gru_count=220000 +max_job=3 +total_hru=648266 +summa_exe=/globalhome/kck540/HPC/Summa-Projects/Summa-Actors/bin/summa_be +config_summa=/project/gwf/gwf_cmt/kck540/Summa-Global-Actors-Input/Africa/settings_summa/Copernicus_Summa_Actors_Config.json + +# offset=$SLURM_ARRAY_TASK_ID +# gru_start=$(( 1 + gru_count*offset )) +# check=$(( $gru_start + $gru_count )) + +# $summa_exe -g $gru_start $gru_count -c $config_summa --caf.scheduler.max-threads=32 +$summa_exe -g 648001 266 -c $config_summa --caf.scheduler.max-threads=32