Skip to content

Commit 7f20b3a

Browse files
committed
Merge pull request #18237 from jimferenczi/scroll_by_slice
Add the ability to partition a scroll in multiple slices.
2 parents d71894a + b9030bf commit 7f20b3a

16 files changed

+1462
-63
lines changed

core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.search.Scroll;
3131
import org.elasticsearch.search.aggregations.AggregationBuilder;
3232
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
33+
import org.elasticsearch.search.slice.SliceBuilder;
3334
import org.elasticsearch.search.builder.SearchSourceBuilder;
3435
import org.elasticsearch.search.highlight.HighlightBuilder;
3536
import org.elasticsearch.search.rescore.RescoreBuilder;
@@ -352,6 +353,11 @@ public SearchRequestBuilder searchAfter(Object[] values) {
352353
return this;
353354
}
354355

356+
public SearchRequestBuilder slice(SliceBuilder builder) {
357+
sourceBuilder().slice(builder);
358+
return this;
359+
}
360+
355361
/**
356362
* Applies when sorting, and controls if scores will be tracked as well. Defaults to
357363
* <tt>false</tt>.

core/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,15 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
821821
FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter());
822822
context.searchAfter(fieldDoc);
823823
}
824+
825+
if (source.slice() != null) {
826+
if (context.scrollContext() == null) {
827+
throw new SearchContextException(context, "`slice` cannot be used outside of a scroll context");
828+
}
829+
context.sliceFilter(source.slice().toFilter(queryShardContext,
830+
context.shardTarget().getShardId().getId(),
831+
queryShardContext.getIndexSettings().getNumberOfShards()));
832+
}
824833
}
825834

826835
private static final int[] EMPTY_DOC_IDS = new int[0];

core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java

Lines changed: 45 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.index.query.QueryShardContext;
4343
import org.elasticsearch.script.Script;
4444
import org.elasticsearch.search.aggregations.AggregationBuilder;
45+
import org.elasticsearch.search.slice.SliceBuilder;
4546
import org.elasticsearch.search.aggregations.AggregatorFactories;
4647
import org.elasticsearch.search.aggregations.AggregatorParsers;
4748
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
@@ -98,6 +99,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
9899
public static final ParseField EXT_FIELD = new ParseField("ext");
99100
public static final ParseField PROFILE_FIELD = new ParseField("profile");
100101
public static final ParseField SEARCH_AFTER = new ParseField("search_after");
102+
public static final ParseField SLICE = new ParseField("slice");
101103

102104
public static SearchSourceBuilder fromXContent(QueryParseContext context, AggregatorParsers aggParsers,
103105
Suggesters suggesters) throws IOException {
@@ -138,6 +140,8 @@ public static HighlightBuilder highlight() {
138140

139141
private SearchAfterBuilder searchAfterBuilder;
140142

143+
private SliceBuilder sliceBuilder;
144+
141145
private Float minScore;
142146

143147
private long timeoutInMillis = -1;
@@ -175,9 +179,7 @@ public SearchSourceBuilder() {
175179
* Read from a stream.
176180
*/
177181
public SearchSourceBuilder(StreamInput in) throws IOException {
178-
if (in.readBoolean()) {
179-
aggregations = new AggregatorFactories.Builder(in);
180-
}
182+
aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
181183
explain = in.readOptionalBoolean();
182184
fetchSourceContext = in.readOptionalStreamable(FetchSourceContext::new);
183185
boolean hasFieldDataFields = in.readBoolean();
@@ -206,15 +208,9 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
206208
indexBoost.put(in.readString(), in.readFloat());
207209
}
208210
}
209-
if (in.readBoolean()) {
210-
minScore = in.readFloat();
211-
}
212-
if (in.readBoolean()) {
213-
postQueryBuilder = in.readNamedWriteable(QueryBuilder.class);
214-
}
215-
if (in.readBoolean()) {
216-
queryBuilder = in.readNamedWriteable(QueryBuilder.class);
217-
}
211+
minScore = in.readOptionalFloat();
212+
postQueryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
213+
queryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
218214
if (in.readBoolean()) {
219215
int size = in.readVInt();
220216
rescoreBuilders = new ArrayList<>();
@@ -244,29 +240,20 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
244240
stats.add(in.readString());
245241
}
246242
}
247-
if (in.readBoolean()) {
248-
suggestBuilder = new SuggestBuilder(in);
249-
}
243+
suggestBuilder = in.readOptionalWriteable(SuggestBuilder::new);
250244
terminateAfter = in.readVInt();
251245
timeoutInMillis = in.readLong();
252246
trackScores = in.readBoolean();
253247
version = in.readOptionalBoolean();
254-
if (in.readBoolean()) {
255-
ext = in.readBytesReference();
256-
}
248+
ext = in.readOptionalBytesReference();
257249
profile = in.readBoolean();
258-
if (in.readBoolean()) {
259-
searchAfterBuilder = new SearchAfterBuilder(in);
260-
}
250+
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
251+
sliceBuilder = in.readOptionalWriteable(SliceBuilder::new);
261252
}
262253

263254
@Override
264255
public void writeTo(StreamOutput out) throws IOException {
265-
boolean hasAggregations = aggregations != null;
266-
out.writeBoolean(hasAggregations);
267-
if (hasAggregations) {
268-
aggregations.writeTo(out);
269-
}
256+
out.writeOptionalWriteable(aggregations);
270257
out.writeOptionalBoolean(explain);
271258
out.writeOptionalStreamable(fetchSourceContext);
272259
boolean hasFieldDataFields = fieldDataFields != null;
@@ -296,21 +283,9 @@ public void writeTo(StreamOutput out) throws IOException {
296283
out.writeFloat(indexBoost.get(key.value));
297284
}
298285
}
299-
boolean hasMinScore = minScore != null;
300-
out.writeBoolean(hasMinScore);
301-
if (hasMinScore) {
302-
out.writeFloat(minScore);
303-
}
304-
boolean hasPostQuery = postQueryBuilder != null;
305-
out.writeBoolean(hasPostQuery);
306-
if (hasPostQuery) {
307-
out.writeNamedWriteable(postQueryBuilder);
308-
}
309-
boolean hasQuery = queryBuilder != null;
310-
out.writeBoolean(hasQuery);
311-
if (hasQuery) {
312-
out.writeNamedWriteable(queryBuilder);
313-
}
286+
out.writeOptionalFloat(minScore);
287+
out.writeOptionalNamedWriteable(postQueryBuilder);
288+
out.writeOptionalNamedWriteable(queryBuilder);
314289
boolean hasRescoreBuilders = rescoreBuilders != null;
315290
out.writeBoolean(hasRescoreBuilders);
316291
if (hasRescoreBuilders) {
@@ -344,26 +319,15 @@ public void writeTo(StreamOutput out) throws IOException {
344319
out.writeString(stat);
345320
}
346321
}
347-
boolean hasSuggestBuilder = suggestBuilder != null;
348-
out.writeBoolean(hasSuggestBuilder);
349-
if (hasSuggestBuilder) {
350-
suggestBuilder.writeTo(out);
351-
}
322+
out.writeOptionalWriteable(suggestBuilder);
352323
out.writeVInt(terminateAfter);
353324
out.writeLong(timeoutInMillis);
354325
out.writeBoolean(trackScores);
355326
out.writeOptionalBoolean(version);
356-
boolean hasExt = ext != null;
357-
out.writeBoolean(hasExt);
358-
if (hasExt) {
359-
out.writeBytesReference(ext);
360-
}
327+
out.writeOptionalBytesReference(ext);
361328
out.writeBoolean(profile);
362-
boolean hasSearchAfter = searchAfterBuilder != null;
363-
out.writeBoolean(hasSearchAfter);
364-
if (hasSearchAfter) {
365-
searchAfterBuilder.writeTo(out);
366-
}
329+
out.writeOptionalWriteable(searchAfterBuilder);
330+
out.writeOptionalWriteable(sliceBuilder);
367331
}
368332

369333
/**
@@ -597,6 +561,22 @@ public SearchSourceBuilder searchAfter(Object[] values) {
597561
return this;
598562
}
599563

564+
/**
565+
* Sets a filter that will restrict the search hits, the top hits and the aggregations to a slice of the results
566+
* of the main query.
567+
*/
568+
public SearchSourceBuilder slice(SliceBuilder builder) {
569+
this.sliceBuilder = builder;
570+
return this;
571+
}
572+
573+
/**
574+
* Gets the slice used to filter the search hits, the top hits and the aggregations.
575+
*/
576+
public SliceBuilder slice() {
577+
return sliceBuilder;
578+
}
579+
600580
/**
601581
* Add an aggregation to perform as part of the search.
602582
*/
@@ -943,6 +923,7 @@ private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder
943923
rewrittenBuilder.rescoreBuilders = rescoreBuilders;
944924
rewrittenBuilder.scriptFields = scriptFields;
945925
rewrittenBuilder.searchAfterBuilder = searchAfterBuilder;
926+
rewrittenBuilder.sliceBuilder = sliceBuilder;
946927
rewrittenBuilder.size = size;
947928
rewrittenBuilder.sorts = sorts;
948929
rewrittenBuilder.stats = stats;
@@ -1039,6 +1020,8 @@ public void parseXContent(QueryParseContext context, AggregatorParsers aggParser
10391020
} else if (context.getParseFieldMatcher().match(currentFieldName, EXT_FIELD)) {
10401021
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().copyCurrentStructure(parser);
10411022
ext = xContentBuilder.bytes();
1023+
} else if (context.getParseFieldMatcher().match(currentFieldName, SLICE)) {
1024+
sliceBuilder = SliceBuilder.fromXContent(context);
10421025
} else {
10431026
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
10441027
parser.getTokenLocation());
@@ -1193,6 +1176,10 @@ public void innerToXContent(XContentBuilder builder, Params params) throws IOExc
11931176
builder.field(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues());
11941177
}
11951178

1179+
if (sliceBuilder != null) {
1180+
builder.field(SLICE.getPreferredName(), sliceBuilder);
1181+
}
1182+
11961183
if (indexBoost != null) {
11971184
builder.startObject(INDICES_BOOST_FIELD.getPreferredName());
11981185
assert !indexBoost.containsKey(null);
@@ -1355,7 +1342,7 @@ public boolean equals(Object obj) {
13551342
public int hashCode() {
13561343
return Objects.hash(aggregations, explain, fetchSourceContext, fieldDataFields, fieldNames, from,
13571344
highlightBuilder, indexBoost, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields,
1358-
size, sorts, searchAfterBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile);
1345+
size, sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile);
13591346
}
13601347

13611348
@Override
@@ -1383,6 +1370,7 @@ public boolean equals(Object obj) {
13831370
&& Objects.equals(size, other.size)
13841371
&& Objects.equals(sorts, other.sorts)
13851372
&& Objects.equals(searchAfterBuilder, other.searchAfterBuilder)
1373+
&& Objects.equals(sliceBuilder, other.sliceBuilder)
13861374
&& Objects.equals(stats, other.stats)
13871375
&& Objects.equals(suggestBuilder, other.suggestBuilder)
13881376
&& Objects.equals(terminateAfter, other.terminateAfter)

core/src/main/java/org/elasticsearch/search/internal/DefaultSearchContext.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,17 @@ public class DefaultSearchContext extends SearchContext {
115115
private Float minimumScore;
116116
private boolean trackScores = false; // when sorting, track scores as well...
117117
private FieldDoc searchAfter;
118+
// filter for sliced scroll
119+
private Query sliceFilter;
120+
118121
/**
119122
* The original query as sent by the user without the types and aliases
120123
* applied. Putting things in here leaks them into highlighting so don't add
121124
* things like the type filter or alias filters.
122125
*/
123126
private ParsedQuery originalQuery;
124127
/**
125-
* Just like originalQuery but with the filters from types and aliases
126-
* applied.
128+
* Just like originalQuery but with the filters from types, aliases and slice applied.
127129
*/
128130
private ParsedQuery filteredQuery;
129131
/**
@@ -210,7 +212,7 @@ public void preProcess() {
210212
if (rescoreContext.window() > maxWindow) {
211213
throw new QueryPhaseExecutionException(this, "Rescore window [" + rescoreContext.window() + "] is too large. It must "
212214
+ "be less than [" + maxWindow + "]. This prevents allocating massive heaps for storing the results to be "
213-
+ "rescored. This limit can be set by chaning the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey()
215+
+ "rescored. This limit can be set by changing the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey()
214216
+ "] index level setting.");
215217

216218
}
@@ -254,7 +256,17 @@ private ParsedQuery buildFilteredQuery() {
254256
@Override
255257
@Nullable
256258
public Query searchFilter(String[] types) {
257-
return createSearchFilter(types, aliasFilter, mapperService().hasNested());
259+
Query typesFilter = createSearchFilter(types, aliasFilter, mapperService().hasNested());
260+
if (sliceFilter == null) {
261+
return typesFilter;
262+
}
263+
if (typesFilter == null) {
264+
return sliceFilter;
265+
}
266+
return new BooleanQuery.Builder()
267+
.add(typesFilter, Occur.FILTER)
268+
.add(sliceFilter, Occur.FILTER)
269+
.build();
258270
}
259271

260272
// extracted to static helper method to make writing unit tests easier:
@@ -550,6 +562,11 @@ public FieldDoc searchAfter() {
550562
return searchAfter;
551563
}
552564

565+
public SearchContext sliceFilter(Query filter) {
566+
this.sliceFilter = filter;
567+
return this;
568+
}
569+
553570
@Override
554571
public SearchContext parsedPostFilter(ParsedQuery postFilter) {
555572
this.postFilter = postFilter;

core/src/main/java/org/elasticsearch/search/internal/ScrollContext.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,4 @@ public class ScrollContext {
2929
public float maxScore;
3030
public ScoreDoc lastEmittedDoc;
3131
public Scroll scroll;
32-
3332
}

core/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.lucene.search.Query;
2222
import org.apache.lucene.util.Counter;
23-
import org.elasticsearch.action.search.SearchType;
2423
import org.elasticsearch.index.query.ParsedQuery;
2524
import org.elasticsearch.search.aggregations.SearchContextAggregations;
2625
import org.elasticsearch.search.fetch.FetchSearchResult;
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.slice;
21+
22+
import org.apache.lucene.index.LeafReaderContext;
23+
import org.apache.lucene.index.DocValues;
24+
import org.apache.lucene.index.SortedNumericDocValues;
25+
import org.apache.lucene.search.IndexSearcher;
26+
import org.apache.lucene.search.Weight;
27+
import org.apache.lucene.search.RandomAccessWeight;
28+
import org.apache.lucene.util.Bits;
29+
30+
import java.io.IOException;
31+
32+
/**
33+
* A {@link SliceQuery} that uses the numeric doc values of a field to do the slicing.
34+
*
35+
* <b>NOTE</b>: With deterministic field values this query can be used across different readers safely.
36+
* If updates are accepted on the field you must ensure that the same reader is used for all `slice` queries.
37+
*/
38+
public final class DocValuesSliceQuery extends SliceQuery {
39+
public DocValuesSliceQuery(String field, int id, int max) {
40+
super(field, id, max);
41+
}
42+
43+
@Override
44+
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
45+
return new RandomAccessWeight(this) {
46+
@Override
47+
protected Bits getMatchingDocs(final LeafReaderContext context) throws IOException {
48+
final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), getField());
49+
return new Bits() {
50+
@Override
51+
public boolean get(int doc) {
52+
values.setDocument(doc);
53+
for (int i = 0; i < values.count(); i++) {
54+
return contains(Long.hashCode(values.valueAt(i)));
55+
}
56+
return contains(0);
57+
}
58+
59+
@Override
60+
public int length() {
61+
return context.reader().maxDoc();
62+
}
63+
};
64+
}
65+
};
66+
}
67+
}

0 commit comments

Comments
 (0)