Skip to content
Snippets Groups Projects
Commit 8480aa56 authored by KyleKlenk's avatar KyleKlenk
Browse files

can run and output results

parent 6144d6e1
No related branches found
No related tags found
No related merge requests found
......@@ -42,6 +42,8 @@ class Batch {
void assignedBatch(std::string hostname, caf::actor actor_ref);
void updateRunTime(double run_time);
void writeBatchToFile(std::string file_name);
};
......
......@@ -21,9 +21,10 @@ struct summa_server_state {
std::vector<Batch> solved_batches;
std::vector<Batch> failed_batches;
std::vector<Client> client_list;
std::string csv_output_name;
};
behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path);
int assembleBatches(stateful_actor<summa_server_state>* self);
std::optional<Batch> getUnsolvedBatch(stateful_actor<summa_server_state>* self);
std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self);
}
\ No newline at end of file
......@@ -4,4 +4,6 @@
module load gcc/9.3.0
module load netcdf-fortran
module load openblas
module load caf
\ No newline at end of file
module load caf
export LD_LIBRARY_PATH=/globalhome/kck540/HPC/SummaProjects/Summa-Actors/bin
\ No newline at end of file
......@@ -170,9 +170,10 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU,
},
[=](file_access_actor_err, std::string function) {
aout(self) << "Failure in File Access Actor in function" << function << "\n";
aout(self) << "Failure in File Access Actor in function: " << function << "\n";
if (function == "def_output") {
aout(self) << "Error with the output file, will try creating it agian\n";
std::this_thread::sleep_for(std::chrono::seconds(5));
self->state.file_access_actor = self->spawn(file_access_actor, self->state.startGRU, self->state.numGRU,
self->state.outputStrucSize, self->state.configPath, self);
} else {
......@@ -197,9 +198,15 @@ void initJob(stateful_actor<job_state>* self) {
self->state.successOutputFile = self->state.csvPath += success +=
std::to_string(self->state.startGRU) += ".csv";
file.open(self->state.successOutputFile, std::ios_base::out);
file << "GRU" << "," << "totalDuration" << "," << "initDuration" << "," <<
"forcingDuration" << "," << "runPhysicsDuration" << "," << "writeOutputDuration" <<
"," << "dt_init" << "," << "numAttemtps" << "\n";
file <<
"GRU," <<
"totalDuration," <<
"initDuration," <<
"forcingDuration," <<
"runPhysicsDuration," <<
"writeOutputDuration," <<
"dt_init," <<
"numAttemtps\n";
file.close();
}
......
......@@ -50,4 +50,19 @@ void Batch::updateRunTime(double run_time) {
this->run_time = run_time;
}
void Batch::writeBatchToFile(std::string file_name) {
std::ofstream output_file;
output_file.open(file_name, std::ios_base::app);
output_file <<
this->batch_id << "," <<
this->start_hru << "," <<
this->num_hru << "," <<
this->assigned_host << "," <<
this->run_time << "," <<
this->read_time << "," <<
this->write_time << "," <<
this->status << "\n";
output_file.close();
}
......@@ -73,7 +73,12 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
self->send(server_actor, connect_to_server_v, self, self->state.hostname);
return {
[=](batch, int client_id, int batch_id, int start_hru, int num_hru, std::string config_path) {
aout(self) << "Received batch to compute" << std::endl;
aout(self) << "\nReceived batch to compute" << "\n";
aout(self) << "BatchID = " << batch_id << "\n";
aout(self) << "Start HRU = " << start_hru << "\n";
aout(self) << "Num HRU = " << num_hru << "\n";
self->state.client_id = client_id;
self->state.batch_id = batch_id;
self->state.summa_actor_ref = self->spawn(summa_actor, start_hru, num_hru, config_path, self);
......
......@@ -12,11 +12,11 @@ namespace caf {
behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) {
aout(self) << "Summa Server has Started \n";
std::string returnType;
getSettingsTest(std::vector<std::string> {"test", "test2"} ,returnType);
self->state.config_path = config_path;
// std::string returnType;
// getSettingsTest(std::vector<std::string> {"test", "test2"} ,returnType);
// --------------------------Initalize Settings --------------------------
self->state.total_hru_count = getSettings(self->state.config_path, "SimulationSettings", "total_hru_count",
self->state.total_hru_count).value_or(-1);
if (self->state.total_hru_count == -1) {
......@@ -27,7 +27,25 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
if (self->state.num_hru_per_batch == -1) {
aout(self) << "ERROR: With num_hru_per_batch - CHECK Summa_Actors_Settings.json\n";
}
// --------------------------Initalize Settings --------------------------
// -------------------------- Initalize CSV ------------------------------
std::ofstream csv_output;
self->state.csv_output_name = "Batch_Results.csv";
csv_output.open(self->state.csv_output_name, std::ios_base::out);
csv_output <<
"Batch_ID," <<
"Start_HRU," <<
"Num_HRU," <<
"Hostname," <<
"Run_Time," <<
"Read_Time," <<
"Write_Time,"<<
"Status\n";
csv_output.close();
// -------------------------- Initalize CSV ------------------------------
// -------------------------- Assemble Batches ---------------------------
aout(self) << "Assembling HRUs into Batches\n";
if (assembleBatches(self) == -1) {
aout(self) << "ERROR: assembleBatches\n";
......@@ -38,6 +56,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
self->state.batch_list[i].printBatchInfo();
}
}
// -------------------------- Assemble Batches ---------------------------
return {
[=](connect_to_server, actor client, std::string hostname) {
......@@ -46,14 +65,24 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
int client_id = self->state.client_list.size(); // So we can lookup the client in O(1) time
self->state.client_list.push_back(Client(client_id, client, hostname));
std::optional<Batch> batch_to_send = getUnsolvedBatch(self);
if (batch_to_send.has_value()) {
Batch verified_batch = batch_to_send.value();
verified_batch.assignedBatch(hostname, client);
self->send(client, batch_v, client_id, batch_to_send->getBatchID(), batch_to_send->getStartHRU(),
batch_to_send->getNumHRU(), self->state.config_path);
std::optional<int> batch_id = getUnsolvedBatchID(self);
if (batch_id.has_value()) {
// update the batch in the batch list with the host and actor_ref
self->state.batch_list[batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client);
int start_hru = self->state.batch_list[batch_id.value()].getStartHRU();
int num_hru = self->state.batch_list[batch_id.value()].getNumHRU();
self->send(client,
batch_v,
client_id,
batch_id.value(),
start_hru,
num_hru,
self->state.config_path);
} else {
aout(self) << "We Are Done - Telling Clients to exit \n";
for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) {
self->send(self->state.client_list[i].getActor(), time_to_exit_v);
......@@ -67,6 +96,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
double total_read_duration, double total_write_duration) {
self->state.batch_list[batch_id].solvedBatch(total_duration, total_read_duration, total_write_duration);
self->state.batch_list[batch_id].writeBatchToFile(self->state.csv_output_name);
self->state.batches_solved++;
self->state.batches_remaining = self->state.batch_list.size() - self->state.batches_solved;
......@@ -81,12 +111,22 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
aout(self) << "****************************************\n";
// Find a new batch
std::optional<Batch> batch_to_send = getUnsolvedBatch(self);
if (batch_to_send.has_value()) {
Batch verified_batch = batch_to_send.value();
verified_batch.assignedBatch(self->state.client_list[client_id].getHostname(), client);
self->send(client, batch_v, client_id, verified_batch.getBatchID(), verified_batch.getStartHRU(),
verified_batch.getNumHRU(), self->state.config_path);
std::optional<int> new_batch_id = getUnsolvedBatchID(self);
if (new_batch_id.has_value()) {
// update the batch in the batch list with the host and actor_ref
self->state.batch_list[new_batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client);
int start_hru = self->state.batch_list[new_batch_id.value()].getStartHRU();
int num_hru = self->state.batch_list[new_batch_id.value()].getNumHRU();
self->send(client,
batch_v,
client_id,
new_batch_id.value(),
start_hru,
num_hru,
self->state.config_path);
} else {
aout(self) << "We Are Done - Telling Clients to exit \n";
for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) {
......@@ -121,17 +161,14 @@ int assembleBatches(stateful_actor<summa_server_state>* self) {
return 0;
}
std::optional<Batch> getUnsolvedBatch(stateful_actor<summa_server_state>* self) {
std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self) {
// Find the first unassigned batch
for (std::vector<int>::size_type i = 0; i < self->state.batch_list.size(); i++) {
if (self->state.batch_list[i].getBatchStatus() == unassigned) {
return self->state.batch_list[i];
return i;
}
}
return {};
}
} // end namespace
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment