Skip to content
Snippets Groups Projects
Commit 7cff2666 authored by KyleKlenk's avatar KyleKlenk
Browse files

Client can solve multiple batches of HRUs

parent 55696d4a
No related branches found
No related tags found
No related merge requests found
Showing
with 163 additions and 18 deletions
......@@ -39,5 +39,9 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
CAF_ADD_ATOM(summa, dt_init_factor)
// Client Actor
CAF_ADD_ATOM(summa, connect_to_server)
CAF_ADD_ATOM(summa, batch)
// Server Actor
CAF_ADD_ATOM(summa, done_batch)
CAF_ADD_ATOM(summa, time_to_exit)
CAF_END_TYPE_ID_BLOCK(summa)
\ No newline at end of file
......@@ -3,15 +3,23 @@
#include <vector>
enum batch_status {
unassigned,
assigned,
solved,
failed
};
class Batch {
private:
int batch_id;
int start_hru;
int num_hru;
int run_time;
int read_time;
int write_time;
double run_time;
double read_time;
double write_time;
caf::actor assigned_actor;
batch_status status;
public:
......@@ -19,9 +27,26 @@ class Batch {
void printBatchInfo();
batch_status getBatchStatus();
int getBatchID();
int getStartHRU();
int getNumHRU();
void solvedBatch();
void assignedBatch();
void updateRunTime(double run_time);
};
class Batch_Manager {
private:
std::vector<Batch> batch_list;
......
#pragma once
#include "caf/all.hpp"
#include "batch_manager.hpp"
class Client {
......@@ -12,8 +11,12 @@ class Client {
bool connected;
caf::actor client_actor;
std::string host_name;
Batch* current_batch;
public:
Client(int id, caf::actor client_actor, std::string host_name);
caf::actor getActor();
};
\ No newline at end of file
......@@ -25,10 +25,11 @@ struct summa_actor_state {
int outputStrucSize;
caf::actor currentJob; // Reference to the current job actor
caf::actor parent;
};
behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath);
behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath, actor parent);
void spawnJob(stateful_actor<summa_actor_state>* self);
......
......@@ -10,6 +10,9 @@ namespace caf {
struct summa_client_state {
strong_actor_ptr current_server;
std::string hostname;
std::string config_path;
actor summa_actor_ref;
int batch_id;
};
behavior summa_client(stateful_actor<summa_client_state>* self);
behavior unconnected(stateful_actor<summa_client_state>*);
......
......@@ -3,6 +3,7 @@
#include "caf/all.hpp"
#include "caf/io/all.hpp"
#include "batch_manager.hpp"
#include "client.hpp"
#include <string>
......@@ -11,13 +12,16 @@ namespace caf {
struct summa_server_state {
int total_hru_count;
int num_clients;
int num_hru_per_batch;
int num_hru_per_batch;
std::string config_path;
std::vector<Batch*> batch_list;
std::vector<Batch> solved_batches;
std::vector<Batch> failed_batches;
std::vector<Client*> client_list;
};
behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path);
int parseSettings(stateful_actor<summa_server_state>* self, std::string config_path);
int assembleBatches(stateful_actor<summa_server_state>* self);
Batch* getUnsolvedBatch(stateful_actor<summa_server_state>* self);
}
\ No newline at end of file
......@@ -255,6 +255,7 @@ SUMMA_CLIENT = $(SOURCE_DIR)/summa_actor/summa_client.cpp
SUMMA_SERVER = $(SOURCE_DIR)/summa_actor/summa_server.cpp
BATCH_MANGER = $(SOURCE_DIR)/summa_actor/batch_manager.cpp
CLIENT_MANAGER = $(SOURCE_DIR)/summa_actor/client.cpp
JOB_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/job_actor
JOB_ACTOR = $(SOURCE_DIR)/job_actor/job_actor.cpp
......@@ -340,7 +341,7 @@ compile_summa_client:
$(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES)
compile_summa_server:
$(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(BATCH_MANGER) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES)
$(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(BATCH_MANGER) $(CLIENT_MANAGER) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES)
compile_main:
$(CC) $(FLAGS_ACTORS) -c $(MAIN) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) $(JOB_ACTOR_INCLUDES)
......
......@@ -22,7 +22,7 @@ class config : public actor_system_config {
std::string configPath = "";
bool debugMode = false;
uint16_t port = 4444;
std::string host = "localhost";
std::string host = "cnic-giws-cpu-19001-02";
bool server_mode = false;
bool distributed = false;
......@@ -52,7 +52,7 @@ void run_client(actor_system& system, const config& cfg) {
} else {
aout(self) << "Starting SUMMA in non-distributed mode \n";
auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath);
auto summa = system.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath, self);
}
}
......@@ -105,7 +105,7 @@ void caf_main(actor_system& sys, const config& cfg) {
auto system = cfg.server_mode ? run_server : run_client;
system(sys, cfg);
} else {
auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath);
auto summa = sys.spawn(summa_actor, cfg.startGRU, cfg.countGRU, cfg.configPath, self);
}
// start SUMMA
// auto system = cfg.server_mode ? run_server : run_client;
......
......@@ -6,10 +6,39 @@ Batch::Batch(int batch_id, int start_hru, int num_hru) {
this->batch_id = batch_id;
this->start_hru = start_hru;
this->num_hru = num_hru;
this->status = unassigned;
}
void Batch::printBatchInfo() {
std::cout << "batch_id: " << this->batch_id << "\n";
std::cout << "start_hru: " << this->start_hru << "\n";
std::cout << "num_hru: " << this->num_hru << "\n";
}
\ No newline at end of file
}
batch_status Batch::getBatchStatus() {
return this->status;
}
int Batch::getBatchID() {
return this->batch_id;
}
int Batch::getStartHRU() {
return this->start_hru;
}
int Batch::getNumHRU() {
return this->num_hru;
}
void Batch::solvedBatch() {
this->status = solved;
}
void Batch::assignedBatch() {
this->status = assigned;
}
void Batch::updateRunTime(double run_time) {
this->run_time = run_time;
}
#include "caf/all.hpp"
#include "client.hpp"
Client::Client(int id, caf::actor client_actor, std::string host_name) {
this->id = id;
this->client_actor = client_actor;
this->host_name = host_name;
this->connected = true;
}
caf::actor Client::getActor() {
return this->client_actor;
}
\ No newline at end of file
......@@ -14,12 +14,13 @@ using json = nlohmann::json;
namespace caf {
behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath) {
behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int numGRU, std::string configPath, actor parent) {
self->state.start = std::chrono::high_resolution_clock::now();
// Set Variables
self->state.startGRU = startGRU;
self->state.numGRU = numGRU;
self->state.configPath = configPath;
self->state.parent = parent;
parseSettings(self, configPath);
aout(self) << "SETTINGS FOR SUMMA_ACTOR\n";
......@@ -45,9 +46,14 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int
aout(self) << " " << self->state.duration / 1000 << " Seconds\n";
aout(self) << " " << (self->state.duration / 1000) / 60 << " Minutes\n";
aout(self) << " " << ((self->state.duration / 1000) / 60) / 60 << " Hours\n";
aout(self) << "Program Finished \n";
self->send(self->state.parent, done_batch_v, self->state.duration);
} else {
// spawn a new job
spawnJob(self);
......
......@@ -2,6 +2,7 @@
#include "caf/io/all.hpp"
#include "summa_client.hpp"
#include "summa_actor.hpp"
#include "message_atoms.hpp"
#include <unistd.h>
......@@ -70,9 +71,22 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a
self->send(server_actor, connect_to_server_v, self, self->state.hostname);
return {
[=](std::string test) {
aout(self) << test << std::endl;
[=](batch, int batch_id, int start_hru, int num_hru, std::string config_path) {
aout(self) << "Received batch to compute" << std::endl;
self->state.batch_id = batch_id;
self->state.summa_actor_ref = self->spawn(summa_actor, start_hru, num_hru, config_path, self);
},
[=](done_batch, double duration) {
aout(self) << "summa_actor has finished, sending message to the server for another batch\n";
self->send(server_actor, done_batch_v, self, duration, self->state.batch_id);
},
[=](time_to_exit) {
aout(self) << "Client Exiting\n";
self->quit();
}
};
}
}
\ No newline at end of file
......@@ -14,7 +14,7 @@ namespace caf {
behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) {
aout(self) << "Summa Server has Started \n";
self->state.config_path = config_path;
if (parseSettings(self, config_path) == -1) {
aout(self) << "ERROR WITH JSON SETTINGS FILE!!\n";
aout(self) << "Summa_Server_Actor Exiting\n";
......@@ -40,8 +40,35 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf
return {
[=](connect_to_server, actor client, std::string hostname) {
aout(self) << "Actor Trying to connect with hostname " << hostname << "\n";
aout(self) << "Actor trying to connect with hostname " << hostname << "\n";
int client_id = self->state.client_list.size(); // So we can lookup the client in O(1) time
self->state.client_list.push_back(new Client(client_id, client, hostname));
Batch *batch_to_send = getUnsolvedBatch(self);
batch_to_send->assignedBatch();
self->send(client, batch_v, batch_to_send->getBatchID(), batch_to_send->getStartHRU(),
batch_to_send->getNumHRU(), self->state.config_path);
},
[=](done_batch, actor client, double duration, int batch_id) {
aout(self) << "Client has Solved Batch " << batch_id << "\n";
self->state.batch_list[batch_id]->solvedBatch();
Batch *batch_to_send = getUnsolvedBatch(self);
if (batch_to_send == NULL) {
aout(self) << "We Are Done - Telling Clients to exit \n";
for (int i = 0; i < self->state.client_list.size(); i++) {
self->send(self->state.client_list[i]->getActor(), time_to_exit_v);
}
} else {
self->send(client, batch_v, batch_to_send->getBatchID(), batch_to_send->getStartHRU(),
batch_to_send->getNumHRU(), self->state.config_path);
}
}
};
}
......@@ -102,4 +129,17 @@ int assembleBatches(stateful_actor<summa_server_state>* self) {
return 0;
}
Batch* getUnsolvedBatch(stateful_actor<summa_server_state>* self) {
// Find the first unassigned batch
for (int i = 0; i < self->state.batch_list.size(); i++) {
if (self->state.batch_list[i]->getBatchStatus() == unassigned) {
return self->state.batch_list[i];
}
}
return NULL;
}
} // end namespace
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment