Commit e70cb301 authored by a-chmil's avatar a-chmil
Browse files

Added job stealing syntax support

parent b70a935b
......@@ -69,6 +69,8 @@ public:
bool migrate(const Locator &loc);
bool check_steal();
void destroy(const Id &id, const Locator &);
void store(const Id &id, const DF &val);
......
......@@ -56,7 +56,9 @@ public:
void unexpect_pushes(const Id &cfid);
void change_load(int delta) {
int get_steal_req();
void change_load(int delta) {
stopper_->change_works(delta);
}
......@@ -73,6 +75,9 @@ private:
std::map<Id, std::vector<std::pair<Id, DF> > > pushed_;
std::map<Id, std::function<void (const Id &, const DF &)> > waiters_;
std::set<int> steal_requests_;
_get_steal_request()
void _check_requests(const Id &id);
void _post(const Id &id, const DF &, int req_count);
......
......@@ -12,7 +12,8 @@ enum BlockRetStatus
CONTINUE,
MIGRATE,
WAIT,
EXIT
EXIT,
STEAL
};
typedef std::function<BlockRetStatus (CF &)> Block;
......
......@@ -171,6 +171,16 @@ bool CF::migrate(const Locator &loc)
return true;
}
bool CF::check_steal()
{
int steal_rank=rts_->get_steal_req();
if( steal_id!=-1) {
dest_rank_=steal_rank;
return true;
}
return false;
}
void CF::destroy(const Id &id, const Locator &loc)
{
rts_->destroy(id, loc);
......
......@@ -306,6 +306,17 @@ void RTS::unexpect_pushes(const Id &cfid)
waiters_.erase(it);
}
int RTS::get_steal_req()
{
std::lock_guard<std::mutex> lk(m_);
if(!steal_requests_.empty()) {
int rank=*steal_requests_.begin();
steal_requests_.erase(rank);
return rank;
}
return -1;
}
std::string RTS::get_status() const
{
std::ostringstream os;
......@@ -487,6 +498,26 @@ void RTS::_submit(CF *cf)
}
case CONTINUE:
break;
case STEAL:
int dest_rank=cf->dest_rank_;
LOG("Stealing job "+cf->to_string()+" to "+std::to_string(dest_rank))
assert(dest_rank_!=comm_->rank());
size_t size=cf->get_serialization_size();
void *buf=operator new(size);
size_t written=cf->serialize(buf, size);
assert(written==size);
cf->self_.reset();
change_load(1);
LOG("WORKLOAD SEND SENDING");
comm_->send(dest_rank, TAG_CF, buf, size,
[this, buf](){
operator delete(buf);
change_load(-1);
LOG("WORKLOAD SEND SENT");
});
break;
default:
ABORT("Unexpected block ret status: "
+ std::to_string(ret));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment