
Commit 33c8275

Sort leaves on search according to the primary numeric sort field (#44021)
This change pre-sorts the index reader leaves (segments) prior to the search when the primary sort is a numeric field eligible for the distance feature optimization. It also adds a tiebreaker on `_doc` to the rewritten sort, which is required because the leaves are no longer collected in index order. I ran this patch on the http_logs benchmark and the results are very promising:

```
| 50th percentile latency       | desc_sort_timestamp | 220.706  | 136544   | 136324   | ms    |
| 90th percentile latency       | desc_sort_timestamp | 244.847  | 162084   | 161839   | ms    |
| 99th percentile latency       | desc_sort_timestamp | 316.627  | 172005   | 171688   | ms    |
| 100th percentile latency      | desc_sort_timestamp | 335.306  | 173325   | 172989   | ms    |
| 50th percentile service time  | desc_sort_timestamp | 218.369  | 1968.11  | 1749.74  | ms    |
| 90th percentile service time  | desc_sort_timestamp | 244.182  | 2447.2   | 2203.02  | ms    |
| 99th percentile service time  | desc_sort_timestamp | 313.176  | 2950.85  | 2637.67  | ms    |
| 100th percentile service time | desc_sort_timestamp | 332.924  | 2959.38  | 2626.45  | ms    |
| error rate                    | desc_sort_timestamp | 0        | 0        | 0        | %     |
| Min Throughput                | asc_sort_timestamp  | 0.801824 | 0.800855 | -0.00097 | ops/s |
| Median Throughput             | asc_sort_timestamp  | 0.802595 | 0.801104 | -0.00149 | ops/s |
| Max Throughput                | asc_sort_timestamp  | 0.803282 | 0.801351 | -0.00193 | ops/s |
| 50th percentile latency       | asc_sort_timestamp  | 220.761  | 824.098  | 603.336  | ms    |
| 90th percentile latency       | asc_sort_timestamp  | 251.741  | 853.984  | 602.243  | ms    |
| 99th percentile latency       | asc_sort_timestamp  | 368.761  | 893.943  | 525.182  | ms    |
| 100th percentile latency      | asc_sort_timestamp  | 431.042  | 908.85   | 477.808  | ms    |
| 50th percentile service time  | asc_sort_timestamp  | 218.547  | 820.757  | 602.211  | ms    |
| 90th percentile service time  | asc_sort_timestamp  | 249.578  | 849.886  | 600.308  | ms    |
| 99th percentile service time  | asc_sort_timestamp  | 366.317  | 888.894  | 522.577  | ms    |
| 100th percentile service time | asc_sort_timestamp  | 430.952  | 908.401  | 477.45   | ms    |
| error rate                    | asc_sort_timestamp  | 0        | 0        | 0        | %     |
```

So roughly 10x faster for the descending sort and 2-3x faster in the ascending case. Note that I indexed http_logs with a single client in order to simulate real time-based indices, where documents are indexed in timestamp order.

Relates #37043
1 parent 04e5e41 commit 33c8275
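For illustration, the core idea (ordering segments by the minimum, or for a descending sort the maximum, indexed value of the primary sort field) can be sketched as a standalone helper. This mirrors the `createLeafSorter` method added in QueryPhase below; the class and method names here are illustrative, not part of the patch:

```java
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;

// Illustrative helper (not part of the patch): order segments by the min
// (or max, for descending sorts) indexed value of a numeric point field,
// so that segments likely to contain the best hits are searched first.
final class LeafOrderingSketch {
    static void sortLeaves(List<LeafReaderContext> leaves, String field,
                           boolean reverse, long missingValue) throws IOException {
        long[] bounds = new long[leaves.size()];
        for (LeafReaderContext ctx : leaves) {
            PointValues values = ctx.reader().getPointValues(field);
            if (values == null) {
                bounds[ctx.ord] = missingValue; // segment has no values for this field
            } else {
                byte[] packed = reverse ? values.getMaxPackedValue() : values.getMinPackedValue();
                bounds[ctx.ord] = packed == null ? missingValue : LongPoint.decodeDimension(packed, 0);
            }
        }
        Comparator<LeafReaderContext> byBound = Comparator.comparingLong(ctx -> bounds[ctx.ord]);
        leaves.sort(reverse ? byBound.reversed() : byBound);
    }
}
```

Because the rewritten sort tiebreaks on `_doc`, visiting the segments in this order is safe, and the top docs collector reaches its threshold (and starts skipping non-competitive documents) much sooner.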

File tree

9 files changed: +271 -252 lines changed


server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java

Lines changed: 59 additions & 42 deletions
```diff
@@ -132,12 +132,67 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
         }
     }
 
+    private void checkCancelled() {
+        if (checkCancelled != null) {
+            checkCancelled.run();
+        }
+    }
+
     @Override
     protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
-        final Weight cancellableWeight;
-        if (checkCancelled != null) {
-            cancellableWeight = new Weight(weight.getQuery()) {
+        for (LeafReaderContext ctx : leaves) { // search each subreader
+            searchLeaf(ctx, weight, collector);
+        }
+    }
 
+    /**
+     * Lower-level search API.
+     *
+     * {@link LeafCollector#collect(int)} is called for every matching document in
+     * the provided <code>ctx</code>.
+     */
+    public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
+        checkCancelled();
+        weight = wrapWeight(weight);
+        final LeafCollector leafCollector;
+        try {
+            leafCollector = collector.getLeafCollector(ctx);
+        } catch (CollectionTerminatedException e) {
+            // there is no doc of interest in this reader context
+            // continue with the following leaf
+            return;
+        }
+        Bits liveDocs = ctx.reader().getLiveDocs();
+        BitSet liveDocsBitSet = getSparseBitSetOrNull(ctx.reader().getLiveDocs());
+        if (liveDocsBitSet == null) {
+            BulkScorer bulkScorer = weight.bulkScorer(ctx);
+            if (bulkScorer != null) {
+                try {
+                    bulkScorer.score(leafCollector, liveDocs);
+                } catch (CollectionTerminatedException e) {
+                    // collection was terminated prematurely
+                    // continue with the following leaf
+                }
+            }
+        } else {
+            // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
+            Scorer scorer = weight.scorer(ctx);
+            if (scorer != null) {
+                try {
+                    intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
+                        checkCancelled == null ? () -> {
+                        } : checkCancelled);
+                } catch (CollectionTerminatedException e) {
+                    // collection was terminated prematurely
+                    // continue with the following leaf
+                }
+            }
+        }
+    }
+
+    private Weight wrapWeight(Weight weight) {
+        if (checkCancelled != null) {
+            return new Weight(weight.getQuery()) {
                 @Override
                 public void extractTerms(Set<Term> terms) {
                     throw new UnsupportedOperationException();
@@ -169,48 +224,10 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
                 }
             };
         } else {
-            cancellableWeight = weight;
+            return weight;
         }
-        searchInternal(leaves, cancellableWeight, collector);
     }
 
-    private void searchInternal(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
-        for (LeafReaderContext ctx : leaves) { // search each subreader
-            final LeafCollector leafCollector;
-            try {
-                leafCollector = collector.getLeafCollector(ctx);
-            } catch (CollectionTerminatedException e) {
-                // there is no doc of interest in this reader context
-                // continue with the following leaf
-                continue;
-            }
-            Bits liveDocs = ctx.reader().getLiveDocs();
-            BitSet liveDocsBitSet = getSparseBitSetOrNull(ctx.reader().getLiveDocs());
-            if (liveDocsBitSet == null) {
-                BulkScorer bulkScorer = weight.bulkScorer(ctx);
-                if (bulkScorer != null) {
-                    try {
-                        bulkScorer.score(leafCollector, liveDocs);
-                    } catch (CollectionTerminatedException e) {
-                        // collection was terminated prematurely
-                        // continue with the following leaf
-                    }
-                }
-            } else {
-                // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
-                Scorer scorer = weight.scorer(ctx);
-                if (scorer != null) {
-                    try {
-                        intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
-                            checkCancelled == null ? () -> {} : checkCancelled);
-                    } catch (CollectionTerminatedException e) {
-                        // collection was terminated prematurely
-                        // continue with the following leaf
-                    }
-                }
-            }
-        }
-    }
 
     private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
         if (liveDocs instanceof SparseFixedBitSet) {
```

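With `search` now delegating to a public per-leaf entry point, callers can drive the segments in any order they choose. A minimal usage sketch, assuming the `ContextIndexSearcher` as modified above (the helper name and the comparator argument are hypothetical; the real driver loop is in the QueryPhase diff below):

```java
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Weight;
import org.elasticsearch.search.internal.ContextIndexSearcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical driver (not in the patch): search the leaves of `searcher`
// in the order given by `leafOrder` instead of index order.
final class OrderedSearchSketch {
    static void searchInOrder(ContextIndexSearcher searcher, Query query, Collector collector,
                              Comparator<LeafReaderContext> leafOrder) throws IOException {
        Weight weight = searcher.createWeight(searcher.rewrite(query), collector.scoreMode(), 1f);
        List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
        leaves.sort(leafOrder); // safe only if the collector tolerates out-of-order leaves
        for (LeafReaderContext ctx : leaves) {
            searcher.searchLeaf(ctx, weight, collector); // per-leaf entry point added above
        }
    }
}
```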
server/src/main/java/org/elasticsearch/search/profile/query/CollectorResult.java

Lines changed: 0 additions & 2 deletions
```diff
@@ -49,8 +49,6 @@ public class CollectorResult implements ToXContentObject, Writeable {
     public static final String REASON_SEARCH_POST_FILTER = "search_post_filter";
     public static final String REASON_SEARCH_MIN_SCORE = "search_min_score";
     public static final String REASON_SEARCH_MULTI = "search_multi";
-    public static final String REASON_SEARCH_TIMEOUT = "search_timeout";
-    public static final String REASON_SEARCH_CANCELLED = "search_cancelled";
     public static final String REASON_AGGREGATION = "aggregation";
     public static final String REASON_AGGREGATION_GLOBAL = "aggregation_global";
 
```

server/src/main/java/org/elasticsearch/search/query/CancellableCollector.java

Lines changed: 0 additions & 53 deletions
This file was deleted.
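For context, the deleted class wrapped another collector and checked a cancellation flag each time collection moved on to a new segment. A minimal sketch of a collector with that shape, reconstructed from its usage in QueryCollectorContext below (not the exact deleted code):

```java
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.FilterCollector;
import org.apache.lucene.search.LeafCollector;
import org.elasticsearch.tasks.TaskCancelledException;

import java.io.IOException;
import java.util.function.BooleanSupplier;

// Reconstructed sketch, not the exact deleted code: throws if the task has
// been cancelled whenever a new segment is about to be collected.
class CancellableCollectorSketch extends FilterCollector {
    private final BooleanSupplier cancelled;

    CancellableCollectorSketch(BooleanSupplier cancelled, Collector in) {
        super(in);
        this.cancelled = cancelled;
    }

    @Override
    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
        if (cancelled.getAsBoolean()) {
            throw new TaskCancelledException("cancelled");
        }
        return super.getLeafCollector(context);
    }
}
```

Its role is now covered by the `checkCancelled` runnable that ContextIndexSearcher invokes before searching each leaf, which is why the dedicated collector (and its profiling reason) could be removed.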

server/src/main/java/org/elasticsearch/search/query/QueryCollectorContext.java

Lines changed: 0 additions & 15 deletions
```diff
@@ -28,16 +28,13 @@
 import org.elasticsearch.common.lucene.MinimumScoreCollector;
 import org.elasticsearch.common.lucene.search.FilteredCollector;
 import org.elasticsearch.search.profile.query.InternalProfileCollector;
-import org.elasticsearch.tasks.TaskCancelledException;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.function.BooleanSupplier;
 
-import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_CANCELLED;
 import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MIN_SCORE;
 import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_MULTI;
 import static org.elasticsearch.search.profile.query.CollectorResult.REASON_SEARCH_POST_FILTER;
@@ -150,18 +147,6 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i
         };
     }
 
-    /**
-     * Creates a collector that throws {@link TaskCancelledException} if the search is cancelled
-     */
-    static QueryCollectorContext createCancellableCollectorContext(BooleanSupplier cancelled) {
-        return new QueryCollectorContext(REASON_SEARCH_CANCELLED) {
-            @Override
-            Collector create(Collector in) throws IOException {
-                return new CancellableCollector(cancelled, in);
-            }
-        };
-    }
-
     /**
      * Creates collector limiting the collection to the first <code>numHits</code> documents
      */
```

server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

Lines changed: 59 additions & 22 deletions
```diff
@@ -34,7 +34,6 @@
 import org.apache.lucene.search.ConstantScoreQuery;
 import org.apache.lucene.search.DocValuesFieldExistsQuery;
 import org.apache.lucene.search.FieldDoc;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
@@ -43,8 +42,10 @@
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.TotalHits;
+import org.apache.lucene.search.Weight;
 import org.elasticsearch.action.search.SearchTask;
 import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
 import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor;
@@ -68,12 +69,14 @@
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Consumer;
 
-import static org.elasticsearch.search.query.QueryCollectorContext.createCancellableCollectorContext;
 import static org.elasticsearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
 import static org.elasticsearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
 import static org.elasticsearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
@@ -89,7 +92,7 @@
 public class QueryPhase implements SearchPhase {
     private static final Logger LOGGER = LogManager.getLogger(QueryPhase.class);
     public static final boolean SYS_PROP_LONG_SORT_OPTIMIZED =
-        Booleans.parseBoolean(System.getProperty("es.search.long_sort_optimized", "false"));
+        Booleans.parseBoolean(System.getProperty("es.search.long_sort_optimized", "true"));
 
     private final AggregationPhase aggregationPhase;
     private final SuggestPhase suggestPhase;
@@ -124,8 +127,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep
         // request, preProcess is called on the DFS phase phase, this is why we pre-process them
         // here to make sure it happens during the QUERY phase
         aggregationPhase.preProcess(searchContext);
-        final ContextIndexSearcher searcher = searchContext.searcher();
-        boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled);
+        boolean rescore = executeInternal(searchContext);
 
         if (rescore) { // only if we do a regular search
             rescorePhase.execute(searchContext);
@@ -145,9 +147,8 @@
      * wire everything (mapperService, etc.)
      * @return whether the rescoring phase should be executed
      */
-    static boolean execute(SearchContext searchContext,
-                           final IndexSearcher searcher,
-                           Consumer<Runnable> checkCancellationSetter) throws QueryPhaseExecutionException {
+    static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExecutionException {
+        final ContextIndexSearcher searcher = searchContext.searcher();
         SortAndFormats sortAndFormatsForRewrittenNumericSort = null;
         final IndexReader reader = searcher.getIndexReader();
         QuerySearchResult queryResult = searchContext.queryResult();
@@ -220,6 +221,7 @@
             hasFilterCollector = true;
         }
 
+        CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter = l -> {};
        // try to rewrite numeric or date sort to the optimized distanceFeatureQuery
         if ((searchContext.sort() != null) && SYS_PROP_LONG_SORT_OPTIMIZED) {
             Query rewrittenQuery = tryRewriteLongSort(searchContext, searcher.getIndexReader(), query, hasFilterCollector);
@@ -228,14 +230,20 @@
                 // modify sorts: add sort on _score as 1st sort, and move the sort on the original field as the 2nd sort
                 SortField[] oldSortFields = searchContext.sort().sort.getSort();
                 DocValueFormat[] oldFormats = searchContext.sort().formats;
-                SortField[] newSortFields = new SortField[oldSortFields.length + 1];
-                DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 1];
+                SortField[] newSortFields = new SortField[oldSortFields.length + 2];
+                DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 2];
                 newSortFields[0] = SortField.FIELD_SCORE;
                 newFormats[0] = DocValueFormat.RAW;
+                // Add a tiebreak on _doc in order to be able to search
+                // the leaves in any order. This is needed since we reorder
+                // the leaves based on the minimum value in each segment.
+                newSortFields[newSortFields.length-1] = SortField.FIELD_DOC;
+                newFormats[newSortFields.length-1] = DocValueFormat.RAW;
                 System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
                 System.arraycopy(oldFormats, 0, newFormats, 1, oldFormats.length);
                 sortAndFormatsForRewrittenNumericSort = searchContext.sort(); // stash SortAndFormats to restore it later
                 searchContext.sort(new SortAndFormats(new Sort(newSortFields), newFormats));
+                leafSorter = createLeafSorter(oldSortFields[0]);
             }
         }
 
@@ -279,16 +287,11 @@
             checkCancelled = null;
         }
 
-        checkCancellationSetter.accept(checkCancelled);
-
-        // add cancellable
-        // this only performs segment-level cancellation, which is cheap and checked regardless of
-        // searchContext.lowLevelCancellation()
-        collectors.add(createCancellableCollectorContext(searchContext.getTask()::isCancelled));
+        searcher.setCheckCancelled(checkCancelled);
 
         final boolean doProfile = searchContext.getProfilers() != null;
         // create the top docs collector last when the other collectors are known
-        final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader, hasFilterCollector);
+        final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
         // add the top docs collector, the first collector context in the chain
         collectors.addFirst(topDocsFactory);
 
@@ -302,7 +305,15 @@
         }
 
         try {
-            searcher.search(query, queryCollector);
+            Weight weight = searcher.createWeight(searcher.rewrite(query), queryCollector.scoreMode(), 1f);
+            // We search the leaves in a different order when the numeric sort optimization is
+            // activated. Collectors expect leaves in order when searching but this is fine in this
+            // case since we only have a TopFieldCollector and we force the tiebreak on _doc.
+            List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
+            leafSorter.accept(leaves);
+            for (LeafReaderContext ctx : leaves) {
+                searcher.searchLeaf(ctx, weight, queryCollector);
+            }
         } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
             queryResult.terminatedEarly(true);
         } catch (TimeExceededException e) {
@@ -427,13 +438,39 @@ private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader
         return rewrittenQuery;
     }
 
-    // Restore fieldsDocs to remove the first _score sort
-    // updating in place without creating new FieldDoc objects
+    /**
+     * Creates a sorter of {@link LeafReaderContext} that orders leaves depending on the minimum
+     * value and the sort order of the provided <code>sortField</code>.
+     */
+    static CheckedConsumer<List<LeafReaderContext>, IOException> createLeafSorter(SortField sortField) {
+        return leaves -> {
+            long[] sortValues = new long[leaves.size()];
+            long missingValue = (long) sortField.getMissingValue();
+            for (LeafReaderContext ctx : leaves) {
+                PointValues values = ctx.reader().getPointValues(sortField.getField());
+                if (values == null) {
+                    sortValues[ctx.ord] = missingValue;
+                } else {
+                    byte[] sortValue = sortField.getReverse() ? values.getMaxPackedValue(): values.getMinPackedValue();
+                    sortValues[ctx.ord] = sortValue == null ? missingValue : LongPoint.decodeDimension(sortValue, 0);
+                }
+            }
+            Comparator<LeafReaderContext> comparator = Comparator.comparingLong(l -> sortValues[l.ord]);
+            if (sortField.getReverse()) {
+                comparator = comparator.reversed();
+            }
+            Collections.sort(leaves, comparator);
+        };
+    }
+
+    /**
+     * Restore fieldsDocs to remove the first _score and last _doc sort.
+     */
     static void restoreTopFieldDocs(QuerySearchResult result, SortAndFormats originalSortAndFormats) {
         TopDocs topDocs = result.topDocs().topDocs;
         for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
             FieldDoc fieldDoc = (FieldDoc) scoreDoc;
-            fieldDoc.fields = Arrays.copyOfRange(fieldDoc.fields, 1, fieldDoc.fields.length);
+            fieldDoc.fields = Arrays.copyOfRange(fieldDoc.fields, 1, fieldDoc.fields.length-1);
         }
         TopFieldDocs newTopDocs = new TopFieldDocs(topDocs.totalHits, topDocs.scoreDocs, originalSortAndFormats.sort.getSort());
         result.topDocs(new TopDocsAndMaxScore(newTopDocs, Float.NaN), originalSortAndFormats.formats);
```

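To make the restore step concrete: under the rewritten sort `[_score, <numeric field>, _doc]` every hit carries three sort values, and `restoreTopFieldDocs` trims the first and last so that only the user's original sort values remain. A tiny self-contained example with illustrative values:

```java
import java.util.Arrays;

// Illustrative values only: mimics the field trimming in restoreTopFieldDocs.
public class RestoreFieldsExample {
    public static void main(String[] args) {
        // A hit collected under the rewritten sort [_score, timestamp, _doc]:
        Object[] fields = new Object[] { 42.0f, 1546300800000L, 7 };
        // Drop the leading _score and the trailing _doc tiebreaker,
        // keeping only the original sort value on the timestamp field:
        fields = Arrays.copyOfRange(fields, 1, fields.length - 1);
        System.out.println(Arrays.toString(fields)); // prints [1546300800000]
    }
}
```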