Skip to content

Commit cfec8b8

Browse files
committed
Support auto refresh a list of cluster nodes
Refactor SocketChannelProvider implementations. Now we have two SingleSocketChannelProviderImpl and RoundRobinSocketProviderImpl used by simple and cluster clients respectively. To achieve this a BaseSocketChannelProvider was extracted. Add a service discovery implementation based on a tarantool stored procedure which is called to obtain a new list of cluster nodes. Integrate service discovery and current cluster client. The client now is aware of potential nodes changing using a configurable background job which periodically checks whether node addresses have changed. If so the client refreshes the RoundRobinSocketProviderImpl and replaces old nodes by new ones. It also requires some additional effort in case of missing the current node in the list. We need to reconnect to another node as soon as possible with a minimal delay between client requests. To achieve this we currently try to catch a moment when the requests in progress have been accomplished and we can finish reconnection process. Closes: #34
1 parent e7e775e commit cfec8b8

21 files changed

+1772
-317
lines changed
+178
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package org.tarantool;
2+
3+
import java.io.IOException;
4+
import java.net.InetSocketAddress;
5+
import java.nio.channels.SocketChannel;
6+
7+
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
8+
9+
/**
10+
* Special exception used to provide some sort of lazy-initialization
11+
* feature for its derived implementations
12+
*
13+
* @see #getAddress(int, Throwable)
14+
*/
15+
private final static CommunicationException NO_ADDRESS_AVAILABLE_EXCEPTION
16+
= new CommunicationException("No addresses are available");
17+
18+
/**
19+
* Limit of retries.
20+
*/
21+
private int retriesLimit = RETRY_NO_LIMIT;
22+
23+
/**
24+
* Timeout to establish socket connection with an individual server.
25+
*/
26+
private int timeout = NO_TIMEOUT;
27+
28+
/**
29+
* Tries to establish a new connection to the Tarantool instances.
30+
*
31+
* @param retryNumber number of current retry. Reset after successful connect.
32+
* @param lastError the last error occurs when reconnecting
33+
*
34+
* @return connected socket channel
35+
*
36+
* @throws CommunicationException if any I/O errors happen or there are
37+
* no addresses available
38+
*/
39+
@Override
40+
public final SocketChannel get(int retryNumber, Throwable lastError) {
41+
if (areRetriesExhausted(retryNumber)) {
42+
throw new CommunicationException("Connection retries exceeded.", lastError);
43+
}
44+
45+
long deadline = System.currentTimeMillis() + timeout;
46+
while (!Thread.currentThread().isInterrupted()) {
47+
try {
48+
InetSocketAddress address = getAddress(retryNumber, lastError);
49+
if (address == null) {
50+
checkTimeout(deadline, NO_ADDRESS_AVAILABLE_EXCEPTION);
51+
} else {
52+
return openChannel(address);
53+
}
54+
} catch (IOException e) {
55+
checkTimeout(deadline, e);
56+
}
57+
}
58+
throw new CommunicationException("Thread interrupted.", new InterruptedException());
59+
}
60+
61+
private void checkTimeout(long deadline, Exception e) {
62+
long timeLeft = deadline - System.currentTimeMillis();
63+
if (timeLeft <= 0) {
64+
throw new CommunicationException("Connection time out.", e);
65+
}
66+
try {
67+
Thread.sleep(timeLeft / 10);
68+
} catch (InterruptedException ignored) {
69+
Thread.currentThread().interrupt();
70+
}
71+
}
72+
73+
/**
74+
* Gets address to be used to establish a new connection
75+
* Address can be null
76+
*
77+
* @param retryNumber reconnection attempt number
78+
* @param lastError reconnection reason
79+
*
80+
* @return available address which is depended on implementation
81+
*
82+
* @throws IOException if any I/O errors occur
83+
*/
84+
protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException;
85+
86+
/**
87+
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
88+
* The retry count is maintained by a {@link #get(int, Throwable)} caller
89+
* when a socket level connection was established.
90+
* <p>
91+
* Negative value means unlimited attempts.
92+
*
93+
* @param retriesLimit Limit of retries to use.
94+
*/
95+
public void setRetriesLimit(int retriesLimit) {
96+
this.retriesLimit = retriesLimit;
97+
}
98+
99+
/**
100+
* @return Maximum reconnect attempts to make before raising exception.
101+
*/
102+
public int getRetriesLimit() {
103+
return retriesLimit;
104+
}
105+
106+
/**
107+
* Parse a string address in the form of host[:port]
108+
* and builds a socket address.
109+
*
110+
* @param address Server address.
111+
*
112+
* @return Socket address.
113+
*/
114+
protected InetSocketAddress parseAddress(String address) {
115+
int separatorPosition = address.indexOf(':');
116+
String host = (separatorPosition < 0) ? address : address.substring(0, separatorPosition);
117+
int port = (separatorPosition < 0) ? 3301 : Integer.parseInt(address.substring(separatorPosition + 1));
118+
return new InetSocketAddress(host, port);
119+
}
120+
121+
protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOException {
122+
SocketChannel channel = null;
123+
try {
124+
channel = SocketChannel.open();
125+
channel.socket().connect(socketAddress, timeout);
126+
return channel;
127+
} catch (IOException e) {
128+
if (channel != null) {
129+
try {
130+
channel.close();
131+
} catch (IOException ignored) {
132+
// No-op.
133+
}
134+
}
135+
throw e;
136+
}
137+
}
138+
139+
/**
140+
* Sets maximum amount of time to wait for a socket connection establishment
141+
* with an individual server.
142+
* <p>
143+
* Zero means infinite timeout.
144+
*
145+
* @param timeout timeout value, ms.
146+
*
147+
* @throws IllegalArgumentException if timeout is negative.
148+
*/
149+
public void setTimeout(int timeout) {
150+
if (timeout < 0) {
151+
throw new IllegalArgumentException("timeout is negative.");
152+
}
153+
this.timeout = timeout;
154+
}
155+
156+
/**
157+
* @return Maximum amount of time to wait for a socket connection establishment
158+
* with an individual server.
159+
*/
160+
public int getTimeout() {
161+
return timeout;
162+
}
163+
164+
/**
165+
* Provides a decision on whether retries limit is hit.
166+
*
167+
* @param retries Current count of retries.
168+
*
169+
* @return {@code true} if retries are exhausted.
170+
*/
171+
private boolean areRetriesExhausted(int retries) {
172+
int limit = getRetriesLimit();
173+
if (limit < 0) {
174+
return false;
175+
}
176+
return retries >= limit;
177+
}
178+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.tarantool;
2+
3+
import java.net.SocketAddress;
4+
import java.util.Collection;
5+
6+
public interface RefreshableSocketProvider {
7+
8+
Collection<SocketAddress> getAddresses();
9+
10+
void refreshAddresses(Collection<String> addresses);
11+
12+
}

0 commit comments

Comments
 (0)