From 1406d5c63fe1a44fba1999e1382162f64a9bed20 Mon Sep 17 00:00:00 2001 From: KyleKlenk <kyle.c.klenk@gmail.com> Date: Tue, 12 Jul 2022 13:42:12 -0600 Subject: [PATCH] Works with one client and transfers information about the host and run_times --- build/includes/global/global.hpp | 8 +++ build/includes/summa_actor/batch_manager.hpp | 11 +-- build/includes/summa_actor/client.hpp | 8 ++- build/includes/summa_actor/summa_client.hpp | 1 + build/includes/summa_actor/summa_server.hpp | 7 +- .../file_access_actor/file_access_actor.cpp | 3 - build/source/actors/main.cpp | 4 -- .../actors/summa_actor/batch_manager.cpp | 9 ++- build/source/actors/summa_actor/client.cpp | 12 +++- .../source/actors/summa_actor/summa_actor.cpp | 17 +++-- .../actors/summa_actor/summa_client.cpp | 8 ++- .../actors/summa_actor/summa_server.cpp | 72 +++++++++++-------- 12 files changed, 103 insertions(+), 57 deletions(-) diff --git a/build/includes/global/global.hpp b/build/includes/global/global.hpp index eb6835a..69b695b 100644 --- a/build/includes/global/global.hpp +++ b/build/includes/global/global.hpp @@ -3,6 +3,7 @@ #include <chrono> #include <optional> #include <iostream> +#include <vector> #include <bits/stdc++.h> #include <unistd.h> #include "json.hpp" @@ -11,7 +12,14 @@ using json = nlohmann::json; extern bool debug; +template<typename T> +int getSettingsTest(std::vector<std::string> keys, T return_value) { + for (std::vector<int>::size_type i = 0; i < keys.size(); i++) { + std::cout<< keys[i] << std::endl; + } + return 0; +} /** * Return the time between to time points diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp index 884f2c6..2e8f46d 100644 --- a/build/includes/summa_actor/batch_manager.hpp +++ b/build/includes/summa_actor/batch_manager.hpp @@ -1,6 +1,7 @@ #pragma once #include "caf/all.hpp" #include <vector> +#include <string> enum batch_status { @@ -18,6 +19,7 @@ class Batch { double run_time; double read_time; double write_time; + std::string assigned_host; caf::actor assigned_actor; batch_status status; @@ -35,9 +37,9 @@ class Batch { int getNumHRU(); - void solvedBatch(); + void solvedBatch(double run_time, double read_time, double write_time); - void assignedBatch(); + void assignedBatch(std::string hostname, caf::actor actor_ref); void updateRunTime(double run_time); }; @@ -46,9 +48,8 @@ class Batch { class Batch_Manager { private: std::vector<Batch> batch_list; - - - + std::vector<Batch> solved_batches; + std::vector<Batch> failed_batches; public: }; \ No newline at end of file diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client.hpp index 310fcb4..29e0a70 100644 --- a/build/includes/summa_actor/client.hpp +++ b/build/includes/summa_actor/client.hpp @@ -10,13 +10,17 @@ class Client { int batches_solved; bool connected; caf::actor client_actor; - std::string host_name; + std::string hostname; Batch* current_batch; public: - Client(int id, caf::actor client_actor, std::string host_name); + Client(int id, caf::actor client_actor, std::string hostname); caf::actor getActor(); + int getID(); + + std::string getHostname(); + }; \ No newline at end of file diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index f18a25d..9210a7b 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -13,6 +13,7 @@ struct summa_client_state { std::string config_path; actor summa_actor_ref; int batch_id; + int client_id; // id held by server }; behavior summa_client(stateful_actor<summa_client_state>* self); behavior unconnected(stateful_actor<summa_client_state>*); diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index 1bd7b9c..83d4fee 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -5,6 +5,7 @@ #include "batch_manager.hpp" #include "client.hpp" #include <string> +#include <optional> namespace caf { @@ -16,13 +17,13 @@ struct summa_server_state { int batches_remaining = 0; int batches_solved = 0; std::string config_path; - std::vector<Batch*> batch_list; + std::vector<Batch> batch_list; std::vector<Batch> solved_batches; std::vector<Batch> failed_batches; - std::vector<Client*> client_list; + std::vector<Client> client_list; }; behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path); int assembleBatches(stateful_actor<summa_server_state>* self); -Batch* getUnsolvedBatch(stateful_actor<summa_server_state>* self); +std::optional<Batch> getUnsolvedBatch(stateful_actor<summa_server_state>* self); } \ No newline at end of file diff --git a/build/source/actors/file_access_actor/file_access_actor.cpp b/build/source/actors/file_access_actor/file_access_actor.cpp index 7d0033d..dad7a63 100644 --- a/build/source/actors/file_access_actor/file_access_actor.cpp +++ b/build/source/actors/file_access_actor/file_access_actor.cpp @@ -116,7 +116,6 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU } }, - [=](write_output, int indxGRU, int indxHRU, int numStepsToWrite, caf::actor refToRespondTo) { int err; @@ -126,8 +125,6 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU if (err != 0) { aout(self) << "FILE_ACCESS_ACTOR - ERROR Writing Output \n"; } - - }, [=](read_and_write, int indxGRU, int indxHRU, int numStepsToWrite, int currentFile, diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index 1aa2a47..046d503 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -75,10 +75,6 @@ void run_server(actor_system& system, const config& cfg) { return; } aout(self) << "Successfully Published summa_server_actor on port " << *is_port << "\n"; - std::string dummy; - std::getline(std::cin, dummy); - std::cout << "...cya" << std::endl; - anon_send_exit(server, exit_reason::user_shutdown); } diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp index aa777d5..5fa2677 100644 --- a/build/source/actors/summa_actor/batch_manager.cpp +++ b/build/source/actors/summa_actor/batch_manager.cpp @@ -33,12 +33,17 @@ int Batch::getNumHRU() { return this->num_hru; } -void Batch::solvedBatch() { +void Batch::solvedBatch(double run_time, double read_time, double write_time) { this->status = solved; + this->run_time = run_time; + this->read_time = read_time; + this->write_time = write_time; } -void Batch::assignedBatch() { +void Batch::assignedBatch(std::string hostname, caf::actor actor_ref) { this->status = assigned; + this->assigned_host = hostname; + this->assigned_actor = actor_ref; } void Batch::updateRunTime(double run_time) { diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp index 078f316..bc5faa1 100644 --- a/build/source/actors/summa_actor/client.cpp +++ b/build/source/actors/summa_actor/client.cpp @@ -2,14 +2,22 @@ #include "client.hpp" -Client::Client(int id, caf::actor client_actor, std::string host_name) { +Client::Client(int id, caf::actor client_actor, std::string hostname) { this->id = id; this->client_actor = client_actor; - this->host_name = host_name; + this->hostname = hostname; this->connected = true; } caf::actor Client::getActor() { return this->client_actor; +} + +int Client::getID() { + return this->id; +} + +std::string Client::getHostname() { + return this->hostname; } \ No newline at end of file diff --git a/build/source/actors/summa_actor/summa_actor.cpp b/build/source/actors/summa_actor/summa_actor.cpp index 7f941ab..a62e5fb 100644 --- a/build/source/actors/summa_actor/summa_actor.cpp +++ b/build/source/actors/summa_actor/summa_actor.cpp @@ -54,20 +54,27 @@ behavior summa_actor(stateful_actor<summa_actor_state>* self, int startGRU, int aout(self) << "Job Read Duration = " << self->state.timing_info_for_jobs.job_read_duration[i] << "\n"; aout(self) << "Job Write Duration = " << self->state.timing_info_for_jobs.job_write_duration[i] << "\n"; } + + // TODO: Output CSV file for finished jobs aout(self) << "\n________________SUMMA_ACTOR TIMING INFO________________\n"; aout(self) << "Total Duration = " << self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) << " Seconds\n"; aout(self) << "Total Duration = " << self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) / 60 << " Minutes\n"; aout(self) << "Total Duration = " << (self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0) / 60) / 60 << " Hours\n\n"; - aout(self) << "Total Read Duration = " << std::accumulate(self->state.timing_info_for_jobs.job_read_duration.begin(), + double total_read_duration = std::accumulate(self->state.timing_info_for_jobs.job_read_duration.begin(), self->state.timing_info_for_jobs.job_read_duration.end(), - 0.0) << "Seconds \n"; - aout(self) << "Total Write Duration = " << std::accumulate(self->state.timing_info_for_jobs.job_write_duration.begin(), + 0.0); + aout(self) << "Total Read Duration = " << total_read_duration << "Seconds \n"; + double total_write_duration = std::accumulate(self->state.timing_info_for_jobs.job_write_duration.begin(), self->state.timing_info_for_jobs.job_write_duration.end(), - 0.0) << "Seconds \n"; + 0.0); + aout(self) << "Total Write Duration = " << total_write_duration << "Seconds \n"; aout(self) << "Program Finished \n"; - self->send(self->state.parent, done_batch_v, self->state.summa_actor_timing.getDuration("total duration").value_or(-1.0)); + self->send(self->state.parent, done_batch_v, + self->state.summa_actor_timing.getDuration("total_duration").value_or(-1.0), + total_read_duration, + total_write_duration); } else { // spawn a new job diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index b192b72..1ef8231 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -72,15 +72,17 @@ behavior running(stateful_actor<summa_client_state>* self, const actor& server_a self->send(server_actor, connect_to_server_v, self, self->state.hostname); return { - [=](batch, int batch_id, int start_hru, int num_hru, std::string config_path) { + [=](batch, int client_id, int batch_id, int start_hru, int num_hru, std::string config_path) { aout(self) << "Received batch to compute" << std::endl; + self->state.client_id = client_id; self->state.batch_id = batch_id; self->state.summa_actor_ref = self->spawn(summa_actor, start_hru, num_hru, config_path, self); }, - [=](done_batch, double duration) { + [=](done_batch, double total_duration, double total_read_duration, double total_write_duration) { aout(self) << "summa_actor has finished, sending message to the server for another batch\n"; - self->send(server_actor, done_batch_v, self, duration, self->state.batch_id); + self->send(server_actor, done_batch_v, self, self->state.client_id, self->state.batch_id, + total_duration, total_read_duration, total_write_duration); }, [=](time_to_exit) { diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index d696941..41204bd 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -5,14 +5,18 @@ #include "summa_server.hpp" #include "message_atoms.hpp" #include "global.hpp" +#include <optional> namespace caf { behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) { aout(self) << "Summa Server has Started \n"; - self->state.config_path = config_path; + std::string returnType; + getSettingsTest(std::vector<std::string> {"test", "test2"} ,returnType); + + self->state.config_path = config_path; self->state.total_hru_count = getSettings(self->state.config_path, "SimulationSettings", "total_hru_count", self->state.total_hru_count).value_or(-1); if (self->state.total_hru_count == -1) { @@ -25,58 +29,71 @@ behavior summa_server(stateful_actor<summa_server_state>* self, std::string conf } aout(self) << "Assembling HRUs into Batches\n"; - if (assembleBatches(self) == -1) { aout(self) << "ERROR: assembleBatches\n"; } else { aout(self) << "HRU Batches Assembled, Ready For Clients to Connect \n"; for (std::vector<int>::size_type i = 0; i < self->state.batch_list.size(); i++) { - self->state.batch_list[i]->printBatchInfo(); + self->state.batch_list[i].printBatchInfo(); } } return { [=](connect_to_server, actor client, std::string hostname) { + // Client is connecting - Add it to our client list and assign it a batch aout(self) << "Actor trying to connect with hostname " << hostname << "\n"; int client_id = self->state.client_list.size(); // So we can lookup the client in O(1) time - self->state.client_list.push_back(new Client(client_id, client, hostname)); - Batch *batch_to_send = getUnsolvedBatch(self); + self->state.client_list.push_back(Client(client_id, client, hostname)); - batch_to_send->assignedBatch(); + std::optional<Batch> batch_to_send = getUnsolvedBatch(self); + if (batch_to_send.has_value()) { + Batch verified_batch = batch_to_send.value(); + verified_batch.assignedBatch(hostname, client); + self->send(client, batch_v, client_id, batch_to_send->getBatchID(), batch_to_send->getStartHRU(), + batch_to_send->getNumHRU(), self->state.config_path); - self->send(client, batch_v, batch_to_send->getBatchID(), batch_to_send->getStartHRU(), - batch_to_send->getNumHRU(), self->state.config_path); + } else { + aout(self) << "We Are Done - Telling Clients to exit \n"; + for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) { + self->send(self->state.client_list[i].getActor(), time_to_exit_v); + } + self->quit(); + return; + } }, - [=](done_batch, actor client, double duration, int batch_id) { - aout(self) << "Client finished batch: " << batch_id << "\n"; - self->state.batch_list[batch_id]->solvedBatch(); + [=](done_batch, actor client, int client_id, int batch_id, double total_duration, + double total_read_duration, double total_write_duration) { + + self->state.batch_list[batch_id].solvedBatch(total_duration, total_read_duration, total_write_duration); self->state.batches_solved++; self->state.batches_remaining = self->state.batch_list.size() - self->state.batches_solved; aout(self) << "\n****************************************\n"; + aout(self) << "Client finished batch: " << batch_id << "\n"; + aout(self) << "Client hostname = " << self->state.client_list[client_id].getHostname() << "\n"; + aout(self) << "Total Batch Duration = " << total_duration << "\n"; + aout(self) << "Total Batch Read Duration = " << total_read_duration << "\n"; + aout(self) << "Total Batch Write Duration = " << total_write_duration << "\n"; aout(self) << "Batches Solved = " << self->state.batches_solved << "\n"; aout(self) << "Batches Remaining = " << self->state.batches_remaining << "\n"; aout(self) << "****************************************\n"; // Find a new batch - Batch *batch_to_send = getUnsolvedBatch(self); - - if (batch_to_send == NULL) { + std::optional<Batch> batch_to_send = getUnsolvedBatch(self); + if (batch_to_send.has_value()) { + Batch verified_batch = batch_to_send.value(); + verified_batch.assignedBatch(self->state.client_list[client_id].getHostname(), client); + self->send(client, batch_v, client_id, batch_to_send->getBatchID(), batch_to_send->getStartHRU(), + batch_to_send->getNumHRU(), self->state.config_path); + } else { aout(self) << "We Are Done - Telling Clients to exit \n"; for (std::vector<int>::size_type i = 0; i < self->state.client_list.size(); i++) { - self->send(self->state.client_list[i]->getActor(), time_to_exit_v); + self->send(self->state.client_list[i].getActor(), time_to_exit_v); } - self->quit(); - return; - - } else { - self->send(client, batch_v, batch_to_send->getBatchID(), batch_to_send->getStartHRU(), - batch_to_send->getNumHRU(), self->state.config_path); } - } }; } @@ -89,11 +106,11 @@ int assembleBatches(stateful_actor<summa_server_state>* self) { while(remaining_hru_to_batch > 0) { if (self->state.num_hru_per_batch > remaining_hru_to_batch) { - self->state.batch_list.push_back(new Batch(count_index, start_hru, + self->state.batch_list.push_back(Batch(count_index, start_hru, remaining_hru_to_batch)); remaining_hru_to_batch = 0; } else { - self->state.batch_list.push_back(new Batch(count_index, start_hru, + self->state.batch_list.push_back(Batch(count_index, start_hru, self->state.num_hru_per_batch)); remaining_hru_to_batch -= self->state.num_hru_per_batch; @@ -101,20 +118,19 @@ int assembleBatches(stateful_actor<summa_server_state>* self) { count_index += 1; } } - return 0; } -Batch* getUnsolvedBatch(stateful_actor<summa_server_state>* self) { +std::optional<Batch> getUnsolvedBatch(stateful_actor<summa_server_state>* self) { // Find the first unassigned batch for (std::vector<int>::size_type i = 0; i < self->state.batch_list.size(); i++) { - if (self->state.batch_list[i]->getBatchStatus() == unassigned) { + if (self->state.batch_list[i].getBatchStatus() == unassigned) { return self->state.batch_list[i]; } } - return NULL; + return {}; } -- GitLab