Skip to content

Commit ea5513f

Browse files
committed
Make NodeConnectionsService non-blocking (#44211)
With connection management now being non-blocking, we can make NodeConnectionsService avoid the use of MANAGEMENT threads that are blocked during the connection attempts. I had to fiddle a bit with the tests as testPeriodicReconnection was using both the mock Threadpool from the DeterministicTaskQueue as well as the real ThreadPool initialized at the test class level, which resulted in races.
1 parent 47ab2bd commit ea5513f

File tree

2 files changed

+55
-18
lines changed

2 files changed

+55
-18
lines changed

server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,14 +297,27 @@ private class ConnectionTarget {
297297

298298
private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
299299

300-
private final Runnable connectActivity = () -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
300+
private final Runnable connectActivity = new AbstractRunnable() {
301+
302+
final AbstractRunnable abstractRunnable = this;
303+
301304
@Override
302305
protected void doRun() {
303306
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
304-
transportService.connectToNode(discoveryNode);
305-
consecutiveFailureCount.set(0);
306-
logger.debug("connected to {}", discoveryNode);
307-
onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
307+
transportService.connectToNode(discoveryNode, new ActionListener<Void>() {
308+
@Override
309+
public void onResponse(Void aVoid) {
310+
assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held";
311+
consecutiveFailureCount.set(0);
312+
logger.debug("connected to {}", discoveryNode);
313+
onCompletion(ActivityType.CONNECTING, null, disconnectActivity);
314+
}
315+
316+
@Override
317+
public void onFailure(Exception e) {
318+
abstractRunnable.onFailure(e);
319+
}
320+
});
308321
}
309322

310323
@Override
@@ -322,7 +335,7 @@ public void onFailure(Exception e) {
322335
public String toString() {
323336
return "connect to " + discoveryNode;
324337
}
325-
});
338+
};
326339

327340
private final Runnable disconnectActivity = new AbstractRunnable() {
328341
@Override

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void testConnectAndDisconnect() throws Exception {
131131
service.connectToNodes(nodes, () -> future.onResponse(null));
132132
future.actionGet();
133133
if (isDisrupting == false) {
134-
assertConnected(nodes);
134+
assertConnected(transportService, nodes);
135135
}
136136
service.disconnectFromNodesExcept(nodes);
137137

@@ -169,6 +169,11 @@ public void testPeriodicReconnection() {
169169
final DeterministicTaskQueue deterministicTaskQueue
170170
= new DeterministicTaskQueue(builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
171171

172+
MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool());
173+
TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool());
174+
transportService.start();
175+
transportService.acceptIncomingRequests();
176+
172177
final NodeConnectionsService service
173178
= new NodeConnectionsService(settings.build(), deterministicTaskQueue.getThreadPool(), transportService);
174179
service.start();
@@ -211,7 +216,7 @@ public String toString() {
211216
transport.randomConnectionExceptions = false;
212217
logger.info("renewing connections");
213218
runTasksUntil(deterministicTaskQueue, maxDisconnectionTime + reconnectIntervalMillis);
214-
assertConnectedExactlyToNodes(targetNodes);
219+
assertConnectedExactlyToNodes(transportService, targetNodes);
215220
}
216221

217222
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
@@ -314,11 +319,15 @@ private void ensureConnections(NodeConnectionsService service) {
314319
}
315320

316321
private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) {
317-
assertConnected(discoveryNodes);
322+
assertConnectedExactlyToNodes(transportService, discoveryNodes);
323+
}
324+
325+
private void assertConnectedExactlyToNodes(TransportService transportService, DiscoveryNodes discoveryNodes) {
326+
assertConnected(transportService, discoveryNodes);
318327
assertThat(transportService.getConnectionManager().size(), equalTo(discoveryNodes.getSize()));
319328
}
320329

321-
private void assertConnected(Iterable<DiscoveryNode> nodes) {
330+
private void assertConnected(TransportService transportService, Iterable<DiscoveryNode> nodes) {
322331
for (DiscoveryNode node : nodes) {
323332
assertTrue("not connected to " + node, transportService.nodeConnected(node));
324333
}
@@ -328,8 +337,9 @@ private void assertConnected(Iterable<DiscoveryNode> nodes) {
328337
@Before
329338
public void setUp() throws Exception {
330339
super.setUp();
331-
this.threadPool = new TestThreadPool(getClass().getName());
332-
this.transport = new MockTransport();
340+
ThreadPool threadPool = new TestThreadPool(getClass().getName());
341+
this.threadPool = threadPool;
342+
this.transport = new MockTransport(threadPool);
333343
nodeConnectionBlocks = newConcurrentMap();
334344
transportService = new TestTransportService(transport, threadPool);
335345
transportService.start();
@@ -361,21 +371,35 @@ public void handshake(Transport.Connection connection, long timeout, Predicate<C
361371

362372
@Override
363373
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
374+
throw new AssertionError("no blocking connect");
375+
}
376+
377+
@Override
378+
public void connectToNode(DiscoveryNode node, ActionListener<Void> listener) throws ConnectTransportException {
364379
final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);
365380
if (connectionBlock != null) {
366-
try {
367-
connectionBlock.run();
368-
} catch (Exception e) {
369-
throw new AssertionError(e);
370-
}
381+
getThreadPool().generic().execute(() -> {
382+
try {
383+
connectionBlock.run();
384+
super.connectToNode(node, listener);
385+
} catch (Exception e) {
386+
throw new AssertionError(e);
387+
}
388+
});
389+
} else {
390+
super.connectToNode(node, listener);
371391
}
372-
super.connectToNode(node);
373392
}
374393
}
375394

376395
private final class MockTransport implements Transport {
377396
private ResponseHandlers responseHandlers = new ResponseHandlers();
378397
private volatile boolean randomConnectionExceptions = false;
398+
private final ThreadPool threadPool;
399+
400+
MockTransport(ThreadPool threadPool) {
401+
this.threadPool = threadPool;
402+
}
379403

380404
@Override
381405
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {

0 commit comments

Comments
 (0)