Commit 3129beda authored by Vladislav Perepelkin's avatar Vladislav Perepelkin
Browse files

Merge branch 'JobStealing' into 'master'

Job stealing

See merge request luna/lo3!29
parents df5b915e 6a627f4e
......@@ -57,7 +57,9 @@ sub main()
req_count B[i][j]=N;
};
cf calc[i][j]: calc_mat(A, B, C[i][j], i, j, N);
cf calc[i][j]: calc_mat(A, B, C[i][j], i, j, N) @{
stealable;
};
empty() @ {
request C[i][j];
......
......@@ -69,6 +69,8 @@ public:
bool migrate(const Locator &loc);
bool check_steal();
void destroy(const Id &id, const Locator &);
void store(const Id &id, const DF &val);
......
......@@ -5,6 +5,7 @@
constexpr unsigned int
DEFAULT_WORKER_THREADS_COUNT=4,
DEFAULT_STEAL_PROC_COUNT=2,
DEFAULT_COMM_REQUEST_THREADS_COUNT=1,
DEFAULT_COMM_RECEIVE_THREADS_COUNT=1;
......@@ -29,7 +30,9 @@ public:
std::string get_help() const;
std::string get_version() const;
bool dynamic_balance() const;
unsigned int get_steal_proc_count() const noexcept;
unsigned int get_worker_threads_count() const noexcept;
unsigned int get_comm_request_threads_count() const noexcept;
unsigned int get_comm_recv_threads_count() const noexcept;
......@@ -38,4 +41,5 @@ private:
RunMode mode_;
std::string fp_path_;
std::vector<std::string> argv_;
bool dynamic_balance_;
};
......@@ -27,6 +27,7 @@ class RTS
ThreadPool pool_;
std::mutex m_;
std::condition_variable cv_;
bool need_jobs_;
public:
virtual ~RTS();
......@@ -56,7 +57,11 @@ public:
void unexpect_pushes(const Id &cfid);
void change_load(int delta) {
int get_steal_req();
void steal_req(int tag, unsigned int proc_count);
void change_load(int delta) {
stopper_->change_works(delta);
}
......@@ -73,6 +78,8 @@ private:
std::map<Id, std::vector<std::pair<Id, DF> > > pushed_;
std::map<Id, std::function<void (const Id &, const DF &)> > waiters_;
std::set<int> steal_requests_;
void _check_requests(const Id &id);
void _post(const Id &id, const DF &, int req_count);
......
......@@ -9,5 +9,7 @@ enum Tags {
TAG_REQUEST,
TAG_RESPONSE,
TAG_DESTROY,
TAG_PUSH
TAG_PUSH,
TAG_STEAL,
TAG_STEAL_REVOKE
};
......@@ -22,6 +22,12 @@ public:
// add job
void submit(std::function<void()>);
void on_empty(std::function<void()>);
void on_submit(std::function<void()>);
bool has_jobs();
virtual std::string to_string() const;
private:
......@@ -29,6 +35,8 @@ private:
std::condition_variable cv_;
std::vector<std::thread*> threads_;
std::queue<std::function<void()> > jobs_;
std::function<void()> on_empty_handler_;
std::function<void()> on_submit_handler_;
size_t running_jobs_;
bool stop_flag_;
......
......@@ -12,7 +12,8 @@ enum BlockRetStatus
CONTINUE,
MIGRATE,
WAIT,
EXIT
EXIT,
STEAL
};
typedef std::function<BlockRetStatus (CF &)> Block;
......
......@@ -23,7 +23,7 @@ DESCRIPTION
OPTIONS
--help
Show this help and exit
'''
BLOCKH='// %s\nBlockRetStatus block_%d(CF &self)\n{\n'
......@@ -78,7 +78,7 @@ def get_sub_block(sub_name):
pos=sub['begin']
sub_blocks[sub_name]=gen_id(pos, 'sub %s' % sub_name)
return sub_blocks[sub_name]
def create_scope(j, parent):
scope={
......@@ -125,17 +125,17 @@ def resolve_id(name, scope):
raise UnresolvedName(name)
if name in scope['names']:
return scope['names'][name]
if name in scope['dfs'] or name in scope['cfs']:
id='_id_%d' % len(scope['create_ids'])
scope['names'][name]=id
scope['create_ids'].append((id, name))
return resolve_id(name, scope)
if is_base_sub_param(name, scope['j']):
raise UnresolvedName('Attempting to access basic type as name',
name)
res=resolve_id(name, scope['parent'])
arg=len(scope['ids'])
if type(res)!=str:
......@@ -150,7 +150,7 @@ def resolve_value(name, scope):
raise UnresolvedName(name)
if name in scope['values']:
return scope['values'][name]
try:
res=resolve_id(name, scope)
scope['values'][name]={
......@@ -160,7 +160,7 @@ def resolve_value(name, scope):
return resolve_value(name, scope)
except UnresolvedName:
pass
res=resolve_value(name, scope['parent'])
arg=len(scope['args'])
scope['args'].append(res['snip'])
......@@ -270,7 +270,7 @@ def gen_formula(x, scope, dest_value_type):
% (ops[0], x['type'], ops[1])
else:
R(x)
for op in x['operands']:
xt=expr_type(op, scope)
......@@ -371,7 +371,7 @@ def value_real(x, scope):
% ref1(x['ref'], scope)
else:
R(x, x['type'])
raise Exception("this is unreachable?")
t=expr_type(x, scope)
if t=='int':
......@@ -468,9 +468,18 @@ def gen_migrate(scope, ja):
res+='\t\treturn MIGRATE;\n\t}\n\n'
else:
raise Exception('Cannot migrate', ja.get('id'), ja['type'])
return res
def gen_stealable(scope, ja):
'''Generate check_steal'''
the_code = ''
for rule in ja.get('rules', []):
if rule['ruletype'] == 'flags':
subflags = rule.get('flags', [])
if len(subflags) == 1 and subflags[0] == "stealable":
the_code += '\t' + 'if (self.check_steal()) {' + '\n\t\t' + 'return STEAL;' + '\n\t' + '}' + '\n\n'
return the_code
def parse_args(scope, ja):
# bind names to sub arguments
......@@ -526,7 +535,7 @@ def gen_exec_extern(scope, ja):
cpp_head+='\t{\n'
args=[]
outs=[]
assert len(ja['args'])==len(sub['args'])
for i in range(len(ja['args'])):
arg=ja['args'][i]
......@@ -575,7 +584,7 @@ def gen_exec_extern(scope, ja):
cpp_afterstores+='\t\t\tself.store(%s, stored);\n' % r
cpp_afterstores+='\t\t}\n'
cpp_tail+='\t}\n\n'
return cpp_head+cpp_alloc_outs+cpp_body+cpp_afterstores+cpp_tail
def gen_exec_struct(scope, ja):
......@@ -588,7 +597,7 @@ def gen_exec_struct(scope, ja):
if ADD_DEBUG_INFO:
res+='\t\tchild->DBG_INFO="%s";\n' % quote(repr_exec(ja))
assert len(ja['args'])==len(sub['args'])
argc, idc=0, 0
for i in range(len(ja['args'])):
......@@ -766,7 +775,7 @@ def gen_requests_preliminary(scope, ja, next_block_info):
assert not rw_rules
return requests
def gen_requests(scope, ja, next_block_info):
'''Can breake the block. Starts and ends in open blocks'''
requests=''
......@@ -778,7 +787,7 @@ def gen_requests(scope, ja, next_block_info):
for r in ja.get('rules', []):
if r['ruletype']=='enum' and r['property']=='wait':
rw_rules.append(r)
# Requests
for r in ja.get('rules', []):
......@@ -858,11 +867,11 @@ def gen_requests(scope, ja, next_block_info):
expect=''
if has_waits:
expect+='\tself.expect_pushes(%s);\n\n' % ref1(ja['id'], scope)
assert not rw_rules
return expect+requests+waits
def gen_bi_exec(ja, parent_scope):
# { migrate} {xp/req}{wait} {exec; afterstuff}
scope=create_scope(ja, parent_scope)
......@@ -875,18 +884,19 @@ def gen_bi_exec(ja, parent_scope):
'comment': 'migrate'
})
cpp+=gen_migrate(scope, ja)
cpp+=gen_stealable(scope, ja)
cpp+=gen_requests(scope, ja, {
'pos': ja['begin'],
'comment': 'execute %s' % repr_ja(ja)
})
if gja[ja['code']]['type']=='struct':
cpp+=gen_exec_struct(scope, ja)
elif gja[ja['code']]['type']=='extern':
cpp+=gen_exec_extern(scope, ja)
else:
R()
cpp+=gen_afterstuff(scope, ja)
cpp+='\treturn EXIT;\n'
......@@ -933,7 +943,7 @@ def gen_bi_for(ja, parent_scope):
scope['values']=json.loads(old_values)
cpp+=gen_afterstuff(scope, ja)
cpp+='\treturn EXIT;\n'
cpp+='}\n\n'
cpp+=cpp_bodyitems
......@@ -1019,7 +1029,7 @@ def gen_bi_if(ja, parent_scope):
cpp+='\t} // if\n'
cpp+=gen_afterstuff(scope, ja)
cpp+='\treturn EXIT;\n'
cpp+='}\n\n'
cpp+=cpp_bodyitems
......@@ -1108,7 +1118,7 @@ if __name__=='__main__':
sys.stderr.write('%s: invalid arguments count, see --help\n' \
% _prog)
sys.exit(1)
try:
for arg in sys.argv[4:]:
if arg=='--add-debug-info':
......@@ -1117,7 +1127,7 @@ if __name__=='__main__':
sys.stderr.write("%s: key not recognized: '%s', see --help\n"\
% (_prog, arg))
sys.exit(1)
gja=json.loads(open(sys.argv[1]).read())
gja['main']['name']='main'
main_id=gen_id(gja['main']['begin'],
......@@ -1146,7 +1156,7 @@ if __name__=='__main__':
mid+=', '.join(args)
mid+=');\n\n'
mid+='\treturn EXIT;\n'
continue
bid, cpp=gen_struct(sub)
......@@ -1164,7 +1174,7 @@ if __name__=='__main__':
mid+='}\n\n'
if gid<200:
tail='extern "C" void init_blocks(BlocksAppender add)\n{\n'
......
......@@ -44,6 +44,9 @@ OPTIONS
-g
Include debug information (may reduce performance)
-b
Enable dynamic balance.
-O0
-O
Specify optimization level. -O0 optimizes compilation time at
......@@ -119,6 +122,7 @@ def parse_args(args):
conf['DEBUG']=False
conf['CLEANUP']=True
conf['TIME']=False
conf['BALANCE']=False
VERBOSE_FLAG=False
COMPILE_ONLY_FLAG=False
......@@ -155,6 +159,8 @@ def parse_args(args):
conf['CLEANUP']=False
elif arg=='-t':
conf['TIME']=True
elif arg=='-b':
conf['BALANCE']=True
else:
if arg[cur].startswith('-'):
warn(1, "suspicious program name: '%s' (mistyped a key?)" \
......@@ -681,6 +687,8 @@ def main():
rts='rts.dbg' if conf['DEBUG'] else 'rts'
cmd=[os.path.join(conf['LUNA_HOME'], 'bin', rts),
os.path.join(conf['BUILD_DIR'], 'libucodes.so')] + conf['ARGV']
if conf['BALANCE']:
cmd+='-b'
env=dict(os.environ)
env['LD_LIBRARY_PATH']=env.get('LD_LIBRARY_PATH', '') \
+ ':' + os.path.join(conf['LUNA_HOME'], 'lib')
......
......@@ -171,6 +171,16 @@ bool CF::migrate(const Locator &loc)
return true;
}
bool CF::check_steal()
{
int steal_rank=rts_->get_steal_req();
if( steal_rank!=-1) {
dest_rank_=steal_rank;
return true;
}
return false;
}
void CF::destroy(const Id &id, const Locator &loc)
{
rts_->destroy(id, loc);
......
......@@ -6,7 +6,7 @@
#include "common.h"
Config::Config(int argc, char **argv)
: mode_(UNSET)
: mode_(UNSET), dynamic_balance_(false)
{
assert(argc>=1);
program_name_=argv[0];
......@@ -16,20 +16,23 @@ Config::Config(int argc, char **argv)
while (i<argc) {
std::string arg(argv[i]);
if (arg=="--help") {
mode_=HELP;
break;
} else if (arg=="--version") {
mode_=VERSION;
break;
} else {
if (mode_==UNSET) {
mode_=NORMAL;
fp_path_=arg;
} else {
argv_.push_back(arg);
}
}
if (arg=="--help") {
mode_=HELP;
break;
} else if (arg=="--version") {
mode_=VERSION;
break;
} else if (arg=="-b"){
dynamic_balance_ = true;
} else {
if (mode_==UNSET) {
mode_=NORMAL;
fp_path_=arg;
} else {
argv_.push_back(arg);
}
}
i++;
}
......@@ -87,6 +90,15 @@ std::string Config::get_version() const
return os.str();
}
bool Config::dynamic_balance() const
{
return dynamic_balance_;
}
unsigned int Config::get_steal_proc_count() const noexcept {
return DEFAULT_STEAL_PROC_COUNT;
}
unsigned int Config::get_worker_threads_count() const noexcept
{
return DEFAULT_WORKER_THREADS_COUNT;
......
......@@ -21,7 +21,7 @@ RTS::~RTS()
}
RTS::RTS(Comm &comm, const Config &conf, const FP &fp)
: comm_(&comm), conf_(&conf), fp_(&fp)
: comm_(&comm), conf_(&conf), fp_(&fp), need_jobs_(false)
{
comm_->set_handler([this](int from, int tag, void *buf, size_t size) {
on_recv(from, tag, buf, size);
......@@ -38,7 +38,25 @@ RTS::RTS(Comm &comm, const Config &conf, const FP &fp)
comm_->bcast(TAG_STOP);
}
);
comm_->barrier();
if(conf.dynamic_balance()) {
unsigned int proc_count = conf_->get_steal_proc_count();
pool_.on_empty([this, proc_count]() {
if (!need_jobs_) {
need_jobs_ = true;
steal_req(TAG_STEAL, proc_count);
}
});
pool_.on_submit([this, proc_count]() {
if (need_jobs_) {
need_jobs_ = false;
steal_req(TAG_STEAL_REVOKE, proc_count);
}
});
}
comm_->barrier();
}
int RTS::run()
......@@ -46,7 +64,6 @@ int RTS::run()
double start_time=wtime();
comm_->barrier();
std::unique_lock<std::mutex> lk(m_);
pool_.start(conf_->get_worker_threads_count());
finished_flag_=false;
......@@ -306,6 +323,29 @@ void RTS::unexpect_pushes(const Id &cfid)
waiters_.erase(it);
}
void RTS::steal_req(int tag, unsigned int proc_count)
{
if(comm_->size() - 1 <= proc_count) {
comm_->bcast(tag);
} else {
int rank = comm_->rank();
for(int i = 0; i < proc_count; i++) {
comm_->send((i+rank)%comm_->size(), tag, nullptr, 0);
}
}
}
int RTS::get_steal_req()
{
std::lock_guard<std::mutex> lk(m_);
if(pool_.has_jobs() && !steal_requests_.empty()) {
int rank=*steal_requests_.begin();
steal_requests_.erase(rank);
return rank;
}
return -1;
}
std::string RTS::get_status() const
{
std::ostringstream os;
......@@ -430,6 +470,26 @@ void RTS::on_recv(int src, int tag, void *buf, size_t size)
operator delete(buf);
break;
}
case TAG_STEAL: {
if(src==comm_->rank()){
return;
}
std::lock_guard <std::mutex> lock(m_);
LOG("Steal request from " + std::to_string(src))
steal_requests_.insert(src);
operator delete(buf);
break;
}
case TAG_STEAL_REVOKE: {
if(src==comm_->rank()){
return;
}
std::lock_guard <std::mutex> lock(m_);
LOG("Steal revoke request from " + std::to_string(src))
steal_requests_.erase(src);
operator delete(buf);
break;
}
default:
ABORT("Tag not implemented: " + std::to_string(tag));
}
......@@ -482,6 +542,27 @@ void RTS::_submit(CF *cf)
});
break;
}
case STEAL: {
int dest_rank = cf->dest_rank_;
LOG("Stealing job " + cf->to_string() + " to " + std::to_string(dest_rank))
assert(dest_rank!=comm_->rank());
size_t size = cf->get_serialization_size();
void *buf = operator new(size);
size_t written = cf->serialize(buf, size);
assert(written == size);
cf->self_.reset();
change_load(1);
LOG("WORKLOAD SEND SENDING");
comm_->send(dest_rank, TAG_CF, buf, size,
[this, buf]() {
operator delete(buf);
change_load(-1);
LOG("WORKLOAD SEND SENT");
});
break;
}
case WAIT: {
break;
}
......
......@@ -5,7 +5,7 @@
#include "common.h"
ThreadPool::ThreadPool()
: running_jobs_(0), stop_flag_(false)
: on_empty_handler_([](){}), on_submit_handler_([](){}), running_jobs_(0), stop_flag_(false)
{
stop();
}
......@@ -13,7 +13,6 @@ ThreadPool::ThreadPool()
void ThreadPool::start(size_t threads_num)
{
std::lock_guard<std::mutex> lk(m_);
if (stop_flag_) {
throw std::runtime_error("start while stopping ThreadPool");
}
......@@ -28,7 +27,6 @@ void ThreadPool::start(size_t threads_num)
void ThreadPool::stop()
{
std::unique_lock<std::mutex> lk(m_);
if (stop_flag_) {
throw std::runtime_error("stop while stopping ThreadPool");
}
......@@ -55,28 +53,46 @@ void ThreadPool::stop()
void ThreadPool::submit(std::function<void()> job)
{
std::lock_guard<std::mutex> lk(m_);
jobs_.push(job);
on_submit_handler_();
cv_.notify_one();
}
void ThreadPool::on_empty(std::function<void()> on_empty_handler)
{
std::lock_guard<std::mutex> lk(m_);
on_empty_handler_ = on_empty_handler;
}
void ThreadPool::on_submit(std::function<void()> on_submit_handler)
{
std::lock_guard<std::mutex> lk(m_);
on_submit_handler_ = on_submit_handler;
}
std::string ThreadPool::to_string() const
{
std::lock_guard<std::mutex> lk(m_);
return std::to_string(threads_.size()) + "Th "
+ std::to_string(jobs_.size()) + "Jb "
+ std::to_string(running_jobs_) + " RJ "
+ (stop_flag_? "S": "");
}