Commit 14416a11 authored by a-chmil's avatar a-chmil
Browse files

Added steal request accept

parent e70cb301
......@@ -58,6 +58,8 @@ public:
int get_steal_req();
void steal_req();
void change_load(int delta) {
stopper_->change_works(delta);
}
......@@ -77,7 +79,6 @@ private:
std::set<int> steal_requests_;
_get_steal_request()
void _check_requests(const Id &id);
void _post(const Id &id, const DF &, int req_count);
......
......@@ -9,5 +9,6 @@ enum Tags {
TAG_REQUEST,
TAG_RESPONSE,
TAG_DESTROY,
TAG_PUSH
TAG_PUSH,
TAG_STEAL
};
......@@ -174,7 +174,7 @@ bool CF::migrate(const Locator &loc)
bool CF::check_steal()
{
int steal_rank=rts_->get_steal_req();
if( steal_id!=-1) {
if( steal_rank!=-1) {
dest_rank_=steal_rank;
return true;
}
......
......@@ -306,6 +306,15 @@ void RTS::unexpect_pushes(const Id &cfid)
waiters_.erase(it);
}
void RTS::steal_req()
{
int rank = comm_->rank();
int req_rank = (rank + 1) % comm_->size();
if(req_rank != rank) {
comm_->send(comm_->next_rank(), TAG_STEAL, nullptr, 0);
}
}
int RTS::get_steal_req()
{
std::lock_guard<std::mutex> lk(m_);
......@@ -441,6 +450,12 @@ void RTS::on_recv(int src, int tag, void *buf, size_t size)
operator delete(buf);
break;
}
case TAG_STEAL: {
std::lock_guard <std::mutex> lock(m_);
steal_requests_.insert(src);
operator delete(buf);
break;
}
default:
ABORT("Tag not implemented: " + std::to_string(tag));
}
......@@ -493,36 +508,39 @@ void RTS::_submit(CF *cf)
});
break;
}
case WAIT: {
break;
}
case CONTINUE:
break;
case STEAL:
int dest_rank=cf->dest_rank_;
LOG("Stealing job "+cf->to_string()+" to "+std::to_string(dest_rank))
assert(dest_rank_!=comm_->rank());
size_t size=cf->get_serialization_size();
void *buf=operator new(size);
size_t written=cf->serialize(buf, size);
assert(written==size);
case STEAL: {
int dest_rank = cf->dest_rank_;
LOG("Stealing job " + cf->to_string() + " to " + std::to_string(dest_rank))
assert(dest_rank!=comm_->rank());
size_t size = cf->get_serialization_size();
void *buf = operator new(size);
size_t written = cf->serialize(buf, size);
assert(written == size);
cf->self_.reset();
change_load(1);
LOG("WORKLOAD SEND SENDING");
comm_->send(dest_rank, TAG_CF, buf, size,
[this, buf](){
[this, buf]() {
operator delete(buf);
change_load(-1);
LOG("WORKLOAD SEND SENT");
});
break;
}
case WAIT: {
break;
}
case CONTINUE:
break;
default:
ABORT("Unexpected block ret status: "
+ std::to_string(ret));
}
} while (ret==CONTINUE);
});
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment