Commit dd1ce50

Split search in two when made against throttled and non-throttled indices (elastic#42510)
When a search on some indices takes a long time, it may cause problems for other indices that are searched as part of the same request and are also being written to, because their search context needs to stay open for a long time. This is especially a problem when searching against throttled and non-throttled indices as part of the same request. The problem can be generalized, though: it may happen whenever read-only indices are searched together with indices that are being written to. In practice, search contexts staying open for a long time are only an issue for indices that are being written to.

This commit splits the search into two sub-searches: one for read-only indices, and one for ordinary indices. This way the two don't interfere with each other. The split is done only when size is greater than 0, no scroll is provided, and query_then_fetch is used as the search type. Otherwise, the search executes as before.

Note that the returned num_reduce_phases reflects the number of reduction phases that were run. If the search is split in two, there are three reductions: one non-final reduction for each sub-search, and a final one that merges the results of the previous two.

Closes elastic#40900
1 parent 4d0c375 commit dd1ce50
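
The commit message above pins down when the split applies: only for query_then_fetch searches with no scroll and a non-zero size. The following stand-alone Java sketch condenses that condition for readability; SplitDecision, its SearchType enum, and the plain boolean/Integer parameters are illustrative stand-ins introduced here, not Elasticsearch types (the actual check is TransportSearchAction#shouldSplitIndices in the diff below).

// Simplified model of the split decision; the three parameters stand in for
// SearchRequest#scroll(), SearchRequest#searchType() and SearchSourceBuilder#size().
final class SplitDecision {

    enum SearchType { QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH }

    // Mirrors shouldSplitIndices from the diff below: split only when there is no
    // scroll, the search type is not DFS_QUERY_THEN_FETCH, and size is not 0
    // (with size 0 there are no hits to fetch, which is the phase the split protects).
    static boolean shouldSplit(boolean hasScroll, SearchType searchType, Integer size) {
        return hasScroll == false
            && searchType != SearchType.DFS_QUERY_THEN_FETCH
            && (size == null || size != 0);
    }

    public static void main(String[] args) {
        System.out.println(shouldSplit(false, SearchType.QUERY_THEN_FETCH, null)); // true: default search is split
        System.out.println(shouldSplit(true, SearchType.QUERY_THEN_FETCH, 10));    // false: scroll searches are never split
        System.out.println(shouldSplit(false, SearchType.QUERY_THEN_FETCH, 0));    // false: size 0 (e.g. aggs only) is not split
    }
}

Running main prints true, false, false, matching the three cases in the comments.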

File tree: 4 files changed, +289 -44 lines

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 134 additions & 44 deletions
@@ -29,6 +29,7 @@
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -472,10 +473,89 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
         Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
             searchRequest.indices());
         routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
-        String[] concreteIndices = new String[indices.length];
-        for (int i = 0; i < indices.length; i++) {
-            concreteIndices[i] = indices[i].getName();
+        Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
+
+        if (shouldSplitIndices(searchRequest)) {
+            //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
+            //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
+            //indices (possibly slower) being searched at the same time.
+            List<String> writeIndicesList = new ArrayList<>();
+            List<String> readOnlyIndicesList = new ArrayList<>();
+            splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
+            String[] writeIndices = writeIndicesList.toArray(new String[0]);
+            String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);
+
+            if (readOnlyIndices.length == 0) {
+                executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap,
+                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
+            } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) {
+                executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap,
+                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
+            } else {
+                //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so
+                //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices.
+                CountDown countDown = new CountDown(2);
+                AtomicReference<Exception> exceptions = new AtomicReference<>();
+                SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider,
+                    searchService::createReduceContext);
+                CountDownActionListener<SearchResponse, SearchResponse> countDownActionListener =
+                    new CountDownActionListener<>(countDown, exceptions, listener) {
+                        @Override
+                        void innerOnResponse(SearchResponse searchResponse) {
+                            searchResponseMerger.add(searchResponse);
+                        }

+                        @Override
+                        SearchResponse createFinalResponse() {
+                            return searchResponseMerger.getMergedResponse(clusters);
+                        }
+                    };
+
+                //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
+                //will be provided separately to executeSearch.
+                SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices,
+                    RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
+                executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap,
+                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener,
+                    SearchResponse.Clusters.EMPTY);
+
+                //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
+                //will be provided separately to executeSearch.
+                SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices,
+                    RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
+                executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap,
+                    aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener,
+                    SearchResponse.Clusters.EMPTY);
+            }
+        } else {
+            String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new);
+            executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap,
+                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
+        }
+    }
+
+    static boolean shouldSplitIndices(SearchRequest searchRequest) {
+        return searchRequest.scroll() == null && searchRequest.searchType() != DFS_QUERY_THEN_FETCH
+            && (searchRequest.source() == null || searchRequest.source().size() != 0);
+    }
+
+    static void splitIndices(Index[] indices, ClusterState clusterState, List<String> writeIndices, List<String> readOnlyIndices) {
+        for (Index index : indices) {
+            ClusterBlockException writeBlock = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, index.getName());
+            if (writeBlock == null) {
+                writeIndices.add(index.getName());
+            } else {
+                readOnlyIndices.add(index.getName());
+            }
         }
+    }
+
+    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
+                               OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
+                               Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
+                               List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
+                               ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {
+
         Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
         GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
             concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
@@ -484,8 +564,6 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
 
         failIfOverShardCountLimit(clusterService, shardIterators.size());
 
-        Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
-
         // optimize search type for cases where there is only one shard group to search on
         if (shardIterators.size() == 1) {
             // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
@@ -498,11 +576,9 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
         if (searchRequest.isSuggestOnly()) {
             // disable request cache if we have only suggest
             searchRequest.requestCache(false);
-            switch (searchRequest.searchType()) {
-                case DFS_QUERY_THEN_FETCH:
-                    // convert to Q_T_F if we have only suggest
-                    searchRequest.searchType(QUERY_THEN_FETCH);
-                    break;
+            if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
+                // convert to Q_T_F if we have only suggest
+                searchRequest.searchType(QUERY_THEN_FETCH);
             }
         }
 
@@ -611,22 +687,16 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int
         }
     }
 
-    abstract static class CCSActionListener<Response, FinalResponse> implements ActionListener<Response> {
-        private final String clusterAlias;
-        private final boolean skipUnavailable;
+    abstract static class CountDownActionListener<Response, FinalResponse> implements ActionListener<Response> {
         private final CountDown countDown;
-        private final AtomicInteger skippedClusters;
         private final AtomicReference<Exception> exceptions;
-        private final ActionListener<FinalResponse> originalListener;
+        private final ActionListener<FinalResponse> delegateListener;
 
-        CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
-                          AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
-            this.clusterAlias = clusterAlias;
-            this.skipUnavailable = skipUnavailable;
+        CountDownActionListener(CountDown countDown, AtomicReference<Exception> exceptions,
+                                ActionListener<FinalResponse> delegateListener) {
            this.countDown = countDown;
-            this.skippedClusters = skippedClusters;
            this.exceptions = exceptions;
-            this.originalListener = originalListener;
+            this.delegateListener = delegateListener;
        }
 
        @Override
@@ -637,44 +707,64 @@ public final void onResponse(Response response) {
 
         abstract void innerOnResponse(Response response);
 
-        @Override
-        public final void onFailure(Exception e) {
-            if (skipUnavailable) {
-                skippedClusters.incrementAndGet();
-            } else {
-                Exception exception = e;
-                if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) {
-                    exception = wrapRemoteClusterFailure(clusterAlias, e);
-                }
-                if (exceptions.compareAndSet(null, exception) == false) {
-                    exceptions.accumulateAndGet(exception, (previous, current) -> {
-                        current.addSuppressed(previous);
-                        return current;
-                    });
-                }
-            }
-            maybeFinish();
-        }
-
-        private void maybeFinish() {
+        final void maybeFinish() {
             if (countDown.countDown()) {
                 Exception exception = exceptions.get();
                 if (exception == null) {
                     FinalResponse response;
                     try {
                         response = createFinalResponse();
                     } catch(Exception e) {
-                        originalListener.onFailure(e);
+                        delegateListener.onFailure(e);
                         return;
                     }
-                    originalListener.onResponse(response);
+                    delegateListener.onResponse(response);
                 } else {
-                    originalListener.onFailure(exceptions.get());
+                    delegateListener.onFailure(exceptions.get());
                 }
             }
         }
 
         abstract FinalResponse createFinalResponse();
+
+        @Override
+        public void onFailure(Exception e) {
+            if (exceptions.compareAndSet(null, e) == false) {
+                exceptions.accumulateAndGet(e, (previous, current) -> {
+                    current.addSuppressed(previous);
+                    return current;
+                });
+            }
+            maybeFinish();
+        }
+    }
+
+    abstract static class CCSActionListener<Response, FinalResponse> extends CountDownActionListener<Response, FinalResponse> {
+        private final String clusterAlias;
+        private final boolean skipUnavailable;
+        private final AtomicInteger skippedClusters;
+
+        CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
+                          AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
+            super(countDown, exceptions, originalListener);
+            this.clusterAlias = clusterAlias;
+            this.skipUnavailable = skipUnavailable;
+            this.skippedClusters = skippedClusters;
+        }
+
+        @Override
+        public final void onFailure(Exception e) {
+            if (skipUnavailable) {
+                skippedClusters.incrementAndGet();
+                maybeFinish();
+            } else {
+                Exception exception = e;
+                if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) {
                    exception = wrapRemoteClusterFailure(clusterAlias, e);
+                }
+                super.onFailure(exception);
+            }
+        }
     }
 
     private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) {
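
The CountDownActionListener introduced above is what ties the two sub-searches back together: each sub-search adds its partial response to a shared SearchResponseMerger, and whichever sub-search completes last (successfully or not) performs the final merge or failure notification. The sketch below reproduces just that coordination shape with plain JDK types; Merger, runTwoSearches and the string contents are placeholders invented for illustration, not Elasticsearch classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Stand-alone illustration of the count-down-and-merge pattern: two callers report
// into the same state, and only the last one to arrive produces the final result.
final class CountDownMergeSketch {

    static final class Merger {
        private final List<String> parts = new ArrayList<>();
        synchronized void add(String part) { parts.add(part); }
        synchronized String merge() { return String.join(" + ", parts); }
    }

    static void runTwoSearches(Consumer<String> onMerged, Consumer<Exception> onFailure) {
        AtomicInteger countDown = new AtomicInteger(2);
        AtomicReference<Exception> failure = new AtomicReference<>();
        Merger merger = new Merger();

        Runnable maybeFinish = () -> {
            if (countDown.decrementAndGet() == 0) {      // only the last responder finishes the request
                Exception e = failure.get();
                if (e == null) {
                    onMerged.accept(merger.merge());     // final reduction over the collected partial results
                } else {
                    onFailure.accept(e);                 // any recorded failure wins
                }
            }
        };

        // Stands in for the sub-search over indices that are being written to.
        merger.add("write-indices response");
        maybeFinish.run();

        // Stands in for the sub-search over read-only (e.g. throttled) indices.
        merger.add("read-only-indices response");
        maybeFinish.run();
    }

    public static void main(String[] args) {
        runTwoSearches(
            merged -> System.out.println("merged: " + merged),
            e -> System.err.println("failed: " + e));
    }
}

In the real listener the count-down is Elasticsearch's CountDown helper, the partial results are SearchResponse objects reduced by SearchResponseMerger, and failures are accumulated into the shared AtomicReference. The sketch only shows the last-one-in-completes-the-request shape, which is also why a split search reports three reduce phases: one per sub-search plus the final merge.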

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java

Lines changed: 61 additions & 0 deletions
@@ -19,11 +19,14 @@
 
 package org.elasticsearch.action.search;
 
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -174,4 +177,62 @@ public void testFinalReduce() {
             assertEquals(2, longTerms.getBuckets().size());
         }
     }
+
+    public void testSplitIndices() {
+        {
+            CreateIndexResponse response = client().admin().indices().prepareCreate("write").get();
+            assertTrue(response.isAcknowledged());
+        }
+        {
+            CreateIndexResponse response = client().admin().indices().prepareCreate("readonly").get();
+            assertTrue(response.isAcknowledged());
+        }
+        {
+            SearchResponse response = client().prepareSearch("readonly").get();
+            assertEquals(1, response.getTotalShards());
+            assertEquals(1, response.getSuccessfulShards());
+            assertEquals(1, response.getNumReducePhases());
+        }
+        {
+            SearchResponse response = client().prepareSearch("write").get();
+            assertEquals(1, response.getTotalShards());
+            assertEquals(1, response.getSuccessfulShards());
+            assertEquals(1, response.getNumReducePhases());
+        }
+        {
+            SearchResponse response = client().prepareSearch("readonly", "write").get();
+            assertEquals(2, response.getTotalShards());
+            assertEquals(2, response.getSuccessfulShards());
+            assertEquals(1, response.getNumReducePhases());
+        }
+        {
+            Settings settings = Settings.builder().put("index.blocks.read_only", "true").build();
+            AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get();
+            assertTrue(response.isAcknowledged());
+        }
+        try {
+            {
+                SearchResponse response = client().prepareSearch("readonly").get();
+                assertEquals(1, response.getTotalShards());
+                assertEquals(1, response.getSuccessfulShards());
+                assertEquals(1, response.getNumReducePhases());
+            }
+            {
+                SearchResponse response = client().prepareSearch("write").get();
+                assertEquals(1, response.getTotalShards());
+                assertEquals(1, response.getSuccessfulShards());
+                assertEquals(1, response.getNumReducePhases());
+            }
+            {
+                SearchResponse response = client().prepareSearch("readonly", "write").get();
+                assertEquals(2, response.getTotalShards());
+                assertEquals(2, response.getSuccessfulShards());
+                assertEquals(3, response.getNumReducePhases());
+            }
+        } finally {
+            Settings settings = Settings.builder().put("index.blocks.read_only", "false").build();
+            AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get();
+            assertTrue(response.isAcknowledged());
+        }
+    }
 }
