Skip to content

Commit cf16d5c

Browse files
committed
add round robin fail-over for a client
Added a cluster-ready client implementation that fails over to the next configured server in a round-robin fashion. Closes tarantool#37
1 parent 3c477bd commit cf16d5c

14 files changed

+783
-21
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ script:
1818
else
1919
mvn verify
2020
fi
21-
- cat testroot/jdk-testing.log
21+
- head -n -0 testroot/*.log
2222

2323
after_success:
2424
- |

pom.xml

+3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@
6262
</goals>
6363
</execution>
6464
</executions>
65+
<configuration>
66+
<trimStackTrace>false</trimStackTrace>
67+
</configuration>
6568
</plugin>
6669
<plugin>
6770
<groupId>org.jacoco</groupId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.SocketChannel;
6+
import java.util.Arrays;
7+
8+
/**
9+
* Basic reconnection strategy that changes addresses in a round-robin fashion.
10+
* To be used with {@link TarantoolClientImpl}.
11+
*/
12+
public class RoundRobinSocketProviderImpl implements SocketChannelProvider {
13+
/** Timeout to establish socket connection with an individual server. */
14+
private int timeout; // 0 is infinite.
15+
/** Limit of retries. */
16+
private int retriesLimit = -1; // No-limit.
17+
/** Server addresses as configured. */
18+
private final String[] addrs;
19+
/** Socket addresses. */
20+
private final InetSocketAddress[] sockAddrs;
21+
/** Current position within {@link #sockAddrs} array. */
22+
private int pos;
23+
24+
/**
25+
* Constructs an instance.
26+
*
27+
* @param addrs Array of addresses in a form of [host]:[port].
28+
*/
29+
public RoundRobinSocketProviderImpl(String... addrs) {
30+
if (addrs == null || addrs.length == 0)
31+
throw new IllegalArgumentException("addrs is null or empty.");
32+
33+
this.addrs = Arrays.copyOf(addrs, addrs.length);
34+
35+
sockAddrs = new InetSocketAddress[this.addrs.length];
36+
37+
for (int i = 0; i < this.addrs.length; i++) {
38+
sockAddrs[i] = parseAddress(this.addrs[i]);
39+
}
40+
}
41+
42+
/**
43+
* @return Configured addresses in a form of [host]:[port].
44+
*/
45+
public String[] getAddresses() {
46+
return this.addrs;
47+
}
48+
49+
/**
50+
* Sets maximum amount of time to wait for a socket connection establishment
51+
* with an individual server.
52+
*
53+
* Zero means infinite timeout.
54+
*
55+
* @param timeout Timeout value, ms.
56+
* @return {@code this}.
57+
* @throws IllegalArgumentException If timeout is negative.
58+
*/
59+
public RoundRobinSocketProviderImpl setTimeout(int timeout) {
60+
if (timeout < 0)
61+
throw new IllegalArgumentException("timeout is negative.");
62+
63+
this.timeout = timeout;
64+
65+
return this;
66+
}
67+
68+
/**
69+
* @return Maximum amount of time to wait for a socket connection establishment
70+
* with an individual server.
71+
*/
72+
public int getTimeout() {
73+
return timeout;
74+
}
75+
76+
/**
77+
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
78+
* The retry count is maintained by a {@link #get(int, Throwable)} caller
79+
* when a socket level connection was established.
80+
*
81+
* Negative value means unlimited.
82+
*
83+
* @param retriesLimit Limit of retries to use.
84+
* @return {@code this}.
85+
*/
86+
public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) {
87+
this.retriesLimit = retriesLimit;
88+
89+
return this;
90+
}
91+
92+
/**
93+
* @return Maximum reconnect attempts to make before raising exception.
94+
*/
95+
public int getRetriesLimit() {
96+
return retriesLimit;
97+
}
98+
99+
/** {@inheritDoc} */
100+
@Override
101+
public SocketChannel get(int retryNumber, Throwable lastError) {
102+
if (areRetriesExhausted(retryNumber)) {
103+
throw new CommunicationException("Connection retries exceeded.", lastError);
104+
}
105+
int attempts = getAddressCount();
106+
long deadline = System.currentTimeMillis() + timeout * attempts;
107+
while (!Thread.currentThread().isInterrupted()) {
108+
SocketChannel channel = null;
109+
try {
110+
channel = SocketChannel.open();
111+
InetSocketAddress addr = getNextSocketAddress();
112+
channel.socket().connect(addr, timeout);
113+
return channel;
114+
} catch (IOException e) {
115+
if (channel != null) {
116+
try {
117+
channel.close();
118+
} catch (IOException ignored) {
119+
// No-op.
120+
}
121+
}
122+
long now = System.currentTimeMillis();
123+
if (deadline <= now) {
124+
throw new CommunicationException("Connection time out.", e);
125+
}
126+
if (--attempts == 0) {
127+
// Tried all addresses without any lack, but still have time.
128+
attempts = getAddressCount();
129+
try {
130+
Thread.sleep((deadline - now) / attempts);
131+
} catch (InterruptedException ignored) {
132+
Thread.currentThread().interrupt();
133+
}
134+
}
135+
}
136+
}
137+
throw new CommunicationException("Thread interrupted.", new InterruptedException());
138+
}
139+
140+
/**
141+
* @return Number of configured addresses.
142+
*/
143+
protected int getAddressCount() {
144+
return sockAddrs.length;
145+
}
146+
147+
/**
148+
* @return Socket address to use for the next reconnection attempt.
149+
*/
150+
protected InetSocketAddress getNextSocketAddress() {
151+
InetSocketAddress res = sockAddrs[pos];
152+
pos = (pos + 1) % sockAddrs.length;
153+
return res;
154+
}
155+
156+
/**
157+
* Parse a string address in the form of [host]:[port]
158+
* and builds a socket address.
159+
*
160+
* @param addr Server address.
161+
* @return Socket address.
162+
*/
163+
protected InetSocketAddress parseAddress(String addr) {
164+
int idx = addr.indexOf(':');
165+
String host = (idx < 0) ? addr : addr.substring(0, idx);
166+
int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1));
167+
return new InetSocketAddress(host, port);
168+
}
169+
170+
/**
171+
* Provides a decision on whether retries limit is hit.
172+
*
173+
* @param retries Current count of retries.
174+
* @return {@code true} if retries are exhausted.
175+
*/
176+
private boolean areRetriesExhausted(int retries) {
177+
int limit = getRetriesLimit();
178+
if (limit < 0)
179+
return false;
180+
return retries >= limit;
181+
}
182+
}

src/main/java/org/tarantool/TarantoolClientImpl.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -246,14 +246,15 @@ protected synchronized void die(String message, Exception cause) {
246246
if (thumbstone != null) {
247247
return;
248248
}
249-
this.thumbstone = new CommunicationException(message, cause);
249+
final CommunicationException err = new CommunicationException(message, cause);
250+
this.thumbstone = err;
250251
while (!futures.isEmpty()) {
251252
Iterator<Map.Entry<Long, FutureImpl<?>>> iterator = futures.entrySet().iterator();
252253
while (iterator.hasNext()) {
253254
Map.Entry<Long, FutureImpl<?>> elem = iterator.next();
254255
if (elem != null) {
255256
FutureImpl<?> future = elem.getValue();
256-
fail(future, cause);
257+
fail(future, err);
257258
}
258259
iterator.remove();
259260
}
@@ -606,6 +607,14 @@ protected boolean isDead(FutureImpl<?> q) {
606607
return false;
607608
}
608609

610+
/**
611+
* Hook that a subclass may override, e.g. as a trigger to start retries.
612+
* It is invoked from {@code compareAndSet} when the state is updated to ALIVE,
* after the previous alive latch has been counted down.
613+
*/
614+
protected void onReconnect() {
615+
// Intentionally empty: subclasses override this to react to a reconnect.
616+
}
617+
609618
/**
* @return The error held in {@code thumbstone}; {@code die} assigns it a
* {@link CommunicationException} built from the failure message and cause.
* NOTE(review): presumably {@code null} while no failure is recorded - confirm.
*/
public Exception getThumbstone() {
610619
return thumbstone;
611620
}
@@ -679,6 +688,7 @@ protected boolean compareAndSet(int expect, int update) {
679688
if (update == ALIVE) {
680689
CountDownLatch latch = nextAliveLatch.getAndSet(new CountDownLatch(1));
681690
latch.countDown();
691+
onReconnect();
682692
} else if (update == CLOSED) {
683693
closedLatch.countDown();
684694
}
@@ -706,7 +716,9 @@ private CountDownLatch getStateLatch(int state) {
706716
throw new IllegalStateException("State is CLOSED.");
707717
}
708718
CountDownLatch latch = nextAliveLatch.get();
709-
return (getState() == ALIVE) ? null : latch;
719+
/* It may happen so that an error is detected but the state is still alive.
720+
Wait for the 'next' alive state in such cases. */
721+
return (getState() == ALIVE && thumbstone == null) ? null : latch;
710722
}
711723
return null;
712724
}

0 commit comments

Comments
 (0)