Skip to content

Commit 12f5b02

Browse files
authored
Streamline skip_unavailable handling (#37672)
This commit moves the collectSearchShards method out of RemoteClusterService and into TransportSearchAction, which currently calls it. RemoteClusterService used to be used only for cross-cluster search, but it is now also used in cross-cluster replication, where different APIs are called through the RemoteClusterAwareClient. There is no reason for the collectSearchShards and fetchSearchShards methods to live in RemoteClusterService and RemoteClusterConnection respectively. The search shards API can be called through the RemoteClusterAwareClient too; the only missing bit is a way to handle failures based on the skip_unavailable setting of each cluster (currently only supported in RemoteClusterConnection#fetchSearchShards), which is achieved by adding an isSkipUnavailable(String clusterAlias) method to RemoteClusterService. This change is useful for #32125, as we will very soon also need to call the search API against remote clusters, which will be done through the RemoteClusterAwareClient. In that case we will also need to support skip_unavailable when calling the search API, so we need some way to handle the skip_unavailable setting like we currently do for the search_shards call. Relates to #32125
1 parent d5139e0 commit 12f5b02

File tree

8 files changed

+445
-543
lines changed

8 files changed

+445
-543
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,11 @@
2929

3030
import java.io.IOException;
3131
import java.util.Arrays;
32-
import java.util.Collections;
3332
import java.util.HashMap;
3433
import java.util.Map;
3534

3635
public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject {
3736

38-
public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
39-
new DiscoveryNode[0], Collections.emptyMap());
40-
4137
private final ClusterSearchShardsGroup[] groups;
4238
private final DiscoveryNode[] nodes;
4339
private final Map<String, AliasFilter> indicesAndFilters;

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 73 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import org.elasticsearch.action.ActionListener;
2323
import org.elasticsearch.action.OriginalIndices;
2424
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
25+
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
2526
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
2627
import org.elasticsearch.action.support.ActionFilters;
2728
import org.elasticsearch.action.support.HandledTransportAction;
2829
import org.elasticsearch.action.support.IndicesOptions;
30+
import org.elasticsearch.client.Client;
2931
import org.elasticsearch.cluster.ClusterState;
3032
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3133
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -39,6 +41,7 @@
3941
import org.elasticsearch.common.io.stream.Writeable;
4042
import org.elasticsearch.common.settings.Setting;
4143
import org.elasticsearch.common.settings.Setting.Property;
44+
import org.elasticsearch.common.util.concurrent.CountDown;
4245
import org.elasticsearch.index.Index;
4346
import org.elasticsearch.index.query.Rewriteable;
4447
import org.elasticsearch.index.shard.ShardId;
@@ -50,6 +53,7 @@
5053
import org.elasticsearch.threadpool.ThreadPool;
5154
import org.elasticsearch.transport.RemoteClusterAware;
5255
import org.elasticsearch.transport.RemoteClusterService;
56+
import org.elasticsearch.transport.RemoteTransportException;
5357
import org.elasticsearch.transport.Transport;
5458
import org.elasticsearch.transport.TransportService;
5559

@@ -60,8 +64,11 @@
6064
import java.util.List;
6165
import java.util.Map;
6266
import java.util.Set;
67+
import java.util.concurrent.ConcurrentHashMap;
6368
import java.util.concurrent.Executor;
6469
import java.util.concurrent.TimeUnit;
70+
import java.util.concurrent.atomic.AtomicInteger;
71+
import java.util.concurrent.atomic.AtomicReference;
6572
import java.util.function.BiFunction;
6673
import java.util.function.Function;
6774
import java.util.function.LongSupplier;
@@ -195,17 +202,23 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
195202
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
196203
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY);
197204
} else {
198-
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
199-
searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
200-
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
201-
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
202-
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
203-
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
204-
SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
205-
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
206-
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
207-
clusters);
208-
}, listener::onFailure));
205+
AtomicInteger skippedClusters = new AtomicInteger(0);
206+
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters,
207+
remoteClusterIndices, remoteClusterService, threadPool,
208+
ActionListener.wrap(
209+
searchShardsResponses -> {
210+
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
211+
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
212+
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
213+
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
214+
int localClusters = localIndices == null ? 0 : 1;
215+
int totalClusters = remoteClusterIndices.size() + localClusters;
216+
int successfulClusters = searchShardsResponses.size() + localClusters;
217+
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
218+
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
219+
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
220+
},
221+
listener::onFailure));
209222
}
210223
}, listener::onFailure);
211224
if (searchRequest.source() == null) {
@@ -216,18 +229,56 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
216229
}
217230
}
218231

219-
static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices,
220-
Map<String, ClusterSearchShardsResponse> searchShardsResponses) {
221-
int localClusters = localIndices == null ? 0 : 1;
222-
int totalClusters = remoteIndices.size() + localClusters;
223-
int successfulClusters = localClusters;
224-
for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
225-
if (searchShardsResponse != ClusterSearchShardsResponse.EMPTY) {
226-
successfulClusters++;
227-
}
232+
static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters,
233+
Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService,
234+
ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
235+
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
236+
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
237+
final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
238+
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
239+
final String clusterAlias = entry.getKey();
240+
boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
241+
Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
242+
final String[] indices = entry.getValue().indices();
243+
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
244+
.indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
245+
clusterClient.admin().cluster().searchShards(searchShardsRequest, new ActionListener<ClusterSearchShardsResponse>() {
246+
@Override
247+
public void onResponse(ClusterSearchShardsResponse response) {
248+
searchShardsResponses.put(clusterAlias, response);
249+
maybeFinish();
250+
}
251+
252+
@Override
253+
public void onFailure(Exception e) {
254+
if (skipUnavailable) {
255+
skippedClusters.incrementAndGet();
256+
} else {
257+
RemoteTransportException exception =
258+
new RemoteTransportException("error while communicating with remote cluster [" + clusterAlias + "]", e);
259+
if (transportException.compareAndSet(null, exception) == false) {
260+
transportException.accumulateAndGet(exception, (previous, current) -> {
261+
current.addSuppressed(previous);
262+
return current;
263+
});
264+
}
265+
}
266+
maybeFinish();
267+
}
268+
269+
private void maybeFinish() {
270+
if (responsesCountDown.countDown()) {
271+
RemoteTransportException exception = transportException.get();
272+
if (exception == null) {
273+
listener.onResponse(searchShardsResponses);
274+
} else {
275+
listener.onFailure(transportException.get());
276+
}
277+
}
278+
}
279+
}
280+
);
228281
}
229-
int skippedClusters = totalClusters - successfulClusters;
230-
return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters);
231282
}
232283

233284
static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses,

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 8 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,6 @@
2525
import org.apache.lucene.util.SetOnce;
2626
import org.elasticsearch.Version;
2727
import org.elasticsearch.action.ActionListener;
28-
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
29-
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
30-
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
3128
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
3229
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
3330
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -62,7 +59,6 @@
6259
import java.util.concurrent.RejectedExecutionException;
6360
import java.util.concurrent.Semaphore;
6461
import java.util.concurrent.atomic.AtomicBoolean;
65-
import java.util.function.Consumer;
6662
import java.util.function.Function;
6763
import java.util.function.Predicate;
6864
import java.util.function.Supplier;
@@ -172,6 +168,13 @@ void updateSkipUnavailable(boolean skipUnavailable) {
172168
this.skipUnavailable = skipUnavailable;
173169
}
174170

171+
/**
172+
* Returns whether this cluster is configured to be skipped when unavailable
173+
*/
174+
boolean isSkipUnavailable() {
175+
return skipUnavailable;
176+
}
177+
175178
@Override
176179
public void onNodeDisconnected(DiscoveryNode node) {
177180
boolean remove = connectedNodes.remove(node);
@@ -181,67 +184,18 @@ public void onNodeDisconnected(DiscoveryNode node) {
181184
}
182185
}
183186

184-
/**
185-
* Fetches all shards for the search request from this remote connection. This is used to later run the search on the remote end.
186-
*/
187-
public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
188-
ActionListener<ClusterSearchShardsResponse> listener) {
189-
190-
final ActionListener<ClusterSearchShardsResponse> searchShardsListener;
191-
final Consumer<Exception> onConnectFailure;
192-
if (skipUnavailable) {
193-
onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY);
194-
searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY));
195-
} else {
196-
onConnectFailure = listener::onFailure;
197-
searchShardsListener = listener;
198-
}
199-
// in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on
200-
// the skip_unavailable setting
201-
ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure));
202-
}
203-
204187
/**
205188
* Ensures that this cluster is connected. If the cluster is connected this operation
206189
* will invoke the listener immediately.
207190
*/
208-
public void ensureConnected(ActionListener<Void> voidActionListener) {
191+
void ensureConnected(ActionListener<Void> voidActionListener) {
209192
if (connectedNodes.size() == 0) {
210193
connectHandler.connect(voidActionListener);
211194
} else {
212195
voidActionListener.onResponse(null);
213196
}
214197
}
215198

216-
private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
217-
final ActionListener<ClusterSearchShardsResponse> listener) {
218-
final DiscoveryNode node = getAnyConnectedNode();
219-
Transport.Connection connection = connectionManager.getConnection(node);
220-
transportService.sendRequest(connection, ClusterSearchShardsAction.NAME, searchShardsRequest, TransportRequestOptions.EMPTY,
221-
new TransportResponseHandler<ClusterSearchShardsResponse>() {
222-
223-
@Override
224-
public ClusterSearchShardsResponse read(StreamInput in) throws IOException {
225-
return new ClusterSearchShardsResponse(in);
226-
}
227-
228-
@Override
229-
public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
230-
listener.onResponse(clusterSearchShardsResponse);
231-
}
232-
233-
@Override
234-
public void handleException(TransportException e) {
235-
listener.onFailure(e);
236-
}
237-
238-
@Override
239-
public String executor() {
240-
return ThreadPool.Names.SEARCH;
241-
}
242-
});
243-
}
244-
245199
/**
246200
* Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function
247201
* that returns <code>null</code> if the node ID is not found.

0 commit comments

Comments
 (0)