Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • gwu479/Summa-Actors
  • numerical_simulations_lab/actors/Summa-Actors
2 results
Show changes
Showing
with 4377 additions and 317 deletions
#ifndef SUMMAACTOR_H_
#define SUMMAACTOR_H_
#include "SummaManager.h"
using namespace caf;
using json = nlohmann::json;
/**
 * Top-level actor for SUMMA. It receives the GRU range to compute from main
 * and works through it by running job actors one after another.
 *
 * On start-up it records the start time, stores its arguments in the actor
 * state, loads the settings through parseSettings() and spawns the first job.
 *
 * Handlers:
 *  - done_job: adds the reported failure count to the state, then either
 *    spawns the next job or, once no GRUs remain, prints the total run time.
 *  - err: prints a message and quits the actor.
 *
 * @param startGRU - starting GRU for the simulation
 * @param numGRU - total number of GRUs to compute
 * @param configPath - location of file information for SUMMA
 * @return behavior
 */
behavior summa_actor(stateful_actor<summa_manager>* self, int startGRU, int numGRU, std::string configPath) {
    // Take the start time first so set-up work is part of the total duration.
    self->state.start = std::chrono::high_resolution_clock::now();

    // Remember what we were asked to compute.
    self->state.startGRU   = startGRU;
    self->state.numGRU     = numGRU;
    self->state.configPath = configPath;

    parseSettings(self, configPath);

    aout(self) << "SETTINGS FOR SUMMA_ACTOR\n";
    aout(self) << "Output Structure Size = " << self->state.outputStrucSize << "\n";
    aout(self) << "Max GRUs Per Job = " << self->state.maxGRUPerJob << "\n";

    // Kick off the first job; later jobs are started from the done_job handler.
    spawnJob(self);

    return {
        [=](done_job, int numFailed) {
            auto& st = self->state;
            st.numFailed += numFailed;
            aout(self) << "Job Done\n";

            // GRUs still outstanding: hand the next slice to a new job.
            if (st.numGRU > 0) {
                spawnJob(self);
                return;
            }

            // Everything has been computed: report the elapsed time.
            st.end = std::chrono::high_resolution_clock::now();
            // calculateTime() is divided by 1000 to get the milliseconds kept in the state.
            st.duration = calculateTime(st.start, st.end) / 1000;

            const auto seconds = st.duration / 1000;
            const auto minutes = seconds / 60;
            const auto hours   = minutes / 60;

            aout(self) << "Total Program Duration:\n";
            aout(self) << " " << seconds << " Seconds\n";
            aout(self) << " " << minutes << " Minutes\n";
            aout(self) << " " << hours << " Hours\n";
            aout(self) << "Program Finished \n";
        },

        [=](err) {
            aout(self) << "Unrecoverable Error: Attempting To Fail Gracefully\n";
            self->quit();
        }
    };
}
/**
 * Spawn the next job actor and advance the bookkeeping in the actor state.
 *
 * A job never receives more than maxGRUPerJob GRUs. When more than that
 * remain, a full-sized job is started and startGRU/numGRU are moved forward;
 * otherwise the job takes every remaining GRU and numGRU becomes 0.
 */
void spawnJob(stateful_actor<summa_manager>* self) {
    auto& st = self->state;

    // Final (or only) job: everything that is left fits into one job.
    if (!(st.numGRU > st.maxGRUPerJob)) {
        st.currentJob = self->spawn(job_actor, st.startGRU, st.numGRU,
                                    st.configPath, st.outputStrucSize, self);
        st.numGRU = 0;
        return;
    }

    // More work than one job may take: start a full-sized job ...
    aout(self) << "\n Starting Job with startGRU = " << st.startGRU << "\n";
    st.currentJob = self->spawn(job_actor, st.startGRU, st.maxGRUPerJob,
                                st.configPath, st.outputStrucSize, self);

    // ... and move the window past the GRUs just handed out.
    st.numGRU   -= st.maxGRUPerJob;
    st.startGRU += st.maxGRUPerJob;
}
/**
 * Load the "SummaActor" section of <configPath>/Summa_Actors_Settings.json
 * into the actor state (outputStrucSize and maxGRUPerJob).
 *
 * Any value that cannot be obtained falls back to its default (250 for
 * outputStrucSize, 500 for maxGRUPerJob) and a message is printed. This covers
 * a file that cannot be opened, malformed JSON, a missing "SummaActor" section
 * and missing or non-numeric entries; none of these cases throws.
 *
 * The output structure size is read from the historical, misspelled key
 * "OuputStructureSize" first, so existing settings files keep working, and
 * from "OutputStructureSize" otherwise.
 *
 * @param self - actor whose state receives the settings
 * @param configPath - directory that contains Summa_Actors_Settings.json
 */
void parseSettings(stateful_actor<summa_manager>* self, std::string configPath) {
    constexpr int kDefaultOutputStrucSize = 250;
    constexpr int kDefaultMaxGRUPerJob    = 500;

    // Start from the defaults so that every early exit leaves a usable state.
    self->state.outputStrucSize = kDefaultOutputStrucSize;
    self->state.maxGRUPerJob    = kDefaultMaxGRUPerJob;

    const std::string settingsPath = configPath + "/Summa_Actors_Settings.json";
    std::ifstream settings_file(settingsPath);
    if (!settings_file.is_open()) {
        aout(self) << "Error Opening " << settingsPath << " - Reverting to default values\n";
        return;
    }

    // allow_exceptions = false: malformed input yields a discarded value
    // instead of an exception escaping from the actor.
    const json settings = json::parse(settings_file, nullptr, false);
    settings_file.close();
    if (settings.is_discarded()) {
        aout(self) << "Error Parsing " << settingsPath << " - Reverting to default values\n";
        return;
    }

    const auto actorConfig = settings.find("SummaActor");
    if (actorConfig == settings.end()) {
        aout(self) << "Error Finding SummaActor in JSON - Reverting to default values\n";
        return;
    }

    // Copies a numeric entry into `target`; returns false (leaving `target`
    // untouched) when the key is absent or does not hold a number.
    const auto readInt = [&actorConfig](const char* key, int& target) {
        const auto entry = actorConfig->find(key);
        if (entry == actorConfig->end() || !entry->is_number()) {
            return false;
        }
        target = entry->get<int>();
        return true;
    };

    // Find the desired OutputStrucSize (legacy spelling takes precedence)
    if (!readInt("OuputStructureSize", self->state.outputStrucSize) &&
        !readInt("OutputStructureSize", self->state.outputStrucSize)) {
        aout(self) << "Error Finding OutputStructureSize in JSON - Reverting to default value\n";
    }

    // Find the desired maxGRUPerJob size
    if (!readInt("maxGRUPerJob", self->state.maxGRUPerJob)) {
        aout(self) << "Error Finding maxGRUPerJob in JSON - Reverting to default value\n";
    }
}
#endif
\ No newline at end of file
#ifndef SUMMAMANGER_H_
#define SUMMAMANGER_H_
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "JobActor.h"
#include <iostream>
#include <chrono>
#include <string>
#include "json.hpp"
#include <fstream>
#include "global.h"
/**
 * State of the top-level summa_actor.
 *
 * Every scalar member has a default so that a freshly constructed state never
 * holds indeterminate values. The two settings members default to the same
 * fallback values parseSettings() uses.
 */
struct summa_manager {
    // Timing Information
    // NOTE(review): summa_actor assigns std::chrono::high_resolution_clock::now()
    // to these; that relies on high_resolution_clock being system_clock on the
    // target standard library - confirm.
    std::chrono::time_point<std::chrono::system_clock> start;
    std::chrono::time_point<std::chrono::system_clock> end;
    double duration = 0.0;      // elapsed run time, filled in by summa_actor when the last job is done

    // Program Parameters
    int startGRU = 0;           // starting GRU for the simulation
    int numGRU = 0;             // number of GRUs to compute
    std::string configPath;     // path to the fileManager.txt file

    // Information about the jobs
    int numFailed = 0;          // Number of jobs that have failed

    // Values Set By Summa_Actors_Settings.json
    int maxGRUPerJob = 500;     // maximum number of GRUs a job can compute at once
    int outputStrucSize = 250;  // output structure size handed to each job actor

    caf::actor currentJob;      // Reference to the current job actor
};
/**
* @brief Function to spawn a job actor
*
* Starts a job_actor for the next slice of GRUs (at most maxGRUPerJob of them)
* and updates startGRU / numGRU in the actor state accordingly.
*
* @param self - the summa_actor whose state describes the remaining work
*/
void spawnJob(stateful_actor<summa_manager>* self);
/**
* @brief Read the SummaActor settings into the actor state
*
* Reads Summa_Actors_Settings.json from the configPath directory and stores
* outputStrucSize and maxGRUPerJob in the state, falling back to default
* values for entries that are not found.
*
* @param self - actor whose state receives the settings
* @param configPath - directory containing Summa_Actors_Settings.json
*/
void parseSettings(stateful_actor<summa_manager>* self, std::string configPath);
#endif
\ No newline at end of file
This diff is collapsed.
#include "file_access_actor.hpp"
#include "forcing_file_info.hpp"
#include "fortran_data_types.hpp"
#include "message_atoms.hpp"
#include "json.hpp"
#include "auxilary.hpp"
using json = nlohmann::json;
namespace caf {
/**
 * Actor that reads forcing files and writes buffered output for the HRU
 * actors of one job.
 *
 * Start-up: stores its arguments in the state, calls
 * fileAccessActor_init_fortran and, on success, sends init_gru to the parent,
 * creates one Forcing_File_Info per forcing file and allocates the output
 * container. If the Fortran initialisation reports an error, the parent
 * receives a file_access_error (mDecisions_error for code 100,
 * unhandleable_error otherwise) and an empty behavior is returned.
 *
 * Handlers:
 *  - access_forcing: answers an HRU with new_forcing_file, loading the file
 *    first when necessary and then queueing the load of the following file.
 *  - access_forcing_internal: loads one file and re-sends itself for the next,
 *    so other messages can be processed between loads.
 *  - get_num_output_steps: returns the number of buffered output steps.
 *  - write_output / run_failure: update the GRU's output partition and write
 *    it once the partition reports it is ready.
 *  - restart_failures: calls reconstruct() on the output container.
 *  - finalize: releases resources, prints timing and returns
 *    (read_duration, write_duration).
 *
 * @param start_gru index of the first GRU of the job (forwarded to Fortran)
 * @param num_gru number of GRUs of the job
 * @param file_access_actor_settings settings for the output buffer
 * @param parent actor that is notified about initialisation results
 * @return behavior
 */
behavior file_access_actor(stateful_actor<file_access_state>* self, int start_gru, int num_gru,
                           File_Access_Actor_Settings file_access_actor_settings, actor parent) {
    aout(self) << "\n----------File_Access_Actor Started----------\n";

    // Set Up timing Info we wish to track
    self->state.file_access_timing = TimingInfo();
    self->state.file_access_timing.addTimePoint("read_duration");
    self->state.file_access_timing.addTimePoint("write_duration");

    // Save the parameters passed from job_actor
    self->state.file_access_actor_settings = file_access_actor_settings;
    self->state.parent = parent;
    self->state.num_gru = num_gru;
    self->state.start_gru = start_gru;
    self->state.err = 0;

    self->state.num_output_steps = self->state.file_access_actor_settings.num_timesteps_in_output_buffer;

    fileAccessActor_init_fortran(self->state.handle_forcing_file_info,
                                 &self->state.numFiles,
                                 &self->state.num_steps,
                                 &self->state.file_access_actor_settings.num_timesteps_in_output_buffer,
                                 self->state.handle_ncid,
                                 &self->state.start_gru,
                                 &self->state.num_gru,
                                 &self->state.num_gru, // Filler for num_hrus
                                 &self->state.err);
    if (self->state.err != 0) {
        aout(self) << "ERROR: File Access Actor - File_Access_init_Fortran\n";
        if (self->state.err == 100) {
            self->send(self->state.parent, file_access_error::mDecisions_error, self);
        } else {
            self->send(self->state.parent, file_access_error::unhandleable_error, self);
        }
        return {};
    }

    aout(self) << "Simluations Steps: " << self->state.num_steps << "\n";

    // Inital Files Have Been Loaded - Send Message to Job_Actor to Start Simulation
    self->send(self->state.parent, init_gru_v);

    // initalize the forcingFile array (file IDs are 1-based, as on the Fortran side)
    self->state.filesLoaded = 0;
    for (int i = 1; i <= self->state.numFiles; i++) {
        self->state.forcing_file_list.push_back(Forcing_File_Info(i));
    }

    // Check that the number of timesteps in the output buffer is not greater than the number of timesteps in the simulation
    if (self->state.num_steps < self->state.file_access_actor_settings.num_timesteps_in_output_buffer) {
        self->state.num_output_steps = self->state.num_steps;
        self->state.file_access_actor_settings.num_timesteps_in_output_buffer = self->state.num_steps;
    }

    // Set up the output container (released in the finalize handler)
    self->state.output_container = new Output_Container(self->state.file_access_actor_settings.num_partitions_in_output_buffer,
                                                        self->state.num_gru,
                                                        self->state.file_access_actor_settings.num_timesteps_in_output_buffer,
                                                        self->state.num_steps);

    return {
        // Message from the HRU actor to get the forcing file that is loaded
        [=](access_forcing, int currentFile, caf::actor refToRespondTo) {
            // File IDs are 1-based; anything below 1 would index before the
            // start of forcing_file_list.
            if (currentFile < 1) {
                aout(self) << currentFile << " is smaller than expected for a forcing file request from an HRU" << std::endl;
                return;
            }

            if (currentFile <= self->state.numFiles) {
                if(self->state.forcing_file_list[currentFile - 1].isFileLoaded()) { // C++ starts at 0 Fortran starts at 1
                    // Send the HRU actor the new forcing file
                    // then tell it to get back to running
                    self->send(refToRespondTo, new_forcing_file_v,
                               self->state.forcing_file_list[currentFile - 1].getNumSteps(),
                               currentFile);
                } else {
                    self->state.file_access_timing.updateStartPoint("read_duration");

                    // Load the file
                    read_forcingFile(self->state.handle_forcing_file_info, &currentFile,
                                     &self->state.stepsInCurrentFile, &self->state.start_gru,
                                     &self->state.num_gru, &self->state.err);
                    if (self->state.err != 0) {
                        aout(self) << "ERROR: Reading Forcing" << std::endl;
                    }

                    self->state.filesLoaded += 1;
                    self->state.forcing_file_list[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);

                    self->state.file_access_timing.updateEndPoint("read_duration");

                    // Check if we have loaded all forcing files
                    if(self->state.filesLoaded <= self->state.numFiles) {
                        self->send(self, access_forcing_internal_v, currentFile + 1);
                    }

                    // Send the HRU actor the new forcing file
                    // then tell it to get back to running
                    self->send(refToRespondTo, new_forcing_file_v,
                               self->state.forcing_file_list[currentFile - 1].getNumSteps(),
                               currentFile);
                }
            } else {
                aout(self) << currentFile << " is larger than expected for a forcing file request from an HRU" << std::endl;
            }
        },

        // Internal Message to load all forcing files, calling this message allows other messages to be processed
        [=](access_forcing_internal, int currentFile) {
            if (self->state.filesLoaded <= self->state.numFiles &&
                currentFile <= self->state.numFiles) {
                if (self->state.forcing_file_list[currentFile - 1].isFileLoaded()) {
                    aout(self) << "File Loaded when shouldn't be \n";
                }

                self->state.file_access_timing.updateStartPoint("read_duration");

                read_forcingFile(self->state.handle_forcing_file_info, &currentFile,
                                 &self->state.stepsInCurrentFile, &self->state.start_gru,
                                 &self->state.num_gru, &self->state.err);
                if (self->state.err != 0) {
                    aout(self) << "ERROR: Reading Forcing" << std::endl;
                }

                self->state.filesLoaded += 1;
                self->state.forcing_file_list[currentFile - 1].updateNumSteps(self->state.stepsInCurrentFile);

                self->state.file_access_timing.updateEndPoint("read_duration");

                // Queue the next file instead of looping, so that other
                // messages can be handled in between.
                self->send(self, access_forcing_internal_v, currentFile + 1);
            } else {
                aout(self) << "All Forcing Files Loaded \n";
            }
        },

        // Message from HRU Actor so it knows how many timesteps it can write before waiting
        [=] (get_num_output_steps) { return self->state.num_output_steps; },

        // An HRU has filled its buffer: mark it ready and write the partition once it is ready
        [=](write_output, int index_gru, int index_hru, caf::actor hru_actor) {
            self->state.file_access_timing.updateStartPoint("write_duration");

            Output_Partition *output_partition = self->state.output_container->getOutputPartition(index_gru);
            output_partition->setGRUReadyToWrite(hru_actor);
            if (output_partition->isReadyToWrite()) {
                writeOutput(self, output_partition);
            }

            self->state.file_access_timing.updateEndPoint("write_duration");
        },

        [=](restart_failures) {
            self->state.output_container->reconstruct();
        },

        // A GRU failed: record it in its partition and write the partition if that makes it ready
        [=](run_failure, int local_gru_index) {
            self->state.file_access_timing.updateStartPoint("write_duration");

            Output_Partition *output_partition = self->state.output_container->getOutputPartition(local_gru_index);
            output_partition->addFailedGRUIndex(local_gru_index);
            if (output_partition->isReadyToWrite()) {
                writeOutput(self, output_partition);
            }

            self->state.file_access_timing.updateEndPoint("write_duration");
        },

        [=](finalize) {
            aout(self) << "File Access Actor: Deallocating Structure" << std::endl;

            // The container was allocated with new, so it has to be released
            // with delete: an explicit destructor call would run the
            // destructor but never free the storage. Resetting the pointer
            // keeps any later delete of it harmless.
            delete self->state.output_container;
            self->state.output_container = nullptr;

            FileAccessActor_DeallocateStructures(self->state.handle_forcing_file_info, self->state.handle_ncid);

            aout(self) << "\n________________FILE_ACCESS_ACTOR TIMING INFO RESULTS________________\n";
            aout(self) << "Total Read Duration = " << self->state.file_access_timing.getDuration("read_duration").value_or(-1.0) << " Seconds\n";
            aout(self) << "Total Write Duration = " << self->state.file_access_timing.getDuration("write_duration").value_or(-1.0) << " Seconds\n";

            self->quit();

            // -1.0 stands in for a duration that is not available.
            return std::make_tuple(self->state.file_access_timing.getDuration("read_duration").value_or(-1.0),
                                   self->state.file_access_timing.getDuration("write_duration").value_or(-1.0));
        },
    };
}
/**
 * Write the buffered timesteps of one output partition through
 * writeOutput_fortran and release the HRU actors that were waiting on it.
 *
 * After the write every actor in the partition's ready-to-write list is told
 * how many steps it may run before the next write (num_steps_before_write)
 * and is restarted (run_hru); the list is then reset. A non-zero error code
 * from the Fortran routine is reported, but the HRUs are still released so
 * that they do not wait forever.
 *
 * @param self file access actor that owns the NetCDF handle and error code
 * @param partition partition whose stored timesteps are written
 */
void writeOutput(stateful_actor<file_access_state>* self, Output_Partition* partition) {
    int num_timesteps_to_write = partition->getNumStoredTimesteps();
    int start_gru = partition->getStartGRUIndex();
    int max_gru = partition->getMaxGRUIndex();
    bool write_param_flag = partition->isWriteParams();

    // Clear any code left by an earlier call so the check below reflects this write only.
    self->state.err = 0;
    writeOutput_fortran(self->state.handle_ncid, &num_timesteps_to_write,
                        &start_gru, &max_gru, &write_param_flag, &self->state.err);
    if (self->state.err != 0) {
        aout(self) << "ERROR: File Access Actor - writeOutput_fortran\n";
    }

    partition->updateTimeSteps();

    // Queried after updateTimeSteps() so the HRUs receive the value for the next write.
    int num_steps_before_next_write = partition->getNumStoredTimesteps();

    std::vector<caf::actor> hrus_to_update = partition->getReadyToWriteList();
    for (const auto& hru : hrus_to_update) {
        self->send(hru, num_steps_before_write_v, num_steps_before_next_write);
        self->send(hru, run_hru_v);
    }

    partition->resetReadyToWriteList();
}
} // end namespace
\ No newline at end of file
#include "forcing_file_info.hpp"
// Create the record for forcing file `file_ID`: no steps known yet, not loaded.
Forcing_File_Info::Forcing_File_Info(int file_ID) {
    this->is_loaded = false;
    this->num_steps = 0;
    this->file_ID   = file_ID;
}
int Forcing_File_Info::getNumSteps() {
return this->num_steps;
}
bool Forcing_File_Info::isFileLoaded() {
return this->is_loaded;
}
void Forcing_File_Info::updateIsLoaded() {
this->is_loaded = true;
}
void Forcing_File_Info::updateNumSteps(int num_steps) {
this->num_steps = num_steps;
this->is_loaded = true;
}
This diff is collapsed.
This diff is collapsed.
! Version / build identification strings, each an empty 64-character constant here.
! NOTE(review): presumably these are meant to be filled in by the build process - confirm.
character(len=64), parameter :: summaVersion = ''
character(len=64), parameter :: buildTime = ''
character(len=64), parameter :: gitBranch = ''
character(len=64), parameter :: gitHash = ''
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
#ifndef COMMONFUNCTIONS_H_
#define COMMONFUNCTIONS_H_
#include <chrono>
// Global flag for debugging; only the main function will change this.
// Declared inline so that including this header from several translation
// units still yields a single definition. Starts out false.
inline bool debug = false;
/**
* Return the time between two time points.
*
* @param start - beginning of the measured interval
* @param end - end of the measured interval
* @return elapsed time (end - start) as a count of microseconds
*/
double calculateTime(std::chrono::time_point<std::chrono::system_clock> start,
std::chrono::time_point<std::chrono::system_clock> end);
#include "global.hpp"
double calculateTime(std::chrono::time_point<std::chrono::system_clock> start,
......@@ -20,8 +6,3 @@ double calculateTime(std::chrono::time_point<std::chrono::system_clock> start,
return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
}
#endif
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.