Skip to content

Commit 4f3a311

Browse files
committed
CCS: don't proxy requests for already connected node (#31273)
Cross-cluster search selects a subset of nodes for each remote cluster and sends requests only to them, which will act as a proxy and properly redirect such requests to the target nodes that hold the relevant data. What happens today is that every time we send a request to a remote cluster, it will be sent to the next node in the proxy list (in round-robin fashion), regardless of whether the target node is already amongst the ones that we are connected to. In case for instance we need to send a shard search request to a data node that's also one of the selected proxy nodes, we may end up sending the request to it through one of the other proxy nodes. This commit optimizes this case to make sure that whenever we are already connected to a remote node, we will send a direct request rather than using the next proxy node. There is a side-effect to this, which is that round-robin will be a bit unbalanced as the data nodes that are also selected as proxies will receive more requests.
1 parent 69bcd40 commit 4f3a311

File tree

3 files changed

+126
-45
lines changed

3 files changed

+126
-45
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.elasticsearch.transport.Transport;
5252
import org.elasticsearch.transport.TransportService;
5353

54-
import java.io.IOException;
5554
import java.util.ArrayList;
5655
import java.util.Arrays;
5756
import java.util.Collections;
@@ -364,8 +363,7 @@ private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupSh
364363
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
365364
OriginalIndices localIndices,
366365
List<SearchShardIterator> remoteShardIterators) {
367-
List<SearchShardIterator> shards = new ArrayList<>();
368-
shards.addAll(remoteShardIterators);
366+
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
369367
for (ShardIterator shardIterator : localShardsIterator) {
370368
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
371369
}
@@ -397,7 +395,7 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
397395
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
398396
return new SearchPhase(action.getName()) {
399397
@Override
400-
public void run() throws IOException {
398+
public void run() {
401399
action.start();
402400
}
403401
};

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.logging.log4j.message.ParameterizedMessage;
2222
import org.apache.lucene.store.AlreadyClosedException;
23-
import org.elasticsearch.core.internal.io.IOUtils;
2423
import org.apache.lucene.util.SetOnce;
2524
import org.elasticsearch.Version;
2625
import org.elasticsearch.action.ActionListener;
@@ -43,6 +42,7 @@
4342
import org.elasticsearch.common.util.CancellableThreads;
4443
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4544
import org.elasticsearch.common.util.concurrent.ThreadContext;
45+
import org.elasticsearch.core.internal.io.IOUtils;
4646
import org.elasticsearch.threadpool.ThreadPool;
4747

4848
import java.io.Closeable;
@@ -64,7 +64,6 @@
6464
import java.util.function.Consumer;
6565
import java.util.function.Function;
6666
import java.util.function.Predicate;
67-
import java.util.function.Supplier;
6867
import java.util.stream.Collectors;
6968

7069
/**
@@ -184,7 +183,7 @@ public void ensureConnected(ActionListener<Void> voidActionListener) {
184183

185184
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
186185
final ActionListener<ClusterSearchShardsResponse> listener) {
187-
final DiscoveryNode node = connectedNodes.get();
186+
final DiscoveryNode node = connectedNodes.getAny();
188187
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
189188
new TransportResponseHandler<ClusterSearchShardsResponse>() {
190189

@@ -220,7 +219,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
220219
request.clear();
221220
request.nodes(true);
222221
request.local(true); // run this on the node that gets the request it's as good as any other
223-
final DiscoveryNode node = connectedNodes.get();
222+
final DiscoveryNode node = connectedNodes.getAny();
224223
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
225224
new TransportResponseHandler<ClusterStateResponse>() {
226225
@Override
@@ -258,40 +257,52 @@ public String executor() {
258257
}
259258

260259
/**
261-
* Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the
262-
* given node.
260+
* Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}.
261+
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
263262
*/
264263
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
265-
DiscoveryNode discoveryNode = connectedNodes.get();
264+
if (transportService.nodeConnected(remoteClusterNode)) {
265+
return transportService.getConnection(remoteClusterNode);
266+
}
267+
DiscoveryNode discoveryNode = connectedNodes.getAny();
266268
Transport.Connection connection = transportService.getConnection(discoveryNode);
267-
return new Transport.Connection() {
268-
@Override
269-
public DiscoveryNode getNode() {
270-
return remoteClusterNode;
271-
}
269+
return new ProxyConnection(connection, remoteClusterNode);
270+
}
272271

273-
@Override
274-
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
272+
static final class ProxyConnection implements Transport.Connection {
273+
private final Transport.Connection proxyConnection;
274+
private final DiscoveryNode targetNode;
275+
276+
private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
277+
this.proxyConnection = proxyConnection;
278+
this.targetNode = targetNode;
279+
}
280+
281+
@Override
282+
public DiscoveryNode getNode() {
283+
return targetNode;
284+
}
285+
286+
@Override
287+
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
275288
throws IOException, TransportException {
276-
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
277-
TransportActionProxy.wrapRequest(remoteClusterNode, request), options);
278-
}
289+
proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
290+
TransportActionProxy.wrapRequest(targetNode, request), options);
291+
}
279292

280-
@Override
281-
public void close() throws IOException {
282-
assert false: "proxy connections must not be closed";
283-
}
293+
@Override
294+
public void close() {
295+
assert false: "proxy connections must not be closed";
296+
}
284297

285-
@Override
286-
public Version getVersion() {
287-
return connection.getVersion();
288-
}
289-
};
298+
@Override
299+
public Version getVersion() {
300+
return proxyConnection.getVersion();
301+
}
290302
}
291303

292304
Transport.Connection getConnection() {
293-
DiscoveryNode discoveryNode = connectedNodes.get();
294-
return transportService.getConnection(discoveryNode);
305+
return transportService.getConnection(getAnyConnectedNode());
295306
}
296307

297308
@Override
@@ -388,7 +399,7 @@ public void onFailure(Exception e) {
388399
}
389400

390401
@Override
391-
protected void doRun() throws Exception {
402+
protected void doRun() {
392403
ActionListener<Void> listener = ActionListener.wrap((x) -> {
393404
synchronized (queue) {
394405
running.release();
@@ -593,8 +604,8 @@ boolean isNodeConnected(final DiscoveryNode node) {
593604
return connectedNodes.contains(node);
594605
}
595606

596-
DiscoveryNode getConnectedNode() {
597-
return connectedNodes.get();
607+
DiscoveryNode getAnyConnectedNode() {
608+
return connectedNodes.getAny();
598609
}
599610

600611
void addConnectedNode(DiscoveryNode node) {
@@ -605,7 +616,7 @@ void addConnectedNode(DiscoveryNode node) {
605616
* Fetches connection info for this connection
606617
*/
607618
public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) {
608-
final Optional<DiscoveryNode> anyNode = connectedNodes.getAny();
619+
final Optional<DiscoveryNode> anyNode = connectedNodes.getAnyConnectedNode();
609620
if (anyNode.isPresent() == false) {
610621
// not connected we return immediately
611622
RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
@@ -668,7 +679,7 @@ int getNumNodesConnected() {
668679
return connectedNodes.size();
669680
}
670681

671-
private static class ConnectedNodes implements Supplier<DiscoveryNode> {
682+
private static final class ConnectedNodes {
672683

673684
private final Set<DiscoveryNode> nodeSet = new HashSet<>();
674685
private final String clusterAlias;
@@ -679,8 +690,7 @@ private ConnectedNodes(String clusterAlias) {
679690
this.clusterAlias = clusterAlias;
680691
}
681692

682-
@Override
683-
public synchronized DiscoveryNode get() {
693+
public synchronized DiscoveryNode getAny() {
684694
ensureIteratorAvailable();
685695
if (currentIterator.hasNext()) {
686696
return currentIterator.next();
@@ -713,7 +723,7 @@ synchronized boolean contains(DiscoveryNode node) {
713723
return nodeSet.contains(node);
714724
}
715725

716-
synchronized Optional<DiscoveryNode> getAny() {
726+
synchronized Optional<DiscoveryNode> getAnyConnectedNode() {
717727
ensureIteratorAvailable();
718728
if (currentIterator.hasNext()) {
719729
return Optional.of(currentIterator.next());

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import static org.hamcrest.Matchers.iterableWithSize;
8989
import static org.hamcrest.Matchers.not;
9090
import static org.hamcrest.Matchers.notNullValue;
91+
import static org.hamcrest.Matchers.sameInstance;
9192
import static org.hamcrest.Matchers.startsWith;
9293

9394
public class RemoteClusterConnectionTests extends ESTestCase {
@@ -1063,7 +1064,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
10631064
barrier.await();
10641065
for (int j = 0; j < numGetCalls; j++) {
10651066
try {
1066-
DiscoveryNode node = connection.getConnectedNode();
1067+
DiscoveryNode node = connection.getAnyConnectedNode();
10671068
assertNotNull(node);
10681069
} catch (IllegalStateException e) {
10691070
if (e.getMessage().startsWith("No node available for cluster:") == false) {
@@ -1124,10 +1125,10 @@ public void testClusterNameIsChecked() throws Exception {
11241125
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings);
11251126
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool,
11261127
settings);
1127-
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
1128-
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
1129-
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
1130-
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
1128+
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
1129+
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
1130+
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
1131+
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
11311132
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
11321133
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
11331134
knownNodes.add(seedTransport.getLocalDiscoNode());
@@ -1164,4 +1165,76 @@ public void testClusterNameIsChecked() throws Exception {
11641165
}
11651166
}
11661167
}
1168+
1169+
public void testGetConnection() throws Exception {
1170+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
1171+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
1172+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
1173+
1174+
DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode();
1175+
assertThat(connectedNode, notNullValue());
1176+
knownNodes.add(connectedNode);
1177+
1178+
DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode();
1179+
assertThat(disconnectedNode, notNullValue());
1180+
knownNodes.add(disconnectedNode);
1181+
1182+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
1183+
Transport.Connection seedConnection = new Transport.Connection() {
1184+
@Override
1185+
public DiscoveryNode getNode() {
1186+
return connectedNode;
1187+
}
1188+
1189+
@Override
1190+
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
1191+
throws TransportException {
1192+
// no-op
1193+
}
1194+
1195+
@Override
1196+
public void close() {
1197+
// no-op
1198+
}
1199+
};
1200+
service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) {
1201+
@Override
1202+
public Connection getConnection(DiscoveryNode node) {
1203+
if (node == connectedNode) {
1204+
return seedConnection;
1205+
}
1206+
return super.getConnection(node);
1207+
}
1208+
1209+
@Override
1210+
public boolean nodeConnected(DiscoveryNode node) {
1211+
return node.equals(connectedNode);
1212+
}
1213+
});
1214+
service.start();
1215+
service.acceptIncomingRequests();
1216+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
1217+
Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) {
1218+
connection.addConnectedNode(connectedNode);
1219+
for (int i = 0; i < 10; i++) {
1220+
//always a direct connection as the remote node is already connected
1221+
Transport.Connection remoteConnection = connection.getConnection(connectedNode);
1222+
assertSame(seedConnection, remoteConnection);
1223+
}
1224+
for (int i = 0; i < 10; i++) {
1225+
//always a direct connection as the remote node is already connected
1226+
Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode());
1227+
assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class)));
1228+
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
1229+
}
1230+
for (int i = 0; i < 10; i++) {
1231+
//always a proxy connection as the target node is not connected
1232+
Transport.Connection remoteConnection = connection.getConnection(disconnectedNode);
1233+
assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class));
1234+
assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode));
1235+
}
1236+
}
1237+
}
1238+
}
1239+
}
11671240
}

0 commit comments

Comments
 (0)