Skip to content

Commit 6745179

Browse files
committed
Apply connection timeout for each connect attempt
Relax a socket provider contract. Now socket provider can throw a transient error and the client will try to obtain a socket again instead of being closed. Make built-in socket providers configurable. Now the client can set retries count and connection timeout for providers. Update README doc im scope of new socket provider contract. Closes: #167 Follows on: #144
1 parent 3420575 commit 6745179

10 files changed

+162
-120
lines changed

Diff for: README.md

+37-45
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ To get the Java connector for Tarantool 1.6.9, visit
2222

2323
## Getting started
2424

25-
1. Add a dependency to your `pom.xml` file.
25+
1. Add a dependency to your `pom.xml` file:
2626

2727
```xml
2828
<dependency>
@@ -32,75 +32,67 @@ To get the Java connector for Tarantool 1.6.9, visit
3232
</dependency>
3333
```
3434

35-
2. Configure `TarantoolClientConfig`.
35+
2. Configure `TarantoolClientConfig`:
3636

3737
```java
3838
TarantoolClientConfig config = new TarantoolClientConfig();
3939
config.username = "test";
4040
config.password = "test";
4141
```
4242

43-
3. Implement your `SocketChannelProvider`.
44-
It should return a connected `SocketChannel`.
43+
3. Create a client:
4544

4645
```java
47-
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
48-
@Override
49-
public SocketChannel get(int retryNumber, Throwable lastError) {
50-
if (lastError != null) {
51-
lastError.printStackTrace(System.out);
52-
}
53-
try {
54-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
55-
} catch (IOException e) {
56-
throw new IllegalStateException(e);
57-
}
58-
}
59-
};
46+
TarantoolClient client = new TarantoolClientImpl("host:3301", config);
47+
```
48+
49+
using `TarantoolClientImpl(String, TarantoolClientConfig)` is equivalent to:
50+
51+
```java
52+
SocketChannelProvider socketChannelProvider = new SingleSocketChannelProviderImpl("host:3301")
53+
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
6054
```
6155

62-
Here you could also implement some reconnection or fallback policy.
63-
Remember that `TarantoolClient` adopts a
56+
You could implement your own `SocketChannelProvider`. It should return
57+
a connected `SocketChannel`. You could also implement some reconnection or
58+
fallback policy. Remember that `TarantoolClient` adopts a
6459
[fail-fast](https://en.wikipedia.org/wiki/Fail-fast) policy
6560
when a client is not connected.
6661

67-
The `TarantoolClient` will stop functioning if your implementation of a socket
68-
channel provider raises an exception or returns a null. You will need a new
69-
instance of client to recover. Hence, you should only throw in case you have
70-
met unrecoverable error.
62+
The `TarantoolClient` will be closed if your implementation of a socket
63+
channel provider raises any exceptions but not a `SocketProviderTransientException`.
64+
Latter is handled by the client as a recoverable error. Otherwise, you will
65+
need a new instance of client to recover. Hence, you should only throw an
66+
error different to `SocketProviderTransientException` in case you have met
67+
unrecoverable error.
7168

72-
Below is an example of `SocketChannelProvider` implementation that handles short
73-
tarantool restarts.
69+
Below is an example of `SocketChannelProvider` implementation that tries
70+
to connect in two seconds and no more than 3 times.
7471

7572
```java
7673
SocketChannelProvider socketChannelProvider = new SocketChannelProvider() {
7774
@Override
7875
public SocketChannel get(int retryNumber, Throwable lastError) {
79-
long deadline = System.currentTimeMillis() + RESTART_TIMEOUT;
80-
while (!Thread.currentThread().isInterrupted()) {
81-
try {
82-
return SocketChannel.open(new InetSocketAddress("localhost", 3301));
83-
} catch (IOException e) {
84-
if (deadline < System.currentTimeMillis())
85-
throw new RuntimeException(e);
86-
try {
87-
Thread.sleep(100);
88-
} catch (InterruptedException ignored) {
89-
Thread.currentThread().interrupt();
90-
}
76+
if (retryNumber > 3) {
77+
throw new RuntimeException("Too many attempts");
78+
}
79+
SocketChannel channel = null;
80+
try {
81+
channel = SocketChannel.open();
82+
channel.socket().connect(new InetSocketAddress("localhost", 3301), 2000);
83+
return channel;
84+
} catch (IOException e) {
85+
if (channel != null) {
86+
try {
87+
channel.close();
88+
} catch (IOException ignored) { }
9189
}
90+
throw new SocketProviderTransientException("Couldn't connect to server", e);
9291
}
93-
throw new RuntimeException(new TimeoutException("Connect timed out."));
9492
}
9593
};
9694
```
9795

98-
4. Create a client.
99-
100-
```java
101-
TarantoolClient client = new TarantoolClientImpl(socketChannelProvider, config);
102-
```
103-
10496
> **Notes:**
10597
> * `TarantoolClient` is thread-safe and asynchronous, so you should use one
10698
> client inside the whole application.
@@ -198,7 +190,7 @@ client.syncOps().insert(23, Arrays.asList(1, 1));
198190
Auto-discovery feature allows a cluster client to fetch addresses of
199191
cluster nodes to reflect changes related to the cluster topology. To achieve
200192
this you have to create a Lua function on the server side which returns
201-
a single array result. Client periodically pools the server to obtain a
193+
a single array result. Client periodically polls the server to obtain a
202194
fresh list and apply it if its content changes.
203195

204196
1. On the server side create a function which returns nodes:

Diff for: src/main/java/org/tarantool/BaseSocketChannelProvider.java

+35-44
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import java.io.IOException;
44
import java.net.InetSocketAddress;
5+
import java.net.SocketTimeoutException;
56
import java.nio.channels.SocketChannel;
67

7-
public abstract class BaseSocketChannelProvider implements SocketChannelProvider {
8+
public abstract class BaseSocketChannelProvider implements ConfigurableSocketChannelProvider {
89

910
/**
1011
* Limit of retries.
@@ -14,46 +15,33 @@ public abstract class BaseSocketChannelProvider implements SocketChannelProvider
1415
/**
1516
* Timeout to establish socket connection with an individual server.
1617
*/
17-
private int timeout = NO_TIMEOUT;
18+
private int connectionTimeout = NO_TIMEOUT;
1819

1920
/**
2021
* Tries to establish a new connection to the Tarantool instances.
2122
*
22-
* @param retryNumber number of current retry. Reset after successful connect.
23+
* @param retryNumber number of current retry
2324
* @param lastError the last error occurs when reconnecting
2425
*
2526
* @return connected socket channel
2627
*
27-
* @throws CommunicationException if any I/O errors happen or there are
28-
* no addresses available
28+
* @throws CommunicationException if number of retries or socket timeout are exceeded
29+
* @throws SocketProviderTransientException if any I/O errors happen
2930
*/
3031
@Override
3132
public final SocketChannel get(int retryNumber, Throwable lastError) {
3233
if (areRetriesExhausted(retryNumber)) {
3334
throw new CommunicationException("Connection retries exceeded.", lastError);
3435
}
3536

36-
long deadline = System.currentTimeMillis() + timeout;
37-
while (!Thread.currentThread().isInterrupted()) {
38-
try {
39-
InetSocketAddress address = getAddress(retryNumber, lastError);
40-
return openChannel(address);
41-
} catch (IOException e) {
42-
checkTimeout(deadline, e);
43-
}
44-
}
45-
throw new CommunicationException("Thread interrupted.", new InterruptedException());
46-
}
47-
48-
private void checkTimeout(long deadline, Exception e) {
49-
long timeLeft = deadline - System.currentTimeMillis();
50-
if (timeLeft <= 0) {
51-
throw new CommunicationException("Connection time out.", e);
52-
}
5337
try {
54-
Thread.sleep(timeLeft / 10);
55-
} catch (InterruptedException ignored) {
56-
Thread.currentThread().interrupt();
38+
InetSocketAddress address = getAddress(retryNumber, lastError);
39+
return openChannel(address);
40+
} catch (SocketTimeoutException e) {
41+
throw new CommunicationException("Connection timed out", e);
42+
} catch (IOException e) {
43+
System.out.println(e);
44+
throw new SocketProviderTransientException("Couldn't connect to server", e);
5745
}
5846
}
5947

@@ -79,6 +67,7 @@ private void checkTimeout(long deadline, Exception e) {
7967
*
8068
* @param retriesLimit Limit of retries to use.
8169
*/
70+
@Override
8271
public void setRetriesLimit(int retriesLimit) {
8372
this.retriesLimit = retriesLimit;
8473
}
@@ -111,7 +100,7 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
111100
SocketChannel channel = null;
112101
try {
113102
channel = SocketChannel.open();
114-
channel.socket().connect(socketAddress, timeout);
103+
channel.socket().connect(socketAddress, connectionTimeout);
115104
return channel;
116105
} catch (IOException e) {
117106
if (channel != null) {
@@ -126,36 +115,37 @@ protected SocketChannel openChannel(InetSocketAddress socketAddress) throws IOEx
126115
}
127116

128117
/**
129-
* Sets maximum amount of time to wait for a socket connection establishment
130-
* with an individual server.
131-
* <p>
132-
* Zero means infinite timeout.
133-
*
134-
* @param timeout timeout value, ms.
118+
* Gets maximum amount of time to wait for a socket
119+
* connection establishment with an individual server.
135120
*
136-
* @throws IllegalArgumentException if timeout is negative.
121+
* @return timeout
137122
*/
138-
public void setTimeout(int timeout) {
139-
if (timeout < 0) {
140-
throw new IllegalArgumentException("timeout is negative.");
141-
}
142-
this.timeout = timeout;
123+
public int getConnectionTimeout() {
124+
return connectionTimeout;
143125
}
144126

145127
/**
146-
* Gest maximum amount of time to wait for a socket
147-
* connection establishment with an individual server.
128+
* Sets maximum amount of time to wait for a socket connection establishment
129+
* with an individual server.
130+
* <p>
131+
* Zero means infinite connectionTimeout.
148132
*
149-
* @return timeout
133+
* @param connectionTimeout connectionTimeout value, ms.
134+
*
135+
* @throws IllegalArgumentException if connectionTimeout is negative.
150136
*/
151-
public int getTimeout() {
152-
return timeout;
137+
@Override
138+
public void setConnectionTimeout(int connectionTimeout) {
139+
if (connectionTimeout < 0) {
140+
throw new IllegalArgumentException("connectionTimeout is negative.");
141+
}
142+
this.connectionTimeout = connectionTimeout;
153143
}
154144

155145
/**
156146
* Provides a decision on whether retries limit is hit.
157147
*
158-
* @param retries Current count of retries.
148+
* @param retries current count of retries.
159149
*
160150
* @return {@code true} if retries are exhausted.
161151
*/
@@ -166,4 +156,5 @@ private boolean areRetriesExhausted(int retries) {
166156
}
167157
return retries >= limit;
168158
}
159+
169160
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.tarantool;
2+
3+
public interface ConfigurableSocketChannelProvider extends SocketChannelProvider {
4+
5+
int RETRY_NO_LIMIT = -1;
6+
int NO_TIMEOUT = 0;
7+
8+
/**
9+
* Configures max count of retries.
10+
*
11+
* @param limit max attempts count
12+
*/
13+
void setRetriesLimit(int limit);
14+
15+
/**
16+
* Configures max time to establish
17+
* a connection per attempt.
18+
*
19+
* @param timeout connection timeout in millis
20+
*/
21+
void setConnectionTimeout(int timeout);
22+
23+
}
+10-6
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
package org.tarantool;
22

3+
import java.net.SocketAddress;
34
import java.nio.channels.SocketChannel;
45

56
public interface SocketChannelProvider {
67

7-
int RETRY_NO_LIMIT = -1;
8-
int NO_TIMEOUT = 0;
9-
108
/**
119
* Provides socket channel to init restore connection.
12-
* You could change hosts on fail and sleep between retries in this method
13-
* @param retryNumber number of current retry. Reset after successful connect.
10+
* You could change hosts between retries in this method.
11+
*
12+
* @param retryNumber number of current retry.
1413
* @param lastError the last error occurs when reconnecting
15-
* @return the result of SocketChannel open(SocketAddress remote) call
14+
*
15+
* @return the result of {@link SocketChannel#open(SocketAddress)} call
16+
*
17+
* @throws SocketProviderTransientException if recoverable error occurred
18+
* @throws RuntimeException if any other reasons occurred
1619
*/
1720
SocketChannel get(int retryNumber, Throwable lastError);
21+
1822
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.tarantool;
2+
3+
public class SocketProviderTransientException extends RuntimeException {
4+
5+
public SocketProviderTransientException(String message, Throwable cause) {
6+
super(message, cause);
7+
}
8+
9+
}

Diff for: src/main/java/org/tarantool/TarantoolClientConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,10 @@ public class TarantoolClientConfig {
4848
public long initTimeoutMillis = 60 * 1000L;
4949
public long writeTimeoutMillis = 60 * 1000L;
5050

51+
/**
52+
* Connection establishment policies.
53+
*/
54+
public int connectionTimeout = 2 * 1000;
55+
public int retryCount = 3;
56+
5157
}

0 commit comments

Comments
 (0)