Skip to content

Commit a6193e1

Browse files
committed
Add support for tarantool clusters
Refactor SocketChannelProvider implementations. Now we have two SingleSocketChannelProviderImpl and RoundRobinSocketProviderImpl used by simple and cluster clients respectively. To achieve this a BaseSocketChannelProvider was extracted. Add a service discovery implementation based on a tarantool stored procedure which is called to obtain a new list of cluster nodes. Integrate service discovery and current cluster client. The client now is aware of potential nodes changing using a configurable background job which periodically checks whether node addresses have changed. If so the client refreshes the RoundRobinSocketProviderImpl and replaces old nodes by new ones. It also requires some additional effort in case of missing the current node in the list. We need to reconnect to another node as soon as possible with a minimal delay between client requests. To achieve this we currently try to catch a moment when the requests in progress have been accomplished and we can finish reconnection process. Fix a regression in TarantoolClientImpl. It is a wrong comparison between response result code and original request operation code. To perform a right thing TarantoolOp class was created to wrap an original future (see TarantoolClientImpl.complete(packet, feature)). Closes: 34
1 parent b25e8f0 commit a6193e1

24 files changed

+758
-940
lines changed

Diff for: src/main/java/org/tarantool/BaseSocketChannelProvider.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress, int timeout
8282
*/
8383
private boolean areRetriesExhausted(int retries) {
8484
int limit = getRetriesLimit();
85-
if (limit < 0)
85+
if (limit < 0) {
8686
return false;
87+
}
8788
return retries >= limit;
8889
}
8990
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tarantool;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Collection;
5+
6+
public interface RefreshableSocketProvider {
7+
8+
Collection<SocketAddress> getAddresses();
9+
10+
void refreshAddresses(Collection<String> addresses);
11+
12+
}

Diff for: src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

+62-28
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,31 @@
22

33
import java.io.IOException;
44
import java.net.InetSocketAddress;
5+
import java.net.SocketAddress;
56
import java.nio.channels.SocketChannel;
67
import java.util.ArrayList;
78
import java.util.Arrays;
89
import java.util.Collection;
910
import java.util.Collections;
1011
import java.util.List;
1112
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.concurrent.locks.ReadWriteLock;
15+
import java.util.concurrent.locks.ReentrantReadWriteLock;
16+
import java.util.stream.Collectors;
1217

1318
/**
1419
* Basic reconnection strategy that changes addresses in a round-robin fashion.
1520
* To be used with {@link TarantoolClientImpl}.
1621
*/
17-
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider {
22+
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider {
1823

19-
/**
20-
* Timeout to establish socket connection with an individual server.
21-
*/
22-
private int timeout; // 0 is infinite.
24+
private static final int NO_TIMEOUT = 0;
2325

2426
/**
25-
* Server addresses as configured.
27+
* Timeout to establish socket connection with an individual server.
2628
*/
27-
private final List<String> addresses = new ArrayList<>();
29+
private int timeout = NO_TIMEOUT;
2830

2931
/**
3032
* Socket addresses.
@@ -39,42 +41,63 @@ public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider {
3941
/**
4042
* Lock
4143
*/
42-
// private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
44+
private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
4345

4446
/**
4547
* Constructs an instance.
4648
*
4749
* @param addresses Array of addresses in a form of [host]:[port].
4850
*/
4951
public RoundRobinSocketProviderImpl(String... addresses) {
50-
if (addresses == null || addresses.length == 0) {
51-
throw new IllegalArgumentException("Addresses are null or empty.");
52+
if (addresses.length == 0) {
53+
throw new IllegalArgumentException("Addresses list must contain at least one address.");
5254
}
5355

5456
updateAddressList(Arrays.asList(addresses));
5557
}
5658

5759
private void updateAddressList(Collection<String> addresses) {
58-
String lastAddress = getLastObtainedAddress();
59-
this.addresses.clear();
60-
this.addresses.addAll(addresses);
61-
this.addresses.forEach(address -> socketAddresses.add(parseAddress(address)));
62-
if (lastAddress != null) {
63-
int recoveredPosition = this.addresses.indexOf(lastAddress);
64-
currentPosition.set(recoveredPosition);
60+
Lock writeLock = addressListLock.writeLock();
61+
writeLock.lock();
62+
try {
63+
InetSocketAddress lastAddress = getLastObtainedAddress();
64+
socketAddresses.clear();
65+
addresses.stream()
66+
.map(this::parseAddress)
67+
.collect(Collectors.toCollection(() -> socketAddresses));
68+
if (lastAddress != null) {
69+
int recoveredPosition = socketAddresses.indexOf(lastAddress);
70+
currentPosition.set(recoveredPosition);
71+
} else {
72+
currentPosition.set(-1);
73+
}
74+
} finally {
75+
writeLock.unlock();
6576
}
6677
}
6778

6879
/**
6980
* @return Configured addresses in a form of [host]:[port].
7081
*/
71-
public List<String> getAddresses() {
72-
return Collections.unmodifiableList(this.addresses);
82+
public List<SocketAddress> getAddresses() {
83+
Lock readLock = addressListLock.readLock();
84+
readLock.lock();
85+
try {
86+
return Collections.unmodifiableList(this.socketAddresses);
87+
} finally {
88+
readLock.unlock();
89+
}
7390
}
7491

75-
public String getLastObtainedAddress() {
76-
int index = currentPosition.get();
77-
return index >= 0 ? addresses.get(index) : null;
92+
private InetSocketAddress getLastObtainedAddress() {
93+
Lock readLock = addressListLock.readLock();
94+
readLock.lock();
95+
try {
96+
int index = currentPosition.get();
97+
return index >= 0 ? socketAddresses.get(index) : null;
98+
} finally {
99+
readLock.unlock();
100+
}
78101
}
79102

80103
/**
@@ -91,9 +114,7 @@ public RoundRobinSocketProviderImpl setTimeout(int timeout) {
91114
if (timeout < 0) {
92115
throw new IllegalArgumentException("timeout is negative.");
93116
}
94-
95117
this.timeout = timeout;
96-
97118
return this;
98119
}
99120

@@ -108,6 +129,7 @@ public int getTimeout() {
108129
@Override
109130
protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
110131
int attempts = getAddressCount();
132+
// todo: recalc deadline?
111133
long deadline = System.currentTimeMillis() + timeout * attempts;
112134
while (!Thread.currentThread().isInterrupted()) {
113135
try {
@@ -135,18 +157,30 @@ protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
135157
* @return Number of configured addresses.
136158
*/
137159
protected int getAddressCount() {
138-
return socketAddresses.size();
160+
Lock readLock = addressListLock.readLock();
161+
readLock.lock();
162+
try {
163+
return socketAddresses.size();
164+
} finally {
165+
readLock.unlock();
166+
}
139167
}
140168

141169
/**
142170
* @return Socket address to use for the next reconnection attempt.
143171
*/
144172
protected InetSocketAddress getNextSocketAddress() {
145-
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
146-
return socketAddresses.get(position);
173+
Lock readLock = addressListLock.readLock();
174+
readLock.lock();
175+
try {
176+
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
177+
return socketAddresses.get(position);
178+
} finally {
179+
readLock.unlock();
180+
}
147181
}
148182

149-
public void setAddresses(Collection<String> addresses) {
183+
public void refreshAddresses(Collection<String> addresses) {
150184
if (addresses == null || addresses.isEmpty()) {
151185
throw new IllegalArgumentException("Addresses are null or empty.");
152186
}

Diff for: src/main/java/org/tarantool/StringUtils.java

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.tarantool;
2+
3+
public class StringUtils {
4+
5+
public static boolean isEmpty(String string) {
6+
return (string == null) || (string.isEmpty());
7+
}
8+
9+
public static boolean isNotEmpty(String string) {
10+
return !isEmpty(string);
11+
}
12+
13+
public static boolean isBlank(String string) {
14+
return (string == null) || (string.trim().isEmpty());
15+
}
16+
17+
public static boolean isNotBlank(String string) {
18+
return !isBlank(string);
19+
}
20+
21+
}

Diff for: src/main/java/org/tarantool/TarantoolBase.java

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.atomic.AtomicLong;
1111

1212
public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Integer, List<?>, Object, Result> {
13+
1314
protected String serverVersion;
1415
/**
1516
* Connection state

0 commit comments

Comments
 (0)