Skip to content

Commit 81971ac

Browse files
Use collector manager for search when necessary (#45829)
When we optimize sort, we sort segments by their min/max value. As a collector expects to have segments in order, we can not use a single collector for sorted segments. Thus for such a case, we use collectorManager, where for every segment a dedicated collector will be created.
1 parent c4c3b66 commit 81971ac

File tree

4 files changed

+146
-76
lines changed

4 files changed

+146
-76
lines changed

server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.lucene.search.CollectionStatistics;
2929
import org.apache.lucene.search.CollectionTerminatedException;
3030
import org.apache.lucene.search.Collector;
31+
import org.apache.lucene.search.CollectorManager;
3132
import org.apache.lucene.search.ConjunctionDISI;
3233
import org.apache.lucene.search.DocIdSetIterator;
3334
import org.apache.lucene.search.Explanation;
@@ -54,6 +55,7 @@
5455
import org.elasticsearch.search.profile.query.QueryTimingType;
5556

5657
import java.io.IOException;
58+
import java.util.ArrayList;
5759
import java.util.Arrays;
5860
import java.util.List;
5961
import java.util.Set;
@@ -138,6 +140,17 @@ private void checkCancelled() {
138140
}
139141
}
140142

143+
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager) throws IOException {
144+
final List<Collector> collectors = new ArrayList<>(leaves.size());
145+
for (LeafReaderContext ctx : leaves) {
146+
final Collector collector = manager.newCollector();
147+
//TODO: setMinCompetitveScore between Collectors
148+
searchLeaf(ctx, weight, collector);
149+
collectors.add(collector);
150+
}
151+
manager.reduce(collectors);
152+
}
153+
141154
@Override
142155
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
143156
for (LeafReaderContext ctx : leaves) { // search each subreader
@@ -151,7 +164,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
151164
* {@link LeafCollector#collect(int)} is called for every matching document in
152165
* the provided <code>ctx</code>.
153166
*/
154-
public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
167+
private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
155168
checkCancelled();
156169
weight = wrapWeight(weight);
157170
final LeafCollector leafCollector;
@@ -228,6 +241,7 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
228241
}
229242
}
230243

244+
231245
private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
232246
if (liveDocs instanceof SparseFixedBitSet) {
233247
return (BitSet) liveDocs;

server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

+123-51
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@
3131
import org.apache.lucene.search.BooleanClause;
3232
import org.apache.lucene.search.BooleanQuery;
3333
import org.apache.lucene.search.Collector;
34+
import org.apache.lucene.search.CollectorManager;
3435
import org.apache.lucene.search.ConstantScoreQuery;
3536
import org.apache.lucene.search.DocValuesFieldExistsQuery;
3637
import org.apache.lucene.search.FieldDoc;
3738
import org.apache.lucene.search.MatchAllDocsQuery;
3839
import org.apache.lucene.search.Query;
3940
import org.apache.lucene.search.ScoreDoc;
41+
import org.apache.lucene.search.ScoreMode;
4042
import org.apache.lucene.search.Sort;
4143
import org.apache.lucene.search.SortField;
4244
import org.apache.lucene.search.TopDocs;
45+
import org.apache.lucene.search.TopFieldCollector;
4346
import org.apache.lucene.search.TopFieldDocs;
4447
import org.apache.lucene.search.TotalHits;
4548
import org.apache.lucene.search.Weight;
@@ -71,6 +74,7 @@
7174
import java.io.IOException;
7275
import java.util.ArrayList;
7376
import java.util.Arrays;
77+
import java.util.Collection;
7478
import java.util.Collections;
7579
import java.util.Comparator;
7680
import java.util.LinkedList;
@@ -236,7 +240,7 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
236240
newFormats[0] = DocValueFormat.RAW;
237241
// Add a tiebreak on _doc in order to be able to search
238242
// the leaves in any order. This is needed since we reorder
239-
// the leaves based on the minimum value in each segment.
243+
// the leaves based on the minimum/maxim value in each segment.
240244
newSortFields[newSortFields.length-1] = SortField.FIELD_DOC;
241245
newFormats[newSortFields.length-1] = DocValueFormat.RAW;
242246
System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
@@ -286,61 +290,20 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
286290
} else {
287291
checkCancelled = null;
288292
}
289-
290293
searcher.setCheckCancelled(checkCancelled);
291294

292-
final boolean doProfile = searchContext.getProfilers() != null;
293-
// create the top docs collector last when the other collectors are known
294-
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
295-
// add the top docs collector, the first collector context in the chain
296-
collectors.addFirst(topDocsFactory);
297-
298-
final Collector queryCollector;
299-
if (doProfile) {
300-
InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
301-
searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
302-
queryCollector = profileCollector;
295+
boolean shouldRescore;
296+
// if we are optimizing sort and there are no other collectors
297+
if (sortAndFormatsForRewrittenNumericSort != null && collectors.size() == 0 && searchContext.getProfilers() == null) {
298+
shouldRescore = searchWithCollectorManager(searchContext, searcher, query, leafSorter, timeoutSet);
303299
} else {
304-
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
305-
}
306-
307-
try {
308-
Weight weight = searcher.createWeight(searcher.rewrite(query), queryCollector.scoreMode(), 1f);
309-
// We search the leaves in a different order when the numeric sort optimization is
310-
// activated. Collectors expect leaves in order when searching but this is fine in this
311-
// case since we only have a TopFieldCollector and we force the tiebreak on _doc.
312-
List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
313-
leafSorter.accept(leaves);
314-
for (LeafReaderContext ctx : leaves) {
315-
searcher.searchLeaf(ctx, weight, queryCollector);
316-
}
317-
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
318-
queryResult.terminatedEarly(true);
319-
} catch (TimeExceededException e) {
320-
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
321-
322-
if (searchContext.request().allowPartialSearchResults() == false) {
323-
// Can't rethrow TimeExceededException because not serializable
324-
throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
325-
}
326-
queryResult.searchTimedOut(true);
327-
} finally {
328-
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
329-
}
330-
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER
331-
&& queryResult.terminatedEarly() == null) {
332-
queryResult.terminatedEarly(false);
333-
}
334-
335-
final QuerySearchResult result = searchContext.queryResult();
336-
for (QueryCollectorContext ctx : collectors) {
337-
ctx.postProcess(result);
300+
shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
338301
}
339302

340303
// if we rewrote numeric long or date sort, restore fieldDocs based on the original sort
341304
if (sortAndFormatsForRewrittenNumericSort != null) {
342305
searchContext.sort(sortAndFormatsForRewrittenNumericSort); // restore SortAndFormats
343-
restoreTopFieldDocs(result, sortAndFormatsForRewrittenNumericSort);
306+
restoreTopFieldDocs(queryResult, sortAndFormatsForRewrittenNumericSort);
344307
}
345308

346309
ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
@@ -351,14 +314,123 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
351314
}
352315
if (searchContext.getProfilers() != null) {
353316
ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
354-
result.profileResults(shardResults);
317+
queryResult.profileResults(shardResults);
355318
}
356-
return topDocsFactory.shouldRescore();
319+
return shouldRescore;
357320
} catch (Exception e) {
358321
throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
359322
}
360323
}
361324

325+
private static boolean searchWithCollector(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
326+
LinkedList<QueryCollectorContext> collectors, boolean hasFilterCollector, boolean timeoutSet) throws IOException {
327+
// create the top docs collector last when the other collectors are known
328+
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
329+
// add the top docs collector, the first collector context in the chain
330+
collectors.addFirst(topDocsFactory);
331+
332+
final Collector queryCollector;
333+
if ( searchContext.getProfilers() != null) {
334+
InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
335+
searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
336+
queryCollector = profileCollector;
337+
} else {
338+
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
339+
}
340+
QuerySearchResult queryResult = searchContext.queryResult();
341+
try {
342+
searcher.search(query, queryCollector);
343+
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
344+
queryResult.terminatedEarly(true);
345+
} catch (TimeExceededException e) {
346+
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
347+
if (searchContext.request().allowPartialSearchResults() == false) {
348+
// Can't rethrow TimeExceededException because not serializable
349+
throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
350+
}
351+
queryResult.searchTimedOut(true);
352+
} finally {
353+
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
354+
}
355+
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
356+
queryResult.terminatedEarly(false);
357+
}
358+
for (QueryCollectorContext ctx : collectors) {
359+
ctx.postProcess(queryResult);
360+
}
361+
return topDocsFactory.shouldRescore();
362+
}
363+
364+
// we use collectorManager during sort optimization
365+
// for the sort optimization, we have already checked that there are no other collectors, no filters,
366+
// no search after, no scroll, no collapse, no track scores
367+
// this means we can use TopFieldCollector directly
368+
private static boolean searchWithCollectorManager(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
369+
CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter, boolean timeoutSet) throws IOException {
370+
final IndexReader reader = searchContext.searcher().getIndexReader();
371+
final int numHits = Math.min(searchContext.from() + searchContext.size(), Math.max(1, reader.numDocs()));
372+
final SortAndFormats sortAndFormats = searchContext.sort();
373+
374+
int totalHitsThreshold;
375+
TotalHits totalHits;
376+
if (searchContext.trackTotalHitsUpTo() == SearchContext.TRACK_TOTAL_HITS_DISABLED) {
377+
totalHitsThreshold = 1;
378+
totalHits = new TotalHits(0, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
379+
} else {
380+
int hitCount = shortcutTotalHitCount(reader, query);
381+
if (hitCount == -1) {
382+
totalHitsThreshold = searchContext.trackTotalHitsUpTo();
383+
totalHits = null; // will be computed via the collector
384+
} else {
385+
totalHitsThreshold = 1;
386+
totalHits = new TotalHits(hitCount, TotalHits.Relation.EQUAL_TO); // don't compute hit counts via the collector
387+
}
388+
}
389+
390+
CollectorManager<TopFieldCollector, Void> manager = new CollectorManager<>() {
391+
@Override
392+
public TopFieldCollector newCollector() throws IOException {
393+
return TopFieldCollector.create(sortAndFormats.sort, numHits, null, totalHitsThreshold);
394+
}
395+
@Override
396+
public Void reduce(Collection<TopFieldCollector> collectors) throws IOException {
397+
TopFieldDocs[] topDocsArr = new TopFieldDocs[collectors.size()];
398+
int i = 0;
399+
for (TopFieldCollector collector : collectors) {
400+
topDocsArr[i++] = collector.topDocs();
401+
}
402+
// we have to set setShardIndex to true, as Lucene can't have ScoreDocs without shardIndex set
403+
TopFieldDocs mergedTopDocs = TopDocs.merge(sortAndFormats.sort, 0, numHits, topDocsArr, true);
404+
// reset shard index for all topDocs; ES will set shard index later during reduce stage
405+
for (ScoreDoc scoreDoc : mergedTopDocs.scoreDocs) {
406+
scoreDoc.shardIndex = -1;
407+
}
408+
if (totalHits != null) { // we have already precalculated totalHits for the whole index
409+
mergedTopDocs = new TopFieldDocs(totalHits, mergedTopDocs.scoreDocs, mergedTopDocs.fields);
410+
}
411+
searchContext.queryResult().topDocs(new TopDocsAndMaxScore(mergedTopDocs, Float.NaN), sortAndFormats.formats);
412+
return null;
413+
}
414+
};
415+
416+
List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
417+
leafSorter.accept(leaves);
418+
try {
419+
Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.TOP_SCORES, 1f);
420+
searcher.search(leaves, weight, manager);
421+
} catch (TimeExceededException e) {
422+
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
423+
if (searchContext.request().allowPartialSearchResults() == false) {
424+
// Can't rethrow TimeExceededException because not serializable
425+
throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
426+
}
427+
searchContext.queryResult().searchTimedOut(true);
428+
} finally {
429+
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
430+
}
431+
return false; // no rescoring when sorting by field
432+
}
433+
362434
private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader reader,
363435
Query query, boolean hasFilterCollector) throws IOException {
364436
if (searchContext.searchAfter() != null) return null;
@@ -399,7 +471,7 @@ private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader
399471
if (missingValuesAccordingToSort == false) return null;
400472

401473
int docCount = PointValues.getDocCount(reader, fieldName);
402-
// is not worth to run optimization on small index
474+
// is not worth to run optimization on small index
403475
if (docCount <= 512) return null;
404476

405477
// check for multiple values

server/src/test/java/org/elasticsearch/search/SearchCancellationTests.java

+2-9
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,11 @@
2222
import org.apache.lucene.document.Field;
2323
import org.apache.lucene.document.StringField;
2424
import org.apache.lucene.index.IndexReader;
25-
import org.apache.lucene.index.LeafReaderContext;
2625
import org.apache.lucene.index.NoMergePolicy;
2726
import org.apache.lucene.index.RandomIndexWriter;
2827
import org.apache.lucene.search.IndexSearcher;
2928
import org.apache.lucene.search.MatchAllDocsQuery;
30-
import org.apache.lucene.search.ScoreMode;
3129
import org.apache.lucene.search.TotalHitCountCollector;
32-
import org.apache.lucene.search.Weight;
3330
import org.apache.lucene.store.Directory;
3431
import org.elasticsearch.core.internal.io.IOUtils;
3532
import org.apache.lucene.util.TestUtil;
@@ -88,13 +85,9 @@ public void testCancellableCollector() throws IOException {
8885
throw new TaskCancelledException("cancelled");
8986
}
9087
});
91-
LeafReaderContext leafContext = reader.leaves().get(0);
92-
final Weight weight = searcher.createWeight(new MatchAllDocsQuery(), ScoreMode.COMPLETE, 1f);
93-
searcher.searchLeaf(searcher.getIndexReader().leaves().get(0), weight, collector);
94-
assertThat(collector.getTotalHits(), equalTo(leafContext.reader().numDocs()));
88+
searcher.search(new MatchAllDocsQuery(), collector);
89+
assertThat(collector.getTotalHits(), equalTo(reader.numDocs()));
9590
cancelled.set(true);
96-
expectThrows(TaskCancelledException.class,
97-
() -> searcher.searchLeaf(searcher.getIndexReader().leaves().get(0), weight, collector));
9891
expectThrows(TaskCancelledException.class,
9992
() -> searcher.search(new MatchAllDocsQuery(), collector));
10093
}

server/src/test/java/org/elasticsearch/search/query/QueryPhaseTests.java

+6-15
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.lucene.search.BooleanClause.Occur;
4343
import org.apache.lucene.search.BooleanQuery;
4444
import org.apache.lucene.search.Collector;
45+
import org.apache.lucene.search.CollectorManager;
4546
import org.apache.lucene.search.ConstantScoreQuery;
4647
import org.apache.lucene.search.DocValuesFieldExistsQuery;
4748
import org.apache.lucene.search.FieldComparator;
@@ -886,14 +887,9 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead
886887
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {
887888

888889
@Override
889-
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
890-
throw new AssertionError();
891-
}
892-
893-
@Override
894-
public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
895-
collector = new AssertingEarlyTerminationFilterCollector(collector, size);
896-
super.searchLeaf(ctx, weight, collector);
890+
public void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
891+
final Collector in = new AssertingEarlyTerminationFilterCollector(collector, size);
892+
super.search(leaves, weight, in);
897893
}
898894
};
899895
}
@@ -904,12 +900,7 @@ private static ContextIndexSearcher newOptimizedContextSearcher(IndexReader read
904900
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {
905901

906902
@Override
907-
public void search(Query query, Collector results) throws IOException {
908-
throw new AssertionError();
909-
}
910-
911-
@Override
912-
public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
903+
public void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager manager) throws IOException {
913904
final Query query = weight.getQuery();
914905
assertTrue(query instanceof BooleanQuery);
915906
List<BooleanClause> clauses = ((BooleanQuery) query).clauses();
@@ -922,7 +913,7 @@ public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector
922913
);
923914
}
924915
if (queryType == 1) assertTrue(clauses.get(1).getQuery() instanceof DocValuesFieldExistsQuery);
925-
super.searchLeaf(ctx, weight, collector);
916+
super.search(leaves, weight, manager);
926917
}
927918
};
928919
}

0 commit comments

Comments
 (0)