Skip to content
Snippets Groups Projects
Commit 8c988881 authored by KyleKlenk's avatar KyleKlenk
Browse files

Fix compilation errors and improve code structure

parent 41dbe12a
No related branches found
No related tags found
1 merge request!7Data assimilation mode
......@@ -47,7 +47,6 @@ CAF_BEGIN_TYPE_ID_BLOCK(summa, first_custom_type_id)
CAF_ADD_ATOM(summa, init_normal_mode)
CAF_ADD_ATOM(summa, update_hru_async)
// Sender: job_actor
// Reciever: summa_actor
// Summary: job_actor finished job
......
......@@ -12,6 +12,7 @@
#include "file_access_actor.hpp"
#include <unistd.h>
#include <limits.h>
#include <cmath>
/*********************************************
......@@ -52,7 +53,7 @@ struct job_state {
int max_run_attempts = 1; // Max number of attempts to solve a GRU
GRU_Container gru_container;
// Variables for GRU monitoring
int dt_init_start_factor = 1; // Initial Factor for dt_init (coupled_em)
int num_gru_done = 0; // The number of GRUs that have completed
......
......@@ -34,6 +34,7 @@ behavior hru_batch_actor(stateful_actor<hru_batch_state>* self,
self->send(hru_actor, update_hru_v, timestep, forcingstep);
}
},
[=](done_update) {
// aout(self) << "HRU Batch Actor - Done Update\n";
self->state.num_done++;
......@@ -42,6 +43,7 @@ behavior hru_batch_actor(stateful_actor<hru_batch_state>* self,
self->state.num_done = 0;
}
},
[=](exit_msg) {
for(auto& hru_actor : self->state.hru_actors) {
self->send_exit(hru_actor, exit_reason::user_shutdown);
......
......@@ -3,8 +3,6 @@
using json = nlohmann::json;
using chrono_time = std::chrono::time_point<std::chrono::system_clock>;
int batch_size = 20;
namespace caf {
// First Actor that is spawned that is not the Coordinator Actor.
......@@ -25,7 +23,7 @@ behavior job_actor(stateful_actor<job_state>* self,
aout(self) << "\n\n ********** EXIT HANDLER ********** \n";
aout(self) << "Exit Reason: " << to_string(em.reason) << "\n";
});
// Timing Information
self->state.job_timing = TimingInfo();
......@@ -69,7 +67,6 @@ behavior job_actor(stateful_actor<job_state>* self,
aout(self) << "\nERROR: Job_Actor - job_init_fortran\n";
return {};
}
// Spawn the file_access_actor.
self->state.file_access_actor = self->spawn(file_access_actor,
......@@ -154,13 +151,11 @@ behavior job_actor(stateful_actor<job_state>* self,
aout(self) << "Job_Actor: Error Writing Output\n";
for (auto GRU : self->state.gru_container.gru_list)
self->send(GRU->getGRUActor(), exit_msg_v);
// self->send_exit(GRU->getGRUActor(), exit_reason::user_shutdown);
self->send_exit(self->state.file_access_actor,
exit_reason::user_shutdown);
self->quit();
}
// else { aout(self) << "Job_Actor: Done Writing Output\n"; }
});
self->state.timestep++;
......@@ -168,6 +163,8 @@ behavior job_actor(stateful_actor<job_state>* self,
// Check if we are done the simulation
if (self->state.timestep > self->state.num_steps) {
for (auto GRU : self->state.gru_container.gru_list)
GRU->setSuccess();
aout(self) << "Job_Actor: Done Job\n";
self->send(self, finalize_v);
}
......@@ -376,18 +373,26 @@ void spawnHRUActors(stateful_actor<job_state>* self, bool normal_mode) {
}
void spawnHRUBatches(stateful_actor<job_state>* self) {
int batch_size;
auto& gru_container = self->state.gru_container;
gru_container.gru_start_time = std::chrono::high_resolution_clock::now();
gru_container.run_attempts_left = self->state.max_run_attempts;
gru_container.run_attempts_left--;
if (self->state.job_actor_settings.batch_size == 9999) {
batch_size = std::ceil(gru_container.num_gru_in_run_domain /
(std::thread::hardware_concurrency() * 2));
} else {
batch_size = self->state.job_actor_settings.batch_size;
}
int remaining_hru_to_batch = gru_container.num_gru_in_run_domain;
int start_hru_global = self->state.start_gru;
int start_hru_local = 1;
while (remaining_hru_to_batch > 0) {
int current_batch_size = std::min(self->state.job_actor_settings.batch_size,
remaining_hru_to_batch);
int current_batch_size = std::min(batch_size, remaining_hru_to_batch);
auto gru_batch = self->spawn(hru_batch_actor, start_hru_local,
start_hru_global, current_batch_size,
self->state.hru_actor_settings,
......@@ -404,6 +409,8 @@ void spawnHRUBatches(stateful_actor<job_state>* self) {
start_hru_local += current_batch_size;
start_hru_global += current_batch_size;
}
aout(self) << "Number of HRU_Batch_Actors: "
<< gru_container.gru_list.size() << "\n";
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment