Skip to content

Commit 7d00b2a

Browse files
committed
Support auto refresh a list of cluster nodes
Refactor SocketChannelProvider implementations. Now we have two SingleSocketChannelProviderImpl and RoundRobinSocketProviderImpl used by simple and cluster clients respectively. To achieve this a BaseSocketChannelProvider was extracted. Add a service discovery implementation based on a Tarantool stored procedure which is called to obtain a new list of cluster nodes. Integrate service discovery and current cluster client. The client now is aware of potential nodes changing using a configurable background job which periodically checks whether node addresses have changed. If so the client refreshes the RoundRobinSocketProviderImpl and replaces old nodes by new ones. It also requires some additional effort in case of missing the current node in the list. We need to reconnect to another node as soon as possible with a minimal delay between client requests. To achieve this we currently try to catch a moment when the requests in progress have been accomplished and we can finish reconnection process. Closes: #34 See also: #142
1 parent 443d6f4 commit 7d00b2a

21 files changed

+1623
-262
lines changed
+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.SocketChannel;
6+
7+
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
8+
9+
/**
10+
* Limit of retries.
11+
*/
12+
private int retriesLimit = RETRY_NO_LIMIT;
13+
14+
/**
15+
* Timeout to establish socket connection with an individual server.
16+
*/
17+
private int timeout = NO_TIMEOUT;
18+
19+
/**
20+
* Tries to establish a new connection to the Tarantool instances.
21+
*
22+
* @param retryNumber number of current retry. Reset after successful connect.
23+
* @param lastError the last error occurs when reconnecting
24+
*
25+
* @return connected socket channel
26+
*
27+
* @throws CommunicationException if any I/O errors happen or there are
28+
* no addresses available
29+
*/
30+
@Override
31+
public final SocketChannel get(int retryNumber, Throwable lastError) {
32+
if (areRetriesExhausted(retryNumber)) {
33+
throw new CommunicationException("Connection retries exceeded.", lastError);
34+
}
35+
36+
long deadline = System.currentTimeMillis() + timeout;
37+
while (!Thread.currentThread().isInterrupted()) {
38+
try {
39+
InetSocketAddress address = getAddress(retryNumber, lastError);
40+
return openChannel(address);
41+
} catch (IOException e) {
42+
checkTimeout(deadline, e);
43+
}
44+
}
45+
throw new CommunicationException("Thread interrupted.", new InterruptedException());
46+
}
47+
48+
private void checkTimeout(long deadline, Exception e) {
49+
long timeLeft = deadline - System.currentTimeMillis();
50+
if (timeLeft <= 0) {
51+
throw new CommunicationException("Connection time out.", e);
52+
}
53+
try {
54+
Thread.sleep(timeLeft / 10);
55+
} catch (InterruptedException ignored) {
56+
Thread.currentThread().interrupt();
57+
}
58+
}
59+
60+
/**
61+
* Gets address to be used to establish a new connection
62+
* Address can be null.
63+
*
64+
* @param retryNumber reconnection attempt number
65+
* @param lastError reconnection reason
66+
*
67+
* @return available address which is depended on implementation
68+
*
69+
* @throws IOException if any I/O errors occur
70+
*/
71+
protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException;
72+
73+
/**
74+
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
75+
* The retry count is maintained by a {@link #get(int, Throwable)} caller
76+
* when a socket level connection was established.
77+
* <p>
78+
* Negative value means unlimited attempts.
79+
*
80+
* @param retriesLimit Limit of retries to use.
81+
*/
82+
public void setRetriesLimit(int retriesLimit) {
83+
this.retriesLimit = retriesLimit;
84+
}
85+
86+
/**
87+
* Gets limit of attempts to establish connection.
88+
*
89+
* @return Maximum reconnect attempts to make before raising exception.
90+
*/
91+
public int getRetriesLimit() {
92+
return retriesLimit;
93+
}
94+
95+
/**
96+
* Parse a string address in the form of host[:port]
97+
* and builds a socket address.
98+
*
99+
* @param address Server address.
100+
*
101+
* @return Socket address.
102+
*/
103+
protected InetSocketAddress parseAddress(String address) {
104+
int separatorPosition = address.indexOf(':');
105+
String host = (separatorPosition < 0) ? address : address.substring(0, separatorPosition);
106+
int port = (separatorPosition < 0) ? 3301 : Integer.parseInt(address.substring(separatorPosition + 1));
107+
return new InetSocketAddress(host, port);
108+
}
109+
110+
protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOException {
111+
SocketChannel channel = null;
112+
try {
113+
channel = SocketChannel.open();
114+
channel.socket().connect(socketAddress, timeout);
115+
return channel;
116+
} catch (IOException e) {
117+
if (channel != null) {
118+
try {
119+
channel.close();
120+
} catch (IOException ignored) {
121+
// No-op.
122+
}
123+
}
124+
throw e;
125+
}
126+
}
127+
128+
/**
129+
* Sets maximum amount of time to wait for a socket connection establishment
130+
* with an individual server.
131+
* <p>
132+
* Zero means infinite timeout.
133+
*
134+
* @param timeout timeout value, ms.
135+
*
136+
* @throws IllegalArgumentException if timeout is negative.
137+
*/
138+
public void setTimeout(int timeout) {
139+
if (timeout < 0) {
140+
throw new IllegalArgumentException("timeout is negative.");
141+
}
142+
this.timeout = timeout;
143+
}
144+
145+
/**
146+
* Gest maximum amount of time to wait for a socket
147+
* connection establishment with an individual server.
148+
*
149+
* @return timeout
150+
*/
151+
public int getTimeout() {
152+
return timeout;
153+
}
154+
155+
/**
156+
* Provides a decision on whether retries limit is hit.
157+
*
158+
* @param retries Current count of retries.
159+
*
160+
* @return {@code true} if retries are exhausted.
161+
*/
162+
private boolean areRetriesExhausted(int retries) {
163+
int limit = getRetriesLimit();
164+
if (limit < 0) {
165+
return false;
166+
}
167+
return retries >= limit;
168+
}
169+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tarantool;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Collection;
5+
6+
public interface RefreshableSocketProvider {
7+
8+
Collection<SocketAddress> getAddresses();
9+
10+
void refreshAddresses(Collection<String> addresses);
11+
12+
}

0 commit comments

Comments
 (0)