diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp index 2e8f46d8e1e1293b7589f5571acf524e215cc0f4..062cd06b0681653aa722b003cc0c82f30298c20c 100644 --- a/build/includes/summa_actor/batch_manager.hpp +++ b/build/includes/summa_actor/batch_manager.hpp @@ -42,6 +42,8 @@ class Batch { void assignedBatch(std::string hostname, caf::actor actor_ref); void updateRunTime(double run_time); + + void writeBatchToFile(std::string file_name); }; diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index 83d4fee66d47aa261259cc2b20fdf1d0f66b2eb4..2a082b36038d4a972c88bafaefbb3c60f2592cc1 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -21,9 +21,10 @@ struct summa_server_state { std::vector<Batch> solved_batches; std::vector<Batch> failed_batches; std::vector<Client> client_list; + std::string csv_output_name; }; behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path); int assembleBatches(stateful_actor<summa_server_state>* self); -std::optional<Batch> getUnsolvedBatch(stateful_actor<summa_server_state>* self); +std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self); } \ No newline at end of file diff --git a/build/module_load.sh b/build/module_load.sh index 6359abc429921687d6a78cc6067801551117c46a..041d0e199a153b24fe8e9ba6836160dcd49ec32e 100755 --- a/build/module_load.sh +++ b/build/module_load.sh @@ -4,4 +4,6 @@ module load gcc/9.3.0 module load netcdf-fortran module load openblas -module load caf \ No newline at end of file +module load caf + +export LD_LIBRARY_PATH=/globalhome/kck540/HPC/SummaProjects/Summa-Actors/bin \ No newline at end of file diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index 11381b8eaf12dcffda13ecdff34a1d1b6c569435..1fb5f5e1f0760b9cc140fd2168c0011a66731104 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -170,9 +170,10 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU, }, [=](file_access_actor_err, std::string function) { - aout(self) << "Failure in File Access Actor in function" << function << "\n"; + aout(self) << "Failure in File Access Actor in function: " << function << "\n"; if (function == "def_output") { aout(self) << "Error with the output file, will try creating it agian\n"; + std::this_thread::sleep_for(std::chrono::seconds(5)); self->state.file_access_actor = self->spawn(file_access_actor, self->state.startGRU, self->state.numGRU, self->state.outputStrucSize, self->state.configPath, self); } else { @@ -197,9 +198,15 @@ void initJob(stateful_actor<job_state>* self) { self->state.successOutputFile = self->state.csvPath += success += std::to_string(self->state.startGRU) += ".csv"; file.open(self->state.successOutputFile, std::ios_base::out); - file << "GRU" << "," << "totalDuration" << "," << "initDuration" << "," << - "forcingDuration" << "," << "runPhysicsDuration" << "," << "writeOutputDuration" << - "," << "dt_init" << "," << "numAttemtps" << "\n"; + file << + "GRU," << + "totalDuration," << + "initDuration," << + "forcingDuration," << + "runPhysicsDuration," << + "writeOutputDuration," << + "dt_init," << + "numAttemtps\n"; file.close(); } diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp index 5fa2677893b8d9e1f2584013b9dc86675e2432b9..85beab394bedbff8e9ce337cb670938afb87baf6 100644 --- a/build/source/actors/summa_actor/batch_manager.cpp +++ b/build/source/actors/summa_actor/batch_manager.cpp @@ -50,4 +50,19 @@ void Batch::updateRunTime(double run_time) { this->run_time = run_time; } +void Batch::writeBatchToFile(std::string file_name) { + std::ofstream output_file; + output_file.open(file_name, std::ios_base::app); + output_file << + this->batch_id << "," << + this->start_hru << "," << + this->num_hru << "," << + this->assigned_host << "," << + this->run_time << "," << + this->read_time << "," << + this->write_time << "," << + this->status << "\n"; + output_file.close(); +} + diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index 1ef82310a79d790ba276e09901e28ffe9c7f9c9a..522b5100814cff4fc5be132fbb6daf99f413bd41 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -73,7 +73,12 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a self->send(server_actor, connect_to_server_v, self, self->state.hostname); return { [=](batch, int client_id, int batch_id, int start_hru, int num_hru, std::string config_path) { - aout(self) << "Received batch to compute" << std::endl; + aout(self) << "\nReceived batch to compute" << "\n"; + aout(self) << "BatchID = " << batch_id << "\n"; + aout(self) << "Start HRU = " << start_hru << "\n"; + aout(self) << "Num HRU = " << num_hru << "\n"; + + self->state.client_id = client_id; self->state.batch_id = batch_id; self->state.summa_actor_ref = self->spawn(summa_actor, start_hru, num_hru, config_path, self); diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 814a107a72098d51087046b6b00ce61aed0c8c9e..fec6fd0493c7ff5405022a41137e18bceda7bf5a 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -12,11 +12,11 @@ namespace caf { behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) { aout(self) << "Summa Server has Started \n"; - std::string returnType; - getSettingsTest(std::vector<std::string> {"test", "test2"} ,returnType); - - self->state.config_path = config_path; + // std::string returnType; + // getSettingsTest(std::vector<std::string> {"test", "test2"} ,returnType); + + // --------------------------Initalize Settings -------------------------- self->state.total_hru_count = getSettings(self->state.config_path, "SimulationSettings", "total_hru_count", self->state.total_hru_count).value_or(-1); if (self->state.total_hru_count == -1) { @@ -27,7 +27,25 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf if (self->state.num_hru_per_batch == -1) { aout(self) << "ERROR: With num_hru_per_batch - CHECK Summa_Actors_Settings.json\n"; } - + // --------------------------Initalize Settings -------------------------- + + // -------------------------- Initalize CSV ------------------------------ + std::ofstream csv_output; + self->state.csv_output_name = "Batch_Results.csv"; + csv_output.open(self->state.csv_output_name, std::ios_base::out); + csv_output << + "Batch_ID," << + "Start_HRU," << + "Num_HRU," << + "Hostname," << + "Run_Time," << + "Read_Time," << + "Write_Time,"<< + "Status\n"; + csv_output.close(); + // -------------------------- Initalize CSV ------------------------------ + + // -------------------------- Assemble Batches --------------------------- aout(self) << "Assembling HRUs into Batches\n"; if (assembleBatches(self) == -1) { aout(self) << "ERROR: assembleBatches\n"; @@ -38,6 +56,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf self->state.batch_list[i].printBatchInfo(); } } + // -------------------------- Assemble Batches --------------------------- return { [=](connect_to_server, actor client, std::string hostname) { @@ -46,14 +65,24 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf int client_id = self->state.client_list.size(); // So we can lookup the client in O(1) time self->state.client_list.push_back(Client(client_id, client, hostname)); - std::optional<Batch> batch_to_send = getUnsolvedBatch(self); - if (batch_to_send.has_value()) { - Batch verified_batch = batch_to_send.value(); - verified_batch.assignedBatch(hostname, client); - self->send(client, batch_v, client_id, batch_to_send->getBatchID(), batch_to_send->getStartHRU(), - batch_to_send->getNumHRU(), self->state.config_path); + std::optional<int> batch_id = getUnsolvedBatchID(self); + if (batch_id.has_value()) { + // update the batch in the batch list with the host and actor_ref + self->state.batch_list[batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client); + + int start_hru = self->state.batch_list[batch_id.value()].getStartHRU(); + int num_hru = self->state.batch_list[batch_id.value()].getNumHRU(); + + self->send(client, + batch_v, + client_id, + batch_id.value(), + start_hru, + num_hru, + self->state.config_path); } else { + aout(self) << "We Are Done - Telling Clients to exit \n"; for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) { self->send(self->state.client_list[i].getActor(), time_to_exit_v); @@ -67,6 +96,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf double total_read_duration, double total_write_duration) { self->state.batch_list[batch_id].solvedBatch(total_duration, total_read_duration, total_write_duration); + self->state.batch_list[batch_id].writeBatchToFile(self->state.csv_output_name); self->state.batches_solved++; self->state.batches_remaining = self->state.batch_list.size() - self->state.batches_solved; @@ -81,12 +111,22 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf aout(self) << "****************************************\n"; // Find a new batch - std::optional<Batch> batch_to_send = getUnsolvedBatch(self); - if (batch_to_send.has_value()) { - Batch verified_batch = batch_to_send.value(); - verified_batch.assignedBatch(self->state.client_list[client_id].getHostname(), client); - self->send(client, batch_v, client_id, verified_batch.getBatchID(), verified_batch.getStartHRU(), - verified_batch.getNumHRU(), self->state.config_path); + std::optional<int> new_batch_id = getUnsolvedBatchID(self); + if (new_batch_id.has_value()) { + // update the batch in the batch list with the host and actor_ref + self->state.batch_list[new_batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client); + + int start_hru = self->state.batch_list[new_batch_id.value()].getStartHRU(); + int num_hru = self->state.batch_list[new_batch_id.value()].getNumHRU(); + + self->send(client, + batch_v, + client_id, + new_batch_id.value(), + start_hru, + num_hru, + self->state.config_path); + } else { aout(self) << "We Are Done - Telling Clients to exit \n"; for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) { @@ -121,17 +161,14 @@ int assembleBatches(stateful_actor<summa_server_state>* self) { return 0; } -std::optional<Batch> getUnsolvedBatch(stateful_actor<summa_server_state>* self) { - +std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self) { // Find the first unassigned batch for (std::vector<int>::size_type i = 0; i < self->state.batch_list.size(); i++) { if (self->state.batch_list[i].getBatchStatus() == unassigned) { - return self->state.batch_list[i]; + return i; } } - return {}; - } } // end namespace