Skip to content
Snippets Groups Projects
Commit f44a0987 authored by KyleKlenk's avatar KyleKlenk
Browse files

Clients can now be sent a Batch object from the server to start computing

parent 582f2f5e
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,7 @@
#include "caf/all.hpp"
#include <vector>
#include <string>
#include <optional>
class Batch;
......@@ -24,6 +25,13 @@ class Batch {
void printBatchInfo();
/**
* @brief Mark batch as assigned to an actor
* Update the assigned_to_actor to True and
* update the hostname and assigned_actor instance variables
*/
void assignToActor(std::string hostname, caf::actor assigned_actor);
template <class Inspector>
friend bool inspect(Inspector& inspector, Batch& batch) {
......@@ -62,7 +70,7 @@ class Batch_Container {
* are added to the client for the servers awareness
* The batch is then returned by this method and sent to the respective client
*/
Batch assignBatch(std::string hostname, caf::actor actor_ref);
std::optional<Batch> assignBatch(std::string hostname, caf::actor actor_ref);
/**
* On a successful batch we take the batch given to us by the client
......@@ -90,7 +98,6 @@ class Batch_Container {
*/
void inititalizeCSVOutput(std::string csv_output_name);
/**
* @brief Print the batches from the batch list
*
......
......@@ -36,16 +36,22 @@ void Batch_Container::printBatches() {
}
Batch Batch_Container::assignBatch(std::string hostname, caf::actor actor_ref) {
std::optional<Batch> Batch_Container::assignBatch(std::string hostname, caf::actor actor_ref) {
for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) {
if (!this->batch_list[i].getBatchStatus()) {
this->batch_list[i].assignToActor(hostname, actor_ref);
return this->batch_list[i];
}
}
return NULL;
return {};
}
Batch::Batch(int batch_id, int start_hru, int num_hru){
this->batch_id = batch_id;
this->start_hru = start_hru;
......@@ -53,7 +59,6 @@ Batch::Batch(int batch_id, int start_hru, int num_hru){
this->assigned_to_actor = false;
}
// Setters
int Batch::getBatchID() {
return this->batch_id;
......@@ -63,6 +68,11 @@ bool Batch::getBatchStatus() {
return this->assigned_to_actor;
}
void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) {
this->hostname = hostname;
this->assigned_actor = assigned_actor;
this->assigned_to_actor = true;
}
void Batch::printBatchInfo() {
std::cout << "batch_id: " << this->batch_id << "\n";
......
......@@ -88,6 +88,7 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
},
[=](Batch& batch) {
aout(self) << "Recieved Batch" << std::endl;
aout(self) << batch.getBatchID() << std::endl;
},
......
......@@ -45,36 +45,15 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
self->send(client_actor, connect_to_server_v, self->state.client_container->getClientID(client_actor),
self->state.summa_actor_settings, self->state.file_access_actor_settings, self->state.job_actor_settings,
self->state.hru_actor_settings);
std::optional<Batch> batch = self->state.batch_container->assignBatch(hostname, client_actor);
if (batch.has_value()) {
self->send(client_actor, batch.value());
} else {
aout(self) << "no more batches left to assign\n";
aout(self) << "we are not done yet. Clients could Fail\n";
}
Batch batch = self->state.batch_container->assignBatch(hostname, client_actor);
// std::optional<int> batch_id = getUnsolvedBatchID(self);
// if (batch_id.has_value()) {
// // update the batch in the batch list with the host and actor_ref
// self->state.batch_list[batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client);
// int start_hru = self->state.batch_list[batch_id.value()].getStartHRU();
// int num_hru = self->state.batch_list[batch_id.value()].getNumHRU();
// self->send(client,
// compute_batch_v,
// client_id,
// batch_id.value(),
// start_hru,
// num_hru,
// self->state.config_path);
// } else {
// aout(self) << "We Are Done - Telling Clients to exit \n";
// for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) {
// self->send(self->state.client_list[i].getActor(), time_to_exit_v);
// }
// self->quit();
// return;
// }
},
[=](done_batch, actor client, int client_id, int batch_id, double total_duration,
......@@ -130,16 +109,6 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
}
std::optional<int> getUnsolvedBatchID(stateful_actor<summa_server_state>* self) {
// Find the first unassigned batch
// for (std::vector<int>::size_type i = 0; i < self->state.batch_list.size(); i++) {
// if (self->state.batch_list[i].getBatchStatus() == unassigned) {
// return i;
// }
// }
return {};
}
void initializeCSVOutput(std::string csv_output_name) {
std::ofstream csv_output;
csv_output.open(csv_output_name, std::ios_base::out);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment