Skip to content

Commit 3e6b15c

Browse files
nicktorwaldTotktonada
authored andcommitted
Apply connection timeout for each connection attempt
Relax a socket provider contract. Now socket provider can throw a transient error and the client will try to obtain a socket again instead of being closed. Make built-in socket providers configurable. Now the client can set retries count and connection timeout for providers. Update README doc im scope of new socket provider contract. Closes: #167 Follows on: #144
1 parent 0ee7b30 commit 3e6b15c

12 files changed

+277
-148
lines changed

Diff for: README.md

+63-42
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ To get the Java connector for Tarantool 1.6.9, visit
2020

2121
## Getting started
2222

23-
1. Add a dependency to your `pom.xml` file.
23+
1. Add a dependency to your `pom.xml` file:
2424

2525
```xml
2626
<dependency>
@@ -30,75 +30,81 @@ To get the Java connector for Tarantool 1.6.9, visit
3030
</dependency>
3131
```
3232

33-
2. Configure `TarantoolClientConfig`.
33+
2. Configure `TarantoolClientConfig`:
3434

3535
```java
3636
TarantoolClientConfig config = new TarantoolClientConfig();
3737
config.username = "test";
3838
config.password = "test";
3939
```
4040

41-
3. Implement your `SocketChannelProvider`.
42-
It should return a connected `SocketChannel`.
41+
3. Create a client:
4342

4443
```java
45-
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
46-
@Override
47-
public SocketChannel get(int retryNumber, Throwable lastError) {
48-
if (lastError != null) {
49-
lastError.printStackTrace(System.out);
50-
}
51-
try {
52-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
53-
} catch (IOException e) {
54-
throw new IllegalStateException(e);
55-
}
56-
}
57-
};
44+
TarantoolClient client = new TarantoolClientImpl("host:3301", config);
5845
```
5946

60-
Here you could also implement some reconnection or fallback policy.
61-
Remember that `TarantoolClient` adopts a
62-
[fail-fast](https://en.wikipedia.org/wiki/Fail-fast) policy
63-
when a client is not connected.
47+
using `TarantoolClientImpl(String, TarantoolClientConfig)` is equivalent to:
6448

65-
The `TarantoolClient` will stop functioning if your implementation of a socket
66-
channel provider raises an exception or returns a null. You will need a new
67-
instance of client to recover. Hence, you should only throw in case you have
68-
met unrecoverable error.
49+
```java
50+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("host:3301")
51+
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
52+
```
6953

70-
Below is an example of `SocketChannelProvider` implementation that handles short
71-
tarantool restarts.
54+
You could implement your own `SocketChannelProvider`. It should return
55+
a connected `SocketChannel`. Feel free to implement `get(int retryNumber, Throwable lastError)`
56+
using your appropriate strategy to obtain the channel. The strategy can take into
57+
account current attempt number (retryNumber) and the last transient error occurred on
58+
the previous attempt.
59+
60+
The `TarantoolClient` will be closed if your implementation of a socket
61+
channel provider raises exceptions. However, throwing a `SocketProviderTransientException`
62+
or returning `null` value are handled by the client as recoverable errors. In these cases,
63+
the client will make next attempt to obtain the socket channel. Otherwise, you will need
64+
a new instance of client to recover. Hence, you should only throw an error different
65+
to `SocketProviderTransientException` in case you have met unrecoverable error.
66+
67+
Below is an example of `SocketChannelProvider` implementation that tries
68+
to connect no more than 3 times, two seconds for each attempt at max.
7269

7370
```java
7471
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
7572
@Override
7673
public SocketChannel get(int retryNumber, Throwable lastError) {
77-
long deadline = System.currentTimeMillis() + RESTART_TIMEOUT;
78-
while (!Thread.currentThread().isInterrupted()) {
79-
try {
80-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
81-
} catch (IOException e) {
82-
if (deadline < System.currentTimeMillis())
83-
throw new RuntimeException(e);
84-
try {
85-
Thread.sleep(100);
86-
} catch (InterruptedException ignored) {
87-
Thread.currentThread().interrupt();
88-
}
74+
if (retryNumber > 3) {
75+
throw new RuntimeException("Too many attempts");
76+
}
77+
SocketChannel channel = null;
78+
try {
79+
channel = SocketChannel.open();
80+
channel.socket().connect(new InetSocketAddress("localhost", 3301), 2000);
81+
return channel;
82+
} catch (IOException e) {
83+
if (channel != null) {
84+
try {
85+
channel.close();
86+
} catch (IOException ignored) { }
8987
}
88+
throw new SocketProviderTransientException("Couldn't connect to server", e);
9089
}
91-
throw new RuntimeException(new TimeoutException("Connect timed out."));
9290
}
9391
};
9492
```
9593

96-
4. Create a client.
94+
Same behaviour can be achieved using built-in `SingleSocketChannelProviderImpl`:
9795

9896
```java
97+
TarantoolClientConfig config = new TarantoolClientConfig();
98+
config.connectionTimeout = 2_000; // two seconds timeout per attempt
99+
config.retryCount = 3; // three attempts at max
100+
101+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("localhost:3301")
99102
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
100103
```
101104

105+
`SingleSocketChannelProviderImpl` implements `ConfigurableSocketChannelProvider` that
106+
makes possible for the client to configure a socket provider.
107+
102108
> **Notes:**
103109
> * `TarantoolClient` is thread-safe and asynchronous, so you should use one
104110
> client inside the whole application.
@@ -166,6 +172,21 @@ a list of nodes which will be used by the cluster client to provide such
166172
ability. Also you can prefer to use a [discovery mechanism](#auto-discovery)
167173
in order to dynamically fetch and apply the node list.
168174

175+
### The RoundRobinSocketProviderImpl class
176+
177+
This cluster-aware provider uses addresses pool to connect to DB server.
178+
The provider picks up next address in order the addresses were passed.
179+
180+
Similar to `SingleSocketChannelProviderImpl` this RR provider also
181+
relies on two options from the config: `TarantoolClientConfig.connectionTimeout`
182+
and `TarantoolClientConfig.retryCount` but in a bit different way.
183+
The latter option says how many times the provider should try to establish a
184+
connection to _one instance_ before failing an attempt. The provider requires
185+
positive retry count to work properly. The socket timeout is used to limit
186+
an interval between connections attempts per instance. In other words, the provider
187+
follows a pattern _connection should succeed after N attempts with M interval between
188+
them at max_.
189+
169190
### Basic cluster client usage
170191

171192
1. Configure `TarantoolClusterClientConfig`:
@@ -196,7 +217,7 @@ client.syncOps().insert(23, Arrays.asList(1, 1));
196217
Auto-discovery feature allows a cluster client to fetch addresses of
197218
cluster nodes to reflect changes related to the cluster topology. To achieve
198219
this you have to create a Lua function on the server side which returns
199-
a single array result. Client periodically pools the server to obtain a
220+
a single array result. Client periodically polls the server to obtain a
200221
fresh list and apply it if its content changes.
201222

202223
1. On the server side create a function which returns nodes:

Diff for: src/main/java/org/tarantool/BaseSocketChannelProvider.java

+31-62
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.net.InetSocketAddress;
55
import java.nio.channels.SocketChannel;
66

7-
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
7+
public abstract class BaseSocketChannelProvider implements ConfigurableSocketChannelProvider {
88

99
/**
1010
* Limit of retries.
@@ -14,61 +14,39 @@ public abstract class BaseSocketChannelProvider implements SocketChannelProvider
1414
/**
1515
* Timeout to establish socket connection with an individual server.
1616
*/
17-
private int timeout = NO_TIMEOUT;
17+
private int connectionTimeout = NO_TIMEOUT;
1818

1919
/**
2020
* Tries to establish a new connection to the Tarantool instances.
2121
*
22-
* @param retryNumber number of current retry. Reset after successful connect.
22+
* @param retryNumber number of current retry
2323
* @param lastError the last error occurs when reconnecting
2424
*
2525
* @return connected socket channel
2626
*
27-
* @throws CommunicationException if any I/O errors happen or there are
28-
* no addresses available
27+
* @throws CommunicationException if number of retries or socket timeout are exceeded
28+
* @throws SocketProviderTransientException if any I/O errors happen
2929
*/
3030
@Override
3131
public final SocketChannel get(int retryNumber, Throwable lastError) {
32-
if (areRetriesExhausted(retryNumber)) {
33-
throw new CommunicationException("Connection retries exceeded.", lastError);
34-
}
35-
36-
long deadline = System.currentTimeMillis() + timeout;
37-
while (!Thread.currentThread().isInterrupted()) {
38-
try {
39-
InetSocketAddress address = getAddress(retryNumber, lastError);
40-
return openChannel(address);
41-
} catch (IOException e) {
42-
checkTimeout(deadline, e);
43-
}
44-
}
45-
throw new CommunicationException("Thread interrupted.", new InterruptedException());
46-
}
47-
48-
private void checkTimeout(long deadline, Exception e) {
49-
long timeLeft = deadline - System.currentTimeMillis();
50-
if (timeLeft <= 0) {
51-
throw new CommunicationException("Connection time out.", e);
52-
}
5332
try {
54-
Thread.sleep(timeLeft / 10);
55-
} catch (InterruptedException ignored) {
56-
Thread.currentThread().interrupt();
33+
return makeAttempt(retryNumber, lastError);
34+
} catch (IOException e) {
35+
throw new SocketProviderTransientException("Couldn't connect to the server", e);
5736
}
5837
}
5938

6039
/**
61-
* Gets address to be used to establish a new connection
62-
* Address can be null.
40+
* Obtains a connected socket channel.
6341
*
6442
* @param retryNumber reconnection attempt number
6543
* @param lastError reconnection reason
6644
*
67-
* @return available address which is depended on implementation
45+
* @return opened socket channel
6846
*
6947
* @throws IOException if any I/O errors occur
7048
*/
71-
protected abstract InetSocketAddress getAddress(int retryNumber, Throwable lastError) throws IOException;
49+
protected abstract SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws IOException;
7250

7351
/**
7452
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
@@ -79,7 +57,11 @@ private void checkTimeout(long deadline, Exception e) {
7957
*
8058
* @param retriesLimit Limit of retries to use.
8159
*/
60+
@Override
8261
public void setRetriesLimit(int retriesLimit) {
62+
if (retriesLimit < 0) {
63+
throw new IllegalArgumentException("Retries count cannot be negative.");
64+
}
8365
this.retriesLimit = retriesLimit;
8466
}
8567

@@ -111,7 +93,7 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
11193
SocketChannel channel = null;
11294
try {
11395
channel = SocketChannel.open();
114-
channel.socket().connect(socketAddress, timeout);
96+
channel.socket().connect(socketAddress, connectionTimeout);
11597
return channel;
11698
} catch (IOException e) {
11799
if (channel != null) {
@@ -126,44 +108,31 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
126108
}
127109

128110
/**
129-
* Sets maximum amount of time to wait for a socket connection establishment
130-
* with an individual server.
131-
* <p>
132-
* Zero means infinite timeout.
133-
*
134-
* @param timeout timeout value, ms.
135-
*
136-
* @throws IllegalArgumentException if timeout is negative.
137-
*/
138-
public void setTimeout(int timeout) {
139-
if (timeout < 0) {
140-
throw new IllegalArgumentException("timeout is negative.");
141-
}
142-
this.timeout = timeout;
143-
}
144-
145-
/**
146-
* Gest maximum amount of time to wait for a socket
111+
* Gets maximum amount of time to wait for a socket
147112
* connection establishment with an individual server.
148113
*
149114
* @return timeout
150115
*/
151-
public int getTimeout() {
152-
return timeout;
116+
public int getConnectionTimeout() {
117+
return connectionTimeout;
153118
}
154119

155120
/**
156-
* Provides a decision on whether retries limit is hit.
121+
* Sets maximum amount of time to wait for a socket connection establishment
122+
* with an individual server.
123+
* <p>
124+
* Zero means infinite connectionTimeout.
157125
*
158-
* @param retries Current count of retries.
126+
* @param connectionTimeout connectionTimeout value, ms.
159127
*
160-
* @return {@code true} if retries are exhausted.
128+
* @throws IllegalArgumentException if connectionTimeout is negative.
161129
*/
162-
private boolean areRetriesExhausted(int retries) {
163-
int limit = getRetriesLimit();
164-
if (limit < 0) {
165-
return false;
130+
@Override
131+
public void setConnectionTimeout(int connectionTimeout) {
132+
if (connectionTimeout < 0) {
133+
throw new IllegalArgumentException("Connection timeout cannot be negative.");
166134
}
167-
return retries >= limit;
135+
this.connectionTimeout = connectionTimeout;
168136
}
137+
169138
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.tarantool;
2+
3+
public interface ConfigurableSocketChannelProvider extends SocketChannelProvider {
4+
5+
int RETRY_NO_LIMIT = 0;
6+
int NO_TIMEOUT = 0;
7+
8+
/**
9+
* Configures max count of retries.
10+
*
11+
* @param limit max attempts count
12+
*/
13+
void setRetriesLimit(int limit);
14+
15+
/**
16+
* Configures max time to establish
17+
* a connection per attempt.
18+
*
19+
* @param timeout connection timeout in millis
20+
*/
21+
void setConnectionTimeout(int timeout);
22+
23+
}

0 commit comments

Comments
 (0)