Skip to content

Add a connect timeout to the ConnectionProfile to allow per node connect timeouts #21847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.common.unit.TimeValue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -42,14 +44,16 @@ public final class ConnectionProfile {
TransportRequestOptions.Type.PING,
TransportRequestOptions.Type.RECOVERY,
TransportRequestOptions.Type.REG,
TransportRequestOptions.Type.STATE))), 1);
TransportRequestOptions.Type.STATE))), 1, null);

private final List<ConnectionTypeHandle> handles;
private final int numConnections;
private final TimeValue connectTimeout;

private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections) {
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue connectTimeout) {
this.handles = handles;
this.numConnections = numConnections;
this.connectTimeout = connectTimeout;
}

/**
Expand All @@ -59,6 +63,17 @@ public static class Builder {
private final List<ConnectionTypeHandle> handles = new ArrayList<>();
private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class);
private int offset = 0;
private TimeValue connectTimeout;

/**
 * Sets the connect timeout for this connection profile.
 *
 * @param connectTimeout the timeout to use when connecting; its value in milliseconds must not be negative
 * @throws IllegalArgumentException if the given timeout is negative
 */
public void setConnectTimeout(TimeValue connectTimeout) {
    final boolean negative = connectTimeout.millis() < 0;
    if (negative) {
        throw new IllegalArgumentException("connectTimeout must be non-negative but was: " + connectTimeout);
    }
    this.connectTimeout = connectTimeout;
}

/**
* Adds a number of connections for one or more types. Each type can only be added once.
Expand Down Expand Up @@ -89,8 +104,16 @@ public ConnectionProfile build() {
if (types.isEmpty() == false) {
throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types);
}
return new ConnectionProfile(Collections.unmodifiableList(handles), offset);
return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout);
}

}

/**
 * Returns the connect timeout of this profile, or {@code null} when no explicit timeout has been set on it.
 */
public TimeValue getConnectTimeout() {
    return this.connectTimeout;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final int PING_DATA_SIZE = -1;

protected final TimeValue connectTimeout;
protected final boolean blockingClient;
private final CircuitBreakerService circuitBreakerService;
// package visibility for tests
Expand Down Expand Up @@ -190,9 +189,6 @@ public TcpTransport(String transportName, Settings settings, ThreadPool threadPo
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;


this.connectTimeout = TCP_CONNECT_TIMEOUT.get(settings);
this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
}
Expand All @@ -204,6 +200,7 @@ static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.get(settings);
int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings);
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.setConnectTimeout(TCP_CONNECT_TIMEOUT.get(settings));
builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK);
builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING);
// if we are not master eligible we don't need a dedicated channel to publish the state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

Expand All @@ -27,6 +28,11 @@ public class ConnectionProfileTests extends ESTestCase {

public void testBuildConnectionProfile() {
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10));
final boolean setConnectTimeout = randomBoolean();
if (setConnectTimeout) {
builder.setConnectTimeout(connectTimeout);
}
builder.addConnections(1, TransportRequestOptions.Type.BULK);
builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
builder.addConnections(3, TransportRequestOptions.Type.PING);
Expand All @@ -39,6 +45,11 @@ public void testBuildConnectionProfile() {
builder.addConnections(4, TransportRequestOptions.Type.REG);
ConnectionProfile build = builder.build();
assertEquals(10, build.getNumConnections());
if (setConnectTimeout) {
assertEquals(connectTimeout, build.getConnectTimeout());
} else {
assertNull(build.getConnectTimeout());
}
Integer[] array = new Integer[10];
for (int i = 0; i < array.length; i++) {
array[i] = i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils;
Expand Down Expand Up @@ -204,7 +205,7 @@ private Bootstrap createBootstrap() {

bootstrap.handler(getClientChannelInitializer());

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis()));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(defaultConnectionProfile.getConnectTimeout().millis()));
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));

Expand Down Expand Up @@ -270,7 +271,8 @@ private void createServerBootstrap(String name, Settings settings) {
logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], "
+ "connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]",
name, workerCount, settings.get("port"), settings.get("bind_host"), settings.get("publish_host"), compress,
connectTimeout, defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
defaultConnectionProfile.getConnectTimeout(),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.REG),
defaultConnectionProfile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE),
Expand Down Expand Up @@ -343,7 +345,18 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p
final NodeChannels nodeChannels = new NodeChannels(channels, profile);
boolean success = false;
try {
int numConnections = channels.length;
final int numConnections = channels.length;
final TimeValue connectTimeout;
final Bootstrap bootstrap;
final TimeValue defaultConnectTimeout = defaultConnectionProfile.getConnectTimeout();
if (profile.getConnectTimeout() != null && profile.getConnectTimeout().equals(defaultConnectTimeout) == false) {
bootstrap = this.bootstrap.clone(this.bootstrap.config().group());
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(profile.getConnectTimeout().millis()));
connectTimeout = profile.getConnectTimeout();
} else {
connectTimeout = defaultConnectTimeout;
bootstrap = this.bootstrap;
}
final ArrayList<ChannelFuture> connections = new ArrayList<>(numConnections);
final InetSocketAddress address = node.getAddress().address();
for (int i = 0; i < numConnections; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.Constants;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand All @@ -44,7 +45,12 @@
import org.junit.Before;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1721,4 +1727,46 @@ public void testRegisterHandlerTwice() {
serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
(request, message) -> {throw new AssertionError("boom");});
}

public void testTimeoutPerConnection() throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this test, maybe just add an assumeTrue(Constants.LINUX) (even then I think we want >= 2.2 kernels for the modern backlog behavior, but that's mostly a given these days anyway so I'm not going to lose sleep over it)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's actually not working on linux but on MacOS and I guess it would work on other BSDs too. On MacOS you see that packets are just dropped if the queue is full

panthor:elasticsearch simon$ sudo tcpdump -i lo0 port 6660
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes
13:00:50.896080 IP localhost.52585 > localhost.6660: Flags [SEW], seq 3480902841, win 65535, options [mss 16344,nop,wscale 5,nop,nop,TS val 249070543 ecr 0,sackOK,eol], length 0
13:00:50.896175 IP localhost.6660 > localhost.52585: Flags [S.E], seq 684828528, ack 3480902842, win 65535, options [mss 16344,nop,wscale 5,nop,nop,TS val 249070543 ecr 249070543,sackOK,eol], length 0
13:00:50.896192 IP localhost.52585 > localhost.6660: Flags [.], ack 1, win 12759, options [nop,nop,TS val 249070543 ecr 249070543], length 0
13:00:50.896200 IP localhost.6660 > localhost.52585: Flags [.], ack 1, win 12759, options [nop,nop,TS val 249070543 ecr 249070543], length 0
13:00:50.897563 IP localhost.52586 > localhost.6660: Flags [SEW], seq 3356989603, win 65535, options [mss 16344,nop,wscale 5,nop,nop,TS val 249070544 ecr 0,sackOK,eol], length 0
13:00:50.898729 IP localhost.52586 > localhost.6660: Flags [F], seq 3356989603, win 65535, options [nop,nop,TS val 249070545 ecr 0], length 0
13:00:51.096372 IP localhost.6660 > localhost.52585: Flags [R.], seq 1, ack 1, win 12759, length 0

while on linux it acks the SYN packet and establishes the connection:

root@monster:/home/simon# tcpdump -i any port 6660
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on any, link-type LINUX_SLL (Linux cooked), capture size 65535 bytes
12:57:56.364461 IP localhost.localdomain.51787 > localhost.localdomain.6660: Flags [S], seq 2528693146, win 32792, options [mss 16396,sackOK,TS val 364159 ecr 0,nop,wscale 7], length 0
12:57:56.364472 IP localhost.localdomain.6660 > localhost.localdomain.51787: Flags [S.], seq 1501812814, ack 2528693147, win 32768, options [mss 16396,sackOK,TS val 364159 ecr 364159,nop,wscale 7], length 0
12:57:56.364478 IP localhost.localdomain.51787 > localhost.localdomain.6660: Flags [.], ack 1, win 257, options [nop,nop,TS val 364159 ecr 364159], length 0
12:57:56.366587 IP localhost.localdomain.51788 > localhost.localdomain.6660: Flags [S], seq 3949013427, win 32792, options [mss 16396,sackOK,TS val 364159 ecr 0,nop,wscale 7], length 0
12:57:56.366596 IP localhost.localdomain.6660 > localhost.localdomain.51788: Flags [S.], seq 2483339599, ack 3949013428, win 32768, options [mss 16396,sackOK,TS val 364159 ecr 364159,nop,wscale 7], length 0
12:57:56.366602 IP localhost.localdomain.51788 > localhost.localdomain.6660: Flags [.], ack 1, win 257, options [nop,nop,TS val 364159 ecr 364159], length 0
12:57:56.367652 IP localhost.localdomain.6660 > localhost.localdomain.51787: Flags [R.], seq 1, ack 1, win 256, options [nop,nop,TS val 364159 ecr 364159], length 0
12:57:56.367676 IP localhost.localdomain.6660 > localhost.localdomain.51788: Flags [R.], seq 1, ack 1, win 256, options [nop,nop,TS val 364159 ecr 364159], length 0

it's a bit annoying since I think the test is good though... I can just assumeTrue(MacOS) for now...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the backlogs work differently on Linux and BSD (BSD has one queue for incomplete and established connections, Linux has two and the backlog only applies to the latter). Can you try setting /proc/sys/net/ipv4/tcp_abort_on_overflow to 1 on your Linux machine and see if it behaves as expected? (I don't think we want to expect this to be the case, I'm just curious if it would account for the difference).

I'm not sure how I feel about this test being macOS only, that means it will not ever run in CI.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this offline. I'm good with getting this in as is, and we'll follow up on the status of a Mac in CI.

// The backlog behaviour this test depends on is platform specific, so skip everywhere else.
assumeTrue("Works only on BSD network stacks and apparently windows",
    Constants.MAC_OS_X || Constants.FREE_BSD || Constants.WINDOWS);
try (ServerSocket socket = new ServerSocket()) {
    // NOTE: this test uses backlog=1, whose effect is implementation specific, i.e. it might not work on some TCP/IP
    // stacks. The intent is that once the single slot in the backlog queue is taken, the listener just ignores new
    // connections (the packets are dropped on the floor, which is what we want), so a further connect attempt runs into
    // its connect timeout quickly. Other implementations can, for instance, terminate or complete the connection within
    // the 3-way handshake, which is why the test is restricted to the platforms in the assumption above.
    socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1);
    // NOTE(review): SO_REUSEADDR is set after bind(); the ServerSocket documentation leaves that behaviour undefined.
    // Left in place so that what the test exercises does not change -- confirm whether it is needed at all.
    socket.setReuseAddress(true);
    // Two nodes that both point at the never-accepting server socket bound above.
    DiscoveryNode first = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
        socket.getLocalPort()), emptyMap(),
        emptySet(), version0);
    DiscoveryNode second = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(),
        socket.getLocalPort()), emptyMap(),
        emptySet(), version0);
    ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
    builder.addConnections(1,
        TransportRequestOptions.Type.BULK,
        TransportRequestOptions.Type.PING,
        TransportRequestOptions.Type.RECOVERY,
        TransportRequestOptions.Type.REG,
        TransportRequestOptions.Type.STATE);

    // connection with one connection and a large timeout -- should consume the one spot in the backlog queue
    serviceA.connectToNode(first, builder.build());
    builder.setConnectTimeout(TimeValue.timeValueMillis(1));
    final ConnectionProfile profile = builder.build();
    // now connect again with the 1ms timeout and check that it is applied
    long startTime = System.nanoTime();
    ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
        serviceA.connectToNode(second, profile);
    });
    final long now = System.nanoTime();
    final long timeTaken = TimeValue.nsecToMSec(now - startTime);
    assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]",
        timeTaken < TimeValue.timeValueSeconds(5).millis());
    // expected value first, so that a failure report labels the two messages correctly
    assertEquals("[][" + second.getAddress() + "] connect_timeout[1ms]", ex.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand All @@ -46,6 +47,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -178,7 +180,13 @@ protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile p
final InetSocketAddress address = node.getAddress().address();
// we just use a single connection
configureSocket(socket);
socket.connect(address, (int) TCP_CONNECT_TIMEOUT.get(settings).millis());
final TimeValue connectTimeout = profile.getConnectTimeout() == null ? defaultConnectionProfile.getConnectTimeout()
: profile.getConnectTimeout();
try {
socket.connect(address, Math.toIntExact(connectTimeout.millis()));
} catch (SocketTimeoutException ex) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", ex);
}
MockChannel channel = new MockChannel(socket, address, "none", onClose);
channel.loopRead(executor);
mockChannels[0] = channel;
Expand Down