diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index 1b680f98d743f35a13bff70174d31e1507f74240..bad2da82e0ee199e90269127c561c2c1d9b530d2 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -37,5 +37,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) CAF_ADD_ATOM(summa, start_hru) CAF_ADD_ATOM(summa, file_information) CAF_ADD_ATOM(summa, dt_init_factor) + // Client Actor + CAF_ADD_ATOM(summa, connect_to_server) CAF_END_TYPE_ID_BLOCK(summa) \ No newline at end of file diff --git a/build/includes/summa_actor/batch_manager.hpp b/build/includes/summa_actor/batch_manager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..30ae38f28b5779f6fc2a7f66a273359236b6f108 --- /dev/null +++ b/build/includes/summa_actor/batch_manager.hpp @@ -0,0 +1,33 @@ +#pragma once +#include "caf/all.hpp" +#include <vector> + + +class Batch { + private: + int batch_id; + int start_hru; + int num_hru; + int run_time; + int read_time; + int write_time; + caf::actor assigned_actor; + + + public: + Batch(int batch_id, int start_hru, int num_hru); + + void printBatchInfo(); + +}; + + +class Batch_Manager { + private: + std::vector<Batch> batch_list; + + + + public: + +}; \ No newline at end of file diff --git a/build/includes/summa_actor/client.hpp b/build/includes/summa_actor/client.hpp new file mode 100644 index 0000000000000000000000000000000000000000..358a18c859624e0dcfac0125617eb07c19b14412 --- /dev/null +++ b/build/includes/summa_actor/client.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include "caf/all.hpp" + + + + +class Client { + private: + int id; + int batches_solved; + bool connected; + caf::actor client_actor; + std::string host_name; + + + public: + +}; \ No newline at end of file diff --git a/build/includes/summa_actor/summa_client.hpp b/build/includes/summa_actor/summa_client.hpp index 28d5b8fa8082d67028e02a70161fcdafc56d16a4..b244c690a5a40bbc705c9300ffaba1c7bbc54619 100644 --- a/build/includes/summa_actor/summa_client.hpp +++ b/build/includes/summa_actor/summa_client.hpp @@ -3,10 +3,13 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" +#include <string> + namespace caf { struct summa_client_state { strong_actor_ptr current_server; + std::string hostname; }; behavior summa_client(stateful_actor<summa_client_state>* self); behavior unconnected(stateful_actor<summa_client_state>*); diff --git a/build/includes/summa_actor/summa_server.hpp b/build/includes/summa_actor/summa_server.hpp index bed891e260fa9eb8696842c5694a98eab93502e5..ee4d85cd7d40732aa9c252757981711866cdc971 100644 --- a/build/includes/summa_actor/summa_server.hpp +++ b/build/includes/summa_actor/summa_server.hpp @@ -2,13 +2,22 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" +#include "batch_manager.hpp" +#include <string> + namespace caf { struct summa_server_state { - + int total_hru_count; + int num_clients; + int num_hru_per_batch; + std::vector<Batch*> batch_list; + std::vector<Batch> solved_batches; + std::vector<Batch> failed_batches; }; -behavior summa_server(stateful_actor<summa_server_state>* self); - +behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path); +int parseSettings(stateful_actor<summa_server_state>* self, std::string config_path); +int assembleBatches(stateful_actor<summa_server_state>* self); } \ No newline at end of file diff --git a/build/makefile b/build/makefile index 9c0bace472c477e5366c86c64fbad77b93ae4ef3..b61ce06d48c1ea5d6648d2691b4d9e27d40da2dc 100644 --- a/build/makefile +++ b/build/makefile @@ -254,6 +254,8 @@ SUMMA_ACTOR = $(SOURCE_DIR)/summa_actor/summa_actor.cpp SUMMA_CLIENT = $(SOURCE_DIR)/summa_actor/summa_client.cpp SUMMA_SERVER = $(SOURCE_DIR)/summa_actor/summa_server.cpp +BATCH_MANGER = $(SOURCE_DIR)/summa_actor/batch_manager.cpp + JOB_ACTOR_INCLUDES = -I$(INCLUDE_DIR)/job_actor JOB_ACTOR = $(SOURCE_DIR)/job_actor/job_actor.cpp GRUinfo = $(SOURCE_DIR)/job_actor/GRUinfo.cpp @@ -335,10 +337,10 @@ compile_summa_actor: $(JOB_ACTOR_INCLUDES) compile_summa_client: - $(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES) + $(CC) $(FLAGS_ACTORS) -c $(SUMMA_CLIENT) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) compile_summa_server: - $(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(SUMMA_ACTOR_INCLUDES) + $(CC) $(FLAGS_ACTORS) -c $(SUMMA_SERVER) $(BATCH_MANGER) $(SUMMA_ACTOR_INCLUDES) $(GLOBAL_INCLUDES) compile_main: $(CC) $(FLAGS_ACTORS) -c $(MAIN) $(GLOBAL_INCLUDES) $(SUMMA_ACTOR_INCLUDES) $(JOB_ACTOR_INCLUDES) diff --git a/build/source/actors/job_actor/job_actor.cpp b/build/source/actors/job_actor/job_actor.cpp index 241c5dcff25aec0a82cdf536abc425de10be090d..0378f1e53995372f4cc586b34ac837762dedfc1b 100644 --- a/build/source/actors/job_actor/job_actor.cpp +++ b/build/source/actors/job_actor/job_actor.cpp @@ -196,8 +196,8 @@ behavior job_actor(stateful_actor<job_state>* self, int startGRU, int numGRU, int parseSettings(stateful_actor<job_state>* self, std::string configPath) { json settings; - std::string SummaActorsSettigs = "/Summa_Actors_Settings.json"; - std::ifstream settings_file(configPath + SummaActorsSettigs); + std::string SummaActorsSettings = "/Summa_Actors_Settings.json"; + std::ifstream settings_file(configPath + SummaActorsSettings); settings_file >> settings; settings_file.close(); diff --git a/build/source/actors/main.cpp b/build/source/actors/main.cpp index da5c5daf07a7aabe1bcaac1aabbc7dc499a059bc..3c4643fc653d343e96540e9efc59f9db0f05dba2 100644 --- a/build/source/actors/main.cpp +++ b/build/source/actors/main.cpp @@ -60,15 +60,14 @@ void run_client(actor_system& system, const config& cfg) { void run_server(actor_system& system, const config& cfg) { scoped_actor self{system}; - auto server = system.spawn(summa_server); - aout(self) << "SEVER" << std::endl; - aout(self) << "Attempting to publish actor" << cfg.port << std::endl; + auto server = system.spawn(summa_server, cfg.configPath); + aout(self) << "Attempting to publish summa_server_actor" << cfg.port << std::endl; auto is_port = io::publish(server, cfg.port); if (!is_port) { - std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << std::endl; + std::cerr << "********PUBLISH FAILED*******" << to_string(is_port.error()) << "\n"; return; } - aout(self) << "Successfully Published" << *is_port << std::endl; + aout(self) << "Successfully Published summa_server_actor on port " << *is_port << "\n"; std::string dummy; std::getline(std::cin, dummy); std::cout << "...cya" << std::endl; diff --git a/build/source/actors/summa_actor/batch_manager.cpp b/build/source/actors/summa_actor/batch_manager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..495a0f029439c24e7c275d69abb8f25ec81e8b10 --- /dev/null +++ b/build/source/actors/summa_actor/batch_manager.cpp @@ -0,0 +1,15 @@ +#include "caf/all.hpp" +#include <vector> +#include "batch_manager.hpp" + +Batch::Batch(int batch_id, int start_hru, int num_hru) { + this->batch_id = batch_id; + this->start_hru = start_hru; + this->num_hru = num_hru; +} + +void Batch::printBatchInfo() { + std::cout << "batch_id: " << this->batch_id << "\n"; + std::cout << "start_hru: " << this->start_hru << "\n"; + std::cout << "num_hru: " << this->num_hru << "\n"; +} \ No newline at end of file diff --git a/build/source/actors/summa_actor/client.cpp b/build/source/actors/summa_actor/client.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/build/source/actors/summa_actor/summa_client.cpp b/build/source/actors/summa_actor/summa_client.cpp index b85968041bcde6809fcb8ce19de2ce2293bf0e11..2ec1f4a2b91ed687be5e17764d2d7db5ef0e8e3b 100644 --- a/build/source/actors/summa_actor/summa_client.cpp +++ b/build/source/actors/summa_actor/summa_client.cpp @@ -2,6 +2,10 @@ #include "caf/io/all.hpp" #include "summa_client.hpp" +#include "message_atoms.hpp" + +#include <unistd.h> +#include <limits.h> namespace caf { @@ -59,8 +63,12 @@ void connecting(stateful_actor<summa_client_state>* self, const std::string& hos } behavior running(stateful_actor<summa_client_state>* self, const actor& server_actor) { + char host[HOST_NAME_MAX]; aout(self) << "Client Has Started Successfully" << std::endl; - self->send(server_actor, 80); + gethostname(host, 1024); + self->state.hostname = host; + + self->send(server_actor, connect_to_server_v, self, self->state.hostname); return { [=](std::string test) { aout(self) << test << std::endl; diff --git a/build/source/actors/summa_actor/summa_server.cpp b/build/source/actors/summa_actor/summa_server.cpp index 2c2d8371e1676a98139c5452903c177665e2ff97..137b6d56fb5aaaeb88ecf0f42bb626a821324112 100644 --- a/build/source/actors/summa_actor/summa_server.cpp +++ b/build/source/actors/summa_actor/summa_server.cpp @@ -1,19 +1,105 @@ #include "caf/all.hpp" #include "caf/io/all.hpp" - +#include "json.hpp" +#include <string> +#include "batch_manager.hpp" #include "summa_server.hpp" +#include "message_atoms.hpp" + + +using json = nlohmann::json; + namespace caf { -behavior summa_server(stateful_actor<summa_server_state>* self) { - aout(self) << "Summa Server has Started \n"; +behavior summa_server(stateful_actor<summa_server_state>* self, std::string config_path) { + aout(self) << "Summa Server has Started \n"; - return { - [=](int i) { - aout(self) << "Received " << i << "\n"; - return "Received"; + if (parseSettings(self, config_path) == -1) { + aout(self) << "ERROR WITH JSON SETTINGS FILE!!\n"; + aout(self) << "Summa_Server_Actor Exiting\n"; + self->quit(); + } else { + aout(self) << "-------------------------------------\n"; + aout(self) << "-----Summa_Server_Actor_Settings-----\n"; + aout(self) << "Total HRUs to compute = " << self->state.total_hru_count << "\n"; + aout(self) << "Number of HRUs per batch = " << self->state.num_hru_per_batch << "\n"; + aout(self) << "-------------------------------------\n"; + } + + aout(self) << "Assembling HRUs into Batches\n"; + if (assembleBatches(self) == -1) { + aout(self) << "ERROR: assembleBatches\n"; + } else { + aout(self) << "HRU Batches Assembled, Ready For Clients to Connect \n"; + + for (int i = 0; i < self->state.batch_list.size(); i++) { + self->state.batch_list[i]->printBatchInfo(); } + } + return { + [=](connect_to_server, actor client, std::string hostname) { + aout(self) << "Actor Trying to connect with hostname " << hostname << "\n"; + }, }; } + +int parseSettings(stateful_actor<summa_server_state>* self, std::string config_path) { + json settings; + std::string SummaActorsSettings = "/Summa_Actors_Settings.json"; + std::ifstream settings_file(config_path + SummaActorsSettings); + settings_file >> settings; + settings_file.close(); + + if (settings.find("SimulationSettings") != settings.end()) { + json simulation_settings = settings["SimulationSettings"]; + + if (simulation_settings.find("total_hru_count") != simulation_settings.end()) { + self->state.total_hru_count = simulation_settings["total_hru_count"]; + } else { + aout(self) << "ERROR Finding Total HRU Count - Exiting because the number " << + "of HRUs to compute is unknown\n"; + return -1; + } + + if (simulation_settings.find("num_hru_per_batch") != simulation_settings.end()) { + self->state.num_hru_per_batch = simulation_settings["num_hru_per_batch"]; + } else { + aout(self) << "ERROR Finding initial value for the number of HRUs in a batch " << + "setting num_hru_per_batch to default value of 500\n"; + self->state.num_hru_per_batch = 500; + } + + return 0; + + } else { + aout(self) << "Error Finding SimulationSettings in JSON File - Exiting as Num"; + return -1; + } +} + +int assembleBatches(stateful_actor<summa_server_state>* self) { + int remaining_hru_to_batch = self->state.total_hru_count; + int count_index = 0; // this is like the offset for slurm bash scripts + int start_hru = 1; + + while(remaining_hru_to_batch > 0) { + if (self->state.num_hru_per_batch > remaining_hru_to_batch) { + self->state.batch_list.push_back(new Batch(count_index, start_hru, + remaining_hru_to_batch)); + remaining_hru_to_batch = 0; + } else { + self->state.batch_list.push_back(new Batch(count_index, start_hru, + self->state.num_hru_per_batch)); + + remaining_hru_to_batch -= self->state.num_hru_per_batch; + start_hru += self->state.num_hru_per_batch; + count_index += 1; + } + } + + return 0; } + +} // end namespace