Skip to content

Commit e70cad4

Browse files
committed
Remove node conn block after connection barrier (#44114)
Today `testOnlyBlocksOnConnectionsToNewNodes` fails (extremely rarely) if the last attempt to connect to `node0` is delayed for so long that the test runs `nodeConnectionsBlocks.clear()` before the connection attempt obtains the expected connection block. We can turn this into a reliable failure with this delay: ```diff diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index f484138..9a1d0336bcd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -300,6 +300,13 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { @OverRide protected void doRun() { + + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw new AssertionError("unexpected", e); + } + assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; transportService.connectToNode(discoveryNode); consecutiveFailureCount.set(0); ``` This commit reverts the extra logging introduced in #43979 and fixes this failure by waiting for the connection attempt to hit the barrier before removing it. Fixes #40170
1 parent a406ef1 commit e70cad4

File tree

1 file changed

+4
-36
lines changed

1 file changed

+4
-36
lines changed

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.elasticsearch.cluster;
2121

22-
import org.apache.logging.log4j.message.ParameterizedMessage;
23-
import org.elasticsearch.ElasticsearchException;
2422
import org.elasticsearch.ElasticsearchTimeoutException;
2523
import org.elasticsearch.Version;
2624
import org.elasticsearch.action.ActionListener;
@@ -37,7 +35,6 @@
3735
import org.elasticsearch.common.transport.BoundTransportAddress;
3836
import org.elasticsearch.common.transport.TransportAddress;
3937
import org.elasticsearch.test.ESTestCase;
40-
import org.elasticsearch.test.junit.annotations.TestLogging;
4138
import org.elasticsearch.threadpool.TestThreadPool;
4239
import org.elasticsearch.threadpool.ThreadPool;
4340
import org.elasticsearch.transport.ConnectTransportException;
@@ -59,10 +56,8 @@
5956
import java.util.List;
6057
import java.util.Map;
6158
import java.util.Set;
62-
import java.util.concurrent.BrokenBarrierException;
6359
import java.util.concurrent.CyclicBarrier;
6460
import java.util.concurrent.TimeUnit;
65-
import java.util.concurrent.TimeoutException;
6661
import java.util.concurrent.atomic.AtomicBoolean;
6762
import java.util.function.Predicate;
6863

@@ -219,7 +214,6 @@ public String toString() {
219214
assertConnectedExactlyToNodes(targetNodes);
220215
}
221216

222-
@TestLogging("org.elasticsearch.cluster.NodeConnectionsService:TRACE") // for https://github.com/elastic/elasticsearch/issues/40170
223217
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
224218
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
225219

@@ -232,7 +226,7 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
232226
assertConnectedExactlyToNodes(nodes0);
233227

234228
// connection attempts to node0 block indefinitely
235-
final CyclicBarrier connectionBarrier = new VerboseCyclicBarrier(2);
229+
final CyclicBarrier connectionBarrier = new CyclicBarrier(2);
236230
try {
237231
nodeConnectionBlocks.put(node0, connectionBarrier::await);
238232
transportService.disconnectFromNode(node0);
@@ -259,8 +253,8 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
259253
expectThrows(ElasticsearchTimeoutException.class, () -> future3.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000))));
260254

261255
// once the connection is unblocked we successfully connect to it.
262-
nodeConnectionBlocks.clear();
263256
connectionBarrier.await(10, TimeUnit.SECONDS);
257+
nodeConnectionBlocks.clear();
264258
future3.actionGet();
265259
assertConnectedExactlyToNodes(nodes01);
266260

@@ -292,8 +286,8 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
292286
future6.actionGet(); // completed even though the connection attempt is still blocked
293287
assertConnectedExactlyToNodes(nodes1);
294288

295-
nodeConnectionBlocks.clear();
296289
connectionBarrier.await(10, TimeUnit.SECONDS);
290+
nodeConnectionBlocks.clear();
297291
ensureConnections(service);
298292
assertConnectedExactlyToNodes(nodes1);
299293
} finally {
@@ -302,33 +296,6 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
302296
}
303297
}
304298

305-
// tracing barrier usage for https://github.com/elastic/elasticsearch/issues/40170
306-
private class VerboseCyclicBarrier extends CyclicBarrier {
307-
VerboseCyclicBarrier(int parties) {
308-
super(parties);
309-
}
310-
311-
@Override
312-
public int await() throws InterruptedException, BrokenBarrierException {
313-
final String waitUUID = UUIDs.randomBase64UUID(random());
314-
logger.info(new ParameterizedMessage("--> wait[{}] starting", waitUUID),
315-
new ElasticsearchException("stack trace for CyclicBarrier#await()"));
316-
final int result = super.await();
317-
logger.info("--> wait[{}] returning [{}]", waitUUID, result);
318-
return result;
319-
}
320-
321-
@Override
322-
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
323-
final String waitUUID = UUIDs.randomBase64UUID(random());
324-
logger.info(new ParameterizedMessage("--> wait[{}] starting", waitUUID),
325-
new ElasticsearchException("stack trace for CyclicBarrier#await(" + timeout + ", " + unit + ')'));
326-
final int result = super.await(timeout, unit);
327-
logger.info("--> wait[{}] returning [{}]", waitUUID, result);
328-
return result;
329-
}
330-
}
331-
332299
private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long endTimeMillis) {
333300
while (deterministicTaskQueue.getCurrentTimeMillis() < endTimeMillis) {
334301
if (deterministicTaskQueue.hasRunnableTasks() && randomBoolean()) {
@@ -414,6 +381,7 @@ private final class MockTransport implements Transport {
414381
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
415382
}
416383

384+
@SuppressWarnings("unchecked")
417385
@Override
418386
public RequestHandlerRegistry getRequestHandler(String action) {
419387
return null;

0 commit comments

Comments
 (0)