Skip to content
Snippets Groups Projects
Commit 6797b1e5 authored by KyleKlenk's avatar KyleKlenk
Browse files

New data structures implmented full, program is working as well as it did...

New data structures implmented full, program is working as well as it did before the new data structures

Shutdown messages still need to be implemented.
parent e1953d11
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,7 @@
#include "caf/all.hpp"
#include <vector>
#include <string>
#include <sstream>
#include <optional>
class Batch;
......@@ -11,6 +12,10 @@ class Batch {
int batch_id;
int start_hru;
int num_hru;
double run_time;
double read_time;
double write_time;
bool assigned_to_actor;
std::string hostname;
......@@ -19,16 +24,24 @@ class Batch {
public:
Batch(int batch_id = -1, int start_hru = -1, int num_hru = -1);
// Gettiners
int getBatchID();
int getStartHRU();
int getNumHRU();
double getRunTime();
double getReadTime();
double getWriteTime();
bool getBatchStatus();
// Setters
void updateRunTime(double run_time);
void updateReadTime(double read_time);
void updateWriteTime(double write_time);
void printBatchInfo();
void writeBatchToFile(std::string csv_output);
std::string toString();
/**
* @brief Mark batch as assigned to an actor
* Update the assigned_to_actor to True and
......@@ -43,6 +56,9 @@ class Batch {
inspector.field("batch_id", batch.batch_id),
inspector.field("start_hru", batch.start_hru),
inspector.field("num_hru", batch.num_hru),
inspector.field("run_time", batch.run_time),
inspector.field("read_time", batch.read_time),
inspector.field("write_time", batch.write_time),
inspector.field("status", batch.assigned_to_actor),
inspector.field("hostname", batch.hostname),
inspector.field("assigned_actor", batch.assigned_actor));
......@@ -67,6 +83,12 @@ class Batch_Container {
* with the two parameters that are passed in.
*/
Batch_Container(int total_hru_count, int num_hru_per_batch);
/**
* @brief Get the Batches Remaining
* returns the size of batch_list.
*/
int getBatchesRemaining();
/**
* Assign a batch to be solved by a client.
......@@ -118,6 +140,13 @@ class Batch_Container {
*/
void assembleBatches(int total_hru_count, int num_hru_per_batch);
/**
* @brief
* Find a batch by its id,
* return its index in the vector
*/
std::optional<int> findBatch(int batch_id);
......
......@@ -52,4 +52,6 @@ class Client_Container {
int getClientID(caf::actor);
std::string getHostname_ByClientID(int client_id);
};
\ No newline at end of file
......@@ -18,6 +18,8 @@ struct summa_client_state {
int batch_id;
int client_id; // id held by server
Batch current_batch;
Summa_Actor_Settings summa_actor_settings;
File_Access_Actor_Settings file_access_actor_settings;
Job_Actor_Settings job_actor_settings;
......
#include "caf/all.hpp"
#include <vector>
#include "batch_manager.hpp"
......@@ -10,6 +8,10 @@ Batch_Container::Batch_Container(int total_hru_count, int num_hru_per_batch) {
this->assembleBatches(this->total_hru_count, this->num_hru_per_batch);
}
int Batch_Container::getBatchesRemaining() {
return this->batch_list.size();
}
void Batch_Container::assembleBatches(int total_hru_count, int num_hru_per_batch) {
int remaining_hru_to_batch = total_hru_count;
int batch_id = 0;
......@@ -47,6 +49,29 @@ std::optional<Batch> Batch_Container::assignBatch(std::string hostname, caf::act
return {};
}
void Batch_Container::updateBatch_success(Batch successful_batch, std::string output_csv) {
this->solved_batches.push_back(successful_batch);
successful_batch.writeBatchToFile(output_csv);
std::optional<int> index_to_remove = this->findBatch(successful_batch.getBatchID());
if (index_to_remove.has_value()) {
this->batch_list.erase(this->batch_list.begin() + index_to_remove.value());
} else {
throw "No element in BatchList Matches the succesful_batch";
}
}
std::optional<int> Batch_Container::findBatch(int batch_id) {
for(std::vector<int>::size_type i = 0; i < this->batch_list.size(); i++) {
if (this->batch_list[i].getBatchID() == batch_id) {
return i;
}
}
return {};
}
// **************************
// Batch Class
......@@ -76,6 +101,31 @@ bool Batch::getBatchStatus() {
return this->assigned_to_actor;
}
double Batch::getRunTime() {
return this->run_time;
}
double Batch::getReadTime() {
return this->read_time;
}
double Batch::getWriteTime() {
return this->write_time;
}
// Setters
void Batch::updateRunTime(double run_time) {
this->run_time = run_time;
}
void Batch::updateReadTime(double read_time) {
this->read_time = read_time;
}
void Batch::updateWriteTime(double write_time) {
this->write_time = write_time;
}
void Batch::assignToActor(std::string hostname, caf::actor assigned_actor) {
this->hostname = hostname;
this->assigned_actor = assigned_actor;
......@@ -88,52 +138,33 @@ void Batch::printBatchInfo() {
std::cout << "num_hru: " << this->num_hru << "\n";
}
// batch_status Batch::getBatchStatus() {
// return this->status;
// }
// int Batch::getBatchID() {
// return this->batch_id;
// }
// int Batch::getStartHRU() {
// return this->start_hru;
// }
// int Batch::getNumHRU() {
// return this->num_hru;
// }
// void Batch::solvedBatch(double run_time, double read_time, double write_time) {
// this->status = solved;
// this->run_time = run_time;
// this->read_time = read_time;
// this->write_time = write_time;
// }
// void Batch::assignedBatch(std::string hostname, caf::actor actor_ref) {
// this->status = assigned;
// this->assigned_host = hostname;
// this->assigned_actor = actor_ref;
// }
// void Batch::updateRunTime(double run_time) {
// this->run_time = run_time;
// }
// void Batch::writeBatchToFile(std::string file_name) {
// std::ofstream output_file;
// output_file.open(file_name, std::ios_base::app);
// output_file <<
// this->batch_id << "," <<
// this->start_hru << "," <<
// this->num_hru << "," <<
// this->assigned_host << "," <<
// this->run_time << "," <<
// this->read_time << "," <<
// this->write_time << "," <<
// this->status << "\n";
// output_file.close();
// }
std::string Batch::toString() {
std::stringstream out_string;
out_string << "batch_id: " << this->batch_id << "\n" <<
"start_hru: " << this->start_hru << "\n" <<
"num_hru: " << this->num_hru << "\n" <<
"run_time: " << this->run_time << "\n" <<
"read_time: " << this->read_time << "\n" <<
"write_time: " << this->write_time << "\n" <<
"assigned_to_actor: " << this->assigned_to_actor << "\n" <<
"hostname: " << this->hostname << "\n";
return out_string.str();
}
void Batch::writeBatchToFile(std::string file_name) {
std::ofstream output_file;
output_file.open(file_name, std::ios_base::app);
output_file <<
this->batch_id << "," <<
this->start_hru << "," <<
this->num_hru << "," <<
this->hostname << "," <<
this->run_time << "," <<
this->read_time << "," <<
this->write_time << "\n";
output_file.close();
}
......@@ -44,3 +44,7 @@ int Client_Container::getClientID(caf::actor client_actor) {
}
return -1;
}
std::string Client_Container::getHostname_ByClientID(int client_id) {
return this->client_list[client_id].getHostname();
}
......@@ -89,25 +89,34 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
// Received batch from server to compute
[=](Batch& batch) {
self->state.current_batch = batch;
aout(self) << "\nReceived batch to compute\n";
aout(self) << "BatchID = " << batch.getBatchID() << "\n";
aout(self) << "StartHRU = " << batch.getStartHRU() << "\n";
aout(self) << "NumHRU = " << batch.getNumHRU() << "\n";
aout(self) << "BatchID = " << self->state.current_batch.getBatchID() << "\n";
aout(self) << "StartHRU = " << self->state.current_batch.getStartHRU() << "\n";
aout(self) << "NumHRU = " << self->state.current_batch.getNumHRU() << "\n";
self->state.summa_actor_ref = self->spawn(summa_actor,
batch.getStartHRU(),
batch.getNumHRU(),
self->state.current_batch.getStartHRU(),
self->state.current_batch.getNumHRU(),
self->state.summa_actor_settings,
self->state.file_access_actor_settings,
self->state.job_actor_settings,
self->state.hru_actor_settings,
self);
},
// Received completed batch information from the summa_actor
[=](done_batch, double run_time, double read_time, double write_time) {
aout(self) << "Summa_Actor has finished, sending message to the server for another batch\n";
aout(self) << "run_time = " << run_time << "\n";
aout(self) << "read_time = " << read_time << "\n";
aout(self) << "write_time = " << write_time << "\n";
self->state.current_batch.updateRunTime(run_time);
self->state.current_batch.updateReadTime(read_time);
self->state.current_batch.updateWriteTime(write_time);
[=](done_batch, double total_duration, double total_read_duration, double total_write_duration) {
aout(self) << "summa_actor has finished, sending message to the server for another batch\n";
self->send(server_actor, done_batch_v, self, self->state.client_id, self->state.batch_id,
total_duration, total_read_duration, total_write_duration);
self->send(server_actor, done_batch_v, self, self->state.client_id, self->state.current_batch);
},
[=](time_to_exit) {
......
......@@ -56,54 +56,23 @@ behavior summa_server(stateful_actor<summa_server_state>* self, Distributed_Sett
},
[=](done_batch, actor client, int client_id, int batch_id, double total_duration,
double total_read_duration, double total_write_duration) {
// self->state.batch_list[batch_id].solvedBatch(total_duration, total_read_duration, total_write_duration);
// self->state.batch_list[batch_id].writeBatchToFile(self->state.csv_output_name);
// self->state.batches_solved++;
// self->state.batches_remaining = self->state.batch_list.size() - self->state.batches_solved;
// aout(self) << "\n****************************************\n";
// aout(self) << "Client finished batch: " << batch_id << "\n";
// aout(self) << "Client hostname = " << self->state.client_list[client_id].getHostname() << "\n";
// aout(self) << "Total Batch Duration = " << total_duration << "\n";
// aout(self) << "Total Batch Read Duration = " << total_read_duration << "\n";
// aout(self) << "Total Batch Write Duration = " << total_write_duration << "\n";
// aout(self) << "Batches Solved = " << self->state.batches_solved << "\n";
// aout(self) << "Batches Remaining = " << self->state.batches_remaining << "\n";
// aout(self) << "****************************************\n";
// // Find a new batch
// std::optional<int> new_batch_id = getUnsolvedBatchID(self);
// if (new_batch_id.has_value()) {
// // update the batch in the batch list with the host and actor_ref
// self->state.batch_list[new_batch_id.value()].assignedBatch(self->state.client_list[client_id].getHostname(), client);
// int start_hru = self->state.batch_list[new_batch_id.value()].getStartHRU();
// int num_hru = self->state.batch_list[new_batch_id.value()].getNumHRU();
[=](done_batch, actor client_actor, int client_id, Batch& batch) {
aout(self) << "Recieved Completed Batch From Client\n";
aout(self) << batch.toString() << "\n\n";
// self->send(client,
// compute_batch_v,
// client_id,
// new_batch_id.value(),
// start_hru,
// num_hru,
// self->state.config_path);
self->state.batch_container->updateBatch_success(batch, self->state.csv_output_name);
aout(self) << "******************\n" << "Batches Remaining: " <<
self->state.batch_container->getBatchesRemaining() << "\n******************\n\n";
// } else {
// // We found no batch this means all batches are assigned
// if (self->state.batches_remaining == 0) {
// aout(self) << "All Batches Solved -- Telling Clients To Exit\n";
// for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) {
// self->send(self->state.client_list[i].getActor(), time_to_exit_v);
// }
// aout(self) << "\nSUMMA_SERVER -- EXITING\n";
// self->quit();
// } else {
// aout(self) << "No Batches left to compute -- letting client stay connected in case batch fails\n";
// }
// }
std::optional<Batch> new_batch = self->state.batch_container->assignBatch(
self->state.client_container->getHostname_ByClientID(client_id), client_actor);
if (new_batch.has_value()) {
self->send(client_actor, new_batch.value());
} else {
aout(self) << "no more batches left to assign\n";
aout(self) << "we are not done yet. Clients could Fail\n";
}
}
};
}
......@@ -119,8 +88,7 @@ void initializeCSVOutput(std::string csv_output_name) {
"Hostname," <<
"Run_Time," <<
"Read_Time," <<
"Write_Time,"<<
"Status\n";
"Write_Time\n";
csv_output.close();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment