diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp index d0a003f40c07a04a82ca2b5238216d89f0b98aba..684684b587f33b98ab8181fed22e253a7d7afcb7 100644 --- a/build/includes/summa_actor/batch_manager.hpp +++ b/build/includes/summa_actor/batch_manager.hpp @@ -2,6 +2,7 @@ #include "caf/all.hpp" #include <vector> #include <string> +#include <sstream> #include <optional> class Batch; @@ -11,6 +12,10 @@ class Batch { int batch_id; int start_hru; int num_hru; + + double run_time; + double read_time; + double write_time; bool assigned_to_actor; std::string hostname; @@ -19,16 +24,24 @@ class Batch { public: Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1); + // Gettiners int getBatchID(); - int getStartHRU(); - int getNumHRU(); - + double getRunTime(); + double getReadTime(); + double getWriteTime(); bool getBatchStatus(); + // Setters + void updateRunTime(double run_time); + void updateReadTime(double read_time); + void updateWriteTime(double write_time); + void printBatchInfo(); + void writeBatchToFile(std::string csv_output); + std::string toString(); /** * @brief Mark batch as assigned to an actor * Update the assigned_to_actor to True and @@ -43,6 +56,9 @@ class Batch { inspector.field("batch_id", batch.batch_id), inspector.field("start_hru", batch.start_hru), inspector.field("num_hru", batch.num_hru), + inspector.field("run_time", batch.run_time), + inspector.field("read_time", batch.read_time), + inspector.field("write_time", batch.write_time), inspector.field("status", batch.assigned_to_actor), inspector.field("hostname", batch.hostname), inspector.field("assigned_actor", batch.assigned_actor)); @@ -67,6 +83,12 @@ class Batch_Container { * with the two parameters that are passed in. */ Batch_Container(int total_hru_count, int num_hru_per_batch); + + /** + * @brief Get the Batches Remaining + * returns the size of batch_list. + */ + int getBatchesRemaining(); /** * Assign a batch to be solved by a client. @@ -118,6 +140,13 @@ class Batch_Container { */ void assembleBatches(int total_hru_count, int num_hru_per_batch); + /** + * @brief + * Find a batch by its id, + * return its index in the vector + */ + std::optional<int> findBatch(int batch_id); + diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client.hpp index 6e985c46c969b3fa76f1587b3cf4b993d9a6ab38..7b7e9496e91c89128e9662224e47fd73cb2c4b31 100644 --- a/build/includes/summa_actor/client.hpp +++ b/build/includes/summa_actor/client.hpp @@ -52,4 +52,6 @@ class Client_Container { int getClientID(caf::actor); + + std::string getHostname_ByClientID(int client_id); }; \ No newline at end of file diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index 8afa70b6ca7810b964373289d8fbf9931325cecd..8a680b93a4365b47ba97c01acbe76e8b8540f995 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -18,6 +18,8 @@ struct summa_client_state { int batch_id; int client_id; // id held by server + Batch current_batch; + Summa_Actor_Settings summa_actor_settings; File_Access_Actor_Settings file_access_actor_settings; Job_Actor_Settings job_actor_settings; diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp index d1e674b4ed78301bba6264a23a342f3a144df905..366b9cf37713b4eae8dd355ae0e1bb31676c80d2 100644 --- a/build/source/actors/summa_actor/batch_manager.cpp +++ b/build/source/actors/summa_actor/batch_manager.cpp @@ -1,5 +1,3 @@ -#include "caf/all.hpp" -#include <vector> #include "batch_manager.hpp" @@ -10,6 +8,10 @@ Batch_Container::Batch_Container(int total_hru_count, int num_hru_per_batch) { this->assembleBatches(this->total_hru_count, this->num_hru_per_batch); } +int Batch_Container::getBatchesRemaining() { + return this->batch_list.size(); +} + void Batch_Container::assembleBatches(int total_hru_count, int num_hru_per_batch) { int remaining_hru_to_batch = total_hru_count; int batch_id = 0; @@ -47,6 +49,29 @@ std::optional<Batch> Batch_Container::assignBatch(std::string hostname, caf::act return {}; } +void Batch_Container::updateBatch_success(Batch successful_batch, std::string output_csv) { + this->solved_batches.push_back(successful_batch); + + successful_batch.writeBatchToFile(output_csv); + + std::optional<int> index_to_remove = this->findBatch(successful_batch.getBatchID()); + if (index_to_remove.has_value()) { + this->batch_list.erase(this->batch_list.begin() + index_to_remove.value()); + } else { + throw "No element in BatchList Matches the succesful_batch"; + } +} + +std::optional<int> Batch_Container::findBatch(int batch_id) { + + for(std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { + if (this->batch_list[i].getBatchID() == batch_id) { + return i; + } + } + + return {}; +} // ************************** // Batch Class @@ -76,6 +101,31 @@ bool Batch::getBatchStatus() { return this->assigned_to_actor; } +double Batch::getRunTime() { + return this->run_time; +} + +double Batch::getReadTime() { + return this->read_time; +} + +double Batch::getWriteTime() { + return this->write_time; +} + +// Setters +void Batch::updateRunTime(double run_time) { + this->run_time = run_time; +} + +void Batch::updateReadTime(double read_time) { + this->read_time = read_time; +} + +void Batch::updateWriteTime(double write_time) { + this->write_time = write_time; +} + void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) { this->hostname = hostname; this->assigned_actor = assigned_actor; @@ -88,52 +138,33 @@ void Batch::printBatchInfo() { std::cout << "num_hru: " << this->num_hru << "\n"; } -// batch_status Batch::getBatchStatus() { -// return this->status; -// } - -// int Batch::getBatchID() { -// return this->batch_id; -// } - -// int Batch::getStartHRU() { -// return this->start_hru; -// } - -// int Batch::getNumHRU() { -// return this->num_hru; -// } - -// void Batch::solvedBatch(double run_time, double read_time, double write_time) { -// this->status = solved; -// this->run_time = run_time; -// this->read_time = read_time; -// this->write_time = write_time; -// } - -// void Batch::assignedBatch(std::string hostname, caf::actor actor_ref) { -// this->status = assigned; -// this->assigned_host = hostname; -// this->assigned_actor = actor_ref; -// } - -// void Batch::updateRunTime(double run_time) { -// this->run_time = run_time; -// } - -// void Batch::writeBatchToFile(std::string file_name) { -// std::ofstream output_file; -// output_file.open(file_name, std::ios_base::app); -// output_file << -// this->batch_id << "," << -// this->start_hru << "," << -// this->num_hru << "," << -// this->assigned_host << "," << -// this->run_time << "," << -// this->read_time << "," << -// this->write_time << "," << -// this->status << "\n"; -// output_file.close(); -// } +std::string Batch::toString() { + std::stringstream out_string; + + out_string << "batch_id: " << this->batch_id << "\n" << + "start_hru: " << this->start_hru << "\n" << + "num_hru: " << this->num_hru << "\n" << + "run_time: " << this->run_time << "\n" << + "read_time: " << this->read_time << "\n" << + "write_time: " << this->write_time << "\n" << + "assigned_to_actor: " << this->assigned_to_actor << "\n" << + "hostname: " << this->hostname << "\n"; + + return out_string.str(); +} + +void Batch::writeBatchToFile(std::string file_name) { + std::ofstream output_file; + output_file.open(file_name, std::ios_base::app); + output_file << + this->batch_id << "," << + this->start_hru << "," << + this->num_hru << "," << + this->hostname << "," << + this->run_time << "," << + this->read_time << "," << + this->write_time << "\n"; + output_file.close(); +} diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp index 592a10a504da34142db832d2aa464d5114d1f6e2..ecf776b1e3b1bd72cd2483c8d4c8fd41448a76ae 100644 --- a/build/source/actors/summa_actor/client.cpp +++ b/build/source/actors/summa_actor/client.cpp @@ -44,3 +44,7 @@ int Client_Container::getClientID(caf::actor client_actor) { } return -1; } + +std::string Client_Container::getHostname_ByClientID(int client_id) { + return this->client_list[client_id].getHostname(); +} diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index f0faa70e5e2962cf4968f241932ca5c34f871cc3..41da453e4afb7230a8b253539e3fdf243ce56ae3 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -89,25 +89,34 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a // Received batch from server to compute [=](Batch& batch) { + self->state.current_batch = batch; aout(self) << "\nReceived batch to compute\n"; - aout(self) << "BatchID = " << batch.getBatchID() << "\n"; - aout(self) << "StartHRU = " << batch.getStartHRU() << "\n"; - aout(self) << "NumHRU = " << batch.getNumHRU() << "\n"; + aout(self) << "BatchID = " << self->state.current_batch.getBatchID() << "\n"; + aout(self) << "StartHRU = " << self->state.current_batch.getStartHRU() << "\n"; + aout(self) << "NumHRU = " << self->state.current_batch.getNumHRU() << "\n"; self->state.summa_actor_ref = self->spawn(summa_actor, - batch.getStartHRU(), - batch.getNumHRU(), + self->state.current_batch.getStartHRU(), + self->state.current_batch.getNumHRU(), self->state.summa_actor_settings, self->state.file_access_actor_settings, self->state.job_actor_settings, self->state.hru_actor_settings, self); }, + + // Received completed batch information from the summa_actor + [=](done_batch, double run_time, double read_time, double write_time) { + aout(self) << "Summa_Actor has finished, sending message to the server for another batch\n"; + aout(self) << "run_time = " << run_time << "\n"; + aout(self) << "read_time = " << read_time << "\n"; + aout(self) << "write_time = " << write_time << "\n"; + + self->state.current_batch.updateRunTime(run_time); + self->state.current_batch.updateReadTime(read_time); + self->state.current_batch.updateWriteTime(write_time); - [=](done_batch, double total_duration, double total_read_duration, double total_write_duration) { - aout(self) << "summa_actor has finished, sending message to the server for another batch\n"; - self->send(server_actor, done_batch_v, self, self->state.client_id, self->state.batch_id, - total_duration, total_read_duration, total_write_duration); + self->send(server_actor, done_batch_v, self, self->state.client_id, self->state.current_batch); }, [=](time_to_exit) { diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index f9e89941f0ce828e87320f0aa6fa22138593ec11..98bf019e5e5dee131e008295f2f9b4817eb1566e 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -56,54 +56,23 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett }, - [=](done_batch, actor client, int client_id, int batch_id, double total_duration, - double total_read_duration, double total_write_duration) { - - // self->state.batch_list[batch_id].solvedBatch(total_duration, total_read_duration, total_write_duration); - // self->state.batch_list[batch_id].writeBatchToFile(self->state.csv_output_name); - // self->state.batches_solved++; - // self->state.batches_remaining = self->state.batch_list.size() - self->state.batches_solved; - - // aout(self) << "\n****************************************\n"; - // aout(self) << "Client finished batch: " << batch_id << "\n"; - // aout(self) << "Client hostname = " << self->state.client_list[client_id].getHostname() << "\n"; - // aout(self) << "Total Batch Duration = " << total_duration << "\n"; - // aout(self) << "Total Batch Read Duration = " << total_read_duration << "\n"; - // aout(self) << "Total Batch Write Duration = " << total_write_duration << "\n"; - // aout(self) << "Batches Solved = " << self->state.batches_solved << "\n"; - // aout(self) << "Batches Remaining = " << self->state.batches_remaining << "\n"; - // aout(self) << "****************************************\n"; - - // // Find a new batch - // std::optional<int> new_batch_id = getUnsolvedBatchID(self); - // if (new_batch_id.has_value()) { - // // update the batch in the batch list with the host and actor_ref - // self->state.batch_list[new_batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client); - - // int start_hru = self->state.batch_list[new_batch_id.value()].getStartHRU(); - // int num_hru = self->state.batch_list[new_batch_id.value()].getNumHRU(); + [=](done_batch, actor client_actor, int client_id, Batch& batch) { + aout(self) << "Recieved Completed Batch From Client\n"; + + aout(self) << batch.toString() << "\n\n"; - // self->send(client, - // compute_batch_v, - // client_id, - // new_batch_id.value(), - // start_hru, - // num_hru, - // self->state.config_path); + self->state.batch_container->updateBatch_success(batch, self->state.csv_output_name); + aout(self) << "******************\n" << "Batches Remaining: " << + self->state.batch_container->getBatchesRemaining() << "\n******************\n\n"; - // } else { - // // We found no batch this means all batches are assigned - // if (self->state.batches_remaining == 0) { - // aout(self) << "All Batches Solved -- Telling Clients To Exit\n"; - // for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) { - // self->send(self->state.client_list[i].getActor(), time_to_exit_v); - // } - // aout(self) << "\nSUMMA_SERVER -- EXITING\n"; - // self->quit(); - // } else { - // aout(self) << "No Batches left to compute -- letting client stay connected in case batch fails\n"; - // } - // } + std::optional<Batch> new_batch = self->state.batch_container->assignBatch( + self->state.client_container->getHostname_ByClientID(client_id), client_actor); + if (new_batch.has_value()) { + self->send(client_actor, new_batch.value()); + } else { + aout(self) << "no more batches left to assign\n"; + aout(self) << "we are not done yet. Clients could Fail\n"; + } } }; } @@ -119,8 +88,7 @@ void initializeCSVOutput(std::string csv_output_name) { "Hostname," << "Run_Time," << "Read_Time," << - "Write_Time,"<< - "Status\n"; + "Write_Time\n"; csv_output.close(); }