Skip to content

Commit 1f40f8a

Browse files
authored
Introduce incremental reduction of TopDocs (#23946)
This commit adds support for incremental top-N reduction when the number of expected shards in the search request is high enough. The changes also clean up more code in SearchPhaseController to draw a clearer separation between values that are the same on every search result and values that are specific to each response. The reduced search phase result no longer holds on to an arbitrary result just to obtain values like `from`, `size` or the sort values; these are now cleanly encapsulated.
1 parent b636ca7 commit 1f40f8a

File tree

11 files changed

+490
-227
lines changed

11 files changed

+490
-227
lines changed

core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,27 +98,26 @@ private void innerRun() throws IOException {
9898
final int numShards = context.getNumShards();
9999
final boolean isScrollSearch = context.getRequest().scroll() != null;
100100
List<SearchPhaseResult> phaseResults = queryResults.asList();
101-
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(isScrollSearch, phaseResults);
102101
String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
103102
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();
104103
final boolean queryAndFetchOptimization = queryResults.length() == 1;
105104
final Runnable finishPhase = ()
106-
-> moveToNextPhase(searchPhaseController, sortedShardDocs, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
105+
-> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
107106
queryResults : fetchResults);
108107
if (queryAndFetchOptimization) {
109108
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
110109
// query AND fetch optimization
111110
finishPhase.run();
112111
} else {
113-
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, sortedShardDocs);
114-
if (sortedShardDocs.length == 0) { // no docs to fetch -- sidestep everything and return
112+
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
113+
if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
115114
phaseResults.stream()
116115
.map(e -> e.queryResult())
117116
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
118117
finishPhase.run();
119118
} else {
120119
final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
121-
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs, numShards)
120+
searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
122121
: null;
123122
final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
124123
docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
@@ -188,7 +187,7 @@ public void onFailure(Exception e) {
188187
private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
189188
// we only release search context that we did not fetch from if we are not scrolling
190189
// and if it has at least one hit that didn't make it to the global topDocs
191-
if (context.getRequest().scroll() == null && queryResult.hasHits()) {
190+
if (context.getRequest().scroll() == null && queryResult.hasSearchContext()) {
192191
try {
193192
Transport.Connection connection = context.getConnection(queryResult.getSearchShardTarget().getNodeId());
194193
context.sendReleaseSearchContext(queryResult.getRequestId(), connection);
@@ -198,11 +197,11 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
198197
}
199198
}
200199

201-
private void moveToNextPhase(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
200+
private void moveToNextPhase(SearchPhaseController searchPhaseController,
202201
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
203202
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
204203
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
205-
sortedDocs, reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
204+
reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
206205
context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));
207206
}
208207

core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 195 additions & 104 deletions
Large diffs are not rendered by default.

core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,8 @@ private void finishHim() {
173173

174174
private void innerFinishHim() throws Exception {
175175
List<QueryFetchSearchResult> queryFetchSearchResults = queryFetchResults.asList();
176-
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults.asList());
177-
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs,
178-
searchPhaseController.reducedQueryPhase(queryFetchSearchResults), queryFetchSearchResults, queryFetchResults::get);
176+
final InternalSearchResponse internalResponse = searchPhaseController.merge(true,
177+
searchPhaseController.reducedQueryPhase(queryFetchSearchResults, true), queryFetchSearchResults, queryFetchResults::get);
179178
String scrollId = null;
180179
if (request.scroll() != null) {
181180
scrollId = request.scrollId();

core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ final class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
5555
private volatile AtomicArray<ShardSearchFailure> shardFailures;
5656
final AtomicArray<QuerySearchResult> queryResults;
5757
final AtomicArray<FetchSearchResult> fetchResults;
58-
private volatile ScoreDoc[] sortedShardDocs;
5958
private final AtomicInteger successfulOps;
6059

6160
SearchScrollQueryThenFetchAsyncAction(Logger logger, ClusterService clusterService, SearchTransportService searchTransportService,
@@ -171,16 +170,15 @@ void onQueryPhaseFailure(final int shardIndex, final CountDown counter, final lo
171170
}
172171

173172
private void executeFetchPhase() throws Exception {
174-
sortedShardDocs = searchPhaseController.sortDocs(true, queryResults.asList());
175-
if (sortedShardDocs.length == 0) {
176-
finishHim(searchPhaseController.reducedQueryPhase(queryResults.asList()));
173+
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList(),
174+
true);
175+
if (reducedQueryPhase.scoreDocs.length == 0) {
176+
finishHim(reducedQueryPhase);
177177
return;
178178
}
179179

180-
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), sortedShardDocs);
181-
SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(queryResults.asList());
182-
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, sortedShardDocs,
183-
queryResults.length());
180+
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), reducedQueryPhase.scoreDocs);
181+
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, queryResults.length());
184182
final CountDown counter = new CountDown(docIdsToLoad.length);
185183
for (int i = 0; i < docIdsToLoad.length; i++) {
186184
final int index = i;
@@ -222,8 +220,8 @@ public void onFailure(Exception t) {
222220

223221
private void finishHim(SearchPhaseController.ReducedQueryPhase queryPhase) {
224222
try {
225-
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryPhase,
226-
fetchResults.asList(), fetchResults::get);
223+
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, queryPhase, fetchResults.asList(),
224+
fetchResults::get);
227225
String scrollId = null;
228226
if (request.scroll() != null) {
229227
scrollId = request.scrollId();

core/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,6 @@
3131
* to get the concrete values as a list using {@link #asList()}.
3232
*/
3333
public class AtomicArray<E> {
34-
35-
private static final AtomicArray EMPTY = new AtomicArray(0);
36-
37-
@SuppressWarnings("unchecked")
38-
public static <E> E empty() {
39-
return (E) EMPTY;
40-
}
41-
4234
private final AtomicReferenceArray<E> array;
4335
private volatile List<E> nonNullList;
4436

@@ -53,7 +45,6 @@ public int length() {
5345
return array.length();
5446
}
5547

56-
5748
/**
5849
* Sets the element at position {@code i} to the given value.
5950
*

core/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTas
259259

260260
loadOrExecuteQueryPhase(request, context);
261261

262-
if (context.queryResult().hasHits() == false && context.scrollContext() == null) {
262+
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
263263
freeContext(context.id());
264264
} else {
265265
contextProcessedSuccessfully(context);
@@ -341,7 +341,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTas
341341
operationListener.onPreQueryPhase(context);
342342
long time = System.nanoTime();
343343
queryPhase.execute(context);
344-
if (context.queryResult().hasHits() == false && context.scrollContext() == null) {
344+
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
345345
// no hits, we can release the context since there will be no fetch phase
346346
freeContext(context.id());
347347
} else {

core/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public void execute(SearchContext context) {
166166
fetchSubPhase.hitsExecute(context, hits);
167167
}
168168

169-
context.fetchResult().hits(new SearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));
169+
context.fetchResult().hits(new SearchHits(hits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
170170
}
171171

172172
private int findRootDocumentIfNested(SearchContext context, LeafReaderContext subReaderContext, int subDocId) throws IOException {

core/src/main/java/org/elasticsearch/search/query/QueryPhase.java

Lines changed: 25 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,6 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
142142
queryResult.searchTimedOut(false);
143143

144144
final boolean doProfile = searchContext.getProfilers() != null;
145-
final SearchType searchType = searchContext.searchType();
146145
boolean rescore = false;
147146
try {
148147
queryResult.from(searchContext.from());
@@ -165,12 +164,7 @@ static boolean execute(SearchContext searchContext, final IndexSearcher searcher
165164
if (searchContext.getProfilers() != null) {
166165
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_COUNT, Collections.emptyList());
167166
}
168-
topDocsCallable = new Callable<TopDocs>() {
169-
@Override
170-
public TopDocs call() throws Exception {
171-
return new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
172-
}
173-
};
167+
topDocsCallable = () -> new TopDocs(totalHitCountCollector.getTotalHits(), Lucene.EMPTY_SCORE_DOCS, 0);
174168
} else {
175169
// Perhaps have a dedicated scroll phase?
176170
final ScrollContext scrollContext = searchContext.scrollContext();
@@ -238,38 +232,35 @@ public TopDocs call() throws Exception {
238232
if (doProfile) {
239233
collector = new InternalProfileCollector(collector, CollectorResult.REASON_SEARCH_TOP_HITS, Collections.emptyList());
240234
}
241-
topDocsCallable = new Callable<TopDocs>() {
242-
@Override
243-
public TopDocs call() throws Exception {
244-
final TopDocs topDocs;
245-
if (topDocsCollector instanceof TopDocsCollector) {
246-
topDocs = ((TopDocsCollector<?>) topDocsCollector).topDocs();
247-
} else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
248-
topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
235+
topDocsCallable = () -> {
236+
final TopDocs topDocs;
237+
if (topDocsCollector instanceof TopDocsCollector) {
238+
topDocs = ((TopDocsCollector<?>) topDocsCollector).topDocs();
239+
} else if (topDocsCollector instanceof CollapsingTopDocsCollector) {
240+
topDocs = ((CollapsingTopDocsCollector) topDocsCollector).getTopDocs();
241+
} else {
242+
throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
243+
}
244+
if (scrollContext != null) {
245+
if (scrollContext.totalHits == -1) {
246+
// first round
247+
scrollContext.totalHits = topDocs.totalHits;
248+
scrollContext.maxScore = topDocs.getMaxScore();
249249
} else {
250-
throw new IllegalStateException("Unknown top docs collector " + topDocsCollector.getClass().getName());
250+
// subsequent round: the total number of hits and
251+
// the maximum score were computed on the first round
252+
topDocs.totalHits = scrollContext.totalHits;
253+
topDocs.setMaxScore(scrollContext.maxScore);
251254
}
252-
if (scrollContext != null) {
253-
if (scrollContext.totalHits == -1) {
254-
// first round
255-
scrollContext.totalHits = topDocs.totalHits;
256-
scrollContext.maxScore = topDocs.getMaxScore();
257-
} else {
258-
// subsequent round: the total number of hits and
259-
// the maximum score were computed on the first round
260-
topDocs.totalHits = scrollContext.totalHits;
261-
topDocs.setMaxScore(scrollContext.maxScore);
262-
}
263-
if (searchContext.request().numberOfShards() == 1) {
264-
// if we fetch the document in the same roundtrip, we already know the last emitted doc
265-
if (topDocs.scoreDocs.length > 0) {
266-
// set the last emitted doc
267-
scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
268-
}
255+
if (searchContext.request().numberOfShards() == 1) {
256+
// if we fetch the document in the same roundtrip, we already know the last emitted doc
257+
if (topDocs.scoreDocs.length > 0) {
258+
// set the last emitted doc
259+
scrollContext.lastEmittedDoc = topDocs.scoreDocs[topDocs.scoreDocs.length - 1];
269260
}
270261
}
271-
return topDocs;
272262
}
263+
return topDocs;
273264
};
274265
}
275266

core/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ public final class QuerySearchResult extends SearchPhaseResult {
5555
private Boolean terminatedEarly = null;
5656
private ProfileShardResult profileShardResults;
5757
private boolean hasProfileResults;
58+
private boolean hasScoreDocs;
59+
private int totalHits;
60+
private float maxScore;
5861

5962
public QuerySearchResult() {
6063
}
@@ -87,11 +90,34 @@ public Boolean terminatedEarly() {
8790
}
8891

8992
public TopDocs topDocs() {
93+
if (topDocs == null) {
94+
throw new IllegalStateException("topDocs already consumed");
95+
}
96+
return topDocs;
97+
}
98+
99+
/**
100+
* Returns <code>true</code> iff the top docs have already been consumed.
101+
*/
102+
public boolean hasConsumedTopDocs() {
103+
return topDocs == null;
104+
}
105+
106+
/**
107+
* Returns and nulls out the top docs for this search result. This allows memory to be freed once the top docs are consumed.
108+
* @throws IllegalStateException if the top docs have already been consumed.
109+
*/
110+
public TopDocs consumeTopDocs() {
111+
TopDocs topDocs = this.topDocs;
112+
if (topDocs == null) {
113+
throw new IllegalStateException("topDocs already consumed");
114+
}
115+
this.topDocs = null;
90116
return topDocs;
91117
}
92118

93119
public void topDocs(TopDocs topDocs, DocValueFormat[] sortValueFormats) {
94-
this.topDocs = topDocs;
120+
setTopDocs(topDocs);
95121
if (topDocs.scoreDocs.length > 0 && topDocs.scoreDocs[0] instanceof FieldDoc) {
96122
int numFields = ((FieldDoc) topDocs.scoreDocs[0]).fields.length;
97123
if (numFields != sortValueFormats.length) {
@@ -102,12 +128,19 @@ public void topDocs(TopDocs topDocs, DocValueFormat[] sortValueFormats) {
102128
this.sortValueFormats = sortValueFormats;
103129
}
104130

131+
private void setTopDocs(TopDocs topDocs) {
132+
this.topDocs = topDocs;
133+
hasScoreDocs = topDocs.scoreDocs.length > 0;
134+
this.totalHits = topDocs.totalHits;
135+
this.maxScore = topDocs.getMaxScore();
136+
}
137+
105138
public DocValueFormat[] sortValueFormats() {
106139
return sortValueFormats;
107140
}
108141

109142
/**
110-
* Retruns <code>true</code> if this query result has unconsumed aggregations
143+
* Returns <code>true</code> if this query result has unconsumed aggregations
111144
*/
112145
public boolean hasAggs() {
113146
return hasAggs;
@@ -195,10 +228,15 @@ public QuerySearchResult size(int size) {
195228
return this;
196229
}
197230

198-
/** Returns true iff the result has hits */
199-
public boolean hasHits() {
200-
return (topDocs != null && topDocs.scoreDocs.length > 0) ||
201-
(suggest != null && suggest.hasScoreDocs());
231+
/**
232+
* Returns <code>true</code> if this result has any suggest score docs
233+
*/
234+
public boolean hasSuggestHits() {
235+
return (suggest != null && suggest.hasScoreDocs());
236+
}
237+
238+
public boolean hasSearchContext() {
239+
return hasScoreDocs || hasSuggestHits();
202240
}
203241

204242
public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOException {
@@ -227,7 +265,7 @@ public void readFromWithId(long id, StreamInput in) throws IOException {
227265
sortValueFormats[i] = in.readNamedWriteable(DocValueFormat.class);
228266
}
229267
}
230-
topDocs = readTopDocs(in);
268+
setTopDocs(readTopDocs(in));
231269
if (hasAggs = in.readBoolean()) {
232270
aggregations = InternalAggregations.readAggregations(in);
233271
}
@@ -278,4 +316,12 @@ public void writeToNoId(StreamOutput out) throws IOException {
278316
out.writeOptionalBoolean(terminatedEarly);
279317
out.writeOptionalWriteable(profileShardResults);
280318
}
319+
320+
public int getTotalHits() {
321+
return totalHits;
322+
}
323+
324+
public float getMaxScore() {
325+
return maxScore;
326+
}
281327
}

0 commit comments

Comments
 (0)