Skip to content

Add the ability to partition a scroll in multiple slices. #18237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.highlight.HighlightBuilder;
import org.elasticsearch.search.rescore.RescoreBuilder;
Expand Down Expand Up @@ -352,6 +353,11 @@ public SearchRequestBuilder searchAfter(Object[] values) {
return this;
}

/**
 * Sets a filter that restricts the search hits, the top hits and the aggregations to a
 * slice of the results of the main query.
 * NOTE: only valid inside a scroll context — the request is rejected otherwise
 * (see the `slice` handling in {@code SearchService#parseSource}).
 *
 * @param builder the slice definition (field, slice id, max number of slices)
 * @return this builder, for method chaining
 */
public SearchRequestBuilder slice(SliceBuilder builder) {
    sourceBuilder().slice(builder);
    return this;
}

/**
* Applies when sorting, and controls if scores will be tracked as well. Defaults to
* <tt>false</tt>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,15 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter());
context.searchAfter(fieldDoc);
}

if (source.slice() != null) {
if (context.scrollContext() == null) {
throw new SearchContextException(context, "`slice` cannot be used outside of a scroll context");
}
context.sliceFilter(source.slice().toFilter(queryShardContext,
context.shardTarget().getShardId().getId(),
queryShardContext.getIndexSettings().getNumberOfShards()));
}
}

private static final int[] EMPTY_DOC_IDS = new int[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.PipelineAggregatorBuilder;
Expand Down Expand Up @@ -98,6 +99,7 @@ public final class SearchSourceBuilder extends ToXContentToBytes implements Writ
public static final ParseField EXT_FIELD = new ParseField("ext");
public static final ParseField PROFILE_FIELD = new ParseField("profile");
public static final ParseField SEARCH_AFTER = new ParseField("search_after");
public static final ParseField SLICE = new ParseField("slice");

public static SearchSourceBuilder fromXContent(QueryParseContext context, AggregatorParsers aggParsers,
Suggesters suggesters) throws IOException {
Expand Down Expand Up @@ -138,6 +140,8 @@ public static HighlightBuilder highlight() {

private SearchAfterBuilder searchAfterBuilder;

private SliceBuilder sliceBuilder;

private Float minScore;

private long timeoutInMillis = -1;
Expand Down Expand Up @@ -175,9 +179,7 @@ public SearchSourceBuilder() {
* Read from a stream.
*/
public SearchSourceBuilder(StreamInput in) throws IOException {
if (in.readBoolean()) {
aggregations = new AggregatorFactories.Builder(in);
}
aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 :)

explain = in.readOptionalBoolean();
fetchSourceContext = in.readOptionalStreamable(FetchSourceContext::new);
boolean hasFieldDataFields = in.readBoolean();
Expand Down Expand Up @@ -206,15 +208,9 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
indexBoost.put(in.readString(), in.readFloat());
}
}
if (in.readBoolean()) {
minScore = in.readFloat();
}
if (in.readBoolean()) {
postQueryBuilder = in.readNamedWriteable(QueryBuilder.class);
}
if (in.readBoolean()) {
queryBuilder = in.readNamedWriteable(QueryBuilder.class);
}
minScore = in.readOptionalFloat();
postQueryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
queryBuilder = in.readOptionalNamedWriteable(QueryBuilder.class);
if (in.readBoolean()) {
int size = in.readVInt();
rescoreBuilders = new ArrayList<>();
Expand Down Expand Up @@ -244,29 +240,20 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
stats.add(in.readString());
}
}
if (in.readBoolean()) {
suggestBuilder = new SuggestBuilder(in);
}
suggestBuilder = in.readOptionalWriteable(SuggestBuilder::new);
terminateAfter = in.readVInt();
timeoutInMillis = in.readLong();
trackScores = in.readBoolean();
version = in.readOptionalBoolean();
if (in.readBoolean()) {
ext = in.readBytesReference();
}
ext = in.readOptionalBytesReference();
profile = in.readBoolean();
if (in.readBoolean()) {
searchAfterBuilder = new SearchAfterBuilder(in);
}
searchAfterBuilder = in.readOptionalWriteable(SearchAfterBuilder::new);
sliceBuilder = in.readOptionalWriteable(SliceBuilder::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
boolean hasAggregations = aggregations != null;
out.writeBoolean(hasAggregations);
if (hasAggregations) {
aggregations.writeTo(out);
}
out.writeOptionalWriteable(aggregations);
out.writeOptionalBoolean(explain);
out.writeOptionalStreamable(fetchSourceContext);
boolean hasFieldDataFields = fieldDataFields != null;
Expand Down Expand Up @@ -296,21 +283,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeFloat(indexBoost.get(key.value));
}
}
boolean hasMinScore = minScore != null;
out.writeBoolean(hasMinScore);
if (hasMinScore) {
out.writeFloat(minScore);
}
boolean hasPostQuery = postQueryBuilder != null;
out.writeBoolean(hasPostQuery);
if (hasPostQuery) {
out.writeNamedWriteable(postQueryBuilder);
}
boolean hasQuery = queryBuilder != null;
out.writeBoolean(hasQuery);
if (hasQuery) {
out.writeNamedWriteable(queryBuilder);
}
out.writeOptionalFloat(minScore);
out.writeOptionalNamedWriteable(postQueryBuilder);
out.writeOptionalNamedWriteable(queryBuilder);
boolean hasRescoreBuilders = rescoreBuilders != null;
out.writeBoolean(hasRescoreBuilders);
if (hasRescoreBuilders) {
Expand Down Expand Up @@ -344,26 +319,15 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(stat);
}
}
boolean hasSuggestBuilder = suggestBuilder != null;
out.writeBoolean(hasSuggestBuilder);
if (hasSuggestBuilder) {
suggestBuilder.writeTo(out);
}
out.writeOptionalWriteable(suggestBuilder);
out.writeVInt(terminateAfter);
out.writeLong(timeoutInMillis);
out.writeBoolean(trackScores);
out.writeOptionalBoolean(version);
boolean hasExt = ext != null;
out.writeBoolean(hasExt);
if (hasExt) {
out.writeBytesReference(ext);
}
out.writeOptionalBytesReference(ext);
out.writeBoolean(profile);
boolean hasSearchAfter = searchAfterBuilder != null;
out.writeBoolean(hasSearchAfter);
if (hasSearchAfter) {
searchAfterBuilder.writeTo(out);
}
out.writeOptionalWriteable(searchAfterBuilder);
out.writeOptionalWriteable(sliceBuilder);
}

/**
Expand Down Expand Up @@ -597,6 +561,22 @@ public SearchSourceBuilder searchAfter(Object[] values) {
return this;
}

/**
 * Sets a filter that will restrict the search hits, the top hits and the aggregations to a slice of the results
 * of the main query.
 *
 * @param builder the slice definition, or {@code null} to clear any previously set slice
 * @return this builder, for method chaining
 */
public SearchSourceBuilder slice(SliceBuilder builder) {
    this.sliceBuilder = builder;
    return this;
}

/**
 * Gets the slice used to filter the search hits, the top hits and the aggregations.
 *
 * @return the current slice definition, or {@code null} if no slice has been set
 */
public SliceBuilder slice() {
    return sliceBuilder;
}

/**
* Add an aggregation to perform as part of the search.
*/
Expand Down Expand Up @@ -943,6 +923,7 @@ private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder
rewrittenBuilder.rescoreBuilders = rescoreBuilders;
rewrittenBuilder.scriptFields = scriptFields;
rewrittenBuilder.searchAfterBuilder = searchAfterBuilder;
rewrittenBuilder.sliceBuilder = sliceBuilder;
rewrittenBuilder.size = size;
rewrittenBuilder.sorts = sorts;
rewrittenBuilder.stats = stats;
Expand Down Expand Up @@ -1039,6 +1020,8 @@ public void parseXContent(QueryParseContext context, AggregatorParsers aggParser
} else if (context.getParseFieldMatcher().match(currentFieldName, EXT_FIELD)) {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().copyCurrentStructure(parser);
ext = xContentBuilder.bytes();
} else if (context.getParseFieldMatcher().match(currentFieldName, SLICE)) {
sliceBuilder = SliceBuilder.fromXContent(context);
} else {
throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
parser.getTokenLocation());
Expand Down Expand Up @@ -1193,6 +1176,10 @@ public void innerToXContent(XContentBuilder builder, Params params) throws IOExc
builder.field(SEARCH_AFTER.getPreferredName(), searchAfterBuilder.getSortValues());
}

if (sliceBuilder != null) {
builder.field(SLICE.getPreferredName(), sliceBuilder);
}

if (indexBoost != null) {
builder.startObject(INDICES_BOOST_FIELD.getPreferredName());
assert !indexBoost.containsKey(null);
Expand Down Expand Up @@ -1355,7 +1342,7 @@ public boolean equals(Object obj) {
public int hashCode() {
return Objects.hash(aggregations, explain, fetchSourceContext, fieldDataFields, fieldNames, from,
highlightBuilder, indexBoost, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields,
size, sorts, searchAfterBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile);
size, sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeoutInMillis, trackScores, version, profile);
}

@Override
Expand Down Expand Up @@ -1383,6 +1370,7 @@ public boolean equals(Object obj) {
&& Objects.equals(size, other.size)
&& Objects.equals(sorts, other.sorts)
&& Objects.equals(searchAfterBuilder, other.searchAfterBuilder)
&& Objects.equals(sliceBuilder, other.sliceBuilder)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be used in the hashcode too

&& Objects.equals(stats, other.stats)
&& Objects.equals(suggestBuilder, other.suggestBuilder)
&& Objects.equals(terminateAfter, other.terminateAfter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,17 @@ public class DefaultSearchContext extends SearchContext {
private Float minimumScore;
private boolean trackScores = false; // when sorting, track scores as well...
private FieldDoc searchAfter;
// filter for sliced scroll
private Query sliceFilter;

/**
* The original query as sent by the user without the types and aliases
* applied. Putting things in here leaks them into highlighting so don't add
* things like the type filter or alias filters.
*/
private ParsedQuery originalQuery;
/**
* Just like originalQuery but with the filters from types and aliases
* applied.
* Just like originalQuery but with the filters from types, aliases and slice applied.
*/
private ParsedQuery filteredQuery;
/**
Expand Down Expand Up @@ -210,7 +212,7 @@ public void preProcess() {
if (rescoreContext.window() > maxWindow) {
throw new QueryPhaseExecutionException(this, "Rescore window [" + rescoreContext.window() + "] is too large. It must "
+ "be less than [" + maxWindow + "]. This prevents allocating massive heaps for storing the results to be "
+ "rescored. This limit can be set by chaning the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey()
+ "rescored. This limit can be set by changing the [" + IndexSettings.MAX_RESCORE_WINDOW_SETTING.getKey()
+ "] index level setting.");

}
Expand Down Expand Up @@ -254,7 +256,17 @@ private ParsedQuery buildFilteredQuery() {
@Override
@Nullable
public Query searchFilter(String[] types) {
    // Filter derived from the requested types, alias filters and nested-document handling.
    final Query baseFilter = createSearchFilter(types, aliasFilter, mapperService().hasNested());
    // When at most one of the two filters is present, return it directly (possibly null).
    if (baseFilter == null || sliceFilter == null) {
        return baseFilter != null ? baseFilter : sliceFilter;
    }
    // Both present: conjunction of the type/alias filter and the sliced-scroll filter.
    return new BooleanQuery.Builder()
        .add(baseFilter, Occur.FILTER)
        .add(sliceFilter, Occur.FILTER)
        .build();
}

// extracted to static helper method to make writing unit tests easier:
Expand Down Expand Up @@ -550,6 +562,11 @@ public FieldDoc searchAfter() {
return searchAfter;
}

/**
 * Sets the filter used to restrict this context to a single slice of a sliced scroll
 * (combined with the type/alias filter in {@link #searchFilter(String[])}).
 *
 * @param filter the slice filter query, or {@code null} for no slicing
 * @return this context, for method chaining
 */
public SearchContext sliceFilter(Query filter) {
    this.sliceFilter = filter;
    return this;
}

@Override
public SearchContext parsedPostFilter(ParsedQuery postFilter) {
this.postFilter = postFilter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,4 @@ public class ScrollContext {
public float maxScore;
public ScoreDoc lastEmittedDoc;
public Scroll scroll;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.lucene.search.Query;
import org.apache.lucene.util.Counter;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.search.aggregations.SearchContextAggregations;
import org.elasticsearch.search.fetch.FetchSearchResult;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.slice;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.RandomAccessWeight;
import org.apache.lucene.util.Bits;

import java.io.IOException;

/**
* A {@link SliceQuery} that uses the numeric doc values of a field to do the slicing.
*
* <b>NOTE</b>: With deterministic field values this query can be used across different readers safely.
* If updates are accepted on the field you must ensure that the same reader is used for all `slice` queries.
*/
public final class DocValuesSliceQuery extends SliceQuery {
public DocValuesSliceQuery(String field, int id, int max) {
super(field, id, max);
}

@Override
public Weight createWeight(IndexSearcher searcher, boolean needsScores) throws IOException {
return new RandomAccessWeight(this) {
@Override
protected Bits getMatchingDocs(final LeafReaderContext context) throws IOException {
final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), getField());
return new Bits() {
@Override
public boolean get(int doc) {
values.setDocument(doc);
for (int i = 0; i < values.count(); i++) {
return contains(Long.hashCode(values.valueAt(i)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Long.hashCode/BitMixer.mix64/ ? otherwise we might still have the issue with doubles given that Long.hashCode is a bit naive

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I was naive too, I pushed another commit.

}
return contains(0);
}

@Override
public int length() {
return context.reader().maxDoc();
}
};
}
};
}
}
Loading