Skip to content

Commit 7382d08

Browse files
committed
Support auto refresh a list of cluster nodes
Refactor SocketChannelProvider implementations. Now we have two SingleSocketChannelProviderImpl and RoundRobinSocketProviderImpl used by simple and cluster clients respectively. To achieve this a BaseSocketChannelProvider was extracted. Add a service discovery implementation based on a tarantool stored procedure which is called to obtain a new list of cluster nodes. Integrate service discovery and current cluster client. The client now is aware of potential nodes changing using a configurable background job which periodically checks whether node addresses have changed. If so the client refreshes the RoundRobinSocketProviderImpl and replaces old nodes by new ones. It also requires some additional effort in case of missing the current node in the list. We need to reconnect to another node as soon as possible with a minimal delay between client requests. To achieve this we currently try to catch a moment when the requests in progress have been accomplished and we can finish reconnection process. Closes: #34
1 parent 4715727 commit 7382d08

21 files changed

+1530
-308
lines changed
+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.SocketChannel;
6+
7+
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
8+
9+
/**
10+
* Limit of retries.
11+
*/
12+
private int retriesLimit = RETRY_NO_LIMIT;
13+
14+
/**
15+
* Timeout to establish socket connection with an individual server.
16+
*/
17+
private int timeout = NO_TIMEOUT;
18+
19+
/**
20+
* @param retryNumber number of current retry. Reset after successful connect.
21+
* @param lastError the last error occurs when reconnecting
22+
* @return
23+
*/
24+
@Override
25+
public final SocketChannel get(int retryNumber, Throwable lastError) {
26+
if (areRetriesExhausted(retryNumber)) {
27+
throw new CommunicationException("Connection retries exceeded.", lastError);
28+
}
29+
30+
long deadline = System.currentTimeMillis() + timeout;
31+
while (!Thread.currentThread().isInterrupted()) {
32+
try {
33+
return doRetry(retryNumber, lastError);
34+
} catch (IOException e) {
35+
long timeLeft = deadline - System.currentTimeMillis();
36+
if (timeLeft <= 0) {
37+
throw new CommunicationException("Connection time out.", e);
38+
}
39+
try {
40+
Thread.sleep(timeLeft / 10);
41+
} catch (InterruptedException ignored) {
42+
Thread.currentThread().interrupt();
43+
}
44+
}
45+
}
46+
throw new CommunicationException("Thread interrupted.", new InterruptedException());
47+
}
48+
49+
protected abstract SocketChannel doRetry(int retryNumber, Throwable lastError) throws IOException;
50+
51+
/**
52+
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
53+
* The retry count is maintained by a {@link #get(int, Throwable)} caller
54+
* when a socket level connection was established.
55+
* <p>
56+
* Negative value means unlimited attempts.
57+
*
58+
* @param retriesLimit Limit of retries to use.
59+
*/
60+
public void setRetriesLimit(int retriesLimit) {
61+
this.retriesLimit = retriesLimit;
62+
}
63+
64+
/**
65+
* @return Maximum reconnect attempts to make before raising exception.
66+
*/
67+
public int getRetriesLimit() {
68+
return retriesLimit;
69+
}
70+
71+
/**
72+
* Parse a string address in the form of host[:port]
73+
* and builds a socket address.
74+
*
75+
* @param address Server address.
76+
* @return Socket address.
77+
*/
78+
protected InetSocketAddress parseAddress(String address) {
79+
int separatorPosition = address.indexOf(':');
80+
String host = (separatorPosition < 0) ? address : address.substring(0, separatorPosition);
81+
int port = (separatorPosition < 0) ? 3301 : Integer.parseInt(address.substring(separatorPosition + 1));
82+
return new InetSocketAddress(host, port);
83+
}
84+
85+
protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOException {
86+
SocketChannel channel = null;
87+
try {
88+
channel = SocketChannel.open();
89+
channel.socket().connect(socketAddress, timeout);
90+
return channel;
91+
} catch (IOException e) {
92+
if (channel != null) {
93+
try {
94+
channel.close();
95+
} catch (IOException ignored) {
96+
// No-op.
97+
}
98+
}
99+
throw e;
100+
}
101+
}
102+
103+
/**
104+
* Sets maximum amount of time to wait for a socket connection establishment
105+
* with an individual server.
106+
* <p>
107+
* Zero means infinite timeout.
108+
*
109+
* @param timeout timeout value, ms.
110+
* @throws IllegalArgumentException if timeout is negative.
111+
*/
112+
public void setTimeout(int timeout) {
113+
if (timeout < 0) {
114+
throw new IllegalArgumentException("timeout is negative.");
115+
}
116+
this.timeout = timeout;
117+
}
118+
119+
/**
120+
* @return Maximum amount of time to wait for a socket connection establishment
121+
* with an individual server.
122+
*/
123+
public int getTimeout() {
124+
return timeout;
125+
}
126+
127+
/**
128+
* Provides a decision on whether retries limit is hit.
129+
*
130+
* @param retries Current count of retries.
131+
* @return {@code true} if retries are exhausted.
132+
*/
133+
private boolean areRetriesExhausted(int retries) {
134+
int limit = getRetriesLimit();
135+
if (limit < 0) {
136+
return false;
137+
}
138+
return retries >= limit;
139+
}
140+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tarantool;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Collection;
5+
6+
public interface RefreshableSocketProvider {
7+
8+
Collection<SocketAddress> getAddresses();
9+
10+
void refreshAddresses(Collection<String> addresses);
11+
12+
}

0 commit comments

Comments
 (0)