Skip to content

Commit 155de53

Browse files
authored
Add a connect timeout to the ConnectionProfile to allow per node connect timeouts (#21847)
Timeouts are global today across all connections this commit allows to specify a connection timeout per node such that depending on the context connections can be established with different timeouts. Relates to #19719
1 parent fe01c0f commit 155de53

File tree

6 files changed

+111
-11
lines changed

6 files changed

+111
-11
lines changed

core/src/main/java/org/elasticsearch/transport/ConnectionProfile.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import org.elasticsearch.common.unit.TimeValue;
22+
2123
import java.util.ArrayList;
2224
import java.util.Arrays;
2325
import java.util.Collections;
@@ -42,14 +44,16 @@ public final class ConnectionProfile {
4244
TransportRequestOptions.Type.PING,
4345
TransportRequestOptions.Type.RECOVERY,
4446
TransportRequestOptions.Type.REG,
45-
TransportRequestOptions.Type.STATE))), 1);
47+
TransportRequestOptions.Type.STATE))), 1, null);
4648

4749
private final List<ConnectionTypeHandle> handles;
4850
private final int numConnections;
51+
private final TimeValue connectTimeout;
4952

50-
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections) {
53+
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout) {
5154
this.handles = handles;
5255
this.numConnections = numConnections;
56+
this.connectTimeout = connectTimeout;
5357
}
5458

5559
/**
@@ -59,6 +63,17 @@ public static class Builder {
5963
private final List<ConnectionTypeHandle> handles = new ArrayList<>();
6064
private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class);
6165
private int offset = 0;
66+
private TimeValue connectTimeout;
67+
68+
/**
69+
* Sets a connect connectTimeout for this connection profile
70+
*/
71+
public void setConnectTimeout(TimeValue connectTimeout) {
72+
if (connectTimeout.millis() < 0) {
73+
throw new IllegalArgumentException("connectTimeout must be non-negative but was: " + connectTimeout);
74+
}
75+
this.connectTimeout = connectTimeout;
76+
}
6277

6378
/**
6479
* Adds a number of connections for one or more types. Each type can only be added once.
@@ -89,8 +104,16 @@ public ConnectionProfile build() {
89104
if (types.isEmpty() == false) {
90105
throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
91106
}
92-
return new ConnectionProfile(Collections.unmodifiableList(handles), offset);
107+
return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout);
93108
}
109+
110+
}
111+
112+
/**
113+
* Returns the connect timeout or <code>null</code> if no explicit timeout is set on this profile.
114+
*/
115+
public TimeValue getConnectTimeout() {
116+
return connectTimeout;
94117
}
95118

96119
/**

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
150150
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
151151
private static final int PING_DATA_SIZE = -1;
152152

153-
protected final TimeValue connectTimeout;
154153
protected final boolean blockingClient;
155154
private final CircuitBreakerService circuitBreakerService;
156155
// package visibility for tests
@@ -190,9 +189,6 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
190189
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
191190
this.networkService = networkService;
192191
this.transportName = transportName;
193-
194-
195-
this.connectTimeout = TCP_CONNECT_TIMEOUT.get(settings);
196192
this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
197193
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
198194
}
@@ -204,6 +200,7 @@ static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
204200
int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
205201
int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
206202
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
203+
builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
207204
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
208205
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
209206
// if we are not master eligible we don't need a dedicated channel to publish the state

core/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport;
2020

21+
import org.elasticsearch.common.unit.TimeValue;
2122
import org.elasticsearch.test.ESTestCase;
2223
import org.hamcrest.Matchers;
2324

@@ -27,6 +28,11 @@ public class ConnectionProfileTests extends ESTestCase {
2728

2829
public void testBuildConnectionProfile() {
2930
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
31+
TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10));
32+
final boolean setConnectTimeout = randomBoolean();
33+
if (setConnectTimeout) {
34+
builder.setConnectTimeout(connectTimeout);
35+
}
3036
builder.addConnections(1, TransportRequestOptions.Type.BULK);
3137
builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
3238
builder.addConnections(3, TransportRequestOptions.Type.PING);
@@ -39,6 +45,11 @@ public void testBuildConnectionProfile() {
3945
builder.addConnections(4, TransportRequestOptions.Type.REG);
4046
ConnectionProfile build = builder.build();
4147
assertEquals(10, build.getNumConnections());
48+
if (setConnectTimeout) {
49+
assertEquals(connectTimeout, build.getConnectTimeout());
50+
} else {
51+
assertNull(build.getConnectTimeout());
52+
}
4253
Integer[] array = new Integer[10];
4354
for (int i = 0; i < array.length; i++) {
4455
array[i] = i;

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.elasticsearch.common.settings.Setting.Property;
5656
import org.elasticsearch.common.settings.Settings;
5757
import org.elasticsearch.common.unit.ByteSizeValue;
58+
import org.elasticsearch.common.unit.TimeValue;
5859
import org.elasticsearch.common.util.BigArrays;
5960
import org.elasticsearch.common.util.concurrent.EsExecutors;
6061
import org.elasticsearch.common.util.concurrent.FutureUtils;
@@ -204,7 +205,7 @@ private Bootstrap createBootstrap() {
204205

205206
bootstrap.handler(getClientChannelInitializer());
206207

207-
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis()));
208+
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(defaultConnectionProfile.getConnectTimeout().millis()));
208209
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
209210
bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));
210211

@@ -270,7 +271,8 @@ private void createServerBootstrap(String name, Settings settings) {
270271
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
271272
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
272273
name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress,
273-
connectTimeout, defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
274+
defaultConnectionProfile.getConnectTimeout(),
275+
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
274276
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
275277
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG),
276278
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE),
@@ -343,7 +345,18 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p
343345
final NodeChannels nodeChannels = new NodeChannels(channels, profile);
344346
boolean success = false;
345347
try {
346-
int numConnections = channels.length;
348+
final int numConnections = channels.length;
349+
final TimeValue connectTimeout;
350+
final Bootstrap bootstrap;
351+
final TimeValue defaultConnectTimeout = defaultConnectionProfile.getConnectTimeout();
352+
if (profile.getConnectTimeout() != null && profile.getConnectTimeout().equals(defaultConnectTimeout) == false) {
353+
bootstrap = this.bootstrap.clone(this.bootstrap.config().group());
354+
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(profile.getConnectTimeout().millis()));
355+
connectTimeout = profile.getConnectTimeout();
356+
} else {
357+
connectTimeout = defaultConnectTimeout;
358+
bootstrap = this.bootstrap;
359+
}
347360
final ArrayList<ChannelFuture> connections = new ArrayList<>(numConnections);
348361
final InetSocketAddress address = node.getAddress().address();
349362
for (int i = 0; i < numConnections; i++) {

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.logging.log4j.util.Supplier;
24+
import org.apache.lucene.util.Constants;
2425
import org.elasticsearch.ExceptionsHelper;
2526
import org.elasticsearch.Version;
2627
import org.elasticsearch.action.ActionListenerResponseHandler;
@@ -44,7 +45,12 @@
4445
import org.junit.Before;
4546

4647
import java.io.IOException;
48+
import java.net.InetAddress;
49+
import java.net.InetSocketAddress;
50+
import java.net.ServerSocket;
51+
import java.sql.Time;
4752
import java.util.ArrayList;
53+
import java.util.Collections;
4854
import java.util.HashMap;
4955
import java.util.List;
5056
import java.util.Map;
@@ -1721,4 +1727,46 @@ public void testRegisterHandlerTwice() {
17211727
serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
17221728
(request, message) -> {throw new AssertionError("boom");});
17231729
}
1730+
1731+
public void testTimeoutPerConnection() throws IOException {
1732+
assumeTrue("Works only on BSD network stacks and apparently windows",
1733+
Constants.MAC_OS_X || Constants.FREE_BSD || Constants.WINDOWS);
1734+
try (ServerSocket socket = new ServerSocket()) {
1735+
// note - this test uses backlog=1 which is implementation specific ie. it might not work on some TCP/IP stacks
1736+
// on linux (at least newer ones) the listen(addr, backlog=1) should just ignore new connections if the queue is full which
1737+
// means that once we received an ACK from the client we just drop the packet on the floor (which is what we want) and we run
1738+
// into a connection timeout quickly. Yet other implementations can for instance can terminate the connection within the 3 way
1739+
// handshake which I haven't tested yet.
1740+
socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1);
1741+
socket.setReuseAddress(true);
1742+
DiscoveryNode first = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
1743+
socket.getLocalPort()), emptyMap(),
1744+
emptySet(), version0);
1745+
DiscoveryNode second = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
1746+
socket.getLocalPort()), emptyMap(),
1747+
emptySet(), version0);
1748+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
1749+
builder.addConnections(1,
1750+
TransportRequestOptions.Type.BULK,
1751+
TransportRequestOptions.Type.PING,
1752+
TransportRequestOptions.Type.RECOVERY,
1753+
TransportRequestOptions.Type.REG,
1754+
TransportRequestOptions.Type.STATE);
1755+
1756+
// connection with one connection and a large timeout -- should consume the one spot in the backlog queue
1757+
serviceA.connectToNode(first, builder.build());
1758+
builder.setConnectTimeout(TimeValue.timeValueMillis(1));
1759+
final ConnectionProfile profile = builder.build();
1760+
// now with the 1ms timeout we got and test that is it's applied
1761+
long startTime = System.nanoTime();
1762+
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
1763+
serviceA.connectToNode(second, profile);
1764+
});
1765+
final long now = System.nanoTime();
1766+
final long timeTaken = TimeValue.nsecToMSec(now - startTime);
1767+
assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]",
1768+
timeTaken < TimeValue.timeValueSeconds(5).millis());
1769+
assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]");
1770+
}
1771+
}
17241772
}

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.network.NetworkService;
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.common.unit.ByteSizeValue;
33+
import org.elasticsearch.common.unit.TimeValue;
3334
import org.elasticsearch.common.util.BigArrays;
3435
import org.elasticsearch.common.util.CancellableThreads;
3536
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -46,6 +47,7 @@
4647
import java.net.ServerSocket;
4748
import java.net.Socket;
4849
import java.net.SocketException;
50+
import java.net.SocketTimeoutException;
4951
import java.util.List;
5052
import java.util.Map;
5153
import java.util.concurrent.ConcurrentHashMap;
@@ -178,7 +180,13 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p
178180
final InetSocketAddress address = node.getAddress().address();
179181
// we just use a single connections
180182
configureSocket(socket);
181-
socket.connect(address, (int) TCP_CONNECT_TIMEOUT.get(settings).millis());
183+
final TimeValue connectTimeout = profile.getConnectTimeout() == null ? defaultConnectionProfile.getConnectTimeout()
184+
: profile.getConnectTimeout();
185+
try {
186+
socket.connect(address, Math.toIntExact(connectTimeout.millis()));
187+
} catch (SocketTimeoutException ex) {
188+
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", ex);
189+
}
182190
MockChannel channel = new MockChannel(socket, address, "none", onClose);
183191
channel.loopRead(executor);
184192
mockChannels[0] = channel;

0 commit comments

Comments
 (0)