Commit 2a7b806d authored by Maxim Vershinin

Working Lamport clock algorithm

parent 4719b1db
#include <cstdio>
#include <iostream>
#include <queue>
#include <thread>
#include <atomic>
#include <cstddef>
#include <algorithm>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <chrono>
#include <mpi.h>
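
// Distributed mutual exclusion via Lamport's algorithm: every node keeps a
// logical clock and a priority queue of pending requests ordered by
// (timestamp, rank). A node may enter its critical section once its own
// request is at the head of the queue and every node has replied to it.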
class lamport_mutex {
public:
    lamport_mutex(int rank, int n_nodes) : rank_(rank), n_nodes_(n_nodes) { }

    void init() {
        mpi_lamport_message_type = create_mpi_lamport_message_type();
        recv_thread_ = std::thread(&lamport_mutex::recv_messages, this);
    }
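
    // Request the critical section: timestamp a REQUEST, broadcast it to all
    // nodes (including ourselves), then block until every node has replied and
    // our own request sits at the head of the timestamp-ordered queue.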
    void lock() {
        n_reply_msgs.store(0);
        lamport_message req_mes(lamport_message::REQUEST_TYPE, ++global_time_, rank_);
        send_all(req_mes);
        std::unique_lock<std::mutex> lk{m};
        while (n_reply_msgs != n_nodes_ || messages_queue_.empty() || messages_queue_.top().rank_ != rank_) {
            cv.wait(lk);
        }
    }
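
    // Leave the critical section: broadcast a RELEASE so every node (including
    // ourselves) drops our request from the head of its queue.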
    void unlock() {
        lamport_message rel_mes(lamport_message::RELEASE_TYPE, ++global_time_, rank_);
        send_all(rel_mes);
    }
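
    // Tear down: after all nodes pass the barrier, a self-addressed INTERRUPT
    // unblocks the receiver thread so it can exit before the type is freed.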
    void free() {
        MPI_Barrier(MPI_COMM_WORLD);
        lamport_message interrupt_mes(lamport_message::INTERRUPT_TYPE, ++global_time_, rank_);
        MPI_Send(&interrupt_mes, 1, mpi_lamport_message_type, rank_, 0, MPI_COMM_WORLD);
        recv_thread_.join();
        MPI_Type_free(&mpi_lamport_message_type);
    }
private:
    struct lamport_message {
        static const int REQUEST_TYPE = 1;
        static const int REPLY_TYPE = 2;
        static const int RELEASE_TYPE = 3;
        static const int INTERRUPT_TYPE = 4;
        int type_message_;
        long t_;
        int rank_;
        lamport_message() = default;
        lamport_message(int type_message, long t, int rank)
            : type_message_(type_message), t_(t), rank_(rank) { }
    };
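
    // Min-heap of pending requests: earlier timestamps first, ties broken by
    // rank, so every node agrees on the same total order of requests.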
    std::priority_queue<lamport_message, std::vector<lamport_message>,
                        std::function<bool(const lamport_message &, const lamport_message &)>>
        messages_queue_{[](const lamport_message &m1, const lamport_message &m2) {
            if (m1.t_ != m2.t_) {
                return m1.t_ > m2.t_;
            }
            // Strict (not >=) tie-break by rank keeps the comparator a valid
            // strict weak ordering, as std::priority_queue requires.
            return m1.rank_ > m2.rank_;
        }};
    const int rank_;
    const int n_nodes_;
    std::atomic_long global_time_{0};
    std::atomic_int n_reply_msgs{0};
    std::thread recv_thread_{};
    std::condition_variable cv{};
    std::mutex m{};
    MPI_Datatype mpi_lamport_message_type;
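
    // Describe lamport_message to MPI so whole structs can be sent directly.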
    MPI_Datatype create_mpi_lamport_message_type() {
        constexpr int n_fields = 3;
        int blocklengths[n_fields] = {1, 1, 1};
        MPI_Datatype types[n_fields] = {MPI_INT, MPI_LONG, MPI_INT};
        MPI_Aint displacements[n_fields] = {offsetof(lamport_message, type_message_),
                                            offsetof(lamport_message, t_),
                                            offsetof(lamport_message, rank_)};
        MPI_Datatype struct_type, mpi_lamport_message_type;
        MPI_Type_create_struct(n_fields, blocklengths, displacements, types, &struct_type);
        // MPI_UB was removed in MPI-3.0; resizing the struct type instead sets
        // its extent to sizeof(lamport_message), trailing padding included.
        MPI_Type_create_resized(struct_type, 0, sizeof(lamport_message), &mpi_lamport_message_type);
        MPI_Type_free(&struct_type);
        MPI_Type_commit(&mpi_lamport_message_type);
        return mpi_lamport_message_type;
    }
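
    // Lamport receive rule: on message receipt, clock = max(local, received) + 1.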
    void tick_time(long recv_time) {
        // A CAS loop keeps the max() update safe against concurrent
        // ++global_time_ from the main thread.
        long cur = global_time_.load();
        while (cur < recv_time && !global_time_.compare_exchange_weak(cur, recv_time)) { }
        global_time_ += 1;
    }
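
    // Dispatch one incoming message; returns true when the receiver loop
    // should terminate (INTERRUPT).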
    bool answer_on_message(const lamport_message &sou_msg) {
        switch (sou_msg.type_message_) {
            case lamport_message::REQUEST_TYPE: {
                {
                    std::lock_guard<std::mutex> lk(m);
                    messages_queue_.push(sou_msg);
                }
                if (sou_msg.rank_ != rank_) {
                    lamport_message reply_msg(lamport_message::REPLY_TYPE, ++global_time_, rank_);
                    MPI_Send(&reply_msg, 1, mpi_lamport_message_type, sou_msg.rank_, 0, MPI_COMM_WORLD);
                    break;
                } else {
                    // Our own request: emulate sending and receiving a reply to
                    // ourselves (two clock ticks), then fall through so the
                    // self-reply is counted like any other.
                    global_time_ += 2;
                }
            }
            // fall through
            case lamport_message::REPLY_TYPE: {
                {
                    std::lock_guard<std::mutex> lk(m);
                    ++n_reply_msgs;
                }
                cv.notify_one();
                break;
            }
            case lamport_message::RELEASE_TYPE: {
                {
                    std::lock_guard<std::mutex> lk(m);
                    messages_queue_.pop();  // drop the releasing node's request from the head
                }
                cv.notify_one();
                break;
            }
            case lamport_message::INTERRUPT_TYPE:
                return true;
            default:
                break;
        }
        return false;
    }
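
    // Receiver loop: runs on its own thread, applies the clock rule to every
    // incoming message and dispatches it until an INTERRUPT arrives.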
    void recv_messages() {
        lamport_message buf_msg;
        while (true) {
            MPI_Recv(&buf_msg, 1, mpi_lamport_message_type,
                     MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            tick_time(buf_msg.t_);
            if (answer_on_message(buf_msg)) {
                return;
            }
        }
    }
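
    // Broadcast a message to every rank, including this one; the self-copy is
    // handled by the receiver thread like any other message.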
    void send_all(const lamport_message &msg) {
        for (int i = 0; i < n_nodes_; ++i) {
            MPI_Send(&msg, 1, mpi_lamport_message_type, i, 0, MPI_COMM_WORLD);
        }
    }
};
int main(int argc, char *argv[]) {
    int size, rank;
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided != MPI_THREAD_MULTIPLE) {
        MPI_Finalize();
        std::cerr << "Error: MPI did not provide the required threading level (MPI_THREAD_MULTIPLE)." << std::endl;
        return -1;
    }
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    lamport_mutex mtx(rank, size);
    mtx.init();
    mtx.lock();
    std::cerr << "My order to print before sleep: " << rank << std::endl << std::flush;
    // std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cerr << "My order to print after sleep: " << rank << std::endl << std::flush;
    // std::this_thread::sleep_for(std::chrono::seconds(1));
    mtx.unlock();
    mtx.free();
    MPI_Finalize();
    return 0;
}
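
To try it out (assuming the file is saved as lamport_mutex.cpp and the MPI installation supports MPI_THREAD_MULTIPLE, e.g. a recent Open MPI or MPICH), a typical build and run looks like:

    mpicxx -std=c++11 -pthread lamport_mutex.cpp -o lamport_mutex
    mpirun -np 4 ./lamport_mutex

Each rank's two lines should print without interleaving, one rank at a time.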