Skip to content

Commit 3b56867

Browse files
committed
add a rr-socket provider ability to refresh its instances list.
add a reconnection process if an obtained list doesn't contain the node we currently connected to
1 parent 6ccc607 commit 3b56867

18 files changed

+558
-457
lines changed

Diff for: src/main/java/org/tarantool/BaseSocketChannelProvider.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress, int timeout
8282
*/
8383
private boolean areRetriesExhausted(int retries) {
8484
int limit = getRetriesLimit();
85-
if (limit < 0)
85+
if (limit < 0) {
8686
return false;
87+
}
8788
return retries >= limit;
8889
}
8990
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tarantool;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Collection;
5+
6+
public interface RefreshableSocketProvider {
7+
8+
Collection<SocketAddress> getAddresses();
9+
10+
void refreshAddresses(Collection<String> addresses);
11+
12+
}

Diff for: src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

+62-28
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,31 @@
22

33
import java.io.IOException;
44
import java.net.InetSocketAddress;
5+
import java.net.SocketAddress;
56
import java.nio.channels.SocketChannel;
67
import java.util.ArrayList;
78
import java.util.Arrays;
89
import java.util.Collection;
910
import java.util.Collections;
1011
import java.util.List;
1112
import java.util.concurrent.atomic.AtomicInteger;
13+
import java.util.concurrent.locks.Lock;
14+
import java.util.concurrent.locks.ReadWriteLock;
15+
import java.util.concurrent.locks.ReentrantReadWriteLock;
16+
import java.util.stream.Collectors;
1217

1318
/**
1419
* Basic reconnection strategy that changes addresses in a round-robin fashion.
1520
* To be used with {@link TarantoolClientImpl}.
1621
*/
17-
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider {
22+
public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider implements RefreshableSocketProvider {
1823

19-
/**
20-
* Timeout to establish socket connection with an individual server.
21-
*/
22-
private int timeout; // 0 is infinite.
24+
private static final int NO_TIMEOUT = 0;
2325

2426
/**
25-
* Server addresses as configured.
27+
* Timeout to establish socket connection with an individual server.
2628
*/
27-
private final List<String> addresses = new ArrayList<>();
29+
private int timeout = NO_TIMEOUT;
2830

2931
/**
3032
* Socket addresses.
@@ -39,42 +41,63 @@ public class RoundRobinSocketProviderImpl extends BaseSocketChannelProvider {
3941
/**
4042
* Lock
4143
*/
42-
// private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
44+
private ReadWriteLock addressListLock = new ReentrantReadWriteLock();
4345

4446
/**
4547
* Constructs an instance.
4648
*
4749
* @param addresses Array of addresses in a form of [host]:[port].
4850
*/
4951
public RoundRobinSocketProviderImpl(String... addresses) {
50-
if (addresses == null || addresses.length == 0) {
51-
throw new IllegalArgumentException("Addresses are null or empty.");
52+
if (addresses.length == 0) {
53+
throw new IllegalArgumentException("Addresses list must contain at least one address.");
5254
}
5355

5456
updateAddressList(Arrays.asList(addresses));
5557
}
5658

5759
private void updateAddressList(Collection<String> addresses) {
58-
String lastAddress = getLastObtainedAddress();
59-
this.addresses.clear();
60-
this.addresses.addAll(addresses);
61-
this.addresses.forEach(address -> socketAddresses.add(parseAddress(address)));
62-
if (lastAddress != null) {
63-
int recoveredPosition = this.addresses.indexOf(lastAddress);
64-
currentPosition.set(recoveredPosition);
60+
Lock writeLock = addressListLock.writeLock();
61+
writeLock.lock();
62+
try {
63+
InetSocketAddress lastAddress = getLastObtainedAddress();
64+
socketAddresses.clear();
65+
addresses.stream()
66+
.map(this::parseAddress)
67+
.collect(Collectors.toCollection(() -> socketAddresses));
68+
if (lastAddress != null) {
69+
int recoveredPosition = socketAddresses.indexOf(lastAddress);
70+
currentPosition.set(recoveredPosition);
71+
} else {
72+
currentPosition.set(-1);
73+
}
74+
} finally {
75+
writeLock.unlock();
6576
}
6677
}
6778

6879
/**
6980
* @return Configured addresses in a form of [host]:[port].
7081
*/
71-
public List<String> getAddresses() {
72-
return Collections.unmodifiableList(this.addresses);
82+
public List<SocketAddress> getAddresses() {
83+
Lock readLock = addressListLock.readLock();
84+
readLock.lock();
85+
try {
86+
return Collections.unmodifiableList(this.socketAddresses);
87+
} finally {
88+
readLock.unlock();
89+
}
7390
}
7491

75-
public String getLastObtainedAddress() {
76-
int index = currentPosition.get();
77-
return index >= 0 ? addresses.get(index) : null;
92+
private InetSocketAddress getLastObtainedAddress() {
93+
Lock readLock = addressListLock.readLock();
94+
readLock.lock();
95+
try {
96+
int index = currentPosition.get();
97+
return index >= 0 ? socketAddresses.get(index) : null;
98+
} finally {
99+
readLock.unlock();
100+
}
78101
}
79102

80103
/**
@@ -91,9 +114,7 @@ public RoundRobinSocketProviderImpl setTimeout(int timeout) {
91114
if (timeout < 0) {
92115
throw new IllegalArgumentException("timeout is negative.");
93116
}
94-
95117
this.timeout = timeout;
96-
97118
return this;
98119
}
99120

@@ -108,6 +129,7 @@ public int getTimeout() {
108129
@Override
109130
protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
110131
int attempts = getAddressCount();
132+
// todo: recalc deadline?
111133
long deadline = System.currentTimeMillis() + timeout * attempts;
112134
while (!Thread.currentThread().isInterrupted()) {
113135
try {
@@ -135,18 +157,30 @@ protected SocketChannel doRetry(int retryNumber, Throwable lastError) {
135157
* @return Number of configured addresses.
136158
*/
137159
protected int getAddressCount() {
138-
return socketAddresses.size();
160+
Lock readLock = addressListLock.readLock();
161+
readLock.lock();
162+
try {
163+
return socketAddresses.size();
164+
} finally {
165+
readLock.unlock();
166+
}
139167
}
140168

141169
/**
142170
* @return Socket address to use for the next reconnection attempt.
143171
*/
144172
protected InetSocketAddress getNextSocketAddress() {
145-
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
146-
return socketAddresses.get(position);
173+
Lock readLock = addressListLock.readLock();
174+
readLock.lock();
175+
try {
176+
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
177+
return socketAddresses.get(position);
178+
} finally {
179+
readLock.unlock();
180+
}
147181
}
148182

149-
public void setAddresses(Collection<String> addresses) {
183+
public void refreshAddresses(Collection<String> addresses) {
150184
if (addresses == null || addresses.isEmpty()) {
151185
throw new IllegalArgumentException("Addresses are null or empty.");
152186
}

Diff for: src/main/java/org/tarantool/StringUtils.java

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.tarantool;
2+
3+
public class StringUtils {
4+
5+
public static boolean isEmpty(String string) {
6+
return (string == null) || (string.isEmpty());
7+
}
8+
9+
public static boolean isNotEmpty(String string) {
10+
return !isEmpty(string);
11+
}
12+
13+
public static boolean isBlank(String string) {
14+
return (string == null) || (string.trim().isEmpty());
15+
}
16+
17+
public static boolean isNotBlank(String string) {
18+
return !isBlank(string);
19+
}
20+
21+
}

Diff for: src/main/java/org/tarantool/TarantoolBase.java

+4-8
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,8 @@
1010
import java.util.concurrent.atomic.AtomicLong;
1111

1212
public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Integer, List<?>, Object, Result> {
13-
protected String serverVersion;
14-
/**
15-
* Connection state
16-
*/
17-
protected String salt;
13+
14+
protected TarantoolInstanceConnectionMeta instanceConnectionMeta;
1815
protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE;
1916
protected AtomicLong syncId = new AtomicLong();
2017
protected int initialRequestSize = 4096;
@@ -26,8 +23,7 @@ public TarantoolBase(String username, String password, Socket socket) {
2623
super();
2724
try {
2825
TarantoolInstanceConnectionMeta connectMeta = BinaryProtoUtils.connect(socket, username, password);
29-
this.serverVersion = connectMeta.getServerVersion();
30-
this.salt = connectMeta.getSalt();
26+
this.instanceConnectionMeta = connectMeta;
3127
} catch (IOException e) {
3228
throw new CommunicationException("Couldn't connect to tarantool", e);
3329
}
@@ -81,7 +77,7 @@ public void setInitialRequestSize(int initialRequestSize) {
8177
}
8278

8379
public String getServerVersion() {
84-
return serverVersion;
80+
return instanceConnectionMeta.getServerVersion();
8581
}
8682

8783
}

0 commit comments

Comments
 (0)