diff --git a/build/includes/file_access_actor/file_access_actor.hpp b/build/includes/file_access_actor/file_access_actor.hpp index 2c87019f702addc487659ef5a230acef2ac41b24..fb30ddd5fa1ef015380784453f2ce255bdeddf9c 100644 --- a/build/includes/file_access_actor/file_access_actor.hpp +++ b/build/includes/file_access_actor/file_access_actor.hpp @@ -8,13 +8,18 @@ #include "fortran_data_types.hpp" #include "auxilary.hpp" #include "global.hpp" +#include "message_atoms.hpp" +#include "forcing_file_info.hpp" +#include "json.hpp" + /********************************************* * File Access Actor Fortran Functions *********************************************/ extern "C" { void defOutputFortran(void* handle_ncid, int* start_gru, int* num_gru, - int* num_hru, int* file_gru, int* err); + int* num_hru, int* file_gru, bool* use_extention, + char const* output_extention, int* err); void fileAccessActor_init_fortran(void* handle_forcing_file_info, int* num_forcing_files, int* num_timesteps, @@ -38,45 +43,51 @@ extern "C" { namespace caf { struct file_access_state { - // Variables set on Spawn - caf::actor parent; - int start_gru; - int num_gru; + // Variables set on Spawn + caf::actor parent; + int start_gru; + int num_gru_local; + int num_gru_global; // For distributed jobs + + NumGRUInfo num_gru_info; + - void *handle_forcing_file_info = new_handle_file_info(); // Handle for the forcing file information - void *handle_ncid = new_handle_var_i(); // output file ids - int num_vectors_in_output_manager; - int num_steps; - int stepsInCurrentFile; - int numFiles; - int filesLoaded; - int err; - int num_output_steps; + void *handle_forcing_file_info = new_handle_file_info(); // Handle for the forcing file information + void *handle_ncid = new_handle_var_i(); // output file ids + int num_vectors_in_output_manager; + int num_steps; + int stepsInCurrentFile; + int numFiles; + int filesLoaded; + int err; + int num_output_steps; - Output_Container* output_container; + Output_Container* output_container; - File_Access_Actor_Settings file_access_actor_settings; + File_Access_Actor_Settings file_access_actor_settings; - std::vector<Forcing_File_Info> forcing_file_list; // list of steps in file + std::vector<Forcing_File_Info> forcing_file_list; // list of steps in file - // Timing Variables - TimingInfo file_access_timing; + // Timing Variables + TimingInfo file_access_timing; - bool write_params_flag = true; + bool write_params_flag = true; }; // called to spawn a file_access_actor -behavior file_access_actor(stateful_actor<file_access_state>* self, int startGRU, int numGRU, - File_Access_Actor_Settings file_access_actor_settings, actor parent); +behavior file_access_actor(stateful_actor<file_access_state>* self, + NumGRUInfo num_gru_info, + File_Access_Actor_Settings file_access_actor_settings, actor parent); /********************************************* * Functions for the file access actor *********************************************/ /* Setup and call the fortran routine that writes the output */ -void writeOutput(stateful_actor<file_access_state>* self, Output_Partition* partition); +void writeOutput(stateful_actor<file_access_state>* self, + Output_Partition* partition); } // end namespace \ No newline at end of file diff --git a/build/includes/global/message_atoms.hpp b/build/includes/global/message_atoms.hpp index 5b359fc03fd514a14b6efe3a666d1c7c0ee6877f..be98a04a00c9bb6a356479e9a17fffb9b4aa363f 100644 --- a/build/includes/global/message_atoms.hpp +++ b/build/includes/global/message_atoms.hpp @@ -122,6 +122,35 @@ bool inspect(Inspector& inspector, hru& hru_data) { inspector.field("compute_veg_flux", hru_data.compute_veg_flux)); } +struct NumGRUInfo { + int start_gru_local; + int start_gru_global; + int num_gru_local; + int num_gru_global; + int file_gru; + bool use_global_for_data_structures; + + // Constructor + NumGRUInfo(int start_gru_local = 0, int start_gru_global= 0, + int num_gru_local = 0, int num_gru_global = 0, int file_gru = 0, + bool use_global_for_data_structures = false) + : start_gru_local(start_gru_local), start_gru_global(start_gru_global), + num_gru_local(num_gru_local), num_gru_global(num_gru_global), + file_gru(file_gru), + use_global_for_data_structures(use_global_for_data_structures) {} +}; +template <class Insepctor> +bool inspect(Insepctor& inspector, NumGRUInfo& num_gru) { + return inspector.object(num_gru).fields( + inspector.field("start_gru_local", num_gru.start_gru_local), + inspector.field("start_gru_global", num_gru.start_gru_global), + inspector.field("num_gru_local", num_gru.num_gru_local), + inspector.field("num_gru_global", num_gru.num_gru_global), + inspector.field("file_gru", num_gru.file_gru), + inspector.field("use_global_for_data_structures", + num_gru.use_global_for_data_structures)); +} + enum class hru_error : uint8_t { run_physics_unhandleable = 1, @@ -325,6 +354,7 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id) CAF_ADD_TYPE_ID(summa, (serializable_netcdf_gru_actor_info)) CAF_ADD_TYPE_ID(summa, (hru)) + CAF_ADD_TYPE_ID(summa, (NumGRUInfo)) // Class Types CAF_ADD_TYPE_ID(summa, (Client)) diff --git a/build/includes/job_actor/job_actor.hpp b/build/includes/job_actor/job_actor.hpp index 73ff1d8490f6481a9aa87dad14484f5bcae8de7b..20345e1f285d9e8fdfa606a7f0510756ee3321f3 100644 --- a/build/includes/job_actor/job_actor.hpp +++ b/build/includes/job_actor/job_actor.hpp @@ -54,6 +54,9 @@ struct job_state { int num_hru; int max_run_attempts = 1; // Max number of attempts to solve a GRU + NumGRUInfo num_gru_info; + + GRU_Container gru_container; // Variables for GRU monitoring @@ -89,6 +92,9 @@ struct distributed_job_state { int num_gru; + NumGRUInfo num_gru_info; + std::vector<NumGRUInfo> node_num_gru_info; + Distributed_Settings distributed_settings; Job_Actor_Settings job_actor_settings; HRU_Actor_Settings hru_actor_settings; @@ -97,7 +103,6 @@ struct distributed_job_state { std::vector<caf::actor> connected_nodes; std::vector<std::tuple<int,int>> node_gru_ranges; // (start_gru, num_gru) - // Forcing information int iFile = 1; // index of current forcing file from forcing file list int stepsInCurrentFFile; @@ -113,11 +118,10 @@ struct distributed_job_state { /** The Job Actor */ behavior job_actor(stateful_actor<job_state>* self, - int start_gru, int num_gru, - File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, - HRU_Actor_Settings hru_actor_settings, - actor parent); + int start_gru, int num_gru, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings, actor parent); /** The Job Actor For Internode Communication */ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, diff --git a/build/includes/job_actor/node_actor.hpp b/build/includes/job_actor/node_actor.hpp index f097cb7a34b48e9d803a2f03fe408e2c0713457a..bf49632d46433a8759b18cec146308e667935292 100644 --- a/build/includes/job_actor/node_actor.hpp +++ b/build/includes/job_actor/node_actor.hpp @@ -19,6 +19,9 @@ struct node_state { int start_gru; int num_gru_local; + int num_gru_global; + + NumGRUInfo num_gru_info; caf::actor file_access_actor; // actor reference for the file_access_actor GRU_Container gru_container; @@ -42,13 +45,11 @@ struct node_state { }; -behavior node_actor(stateful_actor<node_state>* self, - std::string host, // server will spawn this actor, if local do not try to connect via port - actor parent, - Distributed_Settings distributed_settings, - File_Access_Actor_Settings file_access_actor_settings, - Job_Actor_Settings job_actor_settings, - HRU_Actor_Settings hru_actor_settings); +behavior node_actor(stateful_actor<node_state>* self, std::string host, + actor parent, Distributed_Settings distributed_settings, + File_Access_Actor_Settings file_access_actor_settings, + Job_Actor_Settings job_actor_settings, + HRU_Actor_Settings hru_actor_settings); /********************************************* * Functions for the Job Actor diff --git a/build/source/file_access_actor/cppwrap_fileAccess.f90 b/build/source/file_access_actor/cppwrap_fileAccess.f90 index 59e5e464b8d09aea8e4c23367e17f88d8317a698..e65b1755fafebfe26fedd1d755dc21ac25e7bd29 100644 --- a/build/source/file_access_actor/cppwrap_fileAccess.f90 +++ b/build/source/file_access_actor/cppwrap_fileAccess.f90 @@ -364,18 +364,16 @@ subroutine fileAccessActor_init_fortran(& ! Variables for forcing end subroutine fileAccessActor_init_fortran subroutine defOutputFortran(handle_output_ncid, start_gru, num_gru, num_hru, & - file_gru, err) bind(C, name="defOutputFortran") + file_gru, use_extention, file_extention_c, err) bind(C, name="defOutputFortran") USE globalData,only:nGRUrun,nHRUrun USE globalData,only:fileout,output_fileSuffix USE globalData,only:ncid USE globalData,only:integerMissing USE globalData,only:iRunMode,iRunModeFull,iRunModeGRU,iRunModeHRU ! define the running modes - USE summaFileManager,only:OUTPUT_PATH,OUTPUT_PREFIX ! define output file - - USE var_lookup,only:maxvarFreq ! maximum number of output files - + USE var_lookup,only:maxvarFreq ! maximum number of output files USE def_output_module,only:def_output ! module to define model output + USE cppwrap_auxiliary,only:c_f_string ! Convert C String to Fortran String implicit none @@ -385,15 +383,19 @@ subroutine defOutputFortran(handle_output_ncid, start_gru, num_gru, num_hru, & integer(c_int),intent(in) :: num_gru integer(c_int),intent(in) :: num_hru integer(c_int),intent(in) :: file_gru + logical(c_bool),intent(in) :: use_extention + character(kind=c_char,len=1),intent(in):: file_extention_c integer(c_int),intent(out) :: err ! Local Variables type(var_i),pointer :: output_ncid character(len=128) :: fmtGruOutput ! a format string used to write start and end GRU in output file names - + character(len=256) :: file_extention character(len=256) :: message ! error message call c_f_pointer(handle_output_ncid, output_ncid) + call c_f_string(file_extention_c,file_extention, 256) + file_extention = trim(file_extention) output_fileSuffix = '' if (output_fileSuffix(1:1) /= '_') output_fileSuffix='_'//trim(output_fileSuffix) @@ -405,6 +407,9 @@ subroutine defOutputFortran(handle_output_ncid, start_gru, num_gru, num_hru, & fmtGruOutput = "i"//trim(fmtGruOutput)//"."//trim(fmtGruOutput) ! construct the format string for startGRU and endGRU fmtGruOutput = "('_G',"//trim(fmtGruOutput)//",'-',"//trim(fmtGruOutput)//")" write(output_fileSuffix((len_trim(output_fileSuffix)+1):len(output_fileSuffix)),fmtGruOutput) start_gru,start_gru+num_gru-1 + if (use_extention) then + output_fileSuffix = trim(output_fileSuffix)//trim(file_extention) + endif case(iRunModeHRU) write(output_fileSuffix((len_trim(output_fileSuffix)+1):len(output_fileSuffix)),"('_H',i0)") checkHRU end select diff --git a/build/source/file_access_actor/fileAccess_writeOutput.f90 b/build/source/file_access_actor/fileAccess_writeOutput.f90 index 35ddcd8e311f5dbf4bbfc8c0c368c0dc611f92ca..7365999bbfc496fd2d94352007624d9cdb481333 100644 --- a/build/source/file_access_actor/fileAccess_writeOutput.f90 +++ b/build/source/file_access_actor/fileAccess_writeOutput.f90 @@ -394,6 +394,7 @@ subroutine writeScalar(ncid, outputTimestep, outputTimestepUpdate, nSteps, minGR integer(i4b) :: iGRU ! output array real(rkind) :: realVec(numGRU, nSteps)! real vector for all HRUs in the run domain + real(rkind) :: value err=0; message="writeOutput.f90-writeScalar/" @@ -404,9 +405,13 @@ subroutine writeScalar(ncid, outputTimestep, outputTimestepUpdate, nSteps, minGR stepCounter = 0 gruCounter = gruCounter + 1 do iStep = 1, nSteps - if(.not.outputStructure(1)%finalizeStats%gru(iGRU)%hru(1)%tim(iStep)%dat(iFreq)) cycle + if(.not.outputStructure(1)%finalizeStats%gru(iGRU)%hru(1)%tim(iStep)%dat(iFreq)) then + value = realMissing + else + value = stat%gru(iGRU)%hru(1)%var(map(iVar))%tim(iStep)%dat(iFreq) + end if stepCounter = stepCounter + 1 - realVec(gruCounter, stepCounter) = stat%gru(iGRU)%hru(1)%var(map(iVar))%tim(iStep)%dat(iFreq) + realVec(gruCounter, stepCounter) = value outputTimeStepUpdate(iFreq) = stepCounter end do ! iStep end do ! iGRU @@ -421,7 +426,7 @@ subroutine writeScalar(ncid, outputTimestep, outputTimestepUpdate, nSteps, minGR print*, " maxGRU = ", maxGRU print*, " nSteps = ", nSteps print*, " gruCounter = ", gruCounter - print*, " realVec = ", realVec + ! print*, " realVec = ", realVec print*, " iStep = ", iStep err = 20 return diff --git a/build/source/file_access_actor/file_access_actor.cpp b/build/source/file_access_actor/file_access_actor.cpp index 732af9d2affcc1d0d80f1f412cb67afd76077d71..7fc4513343ef8765a19990a22cdabdc5dd2277c5 100644 --- a/build/source/file_access_actor/file_access_actor.cpp +++ b/build/source/file_access_actor/file_access_actor.cpp @@ -1,18 +1,12 @@ #include "file_access_actor.hpp" -#include "forcing_file_info.hpp" -#include "fortran_data_types.hpp" -#include "message_atoms.hpp" -#include "json.hpp" -#include "auxilary.hpp" using json = nlohmann::json; namespace caf { behavior file_access_actor(stateful_actor<file_access_state>* self, - int start_gru, int num_gru, - File_Access_Actor_Settings file_access_actor_settings, - actor parent) { + NumGRUInfo num_gru_info, + File_Access_Actor_Settings file_access_actor_settings, actor parent) { aout(self) << "\n----------File_Access_Actor Started----------\n"; // Set Up timing Info we wish to track @@ -23,17 +17,26 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, self->state.file_access_actor_settings = file_access_actor_settings; auto& fa_settings = self->state.file_access_actor_settings; self->state.parent = parent; - self->state.num_gru = num_gru; - self->state.start_gru = start_gru; + + self->state.num_gru_info = num_gru_info; + if (self->state.num_gru_info.use_global_for_data_structures) { + self->state.start_gru = self->state.num_gru_info.start_gru_global; + self->state.num_gru_local = self->state.num_gru_info.num_gru_global; + } else { + self->state.start_gru = self->state.num_gru_info.start_gru_local; + self->state.num_gru_local = self->state.num_gru_info.num_gru_local; + } + self->state.err = 0; self->state.num_output_steps = fa_settings.num_timesteps_in_output_buffer; - int num_hru = self->state.num_gru; // Filler for num_hrus + int num_hru = self->state.num_gru_local; // Filler for num_hrus fileAccessActor_init_fortran(self->state.handle_forcing_file_info, &self->state.numFiles, &self->state.num_steps, &fa_settings.num_timesteps_in_output_buffer, self->state.handle_ncid, - &self->state.start_gru, &self->state.num_gru, &num_hru, &self->state.err); + &self->state.start_gru, &self->state.num_gru_local, &num_hru, + &self->state.err); if (self->state.err != 0) { aout(self) << "ERROR: File Access Actor - File_Access_init_Fortran\n"; if (self->state.err == 100) @@ -63,17 +66,25 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, // Set up the output container self->state.output_container = new Output_Container( - fa_settings.num_partitions_in_output_buffer, self->state.num_gru, + fa_settings.num_partitions_in_output_buffer, self->state.num_gru_local, fa_settings.num_timesteps_in_output_buffer, self->state.num_steps); return { [=](def_output, int file_gru) { aout(self) << "Creating Output File\n"; - int num_hru = self->state.num_gru; // Filler for num_hrus + std::string actor_address = ""; + + if (self->state.num_gru_info.use_global_for_data_structures) { + actor_address = "_" + to_string(self->address()); + } + + int num_hru = self->state.num_gru_local; // Filler for num_hrus int err = 0; defOutputFortran(self->state.handle_ncid, &self->state.start_gru, - &self->state.num_gru, &num_hru, &file_gru, &err); - if (self->state.err != 0) { + &self->state.num_gru_local, &num_hru, &file_gru, + &self->state.num_gru_info.use_global_for_data_structures, + actor_address.c_str(), &err); + if (err != 0) { aout(self) << "ERROR: Defining Output\n"; self->quit(); } @@ -87,7 +98,7 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, self->state.file_access_timing.updateStartPoint("read_duration"); read_forcingFile(self->state.handle_forcing_file_info, ¤tFile, &self->state.stepsInCurrentFile, &self->state.start_gru, - &self->state.num_gru, &self->state.err); + &self->state.num_gru_local, &self->state.err); if (self->state.err != 0) { aout(self) << "ERROR: Reading Forcing" << std::endl; } @@ -121,7 +132,7 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, self->state.file_access_timing.updateStartPoint("read_duration"); read_forcingFile(self->state.handle_forcing_file_info, ¤tFile, &self->state.stepsInCurrentFile, &self->state.start_gru, - &self->state.num_gru, &self->state.err); + &self->state.num_gru_local, &self->state.err); if (self->state.err != 0) { aout(self) << "ERROR: Reading Forcing" << std::endl; } @@ -159,6 +170,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, [=](write_output, int steps_to_write, int start_gru, int max_gru) { self->state.file_access_timing.updateStartPoint("write_duration"); + // aout(self) << "Received requrest to write output\n" + // << "Steps to write: " << steps_to_write << "\n" + // << "Start GRU: " << start_gru << "\n" + // << "Max GRU: " << max_gru << "\n"; + writeOutput_fortran(self->state.handle_ncid, &steps_to_write, &start_gru, &max_gru, &self->state.write_params_flag, &self->state.err); diff --git a/build/source/job_actor/distributed_job_actor.cpp b/build/source/job_actor/distributed_job_actor.cpp index c62cddf58788a4bafe0594085fbbb1003f271454..e9771e5dddb4a2e2ec6295f0dcb7837fd0db540e 100644 --- a/build/source/job_actor/distributed_job_actor.cpp +++ b/build/source/job_actor/distributed_job_actor.cpp @@ -5,21 +5,24 @@ namespace caf { behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, - int start_gru, int num_gru, Distributed_Settings distributed_settings, + int start_gru_global, int num_gru_global, + Distributed_Settings distributed_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, HRU_Actor_Settings hru_actor_settings) { + aout(self) << "Starting Distributed Job Actor\n"; self->state.job_timing = TimingInfo(); self->state.job_timing.addTimePoint("total_duration"); self->state.job_timing.updateStartPoint("total_duration"); self->set_down_handler([=](const down_msg& dm){ - aout(self) << "Received Down Message\n"; + aout(self) << "Received Down Message\n.\n.\n.\n.\nExiting\n"; + exit(0); }); - self->state.start_gru = start_gru; - self->state.num_gru = num_gru; + self->state.start_gru = start_gru_global; + self->state.num_gru = num_gru_global; self->state.distributed_settings = distributed_settings; self->state.file_access_actor_settings = file_access_actor_settings; @@ -39,19 +42,31 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, } aout(self) << "Number of Nodes = " << distributed_settings.num_nodes << "\n"; + aout(self) << "File GRU = " << self->state.file_gru << "\n"; // Set up the node ranges int gru_per_node = (self->state.num_gru + distributed_settings.num_nodes - 1) - / distributed_settings.num_nodes; + / distributed_settings.num_nodes; int remaining = self->state.num_gru; for (int i = 0; i < distributed_settings.num_nodes; i++) { - int start_gru = i * gru_per_node + self->state.start_gru; - int num_gru = gru_per_node; + int start_gru_local = i * gru_per_node + self->state.start_gru; + int num_gru_local = gru_per_node; if (i == distributed_settings.num_nodes - 1) { - num_gru = remaining; + num_gru_local = remaining; } - remaining -= num_gru; - self->state.node_gru_ranges.push_back(std::make_tuple(start_gru, num_gru)); + remaining -= num_gru_local; + self->state.node_gru_ranges.push_back(std::make_tuple(start_gru_local, + num_gru_local)); + + bool use_global_for_data_structures = true; + self->state.node_num_gru_info.push_back( + NumGRUInfo(start_gru_local, self->state.start_gru, num_gru_local, + self->state.num_gru, self->state.file_gru, + use_global_for_data_structures) + ); + + + } // Print the node ranges @@ -92,8 +107,7 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, for (int i = 0; i < distributed_settings.num_nodes; i++) { self->send(self->state.connected_nodes[i], start_job_v, - std::get<0>(self->state.node_gru_ranges[i]), - std::get<1>(self->state.node_gru_ranges[i])); + self->state.node_num_gru_info[i]); } } }, diff --git a/build/source/job_actor/job_actor.cpp b/build/source/job_actor/job_actor.cpp index 26c83e4976230284ab589450dbc9f2425bb4f40f..19991fd092cc1a5e7e985f2d8f01a6e2fa8bbe8d 100644 --- a/build/source/job_actor/job_actor.cpp +++ b/build/source/job_actor/job_actor.cpp @@ -34,6 +34,8 @@ behavior job_actor(stateful_actor<job_state>* self, self->state.start_gru = start_gru; self->state.num_gru = num_gru; self->state.parent = parent; + + // Set the settings variables self->state.file_access_actor_settings = file_access_actor_settings; @@ -69,9 +71,13 @@ behavior job_actor(stateful_actor<job_state>* self, return {}; } + self->state.num_gru_info = NumGRUInfo(self->state.start_gru, + self->state.start_gru, self->state.num_gru, self->state.num_gru, + file_gru, false); + // Spawn the file_access_actor. self->state.file_access_actor = self->spawn(file_access_actor, - self->state.start_gru, self->state.num_gru, + self->state.num_gru_info, self->state.file_access_actor_settings, self); self->send(self->state.file_access_actor, def_output_v, file_gru); diff --git a/build/source/job_actor/node_actor.cpp b/build/source/job_actor/node_actor.cpp index 39960ccab2ad726dbdf79f09d115cba106239cc8..cf3fc978befad1322c3fc73e5015c4ff14641c1a 100644 --- a/build/source/job_actor/node_actor.cpp +++ b/build/source/job_actor/node_actor.cpp @@ -2,8 +2,7 @@ namespace caf { -behavior node_actor(stateful_actor<node_state>* self, - std::string host, // server will spawn this actor, if local do not try to connect via port +behavior node_actor(stateful_actor<node_state>* self, std::string host, actor parent, Distributed_Settings distributed_settings, File_Access_Actor_Settings file_access_actor_settings, Job_Actor_Settings job_actor_settings, @@ -15,7 +14,8 @@ behavior node_actor(stateful_actor<node_state>* self, self->state.node_timing.updateStartPoint("total_duration"); self->set_down_handler([=](const down_msg& dm){ - aout(self) << "Received Down Message\n"; + aout(self) << "Received Down Message\n.\n.\n.\n.\nExiting\n"; + exit(0); }); self->state.max_run_attempts = job_actor_settings.max_run_attempts; @@ -45,24 +45,48 @@ behavior node_actor(stateful_actor<node_state>* self, gethostname(hostname, HOST_NAME_MAX); self->state.hostname = hostname; self->send(self->state.current_server, connect_to_server_v, self, - self->state.hostname); + self->state.hostname); return { - [=](start_job, int start_gru, int num_gru_local) { - aout(self) << "Recieved Start Job Message\n"; - aout(self) << "Start GRU: " << start_gru << " Num GRU: " - << num_gru_local << "\n"; + [=](start_job, NumGRUInfo num_gru_info) { + self->state.num_gru_info = num_gru_info; + aout(self) << "Recieved Start Job Message\n" + << "Start GRU Local: " + << self->state.num_gru_info.start_gru_local + << " -- Start GRU Global:" + << self->state.num_gru_info.start_gru_global + << " -- Num GRU Local: " + << self->state.num_gru_info.num_gru_local + << " -- Num GRU Global: " + << self->state.num_gru_info.num_gru_global + << " -- File GRU: " << self->state.num_gru_info.file_gru + << "\n"; + + + // self->state.start_gru = start_gru; + // self->state.num_gru_local = num_gru_local; + // self->state.num_gru_global = num_gru_global; + self->state.gru_container.num_gru_in_run_domain + = self->state.num_gru_info.num_gru_local; - self->state.start_gru = start_gru; - self->state.num_gru_local = num_gru_local; - self->state.gru_container.num_gru_in_run_domain = num_gru_local; - - int err, file_gru; + + int start_gru, num_gru, num_hru; + if (self->state.num_gru_info.use_global_for_data_structures) { + start_gru = self->state.num_gru_info.start_gru_global; + num_gru = self->state.num_gru_info.num_gru_global; + num_hru = self->state.num_gru_info.num_gru_global; + } else { + start_gru = self->state.num_gru_info.start_gru_local; + num_gru = self->state.num_gru_info.num_gru_local; + num_hru = self->state.num_gru_info.num_gru_local; + } + + + int err, file_gru_to_remove; job_init_fortran(self->state.job_actor_settings.file_manager_path.c_str(), - &self->state.start_gru, &self->state.num_gru_local, - &self->state.num_gru_local, &file_gru, &err); + &start_gru, &num_gru, &num_hru, &file_gru_to_remove, &err); if (err != 0) { aout(self) << "\nERROR: Job_Actor - job_init_fortran\n"; self->quit(); @@ -70,9 +94,11 @@ behavior node_actor(stateful_actor<node_state>* self, } // Spawn the file_access_actor. self->state.file_access_actor = self->spawn(file_access_actor, - self->state.start_gru, self->state.num_gru_local, - self->state.file_access_actor_settings, self); - self->send(self->state.file_access_actor, def_output_v, file_gru); + self->state.num_gru_info, self->state.file_access_actor_settings, + self); + self->monitor(self->state.file_access_actor); + self->send(self->state.file_access_actor, def_output_v, + self->state.num_gru_info.file_gru); }, [=](init_file_access_actor, int num_timesteps) { @@ -137,18 +163,22 @@ behavior node_actor(stateful_actor<node_state>* self, [=](write_output, int steps_to_write) { + int num_gru_to_write; + if (self->state.num_gru_info.use_global_for_data_structures) { + num_gru_to_write = self->state.num_gru_info.num_gru_global; + } else { + num_gru_to_write = self->state.num_gru_info.num_gru_local; + } + self->request(self->state.file_access_actor, infinite, write_output_v, - steps_to_write, 1, self->state.num_gru_local).await( + steps_to_write, 1, num_gru_to_write).await( [=](int err) { if (err != 0) { aout(self) << "Error Writing Output\n"; - for (auto gru : self->state.gru_container.gru_list) - self->send(gru->getGRUActor(), exit_msg_v); - self->send_exit(self->state.file_access_actor, exit_reason::user_shutdown); self->send(self->state.current_server, err); - self->quit(); + // self->quit(); } });