Skip to content

Commit be6697f

Browse files
Remove Blocking Connect Methods from TransportService (#48841)
We're not doing any blocking connects in production code anymore so we can move these helpers to test code only. Also, we were only tracking the proper closing of blockingly opened connections on the mock transport service but didn't check those created via the non-blocking API which is fixed here too.
1 parent 1f3d101 commit be6697f

File tree

16 files changed

+128
-119
lines changed

16 files changed

+128
-119
lines changed

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionP
6565

6666
public void testConnectException() throws UnknownHostException {
6767
try {
68-
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
68+
connectToNode(serviceA, new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
6969
emptyMap(), emptySet(),Version.CURRENT));
7070
fail("Expected ConnectTransportException");
7171
} catch (ConnectTransportException e) {

plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionP
6868

6969
public void testConnectException() throws UnknownHostException {
7070
try {
71-
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
71+
connectToNode(serviceA, new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
7272
emptyMap(), emptySet(),Version.CURRENT));
7373
fail("Expected ConnectTransportException");
7474
} catch (ConnectTransportException e) {

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.action.ActionListener;
2727
import org.elasticsearch.action.ActionListenerResponseHandler;
28-
import org.elasticsearch.action.support.PlainActionFuture;
2928
import org.elasticsearch.cluster.ClusterName;
3029
import org.elasticsearch.cluster.node.DiscoveryNode;
3130
import org.elasticsearch.common.Nullable;
@@ -139,7 +138,7 @@ public void close() {
139138
* Build the service.
140139
*
141140
* @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
142-
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
141+
* updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
143142
*/
144143
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
145144
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
@@ -315,25 +314,6 @@ public boolean nodeConnected(DiscoveryNode node) {
315314
return isLocalNode(node) || connectionManager.nodeConnected(node);
316315
}
317316

318-
/**
319-
* Connect to the specified node with the default connection profile
320-
*
321-
* @param node the node to connect to
322-
*/
323-
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
324-
connectToNode(node, (ConnectionProfile) null);
325-
}
326-
327-
/**
328-
* Connect to the specified node with the given connection profile
329-
*
330-
* @param node the node to connect to
331-
* @param connectionProfile the connection profile to use when connecting to this node
332-
*/
333-
public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
334-
PlainActionFuture.get(fut -> connectToNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
335-
}
336-
337317
/**
338318
* Connect to the specified node with the given connection profile.
339319
* The ActionListener will be called on the calling thread or the generic thread pool.
@@ -374,17 +354,6 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n
374354
};
375355
}
376356

377-
/**
378-
* Establishes and returns a new connection to the given node. The connection is NOT maintained by this service, it's the callers
379-
* responsibility to close the connection once it goes out of scope.
380-
* The ActionListener will be called on the calling thread or the generic thread pool.
381-
* @param node the node to connect to
382-
* @param connectionProfile the connection profile to use
383-
*/
384-
public Transport.Connection openConnection(final DiscoveryNode node, ConnectionProfile connectionProfile) {
385-
return PlainActionFuture.get(fut -> openConnection(node, connectionProfile, fut));
386-
}
387-
388357
/**
389358
* Establishes a new connection to the given node. The connection is NOT maintained by this service, it's the callers
390359
* responsibility to close the connection once it goes out of scope.

server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.test.tasks.MockTaskManager;
5151
import org.elasticsearch.threadpool.TestThreadPool;
5252
import org.elasticsearch.threadpool.ThreadPool;
53+
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
5354
import org.elasticsearch.transport.TransportService;
5455
import org.elasticsearch.transport.nio.MockNioTransport;
5556
import org.junit.After;
@@ -222,7 +223,7 @@ public static void connectNodes(TestNode... nodes) {
222223
}
223224
for (TestNode nodeA : nodes) {
224225
for (TestNode nodeB : nodes) {
225-
nodeA.transportService.connectToNode(nodeB.discoveryNode());
226+
AbstractSimpleTransportTestCase.connectToNode(nodeA.transportService, nodeB.discoveryNode());
226227
}
227228
}
228229
}

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -369,11 +369,6 @@ public void handshake(Transport.Connection connection, long timeout, Predicate<C
369369
listener.onResponse(new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT));
370370
}
371371

372-
@Override
373-
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
374-
throw new AssertionError("no blocking connect");
375-
}
376-
377372
@Override
378373
public void connectToNode(DiscoveryNode node, ActionListener<Void> listener) throws ConnectTransportException {
379374
final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);

server/src/test/java/org/elasticsearch/cluster/coordination/FollowersCheckerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.test.transport.CapturingTransport;
3636
import org.elasticsearch.test.transport.MockTransport;
3737
import org.elasticsearch.threadpool.ThreadPool.Names;
38+
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
3839
import org.elasticsearch.transport.ConnectTransportException;
3940
import org.elasticsearch.transport.TransportException;
4041
import org.elasticsearch.transport.TransportRequest;
@@ -283,7 +284,7 @@ public String toString() {
283284
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).build();
284285
followersChecker.setCurrentNodes(discoveryNodes);
285286

286-
transportService.connectToNode(otherNode);
287+
AbstractSimpleTransportTestCase.connectToNode(transportService, otherNode);
287288
transportService.disconnectFromNode(otherNode);
288289
deterministicTaskQueue.runAllRunnableTasks();
289290
assertTrue(nodeFailed.get());

server/src/test/java/org/elasticsearch/cluster/coordination/LeaderCheckerTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.test.transport.CapturingTransport;
3434
import org.elasticsearch.test.transport.MockTransport;
3535
import org.elasticsearch.threadpool.ThreadPool.Names;
36+
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
3637
import org.elasticsearch.transport.ConnectTransportException;
3738
import org.elasticsearch.transport.TransportException;
3839
import org.elasticsearch.transport.TransportRequest;
@@ -305,7 +306,8 @@ public String toString() {
305306

306307
leaderChecker.updateLeader(leader);
307308
{
308-
transportService.connectToNode(leader); // need to connect first for disconnect to have any effect
309+
// need to connect first for disconnect to have any effect
310+
AbstractSimpleTransportTestCase.connectToNode(transportService, leader);
309311

310312
transportService.disconnectFromNode(leader);
311313
deterministicTaskQueue.runAllRunnableTasks();

server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.test.ESIntegTestCase;
3939
import org.elasticsearch.test.transport.MockTransportService;
4040
import org.elasticsearch.threadpool.ThreadPool;
41+
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
4142
import org.elasticsearch.transport.TransportService;
4243

4344
import java.io.Closeable;
@@ -420,7 +421,7 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
420421
internalCluster().getInstance(TransportService.class, connection.getNode().getName());
421422
final DiscoveryNode primaryNode = primaryTransportService.getLocalNode();
422423
replicaTransportService.disconnectFromNode(primaryNode);
423-
replicaTransportService.connectToNode(primaryNode);
424+
AbstractSimpleTransportTestCase.connectToNode(replicaTransportService, primaryNode);
424425
} else {
425426
// return an exception to the FINALIZE action
426427
throw new ElasticsearchException("failing recovery for test purposes");

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -806,7 +806,8 @@ public void clearNetworkDisruptions() {
806806
disconnectedNodes.forEach(nodeName -> {
807807
if (testClusterNodes.nodes.containsKey(nodeName)) {
808808
final DiscoveryNode node = testClusterNodes.nodes.get(nodeName).node;
809-
testClusterNodes.nodes.values().forEach(n -> n.transportService.openConnection(node, null));
809+
testClusterNodes.nodes.values().forEach(
810+
n -> n.transportService.openConnection(node, null, ActionListener.wrap(() -> {})));
810811
}
811812
});
812813
}

server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void testSendMessage() throws InterruptedException {
9090
channel.sendResponse(response);
9191
});
9292
TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new);
93-
serviceA.connectToNode(nodeB);
93+
AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB);
9494

9595
serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
9696
(request, channel, task) -> {
@@ -99,7 +99,7 @@ public void testSendMessage() throws InterruptedException {
9999
channel.sendResponse(response);
100100
});
101101
TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new);
102-
serviceB.connectToNode(nodeC);
102+
AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC);
103103
serviceC.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
104104
(request, channel, task) -> {
105105
assertEquals(request.sourceNode, "TS_A");
@@ -150,7 +150,7 @@ public void testException() throws InterruptedException {
150150
channel.sendResponse(response);
151151
});
152152
TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new);
153-
serviceA.connectToNode(nodeB);
153+
AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB);
154154

155155
serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
156156
(request, channel, task) -> {
@@ -159,7 +159,7 @@ public void testException() throws InterruptedException {
159159
channel.sendResponse(response);
160160
});
161161
TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new);
162-
serviceB.connectToNode(nodeC);
162+
AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC);
163163
serviceC.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
164164
(request, channel, task) -> {
165165
throw new ElasticsearchException("greetings from TS_C");

server/src/test/java/org/elasticsearch/transport/TransportServiceHandshakeTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ public void testConnectToNodeLight() throws IOException {
109109
emptyMap(),
110110
emptySet(),
111111
Version.CURRENT.minimumCompatibilityVersion());
112-
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode, TestProfiles.LIGHT_PROFILE)){
112+
try (Transport.Connection connection =
113+
AbstractSimpleTransportTestCase.openConnection(handleA.transportService, discoveryNode, TestProfiles.LIGHT_PROFILE)) {
113114
DiscoveryNode connectedNode = PlainActionFuture.get(fut -> handleA.transportService.handshake(connection, timeout, fut));
114115
assertNotNull(connectedNode);
115116
// the name and version should be updated
@@ -130,8 +131,8 @@ public void testMismatchedClusterName() {
130131
emptySet(),
131132
Version.CURRENT.minimumCompatibilityVersion());
132133
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
133-
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
134-
TestProfiles.LIGHT_PROFILE)) {
134+
try (Transport.Connection connection =
135+
AbstractSimpleTransportTestCase.openConnection(handleA.transportService, discoveryNode, TestProfiles.LIGHT_PROFILE)) {
135136
PlainActionFuture.get(fut -> handleA.transportService.handshake(connection, timeout, ActionListener.map(fut, x -> null)));
136137
}
137138
});
@@ -152,8 +153,8 @@ public void testIncompatibleVersions() {
152153
emptySet(),
153154
Version.CURRENT.minimumCompatibilityVersion());
154155
IllegalStateException ex = expectThrows(IllegalStateException.class, () -> {
155-
try (Transport.Connection connection = handleA.transportService.openConnection(discoveryNode,
156-
TestProfiles.LIGHT_PROFILE)) {
156+
try (Transport.Connection connection =
157+
AbstractSimpleTransportTestCase.openConnection(handleA.transportService, discoveryNode, TestProfiles.LIGHT_PROFILE)) {
157158
PlainActionFuture.get(fut -> handleA.transportService.handshake(connection, timeout, ActionListener.map(fut, x -> null)));
158159
}
159160
});
@@ -173,9 +174,8 @@ public void testNodeConnectWithDifferentNodeId() {
173174
emptyMap(),
174175
emptySet(),
175176
handleB.discoveryNode.getVersion());
176-
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
177-
handleA.transportService.connectToNode(discoveryNode, TestProfiles.LIGHT_PROFILE);
178-
});
177+
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () ->
178+
AbstractSimpleTransportTestCase.connectToNode(handleA.transportService, discoveryNode, TestProfiles.LIGHT_PROFILE));
179179
assertThat(ex.getMessage(), containsString("unexpected remote node"));
180180
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
181181
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -505,27 +505,26 @@ public Transport getOriginalTransport() {
505505
}
506506

507507
@Override
508-
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
509-
Transport.Connection connection = super.openConnection(node, profile);
510-
511-
synchronized (openConnections) {
512-
openConnections.computeIfAbsent(node, n -> new CopyOnWriteArrayList<>()).add(connection);
513-
connection.addCloseListener(ActionListener.wrap(() -> {
514-
synchronized (openConnections) {
515-
List<Transport.Connection> connections = openConnections.get(node);
516-
boolean remove = connections.remove(connection);
517-
assert remove : "Should have removed connection";
518-
if (connections.isEmpty()) {
519-
openConnections.remove(node);
520-
}
521-
if (openConnections.isEmpty()) {
522-
openConnections.notifyAll();
508+
public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener) {
509+
super.openConnection(node, connectionProfile, ActionListener.delegateFailure(listener, (l, connection) -> {
510+
synchronized (openConnections) {
511+
openConnections.computeIfAbsent(node, n -> new CopyOnWriteArrayList<>()).add(connection);
512+
connection.addCloseListener(ActionListener.wrap(() -> {
513+
synchronized (openConnections) {
514+
List<Transport.Connection> connections = openConnections.get(node);
515+
boolean remove = connections.remove(connection);
516+
assert remove : "Should have removed connection";
517+
if (connections.isEmpty()) {
518+
openConnections.remove(node);
519+
}
520+
if (openConnections.isEmpty()) {
521+
openConnections.notifyAll();
522+
}
523523
}
524-
}
525-
}));
526-
}
527-
528-
return connection;
524+
}));
525+
}
526+
l.onResponse(connection);
527+
}));
529528
}
530529

531530
@Override

0 commit comments

Comments
 (0)