Skip to content

Commit 28fa23b

Browse files
committed
Always rewrite search shard request outside of the search thread pool
This change ensures that the rewrite of the shard request is executed in the network thread or in the refresh listener when waiting for an active shard. This allows queries that rewrite to match_no_docs to bypass the search thread pool entirely even if the can_match phase was skipped (pre_filter_shard_size > number of shards). Coordinating nodes don't have the ability to create empty responses, so this change also ensures that at least one shard creates a full empty response while the others can return null ones. This is needed since creating true empty responses on shards requires creating concrete aggregators, which would be too costly to build on a network thread. We should move this functionality to aggregation builders in a follow-up, but that would be a much bigger change. This change is also important for elastic#49601 since we want to add the ability to use the result of other shards to rewrite the request of subsequent ones. For instance, if the first M shards have their top N computed, the worst top document in the global queue can be passed to subsequent shards, which can then rewrite to match_no_docs if they can guarantee that they don't have any document better than the provided one.
1 parent 2854f5c commit 28fa23b

File tree

11 files changed

+612
-164
lines changed

11 files changed

+612
-164
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/search/140_pre_filter_search_shards.yml

+29
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,32 @@ setup:
153153
- match: { _shards.failed: 0 }
154154
- match: { hits.total: 2 }
155155
- length: { aggregations.idx_terms.buckets: 2 }
156+
157+
# check that empty responses are correctly handled when rewriting to match_no_docs
158+
- do:
159+
search:
160+
rest_total_hits_as_int: true
161+
# ensure that one shard can return empty response
162+
max_concurrent_shard_requests: 1
163+
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
164+
165+
- match: { _shards.total: 3 }
166+
- match: { _shards.successful: 3 }
167+
- match: { _shards.skipped : 0 }
168+
- match: { _shards.failed: 0 }
169+
- match: { hits.total: 2 }
170+
- length: { aggregations.idx_terms.buckets: 2 }
171+
172+
- do:
173+
search:
174+
rest_total_hits_as_int: true
175+
# ensure that one shard can return empty response
176+
max_concurrent_shard_requests: 2
177+
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }
178+
179+
- match: { _shards.total: 3 }
180+
- match: { _shards.successful: 3 }
181+
- match: { _shards.skipped : 0 }
182+
- match: { _shards.failed: 0 }
183+
- match: { hits.total: 0 }
184+
- length: { aggregations.idx_terms.buckets: 0 }

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Set;
5252
import java.util.concurrent.ConcurrentHashMap;
5353
import java.util.concurrent.Executor;
54+
import java.util.concurrent.atomic.AtomicBoolean;
5455
import java.util.concurrent.atomic.AtomicInteger;
5556
import java.util.function.BiFunction;
5657
import java.util.stream.Collectors;
@@ -75,13 +76,14 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
7576
**/
7677
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
7778
private final SearchTask task;
78-
private final SearchPhaseResults<Result> results;
79+
protected final SearchPhaseResults<Result> results;
7980
private final long clusterStateVersion;
8081
private final Map<String, AliasFilter> aliasFilter;
8182
private final Map<String, Float> concreteIndexBoosts;
8283
private final Map<String, Set<String>> indexRoutings;
8384
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
8485
private final Object shardFailuresMutex = new Object();
86+
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
8587
private final AtomicInteger successfulOps = new AtomicInteger();
8688
private final AtomicInteger skippedOps = new AtomicInteger();
8789
private final SearchTimeProvider timeProvider;
@@ -462,9 +464,10 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg
462464
* @param result the result returned form the shard
463465
* @param shardIt the shard iterator
464466
*/
465-
private void onShardResult(Result result, SearchShardIterator shardIt) {
467+
protected void onShardResult(Result result, SearchShardIterator shardIt) {
466468
assert result.getShardIndex() != -1 : "shard index is not set";
467469
assert result.getSearchShardTarget() != null : "search shard target must not be null";
470+
hasShardResponse.set(true);
468471
successfulOps.incrementAndGet();
469472
results.consumeResult(result);
470473
if (logger.isTraceEnabled()) {
@@ -602,8 +605,13 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
602605
String indexName = shardIt.shardId().getIndex().getName();
603606
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
604607
.toArray(new String[0]);
605-
return new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
608+
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
606609
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
610+
// if we already received a search result we can inform the shard that it
611+
// can return a null response if the request rewrites to match none rather
612+
// than creating an empty response in the search thread pool.
613+
shardRequest.setMatchNoDocsReturnNullResponse(hasShardResponse.get());
614+
return shardRequest;
607615
}
608616

609617
/**

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

+39-27
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.Map;
6666
import java.util.function.Function;
6767
import java.util.function.IntFunction;
68+
import java.util.stream.Collectors;
6869

6970
public final class SearchPhaseController {
7071

@@ -427,6 +428,15 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
427428
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
428429
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
429430
}
431+
int total = queryResults.size();
432+
queryResults = queryResults.stream()
433+
.filter(res -> res.queryResult().isNull() == false)
434+
.collect(Collectors.toList());
435+
String errorMsg = "must have at least one non-empty search result, got 0 out of " + total;
436+
assert queryResults.isEmpty() == false : errorMsg;
437+
if (queryResults.isEmpty()) {
438+
throw new IllegalStateException(errorMsg);
439+
}
430440
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
431441
final boolean hasSuggest = firstResult.suggest() != null;
432442
final boolean hasProfileResults = firstResult.hasProfileResults();
@@ -622,36 +632,38 @@ public void consumeResult(SearchPhaseResult result) {
622632
}
623633

624634
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
625-
if (index == bufferSize) {
635+
if (querySearchResult.isNull() == false) {
636+
if (index == bufferSize) {
637+
if (hasAggs) {
638+
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
639+
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
640+
Arrays.fill(aggsBuffer, null);
641+
aggsBuffer[0] = reducedAggs;
642+
}
643+
if (hasTopDocs) {
644+
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
645+
// we have to merge here in the same way we collect on a shard
646+
querySearchResult.from() + querySearchResult.size(), 0);
647+
Arrays.fill(topDocsBuffer, null);
648+
topDocsBuffer[0] = reducedTopDocs;
649+
}
650+
numReducePhases++;
651+
index = 1;
652+
if (hasAggs) {
653+
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
654+
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
655+
}
656+
}
657+
final int i = index++;
626658
if (hasAggs) {
627-
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
628-
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
629-
Arrays.fill(aggsBuffer, null);
630-
aggsBuffer[0] = reducedAggs;
659+
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
631660
}
632661
if (hasTopDocs) {
633-
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
634-
// we have to merge here in the same way we collect on a shard
635-
querySearchResult.from() + querySearchResult.size(), 0);
636-
Arrays.fill(topDocsBuffer, null);
637-
topDocsBuffer[0] = reducedTopDocs;
662+
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
663+
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
664+
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
665+
topDocsBuffer[i] = topDocs.topDocs;
638666
}
639-
numReducePhases++;
640-
index = 1;
641-
if (hasAggs) {
642-
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
643-
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
644-
}
645-
}
646-
final int i = index++;
647-
if (hasAggs) {
648-
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
649-
}
650-
if (hasTopDocs) {
651-
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
652-
topDocsStats.add(topDocs, querySearchResult.searchTimedOut(), querySearchResult.terminatedEarly());
653-
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
654-
topDocsBuffer[i] = topDocs.topDocs;
655667
}
656668
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
657669
}
@@ -731,7 +743,7 @@ ReducedQueryPhase reduce() {
731743

732744
static final class TopDocsStats {
733745
final int trackTotalHitsUpTo;
734-
private long totalHits;
746+
long totalHits;
735747
private TotalHits.Relation totalHitsRelation;
736748
long fetchHits;
737749
private float maxScore = Float.NEGATIVE_INFINITY;

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+7
Original file line numberDiff line numberDiff line change
@@ -1210,6 +1210,13 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop
12101210
markSearcherAccessed();
12111211
final Engine engine = getEngine();
12121212
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
1213+
return wrapSearcher(searcher);
1214+
}
1215+
1216+
/**
1217+
* Wraps the provided searcher acquired with {@link #acquireSearcherNoWrap(String)}.
1218+
*/
1219+
public Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
12131220
assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
12141221
!= null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader";
12151222
boolean success = false;

0 commit comments

Comments
 (0)