From 55696d4ad7a4b214b4a0aef66e2ebb560281d120 Mon Sep 17 00:00:00 2001 From: KyleKlenk <kyle.c.klenk@gmail.com> Date: Thu, 23 Jun 2022 16:29:02 -0600 Subject: [PATCH] Summa_client can connect to Summa_server and give it information about itself --- build/includes/global/message_atoms.hpp | 2 + build/includes/summa_actor/batch_manager.hpp | 33 ++++++ build/includes/summa_actor/client.hpp | 19 ++++ build/includes/summa_actor/summa_client.hpp | 3 + build/includes/summa_actor/summa_server.hpp | 15 ++- build/makefile | 6 +- build/source/actors/job_actor/job_actor.cpp | 4 +- build/source/actors/main.cpp | 9 +- .../actors/summa_actor/batch_manager.cpp | 15 +++ build/source/actors/summa_actor/client.cpp | 0 .../actors/summa_actor/summa_client.cpp | 10 +- .../actors/summa_actor/summa_server.cpp | 100 ++++++++++++++++-- 12 files changed, 196 insertions(+), 20 deletions(-) create mode 100644 build/includes/summa_actor/batch_manager.hpp create mode 100644 build/includes/summa_actor/client.hpp create mode 100644 build/source/actors/summa_actor/batch_manager.cpp create mode 100644 build/source/actors/summa_actor/client.cpp diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index 1b680f9..bad2da8 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -37,5 +37,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) CAF_ADD_ATOM(summa, start_hru) CAF_ADD_ATOM(summa, file_information) CAF_ADD_ATOM(summa, dt_init_factor) + // Client Actor + CAF_ADD_ATOM(summa, connect_to_server) CAF_END_TYPE_ID_BLOCK(summa) \ No newline at end of file diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp new file mode 100644 index 0000000..30ae38f --- /dev/null +++ b/build/includes/summa_actor/batch_manager.hpp @@ -0,0 +1,33 @@ +#pragma once +#include "caf/all.hpp" +#include <vector> + + +class Batch { + private: + int batch_id; + int start_hru; + int num_hru; + int run_time; + int read_time; + int write_time; + caf::actor assigned_actor; + + + public: + Batch(int batch_id, int start_hru, int num_hru); + + void printBatchInfo(); + +}; + + +class Batch_Manager { + private: + std::vector<Batch> batch_list; + + + + public: + +}; \ No newline at end of file diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client.hpp new file mode 100644 index 0000000..358a18c --- /dev/null +++ b/build/includes/summa_actor/client.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include "caf/all.hpp" + + + + +class Client { + private: + int id; + int batches_solved; + bool connected; + caf::actor client_actor; + std::string host_name; + + + public: + +}; \ No newline at end of file diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index 28d5b8f..b244c69 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -3,10 +3,13 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" +#include <string> + namespace caf { struct summa_client_state { strong_actor_ptr current_server; + std::string hostname; }; behavior summa_client(stateful_actor<summa_client_state>* self); behavior unconnected(stateful_actor<summa_client_state>*); diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index bed891e..ee4d85c 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -2,13 +2,22 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" +#include "batch_manager.hpp" +#include <string> + namespace caf { struct summa_server_state { - + int total_hru_count; + int num_clients; + int num_hru_per_batch; + std::vector<Batch*> batch_list; + std::vector<Batch> solved_batches; + std::vector<Batch> failed_batches; }; -behavior summa_server(stateful_actor<summa_server_state>* self); - +behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path); +int parseSettings(stateful_actor<summa_server_state>* self, std::string config_path); +int assembleBatches(stateful_actor<summa_server_state>* self); } \ No newline at end of file diff --git a/build/makefile b/build/makefile index 9c0bace..b61ce06 100644 --- a/build/makefile +++ b/build/makefile @@ -254,6 +254,8 @@ SUMMA_ACTOR = $(SOURCE_DIR)/summa_actor/summa_actor.cpp SUMMA_CLIENT = $(SOURCE_DIR)/summa_actor/summa_client.cpp SUMMA_SERVER = $(SOURCE_DIR)/summa_actor/summa_server.cpp +BATCH_MANGER = $(SOURCE_DIR)/summa_actor/batch_manager.cpp + JOB_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/job_actor JOB_ACTOR = $(SOURCE_DIR)/job_actor/job_actor.cpp GRUinfo = $(SOURCE_DIR)/job_actor/GRUinfo.cpp @@ -335,10 +337,10 @@ compile_summa_actor: $(JOB_ACTOR_INCLUDES) compile_summa_client: - $(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES) + $(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) compile_summa_server: - $(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(SUMMA_ACTOR_INCLUDES) + $(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(BATCH_MANGER) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) compile_main: $(CC) $(FLAGS_ACTORS) -c $(MAIN) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) $(JOB_ACTOR_INCLUDES) diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index 241c5dc..0378f1e 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -196,8 +196,8 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU, int parseSettings(stateful_actor<job_state>* self, std::string configPath) { json settings; - std::string SummaActorsSettigs = "/Summa_Actors_Settings.json"; - std::ifstream settings_file(configPath + SummaActorsSettigs); + std::string SummaActorsSettings = "/Summa_Actors_Settings.json"; + std::ifstream settings_file(configPath + SummaActorsSettings); settings_file >> settings; settings_file.close(); diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index da5c5da..3c4643f 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -60,15 +60,14 @@ void run_client(actor_system& system, const config& cfg) { void run_server(actor_system& system, const config& cfg) { scoped_actor self{system}; - auto server = system.spawn(summa_server); - aout(self) << "SEVER" << std::endl; - aout(self) << "Attempting to publish actor" << cfg.port << std::endl; + auto server = system.spawn(summa_server, cfg.configPath); + aout(self) << "Attempting to publish summa_server_actor" << cfg.port << std::endl; auto is_port = io::publish(server, cfg.port); if (!is_port) { - std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << std::endl; + std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << "\n"; return; } - aout(self) << "Successfully Published" << *is_port << std::endl; + aout(self) << "Successfully Published summa_server_actor on port " << *is_port << "\n"; std::string dummy; std::getline(std::cin, dummy); std::cout << "...cya" << std::endl; diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp new file mode 100644 index 0000000..495a0f0 --- /dev/null +++ b/build/source/actors/summa_actor/batch_manager.cpp @@ -0,0 +1,15 @@ +#include "caf/all.hpp" +#include <vector> +#include "batch_manager.hpp" + +Batch::Batch(int batch_id, int start_hru, int num_hru) { + this->batch_id = batch_id; + this->start_hru = start_hru; + this->num_hru = num_hru; +} + +void Batch::printBatchInfo() { + std::cout << "batch_id: " << this->batch_id << "\n"; + std::cout << "start_hru: " << this->start_hru << "\n"; + std::cout << "num_hru: " << this->num_hru << "\n"; +} \ No newline at end of file diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp new file mode 100644 index 0000000..e69de29 diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index b859680..2ec1f4a 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -2,6 +2,10 @@ #include "caf/io/all.hpp" #include "summa_client.hpp" +#include "message_atoms.hpp" + +#include <unistd.h> +#include <limits.h> namespace caf { @@ -59,8 +63,12 @@ void connecting(stateful_actor<summa_client_state>* self, const std::string& hos } behavior running(stateful_actor<summa_client_state>* self, const actor& server_actor) { + char host[HOST_NAME_MAX]; aout(self) << "Client Has Started Successfully" << std::endl; - self->send(server_actor, 80); + gethostname(host, 1024); + self->state.hostname = host; + + self->send(server_actor, connect_to_server_v, self, self->state.hostname); return { [=](std::string test) { aout(self) << test << std::endl; diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 2c2d837..137b6d5 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -1,19 +1,105 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" - +#include "json.hpp" +#include <string> +#include "batch_manager.hpp" #include "summa_server.hpp" +#include "message_atoms.hpp" + + +using json = nlohmann::json; + namespace caf { -behavior summa_server(stateful_actor<summa_server_state>* self) { - aout(self) << "Summa Server has Started \n"; +behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) { + aout(self) << "Summa Server has Started \n"; - return { - [=](int i) { - aout(self) << "Received " << i << "\n"; - return "Received"; + if (parseSettings(self, config_path) == -1) { + aout(self) << "ERROR WITH JSON SETTINGS FILE!!\n"; + aout(self) << "Summa_Server_Actor Exiting\n"; + self->quit(); + } else { + aout(self) << "-------------------------------------\n"; + aout(self) << "-----Summa_Server_Actor_Settings-----\n"; + aout(self) << "Total HRUs to compute = " << self->state.total_hru_count << "\n"; + aout(self) << "Number of HRUs per batch = " << self->state.num_hru_per_batch << "\n"; + aout(self) << "-------------------------------------\n"; + } + + aout(self) << "Assembling HRUs into Batches\n"; + if (assembleBatches(self) == -1) { + aout(self) << "ERROR: assembleBatches\n"; + } else { + aout(self) << "HRU Batches Assembled, Ready For Clients to Connect \n"; + + for (int i = 0; i < self->state.batch_list.size(); i++) { + self->state.batch_list[i]->printBatchInfo(); } + } + return { + [=](connect_to_server, actor client, std::string hostname) { + aout(self) << "Actor Trying to connect with hostname " << hostname << "\n"; + }, }; } + +int parseSettings(stateful_actor<summa_server_state>* self, std::string config_path) { + json settings; + std::string SummaActorsSettings = "/Summa_Actors_Settings.json"; + std::ifstream settings_file(config_path + SummaActorsSettings); + settings_file >> settings; + settings_file.close(); + + if (settings.find("SimulationSettings") != settings.end()) { + json simulation_settings = settings["SimulationSettings"]; + + if (simulation_settings.find("total_hru_count") != simulation_settings.end()) { + self->state.total_hru_count = simulation_settings["total_hru_count"]; + } else { + aout(self) << "ERROR Finding Total HRU Count - Exiting because the number " << + "of HRUs to compute is unknown\n"; + return -1; + } + + if (simulation_settings.find("num_hru_per_batch") != simulation_settings.end()) { + self->state.num_hru_per_batch = simulation_settings["num_hru_per_batch"]; + } else { + aout(self) << "ERROR Finding initial value for the number of HRUs in a batch " << + "setting num_hru_per_batch to default value of 500\n"; + self->state.num_hru_per_batch = 500; + } + + return 0; + + } else { + aout(self) << "Error Finding SimulationSettings in JSON File - Exiting as Num"; + return -1; + } +} + +int assembleBatches(stateful_actor<summa_server_state>* self) { + int remaining_hru_to_batch = self->state.total_hru_count; + int count_index = 0; // this is like the offset for slurm bash scripts + int start_hru = 1; + + while(remaining_hru_to_batch > 0) { + if (self->state.num_hru_per_batch > remaining_hru_to_batch) { + self->state.batch_list.push_back(new Batch(count_index, start_hru, + remaining_hru_to_batch)); + remaining_hru_to_batch = 0; + } else { + self->state.batch_list.push_back(new Batch(count_index, start_hru, + self->state.num_hru_per_batch)); + + remaining_hru_to_batch -= self->state.num_hru_per_batch; + start_hru += self->state.num_hru_per_batch; + count_index += 1; + } + } + + return 0; } + +} // end namespace -- GitLab