diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp index 1bdb554b2c0de94aa53289d823a0c2623eb6ae58..a104cb9b20a888d03700de0ca0682b830961f36d 100644 --- a/build/includes/summa_actor/batch_manager.hpp +++ b/build/includes/summa_actor/batch_manager.hpp @@ -2,6 +2,7 @@ #include "caf/all.hpp" #include <vector> #include <string> +#include <optional> class Batch; @@ -24,6 +25,13 @@ class Batch { void printBatchInfo(); + /** + * @brief Mark batch as assigned to an actor + * Update the assigned_to_actor to True and + * update the hostname and assigned_actor instance variables + */ + void assignToActor(std::string hostname, caf::actor assigned_actor); + template <class Inspector> friend bool inspect(Inspector& inspector, Batch& batch) { @@ -62,7 +70,7 @@ class Batch_Container { * are added to the client for the servers awareness * The batch is then returned by this method and sent to the respective client */ - Batch assignBatch(std::string hostname, caf::actor actor_ref); + std::optional<Batch> assignBatch(std::string hostname, caf::actor actor_ref); /** * On a successful batch we take the batch given to us by the client @@ -90,7 +98,6 @@ class Batch_Container { */ void inititalizeCSVOutput(std::string csv_output_name); - /** * @brief Print the batches from the batch list * diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp index 16ff2ca575594d4279e67409763422d280820ee9..75b5769e848a53fadd87c1e0e2020869d2884341 100644 --- a/build/source/actors/summa_actor/batch_manager.cpp +++ b/build/source/actors/summa_actor/batch_manager.cpp @@ -36,16 +36,22 @@ void Batch_Container::printBatches() { } -Batch Batch_Container::assignBatch(std::string hostname, caf::actor actor_ref) { +std::optional<Batch> Batch_Container::assignBatch(std::string hostname, caf::actor actor_ref) { + for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) { if (!this->batch_list[i].getBatchStatus()) { + this->batch_list[i].assignToActor(hostname, actor_ref); return this->batch_list[i]; } } - return NULL; + return {}; } + + + + Batch::Batch(int batch_id, int start_hru, int num_hru){ this->batch_id = batch_id; this->start_hru = start_hru; @@ -53,7 +59,6 @@ Batch::Batch(int batch_id, int start_hru, int num_hru){ this->assigned_to_actor = false; } - // Setters int Batch::getBatchID() { return this->batch_id; @@ -63,6 +68,11 @@ bool Batch::getBatchStatus() { return this->assigned_to_actor; } +void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) { + this->hostname = hostname; + this->assigned_actor = assigned_actor; + this->assigned_to_actor = true; +} void Batch::printBatchInfo() { std::cout << "batch_id: " << this->batch_id << "\n"; diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index 71b01c1f44a8385e875f6e7e21e30ddc1e9f7c9b..4fde772f40260243a8260abcd4f5e41972b694f0 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -88,6 +88,7 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a }, [=](Batch& batch) { + aout(self) << "Recieved Batch" << std::endl; aout(self) << batch.getBatchID() << std::endl; }, diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index b6c211f84b5e47328a2232cc879995abaa9fd0f1..cf0fe865a4fc5e01a9b05ab6413e3f9a9cb6bf91 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -45,36 +45,15 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett self->send(client_actor, connect_to_server_v, self->state.client_container->getClientID(client_actor), self->state.summa_actor_settings, self->state.file_access_actor_settings, self->state.job_actor_settings, self->state.hru_actor_settings); - + std::optional<Batch> batch = self->state.batch_container->assignBatch(hostname, client_actor); + if (batch.has_value()) { + self->send(client_actor, batch.value()); + } else { + aout(self) << "no more batches left to assign\n"; + aout(self) << "we are not done yet. Clients could Fail\n"; + } - Batch batch = self->state.batch_container->assignBatch(hostname, client_actor); - - // std::optional<int> batch_id = getUnsolvedBatchID(self); - // if (batch_id.has_value()) { - // // update the batch in the batch list with the host and actor_ref - // self->state.batch_list[batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client); - - // int start_hru = self->state.batch_list[batch_id.value()].getStartHRU(); - // int num_hru = self->state.batch_list[batch_id.value()].getNumHRU(); - - // self->send(client, - // compute_batch_v, - // client_id, - // batch_id.value(), - // start_hru, - // num_hru, - // self->state.config_path); - - // } else { - - // aout(self) << "We Are Done - Telling Clients to exit \n"; - // for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) { - // self->send(self->state.client_list[i].getActor(), time_to_exit_v); - // } - // self->quit(); - // return; - // } }, [=](done_batch, actor client, int client_id, int batch_id, double total_duration, @@ -130,16 +109,6 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett } -std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self) { - // Find the first unassigned batch - // for (std::vector<int>::size_type i = 0; i < self->state.batch_list.size(); i++) { - // if (self->state.batch_list[i].getBatchStatus() == unassigned) { - // return i; - // } - // } - return {}; -} - void initializeCSVOutput(std::string csv_output_name) { std::ofstream csv_output; csv_output.open(csv_output_name, std::ios_base::out);