Skip to content

Commit 8084884

Browse files
committed
Apply connection timeout for each connect attempt
Relax a socket provider contract. Now socket provider can throw a transient error and the client will try to obtain a socket again instead of being closed. Make built-in socket providers configurable. Now the client can set retries count and connection timeout for providers. Update README doc im scope of new socket provider contract. Closes: #167 Follows on: #144
1 parent 3420575 commit 8084884

10 files changed

+184
-124
lines changed

Diff for: README.md

+48-42
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ To get the Java connector for Tarantool 1.6.9, visit
2222

2323
## Getting started
2424

25-
1. Add a dependency to your `pom.xml` file.
25+
1. Add a dependency to your `pom.xml` file:
2626

2727
```xml
2828
<dependency>
@@ -32,75 +32,81 @@ To get the Java connector for Tarantool 1.6.9, visit
3232
</dependency>
3333
```
3434

35-
2. Configure `TarantoolClientConfig`.
35+
2. Configure `TarantoolClientConfig`:
3636

3737
```java
3838
TarantoolClientConfig config = new TarantoolClientConfig();
3939
config.username = "test";
4040
config.password = "test";
4141
```
4242

43-
3. Implement your `SocketChannelProvider`.
44-
It should return a connected `SocketChannel`.
43+
3. Create a client:
4544

4645
```java
47-
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
48-
@Override
49-
public SocketChannel get(int retryNumber, Throwable lastError) {
50-
if (lastError != null) {
51-
lastError.printStackTrace(System.out);
52-
}
53-
try {
54-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
55-
} catch (IOException e) {
56-
throw new IllegalStateException(e);
57-
}
58-
}
59-
};
46+
TarantoolClient client = new TarantoolClientImpl("host:3301", config);
47+
```
48+
49+
using `TarantoolClientImpl(String, TarantoolClientConfig)` is equivalent to:
50+
51+
```java
52+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("host:3301")
53+
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
6054
```
6155

62-
Here you could also implement some reconnection or fallback policy.
63-
Remember that `TarantoolClient` adopts a
64-
[fail-fast](https://en.wikipedia.org/wiki/Fail-fast) policy
65-
when a client is not connected.
56+
You could implement your own `SocketChannelProvider`. It should return
57+
a connected `SocketChannel`. Feel free to implement `get(int retryNumber, Throwable lastError)`
58+
using your appropriate strategy to obtain the channel. The strategy can take into
59+
account current attempt number (retryNumber) and the last transient error occurred on
60+
the previous attempt.
6661

67-
The `TarantoolClient` will stop functioning if your implementation of a socket
68-
channel provider raises an exception or returns a null. You will need a new
69-
instance of client to recover. Hence, you should only throw in case you have
70-
met unrecoverable error.
62+
The `TarantoolClient` will be closed if your implementation of a socket
63+
channel provider raises any exceptions but not a `SocketProviderTransientException`.
64+
Latter is handled by the client as a recoverable error. Otherwise, you will
65+
need a new instance of client to recover. Hence, you should only throw an
66+
error different to `SocketProviderTransientException` in case you have met
67+
unrecoverable error.
7168

72-
Below is an example of `SocketChannelProvider` implementation that handles short
73-
tarantool restarts.
69+
Below is an example of `SocketChannelProvider` implementation that tries
70+
to connect no more than 3 times, two seconds for each attempt at max.
7471

7572
```java
7673
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
7774
@Override
7875
public SocketChannel get(int retryNumber, Throwable lastError) {
79-
long deadline = System.currentTimeMillis() + RESTART_TIMEOUT;
80-
while (!Thread.currentThread().isInterrupted()) {
81-
try {
82-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
83-
} catch (IOException e) {
84-
if (deadline < System.currentTimeMillis())
85-
throw new RuntimeException(e);
86-
try {
87-
Thread.sleep(100);
88-
} catch (InterruptedException ignored) {
89-
Thread.currentThread().interrupt();
90-
}
76+
if (retryNumber > 3) {
77+
throw new RuntimeException("Too many attempts");
78+
}
79+
SocketChannel channel = null;
80+
try {
81+
channel = SocketChannel.open();
82+
channel.socket().connect(new InetSocketAddress("localhost", 3301), 2000);
83+
return channel;
84+
} catch (IOException e) {
85+
if (channel != null) {
86+
try {
87+
channel.close();
88+
} catch (IOException ignored) { }
9189
}
90+
throw new SocketProviderTransientException("Couldn't connect to server", e);
9291
}
93-
throw new RuntimeException(new TimeoutException("Connect timed out."));
9492
}
9593
};
9694
```
9795

98-
4. Create a client.
96+
Same behaviour can be achieved using built-in `SingleSocketChannelProviderImpl`:
9997

10098
```java
99+
TarantoolClientConfig config = new TarantoolClientConfig();
100+
config.connectionTimeout = 2_000; // two seconds timeout per attempt
101+
config.retryCount = 3; // three attempts at max
102+
103+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("localhost:3301")
101104
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
102105
```
103106

107+
`SingleSocketChannelProviderImpl` implements `ConfigurableSocketChannelProvider` that
108+
makes possible for the client to configure a socket provider.
109+
104110
> **Notes:**
105111
> * `TarantoolClient` is thread-safe and asynchronous, so you should use one
106112
> client inside the whole application.
@@ -198,7 +204,7 @@ client.syncOps().insert(23, Arrays.asList(1, 1));
198204
Auto-discovery feature allows a cluster client to fetch addresses of
199205
cluster nodes to reflect changes related to the cluster topology. To achieve
200206
this you have to create a Lua function on the server side which returns
201-
a single array result. Client periodically pools the server to obtain a
207+
a single array result. Client periodically polls the server to obtain a
202208
fresh list and apply it if its content changes.
203209

204210
1. On the server side create a function which returns nodes:

Diff for: src/main/java/org/tarantool/BaseSocketChannelProvider.java

+31-44
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.net.InetSocketAddress;
55
import java.nio.channels.SocketChannel;
66

7-
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
7+
public abstract class BaseSocketChannelProvider implements ConfigurableSocketChannelProvider {
88

99
/**
1010
* Limit of retries.
@@ -14,46 +14,30 @@ public abstract class BaseSocketChannelProvider implements SocketChannelProvider
1414
/**
1515
* Timeout to establish socket connection with an individual server.
1616
*/
17-
private int timeout = NO_TIMEOUT;
17+
private int connectionTimeout = NO_TIMEOUT;
1818

1919
/**
2020
* Tries to establish a new connection to the Tarantool instances.
2121
*
22-
* @param retryNumber number of current retry. Reset after successful connect.
22+
* @param retryNumber number of current retry
2323
* @param lastError the last error occurs when reconnecting
2424
*
2525
* @return connected socket channel
2626
*
27-
* @throws CommunicationException if any I/O errors happen or there are
28-
* no addresses available
27+
* @throws CommunicationException if number of retries or socket timeout are exceeded
28+
* @throws SocketProviderTransientException if any I/O errors happen
2929
*/
3030
@Override
3131
public final SocketChannel get(int retryNumber, Throwable lastError) {
3232
if (areRetriesExhausted(retryNumber)) {
3333
throw new CommunicationException("Connection retries exceeded.", lastError);
3434
}
3535

36-
long deadline = System.currentTimeMillis() + timeout;
37-
while (!Thread.currentThread().isInterrupted()) {
38-
try {
39-
InetSocketAddress address = getAddress(retryNumber, lastError);
40-
return openChannel(address);
41-
} catch (IOException e) {
42-
checkTimeout(deadline, e);
43-
}
44-
}
45-
throw new CommunicationException("Thread interrupted.", new InterruptedException());
46-
}
47-
48-
private void checkTimeout(long deadline, Exception e) {
49-
long timeLeft = deadline - System.currentTimeMillis();
50-
if (timeLeft <= 0) {
51-
throw new CommunicationException("Connection time out.", e);
52-
}
5336
try {
54-
Thread.sleep(timeLeft / 10);
55-
} catch (InterruptedException ignored) {
56-
Thread.currentThread().interrupt();
37+
InetSocketAddress address = getAddress(retryNumber, lastError);
38+
return openChannel(address);
39+
} catch (IOException e) {
40+
throw new SocketProviderTransientException("Couldn't connect to the server", e);
5741
}
5842
}
5943

@@ -79,6 +63,7 @@ private void checkTimeout(long deadline, Exception e) {
7963
*
8064
* @param retriesLimit Limit of retries to use.
8165
*/
66+
@Override
8267
public void setRetriesLimit(int retriesLimit) {
8368
this.retriesLimit = retriesLimit;
8469
}
@@ -111,7 +96,7 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
11196
SocketChannel channel = null;
11297
try {
11398
channel = SocketChannel.open();
114-
channel.socket().connect(socketAddress, timeout);
99+
channel.socket().connect(socketAddress, connectionTimeout);
115100
return channel;
116101
} catch (IOException e) {
117102
if (channel != null) {
@@ -126,36 +111,37 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
126111
}
127112

128113
/**
129-
* Sets maximum amount of time to wait for a socket connection establishment
130-
* with an individual server.
131-
* <p>
132-
* Zero means infinite timeout.
133-
*
134-
* @param timeout timeout value, ms.
114+
* Gets maximum amount of time to wait for a socket
115+
* connection establishment with an individual server.
135116
*
136-
* @throws IllegalArgumentException if timeout is negative.
117+
* @return timeout
137118
*/
138-
public void setTimeout(int timeout) {
139-
if (timeout < 0) {
140-
throw new IllegalArgumentException("timeout is negative.");
141-
}
142-
this.timeout = timeout;
119+
public int getConnectionTimeout() {
120+
return connectionTimeout;
143121
}
144122

145123
/**
146-
* Gest maximum amount of time to wait for a socket
147-
* connection establishment with an individual server.
124+
* Sets maximum amount of time to wait for a socket connection establishment
125+
* with an individual server.
126+
* <p>
127+
* Zero means infinite connectionTimeout.
148128
*
149-
* @return timeout
129+
* @param connectionTimeout connectionTimeout value, ms.
130+
*
131+
* @throws IllegalArgumentException if connectionTimeout is negative.
150132
*/
151-
public int getTimeout() {
152-
return timeout;
133+
@Override
134+
public void setConnectionTimeout(int connectionTimeout) {
135+
if (connectionTimeout < 0) {
136+
throw new IllegalArgumentException("connectionTimeout is negative.");
137+
}
138+
this.connectionTimeout = connectionTimeout;
153139
}
154140

155141
/**
156142
* Provides a decision on whether retries limit is hit.
157143
*
158-
* @param retries Current count of retries.
144+
* @param retries current count of retries.
159145
*
160146
* @return {@code true} if retries are exhausted.
161147
*/
@@ -166,4 +152,5 @@ private boolean areRetriesExhausted(int retries) {
166152
}
167153
return retries >= limit;
168154
}
155+
169156
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.tarantool;
2+
3+
public interface ConfigurableSocketChannelProvider extends SocketChannelProvider {
4+
5+
int RETRY_NO_LIMIT = -1;
6+
int NO_TIMEOUT = 0;
7+
8+
/**
9+
* Configures max count of retries.
10+
*
11+
* @param limit max attempts count
12+
*/
13+
void setRetriesLimit(int limit);
14+
15+
/**
16+
* Configures max time to establish
17+
* a connection per attempt.
18+
*
19+
* @param timeout connection timeout in millis
20+
*/
21+
void setConnectionTimeout(int timeout);
22+
23+
}
+10-6
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package org.tarantool;
22

3+
import java.net.SocketAddress;
34
import java.nio.channels.SocketChannel;
45

56
public interface SocketChannelProvider {
67

7-
int RETRY_NO_LIMIT = -1;
8-
int NO_TIMEOUT = 0;
9-
108
/**
119
* Provides socket channel to init restore connection.
12-
* You could change hosts on fail and sleep between retries in this method
13-
* @param retryNumber number of current retry. Reset after successful connect.
10+
* You could change hosts between retries in this method.
11+
*
12+
* @param retryNumber number of current retry.
1413
* @param lastError the last error occurs when reconnecting
15-
* @return the result of SocketChannel open(SocketAddress remote) call
14+
*
15+
* @return the result of {@link SocketChannel#open(SocketAddress)} call
16+
*
17+
* @throws SocketProviderTransientException if recoverable error occurred
18+
* @throws RuntimeException if any other reasons occurred
1619
*/
1720
SocketChannel get(int retryNumber, Throwable lastError);
21+
1822
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.tarantool;
2+
3+
public class SocketProviderTransientException extends RuntimeException {
4+
5+
public SocketProviderTransientException(String message, Throwable cause) {
6+
super(message, cause);
7+
}
8+
9+
}

Diff for: src/main/java/org/tarantool/TarantoolClientConfig.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,35 @@ public class TarantoolClientConfig {
3636
*/
3737
public double directWriteFactor = 0.5d;
3838

39+
/**
40+
* Write operation timeout.
41+
*/
42+
public long writeTimeoutMillis = 60 * 1000L;
43+
3944
/**
4045
* Use old call command https://github.com/tarantool/doc/issues/54,
4146
* please ensure that you server supports new call command.
4247
*/
4348
public boolean useNewCall = false;
4449

4550
/**
46-
* Limits for synchronous operations.
51+
* Max time to establish connection to the server
52+
* and be completely configured (to have an {@code ALIVE} status).
53+
*
54+
* @see TarantoolClient#isAlive()
4755
*/
4856
public long initTimeoutMillis = 60 * 1000L;
49-
public long writeTimeoutMillis = 60 * 1000L;
57+
58+
/**
59+
* Connection timeout per attempt.
60+
* {@code 0} means no timeout.
61+
*/
62+
public int connectionTimeout = 2 * 1000;
63+
64+
/**
65+
* Total attempts number to connect to DB.
66+
* {@code -1} means unlimited attempts.
67+
*/
68+
public int retryCount = 3;
5069

5170
}

0 commit comments

Comments
 (0)