Skip to content

Commit 4711076

Browse files
authored
Ensure we don't use a remote profile if cluster name matches (#31331)
If we run into a race condition between a node being configured as a remote node for cross cluster search (CCS) etc. and that node joining the local cluster, we might connect to that node with a remote profile. If that node then joins the cluster that had already connected to it as a CCS remote node, we keep using the wrong profile and can't use bulk connections etc. anymore. This change uses the remote profile only if we connect to a node that has a different cluster name than the local cluster. This is not a perfect fix for this situation, but it is the safe option: we potentially lose only the small optimization of using fewer connections per node, which is minor anyway since we only connect to a small set of nodes. Closes #29321
1 parent be382e9 commit 4711076

File tree

4 files changed

+160
-12
lines changed

4 files changed

+160
-12
lines changed

core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

+28-5
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
9090
private volatile List<DiscoveryNode> seedNodes;
9191
private final ConnectHandler connectHandler;
9292
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
93+
private final ClusterName localClusterName;
9394

9495
/**
9596
* Creates a new {@link RemoteClusterConnection}
@@ -103,6 +104,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
103104
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
104105
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
105106
super(settings);
107+
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
106108
this.transportService = transportService;
107109
this.maxNumRemoteConnections = maxNumRemoteConnections;
108110
this.nodePredicate = nodePredicate;
@@ -293,6 +295,21 @@ public boolean isClosed() {
293295
return connectHandler.isClosed();
294296
}
295297

298+
private ConnectionProfile getRemoteProfile(ClusterName name) {
299+
// we can only compare the cluster name to make a decision if we should use a remote profile
300+
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
301+
// the remote node has joined its cluster and has a cluster UUID. The fact that we just lose a
302+
// rather smallish optimization on the connection layer under certain situations where remote clusters
303+
// have the same name as the local one is minor here.
304+
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
305+
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
306+
if (this.localClusterName.equals(name)) {
307+
return null;
308+
} else {
309+
return remoteProfile;
310+
}
311+
}
312+
296313
/**
297314
* The connect handler manages node discovery and the actual connect to the remote cluster.
298315
* There is at most one connect job running at any time. If such a connect job is triggered
@@ -402,7 +419,6 @@ protected void doRun() throws Exception {
402419
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
403420
}
404421
});
405-
406422
}
407423

408424
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
@@ -414,21 +430,27 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
414430
if (seedNodes.hasNext()) {
415431
cancellableThreads.executeIO(() -> {
416432
final DiscoveryNode seedNode = seedNodes.next();
417-
final DiscoveryNode handshakeNode;
433+
final TransportService.HandshakeResponse handshakeResponse;
418434
Transport.Connection connection = transportService.openConnection(seedNode,
419435
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
420436
boolean success = false;
421437
try {
422438
try {
423-
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
439+
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
424440
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
425441
} catch (IllegalStateException ex) {
426442
logger.warn((Supplier<?>) () -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
427443
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
428444
throw ex;
429445
}
446+
447+
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
430448
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
431-
transportService.connectToNode(handshakeNode, remoteProfile);
449+
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
450+
if (remoteClusterName.get() == null) {
451+
assert handshakeResponse.getClusterName().value() != null;
452+
remoteClusterName.set(handshakeResponse.getClusterName());
453+
}
432454
connectedNodes.add(handshakeNode);
433455
}
434456
ClusterStateRequest request = new ClusterStateRequest();
@@ -536,7 +558,8 @@ public void handleResponse(ClusterStateResponse response) {
536558
for (DiscoveryNode node : nodesIter) {
537559
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
538560
try {
539-
transportService.connectToNode(node, remoteProfile); // noop if node is connected
561+
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
562+
// connected
540563
connectedNodes.add(node);
541564
} catch (ConnectTransportException | IllegalStateException ex) {
542565
// ISE if we fail the handshake with an version incompatible node

core/src/main/java/org/elasticsearch/transport/TransportService.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -340,8 +340,8 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
340340
return;
341341
}
342342
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
343-
// We don't validate cluster names to allow for tribe node connections.
344-
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
343+
// We don't validate cluster names to allow for CCS connections.
344+
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
345345
if (node.equals(remote) == false) {
346346
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
347347
}
@@ -377,7 +377,7 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP
377377
public DiscoveryNode handshake(
378378
final Transport.Connection connection,
379379
final long handshakeTimeout) throws ConnectTransportException {
380-
return handshake(connection, handshakeTimeout, clusterName::equals);
380+
return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode;
381381
}
382382

383383
/**
@@ -389,11 +389,11 @@ public DiscoveryNode handshake(
389389
* @param connection the connection to a specific node
390390
* @param handshakeTimeout handshake timeout
391391
* @param clusterNamePredicate cluster name validation predicate
392-
* @return the connected node
392+
* @return the handshake response
393393
* @throws ConnectTransportException if the connection failed
394394
* @throws IllegalStateException if the handshake failed
395395
*/
396-
public DiscoveryNode handshake(
396+
public HandshakeResponse handshake(
397397
final Transport.Connection connection,
398398
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
399399
final HandshakeResponse response;
@@ -419,7 +419,7 @@ public HandshakeResponse newInstance() {
419419
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
420420
}
421421

422-
return response.discoveryNode;
422+
return response;
423423
}
424424

425425
static class HandshakeRequest extends TransportRequest {
@@ -460,6 +460,14 @@ public void writeTo(StreamOutput out) throws IOException {
460460
clusterName.writeTo(out);
461461
Version.writeVersion(version, out);
462462
}
463+
464+
public DiscoveryNode getDiscoveryNode() {
465+
return discoveryNode;
466+
}
467+
468+
public ClusterName getClusterName() {
469+
return clusterName;
470+
}
463471
}
464472

465473
public void disconnectFromNode(DiscoveryNode node) {

core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

+93
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,99 @@ public static MockTransportService startTransport(
143143
}
144144
}
145145

146+
public void testLocalProfileIsUsedForLocalCluster() throws Exception {
147+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
148+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
149+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
150+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
151+
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
152+
knownNodes.add(seedTransport.getLocalDiscoNode());
153+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
154+
Collections.shuffle(knownNodes, random());
155+
156+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
157+
service.start();
158+
service.acceptIncomingRequests();
159+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
160+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
161+
updateSeedNodes(connection, Arrays.asList(seedNode));
162+
assertTrue(service.nodeConnected(seedNode));
163+
assertTrue(service.nodeConnected(discoverableNode));
164+
assertTrue(connection.assertNoRunningConnections());
165+
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
166+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
167+
168+
@Override
169+
public ClusterSearchShardsResponse newInstance() {
170+
return new ClusterSearchShardsResponse();
171+
}
172+
});
173+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
174+
.build();
175+
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
176+
options, futureHandler);
177+
futureHandler.txGet();
178+
}
179+
}
180+
}
181+
}
182+
183+
public void testRemoteProfileIsUsedForRemoteCluster() throws Exception {
184+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
185+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool,
186+
Settings.builder().put("cluster.name", "foobar").build());
187+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT,
188+
threadPool, Settings.builder().put("cluster.name", "foobar").build())) {
189+
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
190+
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
191+
knownNodes.add(seedTransport.getLocalDiscoNode());
192+
knownNodes.add(discoverableTransport.getLocalDiscoNode());
193+
Collections.shuffle(knownNodes, random());
194+
195+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
196+
service.start();
197+
service.acceptIncomingRequests();
198+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
199+
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
200+
updateSeedNodes(connection, Arrays.asList(seedNode));
201+
assertTrue(service.nodeConnected(seedNode));
202+
assertTrue(service.nodeConnected(discoverableNode));
203+
assertTrue(connection.assertNoRunningConnections());
204+
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
205+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
206+
207+
@Override
208+
public ClusterSearchShardsResponse newInstance() {
209+
return new ClusterSearchShardsResponse();
210+
}
211+
});
212+
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
213+
.build();
214+
IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
215+
service.sendRequest(discoverableNode,
216+
ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler);
217+
futureHandler.txGet();
218+
}).getCause();
219+
assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");
220+
221+
PlainTransportFuture<ClusterSearchShardsResponse> handler = new PlainTransportFuture<>(
222+
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {
223+
224+
@Override
225+
public ClusterSearchShardsResponse newInstance() {
226+
return new ClusterSearchShardsResponse();
227+
}
228+
});
229+
TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)
230+
.build();
231+
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
232+
ops, handler);
233+
handler.txGet();
234+
}
235+
}
236+
}
237+
}
238+
146239
public void testDiscoverSingleNode() throws Exception {
147240
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
148241
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,36 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
175175
}
176176
}
177177

178+
179+
ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
180+
ConnectionProfile connectionProfile1 = connectionProfile == null ? LIGHT_PROFILE : connectionProfile;
181+
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
182+
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
183+
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
184+
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) {
185+
Set<TransportRequestOptions.Type> types = handle.getTypes();
186+
if (handle.length > 0) {
187+
allTypesWithConnection.addAll(types);
188+
} else {
189+
allTypesWithoutConnection.addAll(types);
190+
}
191+
}
192+
// make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
193+
builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
194+
if (allTypesWithoutConnection.isEmpty() == false) {
195+
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
196+
}
197+
builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout());
198+
builder.setConnectTimeout(connectionProfile1.getConnectTimeout());
199+
return builder.build();
200+
}
201+
178202
@Override
179203
protected NodeChannels connectToChannels(DiscoveryNode node,
180204
ConnectionProfile profile,
181205
Consumer<MockChannel> onChannelClose) throws IOException {
182206
final MockChannel[] mockChannels = new MockChannel[1];
183-
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here
207+
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, resolveConnectionProfile(profile));
184208
boolean success = false;
185209
final Socket socket = new Socket();
186210
try {

0 commit comments

Comments
 (0)