Commit 4d63e92d authored by Vladislav Perepelkin's avatar Vladislav Perepelkin
Browse files

Merge branch 'tracking_cores_occupancy' into 'master'

Add output cpu occupancy information

See merge request luna/luna!51
parents 11df5719 ab656817
......@@ -15,8 +15,12 @@
#define STYLE_RED_BOLD "\033[1;31m" // red color with bold text
#define VERSION_RTS "0.6"
#define SHOW_EXEC_TIME if(false)
// Uncomment this define to get cores.txt file after LuNA run
//#define CORES_OCCUPANCY
#define TRACE(msg) if (false) { \
printf("TRACE\t%s\n", std::string(msg).c_str()); \
}
......@@ -37,6 +41,16 @@
__FILE__, __LINE__);\
}
#define INFO(msg) if (true) { \
printf("INFO: \t%s\n", msg.c_str()); \
}
#define SHOW_CORES(msg) {\
FILE *f = fopen("cores.txt","a");\
fprintf(f, "%d | %s", mpi_rank, std::string(msg).c_str());\
fclose(f);\
}
#define WARN(msg) {\
assert(!_prefix.empty());\
fprintf(stderr, "%s " STYLE_YELLOW_BOLD "WARNING: " STYLE_RESET STYLE_BOLD " %s " STYLE_RESET \
......
......@@ -31,6 +31,7 @@ public:
std::string get_help() const;
std::string get_version() const;
bool dynamic_balance() const;
bool cpu_usage_info() const;
unsigned int get_steal_proc_count() const noexcept;
unsigned int get_worker_threads_count() const noexcept;
......@@ -42,5 +43,6 @@ private:
std::string fp_path_;
std::vector<std::string> argv_;
bool dynamic_balance_;
bool cpu_usage_info_ = false;
unsigned int worker_threads_count_=DEFAULT_WORKER_THREADS_COUNT;
};
......@@ -12,6 +12,7 @@ class MpiComm : public Comm
MPI_Comm comm_;
ThreadPool req_;
std::vector<std::thread *> recv_threads_;
int rank_, size_;
public:
......
......@@ -5,6 +5,13 @@
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>
enum ThreadPoolType {
WORKERS_POOL,
COMM_RECEIVE_POOL,
COMM_REQUEST_POOL
};
class ThreadPool
{
......@@ -13,7 +20,7 @@ public:
// start (more) threads
// Note: must not start while stop() is in progress
void start(size_t threads_num=1);
void start(ThreadPoolType type, size_t threads_num, bool cpu_usage_info);
// request threads stop and join them (also waits for the queue to
// become empty)
......@@ -34,11 +41,21 @@ private:
mutable std::mutex m_;
std::condition_variable cv_;
std::vector<std::thread*> threads_;
std::vector<bool> cpu_usage_flags_;
std::queue<std::function<void()> > jobs_;
std::function<void()> on_empty_handler_;
std::function<void()> on_submit_handler_;
std::unordered_map<size_t, size_t> cores_occupancy_;
size_t running_jobs_;
bool stop_flag_;
ThreadPoolType type_;
void routine();
std::unordered_map<int, std::string> types_to_strings_ = {
{WORKERS_POOL, "Workers thread pool"},
{COMM_RECEIVE_POOL, "Comm receive thread pool"},
{COMM_REQUEST_POOL, "Comm request thread pool"}
};
};
......@@ -36,16 +36,19 @@ OPTIONS
--disable-warning=<NUM>
Do not show warnings with given number.
-u
Display information flags about cpu usage by all thread pools
-q
--quiet
Do not show progress info.
Do not show progress info.
-g
Include debug information (may reduce performance)
-b
Enable dynamic balance.
Enable dynamic balance.
-O0
......@@ -92,7 +95,7 @@ ENVIRONMENT
precedence over them (if appropirate).
LUNA_HOME
Path to LuNA home directory. If unset, LUNA_HOME is set to
Path to LuNA home directory. If unset, LUNA_HOME is set to
'<script_dir>/..'.
PYTHON
......@@ -108,7 +111,7 @@ ENVIRONMENT
C++ linker flags to link program sources.
LUNA_NO_CLEANUP
If this variable is present, then temporary build directory
If this variable is present, then temporary build directory
will not be removed in the end (same as --no-cleanup option).
REPORTING BUGS
......
......@@ -22,7 +22,10 @@ Config::Config(int argc, char **argv)
} else if (arg=="--version") {
mode_=VERSION;
break;
} else if (arg=="-b") {
} else if (arg == "-u") {
cpu_usage_info_ = true;
}
else if (arg=="-b") {
dynamic_balance_ = true;
} else if (arg.rfind("--worker-threads-count=", 0)==0) {
try
......@@ -111,6 +114,11 @@ bool Config::dynamic_balance() const
return dynamic_balance_;
}
bool Config::cpu_usage_info() const
{
return cpu_usage_info_;
}
unsigned int Config::get_steal_proc_count() const noexcept {
return DEFAULT_STEAL_PROC_COUNT;
}
......
......@@ -31,7 +31,7 @@ MpiComm::MpiComm(MPI_Comm comm, const Config &conf)
MPI_Comm_rank(comm_, &rank_);
MPI_Comm_size(comm_, &size_);
req_.start(conf.get_comm_request_threads_count());
req_.start(COMM_REQUEST_POOL, conf.get_comm_request_threads_count(), conf.cpu_usage_info());
for (auto i=0u; i<conf.get_comm_recv_threads_count(); i++) {
recv_threads_.push_back(new std::thread([this](){
......
......@@ -64,7 +64,7 @@ int RTS::run()
double start_time=wtime();
comm_->barrier();
std::unique_lock<std::mutex> lk(m_);
pool_.start(conf_->get_worker_threads_count());
pool_.start(WORKERS_POOL, conf_->get_worker_threads_count(), conf_->cpu_usage_info());
finished_flag_=false;
......
#include "thread_pool.h"
#include <cassert>
#include <sched.h>
#include <sstream>
#include "common.h"
......@@ -10,12 +12,16 @@ ThreadPool::ThreadPool()
stop();
}
void ThreadPool::start(size_t threads_num)
void ThreadPool::start(ThreadPoolType type, size_t threads_num, bool cpu_usage_info)
{
std::lock_guard<std::mutex> lk(m_);
if (stop_flag_) {
throw std::runtime_error("start while stopping ThreadPool");
}
type_ = type;
if (cpu_usage_info) {
cpu_usage_flags_.resize(std::thread::hardware_concurrency());
}
for (auto i=0u; i<threads_num; i++) {
threads_.push_back(new std::thread([this](){
......@@ -47,6 +53,40 @@ void ThreadPool::stop()
lk.lock();
}
if (!cpu_usage_flags_.empty()) {
std::ostringstream cpu_usage_stream;
cpu_usage_stream << types_to_strings_[type_] << " (cpu usage): [\t";
for (int i = 0; i < cpu_usage_flags_.size(); ++i) {
cpu_usage_stream << i << " cpu" << " : " << (cpu_usage_flags_[i] ? "yes" : "no") << "\t";
}
cpu_usage_stream << "]\n";
INFO(cpu_usage_stream.str());
}
#ifdef CORES_OCCUPANCY
if (cores_occupancy_.size() > 0) {
size_t counter = 0;
for (const auto &kv : cores_occupancy_) {
const auto freq = kv.second;
counter += freq;
}
std::ostringstream out;
out << std::fixed;
out.precision(5);
out << types_to_strings_[type_];
out << " [\t";
for (const auto &kv : cores_occupancy_) {
const auto cpu = kv.first;
const auto freq = kv.second;
out << cpu << " cpu" << " : " << freq << " tasks" << " (" << (double) freq / counter << "%)" << "\t";
}
out << "]" << std::endl;
SHOW_CORES(out.str());
}
#endif
stop_flag_=false;
}
......@@ -100,6 +140,15 @@ void ThreadPool::routine()
auto job=jobs_.front();
jobs_.pop();
running_jobs_++;
const int cpu = sched_getcpu();
if (!cpu_usage_flags_.empty()) {
cpu_usage_flags_[cpu] = true;
}
#ifdef CORES_OCCUPANCY
cores_occupancy_[cpu]++;
#endif
lk.unlock();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment