Commit 30017bbc authored by a-chmil's avatar a-chmil
Browse files

rope init

parent eb707f36
......@@ -19,7 +19,7 @@
mkfile_path := $(abspath $(lastword $(MAKEFILE_LIST)))
mkfile_dir := $(dir $(mkfile_path))
SHELL := /bin/bash
CXX_WARN ?= \
-Wall -Werror -Wpedantic -Wno-vla -Wno-sign-compare \
-Wno-unused-but-set-variable -Wno-unused-variable \
......@@ -50,6 +50,7 @@ export LD_LIBRARY_PATH
default: build_all
test: build_all
source /home2/ssd1/perepelkin/kudinov/env.sh
LUNA_HOME=${LUNA_HOME} ${PYTHON} scripts/run_tests.py tests
fftest: build_all
......
......@@ -15,8 +15,8 @@ C++ sub empty() ${<END}
// do nothing
}END
#define FG_SIZE 40
#define FG_COUNT 10
#define FG_SIZE 200
#define FG_COUNT 20
sub calc_mat(name A, name B, name C, int i, int j, int N)
{
......@@ -24,7 +24,9 @@ sub calc_mat(name A, name B, name C, int i, int j, int N)
for k=0..N-1
{
cf f[i][j][k]: mult_mat(A[i][k], B[k][j], Ctmp[k]);
cf f[i][j][k]: mult_mat(A[i][k], B[k][j], Ctmp[k]) @{
stealable;
};
}
if N>1
......@@ -57,9 +59,7 @@ sub main()
req_count B[i][j]=N;
};
cf calc[i][j]: calc_mat(A, B, C[i][j], i, j, N) @{
stealable;
};
cf calc[i][j]: calc_mat(A, B, C[i][j], i, j, N);
empty() @ {
request C[i][j];
......
#pragma once
#include <functional>
#include <set>
#include "balancer.h"
class AbstractBalancer {
protected:
Balancer *balancer_;
public:
virtual ~AbstractBalancer() {}
AbstractBalancer(Balancer &balancer) : balancer_(&balancer) {}
virtual int calculate_rank(int pos) = 0;
virtual void accept_msg(int src, int tag, void *buf, size_t size) = 0;
virtual std::set<int> get_msg_acceptable_tags() = 0;
virtual void notify_pool_empty() = 0;
virtual void notify_pool_submitted() = 0;
};
#pragma once
#include "comm.h"
#include <utility>
#include <map>
#include "tags.h"
#include "locator.h"
class Balancer {
Comm *comm_;
std::function<unsigned int()> queue_size_;
std::function<void()> DFs_relocator_;
public:
Balancer(Comm &comm,
std::function<unsigned int()> queue_size,
std::function<void()> DFs_relocator);
int rank() const;
int next_rank() const;
int prev_rank() const;
int size();
void send(int dest, int tag, const void *buf, size_t size,
std::function<void()> finisher=nullptr);
unsigned int get_queue_size();
void relocate_DFs();
void bcast(int tag);
};
\ No newline at end of file
......@@ -69,8 +69,6 @@ public:
bool migrate(const Locator &loc);
bool check_steal();
void destroy(const Id &id, const Locator &);
void store(const Id &id, const DF &val);
......@@ -78,6 +76,8 @@ public:
void push(const Id &dfid, const DF &val, const Id &cfid,
const Locator &);
RTS *rts();
void expect_pushes(const Id &cfid);
std::string to_string() const;
......
......@@ -32,6 +32,8 @@ public:
virtual int next_rank() const;
virtual int prev_rank() const;
virtual bool is_root() const noexcept;
virtual void barrier()=0;
......
......@@ -15,13 +15,13 @@
#define STYLE_RED_BOLD "\033[1;31m" // red color with bold text
#define VERSION_RTS "0.6"
#define SHOW_EXEC_TIME if(false)
#define SHOW_EXEC_TIME if(true)
#define TRACE(msg) if (false) { \
printf("TRACE\t%s\n", std::string(msg).c_str()); \
}
#define LOG(msg) if (false) {\
#define LOG(msg) if (true) {\
assert(!_prefix.empty());\
FILE *f = fopen(logfilename.c_str(),"a");\
fprintf(f,"%d | %lf | %s | %s:%d\n", \
......
......@@ -5,7 +5,8 @@
constexpr unsigned int
DEFAULT_WORKER_THREADS_COUNT=4,
DEFAULT_STEAL_PROC_COUNT=2,
DEFAULT_STEAL_PROC_COUNT=4,
DEFAULT_ROPE_LENGTH=20,
DEFAULT_COMM_REQUEST_THREADS_COUNT=1,
DEFAULT_COMM_RECEIVE_THREADS_COUNT=1;
......@@ -32,6 +33,7 @@ public:
std::string get_version() const;
bool dynamic_balance() const;
unsigned int get_rope_length() const noexcept;
unsigned int get_steal_proc_count() const noexcept;
unsigned int get_worker_threads_count() const noexcept;
unsigned int get_comm_request_threads_count() const noexcept;
......
......@@ -8,7 +8,13 @@ class CyclicLocator : public Locator
public:
virtual ~CyclicLocator() {}
CyclicLocator();
CyclicLocator(int pos);
virtual int get_next_rank(Comm &) const noexcept;
virtual size_t get_serialization_size() const;
virtual size_t serialize(void *buf, size_t buf_size) const;
virtual size_t deserialize(const void *buf, size_t buf_size);
virtual std::shared_ptr<Locator> copy() const;
};
#pragma once
#include "common.h"
#include "serializable.h"
#include <memory>
class Comm;
class Locator
enum LocatorType
{
LOCATOR_UNKNOWN = 0,
LOCATOR_CYCLIC,
LOCATOR_ROPE,
LOCATOR_REQUEST
};
class Locator : virtual public Serializable
{
LocatorType type_;
public:
Locator() : type_(LOCATOR_UNKNOWN) {}
Locator(LocatorType type) : type_(type) {}
virtual ~Locator() {}
virtual int get_next_rank(Comm &) const=0;
virtual int get_next_rank(Comm &) const = 0;
virtual LocatorType get_type() const { return type_; };
virtual size_t get_serialization_size() const = 0;
virtual size_t serialize(void *buf, size_t buf_size) const = 0;
virtual size_t deserialize(const void *buf, size_t buf_size) = 0;
virtual std::shared_ptr<Locator> copy() const = 0;
};
#pragma once
#include "thread_pool.h"
#include "balancer.h"
#include "abstract_balancer.h"
#include <atomic>
#include <mutex>
#include <thread>
#include <utility>
#include "config.h"
class RequestBalancer : public AbstractBalancer {
mutable std::mutex m_;
bool need_jobs_;
std::set<int> steal_requests_;
const Config *conf_;
std::thread request_thread_;
std::atomic<bool> stop_flag_;
void steal_req(int tag, unsigned int proc_count);
void thread_executor();
void stop();
public:
RequestBalancer(Balancer &, const Config &);
virtual int calculate_rank(int pos);
virtual void accept_msg(int src, int tag, void *buf, size_t size);
virtual std::set<int> get_msg_acceptable_tags();
virtual void notify_pool_empty();
virtual void notify_pool_submitted();
};
#pragma once
#include "locator.h"
#include "rts.h"
class RequestLocator : public Locator
{
RTS *rts_;
public:
virtual ~RequestLocator() {}
RequestLocator(RTS *);
virtual int get_next_rank(Comm &) const noexcept;
virtual size_t get_serialization_size() const;
virtual size_t serialize(void *buf, size_t buf_size) const;
virtual size_t deserialize(const void *buf, size_t buf_size);
virtual std::shared_ptr<Locator> copy() const;
};
\ No newline at end of file
#pragma once
#include "thread_pool.h"
#include "balancer.h"
#include "abstract_balancer.h"
#include <atomic>
#include <mutex>
#include <thread>
#include <utility>
#include "config.h"
class RopeBalancer : public AbstractBalancer
{
mutable std::mutex m_;
std::thread ropeBalancer_thread_;
std::atomic<bool> stop_flag_;
// rope
std::pair<unsigned int, unsigned int> rope_borders_;
const Config *conf_;
void thread_executor();
void accept_rope_rebalance_req(unsigned int neigh_left_border,
unsigned int neigh_queue_size);
void change_right_border(unsigned int new_right_border);
void stop();
public:
RopeBalancer(Balancer &, const Config &);
virtual int calculate_rank(int pos);
virtual void accept_msg(int src, int tag, void *buf, size_t size);
virtual std::set<int> get_msg_acceptable_tags();
virtual void notify_pool_empty() {};
virtual void notify_pool_submitted() {};
};
#pragma once
#include "locator.h"
#include "rts.h"
class RopeLocator : public Locator
{
RTS *rts_;
int pos_;
public:
virtual ~RopeLocator() {}
RopeLocator(RTS *);
RopeLocator(RTS *, int pos);
virtual int get_next_rank(Comm &) const noexcept;
virtual size_t get_serialization_size() const;
virtual size_t serialize(void *buf, size_t buf_size) const;
virtual size_t deserialize(const void *buf, size_t buf_size);
virtual std::shared_ptr<Locator> copy() const;
};
\ No newline at end of file
......@@ -4,7 +4,13 @@
#include <map>
#include <mutex>
#include <set>
#include <utility>
#include <tuple>
#include "rope_balancer.h"
#include "request_balancer.h"
#include "locator.h"
#include "abstract_balancer.h"
#include "config.h"
#include "df.h"
#include "id.h"
......@@ -28,6 +34,8 @@ class RTS
std::mutex m_;
std::condition_variable cv_;
bool need_jobs_;
std::map<LocatorType, AbstractBalancer*> balancers_;
std::map<int, LocatorType> messageRouting_;
public:
virtual ~RTS();
......@@ -39,6 +47,8 @@ public:
void submit(CF *);
void relocate_DFs();
const std::string &argv(unsigned int n);
void post(const Id &id, const DF &val, const Locator &loc,
......@@ -57,33 +67,30 @@ public:
void unexpect_pushes(const Id &cfid);
int get_steal_req();
void steal_req(int tag, unsigned int proc_count);
void change_load(int delta) {
stopper_->change_works(delta);
}
std::string get_status() const;
int calculate_rank(int pos, LocatorType type);
private:
std::shared_ptr<Locator> _get_locator(LocatorType type);
void on_recv(int src, int tag, void *buf, size_t);
void _submit(CF *);
std::map<Id, std::pair<int, DF> > posts_;
std::map<Id, std::vector<std::function<void (const DF &)> > > requests_;
std::set<Id> destroys_;
std::map<Id, std::tuple<std::shared_ptr<Locator>, int, DF> > posts_;
std::map<Id, std::vector<std::pair<std::shared_ptr<Locator>, std::function<void (const DF &)> > > > requests_;
std::map<Id, std::shared_ptr<Locator> > destroys_;
std::map<Id, std::vector<std::pair<Id, DF> > > pushed_;
std::map<Id, std::vector<std::tuple<std::shared_ptr<Locator>, Id, DF> > > pushed_;
std::map<Id, std::function<void (const Id &, const DF &)> > waiters_;
std::set<int> steal_requests_;
void _check_requests(const Id &id);
void _post(const Id &id, const DF &, int req_count);
void _request(const Id &id, std::function<void (const DF &)> cb);
void _destroy(const Id &id);
void _push(const Id &dfid, const DF &val, const Id &cfid);
void _post(const Id &id, const DF &, std::shared_ptr<Locator>, int req_count);
void _request(const Id &id, std::shared_ptr<Locator>, std::function<void (const DF &)> cb);
void _destroy(const Id &id, std::shared_ptr<Locator>);
void _push(const Id &dfid, const DF &val, const Id &cfid, std::shared_ptr<Locator>);
};
......@@ -11,5 +11,7 @@ enum Tags {
TAG_DESTROY,
TAG_PUSH,
TAG_STEAL,
TAG_STEAL_REVOKE
TAG_STEAL_REVOKE,
TAG_ROPE_REBALANCE,
TAG_ROPE_REBALANCE_ANSWER
};
......@@ -28,6 +28,8 @@ public:
bool has_jobs();
unsigned int queue_size();
virtual std::string to_string() const;
private:
......
......@@ -5,6 +5,8 @@
#include "cf.h"
#include "cyclic_locator.h"
#include "rope_locator.h"
#include "request_locator.h"
enum BlockRetStatus
{
......@@ -12,8 +14,7 @@ enum BlockRetStatus
CONTINUE,
MIGRATE,
WAIT,
EXIT,
STEAL
EXIT
};
typedef std::function<BlockRetStatus (CF &)> Block;
......
......@@ -294,6 +294,46 @@ class LocatorCyclicMap(Locator):
asm+=prefix('\t', self.Expr.gen(regs))
return asm
class LocatorRopeExpr(Locator):
def __init__(self, expr):
self.Expr=expr
def gen(self, regs):
asm=''
asm+='ROPE\n'
asm+=prefix('\t', self.Expr.gen(regs))
return asm
class LocatorRopeMap(Locator):
def __init__(self, expr, params):
self.Expr=expr
def gen(self, regs):
asm=''
asm+='ROPE\n'
asm+=prefix('\t', self.Expr.gen(regs))
return asm
class LocatorReqExpr(Locator):
def __init__(self, expr):
self.Expr=expr
def gen(self, regs):
asm=''
asm+='REQ\n'
asm+=prefix('\t', self.Expr.gen(regs))
return asm
class LocatorReqMap(Locator):
def __init__(self, expr, params):
self.Expr=expr
def gen(self, regs):
asm=''
asm+='REQ\n'
asm+=prefix('\t', self.Expr.gen(regs))
return asm
class ExternArgument:
def __init__(self, param, arg, regs, comment):
self.Param=param
......@@ -855,7 +895,6 @@ class Bi(Scope):
self.Pushes=[]
self.Deletes=[]
self.Setdfs=[]
self.isStealable = False
self.j_outargs=list()
for rule in self.j.get('rules', []):
......@@ -900,10 +939,6 @@ class Bi(Scope):
elif rule['ruletype']=='enum' and rule['property']=='delete':
for df in rule['items']:
self.Deletes.append(ExprIdRef(df, self.Regs))
elif rule['ruletype']=='flags':
for df in rule['flags']:
if df == 'stealable':
self.isStealable = True
def is_requested(self, id, preliminary=False):
if preliminary:
return False
......@@ -1005,6 +1040,14 @@ class Bi(Scope):
and rule['property']=='locator_cyclic':
self.set_locator(LocatorCyclicExpr(
create_expr(rule['expr'], self.Regs)))
if rule['ruletype']=='expr' \
and rule['property']=='rope_cyclic':
self.set_locator(LocatorRopeExpr(
create_expr(rule['expr'], self.Regs)))
if rule['ruletype']=='expr' \
and rule['property']=='stealable':
self.set_locator(LocatorReqExpr(
create_expr(rule['expr'], self.Regs)))
def gen_early_init(self):
return ''
......@@ -1459,15 +1502,11 @@ class BiExecExtern(BiExec):
def gen_exec(self):
asm=''
if self.isStealable == True:
asm+='\tCHECK_STEAL\n'
asm+='\tEXEC %s\n' % self.CodeName
for a in self.Args:
asm+=prefix('\t\t', a.gen(self.Regs))
asm+='\t\tEOS ; (%s args)\n' % self.CodeName
asm+='\n'
if self.isStealable == True:
asm+= self.gen_migrate()
return asm
class BiExecStruct(BiExec):
......@@ -1538,6 +1577,44 @@ class Sub(Scope):
subst=params))
return LocatorCyclicExpr(create_expr(rule['expr'], regs,
**kwargs))
if rule['ruletype']=='map' \
and rule['property']=='locator_rope' \
and id.Name==rule['id'][0]:
if id.Indices or rule['id'][1:]:
if len(id.Indices)!=len(rule['id'][1:]):
print(id)
R(rule['id'])
params=kwargs.get('subst', {})
for i in range(len(rule['id'][1:])):
param=rule['id'][1:][i]
assert param['type']=='id'
assert not param['ref'][1:]
param_name=param['ref'][0]
assert param_name not in params
params[param_name]=id.Indices[i]
return LocatorRopeExpr(create_expr(rule['expr'], regs,
subst=params))
return LocatorRopeExpr(create_expr(rule['expr'], regs,
**kwargs))
if rule['ruletype']=='map' \
and rule['property']=='stealable' \
and id.Name==rule['id'][0]:
if id.Indices or rule['id'][1:]:
if len(id.Indices)!=len(rule['id'][1:]):
print(id)
R(rule['id'])
params=kwargs.get('subst', {})
for i in range(len(rule['id'][1:])):
param=rule['id'][1:][i]
assert param['type']=='id'
assert not param['ref'][1:]
param_name=param['ref'][0]
assert param_name not in params
params[param_name]=id.Indices[i]
return LocatorReqExpr(create_expr(rule['expr'], regs,
subst=params))
return LocatorReqExpr(create_expr(rule['expr'], regs,
**kwargs))
raise NotImplementedError('no locator?', id)
class SubExtern(Sub):
......@@ -1582,7 +1659,6 @@ class SubStruct(Sub):
def parse_locators(self):
for rule in self.j.get('rules', []):
if rule['ruletype']=='map' \
......@@ -1590,6 +1666,16 @@ class SubStruct(Sub):
for idx in rule['id'][1:]:
assert idx['type']=='id' and not idx['ref'][1:]
self.Regs.add_locator(rule['id'], rule['expr'])
if rule['ruletype']=='map' \
and rule['property']=='locator_rope':
for idx in rule['id'][1:]:
assert idx['type']=='id' and not idx['ref'][1:]
self.Regs.add_locator(rule['id'], rule['expr'])
if rule['ruletype']=='map' \
and rule['property']=='stealable':