From 495273553bca4efa10dd95067023995fa525856b Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 8 Jul 2019 16:03:03 +0100 Subject: [PATCH] Wait for blackholed connection before discovery Since #42636 we no longer treat connections specially when simulating a blackholed connection. This means that at the end of the safety phase we may have just started a connection attempt which will time out, but the default timeout is 30 seconds, much longer than the 2 seconds we normally allow for post-safety-phase discovery. This commit adds time for such a connection attempt to time out. It also fixes some spurious logging of `this` that now refers to an object with an unhelpful `toString()` implementation introduced in #42636. Fixes #44073 --- .../HandshakingTransportAddressConnector.java | 13 +++++++------ .../coordination/AbstractCoordinatorTestCase.java | 4 +++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java index bca3cc7037175..6d9d0c7892e6a 100644 --- a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java +++ b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java @@ -70,9 +70,10 @@ public HandshakingTransportAddressConnector(Settings settings, TransportService @Override public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener listener) { transportService.getThreadPool().generic().execute(new AbstractRunnable() { + private final AbstractRunnable thisConnectionAttempt = this; + @Override protected void doRun() { - // TODO if transportService is already connected to this address then skip the handshaking final DiscoveryNode targetNode = new DiscoveryNode("", transportAddress.toString(), @@ -80,13 +81,13 @@ protected void doRun() { transportAddress.address().getHostString(), transportAddress.getAddress(), transportAddress, emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()); - logger.trace("[{}] opening probe connection", this); + logger.trace("[{}] opening probe connection", thisConnectionAttempt); transportService.openConnection(targetNode, ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout, TimeValue.MINUS_ONE, null), new ActionListener<>() { @Override public void onResponse(Connection connection) { - logger.trace("[{}] opened probe connection", this); + logger.trace("[{}] opened probe connection", thisConnectionAttempt); // use NotifyOnceListener to make sure the following line does not result in onFailure being called when // the connection is closed in the onResponse handler @@ -96,7 +97,7 @@ public void onResponse(Connection connection) { protected void innerOnResponse(DiscoveryNode remoteNode) { try { // success means (amongst other things) that the cluster names match - logger.trace("[{}] handshake successful: {}", this, remoteNode); + logger.trace("[{}] handshake successful: {}", thisConnectionAttempt, remoteNode); IOUtils.closeWhileHandlingException(connection); if (remoteNode.equals(transportService.getLocalNode())) { @@ -109,7 +110,7 @@ protected void innerOnResponse(DiscoveryNode remoteNode) { transportService.connectToNode(remoteNode, new ActionListener() { @Override public void onResponse(Void ignored) { - logger.trace("[{}] full connection successful: {}", this, remoteNode); + logger.trace("[{}] full connection successful: {}", thisConnectionAttempt, remoteNode); listener.onResponse(remoteNode); } @@ -129,7 +130,7 @@ protected void innerOnFailure(Exception e) { // we opened a connection and successfully performed a low-level handshake, so we were definitely // talking to an Elasticsearch node, but the high-level handshake failed indicating some kind of // mismatched configurations (e.g. cluster name) that the user should address - logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e); + logger.warn(new ParameterizedMessage("handshake failed for [{}]", thisConnectionAttempt), e); IOUtils.closeWhileHandlingException(connection); listener.onFailure(e); } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 0c27f84d7f1e4..d2c950722c568 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -121,6 +121,7 @@ import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; +import static org.elasticsearch.transport.TransportSettings.CONNECT_TIMEOUT; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -556,7 +557,8 @@ void bootstrapIfNecessary() { if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) { assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty()); - runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); + runFor(defaultMillis(CONNECT_TIMEOUT) + // may be in a prior connection attempt which has been blackholed + defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration"); final ClusterNode bootstrapNode = getAnyBootstrappableNode(); bootstrapNode.applyInitialConfiguration(); } else {