Skip to content

Commit f960c18

Browse files
committed
Apply connection timeout for each connection attempt
Relax a socket provider contract. Now socket provider can throw a transient error and the client will try to obtain a socket again instead of being closed. Make built-in socket providers configurable. Now the client can set retries count and connection timeout for providers. Update README doc im scope of new socket provider contract. Closes: #167 Follows on: #144
1 parent 3420575 commit f960c18

12 files changed

+277
-148
lines changed

Diff for: README.md

+63-42
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ To get the Java connector for Tarantool 1.6.9, visit
2222

2323
## Getting started
2424

25-
1. Add a dependency to your `pom.xml` file.
25+
1. Add a dependency to your `pom.xml` file:
2626

2727
```xml
2828
<dependency>
@@ -32,75 +32,81 @@ To get the Java connector for Tarantool 1.6.9, visit
3232
</dependency>
3333
```
3434

35-
2. Configure `TarantoolClientConfig`.
35+
2. Configure `TarantoolClientConfig`:
3636

3737
```java
3838
TarantoolClientConfig config = new TarantoolClientConfig();
3939
config.username = "test";
4040
config.password = "test";
4141
```
4242

43-
3. Implement your `SocketChannelProvider`.
44-
It should return a connected `SocketChannel`.
43+
3. Create a client:
4544

4645
```java
47-
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
48-
@Override
49-
public SocketChannel get(int retryNumber, Throwable lastError) {
50-
if (lastError != null) {
51-
lastError.printStackTrace(System.out);
52-
}
53-
try {
54-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
55-
} catch (IOException e) {
56-
throw new IllegalStateException(e);
57-
}
58-
}
59-
};
46+
TarantoolClient client = new TarantoolClientImpl("host:3301", config);
6047
```
6148

62-
Here you could also implement some reconnection or fallback policy.
63-
Remember that `TarantoolClient` adopts a
64-
[fail-fast](https://en.wikipedia.org/wiki/Fail-fast) policy
65-
when a client is not connected.
49+
using `TarantoolClientImpl(String, TarantoolClientConfig)` is equivalent to:
6650

67-
The `TarantoolClient` will stop functioning if your implementation of a socket
68-
channel provider raises an exception or returns a null. You will need a new
69-
instance of client to recover. Hence, you should only throw in case you have
70-
met unrecoverable error.
51+
```java
52+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("host:3301")
53+
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
54+
```
7155

72-
Below is an example of `SocketChannelProvider` implementation that handles short
73-
tarantool restarts.
56+
You could implement your own `SocketChannelProvider`. It should return
57+
a connected `SocketChannel`. Feel free to implement `get(int retryNumber, Throwable lastError)`
58+
using your appropriate strategy to obtain the channel. The strategy can take into
59+
account current attempt number (retryNumber) and the last transient error occurred on
60+
the previous attempt.
61+
62+
The `TarantoolClient` will be closed if your implementation of a socket
63+
channel provider raises any exceptions but not a `SocketProviderTransientException`.
64+
Latter is handled by the client as a recoverable error. Otherwise, you will
65+
need a new instance of client to recover. Hence, you should only throw an
66+
error different to `SocketProviderTransientException` in case you have met
67+
unrecoverable error.
68+
69+
Below is an example of `SocketChannelProvider` implementation that tries
70+
to connect no more than 3 times, two seconds for each attempt at max.
7471

7572
```java
7673
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
7774
@Override
7875
public SocketChannel get(int retryNumber, Throwable lastError) {
79-
long deadline = System.currentTimeMillis() + RESTART_TIMEOUT;
80-
while (!Thread.currentThread().isInterrupted()) {
81-
try {
82-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
83-
} catch (IOException e) {
84-
if (deadline < System.currentTimeMillis())
85-
throw new RuntimeException(e);
86-
try {
87-
Thread.sleep(100);
88-
} catch (InterruptedException ignored) {
89-
Thread.currentThread().interrupt();
90-
}
76+
if (retryNumber > 3) {
77+
throw new RuntimeException("Too many attempts");
78+
}
79+
SocketChannel channel = null;
80+
try {
81+
channel = SocketChannel.open();
82+
channel.socket().connect(new InetSocketAddress("localhost", 3301), 2000);
83+
return channel;
84+
} catch (IOException e) {
85+
if (channel != null) {
86+
try {
87+
channel.close();
88+
} catch (IOException ignored) { }
9189
}
90+
throw new SocketProviderTransientException("Couldn't connect to server", e);
9291
}
93-
throw new RuntimeException(new TimeoutException("Connect timed out."));
9492
}
9593
};
9694
```
9795

98-
4. Create a client.
96+
Same behaviour can be achieved using built-in `SingleSocketChannelProviderImpl`:
9997

10098
```java
99+
TarantoolClientConfig config = new TarantoolClientConfig();
100+
config.connectionTimeout = 2_000; // two seconds timeout per attempt
101+
config.retryCount = 3; // three attempts at max
102+
103+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("localhost:3301")
101104
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
102105
```
103106

107+
`SingleSocketChannelProviderImpl` implements `ConfigurableSocketChannelProvider` that
108+
makes possible for the client to configure a socket provider.
109+
104110
> **Notes:**
105111
> * `TarantoolClient` is thread-safe and asynchronous, so you should use one
106112
> client inside the whole application.
@@ -168,6 +174,21 @@ a list of nodes which will be used by the cluster client to provide such
168174
ability. Also you can prefer to use a [discovery mechanism](#auto-discovery)
169175
in order to dynamically fetch and apply the node list.
170176

177+
### The RoundRobinSocketProviderImpl class
178+
179+
This cluster-aware provider uses addresses pool to connect to DB server.
180+
The provider picks up next address in order the addresses were passed.
181+
182+
Similar to `SingleSocketChannelProviderImpl` this RR provider also
183+
relies on two options from the config: `TarantoolClientConfig.connectionTimeout`
184+
and `TarantoolClientConfig.retryCount` but in a bit different way.
185+
The latter option says how many times the provider should try to establish a
186+
connection to _one instance_ before failing an attempt. The provider requires
187+
positive retry count to work properly. The socket timeout is used to limit
188+
an interval between connections attempts per instance. In other words, the provider
189+
follows a pattern _connection should succeed after N attempts with M interval between
190+
them at max_.
191+
171192
### Basic cluster client usage
172193

173194
1. Configure `TarantoolClusterClientConfig`:
@@ -198,7 +219,7 @@ client.syncOps().insert(23, Arrays.asList(1, 1));
198219
Auto-discovery feature allows a cluster client to fetch addresses of
199220
cluster nodes to reflect changes related to the cluster topology. To achieve
200221
this you have to create a Lua function on the server side which returns
201-
a single array result. Client periodically pools the server to obtain a
222+
a single array result. Client periodically polls the server to obtain a
202223
fresh list and apply it if its content changes.
203224

204225
1. On the server side create a function which returns nodes:

Diff for: src/main/java/org/tarantool/BaseSocketChannelProvider.java

+31-62
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.net.InetSocketAddress;
55
import java.nio.channels.SocketChannel;
66

7-
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
7+
public abstract class BaseSocketChannelProvider implements ConfigurableSocketChannelProvider {
88

99
/**
1010
* Limit of retries.
@@ -14,61 +14,39 @@ public abstract class BaseSocketChannelProvider implements SocketChannelProvider
1414
/**
1515
* Timeout to establish socket connection with an individual server.
1616
*/
17-
private int timeout = NO_TIMEOUT;
17+
private int connectionTimeout = NO_TIMEOUT;
1818

1919
/**
2020
* Tries to establish a new connection to the Tarantool instances.
2121
*
22-
* @param retryNumber number of current retry. Reset after successful connect.
22+
* @param retryNumber number of current retry
2323
* @param lastError the last error occurs when reconnecting
2424
*
2525
* @return connected socket channel
2626
*
27-
* @throws CommunicationException if any I/O errors happen or there are
28-
* no addresses available
27+
* @throws CommunicationException if number of retries or socket timeout are exceeded
28+
* @throws SocketProviderTransientException if any I/O errors happen
2929
*/
3030
@Override
3131
public final SocketChannel get(int retryNumber, Throwable lastError) {
32-
if (areRetriesExhausted(retryNumber)) {
33-
throw new CommunicationException("Connection retries exceeded.", lastError);
34-
}
35-
36-
long deadline = System.currentTimeMillis() + timeout;
37-
while (!Thread.currentThread().isInterrupted()) {
38-
try {
39-
InetSocketAddress address = getAddress(retryNumber, lastError);
40-
return openChannel(address);
41-
} catch (IOException e) {
42-
checkTimeout(deadline, e);
43-
}
44-
}
45-
throw new CommunicationException("Thread interrupted.", new InterruptedException());
46-
}
47-
48-
private void checkTimeout(long deadline, Exception e) {
49-
long timeLeft = deadline - System.currentTimeMillis();
50-
if (timeLeft <= 0) {
51-
throw new CommunicationException("Connection time out.", e);
52-
}
5332
try {
54-
Thread.sleep(timeLeft / 10);
55-
} catch (InterruptedException ignored) {
56-
Thread.currentThread().interrupt();
33+
return makeAttempt(retryNumber, lastError);
34+
} catch (IOException e) {
35+
throw new SocketProviderTransientException("Couldn't connect to the server", e);
5736
}
5837
}
5938

6039
/**
61-
* Gets address to be used to establish a new connection
62-
* Address can be null.
40+
* Obtains a connected socket channel.
6341
*
6442
* @param retryNumber reconnection attempt number
6543
* @param lastError reconnection reason
6644
*
67-
* @return available address which is depended on implementation
45+
* @return opened socket channel
6846
*
6947
* @throws IOException if any I/O errors occur
7048
*/
71-
protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException;
49+
protected abstract SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws IOException;
7250

7351
/**
7452
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
@@ -79,7 +57,11 @@ private void checkTimeout(long deadline, Exception e) {
7957
*
8058
* @param retriesLimit Limit of retries to use.
8159
*/
60+
@Override
8261
public void setRetriesLimit(int retriesLimit) {
62+
if (retriesLimit < 0) {
63+
throw new IllegalArgumentException("Retries count cannot be negative.");
64+
}
8365
this.retriesLimit = retriesLimit;
8466
}
8567

@@ -111,7 +93,7 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
11193
SocketChannel channel = null;
11294
try {
11395
channel = SocketChannel.open();
114-
channel.socket().connect(socketAddress, timeout);
96+
channel.socket().connect(socketAddress, connectionTimeout);
11597
return channel;
11698
} catch (IOException e) {
11799
if (channel != null) {
@@ -126,44 +108,31 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
126108
}
127109

128110
/**
129-
* Sets maximum amount of time to wait for a socket connection establishment
130-
* with an individual server.
131-
* <p>
132-
* Zero means infinite timeout.
133-
*
134-
* @param timeout timeout value, ms.
135-
*
136-
* @throws IllegalArgumentException if timeout is negative.
137-
*/
138-
public void setTimeout(int timeout) {
139-
if (timeout < 0) {
140-
throw new IllegalArgumentException("timeout is negative.");
141-
}
142-
this.timeout = timeout;
143-
}
144-
145-
/**
146-
* Gest maximum amount of time to wait for a socket
111+
* Gets maximum amount of time to wait for a socket
147112
* connection establishment with an individual server.
148113
*
149114
* @return timeout
150115
*/
151-
public int getTimeout() {
152-
return timeout;
116+
public int getConnectionTimeout() {
117+
return connectionTimeout;
153118
}
154119

155120
/**
156-
* Provides a decision on whether retries limit is hit.
121+
* Sets maximum amount of time to wait for a socket connection establishment
122+
* with an individual server.
123+
* <p>
124+
* Zero means infinite connectionTimeout.
157125
*
158-
* @param retries Current count of retries.
126+
* @param connectionTimeout connectionTimeout value, ms.
159127
*
160-
* @return {@code true} if retries are exhausted.
128+
* @throws IllegalArgumentException if connectionTimeout is negative.
161129
*/
162-
private boolean areRetriesExhausted(int retries) {
163-
int limit = getRetriesLimit();
164-
if (limit < 0) {
165-
return false;
130+
@Override
131+
public void setConnectionTimeout(int connectionTimeout) {
132+
if (connectionTimeout < 0) {
133+
throw new IllegalArgumentException("Connection timeout cannot be negative.");
166134
}
167-
return retries >= limit;
135+
this.connectionTimeout = connectionTimeout;
168136
}
137+
169138
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.tarantool;
2+
3+
public interface ConfigurableSocketChannelProvider extends SocketChannelProvider {
4+
5+
int RETRY_NO_LIMIT = 0;
6+
int NO_TIMEOUT = 0;
7+
8+
/**
9+
* Configures max count of retries.
10+
*
11+
* @param limit max attempts count
12+
*/
13+
void setRetriesLimit(int limit);
14+
15+
/**
16+
* Configures max time to establish
17+
* a connection per attempt.
18+
*
19+
* @param timeout connection timeout in millis
20+
*/
21+
void setConnectionTimeout(int timeout);
22+
23+
}

0 commit comments

Comments
 (0)