Skip to content

Commit 04d1ed0

Browse files
committed
Add support for tarantool clusters
Refactor SocketChannelProvider implementations. Now we have two SingleSocketChannelProviderImpl and RoundRobinSocketProviderImpl used by simple and cluster clients respectively. To achieve this a BaseSocketChannelProvider was extracted. Add a service discovery implementation based on a tarantool stored procedure which is called to obtain a new list of cluster nodes. Integrate service discovery and current cluster client. The client now is aware of potential nodes changing using a configurable background job which periodically checks whether node addresses have changed. If so the client refreshes the RoundRobinSocketProviderImpl and replaces old nodes by new ones. It also requires some additional effort in case of missing the current node in the list. We need to reconnect to another node as soon as possible with a minimal delay between client requests. To achieve this we currently try to catch a moment when the requests in progress have been accomplished and we can finish reconnection process. Fix a regression in TarantoolClientImpl. It is a wrong comparison between response result code and original request operation code. To perform a right thing TarantoolOp class was created to wrap an original future (see TarantoolClientImpl.complete(packet, feature)). Closes: 34
1 parent b25e8f0 commit 04d1ed0

29 files changed

+1025
-982
lines changed

Diff for: src/main/java/org/tarantool/BaseSocketChannelProvider.java

+32-2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ public abstract class BaseSocketChannelProvider implements SocketChannelProvider
1111
*/
1212
private int retriesLimit = RETRY_NO_LIMIT;
1313

14+
/**
15+
* Timeout to establish socket connection with an individual server.
16+
*/
17+
private int timeout = NO_TIMEOUT;
18+
1419
@Override
1520
public final SocketChannel get(int retryNumber, Throwable lastError) {
1621
if (areRetriesExhausted(retryNumber)) {
@@ -56,7 +61,7 @@ protected InetSocketAddress parseAddress(String address) {
5661
return new InetSocketAddress(host, port);
5762
}
5863

59-
protected SocketChannel openChannel(InetSocketAddress socketAddress, int timeout) throws IOException {
64+
protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOException {
6065
SocketChannel channel = null;
6166
try {
6267
channel = SocketChannel.open();
@@ -74,6 +79,30 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress, int timeout
7479
}
7580
}
7681

82+
/**
83+
* Sets maximum amount of time to wait for a socket connection establishment
84+
* with an individual server.
85+
* <p>
86+
* Zero means infinite timeout.
87+
*
88+
* @param timeout timeout value, ms.
89+
* @throws IllegalArgumentException if timeout is negative.
90+
*/
91+
public void setTimeout(int timeout) {
92+
if (timeout < 0) {
93+
throw new IllegalArgumentException("timeout is negative.");
94+
}
95+
this.timeout = timeout;
96+
}
97+
98+
/**
99+
* @return Maximum amount of time to wait for a socket connection establishment
100+
* with an individual server.
101+
*/
102+
public int getTimeout() {
103+
return timeout;
104+
}
105+
77106
/**
78107
* Provides a decision on whether retries limit is hit.
79108
*
@@ -82,8 +111,9 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress, int timeout
82111
*/
83112
private boolean areRetriesExhausted(int retries) {
84113
int limit = getRetriesLimit();
85-
if (limit < 0)
114+
if (limit < 0) {
86115
return false;
116+
}
87117
return retries >= limit;
88118
}
89119
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tarantool;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Collection;
5+
6+
public interface RefreshableSocketProvider {
7+
8+
Collection<SocketAddress> getAddresses();
9+
10+
void refreshAddresses(Collection<String> addresses);
11+
12+
}

Diff for: src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

+61-60
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,24 @@
22

33
import java.io.IOException;
44
import java.net.InetSocketAddress;
5+
import java.net.SocketAddress;
56
import java.nio.channels.SocketChannel;
67
import java.util.ArrayList;
78
import java.util.Arrays;
89
import java.util.Collection;
910
import java.util.Collections;
1011
import java.util.List;
1112
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.concurrent.locks.ReadWriteLock;
15+
import java.util.concurrent.locks.ReentrantReadWriteLock;
16+
import java.util.stream.Collectors;
1217

1318
/**
1419
* Basic reconnection strategy that changes addresses in a round-robin fashion.
1520
* To be used with {@link TarantoolClientImpl}.
1621
*/
17-
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider {
18-
19-
/**
20-
* Timeout to establish socket connection with an individual server.
21-
*/
22-
private int timeout; // 0 is infinite.
23-
24-
/**
25-
* Server addresses as configured.
26-
*/
27-
private final List<String> addresses = new ArrayList<>();
22+
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider {
2823

2924
/**
3025
* Socket addresses.
@@ -39,79 +34,73 @@ public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider {
3934
/**
4035
* Lock
4136
*/
42-
// private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
37+
private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
4338

4439
/**
4540
* Constructs an instance.
4641
*
4742
* @param addresses Array of addresses in a form of [host]:[port].
4843
*/
4944
public RoundRobinSocketProviderImpl(String... addresses) {
50-
if (addresses == null || addresses.length == 0) {
51-
throw new IllegalArgumentException("Addresses are null or empty.");
45+
if (addresses.length == 0) {
46+
throw new IllegalArgumentException("Addresses list must contain at least one address.");
5247
}
5348

5449
updateAddressList(Arrays.asList(addresses));
5550
}
5651

5752
private void updateAddressList(Collection<String> addresses) {
58-
String lastAddress = getLastObtainedAddress();
59-
this.addresses.clear();
60-
this.addresses.addAll(addresses);
61-
this.addresses.forEach(address -> socketAddresses.add(parseAddress(address)));
62-
if (lastAddress != null) {
63-
int recoveredPosition = this.addresses.indexOf(lastAddress);
64-
currentPosition.set(recoveredPosition);
53+
Lock writeLock = addressListLock.writeLock();
54+
writeLock.lock();
55+
try {
56+
InetSocketAddress lastAddress = getLastObtainedAddress();
57+
socketAddresses.clear();
58+
addresses.stream()
59+
.map(this::parseAddress)
60+
.collect(Collectors.toCollection(() -> socketAddresses));
61+
if (lastAddress != null) {
62+
int recoveredPosition = socketAddresses.indexOf(lastAddress);
63+
currentPosition.set(recoveredPosition);
64+
} else {
65+
currentPosition.set(-1);
66+
}
67+
} finally {
68+
writeLock.unlock();
6569
}
6670
}
6771

6872
/**
69-
* @return Configured addresses in a form of [host]:[port].
70-
*/
71-
public List<String> getAddresses() {
72-
return Collections.unmodifiableList(this.addresses);
73-
}
74-
75-
public String getLastObtainedAddress() {
76-
int index = currentPosition.get();
77-
return index >= 0 ? addresses.get(index) : null;
78-
}
79-
80-
/**
81-
* Sets maximum amount of time to wait for a socket connection establishment
82-
* with an individual server.
83-
* <p>
84-
* Zero means infinite timeout.
85-
*
86-
* @param timeout Timeout value, ms.
87-
* @return {@code this}.
88-
* @throws IllegalArgumentException If timeout is negative.
73+
* @return resolved socket addresses
8974
*/
90-
public RoundRobinSocketProviderImpl setTimeout(int timeout) {
91-
if (timeout < 0) {
92-
throw new IllegalArgumentException("timeout is negative.");
75+
public List<SocketAddress> getAddresses() {
76+
Lock readLock = addressListLock.readLock();
77+
readLock.lock();
78+
try {
79+
return Collections.unmodifiableList(this.socketAddresses);
80+
} finally {
81+
readLock.unlock();
9382
}
94-
95-
this.timeout = timeout;
96-
97-
return this;
9883
}
9984

100-
/**
101-
* @return Maximum amount of time to wait for a socket connection establishment
102-
* with an individual server.
103-
*/
104-
public int getTimeout() {
105-
return timeout;
85+
protected InetSocketAddress getLastObtainedAddress() {
86+
Lock readLock = addressListLock.readLock();
87+
readLock.lock();
88+
try {
89+
int index = currentPosition.get();
90+
return index >= 0 ? socketAddresses.get(index) : null;
91+
} finally {
92+
readLock.unlock();
93+
}
10694
}
10795

10896
@Override
10997
protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
11098
int attempts = getAddressCount();
111-
long deadline = System.currentTimeMillis() + timeout * attempts;
99+
// todo: recalc deadline?
100+
long deadline = System.currentTimeMillis() + getTimeout() * attempts;
112101
while (!Thread.currentThread().isInterrupted()) {
113102
try {
114-
return openChannel(getNextSocketAddress(), timeout);
103+
return openChannel(getNextSocketAddress());
115104
} catch (IOException e) {
116105
long now = System.currentTimeMillis();
117106
if (deadline <= now) {
@@ -135,18 +124,30 @@ protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
135124
* @return Number of configured addresses.
136125
*/
137126
protected int getAddressCount() {
138-
return socketAddresses.size();
127+
Lock readLock = addressListLock.readLock();
128+
readLock.lock();
129+
try {
130+
return socketAddresses.size();
131+
} finally {
132+
readLock.unlock();
133+
}
139134
}
140135

141136
/**
142137
* @return Socket address to use for the next reconnection attempt.
143138
*/
144139
protected InetSocketAddress getNextSocketAddress() {
145-
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
146-
return socketAddresses.get(position);
140+
Lock readLock = addressListLock.readLock();
141+
readLock.lock();
142+
try {
143+
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
144+
return socketAddresses.get(position);
145+
} finally {
146+
readLock.unlock();
147+
}
147148
}
148149

149-
public void setAddresses(Collection<String> addresses) {
150+
public void refreshAddresses(Collection<String> addresses) {
150151
if (addresses == null || addresses.isEmpty()) {
151152
throw new IllegalArgumentException("Addresses are null or empty.");
152153
}

Diff for: src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.IOException;
44
import java.net.InetSocketAddress;
5+
import java.net.SocketAddress;
56
import java.nio.channels.SocketChannel;
67

78
/**
@@ -11,24 +12,33 @@
1112
public class SingleSocketChannelProviderImpl extends BaseSocketChannelProvider {
1213

1314
private InetSocketAddress address;
14-
private int timeout;
1515

1616
/**
17-
*
17+
* Creates a simple provider
1818
*
1919
* @param address instance address
20-
* @param timeout time in millis
2120
*/
22-
public SingleSocketChannelProviderImpl(String address, int timeout) {
21+
public SingleSocketChannelProviderImpl(String address) {
22+
setAddress(address);
23+
}
24+
25+
public SocketAddress getAddress() {
26+
return address;
27+
}
28+
29+
public void setAddress(String address) {
30+
if (StringUtils.isBlank(address)) {
31+
throw new IllegalArgumentException("address must not be empty");
32+
}
33+
2334
this.address = parseAddress(address);
24-
this.timeout = timeout;
2535
}
2636

2737
@Override
2838
protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
2939
long deadline = System.currentTimeMillis();
3040
try {
31-
return openChannel(address, timeout);
41+
return openChannel(address);
3242
} catch (IOException e) {
3343
if (deadline <= System.currentTimeMillis()) {
3444
throw new CommunicationException("Connection time out.", e);

Diff for: src/main/java/org/tarantool/SocketChannelProvider.java

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
public interface SocketChannelProvider {
77

88
int RETRY_NO_LIMIT = -1;
9+
int NO_TIMEOUT = 0;
910

1011
/**
1112
* Provides socket channel to init restore connection.

Diff for: src/main/java/org/tarantool/StringUtils.java

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.tarantool;
2+
3+
public class StringUtils {
4+
5+
public static boolean isEmpty(String string) {
6+
return (string == null) || (string.isEmpty());
7+
}
8+
9+
public static boolean isNotEmpty(String string) {
10+
return !isEmpty(string);
11+
}
12+
13+
public static boolean isBlank(String string) {
14+
return (string == null) || (string.trim().isEmpty());
15+
}
16+
17+
public static boolean isNotBlank(String string) {
18+
return !isBlank(string);
19+
}
20+
21+
}

Diff for: src/main/java/org/tarantool/TarantoolBase.java

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.atomic.AtomicLong;
1111

1212
public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Integer, List<?>, Object, Result> {
13+
1314
protected String serverVersion;
1415
/**
1516
* Connection state

0 commit comments

Comments
 (0)