diff --git a/build/includes/file_access_actor/file_access_actor.hpp b/build/includes/file_access_actor/file_access_actor.hpp index fb30ddd5fa1ef015380784453f2ce255bdeddf9c..32dc0cfed22f0744946ed75ffaba0bf8a4776185 100644 --- a/build/includes/file_access_actor/file_access_actor.hpp +++ b/build/includes/file_access_actor/file_access_actor.hpp @@ -46,8 +46,7 @@ struct file_access_state { // Variables set on Spawn caf::actor parent; int start_gru; - int num_gru_local; - int num_gru_global; // For distributed jobs + int num_gru; NumGRUInfo num_gru_info; @@ -59,7 +58,6 @@ struct file_access_state { int stepsInCurrentFile; int numFiles; int filesLoaded; - int err; int num_output_steps; Output_Container* output_container; diff --git a/build/source/file_access_actor/fileAccess_writeOutput.f90 b/build/source/file_access_actor/fileAccess_writeOutput.f90 index 7365999bbfc496fd2d94352007624d9cdb481333..3f21df782a05d7b35003e509bba79acdc631700d 100644 --- a/build/source/file_access_actor/fileAccess_writeOutput.f90 +++ b/build/source/file_access_actor/fileAccess_writeOutput.f90 @@ -315,6 +315,7 @@ subroutine writeData(ncid,outputTimestep,outputTimestepUpdate,maxLayers,nSteps, integer(i4b) :: stepCounter ! counter to know how much data we have to write, needed because we do not always write nSteps integer(i4b) :: iStep integer(i4b) :: iGRU + real(rkind) :: val ! initialize error control err=0;message="writeData/" ! loop through output frequencies @@ -332,10 +333,15 @@ subroutine writeData(ncid,outputTimestep,outputTimestepUpdate,maxLayers,nSteps, call netcdf_err(err,message); if (err/=0) return do iStep = 1, nSteps + ! Find HRU that is not missing or NaN ! check if we want this timestep - if(.not.outputStructure(1)%finalizeStats%gru(minGRU)%hru(1)%tim(iStep)%dat(iFreq)) cycle + do iGRU = minGRU, maxGRU + if(.not.outputStructure(1)%finalizeStats%gru(iGRU)%hru(1)%tim(iStep)%dat(iFreq)) cycle + val = outputStructure(1)%forcStruct%gru(iGRU)%hru(1)%var(iVar)%tim(iStep) + exit + end do stepCounter = stepCounter+1 - timeVec(stepCounter) = outputStructure(1)%forcStruct%gru(minGRU)%hru(1)%var(iVar)%tim(iStep) + timeVec(stepCounter) = val end do ! iStep err = nf90_put_var(ncid%var(iFreq),ncVarID,timeVec(1:stepCounter),start=(/outputTimestep(iFreq)/),count=(/stepCounter/)) call netcdf_err(err,message); if (err/=0)then; print*, "err"; return; endif @@ -369,7 +375,7 @@ end subroutine writeData subroutine writeScalar(ncid, outputTimestep, outputTimestepUpdate, nSteps, minGRU, maxGRU, & numGRU, iFreq, iVar, meta, stat, map, err, message) USE data_types,only:var_info ! metadata type - + USE, intrinsic :: ieee_arithmetic implicit none ! declare dummy variables type(var_i) ,intent(in) :: ncid ! fileid @@ -394,7 +400,7 @@ subroutine writeScalar(ncid, outputTimestep, outputTimestepUpdate, nSteps, minGR integer(i4b) :: iGRU ! output array real(rkind) :: realVec(numGRU, nSteps)! real vector for all HRUs in the run domain - real(rkind) :: value + real(rkind) :: val err=0; message="writeOutput.f90-writeScalar/" @@ -406,12 +412,12 @@ subroutine writeScalar(ncid, outputTimestep, outputTimestepUpdate, nSteps, minGR gruCounter = gruCounter + 1 do iStep = 1, nSteps if(.not.outputStructure(1)%finalizeStats%gru(iGRU)%hru(1)%tim(iStep)%dat(iFreq)) then - value = realMissing + val = realMissing else - value = stat%gru(iGRU)%hru(1)%var(map(iVar))%tim(iStep)%dat(iFreq) + val = stat%gru(iGRU)%hru(1)%var(map(iVar))%tim(iStep)%dat(iFreq) end if stepCounter = stepCounter + 1 - realVec(gruCounter, stepCounter) = value + realVec(gruCounter, stepCounter) = val outputTimeStepUpdate(iFreq) = stepCounter end do ! iStep end do ! iGRU @@ -426,11 +432,11 @@ subroutine writeScalar(ncid, outputTimestep, outputTimestepUpdate, nSteps, minGR print*, " maxGRU = ", maxGRU print*, " nSteps = ", nSteps print*, " gruCounter = ", gruCounter - ! print*, " realVec = ", realVec print*, " iStep = ", iStep err = 20 return endif + err = nf90_put_var(ncid%var(iFreq),meta(iVar)%ncVarID(iFreq),realVec(1:gruCounter, 1:stepCounter),start=(/minGRU,outputTimestep(iFreq)/),count=(/numGRU,stepCounter/)) class default; err=20; message=trim(message)//'stats must be scalarv and of type gru_hru_doubleVec'; return end select ! stat diff --git a/build/source/file_access_actor/file_access_actor.cpp b/build/source/file_access_actor/file_access_actor.cpp index 7fc4513343ef8765a19990a22cdabdc5dd2277c5..39f5b8887a10417bf4b56094f1c536bc35fbeaa2 100644 --- a/build/source/file_access_actor/file_access_actor.cpp +++ b/build/source/file_access_actor/file_access_actor.cpp @@ -19,27 +19,27 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, self->state.parent = parent; self->state.num_gru_info = num_gru_info; + if (self->state.num_gru_info.use_global_for_data_structures) { self->state.start_gru = self->state.num_gru_info.start_gru_global; - self->state.num_gru_local = self->state.num_gru_info.num_gru_global; + self->state.num_gru = self->state.num_gru_info.num_gru_global; } else { self->state.start_gru = self->state.num_gru_info.start_gru_local; - self->state.num_gru_local = self->state.num_gru_info.num_gru_local; + self->state.num_gru = self->state.num_gru_info.num_gru_local; } - self->state.err = 0; self->state.num_output_steps = fa_settings.num_timesteps_in_output_buffer; - - int num_hru = self->state.num_gru_local; // Filler for num_hrus + + int num_hru = self->state.num_gru; + int err = 0; fileAccessActor_init_fortran(self->state.handle_forcing_file_info, &self->state.numFiles, &self->state.num_steps, &fa_settings.num_timesteps_in_output_buffer, self->state.handle_ncid, - &self->state.start_gru, &self->state.num_gru_local, &num_hru, - &self->state.err); - if (self->state.err != 0) { + &self->state.start_gru, &self->state.num_gru, &num_hru, &err); + if (err != 0) { aout(self) << "ERROR: File Access Actor - File_Access_init_Fortran\n"; - if (self->state.err == 100) + if (err == 100) self->send(self->state.parent, file_access_error::mDecisions_error, self); else @@ -65,23 +65,24 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, } // Set up the output container - self->state.output_container = new Output_Container( - fa_settings.num_partitions_in_output_buffer, self->state.num_gru_local, - fa_settings.num_timesteps_in_output_buffer, self->state.num_steps); + if (!self->state.num_gru_info.use_global_for_data_structures) { + self->state.output_container = new Output_Container( + fa_settings.num_partitions_in_output_buffer, self->state.num_gru, + fa_settings.num_timesteps_in_output_buffer, self->state.num_steps); + } return { [=](def_output, int file_gru) { aout(self) << "Creating Output File\n"; std::string actor_address = ""; + int num_hru = self->state.num_gru; if (self->state.num_gru_info.use_global_for_data_structures) { actor_address = "_" + to_string(self->address()); } - - int num_hru = self->state.num_gru_local; // Filler for num_hrus int err = 0; defOutputFortran(self->state.handle_ncid, &self->state.start_gru, - &self->state.num_gru_local, &num_hru, &file_gru, + &self->state.num_gru, &num_hru, &file_gru, &self->state.num_gru_info.use_global_for_data_structures, actor_address.c_str(), &err); if (err != 0) { @@ -96,10 +97,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, // Note: C++ starts at 0 and Fortran starts at 1 if(!self->state.forcing_file_list[currentFile - 1].isFileLoaded()) { self->state.file_access_timing.updateStartPoint("read_duration"); + int err = 0; read_forcingFile(self->state.handle_forcing_file_info, ¤tFile, &self->state.stepsInCurrentFile, &self->state.start_gru, - &self->state.num_gru_local, &self->state.err); - if (self->state.err != 0) { + &self->state.num_gru, &err); + if (err != 0) { aout(self) << "ERROR: Reading Forcing" << std::endl; } self->state.filesLoaded += 1; @@ -130,10 +132,11 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, } self->state.file_access_timing.updateStartPoint("read_duration"); + int err = 0; read_forcingFile(self->state.handle_forcing_file_info, ¤tFile, &self->state.stepsInCurrentFile, &self->state.start_gru, - &self->state.num_gru_local, &self->state.err); - if (self->state.err != 0) { + &self->state.num_gru, &err); + if (err != 0) { aout(self) << "ERROR: Reading Forcing" << std::endl; } @@ -169,19 +172,13 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, // Write message from the job actor TODO: This could be async [=](write_output, int steps_to_write, int start_gru, int max_gru) { self->state.file_access_timing.updateStartPoint("write_duration"); - - // aout(self) << "Received requrest to write output\n" - // << "Steps to write: " << steps_to_write << "\n" - // << "Start GRU: " << start_gru << "\n" - // << "Max GRU: " << max_gru << "\n"; - + int err = 0; writeOutput_fortran(self->state.handle_ncid, &steps_to_write, - &start_gru, &max_gru, &self->state.write_params_flag, - &self->state.err); + &start_gru, &max_gru, &self->state.write_params_flag, &err); if (self->state.write_params_flag) self->state.write_params_flag = false; self->state.file_access_timing.updateEndPoint("write_duration"); - return self->state.err; + return err; }, [=](restart_failures) { @@ -205,7 +202,10 @@ behavior file_access_actor(stateful_actor<file_access_state>* self, [=](finalize) { aout(self) << "File Access Actor: Deallocating Structures\n"; - self->state.output_container->~Output_Container(); + // TODO: output container can be wrapped in a smart pointer + if (!self->state.num_gru_info.use_global_for_data_structures) { + self->state.output_container->~Output_Container(); + } FileAccessActor_DeallocateStructures(self->state.handle_forcing_file_info, self->state.handle_ncid); @@ -235,9 +235,9 @@ void writeOutput(stateful_actor<file_access_state>* self, int start_gru = partition->getStartGRUIndex(); int max_gru = partition->getMaxGRUIndex(); bool write_param_flag = partition->isWriteParams(); - + int err = 0; writeOutput_fortran(self->state.handle_ncid, &num_timesteps_to_write, - &start_gru, &max_gru, &write_param_flag, &self->state.err); + &start_gru, &max_gru, &write_param_flag, &err); partition->updateTimeSteps(); diff --git a/build/source/hru_actor/hru_actor.cpp b/build/source/hru_actor/hru_actor.cpp index 9b34d0e2b013c3b36f1ec4f63717f6ce62957fd4..05f114280cd148b7f5ecd356d5f6d7b62561e2ad 100644 --- a/build/source/hru_actor/hru_actor.cpp +++ b/build/source/hru_actor/hru_actor.cpp @@ -19,6 +19,11 @@ behavior hru_actor(stateful_actor<hru_state>* self, int refGRU, int indxGRU, self->state.indxHRU = 1; self->state.indxGRU = indxGRU; self->state.refGRU = refGRU; + if (hru_extra_logging) { + aout(self) << "HRU Actor: indxHRU = " << self->state.indxHRU + << " indxGRU = " << self->state.indxGRU + << " refGRU = " << self->state.refGRU << "\n"; + } // Get the settings for the HRU self->state.hru_actor_settings = hru_actor_settings; self->state.dt_init_factor = hru_actor_settings.dt_init_factor; diff --git a/build/source/hru_actor/hru_batch_actor.cpp b/build/source/hru_actor/hru_batch_actor.cpp index 6e186a4e6e528ddeb68c972743f97bab16f2cace..7d1f5348e0c2bf4563239e903b8d571c101a65e7 100644 --- a/build/source/hru_actor/hru_batch_actor.cpp +++ b/build/source/hru_actor/hru_batch_actor.cpp @@ -3,9 +3,9 @@ namespace caf { behavior hru_batch_actor(stateful_actor<hru_batch_state>* self, - int start_gru_local, int start_gru_global, int num_gru, - HRU_Actor_Settings hru_actor_settings, - caf::actor file_access_actor, caf::actor parent) { + int start_gru_local, int start_gru_global, int num_gru, + HRU_Actor_Settings hru_actor_settings, caf::actor file_access_actor, + caf::actor parent) { self->state.file_access_actor = file_access_actor; self->state.parent = parent; diff --git a/build/source/job_actor/distributed_job_actor.cpp b/build/source/job_actor/distributed_job_actor.cpp index e9771e5dddb4a2e2ec6295f0dcb7837fd0db540e..fe13e8dd9bd4050933477bcfb1f3a7517fa9a040 100644 --- a/build/source/job_actor/distributed_job_actor.cpp +++ b/build/source/job_actor/distributed_job_actor.cpp @@ -71,9 +71,9 @@ behavior distributed_job_actor(stateful_actor<distributed_job_state>* self, // Print the node ranges for (int i = 0; i < distributed_settings.num_nodes; i++) { - aout(self) << "Node " << i << " GRU Range: " - << std::get<0>(self->state.node_gru_ranges[i]) << " - " - << std::get<1>(self->state.node_gru_ranges[i]) << "\n"; + aout(self) << "Node " << i << " GRU Range: " + << self->state.node_num_gru_info[i].start_gru_local << " - " + << self->state.node_num_gru_info[i].num_gru_local << "\n"; } auto is_published = self->system().middleman().publish(self, diff --git a/build/source/job_actor/node_actor.cpp b/build/source/job_actor/node_actor.cpp index cf3fc978befad1322c3fc73e5015c4ff14641c1a..9d43a319cd7b1dc512ece552554985a4e4703eea 100644 --- a/build/source/job_actor/node_actor.cpp +++ b/build/source/job_actor/node_actor.cpp @@ -64,10 +64,6 @@ behavior node_actor(stateful_actor<node_state>* self, std::string host, << " -- File GRU: " << self->state.num_gru_info.file_gru << "\n"; - - // self->state.start_gru = start_gru; - // self->state.num_gru_local = num_gru_local; - // self->state.num_gru_global = num_gru_global; self->state.gru_container.num_gru_in_run_domain = self->state.num_gru_info.num_gru_local; @@ -244,28 +240,31 @@ void spawnHRUBatches(stateful_actor<node_state>* self) { } else { batch_size = self->state.job_actor_settings.batch_size; } - + + int start_hru_index; + int start_hru_ref = self->state.num_gru_info.start_gru_local; int remaining_hru_to_batch = gru_container.num_gru_in_run_domain; - int start_hru_global = self->state.start_gru; - int start_hru_local = 1; + + if (self->state.num_gru_info.use_global_for_data_structures) { + start_hru_index = self->state.num_gru_info.start_gru_local; + } else { + start_hru_index = 1; + } while (remaining_hru_to_batch > 0) { int current_batch_size = std::min(batch_size, remaining_hru_to_batch); - auto gru_batch = self->spawn(hru_batch_actor, start_hru_local, - start_hru_global, current_batch_size, - self->state.hru_actor_settings, - self->state.file_access_actor, self); - - gru_container.gru_list.push_back(new GRU(start_hru_global, - start_hru_local, gru_batch, - self->state.dt_init_start_factor, - self->state.hru_actor_settings.rel_tol, - self->state.hru_actor_settings.abs_tol, - self->state.max_run_attempts)); + auto gru_batch = self->spawn(hru_batch_actor, start_hru_index, + start_hru_ref, current_batch_size, self->state.hru_actor_settings, + self->state.file_access_actor, self); + + gru_container.gru_list.push_back(new GRU(start_hru_ref, + start_hru_index, gru_batch, self->state.dt_init_start_factor, + self->state.hru_actor_settings.rel_tol, + self->state.hru_actor_settings.abs_tol, self->state.max_run_attempts)); remaining_hru_to_batch -= current_batch_size; - start_hru_local += current_batch_size; - start_hru_global += current_batch_size; + start_hru_index += current_batch_size; + start_hru_ref += current_batch_size; } aout(self) << "Number of HRU_Batch_Actors: " << gru_container.gru_list.size() << "\n";