diff --git a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
index 927bce5d9d6dd..c4c61dc16f15d 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/NumberFieldMapper.java
@@ -193,6 +193,19 @@ public Number parsePoint(byte[] value) {
             return HalfFloatPoint.decodeDimension(value, 0);
         }

+        @Override
+        public byte[] encodePoint(Number value, boolean coerce) {
+            float parsedValue = parse(value, coerce);
+            byte[] bytes = new byte[HalfFloatPoint.BYTES];
+            HalfFloatPoint.encodeDimension(parsedValue, bytes, 0);
+            return bytes;
+        }
+
+        @Override
+        public int bytesPerEncodedPoint() {
+            return HalfFloatPoint.BYTES;
+        }
+
         @Override
         public Float parse(XContentParser parser, boolean coerce) throws IOException {
             float parsed = parser.floatValue(coerce);
@@ -290,6 +303,19 @@ public Number parsePoint(byte[] value) {
             return FloatPoint.decodeDimension(value, 0);
         }

+        @Override
+        public byte[] encodePoint(Number value, boolean coerce) {
+            float parsedValue = parse(value, coerce);
+            byte[] bytes = new byte[Float.BYTES];
+            FloatPoint.encodeDimension(parsedValue, bytes, 0);
+            return bytes;
+        }
+
+        @Override
+        public int bytesPerEncodedPoint() {
+            return Float.BYTES;
+        }
+
         @Override
         public Float parse(XContentParser parser, boolean coerce) throws IOException {
             float parsed = parser.floatValue(coerce);
@@ -376,6 +402,19 @@ public Number parsePoint(byte[] value) {
             return DoublePoint.decodeDimension(value, 0);
         }

+        @Override
+        public byte[] encodePoint(Number value, boolean coerce) {
+            double parsedValue = parse(value, coerce);
+            byte[] bytes = new byte[Double.BYTES];
+            DoublePoint.encodeDimension(parsedValue, bytes, 0);
+            return bytes;
+        }
+
+        @Override
+        public int bytesPerEncodedPoint() {
+            return Double.BYTES;
+        }
+
         @Override
         public Double parse(XContentParser parser, boolean coerce) throws IOException {
             double parsed = parser.doubleValue(coerce);
@@ -473,6 +512,21 @@ public Number parsePoint(byte[] value) {
             return INTEGER.parsePoint(value).byteValue();
         }

+        @Override
+        public byte[] encodePoint(Number value, boolean coerce) {
+            int parsedValue = parse(value, coerce);
+
+            // Same as integer
+            byte[] bytes = new byte[Integer.BYTES];
+            IntPoint.encodeDimension(parsedValue, bytes, 0);
+            return bytes;
+        }
+
+        @Override
+        public int bytesPerEncodedPoint() {
+            return Integer.BYTES;
+        }
+
         @Override
         public Short parse(XContentParser parser, boolean coerce) throws IOException {
             int value = parser.intValue(coerce);
@@ -534,6 +588,21 @@ public Number parsePoint(byte[] value) {
             return INTEGER.parsePoint(value).shortValue();
         }

+        @Override
+        public byte[] encodePoint(Number value, boolean coerce) {
+            int parsedValue = parse(value, coerce);
+
+            // Same as integer
+            byte[] bytes = new byte[Integer.BYTES];
+            IntPoint.encodeDimension(parsedValue, bytes, 0);
+            return bytes;
+        }
+
+        @Override
+        public int bytesPerEncodedPoint() {
+            return Integer.BYTES;
+        }
+
         @Override
         public Short parse(XContentParser parser, boolean coerce) throws IOException {
             return parser.shortValue(coerce);
@@ -591,6 +660,19 @@ public Number parsePoint(byte[] value) {
             return IntPoint.decodeDimension(value, 0);
         }

+        @Override
+        public byte[] encodePoint(Number value, boolean coerce) {
+            int parsedValue = parse(value, coerce);
+            byte[] bytes = new byte[Integer.BYTES];
+            IntPoint.encodeDimension(parsedValue, bytes, 0);
+            return bytes;
+        }
+
+        @Override
+        public int bytesPerEncodedPoint() {
+            return Integer.BYTES;
+        }
+
         @Override
         public Integer parse(XContentParser parser, boolean coerce) throws IOException {
             return parser.intValue(coerce);
@@ -710,6 +792,19 @@ public Number parsePoint(byte[] value) {
             return LongPoint.decodeDimension(value, 0);
         }

+        @Override
+        public byte[] encodePoint(Number value, boolean coerce) {
+            long parsedValue = parse(value, coerce);
+            byte[] bytes = new byte[Long.BYTES];
+            LongPoint.encodeDimension(parsedValue, bytes, 0);
+            return bytes;
+        }
+
+        @Override
+        public int bytesPerEncodedPoint() {
+            return Long.BYTES;
+        }
+
         @Override
         public Long parse(XContentParser parser, boolean coerce) throws IOException {
             return parser.longValue(coerce);
@@ -827,6 +922,8 @@ public abstract Query rangeQuery(String field, Object lowerTerm, Object upperTer
         public abstract Number parse(XContentParser parser, boolean coerce) throws IOException;
         public abstract Number parse(Object value, boolean coerce);
         public abstract Number parsePoint(byte[] value);
+        public abstract byte[] encodePoint(Number value, boolean coerce);
+        public abstract int bytesPerEncodedPoint();
         public abstract List<Field> createFields(String name, Number value, boolean indexed, boolean docValued, boolean stored);

         Number valueForSearch(Number value) {
@@ -979,6 +1076,14 @@ public Number parsePoint(byte[] value) {
             return type.parsePoint(value);
         }

+        public byte[] encodePoint(Number value, boolean coerce) {
+            return type.encodePoint(value, coerce);
+        }
+
+        public int bytesPerEncodedPoint() {
+            return type.bytesPerEncodedPoint();
+        }
+
         @Override
         public boolean equals(Object o) {
             if (super.equals(o) == false) {
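
Aside: the encodePoint() implementations above rely on Lucene's sortable point encoding. encodeDimension() produces fixed-width bytes whose unsigned lexicographic order matches numeric order, which is what later lets RangeAggregator compare encoded range bounds with compareUnsigned. A minimal standalone sketch (illustration only, not part of this patch):

    import java.util.Arrays;
    import org.apache.lucene.document.LongPoint;

    public class SortableEncodingDemo {
        public static void main(String[] args) {
            byte[] a = new byte[Long.BYTES];
            byte[] b = new byte[Long.BYTES];
            LongPoint.encodeDimension(-5L, a, 0);  // sortable encoding handles negative values
            LongPoint.encodeDimension(3L, b, 0);
            // unsigned byte-wise comparison agrees with numeric order: -5 < 3
            System.out.println(Arrays.compareUnsigned(a, 0, a.length, b, 0, b.length) < 0);  // true
        }
    }
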
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
index 81611f2c8e1b2..d37251a22981e 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java
@@ -164,18 +164,26 @@ public AggParseContext(String name) {

     public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0], new ArrayList<>());

-    private AggregatorFactory[] factories;
+    protected AggregatorFactory[] factories;
     private List<PipelineAggregationBuilder> pipelineAggregatorFactories;

     public static Builder builder() {
         return new Builder();
     }

-    private AggregatorFactories(AggregatorFactory[] factories, List<PipelineAggregationBuilder> pipelineAggregators) {
+    protected AggregatorFactories(AggregatorFactory[] factories, List<PipelineAggregationBuilder> pipelineAggregators) {
         this.factories = factories;
         this.pipelineAggregatorFactories = pipelineAggregators;
     }

+    public AggregatorFactory[] getFactories() {
+        return factories;
+    }
+
+    public List<PipelineAggregationBuilder> getPipelineAggregatorFactories() {
+        return pipelineAggregatorFactories;
+    }
+
     public List<PipelineAggregator> createPipelineAggregators() {
         List<PipelineAggregator> pipelineAggregators = new ArrayList<>(this.pipelineAggregatorFactories.size());
         for (PipelineAggregationBuilder factory : this.pipelineAggregatorFactories) {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java
index 970ef725f027d..1d54a86200a3e 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java
@@ -37,15 +37,15 @@ public abstract class AggregatorFactory {

     public static final class MultiBucketAggregatorWrapper extends Aggregator {
-        private final BigArrays bigArrays;
-        private final Aggregator parent;
-        private final AggregatorFactory factory;
+        protected final BigArrays bigArrays;
+        protected final AggregatorFactory factory;
+        protected ObjectArray<Aggregator> aggregators;
+        protected ObjectArray<LeafBucketCollector> collectors;
+        protected final Aggregator parent;
         private final Aggregator first;
-        ObjectArray<Aggregator> aggregators;
-        ObjectArray<LeafBucketCollector> collectors;

         MultiBucketAggregatorWrapper(BigArrays bigArrays, SearchContext context,
-                Aggregator parent, AggregatorFactory factory, Aggregator first) {
+                                     Aggregator parent, AggregatorFactory factory, Aggregator first) {
             this.bigArrays = bigArrays;
             this.parent = parent;
             this.factory = factory;
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
index d60851a2d7fef..55870b0573ce3 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/AbstractRangeAggregatorFactory.java
@@ -19,6 +19,11 @@

 package org.elasticsearch.search.aggregations.bucket.range;

+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -31,10 +36,13 @@
 import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
 import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.profile.Profilers;
+import org.elasticsearch.search.profile.aggregation.ProfilingAggregator;

 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;

 public class AbstractRangeAggregatorFactory<R extends RangeAggregator.Range> extends ValuesSourceAggregatorFactory<Numeric> {

@@ -42,15 +50,15 @@ public class AbstractRangeAggregatorFactory<R extends RangeAggregator.Range> ext
     private final R[] ranges;
     private final boolean keyed;

-    public AbstractRangeAggregatorFactory(String name,
-                                          ValuesSourceConfig<Numeric> config,
-                                          R[] ranges,
-                                          boolean keyed,
-                                          InternalRange.Factory rangeFactory,
-                                          QueryShardContext queryShardContext,
-                                          AggregatorFactory parent,
-                                          AggregatorFactories.Builder subFactoriesBuilder,
-                                          Map<String, Object> metaData) throws IOException {
+    AbstractRangeAggregatorFactory(String name,
+                                   ValuesSourceConfig<Numeric> config,
+                                   R[] ranges,
+                                   boolean keyed,
+                                   InternalRange.Factory rangeFactory,
+                                   QueryShardContext queryShardContext,
+                                   AggregatorFactory parent,
+                                   AggregatorFactories.Builder subFactoriesBuilder,
+                                   Map<String, Object> metaData) throws IOException {
         super(name, config, queryShardContext, parent, subFactoriesBuilder, metaData);
         this.ranges = ranges;
         this.keyed = keyed;
@@ -59,22 +67,92 @@ public AbstractRangeAggregatorFactory(String name,

     @Override
     protected Aggregator createUnmapped(SearchContext searchContext,
-            Aggregator parent,
-            List<PipelineAggregator> pipelineAggregators,
-            Map<String, Object> metaData) throws IOException {
+                                        Aggregator parent,
+                                        List<PipelineAggregator> pipelineAggregators,
+                                        Map<String, Object> metaData) throws IOException {
         return new Unmapped<>(name, ranges, keyed, config.format(), searchContext, parent,
            rangeFactory, pipelineAggregators, metaData);
     }

     @Override
     protected Aggregator doCreateInternal(Numeric valuesSource,
-            SearchContext searchContext,
-            Aggregator parent,
-            boolean collectsFromSingleBucket,
-            List<PipelineAggregator> pipelineAggregators,
-            Map<String, Object> metaData) throws IOException {
-        return new RangeAggregator(name, factories, valuesSource, config.format(), rangeFactory, ranges, keyed, searchContext, parent,
-            pipelineAggregators, metaData);
+                                          SearchContext searchContext,
+                                          Aggregator parent,
+                                          boolean collectsFromSingleBucket,
+                                          List<PipelineAggregator> pipelineAggregators,
+                                          Map<String, Object> metaData) throws IOException {
+
+        // If we don't have a parent, the range agg can potentially optimize by using the BKD tree. But BKD
+        // traversal is per-range, which means that docs are potentially collected out of order across multiple
+        // ranges. To prevent this from causing problems, we create a special AggregatorFactories that
+        // wraps all the sub-aggs with a MultiBucketAggregatorWrapper. This effectively creates a new agg
+        // sub-tree for each range and prevents out-of-order problems
+        BiFunction<Number, Boolean, byte[]> pointEncoder = configurePointEncoder(searchContext, parent, config);
+        AggregatorFactories wrappedFactories = factories;
+        if (pointEncoder != null) {
+            wrappedFactories = wrapSubAggsAsMultiBucket(factories);
+        }
+
+        return new RangeAggregator(name, wrappedFactories, valuesSource, config, rangeFactory, ranges, keyed, searchContext, parent,
+            pipelineAggregators, metaData, pointEncoder);
     }

+    /**
+     * Returns a converter for point values if the BKD optimization is applicable in this
+     * context, or null otherwise. The optimization applies only when all of the following hold:
+     * - a match_all query (or no query at all)
+     * - no parent agg
+     * - no script
+     * - no missing value
+     * - the field has indexed points
+     *
+     * @param context The {@link SearchContext} of the aggregation.
+     * @param parent  The parent aggregator.
+     * @param config  The config for the values source.
+     */
+    private BiFunction<Number, Boolean, byte[]> configurePointEncoder(SearchContext context, Aggregator parent,
+                                                                      ValuesSourceConfig<Numeric> config) {
+        if (context.query() != null &&
+                context.query().getClass() != MatchAllDocsQuery.class) {
+            return null;
+        }
+        if (parent != null) {
+            return null;
+        }
+        if (config.fieldContext() != null && config.script() == null && config.missing() == null) {
+            MappedFieldType fieldType = config.fieldContext().fieldType();
+            if (fieldType == null || fieldType.indexOptions() == IndexOptions.NONE) {
+                return null;
+            }
+            if (fieldType instanceof NumberFieldMapper.NumberFieldType) {
+                return ((NumberFieldMapper.NumberFieldType) fieldType)::encodePoint;
+            } else if (fieldType.getClass() == DateFieldMapper.DateFieldType.class) {
+                return NumberFieldMapper.NumberType.LONG::encodePoint;
+            }
+        }
+        return null;
+    }
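
For reference, a request shape that satisfies all of these criteria might look like the following sketch ("price" is an assumed indexed numeric field; illustration only, not part of this patch):

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.builder.SearchSourceBuilder;

    static SearchSourceBuilder bkdEligibleSource() {
        return new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())   // match_all (an absent query also qualifies)
            .size(0)
            .aggregation(AggregationBuilders.range("price_ranges")
                .field("price")                     // plain indexed numeric field: no script, no missing
                .addRange(0, 100)                   // top-level agg: no parent
                .addRange(100, 200));
    }

Anything else -- a filtered query, a parent agg, a script -- makes configurePointEncoder() return null and the agg falls back to doc-values collection.
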
+    /**
+     * Creates a new {@link AggregatorFactories} object so that sub-aggs are automatically
+     * wrapped with a {@link org.elasticsearch.search.aggregations.AggregatorFactory.MultiBucketAggregatorWrapper}.
+     * This allows sub-aggs to execute in their own isolated sub-tree.
+     */
+    private static AggregatorFactories wrapSubAggsAsMultiBucket(AggregatorFactories factories) {
+        return new AggregatorFactories(factories.getFactories(), factories.getPipelineAggregatorFactories()) {
+            @Override
+            public Aggregator[] createSubAggregators(SearchContext searchContext, Aggregator parent) throws IOException {
+                Aggregator[] aggregators = new Aggregator[countAggregators()];
+                for (int i = 0; i < this.factories.length; ++i) {
+                    Aggregator factory = asMultiBucketAggregator(factories[i], searchContext, parent);
+                    Profilers profilers = factory.context().getProfilers();
+                    if (profilers != null) {
+                        factory = new ProfilingAggregator(factory, profilers.getAggregationProfiler());
+                    }
+                    aggregators[i] = factory;
+                }
+                return aggregators;
+            }
+        };
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
index 711297762b8d4..eeabad42ce577 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/GeoDistanceRangeAggregatorFactory.java
@@ -81,9 +81,8 @@ protected Aggregator doCreateInternal(final ValuesSource.GeoPoint valuesSource,
                                           List<PipelineAggregator> pipelineAggregators,
                                           Map<String, Object> metaData) throws IOException {
         DistanceSource distanceSource = new DistanceSource(valuesSource, distanceType, origin, unit);
-        return new RangeAggregator(name, factories, distanceSource, config.format(), rangeFactory, ranges, keyed, searchContext,
-            parent,
-            pipelineAggregators, metaData);
+        return new RangeAggregator(name, factories, distanceSource, config, rangeFactory, ranges, keyed, searchContext,
+            parent, pipelineAggregators, metaData, null);
     }

     private static class DistanceSource extends ValuesSource.Numeric {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
index c4e2d1fc4394e..0b866acb84d6e 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java
@@ -19,8 +19,14 @@
 package org.elasticsearch.search.aggregations.bucket.range;

 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PointValues;
+import org.apache.lucene.search.CollectionTerminatedException;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.util.Bits;
+import org.apache.lucene.util.DocIdSetBuilder;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -40,6 +46,7 @@
 import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
 import org.elasticsearch.search.internal.SearchContext;

 import java.io.IOException;
@@ -47,6 +54,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiFunction;
+
+import static org.apache.lucene.util.FutureArrays.compareUnsigned;

 public class RangeAggregator extends BucketsAggregator {

@@ -217,33 +227,53 @@ public boolean equals(Object obj) {
         }
     }

-    final ValuesSource.Numeric valuesSource;
+    private final ValuesSource.Numeric valuesSource;
     final DocValueFormat format;
     final Range[] ranges;
+    final boolean keyed;
-    final InternalRange.Factory rangeFactory;
-    final double[] maxTo;
+    private final InternalRange.Factory rangeFactory;
+    private final double[] maxTo;
+
+    private final String pointField;
+    private final boolean canOptimize;
+    private final byte[][] encodedRanges;

-    public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
-            InternalRange.Factory rangeFactory, Range[] ranges, boolean keyed, SearchContext context,
-            Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
+    public RangeAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, ValuesSourceConfig<?> config,
+                           InternalRange.Factory rangeFactory, Range[] ranges, boolean keyed, SearchContext context, Aggregator parent,
+                           List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
+                           BiFunction<Number, Boolean, byte[]> pointEncoder) throws IOException {
         super(name, factories, context, parent, pipelineAggregators, metaData);
         assert valuesSource != null;
         this.valuesSource = valuesSource;
-        this.format = format;
+        this.format = config.format();
         this.keyed = keyed;
         this.rangeFactory = rangeFactory;
-        this.ranges = ranges;
-        maxTo = new double[this.ranges.length];
-        maxTo[0] = this.ranges[0].to;
-        for (int i = 1; i < this.ranges.length; ++i) {
-            maxTo[i] = Math.max(this.ranges[i].to,maxTo[i-1]);
+        this.ranges = ranges;
+        maxTo = new double[ranges.length];
+        maxTo[0] = ranges[0].to;
+        for (int i = 1; i < ranges.length; ++i) {
+            maxTo[i] = Math.max(ranges[i].to, maxTo[i - 1]);
         }

+        if (pointEncoder != null) {
+            pointField = config.fieldContext().field();
+            encodedRanges = new byte[ranges.length * 2][];
+            for (int i = 0; i < ranges.length; i++) {
+                byte[] from = Double.isFinite(ranges[i].from) ? pointEncoder.apply(ranges[i].from, false) : null;
+                byte[] to = Double.isFinite(ranges[i].to) ? pointEncoder.apply(ranges[i].to, false) : null;
+                encodedRanges[i * 2] = from;
+                encodedRanges[i * 2 + 1] = to;
+            }
+            canOptimize = true;
+        } else {
+            pointField = null;
+            canOptimize = false;
+            encodedRanges = null;
+        }
     }
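
The encoding step above can be pictured in isolation. A sketch for a single range [3, 6) on a long field, using the encodePoint() method added in this patch (a null bound stands for an unbounded end, which the BKD visitor treats as always-matching on that side):

    import java.util.function.BiFunction;
    import org.elasticsearch.index.mapper.NumberFieldMapper;

    static byte[][] encodeRange(double from, double to) {
        BiFunction<Number, Boolean, byte[]> encoder = NumberFieldMapper.NumberType.LONG::encodePoint;
        byte[] lower = Double.isFinite(from) ? encoder.apply(from, false) : null;  // e.g. 3.0 -> 8 sortable bytes
        byte[] upper = Double.isFinite(to) ? encoder.apply(to, false) : null;      // e.g. 6.0 -> 8 sortable bytes
        return new byte[][] { lower, upper };
    }
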
@@ -257,6 +287,24 @@ public ScoreMode scoreMode() {

     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
+
+        if (valuesSource == null) {
+            if (parent != null) {
+                return LeafBucketCollector.NO_OP_COLLECTOR;
+            } else {
+                // we have no parent and the values source is empty so we can skip collecting hits
+                throw new CollectionTerminatedException();
+            }
+        }
+
+        if (canOptimize) {
+            // If the BKD optimization applies, this collects hits from the BKD tree and then
+            // throws a CollectionTerminatedException. If the circuit breaker trips first, it
+            // returns normally and we fall through to doc values below.
+            tryBKDOptimization(ctx, sub);
+        }
+
+        // We either cannot optimize, or the BKD attempt was abandoned, so
+        // fall back to collecting all the values from doc values directly
         final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
         return new LeafBucketCollectorBase(sub, values) {
             @Override
@@ -265,12 +313,14 @@ public void collect(int doc, long bucket) throws IOException {
                 if (values.advanceExact(doc)) {
                     final int valuesCount = values.docValueCount();
                     for (int i = 0, lo = 0; i < valuesCount; ++i) {
                         final double value = values.nextValue();
-                        lo = collect(doc, value, bucket, lo);
+                        lo = collectValue(doc, value, bucket, lo, sub);
                     }
                 }
             }
+        };
+    }

-            private int collect(int doc, double value, long owningBucketOrdinal, int lowBound) throws IOException {
+    private int collectValue(int doc, double value, long owningBucketOrdinal, int lowBound, LeafBucketCollector sub) throws IOException {
         int lo = lowBound, hi = ranges.length - 1; // all candidates are between these indexes
         int mid = (lo + hi) >>> 1;
         while (lo <= hi) {
@@ -312,12 +362,140 @@ private int collect(int doc, double value, long owningBucketOrdinal, int lowBoun

         for (int i = startLo; i <= endHi; ++i) {
             if (ranges[i].matches(value)) {
-                    collectBucket(sub, doc, subBucketOrdinal(owningBucketOrdinal, i));
+                collectBucket(sub, doc, subBucketOrdinal(owningBucketOrdinal, i));
             }
         }

         return endHi + 1;
     }
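
The binary search above leans on two invariants rather than scanning every range. A worked illustration (plain Java sketch, not the exact implementation):

    public class MaxToDemo {
        public static void main(String[] args) {
            // ranges sorted ascending by `from`; `from` inclusive, `to` exclusive
            double[] from = {0, 5, 5, 20};
            double[] to = {10, 7, 30, 25};
            double[] maxTo = new double[to.length];
            maxTo[0] = to[0];
            for (int i = 1; i < to.length; i++) {
                maxTo[i] = Math.max(to[i], maxTo[i - 1]);   // running maximum of `to`: {10, 10, 30, 30}
            }
            double value = 12;
            // Invariant 1: if value < from[i], no range at index >= i can match (froms ascend)
            assert value < from[3];
            // Invariant 2: if value >= maxTo[i], no range at index <= i can match
            // (every range up to i ends at or before maxTo[i])
            assert value >= maxTo[1];
            // Only range 2, [5, 30), can match 12 -- found without a full linear scan
        }
    }
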
+
+    /**
+     * Attempt to collect the ranges from the BKD tree instead of doc values.
+     *
+     * The BKD tree can be much faster than doc-values collection because we only
+     * need to inspect leaves that overlap each range, rather than visiting every
+     * document. And since we only care about doc counts, we don't need to decode
+     * values when an entire leaf matches.
+     *
+     * When the BKD tree is done collecting values, a {@link CollectionTerminatedException}
+     * is thrown to signal completion. If the request circuit breaker trips instead, the
+     * method returns normally and the caller falls back to doc-values collection.
+     */
+    private void tryBKDOptimization(LeafReaderContext ctx, LeafBucketCollector sub) throws CollectionTerminatedException, IOException {
+        final PointValues pointValues = ctx.reader().getPointValues(pointField);
+        if (pointValues != null) {
+            final Bits liveDocs = ctx.reader().getLiveDocs();
+            int maxDoc = ctx.reader().maxDoc();
+
+            try {
+                // pre-allocate what our DocIdSetBuilder will use as a worst-case estimate
+                addRequestCircuitBreakerBytes(maxDoc / 8);
+
+                // We collect ranges individually since a doc can land in multiple ranges
+                for (int i = 0; i < ranges.length; i++) {
+                    DocIdSetBuilder result = new DocIdSetBuilder(maxDoc);
+                    PointValues.IntersectVisitor visitor = getVisitor(liveDocs, encodedRanges[i * 2], encodedRanges[i * 2 + 1], result);
+
+                    pointValues.intersect(visitor);
+                    DocIdSetIterator iter = result.build().iterator();
+                    while (iter.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                        // Now that we know the matching docs, collect the bucket and sub-aggs
+                        //
+                        // NOTE: because we're in the BKD optimization, we know there is no parent agg
+                        // and bucket ordinals are a zero-based offset by range ordinal
+                        collectBucket(sub, iter.docID(), i);
+                    }
+                }
+                throw new CollectionTerminatedException();
+
+            } catch (CircuitBreakingException e) {
+                // If we tripped the breaker, the DocIdSetBuilder is (potentially) too large.
+                // Exit without throwing CollectionTerminatedException so we can fall back to
+                // doc-values collection.
+            } finally {
+                // Make sure we account for DocIdSetBuilder deallocation
+                addRequestCircuitBreakerBytes(-maxDoc / 8);
+            }
+        }
+    }
+
+    /**
+     * Returns a BKD intersection visitor for the provided range (`from` inclusive, `to` exclusive).
+     * A null `from` or `to` means that side of the range is unbounded.
+     */
+    private PointValues.IntersectVisitor getVisitor(Bits liveDocs, byte[] from, byte[] to, DocIdSetBuilder result) {
+
+        return new PointValues.IntersectVisitor() {
+            DocIdSetBuilder.BulkAdder adder;
+
+            @Override
+            public void grow(int count) {
+                adder = result.grow(count);
+            }
+
+            @Override
+            public void visit(int docID) {
+                if (liveDocs == null || liveDocs.get(docID)) {
+                    adder.add(docID);
+                }
+            }
+
+            @Override
+            public void visit(int docID, byte[] packedValue) {
+                int packedLength = packedValue.length;
+
+                // Value is inside the range if value >= from && value < to
+                boolean inside = (from == null || compareUnsigned(packedValue, 0, packedLength, from, 0, from.length) >= 0)
+                    && (to == null || compareUnsigned(packedValue, 0, packedLength, to, 0, to.length) < 0);
+
+                if (inside) {
+                    visit(docID);
+                }
+            }
+
+            @Override
+            public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOException {
+                int packedLength = packedValue.length;
+
+                // Value is inside the range if value >= from && value < to
+                boolean inside = (from == null || compareUnsigned(packedValue, 0, packedLength, from, 0, from.length) >= 0)
+                    && (to == null || compareUnsigned(packedValue, 0, packedLength, to, 0, to.length) < 0);
+
+                if (inside) {
+                    while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+                        visit(iterator.docID());
+                    }
+                }
+            }
+
+            @Override
+            public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
+
+                int packedLength = minPackedValue.length;
+
+                // max < from (exclusive, since ranges are inclusive on from)
+                if (from != null && compareUnsigned(maxPackedValue, 0, packedLength, from, 0, from.length) < 0) {
+                    return PointValues.Relation.CELL_OUTSIDE_QUERY;
+                }
+                // min >= to (inclusive, since ranges are exclusive on to)
+                if (to != null && compareUnsigned(minPackedValue, 0, packedLength, to, 0, to.length) >= 0) {
+                    return PointValues.Relation.CELL_OUTSIDE_QUERY;
+                }
+
+                // The leaf is fully inside this range if min >= from && max < to
+                if (
+                    // `from` is unbounded, or `min >= from`
+                    (from == null || compareUnsigned(minPackedValue, 0, packedLength, from, 0, from.length) >= 0)
+                    &&
+                    // `to` is unbounded, or `max < to`
+                    (to == null || compareUnsigned(maxPackedValue, 0, packedLength, to, 0, to.length) < 0)
+                ) {
+                    return PointValues.Relation.CELL_INSIDE_QUERY;
+                }
+
+                // If we're not outside and not fully inside, we must be crossing
+                return PointValues.Relation.CELL_CROSSES_QUERY;
+            }
+        };
+    }
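
A worked example of how compare() classifies cells, using the same encoding and unsigned comparison (Lucene's LongPoint; illustration only, not part of this patch):

    import java.util.Arrays;
    import org.apache.lucene.document.LongPoint;

    public class CellRelationDemo {
        static byte[] enc(long v) {
            byte[] b = new byte[Long.BYTES];
            LongPoint.encodeDimension(v, b, 0);
            return b;
        }

        static boolean lt(byte[] a, byte[] b) {
            return Arrays.compareUnsigned(a, 0, a.length, b, 0, b.length) < 0;
        }

        public static void main(String[] args) {
            byte[] from = enc(3), to = enc(6);          // encoded range [3, 6)
            // cell [4, 5]: min >= from and max < to -> CELL_INSIDE_QUERY,
            // so the whole leaf is counted without decoding any values
            System.out.println(!lt(enc(4), from) && lt(enc(5), to));   // true
            // cell [7, 9]: min >= to -> CELL_OUTSIDE_QUERY, leaf skipped entirely
            System.out.println(!lt(enc(7), to));                       // true
            // cell [2, 4]: straddles `from` -> CELL_CROSSES_QUERY, values checked one by one
            System.out.println(lt(enc(2), from) && !lt(enc(4), from)); // true
        }
    }
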
@@ -328,13 +506,13 @@ private long subBucketOrdinal(long owningBucketOrdinal, int rangeOrd) {
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
         consumeBucketsAndMaybeBreak(ranges.length);
-        List<org.elasticsearch.search.aggregations.bucket.range.Range.Bucket> buckets = new ArrayList<>(ranges.length);
+        List<InternalRange.Bucket> buckets = new ArrayList<>(ranges.length);
         for (int i = 0; i < ranges.length; i++) {
             Range range = ranges[i];
             final long bucketOrd = subBucketOrdinal(owningBucketOrdinal, i);
-            org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket =
-                rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),
-                    bucketAggregations(bucketOrd), keyed, format);
+            InternalRange.Bucket bucket =
+                rangeFactory.createBucket(range.key, range.from, range.to, bucketDocCount(bucketOrd),
+                    bucketAggregations(bucketOrd), keyed, format);
             buckets.add(bucket);
         }
         // value source can be null in the case of unmapped fields
@@ -344,11 +522,10 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
     @Override
     public InternalAggregation buildEmptyAggregation() {
         InternalAggregations subAggs = buildEmptySubAggregations();
-        List<org.elasticsearch.search.aggregations.bucket.range.Range.Bucket> buckets = new ArrayList<>(ranges.length);
+        List<InternalRange.Bucket> buckets = new ArrayList<>(ranges.length);
         for (int i = 0; i < ranges.length; i++) {
             Range range = ranges[i];
-            org.elasticsearch.search.aggregations.bucket.range.Range.Bucket bucket =
-                rangeFactory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, format);
+            InternalRange.Bucket bucket = rangeFactory.createBucket(range.key, range.from, range.to, 0, subAggs, keyed, format);
             buckets.add(bucket);
         }
         // value source can be null in the case of unmapped fields
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java
index f7a1ce30d1ed7..5a33251bcdf24 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/RangeIT.java
@@ -18,9 +18,11 @@
  */
 package org.elasticsearch.search.aggregations.bucket;

+import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.fielddata.ScriptDocValues;
 import org.elasticsearch.plugins.Plugin;
@@ -33,6 +35,7 @@
 import org.elasticsearch.search.aggregations.bucket.range.Range;
 import org.elasticsearch.search.aggregations.bucket.range.Range.Bucket;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.metrics.InternalMax;
 import org.elasticsearch.search.aggregations.metrics.Sum;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.hamcrest.Matchers;
@@ -48,6 +51,7 @@
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.range;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
 import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
@@ -66,6 +70,7 @@ public class RangeIT extends ESIntegTestCase {

     private static final String MULTI_VALUED_FIELD_NAME = "l_values";
     static int numDocs;
+    static int numDocsBigIndex;

     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
@@ -116,6 +121,19 @@ public void setupSuiteScopeCluster() throws Exception {
                     .endObject()));
         }

+        createIndex("idx_big");
+        numDocsBigIndex = 5000;
+        BulkRequestBuilder bulkBuilder = client().prepareBulk("idx_big");
+        for (int i = 0; i < numDocsBigIndex; i++) {
+            bulkBuilder.add(client().prepareIndex().setSource(jsonBuilder()
+                .startObject()
+                .field(SINGLE_VALUED_FIELD_NAME, i + 1)
+                .startArray(MULTI_VALUED_FIELD_NAME).value(i + 1).value(i + 2).endArray()
+                .endObject()));
+        }
+        bulkBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+        client().admin().indices().prepareForceMerge("idx_big").setMaxNumSegments(1).get();
+
         // Create two indices and add the field 'route_length_miles' as an alias in
         // one, and a concrete field in the other.
         prepareCreate("old_index")
@@ -243,6 +261,47 @@ public void testSingleValueField() throws Exception {
         assertThat(bucket.getDocCount(), equalTo(numDocs - 5L));
     }

+    public void testMultipleRangesWithMaxBKDOptimization() throws Exception {
+        SearchResponse response = client().prepareSearch("idx_big")
+            .addAggregation(range("range")
+                .field(SINGLE_VALUED_FIELD_NAME)
+                .addRange(3, 6)
+                .addRange(6, 10)
+                .subAggregation(max("the_max").field(SINGLE_VALUED_FIELD_NAME)))
+            .get();
+
+        assertSearchResponse(response);
+
+        Range range = response.getAggregations().get("range");
+        assertThat(range, notNullValue());
+        assertThat(range.getName(), equalTo("range"));
+        List<? extends Bucket> buckets = range.getBuckets();
+        assertThat(buckets.size(), equalTo(2));
+
+        Range.Bucket bucket = buckets.get(0);
+        assertThat(bucket, notNullValue());
+        assertThat(bucket.getKey(), equalTo("3.0-6.0"));
+        assertThat(((Number) bucket.getFrom()).doubleValue(), equalTo(3.0));
+        assertThat(((Number) bucket.getTo()).doubleValue(), equalTo(6.0));
+        assertThat(bucket.getFromAsString(), equalTo("3.0"));
+        assertThat(bucket.getToAsString(), equalTo("6.0"));
+        assertThat(bucket.getDocCount(), equalTo(3L));
+        InternalMax max = bucket.getAggregations().get("the_max");
+        assertThat(max.getValue(), equalTo(5.0));
+
+        bucket = buckets.get(1);
+        assertThat(bucket, notNullValue());
+        assertThat(bucket.getKey(), equalTo("6.0-10.0"));
+        assertThat(((Number) bucket.getFrom()).doubleValue(), equalTo(6.0));
+        assertThat(((Number) bucket.getTo()).doubleValue(), equalTo(10.0));
+        assertThat(bucket.getFromAsString(), equalTo("6.0"));
+        assertThat(bucket.getToAsString(), equalTo("10.0"));
+        assertThat(bucket.getDocCount(), equalTo(4L));
+        max = bucket.getAggregations().get("the_max");
+        assertThat(max.getValue(), equalTo(9.0));
+    }
+
     public void testSingleValueFieldWithFormat() throws Exception {
         SearchResponse response = client()
             .prepareSearch("idx")
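
Because tryBKDOptimization() intersects each range separately, a document can be counted into several overlapping buckets, matching doc-values behavior. A possible companion test along the lines of the one above (sketch only, not part of this patch; under the idx_big setup the single-valued field holds 1..5000):

    public void testOverlappingRangesWithBKDOptimization() throws Exception {
        SearchResponse response = client().prepareSearch("idx_big")
            .addAggregation(range("range")
                .field(SINGLE_VALUED_FIELD_NAME)
                .addRange(3, 10)
                .addRange(5, 10))
            .get();

        assertSearchResponse(response);

        Range range = response.getAggregations().get("range");
        List<? extends Bucket> buckets = range.getBuckets();
        assertThat(buckets.get(0).getDocCount(), equalTo(7L));  // values 3..9
        assertThat(buckets.get(1).getDocCount(), equalTo(5L));  // values 5..9, also counted above
    }
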