Commit 61e8739c authored by Иван's avatar Иван
Browse files

Lamport algorithm

parent b2458373
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <pthread.h>
#define REQ 0
#define REL 1
#define STOP 2
struct request {
int proc, timestamp;
};
typedef struct request Request;
int equal (Request a, Request b) {
return a.proc==b.proc&&a.timestamp==b.timestamp;
}
int less (Request a, Request b) {
if (a.timestamp<b.timestamp) return 1;
if (a.timestamp>b.timestamp) return 0;
return a.proc<b.proc;
}
struct queue {
Request value;
struct queue *next;
};
typedef struct queue Queue;
Queue *queue=NULL;
pthread_mutex_t qmutex;
pthread_cond_t cond;
void add (Request r) {
Queue *q;
q=malloc (sizeof (Queue));
q->value=r;
q->next=NULL;
if (!queue) queue=q;
else {
Queue *qptr=queue;
while (qptr->next!=NULL) qptr=qptr->next;
qptr->next=q;
}
}
int is_first (Request r) {
Queue *q=queue;
while (q) {
if (less (q->value, r)) return 0;
q=q->next;
}
return 1;
}
void delete (Request r) {
Queue *del=NULL;
pthread_mutex_lock (&qmutex);
if (equal (queue->value, r)) {
del=queue;
queue=queue->next;
}
else {
Queue *q=queue;
while (!(q->next==NULL||equal(q->next->value, r))) q=q->next;
if (q->next) {
del=q->next;
q->next=q->next->next;
}
}
pthread_mutex_unlock (&qmutex);
free (del);
}
int tim=0, size, rank;
void lamport_enter (Request *r) {
pthread_mutex_lock (&qmutex);
r->timestamp=++tim;
add (*r);
pthread_mutex_unlock (&qmutex);
int req[2];
req[0]=REQ;
req[1]=r->timestamp;
int i;
for (i=0; i<size; ++i) {
if (i==rank) continue;
MPI_Send (req, 2, MPI_INT, i, 0, MPI_COMM_WORLD);
}
for (i=0; i<size; ++i) {
if (i==rank) continue;
MPI_Recv (req, 2, MPI_INT, i, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
pthread_mutex_lock (&qmutex);
if (req[1]>tim) tim=req[1];
pthread_mutex_unlock (&qmutex);
}
pthread_mutex_lock (&qmutex);
while (!is_first(*r)) pthread_cond_wait (&cond, &qmutex);
pthread_mutex_unlock (&qmutex);
}
void lamport_exit (Request r) {
delete (r);
int i;
int rel[2];
rel[0]=REL;
rel[1]=r.timestamp;
for (i=0; i<size; ++i) {
if (i==rank) continue;
MPI_Send (rel, 2, MPI_INT, i, 0, MPI_COMM_WORLD);
}
}
void *receive (void *arg) {
int end=0;
int msg[2];
MPI_Status status;
while (!end) {
MPI_Recv (msg, 2, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &status);
int source=status.MPI_SOURCE;
Request r;
r.proc=source;
r.timestamp=msg[1];
if (msg[0]==REQ) {
pthread_mutex_lock (&qmutex);
add (r);
if (r.timestamp>tim) tim=r.timestamp;
msg[1]=tim;
pthread_mutex_unlock (&qmutex);
MPI_Send (msg, 2, MPI_INT, source, 1, MPI_COMM_WORLD);
}
else if (msg[0]==REL) {
delete (r);
pthread_cond_signal (&cond);
}
else if (msg[0]==STOP) end=1;
}
}
void print_text () {
int i;
for (i=0; i<10; ++i) {
printf ("%s %d\n", "string", i);
fflush (stdout);
}
}
typedef struct queue Queue;
int main (int argc, char** argv) {
int level;
MPI_Init_thread (&argc, &argv, MPI_THREAD_MULTIPLE, &level);
MPI_Comm_size (MPI_COMM_WORLD, &size);
MPI_Comm_rank (MPI_COMM_WORLD, &rank);
pthread_t side;
pthread_mutex_init (&qmutex, NULL);
pthread_cond_init (&cond, NULL);
pthread_create (&side, NULL, receive, NULL);
Request r;
r.proc=rank;
lamport_enter (&r);
print_text();
lamport_exit (r);
int stop[2];
stop[0]=STOP;
stop[1]=tim;
MPI_Barrier (MPI_COMM_WORLD);
//MPI_Send (stop, 2, MPI_INT, rank, 0, MPI_COMM_WORLD);
MPI_Finalize();
return 0;
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment