Skip to content
Snippets Groups Projects
Commit c0fff8ea authored by KyleKlenk's avatar KyleKlenk
Browse files

backup servers get updated data from the lead server

parent 20dce3bd
No related branches found
No related tags found
No related merge requests found
......@@ -8,11 +8,11 @@
#include "settings_functions.hpp"
CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
// Sender: job_actor
// Reciever: summa_actor
// Summary: job_actor finished job
// Sender: job_actor
// Reciever: summa_actor
// Summary: job_actor finished job
CAF_ADD_ATOM(summa, done_job)
// Sender:
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, err)
......@@ -119,8 +119,15 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, update_with_current_state)
CAF_ADD_ATOM(summa, heartbeat)
CAF_ADD_ATOM(summa, update_with_current_state)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, new_assigned_batch)
// Sender:
// Reciever:
// Summary:
CAF_ADD_ATOM(summa, no_more_batches)
// Struct Types
CAF_ADD_TYPE_ID(summa, (Distributed_Settings))
CAF_ADD_TYPE_ID(summa, (Summa_Actor_Settings))
......
#pragma once
#include "caf/all.hpp"
#pragma once
#include "client/client.hpp"
......@@ -12,10 +12,6 @@ class Batch_Container {
// Assemble the total number of HRUs given by the user into batches.
void assembleBatches(int total_hru_count, int num_hru_per_batch);
// Find a batch by its id,
// return its index in the vector
std::optional<int> findBatch(int batch_id);
public:
......@@ -23,32 +19,27 @@ class Batch_Container {
// with the two parameters that are passed in.
Batch_Container(int total_hru_count = 0, int num_hru_per_batch = 0);
// returns the size of the batch list
int getBatchesRemaining();
/**
* Assign a batch to be solved by a client.
* The hostname and the actor_ref of the client solving this batch
* are added to the client for the servers awareness
* The batch is then returned by this method and sent to the respective client
*/
std::optional<Batch> assignBatch();
// Find an unsolved batch, set it to assigned and return it.
std::optional<Batch> getUnsolvedBatch();
/**
* On a successful batch we take the batch given to us by the client
* and add it to our solved_batches list.
*
* We can then remove the batch from the global batch list.
*/
// Update the batch status to solved and write the output to a file.
void updateBatch_success(Batch successful_batch, std::string output_csv);
// Update the batch status but do not write the output to a file.
void updateBatch_success(Batch successful_batch);
/**
* A batch failure is returned to us by the client
* This is for when a client failed to solve the batch.
*/
// Update the batch to assigned = true
void setBatchAssigned(Batch batch);
// Check if there are batches left to solve
bool hasUnsolvedBatches();
// TODO: Needs implementation
void updateBatch_failure(Batch failed_batch);
/**
* A client has found to be disconnected. Unassign all batches
* that were assigned to the disconnected client. The client id
......
......@@ -39,16 +39,20 @@ struct summa_server_state {
};
// Summa Server setup behaviour - initializes the state for the server
behavior summa_server_init(stateful_actor<summa_server_state>* self, Distributed_Settings distributed_settings,
Summa_Actor_Settings summa_actor_settings, File_Access_Actor_Settings file_access_actor_settings,
Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings);
// Summa Server behaviour - handles messages from clients
behavior summa_server(stateful_actor<summa_server_state>* self);
int assembleBatches(stateful_actor<summa_server_state>* self);
// Summa Server backup behaviour - handles the exit messages for clients
behavior summa_server_exit(stateful_actor<summa_server_state>* self);
// Creates the csv file that holds the results of the batches
void initializeCSVOutput(std::string csv_output_path, std::string csv_output_name);
// Convience function to keep code clean - just does what you think it does
void printRemainingBatches(stateful_actor<summa_server_state>* self);
}
\ No newline at end of file
......@@ -8,7 +8,7 @@ Batch_Container::Batch_Container(int total_hru_count, int num_hru_per_batch) {
}
int Batch_Container::getBatchesRemaining() {
return this->batch_list.size();
return this->batches_remaining;
}
void Batch_Container::assembleBatches(int total_hru_count, int num_hru_per_batch) {
......@@ -40,8 +40,7 @@ void Batch_Container::updateBatchStatus_LostClient(int batch_id) {
this->batch_list[batch_id].updateAssigned(false);
}
std::optional<Batch> Batch_Container::assignBatch() {
std::optional<Batch> Batch_Container::getUnsolvedBatch() {
for (std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) {
if (!this->batch_list[i].isAssigned() && !this->batch_list[i].isSolved()) {
this->batch_list[i].updateAssigned(true);
......@@ -51,6 +50,10 @@ std::optional<Batch> Batch_Container::assignBatch() {
return {};
}
void Batch_Container::setBatchAssigned(Batch batch) {
this->batch_list[batch.getBatchID()].updateAssigned(true);
}
void Batch_Container::updateBatch_success(Batch successful_batch, std::string output_csv) {
int batch_id = successful_batch.getBatchID();
successful_batch.writeBatchToFile(output_csv);
......@@ -58,13 +61,13 @@ void Batch_Container::updateBatch_success(Batch successful_batch, std::string ou
this->batches_remaining--;
}
std::optional<int> Batch_Container::findBatch(int batch_id) {
void Batch_Container::updateBatch_success(Batch successful_batch) {
int batch_id = successful_batch.getBatchID();
this->batch_list[batch_id].updateSolved(true);
this->batches_remaining--;
}
for(std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) {
if (this->batch_list[i].getBatchID() == batch_id) {
return i;
}
}
return {};
}
\ No newline at end of file
bool Batch_Container::hasUnsolvedBatches() {
return this->batches_remaining > 0;
}
......@@ -84,14 +84,36 @@ behavior summa_backup_server(stateful_actor<summa_server_state>* self, const act
},
[=](update_with_current_state, Batch_Container& batch_container, Client_Container& client_container) {
aout(self) << "Received the containers\n";
aout(self) << "Received the containers from the lead server\n";
self->state.batch_container = &batch_container;
self->state.client_container = &client_container;
aout(self) << "Num-Clients: " << self->state.client_container->getNumClients() << "\n";
},
};
// Client finished a batch and the lead server has sent an update
[=](done_batch, actor client_actor, Batch& batch) {
aout(self) << "Batch: " << batch.getBatchID() << " is done\n";
self->state.batch_container->updateBatch_success(batch);
},
// Client has been assigned new batch by the lead server
[=](new_assigned_batch, actor client_actor, Batch& batch) {
aout(self) << "New Batch: " << batch.getBatchID() << " has been assigned\n";
self->state.batch_container->setBatchAssigned(batch);
self->state.client_container->setBatchForClient(client_actor, batch);
},
// Lead server has no more batches to distribute
[=](no_more_batches, actor client_actor) {
aout(self) << "No more batches to distribute\n";
self->state.client_container->setBatchForClient(client_actor, {});
},
// Simulation has finished
[=](time_to_exit) {
aout(self) << "Received time to exit\n";
self->quit();
},
};
}
}
\ No newline at end of file
......@@ -112,10 +112,6 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
},
[=](heartbeat) {
aout(self) << "Received Heartbeat \n";
},
[=](time_to_exit) {
aout(self) << "Client Exiting\n";
self->quit();
......
......@@ -58,7 +58,7 @@ behavior summa_server(stateful_actor<summa_server_state>* self) {
Client client = self->state.client_container->getClient(client_actor.address());
std::optional<Batch> batch = self->state.batch_container->assignBatch();
std::optional<Batch> batch = self->state.batch_container->getUnsolvedBatch();
if (batch.has_value()) {
self->state.client_container->setBatchForClient(client_actor, batch.value());
aout(self) << "SENDING: " << batch.value().toString() << "\n";
......@@ -76,53 +76,61 @@ behavior summa_server(stateful_actor<summa_server_state>* self) {
self->state.backup_servers_list.push_back(backup_server);
self->send(backup_server, connect_as_backup_v); // confirm connection with sender
// Now we need to send the backup actor our current state
// so that when we update in the future we just forward the update
self->send(backup_server, update_with_current_state_v, *self->state.batch_container, *self->state.client_container);
},
[=](done_batch, actor client_actor, Batch& batch) {
aout(self) << "Received Completed Batch From Client\n";
aout(self) << batch.toString() << "\n\n";
Client client = self->state.client_container->getClient(client_actor.address());
aout(self) << batch.toString() << "\n\n";\
self->state.batch_container->updateBatch_success(batch, self->state.csv_output_name);
printRemainingBatches(self);
std::optional<Batch> new_batch = self->state.batch_container->assignBatch();
std::optional<Batch> new_batch = self->state.batch_container->getUnsolvedBatch();
if (new_batch.has_value()) {
// send clients new batch and update backup servers
self->state.client_container->setBatchForClient(client_actor, new_batch.value());
self->send(client_actor, new_batch.value());
for (auto& backup_server : self->state.backup_servers_list) {
self->send(backup_server, done_batch_v, client_actor, batch);
self->send(backup_server, new_assigned_batch_v, client_actor, new_batch.value());
}
} else {
if (self->state.batch_container->getBatchesRemaining() > 0) {
aout(self) << "no more batches left to assign\n";
aout(self) << "Keeping Client connected because other clients could Fail\n";
self->state.client_container->setBatchForClient(client_actor, {});
// self->state.client_container->setAssignedBatch(client_id, false);
} else {
aout(self) << "Telling Clients To Exit\n";
while(!self->state.client_container->isEmpty()) {
Client client = self->state.client_container->removeClient_fromBack();
caf::actor client_actor = client.getActor();
self->send(client_actor, time_to_exit_v);
}
aout(self) << "SERVER EXITING!!\n";
self->quit();
// We may be done
if (!self->state.batch_container->hasUnsolvedBatches()) {
// We are done
self->become(summa_server_exit(self));
}
// No Batches left to assign but waiting for all clients to finish
aout(self) << "No batches left to assign - Waiting for All Clients to finish\n";
self->state.client_container->setBatchForClient(client_actor, {});
for (auto& backup_server : self->state.backup_servers_list) {
self->send(backup_server, done_batch_v, client_actor, batch);
self->send(backup_server, no_more_batches_v, client_actor);
}
}
},
};
}
behavior summa_server_exit(stateful_actor<summa_server_state>* self) {
aout(self) << "SUMMA Simulation is complete\n";
aout(self) << "Telling Clients to Exit\n";
while(!self->state.client_container->isEmpty()) {
Client client = self->state.client_container->removeClient_fromBack();
caf::actor client_actor = client.getActor();
self->send(client_actor, time_to_exit_v);
}
aout(self) << "Telling Backup Servers to Exit\n";
for (auto& backup_server : self->state.backup_servers_list) {
self->send(backup_server, time_to_exit_v);
}
self->quit();
return {};
}
void initializeCSVOutput(std::string csv_output_path, std::string csv_output_name) {
std::ofstream csv_output;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment