Skip to content

Commit 195a524

Browse files
committed
Prevent connection races in testEnsureWeReconnect (#56654)
Currently it is possible that a sniff connection round is occurring as we enter another test loop in testEnsureWeReconnect. The problem is that once we enter another loop, closing the connection manually can cause this pre-existing connection round to fail. This round failing can fail the test. This commit fixes the issue by ensuring that there are no in-progress connections before entering another loop.
1 parent cb4d5f5 commit 195a524

File tree

4 files changed

+5
-11
lines changed

4 files changed

+5
-11
lines changed

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.elasticsearch.transport;
2121

22-
import org.apache.logging.log4j.LogManager;
23-
import org.apache.logging.log4j.Logger;
2422
import org.apache.logging.log4j.message.ParameterizedMessage;
2523
import org.elasticsearch.Version;
2624
import org.elasticsearch.action.ActionListener;
@@ -83,7 +81,6 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
8381
static final int CHANNELS_PER_CONNECTION = 1;
8482

8583
private static final int MAX_CONNECT_ATTEMPTS_PER_RUN = 3;
86-
private static final Logger logger = LogManager.getLogger(ProxyConnectionStrategy.class);
8784

8885
private final int maxNumConnections;
8986
private final String configuredAddress;

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public Writeable.Reader<RemoteConnectionInfo.ModeInfo> getReader() {
111111

112112
private final int maxPendingConnectionListeners;
113113

114-
private static final Logger logger = LogManager.getLogger(RemoteConnectionStrategy.class);
114+
protected final Logger logger = LogManager.getLogger(getClass());
115115

116116
private final AtomicBoolean closed = new AtomicBoolean(false);
117117
private final Object mutex = new Object();
@@ -313,8 +313,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
313313
if (shouldOpenMoreConnections()) {
314314
// try to reconnect and fill up the slot of the disconnected node
315315
connect(ActionListener.wrap(
316-
ignore -> logger.trace("successfully connected after disconnect of {}", node),
317-
e -> logger.trace(() -> new ParameterizedMessage("failed to connect after disconnect of {}", node), e)));
316+
ignore -> logger.trace("[{}] successfully connected after disconnect of {}", clusterAlias, node),
317+
e -> logger.debug(() -> new ParameterizedMessage("[{}] failed to connect after disconnect of {}", clusterAlias, node), e)));
318318
}
319319
}
320320

server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.elasticsearch.transport;
2121

22-
import org.apache.logging.log4j.LogManager;
23-
import org.apache.logging.log4j.Logger;
2422
import org.apache.logging.log4j.message.ParameterizedMessage;
2523
import org.apache.lucene.util.SetOnce;
2624
import org.elasticsearch.Version;
@@ -200,8 +198,6 @@ public String getKey(final String key) {
200198

201199
static final int CHANNELS_PER_CONNECTION = 6;
202200

203-
private static final Logger logger = LogManager.getLogger(SniffConnectionStrategy.class);
204-
205201
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
206202
&& (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode());
207203

server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public void testEnsureWeReconnect() throws Exception {
9898
assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)));
9999
for (int i = 0; i < 10; i++) {
100100
RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test");
101+
assertBusy(remoteClusterConnection::assertNoRunningConnections);
101102
ConnectionManager connectionManager = remoteClusterConnection.getConnectionManager();
102103
Transport.Connection connection = connectionManager.getConnection(remoteNode);
103104
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
@@ -109,7 +110,7 @@ public void testEnsureWeReconnect() throws Exception {
109110
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
110111
assertNotNull(clusterStateResponse);
111112
assertEquals("foo_bar_cluster", clusterStateResponse.getState().getClusterName().value());
112-
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
113+
assertTrue(remoteClusterConnection.isNodeConnected(remoteNode));
113114
}
114115
}
115116
}

0 commit comments

Comments
 (0)