Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Alexander Borzov
diplom_front_vm_cpp_kotlin
Commits
5175cd33
Commit
5175cd33
authored
May 10, 2021
by
Alexander Borzov
Browse files
собрал всё перед первым запуском. Нужно тестировать
parent
0ebea302
Changes
32
Hide whitespace changes
Inline
Side-by-side
app/build.gradle
View file @
5175cd33
...
...
@@ -9,7 +9,7 @@ android {
defaultConfig
{
applicationId
"ru.nsu.fit.borzov.kotlin_cpp"
minSdkVersion
1
6
minSdkVersion
2
6
targetSdkVersion
30
versionCode
1
versionName
"1.0"
...
...
@@ -77,4 +77,7 @@ dependencies {
//noinspection GradleCompatible
implementation
'com.android.support:design:28.0.0'
implementation
'com.github.d-max:spots-dialog:1.1@aar'
// https://mvnrepository.com/artifact/com.alipay.sofa/jraft-core
implementation
group:
'com.alipay.sofa'
,
name:
'jraft-core'
,
version:
'1.2.3'
}
\ No newline at end of file
app/src/main/cpp/CMakeLists.txt
View file @
5175cd33
...
...
@@ -35,6 +35,7 @@ add_library( # Sets the name of the library.
variables/DoubleArray.cpp
Blas.cpp
CheckpointStorage.cpp
Structs.h
)
# Searches for a specified prebuilt library and stores the path as a
...
...
app/src/main/cpp/Net.cpp
View file @
5175cd33
...
...
@@ -19,6 +19,9 @@ int getIntervalSize(const int part, const int size, const int *displs, int bufsi
Net
::
Net
(
std
::
vector
<
Client
>
&
clients
,
int
port
,
int
rank
)
:
serverSocket
(
port
)
{
this
->
port
=
port
;
this
->
rank
=
rank
;
for
(
int
i
=
0
;
i
<
clients
.
size
();
i
++
)
{
nodes
.
emplace_back
(
clients
[
i
]);
}
//rank = vm.currUserNumber;
size
=
clients
.
size
();
Client
neighbour
=
clients
[(
rank
+
1
)
%
size
];
//FIXME: ссылку возвращать
...
...
@@ -43,6 +46,61 @@ Net::Net(std::vector<Client> &clients, int port, int rank) : serverSocket(port)
}
Client
&
findInList
(
std
::
list
<
Client
>
&
nodes
,
int
n
)
{
int
i
=
0
;
for
(
auto
it
=
nodes
.
begin
();
it
!=
nodes
.
end
();
++
it
,
++
i
)
{
if
(
i
==
n
)
{
return
*
it
;
}
}
}
void
Net
::
updateNodes
(
std
::
vector
<
int
>
&
numDeadNodes
)
{
int
vectorCursor
=
0
;
int
i
=
0
;
int
posInNewList
=
0
;
bool
needToReconnectNext
=
false
;
bool
needToReconnectPrev
=
false
;
for
(
auto
it
=
numDeadNodes
.
begin
();
it
!=
numDeadNodes
.
end
();
++
it
)
{
if
(
*
it
==
(
rank
+
1
)
%
size
)
{
needToReconnectNext
=
true
;
}
if
(
*
it
==
(
rank
+
size
-
1
)
%
size
)
{
needToReconnectPrev
=
true
;
}
}
for
(
auto
it
=
nodes
.
begin
();
it
!=
nodes
.
end
();
++
it
,
++
i
,
++
posInNewList
)
{
if
(
i
==
rank
)
{
rank
=
posInNewList
;
}
if
(
i
==
numDeadNodes
[
vectorCursor
])
{
nodes
.
erase
(
it
);
++
vectorCursor
;
--
posInNewList
;
}
}
size
=
nodes
.
size
();
if
(
needToReconnectNext
)
{
Client
&
neighbour
=
findInList
(
nodes
,
(
rank
+
1
)
%
size
);
next
->
close
();
delete
next
;
next
=
new
Socket
(
neighbour
.
ip
,
neighbour
.
port
);
//next->connect();
printf
(
"before connect
\n
"
);
while
(
0
!=
next
->
connect
())
{
delete
next
;
next
=
new
Socket
(
neighbour
.
ip
,
neighbour
.
port
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milliseconds
(
1000
));
printf
(
"connectind...
\n
"
);
}
printf
(
"after connect
\n
"
);
}
if
(
needToReconnectPrev
)
{
prev
->
close
();
prev
=
serverSocket
.
accept
();
}
}
int
Net
::
MPI_Send
(
const
char
*
value
,
size_t
size_v
,
Socket
*
destination
)
{
return
destination
->
send
(
value
,
size_v
);
}
...
...
@@ -89,8 +147,8 @@ int Net::MPI_ALLGATHERV(const char *sendbuf, int bufsize, char *recvbuf, const i
}
}
else
{
//printArr(reinterpret_cast<double *>(recvbuf), 4*4);
int
someNodeHasFailure
=
MPI_Recv
(
recvbuf
+
displs
[
recvPart
]
*
typeSize
,
recvCount
,
next
);
int
someNodeHasFailure
=
MPI_Recv
(
recvbuf
+
displs
[
recvPart
]
*
typeSize
,
recvCount
,
next
);
if
(
someNodeHasFailure
)
{
return
-
1
;
}
...
...
app/src/main/cpp/Net.h
View file @
5175cd33
...
...
@@ -7,6 +7,8 @@
#include "socket/Socket.h"
#include "socket/ServerSocket.h"
#include "Structs.h"
#include <list>
class
Client
;
...
...
@@ -25,11 +27,11 @@ public:
int
MPI_Recv
(
char
*
value
,
size_t
size_v
,
Socket
*
sender
);
void
updateNodes
(
std
::
vector
<
int
>
&
numDeadNodes
);
Socket
*
next
=
nullptr
;
Socket
*
prev
=
nullptr
;
Socket
*
max
=
nullptr
;
Socket
*
vova
=
nullptr
;
std
::
list
<
Client
>
nodes
;
int
MPI_ALLGATHERV
(
const
char
*
sendbuf
,
int
bufsize
,
char
*
recvbuf
,
const
int
*
displs
,
Type
type
);
...
...
app/src/main/cpp/Structs.h
0 → 100644
View file @
5175cd33
//
// Created by COMPUTER on 10.05.2021.
//
#ifndef KOTLIN_CPP_STRUCTS_H
#define KOTLIN_CPP_STRUCTS_H
#include <vector>
struct
Row
{
int
opId
;
std
::
vector
<
int
>
argsIds
;
Row
(
int
opId
,
std
::
vector
<
int
>
argsIds
)
{
this
->
opId
=
opId
;
this
->
argsIds
=
std
::
move
(
argsIds
);
}
}
typedef
Row
;
struct
Client
{
std
::
string
ip
;
int
port
;
Client
(
std
::
string
ip
,
int
port
)
{
this
->
ip
=
ip
;
this
->
port
=
port
;
}
}
typedef
Client
;
#endif //KOTLIN_CPP_STRUCTS_H
app/src/main/cpp/VM.cpp
View file @
5175cd33
...
...
@@ -9,10 +9,10 @@
#include "variables/DoubleArray.h"
#include "socket/Socket.h"
constexpr
int
PORT_MAX_SOCKET
=
5002
;
constexpr
int
PORT_VOVA_MASTER_SOCKET
=
5003
;
constexpr
int
PORT_VOVA_WORKER_SOCKET
=
5004
;
constexpr
int
MESSAGE_TYPE_GET_ID
=
0
;
constexpr
int
MESSAGE_TYPE_PERMISSION_FOR_CHECKPOINT
=
1
;
constexpr
int
MESSAGE_TYPE_IS_EVERYONE_ALIVE
=
2
;
VM
::
VM
(
jint
nodeNum
,
jbyte
*
taskBody
)
{
jbyte
*
cursor
=
taskBody
;
...
...
@@ -67,8 +67,7 @@ VM::VM(jint nodeNum, jbyte *taskBody) {
//printf("%s %d\n",clients[i].ip.c_str(), clients[i].port);
}
net
=
new
Net
(
clients
,
clients
[
rank
].
port
,
rank
);
max
=
new
Socket
(
"127.0.0.1"
,
PORT_MAX_SOCKET
);
vova_worker
=
new
Socket
(
"127.0.0.1"
,
PORT_VOVA_WORKER_SOCKET
);
raft
=
new
Socket
(
"127.0.0.1"
,
PORT_VOVA_WORKER_SOCKET
);
}
...
...
@@ -80,27 +79,25 @@ static double now_ms() {
int64_t
VM
::
getNewCheckpointId
()
{
constexpr
int
MESSAGE_TYPE_GET_ID
=
1
;
constexpr
int
requestSize
=
sizeof
(
int
)
+
sizeof
(
int64_t
)
*
2
;
char
request
[
requestSize
];
char
*
request_cursor
=
request
;
writeNext
<
int
>
(
&
request_cursor
,
MESSAGE_TYPE_GET_ID
);
writeNext
<
int64_t
>
(
&
request_cursor
,
checkpointStorage
.
checkpointId
);
writeNext
<
int64_t
>
(
&
request_cursor
,
checkpointStorage
.
checkpointId
-
1
);
vova_worker
->
send
(
request
,
requestSize
);
raft
->
send
(
request
,
requestSize
);
int64_t
response
;
vova_worker
->
read
(
reinterpret_cast
<
char
*>
(
&
response
),
sizeof
(
int64_t
));
raft
->
read
(
reinterpret_cast
<
char
*>
(
&
response
),
sizeof
(
int64_t
));
return
response
;
}
bool
VM
::
requestPermissionForCheckpoint
()
{
constexpr
int
MESSAGE_PERMISSION_FOR_CHECKPOINT
=
0
;
constexpr
int
requestSize
=
sizeof
(
char
);
char
request
[
1
];
writeNext
<
char
>
(
reinterpret_cast
<
char
**>
(
&
request
),
MESSAGE_PERMISSION_FOR_CHECKPOINT
);
vova_worker
->
send
(
reinterpret_cast
<
const
char
*>
(
request
),
requestSize
);
writeNext
<
char
>
(
reinterpret_cast
<
char
**>
(
&
request
),
MESSAGE_
TYPE_
PERMISSION_FOR_CHECKPOINT
);
raft
->
send
(
reinterpret_cast
<
const
char
*>
(
request
),
requestSize
);
char
response
;
vova_worker
->
read
(
reinterpret_cast
<
char
*>
(
&
response
),
sizeof
(
char
));
raft
->
read
(
reinterpret_cast
<
char
*>
(
&
response
),
sizeof
(
char
));
return
response
;
}
...
...
@@ -110,18 +107,25 @@ void VM::run() {
//int restoreCounter = 8;
while
(
++
cursor
<
operations
.
size
())
{
//проверить у рафта (Макс) нужно ли делать восстановление. (проверть неблокирующий сокет)
char
request
[
1
];
int
isNeedToRollback
=
max
->
tryRead
(
request
,
1
);
if
(
isNeedToRollback
)
{
int
buf
[
1
];
buf
[
0
]
=
MESSAGE_TYPE_IS_EVERYONE_ALIVE
;
raft
->
send
(
reinterpret_cast
<
const
char
*>
(
buf
),
sizeof
(
int
));
int
numberOfDeadNodes
=
raft
->
read
(
reinterpret_cast
<
char
*>
(
buf
),
sizeof
(
int
));
if
(
numberOfDeadNodes
!=
0
)
{
//если да, то спросить у Вовы какой именно чекпоинт ( ЗАПРОС НА ОТКАТ, ид первого чекпоинта, ид второго чекпоинта). Откатиться.
std
::
vector
<
int
>
deadNodes
;
for
(
int
i
=
0
;
i
<
numberOfDeadNodes
;
i
++
)
{
raft
->
read
(
reinterpret_cast
<
char
*>
(
buf
),
sizeof
(
int
));
deadNodes
.
emplace_back
(
*
buf
);
}
net
->
updateNodes
(
deadNodes
);
int64_t
checkpointId
=
getNewCheckpointId
();
checkpointStorage
.
restore
(
checkpointId
,
realStack
,
cursor
);
}
//TODO: обновить список нодов и количество (Спросить у Максима) net->setNodes();
checkpointStorage
.
doStep
(
realStack
,
cursor
);
if
(
checkpointStorage
.
isNeedCheckpoint
())
{
requestPermissionForCheckpoint
();
checkpointStorage
.
newCheckpoint
(
)
checkpointStorage
.
newCheckpoint
(
realStack
,
cursor
);
}
/*
if (cursor % restoreCounter == 0) {
...
...
app/src/main/cpp/VM.h
View file @
5175cd33
...
...
@@ -10,29 +10,10 @@
#include "Variable.h"
#include "Checkpoint.h"
#include "CheckpointStorage.h"
//#include "Task.h"
//class Net;
struct
Row
{
int
opId
;
std
::
vector
<
int
>
argsIds
;
Row
(
int
opId
,
std
::
vector
<
int
>
argsIds
)
{
this
->
opId
=
opId
;
this
->
argsIds
=
std
::
move
(
argsIds
);
}
}
typedef
Row
;
struct
Client
{
std
::
string
ip
;
int
port
;
Client
(
std
::
string
ip
,
int
port
)
{
this
->
ip
=
ip
;
this
->
port
=
port
;
}
}
typedef
Client
;
class
VM
{
public:
...
...
@@ -56,9 +37,9 @@ private:
template
<
typename
T
>
void
writeNext
(
char
**
readCursor
,
T
value
)
{
reinterpret_cast
<
T
*>
(
*
readCursor
)
=
value
;
T
*
data
=
reinterpret_cast
<
T
*>
(
*
readCursor
);
*
data
=
value
;
*
readCursor
+=
sizeof
(
T
);
return
value
;
}
std
::
vector
<
std
::
function
<
void
(
std
::
vector
<
Variable
*>
&
,
std
::
vector
<
int
>
&
)
>>
ops
;
...
...
@@ -69,9 +50,7 @@ private:
std
::
vector
<
Client
>
clients
;
int
rank
;
Net
*
net
=
nullptr
;
Socket
*
max
=
nullptr
;
Socket
*
vova_worker
=
nullptr
;
Socket
*
vova_master
=
nullptr
;
Socket
*
raft
=
nullptr
;
int
cursor
=
-
1
;
...
...
app/src/main/java/ru/nsu/fit/borzov/kotlin_cpp/MainActivity.kt
View file @
5175cd33
...
...
@@ -6,8 +6,8 @@ import android.util.Log
import
androidx.appcompat.app.AppCompatActivity
import
kotlinx.android.synthetic.main.activity_main.*
import
org.w3c.dom.Node
import
ru.nsu.fit.borzov.
interNodesC
ommunication.
InterNodes
Communication
Master
import
ru.nsu.fit.borzov.
interNodesC
ommunication.
InterNodesC
ommunication
Worker
import
ru.nsu.fit.borzov.
kotlin_cpp.c
ommunication.
EstablishingCluster
Communication
import
ru.nsu.fit.borzov.
kotlin_cpp.c
ommunication.
c
ommunication
.RaftConditionEntity
import
ru.nsu.fit.borzov.kotlin_cpp.dto.AddTaskDto
import
ru.nsu.fit.borzov.kotlin_cpp.dto.NodeDto
import
ru.nsu.fit.borzov.kotlin_cpp.dto.TaskDto
...
...
@@ -60,24 +60,24 @@ class MainActivity : AppCompatActivity() {
str
.
append
(
task
.
data
[
i
].
toInt
())
}
Log
.
v
(
"c"
,
str
.
toString
())
val
interNodesWorker
=
InterNodesCommunicationWorker
()
val
interNodesMaster
=
InterNodesCommunicationMaster
()
val
masterPort
=
5003
//
val
workerPort
=
5004
interNodesWorker
.
setPort
(
workerPort
)
interNodesWorker
.
setMaster
(
InetAddress
.
getByName
(
task
.
nodes
[
0
].
ip
),
masterPort
)
interNodesMaster
.
setPort
(
masterPort
)
val
masterThread
:
Thread
?
=
null
;
if
(
task
.
nodeNum
==
0
)
{
val
masterThread
=
Thread
(
interNodesMaster
)
masterThread
.
start
()
val
timeout
=
1000
val
myAddress
=
task
.
nodes
[
task
.
nodeNum
].
ip
+
":"
+
task
.
nodes
[
task
.
nodeNum
].
port
;
var
clusterAddresses
=
StringBuilder
();
task
.
nodes
.
forEach
{
clusterAddresses
.
append
(
it
.
ip
).
append
(
":"
).
append
(
it
.
port
).
append
(
","
)
}
val
workerThread
=
Thread
(
interNodesMaster
)
workerThread
.
start
()
Thread
(
interNodesWorker
).
start
()
val
raftConditionEntity
=
EstablishingClusterCommunication
(
"No, thanks!"
,
myAddress
,
clusterAddresses
.
toString
(),
timeout
);
raftConditionEntity
.
start
(
timeout
,
workerPort
)
vmStart
(
task
.
nodeNum
,
task
.
data
)
masterThread
?.
join
()
workerThread
.
join
()
raftConditionEntity
.
interrupt
()
//Thread.sleep(1000);
if
(
task
.
nodeNum
==
0
)
{
server
.
addAnswer
(
task
.
id
,
"answer"
.
toByteArray
())
...
...
app/src/main/java/ru/nsu/fit/borzov/kotlin_cpp/communication/EstablishingClusterCommunication.java
0 → 100644
View file @
5175cd33
package
ru.nsu.fit.borzov.kotlin_cpp.communication
;
import
java.io.IOException
;
import
ru.nsu.fit.borzov.kotlin_cpp.communication.communication.CommunicationService
;
import
ru.nsu.fit.borzov.kotlin_cpp.communication.communication.RaftConditionEntity
;
import
ru.nsu.fit.borzov.kotlin_cpp.communication.interNodesCommunication.CheckpointListenerService
;
public
class
EstablishingClusterCommunication
{
public
EstablishingClusterCommunication
(
final
String
groupId
,
final
String
serverId
,
final
String
clusterAddresses
,
final
int
electionTimeout
)
{
communicationService
=
new
CommunicationService
(
groupId
,
serverId
,
clusterAddresses
,
electionTimeout
);
}
public
void
start
(
final
int
leaderCheckTimeout
,
final
int
vmConnectionPort
)
throws
IOException
{
RaftConditionEntity
raftConditionEntity
=
communicationService
.
startCluster
(
leaderCheckTimeout
);
checkpointListenerService
=
new
CheckpointListenerService
(
raftConditionEntity
,
vmConnectionPort
);
checkpointListenerService
.
start
();
}
public
void
interrupt
()
{
checkpointListenerService
.
interrupt
();
communicationService
.
interrupt
();
}
private
final
CommunicationService
communicationService
;
private
CheckpointListenerService
checkpointListenerService
;
}
app/src/main/java/ru/nsu/fit/borzov/kotlin_cpp/communication/communication/CommunicationService.java
0 → 100644
View file @
5175cd33
package
ru.nsu.fit.borzov.kotlin_cpp.communication.communication
;
import
com.alipay.sofa.jraft.Node
;
import
com.alipay.sofa.jraft.NodeManager
;
import
com.alipay.sofa.jraft.entity.PeerId
;
import
com.alipay.sofa.jraft.option.CliOptions
;
import
com.alipay.sofa.jraft.rpc.CliClientService
;
import
com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl
;
import
java.util.List
;
import
java.util.stream.Collectors
;
import
ru.nsu.fit.borzov.kotlin_cpp.communication.communication.raft.ClusterNode
;
import
ru.nsu.fit.borzov.kotlin_cpp.communication.communication.raft.ClusterNodeOptions
;
public
class
CommunicationService
{
private
final
ClusterNode
clusterNode
;
private
Thread
leaderCheckerThread
;
public
CommunicationService
(
final
String
groupId
,
final
String
serverId
,
final
String
clusterAddresses
,
final
int
electionTimeout
)
{
ClusterNodeOptions
clusterNodeOptions
=
new
ClusterNodeOptions
();
clusterNodeOptions
.
setGroupId
(
groupId
);
clusterNodeOptions
.
setServerId
(
serverId
);
clusterNodeOptions
.
setClusterAddresses
(
clusterAddresses
);
clusterNodeOptions
.
setElectionTimeoutUs
(
electionTimeout
);
this
.
clusterNode
=
new
ClusterNode
();
this
.
clusterNode
.
init
(
clusterNodeOptions
);
}
public
RaftConditionEntity
startCluster
(
final
int
leaderCheckTimeout
)
{
final
String
groupId
=
clusterNode
.
getNode
().
getGroupId
();
final
CliClientService
cliClientService
=
new
CliClientServiceImpl
();
cliClientService
.
init
(
new
CliOptions
());
PeerId
leaderId
=
clusterNode
.
getNode
().
getLeaderId
();
List
<
NodeEndpoint
>
nodes
=
NodeManager
.
getInstance
().
getNodesByGroupId
(
groupId
).
stream
().
map
((
Node
node
)
->
{
NodeEndpoint
nodeEndpoint
=
new
NodeEndpoint
();
nodeEndpoint
.
setAddress
(
node
.
getNodeId
().
getPeerId
().
getIp
());
nodeEndpoint
.
setPort
(
node
.
getNodeId
().
getPeerId
().
getPort
());
return
nodeEndpoint
;
}).
collect
(
Collectors
.
toList
());
RaftConditionEntity
raftConditionEntity
=
new
RaftConditionEntity
();
raftConditionEntity
.
setLeaderId
(
new
NodeEndpoint
(
leaderId
.
getIp
(),
leaderId
.
getPort
()));
raftConditionEntity
.
setNodes
(
nodes
);
this
.
leaderCheckerThread
=
new
Thread
(
new
RaftStateChecker
(
groupId
,
cliClientService
,
leaderCheckTimeout
,
raftConditionEntity
));
this
.
leaderCheckerThread
.
setDaemon
(
true
);
this
.
leaderCheckerThread
.
start
();
return
raftConditionEntity
;
}
public
void
interrupt
()
{
this
.
leaderCheckerThread
.
interrupt
();
this
.
clusterNode
.
getNode
().
shutdown
();
}
}
app/src/main/java/ru/nsu/fit/borzov/kotlin_cpp/communication/communication/NodeEndpoint.java
0 → 100644
View file @
5175cd33
package
ru.nsu.fit.borzov.kotlin_cpp.communication.communication
;
public
class
NodeEndpoint
{
private
String
address
;
private
int
port
;
public
NodeEndpoint
()
{
}
public
NodeEndpoint
(
String
address
,
int
port
)
{
this
.
address
=
address
;
this
.
port
=
port
;
}
public
String
getAddress
()
{
return
address
;
}
public
void
setAddress
(
String
address
)
{
this
.
address
=
address
;
}
public
int
getPort
()
{
return
port
;
}
public
void
setPort
(
int
port
)
{
this
.
port
=
port
;
}
}
app/src/main/java/ru/nsu/fit/borzov/kotlin_cpp/communication/communication/RaftConditionEntity.java
0 → 100644
View file @
5175cd33
package
ru.nsu.fit.borzov.kotlin_cpp.communication.communication
;
import
java.util.List
;
import
ru.nsu.fit.borzov.kotlin_cpp.communication.communication.NodeEndpoint
;
public
class
RaftConditionEntity
{
private
List
<
NodeEndpoint
>
nodes
;
private
NodeEndpoint
leaderId
;
private
List
<
Integer
>
deadNodesIndexes
;
private
boolean
isChangeCondition
=
false
;
public
List
<
Integer
>
getDeadNodesIndexes
()
{
return
deadNodesIndexes
;
}
public
void
setDeadNodesIndexes
(
List
<
Integer
>
deadNodesIndexes
)
{
this
.
deadNodesIndexes
=
deadNodesIndexes
;
}
public
List
<
NodeEndpoint
>
getNodes
()
{
return
nodes
;
}
public
void
setNodes
(
List
<
NodeEndpoint
>
nodes
)
{
this
.
nodes
=
nodes
;
}
public
NodeEndpoint
getLeaderId
()
{
return
leaderId
;
}
public
void
setLeaderId
(
NodeEndpoint
leaderId
)
{
this
.
leaderId
=
leaderId
;
}
public
boolean
isChangeCondition
()
{
return
isChangeCondition
;
}
public
void
setChangeCondition
(
boolean
changeCondition
)
{
isChangeCondition
=
changeCondition
;
}
}
app/src/main/java/ru/nsu/fit/borzov/kotlin_cpp/communication/communication/RaftStateChecker.java
0 → 100644
View file @
5175cd33
package
ru.nsu.fit.borzov.kotlin_cpp.communication.communication
;
import
com.alipay.sofa.jraft.Node
;
import
com.alipay.sofa.jraft.NodeManager
;
import
com.alipay.sofa.jraft.RouteTable
;
import
com.alipay.sofa.jraft.Status
;
import
com.alipay.sofa.jraft.entity.PeerId
;
import
com.alipay.sofa.jraft.rpc.CliClientService
;
import
java.io.DataOutputStream
;
import
java.io.IOException
;
import
java.net.InetAddress
;
import
java.net.Socket
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.concurrent.TimeoutException
;
import
java.util.stream.Collectors
;
import
ru.nsu.fit.borzov.kotlin_cpp.communication.communication.exception.NotLeaderRefreshException
;
public
class
RaftStateChecker
implements
Runnable
{
private
final
String
groupId
;
private
final
CliClientService
cliClientService
;
private
final
int
timeoutMs
;
private
final
RaftConditionEntity
raftConditionEntity
;
RaftStateChecker
(
final
String
groupId
,
final
CliClientService
cliClientService
,
final
int
timeoutMs
,
final
RaftConditionEntity
raftConditionEntity
)
{
this
.
groupId
=
groupId
;
this
.
cliClientService
=
cliClientService
;
this
.
timeoutMs
=
timeoutMs
;
this
.
raftConditionEntity
=
raftConditionEntity
;
}
@Override
public
void
run
()
{
NodeEndpoint
leaderEndpoint
;
List
<
NodeEndpoint
>
nodes
;
synchronized
(
raftConditionEntity
)
{
leaderEndpoint
=
raftConditionEntity
.
getLeaderId
();
nodes
=
raftConditionEntity
.
getNodes
();
}
while
(
true
)
{
Status
refreshLeaderStatus
=
null
;