Skip to content

Commit 664903a

Browse files
authored
CCS: don't proxy requests for already connected node (#31273)
Cross-cluster search selects a subset of nodes for each remote cluster and sends requests only to those nodes, which act as proxies and redirect each request to the target node that holds the relevant data. Today, every request to a remote cluster is sent to the next node in the proxy list (in round-robin fashion), regardless of whether the target node is already among the nodes that we are connected to. For instance, if we need to send a shard search request to a data node that is also one of the selected proxy nodes, we may end up sending the request to it through one of the other proxy nodes. This commit optimizes this case: whenever we are already connected to the target remote node, we send the request to it directly rather than through the next proxy node. A side effect is that round-robin becomes slightly unbalanced, as the data nodes that are also selected as proxies will receive more requests.
1 parent 018d3fc commit 664903a

File tree

3 files changed

+124
-53
lines changed

3 files changed

+124
-53
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.elasticsearch.transport.Transport;
5151
import org.elasticsearch.transport.TransportService;
5252

53-
import java.io.IOException;
5453
import java.util.ArrayList;
5554
import java.util.Arrays;
5655
import java.util.Collections;
@@ -351,8 +350,7 @@ private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupSh
351350
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
352351
OriginalIndices localIndices,
353352
List<SearchShardIterator> remoteShardIterators) {
354-
List<SearchShardIterator> shards = new ArrayList<>();
355-
shards.addAll(remoteShardIterators);
353+
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
356354
for (ShardIterator shardIterator : localShardsIterator) {
357355
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
358356
}
@@ -384,7 +382,7 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
384382
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
385383
return new SearchPhase(action.getName()) {
386384
@Override
387-
public void run() throws IOException {
385+
public void run() {
388386
action.start();
389387
}
390388
};

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 44 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.logging.log4j.message.ParameterizedMessage;
2222
import org.apache.lucene.store.AlreadyClosedException;
23-
import org.elasticsearch.core.internal.io.IOUtils;
2423
import org.apache.lucene.util.SetOnce;
2524
import org.elasticsearch.Version;
2625
import org.elasticsearch.action.ActionListener;
@@ -40,6 +39,7 @@
4039
import org.elasticsearch.common.util.CancellableThreads;
4140
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4241
import org.elasticsearch.common.util.concurrent.ThreadContext;
42+
import org.elasticsearch.core.internal.io.IOUtils;
4343
import org.elasticsearch.threadpool.ThreadPool;
4444

4545
import java.io.Closeable;
@@ -50,7 +50,6 @@
5050
import java.util.HashSet;
5151
import java.util.Iterator;
5252
import java.util.List;
53-
import java.util.Optional;
5453
import java.util.Set;
5554
import java.util.concurrent.ArrayBlockingQueue;
5655
import java.util.concurrent.BlockingQueue;
@@ -61,7 +60,6 @@
6160
import java.util.function.Consumer;
6261
import java.util.function.Function;
6362
import java.util.function.Predicate;
64-
import java.util.function.Supplier;
6563
import java.util.stream.Collectors;
6664

6765
/**
@@ -181,7 +179,7 @@ public void ensureConnected(ActionListener<Void> voidActionListener) {
181179

182180
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
183181
final ActionListener<ClusterSearchShardsResponse> listener) {
184-
final DiscoveryNode node = connectedNodes.get();
182+
final DiscoveryNode node = connectedNodes.getAny();
185183
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
186184
new TransportResponseHandler<ClusterSearchShardsResponse>() {
187185

@@ -217,7 +215,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
217215
request.clear();
218216
request.nodes(true);
219217
request.local(true); // run this on the node that gets the request it's as good as any other
220-
final DiscoveryNode node = connectedNodes.get();
218+
final DiscoveryNode node = connectedNodes.getAny();
221219
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
222220
new TransportResponseHandler<ClusterStateResponse>() {
223221
@Override
@@ -255,40 +253,52 @@ public String executor() {
255253
}
256254

257255
/**
258-
* Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the
259-
* given node.
256+
* Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}.
257+
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
260258
*/
261259
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
262-
DiscoveryNode discoveryNode = connectedNodes.get();
260+
if (transportService.nodeConnected(remoteClusterNode)) {
261+
return transportService.getConnection(remoteClusterNode);
262+
}
263+
DiscoveryNode discoveryNode = connectedNodes.getAny();
263264
Transport.Connection connection = transportService.getConnection(discoveryNode);
264-
return new Transport.Connection() {
265-
@Override
266-
public DiscoveryNode getNode() {
267-
return remoteClusterNode;
268-
}
265+
return new ProxyConnection(connection, remoteClusterNode);
266+
}
269267

270-
@Override
271-
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
268+
static final class ProxyConnection implements Transport.Connection {
269+
private final Transport.Connection proxyConnection;
270+
private final DiscoveryNode targetNode;
271+
272+
private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
273+
this.proxyConnection = proxyConnection;
274+
this.targetNode = targetNode;
275+
}
276+
277+
@Override
278+
public DiscoveryNode getNode() {
279+
return targetNode;
280+
}
281+
282+
@Override
283+
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
272284
throws IOException, TransportException {
273-
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
274-
TransportActionProxy.wrapRequest(remoteClusterNode, request), options);
275-
}
285+
proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
286+
TransportActionProxy.wrapRequest(targetNode, request), options);
287+
}
276288

277-
@Override
278-
public void close() throws IOException {
279-
assert false: "proxy connections must not be closed";
280-
}
289+
@Override
290+
public void close() {
291+
assert false: "proxy connections must not be closed";
292+
}
281293

282-
@Override
283-
public Version getVersion() {
284-
return connection.getVersion();
285-
}
286-
};
294+
@Override
295+
public Version getVersion() {
296+
return proxyConnection.getVersion();
297+
}
287298
}
288299

289300
Transport.Connection getConnection() {
290-
DiscoveryNode discoveryNode = connectedNodes.get();
291-
return transportService.getConnection(discoveryNode);
301+
return transportService.getConnection(getAnyConnectedNode());
292302
}
293303

294304
@Override
@@ -385,7 +395,7 @@ public void onFailure(Exception e) {
385395
}
386396

387397
@Override
388-
protected void doRun() throws Exception {
398+
protected void doRun() {
389399
ActionListener<Void> listener = ActionListener.wrap((x) -> {
390400
synchronized (queue) {
391401
running.release();
@@ -590,8 +600,8 @@ boolean isNodeConnected(final DiscoveryNode node) {
590600
return connectedNodes.contains(node);
591601
}
592602

593-
DiscoveryNode getConnectedNode() {
594-
return connectedNodes.get();
603+
DiscoveryNode getAnyConnectedNode() {
604+
return connectedNodes.getAny();
595605
}
596606

597607
void addConnectedNode(DiscoveryNode node) {
@@ -612,7 +622,7 @@ int getNumNodesConnected() {
612622
return connectedNodes.size();
613623
}
614624

615-
private static class ConnectedNodes implements Supplier<DiscoveryNode> {
625+
private static final class ConnectedNodes {
616626

617627
private final Set<DiscoveryNode> nodeSet = new HashSet<>();
618628
private final String clusterAlias;
@@ -623,8 +633,7 @@ private ConnectedNodes(String clusterAlias) {
623633
this.clusterAlias = clusterAlias;
624634
}
625635

626-
@Override
627-
public synchronized DiscoveryNode get() {
636+
public synchronized DiscoveryNode getAny() {
628637
ensureIteratorAvailable();
629638
if (currentIterator.hasNext()) {
630639
return currentIterator.next();
@@ -657,15 +666,6 @@ synchronized boolean contains(DiscoveryNode node) {
657666
return nodeSet.contains(node);
658667
}
659668

660-
synchronized Optional<DiscoveryNode> getAny() {
661-
ensureIteratorAvailable();
662-
if (currentIterator.hasNext()) {
663-
return Optional.of(currentIterator.next());
664-
} else {
665-
return Optional.empty();
666-
}
667-
}
668-
669669
private synchronized void ensureIteratorAvailable() {
670670
if (currentIterator == null) {
671671
currentIterator = nodeSet.iterator();

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import static org.hamcrest.Matchers.iterableWithSize;
8282
import static org.hamcrest.Matchers.not;
8383
import static org.hamcrest.Matchers.notNullValue;
84+
import static org.hamcrest.Matchers.sameInstance;
8485
import static org.hamcrest.Matchers.startsWith;
8586

8687
public class RemoteClusterConnectionTests extends ESTestCase {
@@ -992,7 +993,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
992993
barrier.await();
993994
for (int j = 0; j < numGetCalls; j++) {
994995
try {
995-
DiscoveryNode node = connection.getConnectedNode();
996+
DiscoveryNode node = connection.getAnyConnectedNode();
996997
assertNotNull(node);
997998
} catch (IllegalStateException e) {
998999
if (e.getMessage().startsWith("No node available for cluster:") == false) {
@@ -1053,10 +1054,10 @@ public void testClusterNameIsChecked() throws Exception {
10531054
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings);
10541055
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool,
10551056
settings);
1056-
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
1057-
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
1058-
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
1059-
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
1057+
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
1058+
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
1059+
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
1060+
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
10601061
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
10611062
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
10621063
knownNodes.add(seedTransport.getLocalDiscoNode());
@@ -1093,4 +1094,76 @@ public void testClusterNameIsChecked() throws Exception {
10931094
}
10941095
}
10951096
}
1097+
1098+
public void testGetConnection() throws Exception {
1099+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
1100+
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
1101+
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
1102+
1103+
DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode();
1104+
assertThat(connectedNode, notNullValue());
1105+
knownNodes.add(connectedNode);
1106+
1107+
DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode();
1108+
assertThat(disconnectedNode, notNullValue());
1109+
knownNodes.add(disconnectedNode);
1110+
1111+
try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
1112+
Transport.Connection seedConnection = new Transport.Connection() {
1113+
@Override
1114+
public DiscoveryNode getNode() {
1115+
return connectedNode;
1116+
}
1117+
1118+
@Override
1119+
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
1120+
throws TransportException {
1121+
// no-op
1122+
}
1123+
1124+
@Override
1125+
public void close() {
1126+
// no-op
1127+
}
1128+
};
1129+
service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) {
1130+
@Override
1131+
public Connection getConnection(DiscoveryNode node) {
1132+
if (node == connectedNode) {
1133+
return seedConnection;
1134+
}
1135+
return super.getConnection(node);
1136+
}
1137+
1138+
@Override
1139+
public boolean nodeConnected(DiscoveryNode node) {
1140+
return node.equals(connectedNode);
1141+
}
1142+
});
1143+
service.start();
1144+
service.acceptIncomingRequests();
1145+
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
1146+
Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) {
1147+
connection.addConnectedNode(connectedNode);
1148+
for (int i = 0; i < 10; i++) {
1149+
//always a direct connection as the remote node is already connected
1150+
Transport.Connection remoteConnection = connection.getConnection(connectedNode);
1151+
assertSame(seedConnection, remoteConnection);
1152+
}
1153+
for (int i = 0; i < 10; i++) {
1154+
//always a direct (non-proxy) connection when the target is the service's own local node
1155+
Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode());
1156+
assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class)));
1157+
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
1158+
}
1159+
for (int i = 0; i < 10; i++) {
1160+
//always a proxy connection as the target node is not connected
1161+
Transport.Connection remoteConnection = connection.getConnection(disconnectedNode);
1162+
assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class));
1163+
assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode));
1164+
}
1165+
}
1166+
}
1167+
}
1168+
}
10961169
}

0 commit comments

Comments
 (0)