Commit 39cdc40b authored by Alexander Borzov's avatar Alexander Borzov

Implemented data transfer during the computations. Also commented out the parallelization in those functions where it is faster to compute locally than to transfer the data.
parent 61f47fce
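A minimal sketch of the compute/communication overlap described above (the pattern this commit adds around DoubleArray::mul and sendingThreadFunc), using only standard C++ threads. sendChunk() and every other name below is an illustrative placeholder, not an API from this repository.

#include <atomic>
#include <thread>
#include <vector>

// Placeholder for the real ring all-gather (hypothetical, not the repository's Net API).
static void sendChunk(const std::vector<double> &data, int first, int count) {
    (void) data; (void) first; (void) count; // a real sender would transmit data[first, first + count)
}

int main() {
    constexpr int chunks = 10, chunkSize = 100;
    std::vector<double> result(chunks * chunkSize, 0.0);
    std::atomic<bool> ready[chunks];
    for (auto &flag : ready) flag.store(false);

    // Sender thread: waits for each chunk to be published, then forwards it.
    std::thread sender([&] {
        for (int c = 0; c < chunks; ++c) {
            while (!ready[c].load(std::memory_order_acquire)) std::this_thread::yield();
            sendChunk(result, c * chunkSize, chunkSize);
        }
    });

    // Compute loop: fill one chunk, then publish it so communication overlaps the next chunk.
    for (int c = 0; c < chunks; ++c) {
        for (int i = 0; i < chunkSize; ++i) result[c * chunkSize + i] = c + i;
        ready[c].store(true, std::memory_order_release);
    }
    sender.join();
    return 0;
}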
//
// Created by COMPUTER on 08.04.2021.
//
#include "Blas.h"
\ No newline at end of file
//
// Created by COMPUTER on 08.04.2021.
//
#ifndef KOTLIN_CPP_BLAS_H
#define KOTLIN_CPP_BLAS_H
#include <IpBlas.hpp>
#include <cmath>//for std::sqrt in nrm2
#include "Net.h"
#include "Funcs.h"
double asum(int xw, double *x, Net &net) {
int rank = net.rank;
int size = net.size;
int displs[size];
initDispls(displs, size, 1, xw);
int start = displs[rank];
int finish = ((rank == size - 1) ? xw : displs[mod(rank + 1, size)]);
double partSum = Ipopt::IpBlasDasum(finish - start, x + start,
1);
double result;
net.AllReduce(partSum, &result);
return result;
}
void axpy(int xw, double a, double *x, double b, double *y, Net &net) {
//NOTE: the a and b coefficients are currently ignored; the IpBlasDaxpy call below hard-codes 1.0, so this computes y += x on the local slice before the ring all-gather
int rank = net.rank;
int size = net.size;
int displs[size];
initDispls(displs, size, 1, xw);
int start = displs[rank];
int finish = ((rank == size - 1) ? xw : displs[mod(rank + 1, size)]);
Ipopt::IpBlasDaxpy(finish - start, 1.0, x + start, 1, y + start, 1);
net.MPI_ALLGATHERV(
(char *) (y + displs[rank]), xw, (char *) y, displs, MPI_DOUBLE);
}
void copy(int xw, double *x, double *y, Net &net) {
Ipopt::IpBlasDcopy(xw, x, 1, y, 1);
}
double dot(int xw, double *x, double *y, Net &net) {
int rank = net.rank;
int size = net.size;
int displs[size];
initDispls(displs, size, 1, xw);
int start = displs[rank];
int finish = ((rank == size - 1) ? xw : displs[mod(rank + 1, size)]);
double partSum = Ipopt::IpBlasDdot(finish - start, y + start, 1, x + start,
1);
double result;
net.AllReduce(partSum, &result);
return result;
}
double nrm2(int xw, double *x, Net &net) {
int rank = net.rank;
int size = net.size;
int displs[size];
initDispls(displs, size, 1, xw);
int start = displs[rank];
int finish = ((rank == size - 1) ? xw : displs[mod(rank + 1, size)]);
double partSum = Ipopt::IpBlasDnrm2(finish - start, x + start, 1);
double result;
net.AllReduce(partSum * partSum, &result);
//the reduction sums the squared partial norms, so take the square root to return the global 2-norm
return std::sqrt(result);
}
void scal(int xw, double a, double *x, Net &net) {
Ipopt::IpBlasDscal(xw, a, x, 1);
}
#endif //KOTLIN_CPP_BLAS_H
@@ -33,6 +33,7 @@ add_library( # Sets the name of the library.
Variable.cpp
variables/VariableClass.cpp
variables/DoubleArray.cpp
Blas.cpp
)
# Searches for a specified prebuilt library and stores the path as a
@@ -10,9 +10,9 @@ void printArr(int* arr,int size) {
}
printf("\n");
}
void printArr(double* arr,int size) {
void printArr(const double* arr,int size) {
for(int i=0; i < size; i++) {
printf("%lf ", arr[i]);
printf("arr: %lf ", arr[i]);
}
printf("\n");
}
@@ -8,7 +8,7 @@
#define LOG_TAG "native"
#define printf(...) __android_log_print(ANDROID_LOG_DEBUG, LOG_TAG, __VA_ARGS__)
void printArr(int* arr,int size);
void printArr(double* arr,int size);
void printArr(const double* arr,int size);
int mod(int a,int m);
void initDispls(int *displs, int size, int ch, int cw);
#endif //UNTITLED_FUNCS_H
@@ -11,12 +11,12 @@
#include "VM.h"//FIXME:перенести в отдельный файл struct Client
#include "Funcs.h"
int getIntervalSize(const int part,const int size, const int* displs, int bufsize) {
return ((part == size - 1) ? bufsize : displs[mod(part + 1,size)]) - displs[part];
int getIntervalSize(const int part, const int size, const int *displs, int bufsize) {
return ((part == size - 1) ? bufsize : displs[mod(part + 1, size)]) - displs[part];
}
//the whole implementation is written for nodes connected in a ring; this will do for a start
Net::Net(std::vector<Client>& clients, int port, int rank) : serverSocket(port) {
Net::Net(std::vector<Client> &clients, int port, int rank) : serverSocket(port) {
this->port = port;
this->rank = rank;
//rank = vm.currUserNumber;
@@ -36,41 +36,43 @@ Net::Net(std::vector<Client>& clients, int port, int rank) : serverSocket(port)
}
printf("after connect\n");
printf("before accept\n");
if(prev == nullptr) {
if (prev == nullptr) {
prev = serverSocket.accept();
}
printf("after accept\n");
}
int Net::MPI_Send(const char *value, size_t size_v, Socket* destination) {
return destination->send(value,size_v);
int Net::MPI_Send(const char *value, size_t size_v, Socket *destination) {
return destination->send(value, size_v);
}
int Net::MPI_Recv(char *value, size_t size_v, Socket* sender) {
return sender->read(value,size_v);
int Net::MPI_Recv(char *value, size_t size_v, Socket *sender) {
return sender->read(value, size_v);
}
int Net::MPI_ALLGATHERV(const char *sendbuf, int bufsize, char *recvbuf, const int* displs, Type type) {// const int *count,
int Net::MPI_ALLGATHERV(const char *sendbuf, int bufsize, char *recvbuf, const int *displs,
Type type) {// const int *count,
int part = rank;
int typeSize = getSizeOf(type);
for(int i = displs[part]*typeSize; i < getIntervalSize(part,size,displs,bufsize)*typeSize; i++) {
recvbuf[i] = sendbuf[i - displs[part]*typeSize];
for (int i = displs[part] * typeSize;
i < getIntervalSize(part, size, displs, bufsize) * typeSize; i++) {//TODO: speed this up
recvbuf[i] = sendbuf[i - displs[part] * typeSize];
}
//printArr(reinterpret_cast<double *>(recvbuf), 4*4);
for(int k = 0; k < size - 1; k++) {
for (int k = 0; k < size - 1; k++) {
int recvPart = mod(part + 1, size);
int sendCount = getIntervalSize(part,size,displs,bufsize)*typeSize;
int recvCount = getIntervalSize(recvPart,size,displs,bufsize)*typeSize;
if(rank % 2 == 0) {
MPI_Send(recvbuf + displs[part]*typeSize, sendCount, prev);
int sendCount = getIntervalSize(part, size, displs, bufsize) * typeSize;
int recvCount = getIntervalSize(recvPart, size, displs, bufsize) * typeSize;
if (rank % 2 == 0) {
MPI_Send(recvbuf + displs[part] * typeSize, sendCount, prev);
//printArr(reinterpret_cast<double *>(recvbuf), 4*4);
MPI_Recv(recvbuf + displs[recvPart]*typeSize, recvCount, next);
MPI_Recv(recvbuf + displs[recvPart] * typeSize, recvCount, next);
} else {
//printArr(reinterpret_cast<double *>(recvbuf), 4*4);
MPI_Recv(recvbuf + displs[recvPart]*typeSize, recvCount, next);
MPI_Recv(recvbuf + displs[recvPart] * typeSize, recvCount, next);
//printArr(reinterpret_cast<double *>(recvbuf), 4*4);
MPI_Send(recvbuf + displs[part]*typeSize, sendCount, prev);
MPI_Send(recvbuf + displs[part] * typeSize, sendCount, prev);
//printArr(reinterpret_cast<double *>(recvbuf), 4*4);
}
//printArr(reinterpret_cast<double *>(recvbuf), bufsize/sizeof(double));
@@ -79,28 +81,63 @@ int Net::MPI_ALLGATHERV(const char *sendbuf, int bufsize, char *recvbuf, const i
return 0;
}
int Net::MPI_ALLGATHERV(const char *sendbuf, char *recvbuf, const int *displs,
const int *recvcount, Type type) {// const int *count,
int part = rank;
int typeSize = getSizeOf(type);
/*for (int i = displs[part] * typeSize; i < (displs[part] + recvcount[part]) * typeSize; i++) {
recvbuf[i] = sendbuf[i - displs[part] * typeSize];
}*/
memcpy(recvbuf + displs[part] * typeSize, sendbuf, recvcount[part] * typeSize);//not needed when sendbuf already aliases this region of recvbuf
//printArr(reinterpret_cast<const double *>(sendbuf), 2);
for (int k = 0; k < size - 1; k++) {
int recvPart = mod(part + 1, size);
int sendCount = recvcount[part] * typeSize;
int recvCount = recvcount[recvPart] * typeSize;
if (rank % 2 == 0) {
MPI_Send(recvbuf + displs[part] * typeSize, sendCount, prev);
//printArr(reinterpret_cast<double *>(recvbuf), sendCount/getSizeOf(type));
MPI_Recv(recvbuf + displs[recvPart] * typeSize, recvCount, next);
//printArr(reinterpret_cast<double *>(recvbuf), sendCount/getSizeOf(type));
} else {
//printArr(reinterpret_cast<double *>(recvbuf), sendCount/getSizeOf(type));
MPI_Recv(recvbuf + displs[recvPart] * typeSize, recvCount, next);
//printArr(reinterpret_cast<double *>(recvbuf), sendCount/getSizeOf(type));
MPI_Send(recvbuf + displs[part] * typeSize, sendCount, prev);
//printArr(reinterpret_cast<double *>(recvbuf), sendCount/getSizeOf(type));
}
//printArr(reinterpret_cast<double *>(recvbuf), bufsize/sizeof(double));
part = (part + 1) % size;
}
return 0;
}
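//Ring-exchange summary for the two MPI_ALLGATHERV overloads above: every rank first copies its own
//slice into recvbuf, then in each of the size - 1 rounds it sends the slice it currently holds to
//prev and receives the following slice from next, so after the last round recvbuf is complete on
//every rank; even-numbered ranks send before receiving and odd-numbered ranks receive first, which
//keeps each blocking send paired with a posted receive on the neighbouring rank.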
int Net::getSizeOf(Type type) {
switch (type) {
case MPI_DOUBLE: return sizeof(double);
case MPI_INT: return sizeof(int);
case MPI_DOUBLE:
return sizeof(double);
case MPI_INT:
return sizeof(int);
}
}
void Net::AllReduce(double partOfResult, double* recvResult) {
void Net::AllReduce(double partOfResult, double *recvResult) {
double results[size];
double sum = 0;
results[rank] = partOfResult;
for(int i = 0; i < size; i++) {
if(rank % 2 == 0) {
MPI_Send(reinterpret_cast<char *>(results + mod(rank + i,size)), sizeof(double), next);
MPI_Recv(reinterpret_cast<char *>(results + mod(rank + i + 1,size)), sizeof(double), prev);
for (int i = 0; i < size; i++) {
if (rank % 2 == 0) {
MPI_Send(reinterpret_cast<char *>(results + mod(rank + i, size)), sizeof(double), next);
MPI_Recv(reinterpret_cast<char *>(results + mod(rank + i + 1, size)), sizeof(double),
prev);
} else {
MPI_Recv(reinterpret_cast<char *>(results + mod(rank + i + 1,size)), sizeof(double), prev);
MPI_Send(reinterpret_cast<char *>(results + mod(rank + i,size)), sizeof(double), next);
MPI_Recv(reinterpret_cast<char *>(results + mod(rank + i + 1, size)), sizeof(double),
prev);
MPI_Send(reinterpret_cast<char *>(results + mod(rank + i, size)), sizeof(double), next);
}
}
for(int i = 0; i < size; i++) {
sum+=results[i];
for (int i = 0; i < size; i++) {
sum += results[i];
}
*recvResult = sum;
}
@@ -35,6 +35,10 @@ public:
prev->close();
delete prev;
}
int MPI_ALLGATHERV(const char *sendbuf, char *recvbuf, const int *displs,
const int *recvcount, Type type);
private:
//void connect(Client* client);
int port;
@@ -42,6 +46,7 @@ private:
//Task& task;
int getSizeOf(Type type);
};
@@ -63,9 +63,15 @@ VM::VM(jint nodeNum, jbyte *taskBody) {
net = new Net(clients, clients[rank].port, rank);
}
static double now_ms() {
struct timespec res;
clock_gettime(CLOCK_REALTIME, &res);
return 1000.0 * res.tv_sec + (double) res.tv_nsec / 1e6;
}
void VM::run() {
initOps();
realStack.reserve(20);
realStack.reserve(50);
//int restoreCounter = 8;
while (++cursor < operations.size()) {
/*
@@ -376,12 +382,28 @@ void VM::doubleArrSet(std::vector<Variable *> &args, std::vector<int> &argsIds)
result.set(dim, args[cursor++]->d());
}
static double diff(struct timespec start, struct timespec finish) {
long BILLION = 1000000000L;
return (finish.tv_sec - start.tv_sec)
+ (double) (finish.tv_nsec - start.tv_nsec)
/ (double) BILLION;
}
void VM::mulDoubleArrArr(std::vector<Variable *> &args, std::vector<int> &argsIds) {
DoubleArray &result = *args[0]->data.darr;
checkpoint.set(argsIds[0], result);
DoubleArray &a = *args[1]->data.darr;
DoubleArray &b = *args[2]->data.darr;
//double start, finish;
//start = now_ms();
struct timespec start;
struct timespec finish;
clock_gettime(CLOCK_REALTIME, &start);
result.mul(a, b, *net);
//finish = now_ms();
clock_gettime(CLOCK_REALTIME, &finish);
printf("time: %lf", diff(start,finish));
}
void VM::mulDoubleDoubleArr(std::vector<Variable *> &args, std::vector<int> &argsIds) {
@@ -6,6 +6,7 @@
#include "VM.h"
#include <android/log.h>
#include <IpBlas.hpp>
#define LOG_TAG "native"
#define printf(...) __android_log_print(ANDROID_LOG_DEBUG, LOG_TAG, __VA_ARGS__)
@@ -22,4 +23,72 @@ Java_ru_nsu_fit_borzov_kotlin_1cpp_MainActivity_vmStart(JNIEnv *env, jobject thi
VM vm(nodeNum, taskBody);
vm.run();
env->ReleaseByteArrayElements(task, taskBody, isCopy);
}
static double now_ms() {
struct timespec res;
clock_gettime(CLOCK_REALTIME, &res);
return 1000.0 * res.tv_sec + (double) res.tv_nsec / 1e6;
}
static double diff(struct timespec start, struct timespec finish) {
long BILLION = 1000000000L;
return (finish.tv_sec - start.tv_sec)
+ (double) (finish.tv_nsec - start.tv_nsec)
/ (double) BILLION;
}
extern "C"
JNIEXPORT void JNICALL
Java_ru_nsu_fit_borzov_kotlin_1cpp_MainActivity_vmMul(JNIEnv *env, jobject thiz) {
int size = 500;
double *mA = new double[size * size];
double *mB = new double[size * 1];
int m = size;
int n = size;
int o = 1;
double *res = new double[n * o];
for (int i = 0; i < m; i++) {
for (int j = 0; j < n; j++) {
mA[i * n + j] = i;
}
}
for (int i = 0; i < n; i++) {
for (int j = 0; j < o; j++) {
mB[i * o + j] = i;
}
}
// double start, finish;
// start = now_ms();
struct timespec start;
struct timespec finish;
clock_gettime(CLOCK_REALTIME, &start);
/*
for (int i = 0; i < m; i++) {
for (int j = 0; j < o; j++) {
for (int k = 0; k < n; k++) {
res[i][j] += mA[i][k] * mB[k][j];
}
}
}
*/
Ipopt::IpBlasDgemv(true, m, n, 1.0, mA, n,
mB, 1, 0.0, res,
1);
// finish = now_ms();
// printf("1 proc mul time:%lf", finish - start);
clock_gettime(CLOCK_REALTIME, &finish);
for (int i = 0; i < m; i++) {
for (int j = 0; j < o; j++) {
printf("%lf\n", res[i * o + j]);
}
}
printf("1 proc time: %lf\n", diff(start, finish));
}
\ No newline at end of file
@@ -2,6 +2,7 @@
#include "../Funcs.h"
#include <IpBlas.hpp>
#include <utility>
#include <thread>
#include "VariableClass.h"
DoubleArray::DoubleArray(std::vector<int> dimensions, double *ptr) : ptr(ptr), dimensions(
@@ -29,21 +30,25 @@ DoubleArray::DoubleArray(const DoubleArray &doubleArray) : ptr(new double[double
}
void DoubleArray::plus(const DoubleArray &y, Net &net) {
// DoubleArray &x = *this;
// int xw = x.size();
// int rank = net.rank;
// int size = net.size;
// int displs[size];
// initDispls(displs, size, 1, xw);
// int start = displs[rank];
// int finish = ((rank == size - 1) ? xw : displs[mod(rank + 1, size)]);
//
// Ipopt::IpBlasDaxpy(finish - start, 1.0, y.ptr.get() + start, 1, x.ptr.get() + start, 1);
// net.MPI_ALLGATHERV(
// (char *) (x.ptr.get() + displs[rank]), xw, (char *) x.ptr.get(), displs, MPI_DOUBLE);
DoubleArray &x = *this;
int xw = x.size();
int rank = net.rank;
int size = net.size;
int displs[size];
initDispls(displs, size, 1, xw);
int start = displs[rank];
int finish = ((rank == size - 1) ? xw : displs[mod(rank + 1, size)]);
Ipopt::IpBlasDaxpy(finish - start, 1.0, y.ptr.get() + start, 1, x.ptr.get() + start, 1);
net.MPI_ALLGATHERV(
(char *) (x.ptr.get() + displs[rank]), xw, (char *) x.ptr.get(), displs, MPI_DOUBLE);
Ipopt::IpBlasDaxpy(xw, 1.0, y.ptr.get(), 1, x.ptr.get(), 1);
}
void DoubleArray::minus(const DoubleArray &y, Net &net) {
/*
DoubleArray &x = *this;
int xw = x.size();
int rank = net.rank;
@@ -56,6 +61,10 @@ void DoubleArray::minus(const DoubleArray &y, Net &net) {
Ipopt::IpBlasDaxpy(finish - start, -1.0, y.ptr.get() + start, 1, x.ptr.get() + start, 1);
net.MPI_ALLGATHERV(
(char *) (x.ptr.get() + displs[rank]), xw, (char *) x.ptr.get(), displs, MPI_DOUBLE);
*/
DoubleArray &x = *this;
int xw = x.size();
Ipopt::IpBlasDaxpy(xw, -1.0, y.ptr.get(), 1, x.ptr.get(), 1);
}
Variable DoubleArray::copyVariable() const {
@@ -86,6 +95,7 @@ void DoubleArray::mul(const double value) {
}
double DoubleArray::scalar(const DoubleArray &y, Net &net) const {
/*
const DoubleArray &x = *this;
int xw = x.size();
int rank = net.rank;
@@ -99,9 +109,60 @@ double DoubleArray::scalar(const DoubleArray &y, Net &net) const {
double result;
net.AllReduce(partSum, &result);
return result;
//net.MPI_ALLGATHERV(
// (char *) (x.ptr.get() + displs[rank]), xw, (char *) x.ptr.get(), displs, MPI_DOUBLE);
*/
const DoubleArray &x = *this;
int xw = x.size();
double result = Ipopt::IpBlasDdot(xw, y.ptr.get(), 1, x.ptr.get(),
1);
return result;
}
//void DoubleArray::mul(const DoubleArray &x, const DoubleArray &y,
// Net &net) {//FIXME: for now this only handles a column vector; replace with DGEMM
// constexpr int ROWS = 0;
// constexpr int COLUMNS = 1;
// int M = x.dimensions[ROWS];
// //int N = y.dimensions[COLUMNS];
// int N = 1;
// int K = x.dimensions[COLUMNS];
// int rank = net.rank;
// int size = net.size;
// int displs[size];
// initDispls(displs, size, M, 1);
// int start = displs[rank];
// int finish = ((rank == size - 1) ? M : displs[mod(rank + 1, size)]);
// Ipopt::IpBlasDgemv(true, finish - start, K, 1.0, x.ptr.get() + start * K, K,
// y.ptr.get(), 1, 0.0, this->ptr.get() + start,
// 1);//why is trans==true o_O; it took several days of struggling to get the multiplication right
// net.MPI_ALLGATHERV(
// (char *) (this->ptr.get() + start), this->size(), (char *) this->ptr.get(), displs,
// MPI_DOUBLE);
//}
void
sendingThreadFunc(double *data, double *recvTmpData, Net &net, int dataSize, int countParts,
const bool *wasCounted) {
int size = net.size;
int displs[size];
initDispls(displs, size, dataSize, 1);
int recvcount[size];
int partSize = dataSize / size / countParts;
for (int i = 0; i < size; i++) {
recvcount[i] = partSize;
}
for (int i = 0; i < countParts; i++) {
//std::unique_lock<std::mutex> lk(mutex);
//busy-wait until the compute thread marks part i as ready;
//note that wasCounted is a plain bool written from another thread, so this is a data race:
//std::atomic<bool> (or the commented-out mutex/condition variable) would make it well-defined
while (!wasCounted[i]) {
//cv.wait_for(lk,100ms);
}
net.MPI_ALLGATHERV((char *) (data + displs[net.rank]), (char *) recvTmpData, displs,
recvcount,
MPI_DOUBLE);
for (int j = 0; j < size; j++) {
displs[j] += recvcount[j];
}
}
}
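//A race-free variant of the ready-flag handshake above, as a sketch only (assumes C++11 <atomic>
//and <thread>; the names markPartReady/waitForPart/atomicReady are illustrative and not part of
//this commit):
//
//static void markPartReady(std::atomic<bool> *atomicReady, int partIndex) {
//    atomicReady[partIndex].store(true, std::memory_order_release);   // publish the finished part
//}
//
//static void waitForPart(const std::atomic<bool> *atomicReady, int partIndex) {
//    while (!atomicReady[partIndex].load(std::memory_order_acquire)) {
//        std::this_thread::yield();                                   // don't burn a whole core
//    }
//}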
void DoubleArray::mul(const DoubleArray &x, const DoubleArray &y,
@@ -118,12 +179,57 @@ void DoubleArray::mul(const DoubleArray &x, const DoubleArray &y,
initDispls(displs, size, M, 1);
int start = displs[rank];
int finish = ((rank == size - 1) ? M : displs[mod(rank + 1, size)]);
Ipopt::IpBlasDgemv(true, finish - start, K, 1.0, x.ptr.get() + start * K, K,
y.ptr.get(), 1, 0.0, this->ptr.get() + start,
1);//why is trans==true o_O; it took several days of struggling to get the multiplication right
net.MPI_ALLGATHERV(
(char *) (this->ptr.get() + start), this->size(), (char *) this->ptr.get(), displs, MPI_DOUBLE);
int countParts = 10;
bool *wasCounted = new bool[countParts];
for (int i = 0; i < countParts; i++) {
wasCounted[i] = false;
}
double *tmpData = new double[M];//the (M x K) * (K x 1) product has M entries (K only matches M for square matrices)
std::thread sendingThread = std::thread(sendingThreadFunc, tmpData, tmpData, std::ref(net),
M, countParts, wasCounted);
int partSize = (finish - start) / countParts;
int startPartPos = 0;
for (int i = 0; i < countParts; i++) {
//TODO: for now this assumes the work divides evenly across parts; the general case still needs handling
Ipopt::IpBlasDgemv(true, partSize, K, 1.0, x.ptr.get() + start * K + startPartPos * K, K,
y.ptr.get(), 1, 0.0, tmpData + start + startPartPos,
1);
startPartPos += partSize;
wasCounted[i] = true;
}
sendingThread.join();
delete[] wasCounted;
this->ptr.reset(tmpData);
// Ipopt::IpBlasDgemv(true, finish - start, K, 1.0, x.ptr.get() + start * K, K,
// y.ptr.get(), 1, 0.0, this->ptr.get() + start,
// 1);//why is trans==true o_O; it took several days of struggling to get the multiplication right
// net.MPI_ALLGATHERV(
// (char *) (this->ptr.get() + start), this->size(), (char *) this->ptr.get(), displs,
// MPI_DOUBLE);
}
double DoubleArray::sum(Net &net) const {
const DoubleArray &x = *this;
int xw = x.size();
int rank = net.rank;
int size = net.size;
int displs[size];
initDispls(displs, size, 1, xw);
int start = displs[rank];
int finish = ((rank == size - 1) ? xw : displs[mod(rank + 1, size)]);
double partSum = Ipopt::IpBlasDasum(finish - start, x.ptr.get() + start,
1);
double result;
net.AllReduce(partSum, &result);
return result;