Commit 053b824b authored by Vladimir Zakirov's avatar Vladimir Zakirov
Browse files

Add a stop for sending messages

Resolves #43
parent 2d439c2f
......@@ -97,10 +97,15 @@ private:
void start_creating_recovery_point();
void make_recovery_point();
void make_recovery_point_ready();
bool is_creating_recovery_point = false;
std::vector<CF *> zvk_forked_cf;
std::unordered_set<size_t> all_cfs;
void run_request_callback(const Id &id, DF& df, std::shared_ptr<CF>* cf_ptr);
void send_response_for_request(const Id &id, DF& df, std::shared_ptr<CF>* cf_ptr, size_t src);
IdleStopper<int> *rec_point_stopper_;
std::mutex rec_m_;
std::condition_variable rec_cv_;
};
......@@ -13,4 +13,6 @@ enum Tags {
TAG_STEAL,
TAG_STEAL_REVOKE,
TAG_RECOVERY_POINT,
TAG_RECOVERY_POINT_MARKER,
TAG_RECOVERY_POINT_STOP_MARKER,
};
......@@ -44,6 +44,16 @@ RTS::RTS(Comm &comm, const Config &conf, const FP &fp)
comm_->bcast(TAG_STOP);
}
);
rec_point_stopper_=new IdleStopper<int>(comm_->rank(),
[this](const int &marker){
// send to next
comm_->send(comm_->next_rank(), TAG_RECOVERY_POINT_MARKER, marker);
},
[this]() {
// all idle
comm_->bcast(TAG_RECOVERY_POINT_STOP_MARKER);
}
);
if(conf.dynamic_balance()) {
unsigned int proc_count = conf_->get_steal_proc_count();
......@@ -68,17 +78,39 @@ RTS::RTS(Comm &comm, const Config &conf, const FP &fp)
comm_->barrier();
}
void RTS::make_recovery_point_ready() {
comm_->barrier();
rec_cv_.notify_all();
}
void RTS::make_recovery_point() {
std::cout << "in call me function1\n";
std::cout << "num new cf = " << zvk_forked_cf.size() << std::endl;
std::cout << "num all cf in system = " << all_cfs.size() << std::endl;
comm_->barrier();
if (comm_->is_root()) {
rec_point_stopper_->start();
}
{
std::unique_lock<std::mutex> lk(rec_m_);
rec_cv_.wait(lk);
}
is_creating_recovery_point = false;
for (auto cf : zvk_forked_cf) {
_submit(cf, true);
}
zvk_forked_cf.clear();
is_creating_recovery_point = false;
std::cout << "end call me\n";
comm_->barrier();
rec_cv_.notify_all();
// std::cout << "in call me function1\n";
// std::cout << "num new cf = " << zvk_forked_cf.size() << std::endl;
// std::cout << "num all cf in system = " << all_cfs.size() << std::endl;
// for (auto cf : zvk_forked_cf) {
// _submit(cf, true);
// }
// zvk_forked_cf.clear();
// is_creating_recovery_point = false;
// std::cout << "end call me\n";
// comm_->barrier();
}
void RTS::delete_cf_from_set(CF* cf) {
......@@ -105,12 +137,13 @@ int RTS::run()
_submit(cf);
stopper_->start();
}
// if (comm_->is_root()) {
// lk.unlock();
// std::this_thread::sleep_for(std::chrono::milliseconds(20));
// lk.lock();
// start_creating_recovery_point();
// }
if (comm_->is_root()) {
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(20));
lk.lock();
start_creating_recovery_point();
}
while (!finished_flag_) {
cv_.wait(lk);
......@@ -452,6 +485,10 @@ void RTS::on_recv(int src, int tag, void *buf, size_t size)
switch (tag) {
case TAG_IDLE_MARKER:
assert(size==sizeof(int));
while (is_creating_recovery_point) {
std::unique_lock<std::mutex> lk(rec_m_);
rec_cv_.wait(lk);
}
stopper_->receive(*static_cast<int*>(buf));
operator delete(buf);
break;
......@@ -573,6 +610,16 @@ void RTS::on_recv(int src, int tag, void *buf, size_t size)
pool_.set_need_make_point(true);
break;
}
case TAG_RECOVERY_POINT_MARKER: {
assert(size==sizeof(int));
rec_point_stopper_->receive(*static_cast<int*>(buf));
operator delete(buf);
break;
}
case TAG_RECOVERY_POINT_STOP_MARKER: {
make_recovery_point_ready();
break;
}
default:
ABORT("Tag not implemented: " + std::to_string(tag));
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment