-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Add a connect timeout to the ConnectionProfile to allow per node connect timeouts #21847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
51a88d3
3636788
4d6158d
345ae32
57e39fd
dbd14d3
110251c
e3c1e61
7a7e571
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
*/ | ||
package org.elasticsearch.transport; | ||
|
||
import org.elasticsearch.common.unit.TimeValue; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
|
@@ -42,14 +44,16 @@ public final class ConnectionProfile { | |
TransportRequestOptions.Type.PING, | ||
TransportRequestOptions.Type.RECOVERY, | ||
TransportRequestOptions.Type.REG, | ||
TransportRequestOptions.Type.STATE)), 1); | ||
TransportRequestOptions.Type.STATE)), 1, null); | ||
|
||
private final List<ConnectionTypeHandle> handles; | ||
private final int numConnections; | ||
private final TimeValue connectTimeout; | ||
|
||
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections) { | ||
private ConnectionProfile(List<ConnectionTypeHandle> handles, int numConnections, TimeValue timeout) { | ||
this.handles = handles; | ||
this.numConnections = numConnections; | ||
connectTimeout = timeout; | ||
} | ||
|
||
/** | ||
|
@@ -59,6 +63,17 @@ public static class Builder { | |
private final List<ConnectionTypeHandle> handles = new ArrayList<>(); | ||
private final Set<TransportRequestOptions.Type> addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class); | ||
private int offset = 0; | ||
private TimeValue connectTimeout; | ||
|
||
/** | ||
* Sets a connect timeout for this connection profile | ||
*/ | ||
public void setConnectTimeout(TimeValue timeout) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we name the parameter |
||
if (timeout.millis() < 0) { | ||
throw new IllegalArgumentException("timeout must be positive but was: " + timeout); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The condition (less than zero) and the message (positive) are at odds with each other. Either the condition should be less than or equal to zero, or the message should say non-negative. |
||
} | ||
this.connectTimeout = timeout; | ||
} | ||
|
||
/** | ||
* Adds a number of connections for one or more types. Each type can only be added once. | ||
|
@@ -89,8 +104,16 @@ public ConnectionProfile build() { | |
if (types.isEmpty() == false) { | ||
throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types); | ||
} | ||
return new ConnectionProfile(Collections.unmodifiableList(handles), offset); | ||
return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout); | ||
} | ||
|
||
} | ||
|
||
/** | ||
* Returns the connect timeout configured for this connection profile. | ||
* | ||
* @return the connect timeout, or {@code null} if no explicit timeout is set on this profile | ||
*/ | ||
public TimeValue getConnectTimeout() { | ||
return connectTimeout; | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,7 +44,12 @@ | |
import org.junit.Before; | ||
|
||
import java.io.IOException; | ||
import java.net.InetAddress; | ||
import java.net.InetSocketAddress; | ||
import java.net.ServerSocket; | ||
import java.sql.Time; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
@@ -1721,4 +1726,44 @@ public void testRegisterHandlerTwice() { | |
serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), | ||
(request, message) -> {throw new AssertionError("boom");}); | ||
} | ||
|
||
public void testTimeoutPerConnection() throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this test, maybe just add an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's actually not working on Linux but on MacOS and I guess it would work on other BSDs too. On MacOS you see that packets are just dropped if the queue is full
while on Linux it ACKs the SYN packet and establishes the connection:
it's a bit annoying since I think the test is good though... I can just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, the backlogs work differently on Linux and BSD (BSD has one queue for incomplete and established connections, Linux has two and the backlog only applies to the latter). Can you try setting I'm not sure how I feel about this test being macOS only, that means it will not ever run in CI. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We discussed this offline. I'm good with getting this in as is, and we'll follow up on the status of a Mac in CI. |
||
try (ServerSocket socket = new ServerSocket()) { | ||
// nocommit - this test uses backlog=1 which is implementation specific ie. it might not work on some TCP/IP stacks | ||
// on linux (at least newer ones) the listen(addr, backlog=1) should just ignore new connections if the queue is full which | ||
// means that once we received an ACK from the client we just drop the packet on the floor (which is what we want) and we run | ||
// into a connection timeout quickly. Yet other implementations can, for instance, terminate the connection within the 3-way | ||
// handshake which I haven't tested yet. | ||
socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0), 1); | ||
socket.setReuseAddress(true); | ||
DiscoveryNode first = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(), | ||
socket.getLocalPort()), emptyMap(), | ||
emptySet(), version0); | ||
DiscoveryNode second = new DiscoveryNode("TEST", new TransportAddress(socket.getInetAddress(), | ||
socket.getLocalPort()), emptyMap(), | ||
emptySet(), version0); | ||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); | ||
builder.addConnections(1, | ||
TransportRequestOptions.Type.BULK, | ||
TransportRequestOptions.Type.PING, | ||
TransportRequestOptions.Type.RECOVERY, | ||
TransportRequestOptions.Type.REG, | ||
TransportRequestOptions.Type.STATE); | ||
|
||
// connection with one connection and a large timeout -- should consume the one spot in the backlog queue | ||
serviceA.connectToNode(first, builder.build()); | ||
builder.setConnectTimeout(TimeValue.timeValueMillis(1)); | ||
final ConnectionProfile profile = builder.build(); | ||
// now with the 1ms timeout we go and test that it's applied | ||
long startTime = System.nanoTime(); | ||
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> { | ||
serviceA.connectToNode(second, profile); | ||
}); | ||
final long now = System.nanoTime(); | ||
final long timeTaken = TimeValue.nsecToMSec(now - startTime); | ||
assertTrue("test didn't timeout quick enough, time taken: [" + timeTaken + "]", | ||
timeTaken < TimeValue.timeValueSeconds(5).millis()); | ||
assertEquals(ex.getMessage(), "[][" + second.getAddress() + "] connect_timeout[1ms]"); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we document where/when this light profile is used? From what I see, it's used to connect to nodes for zen pings and by the transport client sniffer. Also, maybe we could use a lower timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be a different change. The documentation is the reference
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay