Skip to content

Commit bca865d

Browse files
authored
Move ConnectionManager to async APIs (#42636)
This commit converts the ConnectionManager's openConnection and connectToNode methods to async-style. This will allow us to not block threads anymore when opening connections. This PR also adapts the cluster coordination subsystem to make use of the new async APIs, allowing to remove some hacks in the test infrastructure that had to account for the previous synchronous nature of the connection APIs.
1 parent 4ca2c6c commit bca865d

19 files changed

+549
-322
lines changed

server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java

+8
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,20 @@
1919

2020
package org.elasticsearch.action.support;
2121

22+
import org.elasticsearch.common.CheckedConsumer;
23+
2224
public class PlainActionFuture<T> extends AdapterActionFuture<T, T> {
2325

2426
public static <T> PlainActionFuture<T> newFuture() {
2527
return new PlainActionFuture<>();
2628
}
2729

30+
public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T>, E> e) throws E {
31+
PlainActionFuture<T> fut = newFuture();
32+
e.accept(fut);
33+
return fut.actionGet();
34+
}
35+
2836
@Override
2937
protected T convert(T listenerResponse) {
3038
return listenerResponse;

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+15-16
Original file line numberDiff line numberDiff line change
@@ -442,23 +442,22 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
442442
return;
443443
}
444444

445-
transportService.connectToNode(joinRequest.getSourceNode());
446-
447-
final ClusterState stateForJoinValidation = getStateForMasterService();
448-
449-
if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) {
450-
onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
451-
if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
452-
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
453-
// to ensure we fail as fast as possible.
454-
JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
455-
stateForJoinValidation.getNodes().getMinNodeVersion());
445+
transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> {
446+
final ClusterState stateForJoinValidation = getStateForMasterService();
447+
448+
if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) {
449+
onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation));
450+
if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
451+
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
452+
// to ensure we fail as fast as possible.
453+
JoinTaskExecutor.ensureMajorVersionBarrier(joinRequest.getSourceNode().getVersion(),
454+
stateForJoinValidation.getNodes().getMinNodeVersion());
455+
}
456+
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
457+
} else {
458+
processJoinRequest(joinRequest, joinCallback);
456459
}
457-
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
458-
459-
} else {
460-
processJoinRequest(joinRequest, joinCallback);
461-
}
460+
}, joinCallback::onFailure));
462461
}
463462

464463
// package private for tests

server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java

+63-32
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.action.ActionListener;
27+
import org.elasticsearch.action.NotifyOnceListener;
2728
import org.elasticsearch.cluster.node.DiscoveryNode;
2829
import org.elasticsearch.common.Randomness;
2930
import org.elasticsearch.common.UUIDs;
@@ -70,7 +71,7 @@ public HandshakingTransportAddressConnector(Settings settings, TransportService
7071
public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener<DiscoveryNode> listener) {
7172
transportService.getThreadPool().generic().execute(new AbstractRunnable() {
7273
@Override
73-
protected void doRun() throws Exception {
74+
protected void doRun() {
7475

7576
// TODO if transportService is already connected to this address then skip the handshaking
7677

@@ -80,38 +81,68 @@ protected void doRun() throws Exception {
8081
emptySet(), Version.CURRENT.minimumCompatibilityVersion());
8182

8283
logger.trace("[{}] opening probe connection", this);
83-
final Connection connection = transportService.openConnection(targetNode,
84+
transportService.openConnection(targetNode,
8485
ConnectionProfile.buildSingleChannelProfile(Type.REG, probeConnectTimeout, probeHandshakeTimeout,
85-
TimeValue.MINUS_ONE, null));
86-
logger.trace("[{}] opened probe connection", this);
87-
88-
final DiscoveryNode remoteNode;
89-
try {
90-
remoteNode = transportService.handshake(connection, probeHandshakeTimeout.millis());
91-
// success means (amongst other things) that the cluster names match
92-
logger.trace("[{}] handshake successful: {}", this, remoteNode);
93-
} catch (Exception e) {
94-
// we opened a connection and successfully performed a low-level handshake, so we were definitely talking to an
95-
// Elasticsearch node, but the high-level handshake failed indicating some kind of mismatched configurations
96-
// (e.g. cluster name) that the user should address
97-
logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e);
98-
listener.onFailure(e);
99-
return;
100-
} finally {
101-
IOUtils.closeWhileHandlingException(connection);
102-
}
103-
104-
if (remoteNode.equals(transportService.getLocalNode())) {
105-
// TODO cache this result for some time? forever?
106-
listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
107-
} else if (remoteNode.isMasterNode() == false) {
108-
// TODO cache this result for some time?
109-
listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
110-
} else {
111-
transportService.connectToNode(remoteNode);
112-
logger.trace("[{}] full connection successful: {}", this, remoteNode);
113-
listener.onResponse(remoteNode);
114-
}
86+
TimeValue.MINUS_ONE, null), new ActionListener<>() {
87+
@Override
88+
public void onResponse(Connection connection) {
89+
logger.trace("[{}] opened probe connection", this);
90+
91+
// use NotifyOnceListener to make sure the following line does not result in onFailure being called when
92+
// the connection is closed in the onResponse handler
93+
transportService.handshake(connection, probeHandshakeTimeout.millis(), new NotifyOnceListener<DiscoveryNode>() {
94+
95+
@Override
96+
protected void innerOnResponse(DiscoveryNode remoteNode) {
97+
try {
98+
// success means (amongst other things) that the cluster names match
99+
logger.trace("[{}] handshake successful: {}", this, remoteNode);
100+
IOUtils.closeWhileHandlingException(connection);
101+
102+
if (remoteNode.equals(transportService.getLocalNode())) {
103+
// TODO cache this result for some time? forever?
104+
listener.onFailure(new ConnectTransportException(remoteNode, "local node found"));
105+
} else if (remoteNode.isMasterNode() == false) {
106+
// TODO cache this result for some time?
107+
listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found"));
108+
} else {
109+
transportService.connectToNode(remoteNode, new ActionListener<Void>() {
110+
@Override
111+
public void onResponse(Void ignored) {
112+
logger.trace("[{}] full connection successful: {}", this, remoteNode);
113+
listener.onResponse(remoteNode);
114+
}
115+
116+
@Override
117+
public void onFailure(Exception e) {
118+
listener.onFailure(e);
119+
}
120+
});
121+
}
122+
} catch (Exception e) {
123+
listener.onFailure(e);
124+
}
125+
}
126+
127+
@Override
128+
protected void innerOnFailure(Exception e) {
129+
// we opened a connection and successfully performed a low-level handshake, so we were definitely
130+
// talking to an Elasticsearch node, but the high-level handshake failed indicating some kind of
131+
// mismatched configurations (e.g. cluster name) that the user should address
132+
logger.warn(new ParameterizedMessage("handshake failed for [{}]", this), e);
133+
IOUtils.closeWhileHandlingException(connection);
134+
listener.onFailure(e);
135+
}
136+
137+
});
138+
139+
}
140+
141+
@Override
142+
public void onFailure(Exception e) {
143+
listener.onFailure(e);
144+
}
145+
});
115146
}
116147

117148
@Override

0 commit comments

Comments
 (0)