diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java index 1a65c577e0da7..2fc356184dcb4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java @@ -32,7 +32,7 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue implements Sum { private final double sum; - public InternalSum(String name, double sum, DocValueFormat formatter, Map metadata) { + public InternalSum(String name, double sum, DocValueFormat formatter, Map metadata) { super(name, metadata); this.sum = sum; this.format = formatter; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java index 7cb1f015342c8..f83c06ab2b295 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MinAggregator.java @@ -48,7 +48,7 @@ import java.util.Map; import java.util.function.Function; -class MinAggregator extends NumericMetricsAggregator.SingleValue { +public class MinAggregator extends NumericMetricsAggregator.SingleValue { private static final int MAX_BKD_LOOKUPS = 1024; final ValuesSource.Numeric valuesSource; @@ -168,7 +168,7 @@ public void doClose() { * @param parent The parent aggregator. * @param config The config for the values source metric. */ - static Function getPointReaderOrNull(SearchContext context, Aggregator parent, + public static Function getPointReaderOrNull(SearchContext context, Aggregator parent, ValuesSourceConfig config) { if (context.query() != null && context.query().getClass() != MatchAllDocsQuery.class) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ValueCountAggregatorSupplier.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ValueCountAggregatorSupplier.java index f882afaf1d076..1bfda814a19ae 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ValueCountAggregatorSupplier.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ValueCountAggregatorSupplier.java @@ -7,7 +7,7 @@ * not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an diff --git a/x-pack/plugin/mapper-aggregate-metric/build.gradle b/x-pack/plugin/mapper-aggregate-metric/build.gradle index 73409b7293c6e..1ee27ba17b245 100644 --- a/x-pack/plugin/mapper-aggregate-metric/build.gradle +++ b/x-pack/plugin/mapper-aggregate-metric/build.gradle @@ -7,7 +7,6 @@ evaluationDependsOn(xpackModule('core')) apply plugin: 'elasticsearch.esplugin' - esplugin { name 'x-pack-aggregate-metric' description 'Module for the aggregate_metric field type, which allows pre-aggregated fields to be stored a single field.' @@ -16,7 +15,12 @@ esplugin { } archivesBaseName = 'x-pack-aggregate-metric' +compileJava.options.compilerArgs << "-Xlint:-rawtypes" +compileTestJava.options.compilerArgs << "-Xlint:-rawtypes" + dependencies { + compileOnly project(":server") + compileOnly project(path: xpackModule('core'), configuration: 'default') testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') } diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/AggregateMetricMapperPlugin.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/AggregateMetricMapperPlugin.java index 0279965216442..ebb484c1ca671 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/AggregateMetricMapperPlugin.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/AggregateMetricMapperPlugin.java @@ -12,6 +12,9 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.xpack.aggregatemetric.aggregations.metrics.AggregateMetricsAggregatorsRegistrar; import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; @@ -19,10 +22,11 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import static java.util.Collections.singletonMap; -public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin { +public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin, SearchPlugin { @Override public Map getMappers() { @@ -37,4 +41,14 @@ public Map getMappers() { ); } + @Override + public List> getAggregationExtentions() { + return List.of( + AggregateMetricsAggregatorsRegistrar::registerSumAggregator, + AggregateMetricsAggregatorsRegistrar::registerAvgAggregator, + AggregateMetricsAggregatorsRegistrar::registerMinAggregator, + AggregateMetricsAggregatorsRegistrar::registerMaxAggregator, + AggregateMetricsAggregatorsRegistrar::registerValueCountAggregator + ); + } } diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedAvgAggregator.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedAvgAggregator.java new file mode 100644 index 0000000000000..c50ebcabc58ce --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedAvgAggregator.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.CompensatedSum; +import org.elasticsearch.search.aggregations.metrics.InternalAvg; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.Map; + +class AggregateMetricBackedAvgAggregator extends NumericMetricsAggregator.SingleValue { + + final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource; + + LongArray counts; + DoubleArray sums; + DoubleArray compensations; + DocValueFormat format; + + AggregateMetricBackedAvgAggregator( + String name, + AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource, + DocValueFormat formatter, + SearchContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, context, parent, metadata); + this.valuesSource = valuesSource; + this.format = formatter; + if (valuesSource != null) { + final BigArrays bigArrays = context.bigArrays(); + counts = bigArrays.newLongArray(1, true); + sums = bigArrays.newDoubleArray(1, true); + compensations = bigArrays.newDoubleArray(1, true); + } + } + + @Override + public ScoreMode scoreMode() { + return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final BigArrays bigArrays = context.bigArrays(); + // Retrieve aggregate values for metrics sum and value_count + final SortedNumericDoubleValues aggregateSums = valuesSource.getAggregateMetricValues(ctx, Metric.sum); + final SortedNumericDoubleValues aggregateValueCounts = valuesSource.getAggregateMetricValues(ctx, Metric.value_count); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + return new LeafBucketCollectorBase(sub, sums) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + + // Read aggregate values for sums + if (aggregateSums.advanceExact(doc)) { + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + + kahanSummation.reset(sum, compensation); + for (int i = 0; i < aggregateSums.docValueCount(); i++) { + double value = aggregateSums.nextValue(); + kahanSummation.add(value); + } + + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); + } + + counts = bigArrays.grow(counts, bucket + 1); + // Read aggregate values for value_count + if (aggregateValueCounts.advanceExact(doc)) { + for (int i = 0; i < aggregateValueCounts.docValueCount(); i++) { + double d = aggregateValueCounts.nextValue(); + long value = Double.valueOf(d).longValue(); + counts.increment(bucket, value); + } + } + } + }; + } + + @Override + public double metric(long owningBucketOrd) { + if (valuesSource == null || owningBucketOrd >= sums.size()) { + return Double.NaN; + } + return sums.get(owningBucketOrd) / counts.get(owningBucketOrd); + } + + @Override + public InternalAggregation buildAggregation(long bucket) { + if (valuesSource == null || bucket >= sums.size()) { + return buildEmptyAggregation(); + } + return new InternalAvg(name, sums.get(bucket), counts.get(bucket), format, metadata()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalAvg(name, 0.0, 0L, format, metadata()); + } + + @Override + public void doClose() { + Releasables.close(counts, sums, compensations); + } + +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMaxAggregator.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMaxAggregator.java new file mode 100644 index 0000000000000..01422e3811469 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMaxAggregator.java @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.index.fielddata.NumericDoubleValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.InternalMax; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.Map; + +class AggregateMetricBackedMaxAggregator extends NumericMetricsAggregator.SingleValue { + + private final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource; + final DocValueFormat formatter; + DoubleArray maxes; + + AggregateMetricBackedMaxAggregator( + String name, + ValuesSourceConfig config, + AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource, + SearchContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, context, parent, metadata); + this.valuesSource = valuesSource; + if (valuesSource != null) { + maxes = context.bigArrays().newDoubleArray(1, false); + maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY); + } + this.formatter = config.format(); + } + + @Override + public ScoreMode scoreMode() { + return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + if (parent != null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } else { + // we have no parent and the values source is empty so we can skip collecting hits. + throw new CollectionTerminatedException(); + } + } + + final BigArrays bigArrays = context.bigArrays(); + final SortedNumericDoubleValues allValues = valuesSource.getAggregateMetricValues(ctx, Metric.max); + final NumericDoubleValues values = MultiValueMode.MAX.select(allValues); + return new LeafBucketCollectorBase(sub, allValues) { + + @Override + public void collect(int doc, long bucket) throws IOException { + if (bucket >= maxes.size()) { + long from = maxes.size(); + maxes = bigArrays.grow(maxes, bucket + 1); + maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY); + } + if (values.advanceExact(doc)) { + final double value = values.doubleValue(); + double max = maxes.get(bucket); + max = Math.max(max, value); + maxes.set(bucket, max); + } + } + }; + } + + @Override + public double metric(long owningBucketOrd) { + if (valuesSource == null || owningBucketOrd >= maxes.size()) { + return Double.NEGATIVE_INFINITY; + } + return maxes.get(owningBucketOrd); + } + + @Override + public InternalAggregation buildAggregation(long bucket) { + if (valuesSource == null || bucket >= maxes.size()) { + return buildEmptyAggregation(); + } + return new InternalMax(name, maxes.get(bucket), formatter, metadata()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalMax(name, Double.NEGATIVE_INFINITY, formatter, metadata()); + } + + @Override + public void doClose() { + Releasables.close(maxes); + } +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMinAggregator.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMinAggregator.java new file mode 100644 index 0000000000000..4a973e1a96ee1 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMinAggregator.java @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.index.fielddata.NumericDoubleValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.InternalMin; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.Map; + +class AggregateMetricBackedMinAggregator extends NumericMetricsAggregator.SingleValue { + + private final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource; + final DocValueFormat format; + DoubleArray mins; + + AggregateMetricBackedMinAggregator( + String name, + ValuesSourceConfig config, + AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource, + SearchContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, context, parent, metadata); + this.valuesSource = valuesSource; + if (valuesSource != null) { + mins = context.bigArrays().newDoubleArray(1, false); + mins.fill(0, mins.size(), Double.POSITIVE_INFINITY); + } + this.format = config.format(); + } + + @Override + public ScoreMode scoreMode() { + return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + if (parent == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } else { + // we have no parent and the values source is empty so we can skip collecting hits. + throw new CollectionTerminatedException(); + } + } + + final BigArrays bigArrays = context.bigArrays(); + final SortedNumericDoubleValues allValues = valuesSource.getAggregateMetricValues(ctx, Metric.min); + final NumericDoubleValues values = MultiValueMode.MIN.select(allValues); + return new LeafBucketCollectorBase(sub, allValues) { + + @Override + public void collect(int doc, long bucket) throws IOException { + if (bucket >= mins.size()) { + long from = mins.size(); + mins = bigArrays.grow(mins, bucket + 1); + mins.fill(from, mins.size(), Double.POSITIVE_INFINITY); + } + if (values.advanceExact(doc)) { + final double value = values.doubleValue(); + double min = mins.get(bucket); + min = Math.min(min, value); + mins.set(bucket, min); + } + } + }; + } + + @Override + public double metric(long owningBucketOrd) { + if (valuesSource == null || owningBucketOrd >= mins.size()) { + return Double.POSITIVE_INFINITY; + } + return mins.get(owningBucketOrd); + } + + @Override + public InternalAggregation buildAggregation(long bucket) { + if (valuesSource == null || bucket >= mins.size()) { + return buildEmptyAggregation(); + } + return new InternalMin(name, mins.get(bucket), format, metadata()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalMin(name, Double.POSITIVE_INFINITY, format, metadata()); + } + + @Override + public void doClose() { + Releasables.close(mins); + } +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedSumAggregator.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedSumAggregator.java new file mode 100644 index 0000000000000..56bade52824f1 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedSumAggregator.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.CompensatedSum; +import org.elasticsearch.search.aggregations.metrics.InternalSum; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.Map; + +class AggregateMetricBackedSumAggregator extends NumericMetricsAggregator.SingleValue { + + private final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource; + private final DocValueFormat format; + + private DoubleArray sums; + private DoubleArray compensations; + + AggregateMetricBackedSumAggregator( + String name, + AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource, + DocValueFormat formatter, + SearchContext context, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, context, parent, metadata); + this.valuesSource = valuesSource; + this.format = formatter; + if (valuesSource != null) { + sums = context.bigArrays().newDoubleArray(1, true); + compensations = context.bigArrays().newDoubleArray(1, true); + } + } + + @Override + public ScoreMode scoreMode() { + return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final BigArrays bigArrays = context.bigArrays(); + final SortedNumericDoubleValues values = valuesSource.getAggregateMetricValues(ctx, Metric.sum); + final CompensatedSum kahanSummation = new CompensatedSum(0, 0); + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + sums = bigArrays.grow(sums, bucket + 1); + compensations = bigArrays.grow(compensations, bucket + 1); + + if (values.advanceExact(doc)) { + final int valuesCount = values.docValueCount(); // For aggregate metric this should always equal to 1 + // Compute the sum of double values with Kahan summation algorithm which is more + // accurate than naive summation. + double sum = sums.get(bucket); + double compensation = compensations.get(bucket); + kahanSummation.reset(sum, compensation); + + for (int i = 0; i < valuesCount; i++) { + double value = values.nextValue(); + kahanSummation.add(value); + } + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); + } + } + }; + } + + @Override + public double metric(long owningBucketOrd) { + if (valuesSource == null || owningBucketOrd >= sums.size()) { + return 0.0; + } + return sums.get(owningBucketOrd); + } + + @Override + public InternalAggregation buildAggregation(long bucket) { + if (valuesSource == null || bucket >= sums.size()) { + return buildEmptyAggregation(); + } + return new InternalSum(name, sums.get(bucket), format, metadata()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalSum(name, 0.0, format, metadata()); + } + + @Override + public void doClose() { + Releasables.close(sums, compensations); + } +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedValueCountAggregator.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedValueCountAggregator.java new file mode 100644 index 0000000000000..c6d79f45df8c1 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedValueCountAggregator.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.metrics.InternalValueCount; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator; +import org.elasticsearch.search.internal.SearchContext; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; + +import java.io.IOException; +import java.util.Map; + +/** + * A field data based aggregator that adds all values in the value_count metric sub-field from an aggregate_metric field. + * This aggregator works in a multi-bucket mode, that is, when serves as a sub-aggregator, a single aggregator instance + * aggregates the counts for all buckets owned by the parent aggregator) + */ +class AggregateMetricBackedValueCountAggregator extends NumericMetricsAggregator.SingleValue { + + private final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource; + + // a count per bucket + LongArray counts; + + AggregateMetricBackedValueCountAggregator( + String name, + AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource, + SearchContext aggregationContext, + Aggregator parent, + Map metadata + ) throws IOException { + super(name, aggregationContext, parent, metadata); + this.valuesSource = valuesSource; + if (valuesSource != null) { + counts = context.bigArrays().newLongArray(1, true); + } + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { + if (valuesSource == null) { + return LeafBucketCollector.NO_OP_COLLECTOR; + } + final BigArrays bigArrays = context.bigArrays(); + final SortedNumericDoubleValues values = valuesSource.getAggregateMetricValues( + ctx, + AggregateDoubleMetricFieldMapper.Metric.value_count + ); + + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long bucket) throws IOException { + counts = bigArrays.grow(counts, bucket + 1); + if (values.advanceExact(doc)) { + for (int i = 0; i < values.docValueCount(); i++) { // For aggregate metric this should always equal to 1 + long value = Double.valueOf(values.nextValue()).longValue(); + counts.increment(bucket, value); + } + } + } + }; + } + + @Override + public double metric(long owningBucketOrd) { + return (valuesSource == null || owningBucketOrd >= counts.size()) ? 0 : counts.get(owningBucketOrd); + } + + @Override + public InternalAggregation buildAggregation(long bucket) { + if (valuesSource == null || bucket >= counts.size()) { + return buildEmptyAggregation(); + } + return new InternalValueCount(name, counts.get(bucket), metadata()); + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return new InternalValueCount(name, 0L, metadata()); + } + + @Override + public void doClose() { + Releasables.close(counts); + } + +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricsAggregatorsRegistrar.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricsAggregatorsRegistrar.java new file mode 100644 index 0000000000000..3c1b04c1c6fa9 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricsAggregatorsRegistrar.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MetricAggregatorSupplier; +import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MinMaxAggregatorSupplier; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ValueCountAggregatorSupplier; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSourceType; + +/** + * Utility class providing static methods to register aggregators for the aggregate_metric values source + */ +public class AggregateMetricsAggregatorsRegistrar { + + public static void registerSumAggregator(ValuesSourceRegistry.Builder builder) { + builder.register( + SumAggregationBuilder.NAME, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC, + (MetricAggregatorSupplier) (name, valuesSource, formatter, context, parent, metadata) -> new AggregateMetricBackedSumAggregator( + name, + (AggregateMetricsValuesSource.AggregateDoubleMetric) valuesSource, + formatter, + context, + parent, + metadata + ) + ); + } + + public static void registerAvgAggregator(ValuesSourceRegistry.Builder builder) { + builder.register( + AvgAggregationBuilder.NAME, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC, + (MetricAggregatorSupplier) (name, valuesSource, formatter, context, parent, metadata) -> new AggregateMetricBackedAvgAggregator( + name, + (AggregateMetricsValuesSource.AggregateDoubleMetric) valuesSource, + formatter, + context, + parent, + metadata + ) + ); + } + + public static void registerMinAggregator(ValuesSourceRegistry.Builder builder) { + builder.register( + MinAggregationBuilder.NAME, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC, + (MinMaxAggregatorSupplier) ( + name, + valuesSourceConfig, + valuesSource, + context, + parent, + metadata) -> new AggregateMetricBackedMinAggregator( + name, + valuesSourceConfig, + (AggregateMetricsValuesSource.AggregateDoubleMetric) valuesSource, + context, + parent, + metadata + ) + ); + } + + public static void registerMaxAggregator(ValuesSourceRegistry.Builder builder) { + builder.register( + MaxAggregationBuilder.NAME, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC, + (MinMaxAggregatorSupplier) (name, config, valuesSource, context, parent, metadata) -> new AggregateMetricBackedMaxAggregator( + name, + config, + (AggregateMetricsValuesSource.AggregateDoubleMetric) valuesSource, + context, + parent, + metadata + ) + ); + } + + public static void registerValueCountAggregator(ValuesSourceRegistry.Builder builder) { + builder.register( + ValueCountAggregationBuilder.NAME, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC, + (ValueCountAggregatorSupplier) (name, valuesSource, context, parent, metadata) -> new AggregateMetricBackedValueCountAggregator( + name, + (AggregateMetricsValuesSource.AggregateDoubleMetric) valuesSource, + context, + parent, + metadata + ) + ); + } +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/support/AggregateMetricsValuesSource.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/support/AggregateMetricsValuesSource.java new file mode 100644 index 0000000000000..eedd22185f8e7 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/support/AggregateMetricsValuesSource.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.support; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.elasticsearch.common.Rounding; +import org.elasticsearch.index.fielddata.DocValueBits; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.xpack.aggregatemetric.fielddata.IndexAggregateDoubleMetricFieldData; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.function.Function; + +public class AggregateMetricsValuesSource { + public abstract static class AggregateDoubleMetric extends org.elasticsearch.search.aggregations.support.ValuesSource { + + public abstract SortedNumericDoubleValues getAggregateMetricValues(LeafReaderContext context, Metric metric) throws IOException; + + public static class Fielddata extends AggregateDoubleMetric { + + protected final IndexAggregateDoubleMetricFieldData indexFieldData; + + public Fielddata(IndexAggregateDoubleMetricFieldData indexFieldData) { + this.indexFieldData = indexFieldData; + } + + @Override + public SortedBinaryDocValues bytesValues(LeafReaderContext context) { + return indexFieldData.load(context).getBytesValues(); + } + + @Override + public DocValueBits docsWithValue(LeafReaderContext context) throws IOException { + SortedNumericDoubleValues values = getAggregateMetricValues(context, null); + return new DocValueBits() { + @Override + public boolean advanceExact(int doc) throws IOException { + return values.advanceExact(doc); + } + }; + } + + @Override + public Function roundingPreparer(IndexReader reader) throws IOException { + throw new AggregationExecutionException("Can't round an [" + AggregateDoubleMetricFieldMapper.CONTENT_TYPE + "]"); + } + + public SortedNumericDoubleValues getAggregateMetricValues(LeafReaderContext context, Metric metric) throws IOException { + return indexFieldData.load(context).getAggregateMetricValues(metric); + } + } + } +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/support/AggregateMetricsValuesSourceType.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/support/AggregateMetricsValuesSourceType.java new file mode 100644 index 0000000000000..1c2b8bf6a0d8a --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/aggregations/support/AggregateMetricsValuesSourceType.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.support; + +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.script.AggregationScript; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationExecutionException; +import org.elasticsearch.search.aggregations.support.FieldContext; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.aggregations.support.ValuesSource; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.fielddata.IndexAggregateDoubleMetricFieldData; + +import java.util.Locale; +import java.util.function.LongSupplier; + +public enum AggregateMetricsValuesSourceType implements ValuesSourceType { + + AGGREGATE_METRIC() { + @Override + public ValuesSource getEmpty() { + throw new IllegalArgumentException("Can't deal with unmapped AggregateMetricsValuesSource type " + this.value()); + } + + @Override + public ValuesSource getScript(AggregationScript.LeafFactory script, ValueType scriptValueType) { + throw new AggregationExecutionException("Value source of type [" + this.value() + "] is not supported by scripts"); + } + + @Override + public ValuesSource getField(FieldContext fieldContext, AggregationScript.LeafFactory script) { + final IndexFieldData indexFieldData = fieldContext.indexFieldData(); + + if ((indexFieldData instanceof IndexAggregateDoubleMetricFieldData) == false) { + throw new IllegalArgumentException( + "Expected aggregate_metric_double type on field [" + + fieldContext.field() + + "], but got [" + + fieldContext.fieldType().typeName() + + "]" + ); + } + return new AggregateMetricsValuesSource.AggregateDoubleMetric.Fielddata((IndexAggregateDoubleMetricFieldData) indexFieldData); + } + + @Override + public ValuesSource replaceMissing(ValuesSource valuesSource, Object rawMissing, DocValueFormat docValueFormat, LongSupplier now) { + throw new IllegalArgumentException("Can't apply missing values on a " + valuesSource.getClass()); + } + }; + + @Override + public String typeName() { + return value(); + } + + public static ValuesSourceType fromString(String name) { + return valueOf(name.trim().toUpperCase(Locale.ROOT)); + } + + public String value() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/fielddata/IndexAggregateDoubleMetricFieldData.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/fielddata/IndexAggregateDoubleMetricFieldData.java new file mode 100644 index 0000000000000..d2c169f0b12a3 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/fielddata/IndexAggregateDoubleMetricFieldData.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.aggregatemetric.fielddata; + +import org.elasticsearch.index.Index; +import org.elasticsearch.index.fielddata.IndexFieldData; + +/** + * Specialization of {@link IndexFieldData} for aggregate_metric. + */ +public abstract class IndexAggregateDoubleMetricFieldData implements IndexFieldData { + + protected final Index index; + protected final String fieldName; + + public IndexAggregateDoubleMetricFieldData(Index index, String fieldName) { + this.index = index; + this.fieldName = fieldName; + } + + @Override + public final String getFieldName() { + return fieldName; + } + + @Override + public final void clear() { + // can't do + } + + @Override + public final Index index() { + return index; + } +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/fielddata/LeafAggregateDoubleMetricFieldData.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/fielddata/LeafAggregateDoubleMetricFieldData.java new file mode 100644 index 0000000000000..6b6acc3c9e0f9 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/fielddata/LeafAggregateDoubleMetricFieldData.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.fielddata; + +import org.elasticsearch.index.fielddata.LeafFieldData; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; + +/** + * {@link LeafFieldData} specialization for aggregate_double_metric data. + */ +public interface LeafAggregateDoubleMetricFieldData extends LeafFieldData { + + /** + * Return aggregate_metric of double values for a given metric + */ + SortedNumericDoubleValues getAggregateMetricValues(Metric metric) throws IOException; + +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java index 41cd261b52c8f..f397f22c9297e 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java @@ -5,28 +5,49 @@ */ package org.elasticsearch.xpack.aggregatemetric.mapper; +import org.apache.lucene.index.DocValues; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.Query; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.util.NumericUtils; import org.elasticsearch.common.Explicit; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateMathParser; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentSubParser; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.fielddata.ScriptDocValues; +import org.elasticsearch.index.fielddata.SortedBinaryDocValues; +import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.SimpleMappedFieldType; -import org.elasticsearch.index.mapper.TypeParsers; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.MultiValueMode; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.search.sort.BucketedSort; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.fielddata.IndexAggregateDoubleMetricFieldData; +import org.elasticsearch.xpack.aggregatemetric.fielddata.LeafAggregateDoubleMetricFieldData; import java.io.IOException; import java.time.ZoneId; @@ -34,6 +55,7 @@ import java.util.Collections; import java.util.EnumMap; import java.util.EnumSet; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -46,6 +68,18 @@ public class AggregateDoubleMetricFieldMapper extends FieldMapper { public static final String CONTENT_TYPE = "aggregate_metric_double"; + public static final String SUBFIELD_SEPARATOR = "."; + + /** + * Return the name of a subfield of an aggregate metric field + * + * @param fieldName the name of the aggregate metric field + * @param metric the metric type the subfield corresponds to + * @return the name of the subfield + */ + public static String subfieldName(String fieldName, Metric metric) { + return fieldName + AggregateDoubleMetricFieldMapper.SUBFIELD_SEPARATOR + metric.name(); + } /** * Mapping field names @@ -59,7 +93,7 @@ public static class Names { /** * Enum of aggregate metrics supported by this field mapper */ - enum Metric { + public enum Metric { min, max, sum, @@ -159,7 +193,7 @@ public AggregateDoubleMetricFieldMapper build(BuilderContext context) { EnumMap metricMappers = new EnumMap<>(Metric.class); // Instantiate one NumberFieldMapper instance for each metric for (Metric m : this.metrics) { - String fieldName = name + "._" + m.name(); + String fieldName = subfieldName(name, m); NumberFieldMapper.Builder builder; if (m == Metric.value_count) { @@ -184,8 +218,7 @@ public AggregateDoubleMetricFieldMapper build(BuilderContext context) { ) ); Explicit defaultMetric = defaultMetric(context); - - AggregateDoubleMetricFieldType metricFieldType = (AggregateDoubleMetricFieldType) this.fieldType; + AggregateDoubleMetricFieldType metricFieldType = (AggregateDoubleMetricFieldType) fieldType; metricFieldType.setMetricFields(metricFields); metricFieldType.setDefaultMetric(defaultMetric.value()); @@ -194,11 +227,9 @@ public AggregateDoubleMetricFieldMapper build(BuilderContext context) { metricFieldType, defaultFieldType, context.indexSettings(), - multiFieldsBuilder.build(this, context), ignoreMalformed(context), metrics(context), defaultMetric, - copyTo, metricMappers ); } @@ -248,8 +279,6 @@ public Mapper.Builder parse( XContentMapValues.nodeBooleanValue(propNode, name + "." + Names.IGNORE_MALFORMED.getPreferredName()) ); iterator.remove(); - } else if (TypeParsers.parseMultiField(builder, name, parserContext, propName, propNode)) { - iterator.remove(); } } return builder; @@ -262,7 +291,7 @@ public static final class AggregateDoubleMetricFieldType extends SimpleMappedFie private Metric defaultMetric; - AggregateDoubleMetricFieldType() {} + public AggregateDoubleMetricFieldType() {} AggregateDoubleMetricFieldType(AggregateDoubleMetricFieldType other) { super(other); @@ -296,11 +325,25 @@ public String typeName() { return CONTENT_TYPE; } - public void setMetricFields(EnumMap metricFields) { + private void setMetricFields(EnumMap metricFields) { checkIfFrozen(); this.metricFields = metricFields; } + public void addMetricField(Metric m, NumberFieldMapper.NumberFieldType subfield) { + checkIfFrozen(); + if (metricFields == null) { + metricFields = new EnumMap<>(AggregateDoubleMetricFieldMapper.Metric.class); + } + + if (name() == null) { + throw new IllegalArgumentException("Field of type [" + typeName() + "] must have a name before adding a subfield"); + } + String subfieldName = subfieldName(name(), m); + subfield.setName(subfieldName); + metricFields.put(m, subfield); + } + public void setDefaultMetric(Metric defaultMetric) { checkIfFrozen(); this.defaultMetric = defaultMetric; @@ -351,10 +394,118 @@ public Relation isFieldWithinQuery( } @Override - public IndexFieldData.Builder fielddataBuilder(String fullyQualifiedIndexName) { - return delegateFieldType().fielddataBuilder(fullyQualifiedIndexName); + public ValuesSourceType getValuesSourceType() { + return AggregateMetricsValuesSourceType.AGGREGATE_METRIC; } + @Override + public IndexFieldData.Builder fielddataBuilder(String fullyQualifiedIndexName) { + return new IndexFieldData.Builder() { + @Override + public IndexFieldData build( + IndexSettings indexSettings, + MappedFieldType fieldType, + IndexFieldDataCache cache, + CircuitBreakerService breakerService, + MapperService mapperService + ) { + return new IndexAggregateDoubleMetricFieldData(indexSettings.getIndex(), fieldType.name()) { + @Override + public LeafAggregateDoubleMetricFieldData load(LeafReaderContext context) { + return new LeafAggregateDoubleMetricFieldData() { + @Override + public SortedNumericDoubleValues getAggregateMetricValues(final Metric metric) throws IOException { + try { + final SortedNumericDocValues values = DocValues.getSortedNumeric( + context.reader(), + subfieldName(getFieldName(), metric) + ); + + return new SortedNumericDoubleValues() { + @Override + public int docValueCount() { + return values.docValueCount(); + } + + @Override + public boolean advanceExact(int doc) throws IOException { + return values.advanceExact(doc); + } + + @Override + public double nextValue() throws IOException { + long v = values.nextValue(); + if (metric == Metric.value_count) { + // Only value_count metrics are encoded as integers + return v; + } else { + // All other metrics are encoded as doubles + return NumericUtils.sortableLongToDouble(v); + } + } + }; + } catch (IOException e) { + throw new IOException("Cannot load doc values", e); + } + } + + @Override + public ScriptDocValues getScriptValues() { + throw new UnsupportedOperationException( + "The [" + CONTENT_TYPE + "] field does not " + "support scripts" + ); + } + + @Override + public SortedBinaryDocValues getBytesValues() { + throw new UnsupportedOperationException( + "String representation of doc values " + "for [" + CONTENT_TYPE + "] fields is not supported" + ); + } + + @Override + public long ramBytesUsed() { + return 0; // Unknown + } + + @Override + public void close() {} + }; + } + + @Override + public LeafAggregateDoubleMetricFieldData loadDirect(LeafReaderContext context) { + return load(context); + } + + @Override + public SortField sortField( + Object missingValue, + MultiValueMode sortMode, + XFieldComparatorSource.Nested nested, + boolean reverse + ) { + SortField sortField = new SortedNumericSortField(delegateFieldType().name(), SortField.Type.DOUBLE, reverse); + return sortField; + } + + @Override + public BucketedSort newBucketedSort( + BigArrays bigArrays, + Object missingValue, + MultiValueMode sortMode, + XFieldComparatorSource.Nested nested, + SortOrder sortOrder, + DocValueFormat format, + int bucketSize, + BucketedSort.ExtraData extra + ) { + throw new IllegalArgumentException("Can't sort on the [" + CONTENT_TYPE + "] field"); + } + }; + } + }; + } } private final EnumMap metricFieldMappers; @@ -372,14 +523,12 @@ private AggregateDoubleMetricFieldMapper( MappedFieldType fieldType, MappedFieldType defaultFieldType, Settings indexSettings, - MultiFields multiFields, Explicit ignoreMalformed, Explicit> metrics, Explicit defaultMetric, - CopyTo copyTo, EnumMap metricFieldMappers ) { - super(simpleName, fieldType, defaultFieldType, indexSettings, multiFields, copyTo); + super(simpleName, fieldType, defaultFieldType, indexSettings, MultiFields.empty(), CopyTo.empty()); this.ignoreMalformed = ignoreMalformed; this.metrics = metrics; this.defaultMetric = defaultMetric; @@ -414,21 +563,20 @@ protected void parseCreateField(ParseContext context) throws IOException { } context.path().add(simpleName()); - XContentParser.Token token = null; + XContentParser.Token token; XContentSubParser subParser = null; - + EnumSet metricsParsed = EnumSet.noneOf(Metric.class); try { token = context.parser().currentToken(); if (token == XContentParser.Token.VALUE_NULL) { context.path().remove(); return; } - ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser()::getTokenLocation); subParser = new XContentSubParser(context.parser()); token = subParser.nextToken(); while (token != XContentParser.Token.END_OBJECT) { - // should be an object subfield with name a metric name + // should be an object sub-field with name a metric name ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser::getTokenLocation); String fieldName = subParser.currentName(); Metric metric = Metric.valueOf(fieldName); @@ -444,37 +592,39 @@ protected void parseCreateField(ParseContext context) throws IOException { // new aggregate metric types are added (histogram, cardinality etc) ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser::getTokenLocation); NumberFieldMapper delegateFieldMapper = metricFieldMappers.get(metric); - + // We don't accept arrays of metrics if (context.doc().getField(delegateFieldMapper.fieldType().name()) != null) { throw new IllegalArgumentException( "Field [" + name() + "] of type [" + typeName() - + "] does not support indexing multiple values for the same metric in the same field" + + "] does not support indexing multiple values for the same field in the same document" ); } - + // Delegate parsing the field to a numeric field mapper delegateFieldMapper.parse(context); + // Ensure a value_count metric does not have a negative value if (Metric.value_count == metric) { - Number n = context.doc().getField(delegateFieldMapper.fieldType().name()).numericValue(); + // context.doc().getField() method iterates over all fields in the document. + // Making the following call slow down. Maybe we can think something smarter. + Number n = context.doc().getField(delegateFieldMapper.name()).numericValue(); if (n.intValue() < 0) { throw new IllegalArgumentException( "Aggregate metric [" + metric.name() + "] of field [" + fieldType.name() + "] cannot be a negative number" ); } } - + metricsParsed.add(metric); token = subParser.nextToken(); } - for (Metric m : metrics.value()) { - if (context.doc().getField(fieldType().name() + "._" + m.name()) == null) { - throw new IllegalArgumentException( - "Aggregate metric field [" + fieldType.name() + "] must contain all metrics " + metrics.value().toString() - ); - } + // Check if all required metrics have been parsed. + if (metricsParsed.containsAll(metrics.value()) == false) { + throw new IllegalArgumentException( + "Aggregate metric field [" + fieldType.name() + "] must contain all metrics " + metrics.value().toString() + ); } } catch (Exception e) { if (ignoreMalformed.value()) { @@ -482,7 +632,23 @@ protected void parseCreateField(ParseContext context) throws IOException { // close the subParser so we advance to the end of the object subParser.close(); } - context.addIgnoredField(fieldType().name()); + // If ignoreMalformed == true, clear all parsed fields + Set ignoreFieldNames = new HashSet<>(metricFieldMappers.size()); + for (NumberFieldMapper m : metricFieldMappers.values()) { + context.addIgnoredField(m.fieldType().name()); + ignoreFieldNames.add(m.fieldType().name()); + } + // Parsing a metric sub-field is delegated to the delegate field mapper by calling method + // delegateFieldMapper.parse(context). Unfortunately, this method adds the parsed sub-field + // to the document automatically. So, at this point we must undo this by removing all metric + // sub-fields from the document. To do so, we iterate over the document fields and remove + // the ones whose names match. + for (Iterator iter = context.doc().getFields().iterator(); iter.hasNext();) { + IndexableField field = iter.next(); + if (ignoreFieldNames.contains(field.name())) { + iter.remove(); + } + } } else { // Rethrow exception as is. It is going to be caught and nested in a MapperParsingException // by its FieldMapper.MappedFieldType#parse() @@ -503,7 +669,8 @@ protected void doMerge(Mapper mergeWith) { if (other.metrics.explicit()) { if (this.metrics.value() != null && metrics.value().isEmpty() == false - && metrics.value().containsAll(other.metrics.value()) == false) { + && (metrics.value().containsAll(other.metrics.value()) == false + || other.metrics.value().containsAll(metrics.value()) == false)) { throw new IllegalArgumentException( "[" + fieldType().name() diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedAvgAggregatorTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedAvgAggregatorTests.java new file mode 100644 index 0000000000000..8d7396a5102ea --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedAvgAggregatorTests.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.InternalAvg; +import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singleton; +import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.subfieldName; + +public class AggregateMetricBackedAvgAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "aggregate_metric_field"; + + public void testMatchesNumericDocValues() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(20)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 2) + ) + ); + + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(50)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 5) + ) + ); + }, avg -> { + assertEquals(10, avg.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(avg)); + }); + } + + public void testNoDocs() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + // Intentionally not writing any docs + }, avg -> { + assertEquals(Double.NaN, avg.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(avg)); + }); + } + + public void testNoMatchingField() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 1))); + }, avg -> { + assertEquals(Double.NaN, avg.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(avg)); + }); + } + + public void testQueryFiltering() throws IOException { + testCase(new TermQuery(new Term("match", "yes")), iw -> { + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 1) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(50)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 5) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "no", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(40)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 5) + ) + ); + }, avg -> { + assertEquals(10d, avg.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(avg)); + }); + } + + /** + * Create a default aggregate_metric_double field type containing sum and a value_count metrics. + * + * @param fieldName the name of the field + * @return the created field type + */ + private AggregateDoubleMetricFieldType createDefaultFieldType(String fieldName) { + AggregateDoubleMetricFieldType fieldType = new AggregateDoubleMetricFieldType(); + fieldType.setName(fieldName); + + for (Metric m : List.of(Metric.value_count, Metric.sum)) { + NumberFieldMapper.NumberFieldType subfield = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE); + fieldType.addMetricField(m, subfield); + } + fieldType.setDefaultMetric(Metric.sum); + return fieldType; + } + + private void testCase(Query query, CheckedConsumer buildIndex, Consumer verify) + throws IOException { + MappedFieldType fieldType = createDefaultFieldType(FIELD_NAME); + AggregationBuilder aggregationBuilder = createAggBuilderForTypeTest(fieldType, FIELD_NAME); + testCase(aggregationBuilder, query, buildIndex, verify, fieldType); + } + + @Override + protected List getSearchPlugins() { + return List.of(new AggregateMetricMapperPlugin()); + } + + @Override + protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) { + return new AvgAggregationBuilder("avg_agg").field(fieldName); + } + + @Override + protected List getSupportedValuesSourceTypes() { + return List.of( + CoreValuesSourceType.NUMERIC, + CoreValuesSourceType.DATE, + CoreValuesSourceType.BOOLEAN, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC + ); + } + +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMaxAggregatorTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMaxAggregatorTests.java new file mode 100644 index 0000000000000..a005298217b24 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMaxAggregatorTests.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.metrics.InternalMax; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singleton; +import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.subfieldName; + +public class AggregateMetricBackedMaxAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "aggregate_metric_field"; + + public void testMatchesNumericDocValues() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(2)) + ) + ); + + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(50)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(5)) + ) + ); + }, max -> { + assertEquals(50, max.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(max)); + }); + } + + public void testNoDocs() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + // Intentionally not writing any docs + }, max -> { + assertEquals(Double.NEGATIVE_INFINITY, max.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(max)); + }); + } + + public void testNoMatchingField() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 1))); + }, max -> { + assertEquals(Double.NEGATIVE_INFINITY, max.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(max)); + }); + } + + public void testQueryFiltering() throws IOException { + testCase(new TermQuery(new Term("match", "yes")), iw -> { + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(2)) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(20)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(5)) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "no", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(40)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(1)) + ) + ); + }, max -> { + assertEquals(20L, max.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(max)); + }); + } + + /** + * Create a default aggregate_metric_double field type containing min and a max metrics. + * + * @param fieldName the name of the field + * @return the created field type + */ + private AggregateDoubleMetricFieldType createDefaultFieldType(String fieldName) { + AggregateDoubleMetricFieldType fieldType = new AggregateDoubleMetricFieldType(); + fieldType.setName(fieldName); + + for (Metric m : List.of(Metric.min, Metric.max)) { + NumberFieldMapper.NumberFieldType subfield = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE); + fieldType.addMetricField(m, subfield); + } + fieldType.setDefaultMetric(Metric.min); + return fieldType; + } + + private void testCase(Query query, CheckedConsumer buildIndex, Consumer verify) + throws IOException { + MappedFieldType fieldType = createDefaultFieldType(FIELD_NAME); + AggregationBuilder aggregationBuilder = createAggBuilderForTypeTest(fieldType, FIELD_NAME); + testCase(aggregationBuilder, query, buildIndex, verify, fieldType); + } + + @Override + protected List getSearchPlugins() { + return List.of(new AggregateMetricMapperPlugin()); + } + + @Override + protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) { + return new MaxAggregationBuilder("max_agg").field(fieldName); + } + + @Override + protected List getSupportedValuesSourceTypes() { + return List.of( + CoreValuesSourceType.NUMERIC, + CoreValuesSourceType.DATE, + CoreValuesSourceType.BOOLEAN, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC + ); + } + +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMinAggregatorTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMinAggregatorTests.java new file mode 100644 index 0000000000000..8b72e1ce46d8a --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedMinAggregatorTests.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.metrics.InternalMin; +import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singleton; +import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.subfieldName; + +public class AggregateMetricBackedMinAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "aggregate_metric_field"; + + public void testMatchesNumericDocValues() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(2)) + ) + ); + + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(50)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(5)) + ) + ); + }, min -> { + assertEquals(2, min.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(min)); + }); + } + + public void testNoDocs() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + // Intentionally not writing any docs + }, min -> { + assertEquals(Double.POSITIVE_INFINITY, min.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(min)); + }); + } + + public void testNoMatchingField() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 1))); + }, min -> { + assertEquals(Double.POSITIVE_INFINITY, min.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(min)); + }); + } + + public void testQueryFiltering() throws IOException { + testCase(new TermQuery(new Term("match", "yes")), iw -> { + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(2)) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(20)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(5)) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "no", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.max), Double.doubleToLongBits(40)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.min), Double.doubleToLongBits(1)) + ) + ); + }, min -> { + assertEquals(2L, min.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(min)); + }); + } + + /** + * Create a default aggregate_metric_double field type containing min and a max metrics. + * + * @param fieldName the name of the field + * @return the created field type + */ + private AggregateDoubleMetricFieldType createDefaultFieldType(String fieldName) { + AggregateDoubleMetricFieldType fieldType = new AggregateDoubleMetricFieldType(); + fieldType.setName(fieldName); + + for (Metric m : List.of(Metric.min, Metric.max)) { + NumberFieldMapper.NumberFieldType subfield = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE); + fieldType.addMetricField(m, subfield); + } + fieldType.setDefaultMetric(Metric.min); + return fieldType; + } + + private void testCase(Query query, CheckedConsumer buildIndex, Consumer verify) + throws IOException { + MappedFieldType fieldType = createDefaultFieldType(FIELD_NAME); + AggregationBuilder aggregationBuilder = createAggBuilderForTypeTest(fieldType, FIELD_NAME); + testCase(aggregationBuilder, query, buildIndex, verify, fieldType); + } + + @Override + protected List getSearchPlugins() { + return List.of(new AggregateMetricMapperPlugin()); + } + + @Override + protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) { + return new MinAggregationBuilder("min_agg").field(fieldName); + } + + @Override + protected List getSupportedValuesSourceTypes() { + return List.of( + CoreValuesSourceType.NUMERIC, + CoreValuesSourceType.DATE, + CoreValuesSourceType.BOOLEAN, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC + ); + } + +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedSumAggregatorTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedSumAggregatorTests.java new file mode 100644 index 0000000000000..d89f80b6c42bc --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedSumAggregatorTests.java @@ -0,0 +1,157 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.metrics.InternalSum; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singleton; +import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.subfieldName; + +public class AggregateMetricBackedSumAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "aggregate_metric_field"; + + public void testMatchesNumericDocValues() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), Double.doubleToLongBits(2)) + ) + ); + + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(50)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), Double.doubleToLongBits(5)) + ) + ); + }, sum -> { + assertEquals(60, sum.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(sum)); + }); + } + + public void testNoDocs() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + // Intentionally not writing any docs + }, sum -> { + assertEquals(0L, sum.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(sum)); + }); + } + + public void testNoMatchingField() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 1))); + }, sum -> { + assertEquals(0L, sum.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(sum)); + }); + } + + public void testQueryFiltering() throws IOException { + testCase(new TermQuery(new Term("match", "yes")), iw -> { + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), Double.doubleToLongBits(2)) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(20)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), Double.doubleToLongBits(5)) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "no", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(40)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), Double.doubleToLongBits(5)) + ) + ); + }, sum -> { + assertEquals(30L, sum.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(sum)); + }); + } + + /** + * Create a default aggregate_metric_double field type containing sum and a value_count metrics. + * + * @param fieldName the name of the field + * @return the created field type + */ + private AggregateDoubleMetricFieldType createDefaultFieldType(String fieldName) { + AggregateDoubleMetricFieldType fieldType = new AggregateDoubleMetricFieldType(); + fieldType.setName(fieldName); + + for (Metric m : List.of(Metric.value_count, Metric.sum)) { + NumberFieldMapper.NumberFieldType subfield = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE); + fieldType.addMetricField(m, subfield); + } + fieldType.setDefaultMetric(Metric.sum); + return fieldType; + } + + private void testCase(Query query, CheckedConsumer buildIndex, Consumer verify) + throws IOException { + MappedFieldType fieldType = createDefaultFieldType(FIELD_NAME); + AggregationBuilder aggregationBuilder = createAggBuilderForTypeTest(fieldType, FIELD_NAME); + testCase(aggregationBuilder, query, buildIndex, verify, fieldType); + } + + @Override + protected List getSearchPlugins() { + return List.of(new AggregateMetricMapperPlugin()); + } + + @Override + protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) { + return new SumAggregationBuilder("sum_agg").field(fieldName); + } + + @Override + protected List getSupportedValuesSourceTypes() { + return List.of( + CoreValuesSourceType.NUMERIC, + CoreValuesSourceType.DATE, + CoreValuesSourceType.BOOLEAN, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC + ); + } + +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedValueCountAggregatorTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedValueCountAggregatorTests.java new file mode 100644 index 0000000000000..1f5dad1a37a28 --- /dev/null +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/aggregations/metrics/AggregateMetricBackedValueCountAggregatorTests.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermQuery; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.metrics.InternalValueCount; +import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; +import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; +import org.elasticsearch.search.aggregations.support.ValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; +import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSourceType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; + +import java.io.IOException; +import java.util.List; +import java.util.function.Consumer; + +import static java.util.Collections.singleton; +import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.subfieldName; + +public class AggregateMetricBackedValueCountAggregatorTests extends AggregatorTestCase { + + private static final String FIELD_NAME = "aggregate_metric_field"; + + public void testMatchesNumericDocValues() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 2) + ) + ); + iw.addDocument( + List.of( + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(50)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 5) + ) + ); + }, valueCount -> { + assertEquals(7, valueCount.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(valueCount)); + }); + } + + public void testNoDocs() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + // Intentionally not writing any docs + }, valueCount -> { + assertEquals(0L, valueCount.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(valueCount)); + }); + } + + public void testNoMatchingField() throws IOException { + testCase(new MatchAllDocsQuery(), iw -> { + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 7))); + iw.addDocument(singleton(new NumericDocValuesField("wrong_number", 1))); + }, sum -> { + assertEquals(0L, sum.getValue(), 0d); + assertFalse(AggregationInspectionHelper.hasValue(sum)); + }); + } + + public void testQueryFiltering() throws IOException { + testCase(new TermQuery(new Term("match", "yes")), iw -> { + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(10)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 2) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "yes", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(20)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 5) + ) + ); + iw.addDocument( + List.of( + new StringField("match", "no", Field.Store.NO), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.sum), Double.doubleToLongBits(40)), + new NumericDocValuesField(subfieldName(FIELD_NAME, Metric.value_count), 5) + ) + ); + }, sum -> { + assertEquals(7L, sum.getValue(), 0d); + assertTrue(AggregationInspectionHelper.hasValue(sum)); + }); + } + + /** + * Create a default aggregate_metric_double field type containing sum and a value_count metrics. + * + * @param fieldName the name of the field + * @return the created field type + */ + private AggregateDoubleMetricFieldType createDefaultFieldType(String fieldName) { + AggregateDoubleMetricFieldType fieldType = new AggregateDoubleMetricFieldType(); + fieldType.setName(fieldName); + + for (Metric m : List.of(Metric.value_count, Metric.sum)) { + NumberFieldMapper.NumberFieldType subfield = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE); + fieldType.addMetricField(m, subfield); + } + fieldType.setDefaultMetric(Metric.sum); + return fieldType; + } + + private void testCase(Query query, CheckedConsumer buildIndex, Consumer verify) + throws IOException { + MappedFieldType fieldType = createDefaultFieldType(FIELD_NAME); + AggregationBuilder aggregationBuilder = createAggBuilderForTypeTest(fieldType, FIELD_NAME); + testCase(aggregationBuilder, query, buildIndex, verify, fieldType); + } + + @Override + protected List getSearchPlugins() { + return List.of(new AggregateMetricMapperPlugin()); + } + + @Override + protected AggregationBuilder createAggBuilderForTypeTest(MappedFieldType fieldType, String fieldName) { + return new ValueCountAggregationBuilder("value_count_agg").field(fieldName); + } + + @Override + protected List getSupportedValuesSourceTypes() { + return List.of( + CoreValuesSourceType.NUMERIC, + CoreValuesSourceType.DATE, + CoreValuesSourceType.BOOLEAN, + AggregateMetricsValuesSourceType.AGGREGATE_METRIC + ); + } + +} diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java index a61b3c1687ef0..fc3cf81e36b19 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java @@ -73,7 +73,7 @@ public void testParseValue() throws Exception { XContentFactory.jsonBuilder() .startObject() .startObject("metric") - .field("min", 10.1) + .field("min", -10.1) .field("max", 50.0) .field("sum", 43) .field("value_count", 14) @@ -84,11 +84,11 @@ public void testParseValue() throws Exception { ) ); - assertThat(doc.rootDoc().getField("metric._min"), notNullValue()); + assertEquals(-10.1, doc.rootDoc().getField("metric.min").numericValue()); } /** - * Test parsing field mapping and adding simple field + * Test that invalid field mapping containing no metrics is not accepted */ public void testInvalidMapping() throws Exception { ensureGreen(); @@ -306,7 +306,6 @@ public void testUnmappedMetricWithIgnoreMalformed() throws Exception { XContentType.JSON ) ); - assertNull(doc.rootDoc().getField("metric.min")); } @@ -616,7 +615,7 @@ public void testValueCountDouble() throws Exception { ) ); - assertThat(doc.rootDoc().getField("metric._value_count"), notNullValue()); + assertThat(doc.rootDoc().getField("metric.value_count"), notNullValue()); } /** @@ -663,10 +662,7 @@ public void testInvalidDoubleValueCount() throws Exception { ); assertThat( e.getCause().getMessage(), - containsString( - "failed to parse field [metric._value_count] of type [integer] in document with id '1'." - + " Preview of field's value: '45.43'" - ) + containsString("failed to parse field [metric.value_count] of type [integer] in document with id '1'.") ); } @@ -722,8 +718,8 @@ public void testParseArrayValue() throws Exception { assertThat( e.getCause().getMessage(), containsString( - "Field [metric] of type [aggregate_metric_double] does not support " - + "indexing multiple values for the same metric in the same field" + "Field [metric] of type [aggregate_metric_double] " + + "does not support indexing multiple values for the same field in the same document" ) ); } @@ -955,8 +951,7 @@ public void testParseNestedValue() throws Exception { XContentType.JSON ) ); - - assertThat(doc.rootDoc().getField("parent.metric._min"), notNullValue()); + assertThat(doc.rootDoc().getField("parent.metric.min"), notNullValue()); } @Override @@ -966,5 +961,4 @@ protected Collection> getPlugins() { plugins.add(LocalStateCompositeXPackPlugin.class); return plugins; } - } diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldTypeTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldTypeTests.java index f8e98e3333f08..e69c811bb7155 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldTypeTests.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldTypeTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType; import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; -import java.util.EnumMap; import java.util.List; import static java.util.Arrays.asList; @@ -26,14 +25,14 @@ public class AggregateDoubleMetricFieldTypeTests extends FieldTypeTestCase { @Override protected MappedFieldType createDefaultFieldType() { AggregateDoubleMetricFieldType fieldType = new AggregateDoubleMetricFieldType(); - EnumMap metricFields = new EnumMap<>(Metric.class); - for (Metric m : List.of(Metric.min, Metric.max)) { - String fieldName = "foo" + "._" + m.name(); + fieldType.setName("foo"); + for (AggregateDoubleMetricFieldMapper.Metric m : List.of( + AggregateDoubleMetricFieldMapper.Metric.min, + AggregateDoubleMetricFieldMapper.Metric.max + )) { NumberFieldMapper.NumberFieldType subfield = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE); - subfield.setName(fieldName); - metricFields.put(m, subfield); + fieldType.addMetricField(m, subfield); } - fieldType.setMetricFields(metricFields); fieldType.setDefaultMetric(Metric.max); return fieldType; } @@ -41,13 +40,13 @@ protected MappedFieldType createDefaultFieldType() { public void testTermQuery() { final MappedFieldType fieldType = createDefaultFieldType(); Query query = fieldType.termQuery(55.2, null); - assertThat(query, equalTo(DoublePoint.newRangeQuery("foo._max", 55.2, 55.2))); + assertThat(query, equalTo(DoublePoint.newRangeQuery("foo.max", 55.2, 55.2))); } public void testTermsQuery() { final MappedFieldType fieldType = createDefaultFieldType(); Query query = fieldType.termsQuery(asList(55.2, 500.3), null); - assertThat(query, equalTo(DoublePoint.newSetQuery("foo._max", 55.2, 500.3))); + assertThat(query, equalTo(DoublePoint.newSetQuery("foo.max", 55.2, 500.3))); } public void testRangeQuery() throws Exception { diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java index 2a68215975a9d..6c05f7ac51efe 100644 --- a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java +++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java @@ -25,10 +25,10 @@ import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregatorSupplier; import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregatorSupplier; +import org.elasticsearch.search.aggregations.metrics.MetricAggregatorSupplier; import org.elasticsearch.search.aggregations.metrics.GeoGridAggregatorSupplier; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregator; -import org.elasticsearch.search.aggregations.metrics.ValueCountAggregatorSupplier; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; @@ -166,8 +166,8 @@ private void registerGeoShapeGridAggregators(ValuesSourceRegistry.Builder builde private static void registerValueCountAggregator(ValuesSourceRegistry.Builder builder) { builder.register(ValueCountAggregationBuilder.NAME, GeoShapeValuesSourceType.instance(), - (ValueCountAggregatorSupplier) ValueCountAggregator::new - ); + (MetricAggregatorSupplier) (name, valuesSource, format, context, parent, metadata) + -> new ValueCountAggregator(name, valuesSource, context, parent, metadata)); } private static void registerCardinalityAggregator(ValuesSourceRegistry.Builder builder) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/10_basic.yml index d681efc656ca4..27188758f36e3 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/10_basic.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/10_basic.yml @@ -39,7 +39,7 @@ body: query: exists: - field: metric._min + field: metric.min - match: { hits.total.value: 1 } @@ -49,7 +49,7 @@ body: query: exists: - field: metric._nonexistent_key + field: metric.nonexistent_key - match: { hits.total.value: 0 } @@ -122,7 +122,7 @@ body: query: term: - metric._min: + metric.min: value: 50 - match: { hits.total.value: 1 } @@ -200,10 +200,100 @@ body: query: range: - metric._min: + metric.min: gte: 10 lte: 40 - match: { hits.total.value: 1 } - length: { hits.hits: 1 } - match: { hits.hits.0._id: "1" } + +--- +"Sort": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + indices.create: + index: test + body: + mappings: + properties: + metric: + type: aggregate_metric_double + metrics: [min, max] + + - do: + index: + index: test + id: 1 + body: + metric: + min: 18.2 + max: 3000 + refresh: true + + - do: + index: + index: test + id: 2 + body: + metric: + min: 50 + max: 1000 + refresh: true + + - do: + index: + index: test + id: 3 + body: + metric: + min: 150 + max: 5000 + refresh: true + + - do: + search: + index: test + body: + sort: [ { metric: asc } ] + + - match: { "hits.total.value": 3 } + - match: { hits.hits.0._id: "2" } + - match: { hits.hits.1._id: "1" } + - match: { hits.hits.2._id: "3" } + + - do: + search: + index: test + body: + sort: [ { metric: desc } ] + + - match: { "hits.total.value": 3 } + - match: { hits.hits.0._id: "3" } + - match: { hits.hits.1._id: "1" } + - match: { hits.hits.2._id: "2" } + + - do: + search: + index: test + body: + sort: [ { metric.max: asc } ] + + - match: { "hits.total.value": 3 } + - match: { hits.hits.0._id: "2" } + - match: { hits.hits.1._id: "1" } + - match: { hits.hits.2._id: "3" } + + - do: + search: + index: test + body: + sort: [ { metric.min: desc } ] + + - match: { "hits.total.value": 3 } + - match: { hits.hits.0._id: "3" } + - match: { hits.hits.1._id: "2" } + - match: { hits.hits.2._id: "1" } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/20_min_max_agg.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/20_min_max_agg.yml new file mode 100644 index 0000000000000..13377d5644fee --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/20_min_max_agg.yml @@ -0,0 +1,81 @@ +############################################################################################################### +# Test min and max aggregations on an index that contains aggregate_metric_double fields +############################################################################################################### + +setup: + - do: + indices.create: + index: aggregate_metric_test + body: + mappings: + properties: + metric: + type: aggregate_metric_double + metrics: [min, max] + default_metric: max + + - do: + bulk: + index: aggregate_metric_test + refresh: true + body: + - '{"index": {}}' + - '{"metric": {"min": 0, "max": 100} }' + - '{"index": {}}' + - '{"metric": {"min": 60, "max": 100} }' + - '{"index": {}}' + - '{"metric": {"min": -400.50, "max": 1000} }' + - '{"index": {}}' + - '{"metric": {"min": 1, "max": 99.3} }' + - '{"index": {}}' + - '{"metric": {"min": -100, "max": -40} }' +--- +"Test min_max aggs": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: aggregate_metric_test + size: 0 + body: + aggs: + max_agg: + max: + field: metric + min_agg: + min: + field: metric + + - match: { hits.total.value: 5 } + - match: { aggregations.max_agg.value: 1000} + - match: { aggregations.min_agg.value: -400.50} + + +--- +"Test min_max aggs with query": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: aggregate_metric_test + size: 0 + body: + query: + term: + metric: + value: 100 + aggs: + max_agg: + max: + field: metric + min_agg: + min: + field: metric + + - match: { hits.total.value: 2 } + - match: { aggregations.max_agg.value: 100} + - match: { aggregations.min_agg.value: 0} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/30_sum_agg.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/30_sum_agg.yml new file mode 100644 index 0000000000000..7c51696406a2b --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/30_sum_agg.yml @@ -0,0 +1,68 @@ +setup: + - do: + indices.create: + index: aggregate_metric_test + body: + mappings: + properties: + metric: + type: aggregate_metric_double + metrics: [sum, value_count] + default_metric: value_count + + - do: + bulk: + index: aggregate_metric_test + refresh: true + body: + - '{"index": {}}' + - '{"metric": {"sum": 10000.45, "value_count": 100} }' + - '{"index": {}}' + - '{"metric": {"sum": 60, "value_count": 20} }' + - '{"index": {}}' + - '{"metric": {"sum": -400.50, "value_count": 1000} }' + - '{"index": {}}' + - '{"metric": {"sum": 1, "value_count": 20} }' + - '{"index": {}}' + - '{"metric": {"sum": -100, "value_count": 40} }' +--- +"Test sum agg": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: aggregate_metric_test + size: 0 + body: + aggs: + sum_agg: + sum: + field: metric + + - match: { hits.total.value: 5 } + - match: { aggregations.sum_agg.value: 9560.95} + +--- +"Test sum agg with query": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: aggregate_metric_test + size: 0 + body: + query: + term: + metric: + value: 20 + aggs: + sum_agg: + sum: + field: metric + + - match: { hits.total.value: 2 } + - match: { aggregations.sum_agg.value: 61} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/40_avg_agg.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/40_avg_agg.yml new file mode 100644 index 0000000000000..46f9523fca67a --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/40_avg_agg.yml @@ -0,0 +1,68 @@ +setup: + - do: + indices.create: + index: aggregate_metric_test + body: + mappings: + properties: + metric: + type: aggregate_metric_double + metrics: [sum, value_count] + default_metric: value_count + + - do: + bulk: + index: aggregate_metric_test + refresh: true + body: + - '{"index": {}}' + - '{"metric": {"sum": 10000, "value_count": 100} }' + - '{"index": {}}' + - '{"metric": {"sum": 60, "value_count": 20} }' + - '{"index": {}}' + - '{"metric": {"sum": -400, "value_count": 780} }' + - '{"index": {}}' + - '{"metric": {"sum": 40, "value_count": 20} }' + - '{"index": {}}' + - '{"metric": {"sum": -100, "value_count": 40} }' +--- +"Test avg agg": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: aggregate_metric_test + size: 0 + body: + aggs: + avg_agg: + avg: + field: metric + + - match: { hits.total.value: 5 } + - match: { aggregations.avg_agg.value: 10} # = (9600 / 960) + +--- +"Test avg agg with query": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: aggregate_metric_test + size: 0 + body: + query: + term: + metric: + value: 20 + aggs: + avg_agg: + avg: + field: metric + + - match: { hits.total.value: 2 } + - match: { aggregations.avg_agg.value: 2.5} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/50_value_count_agg.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/50_value_count_agg.yml new file mode 100644 index 0000000000000..babb8f45eff4a --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/50_value_count_agg.yml @@ -0,0 +1,68 @@ +setup: + - do: + indices.create: + index: aggregate_metric_test + body: + mappings: + properties: + metric: + type: aggregate_metric_double + metrics: [sum, value_count] + default_metric: value_count + + - do: + bulk: + index: aggregate_metric_test + refresh: true + body: + - '{"index": {}}' + - '{"metric": {"sum": 10000, "value_count": 100} }' + - '{"index": {}}' + - '{"metric": {"sum": 60, "value_count": 20} }' + - '{"index": {}}' + - '{"metric": {"sum": -400, "value_count": 780} }' + - '{"index": {}}' + - '{"metric": {"sum": 40, "value_count": 20} }' + - '{"index": {}}' + - '{"metric": {"sum": -100, "value_count": 40} }' +--- +"Test value_count agg": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: aggregate_metric_test + size: 0 + body: + aggs: + value_count_agg: + value_count: + field: metric + + - match: { hits.total.value: 5 } + - match: { aggregations.value_count_agg.value: 960} + +--- +"Test value_count agg with query": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: aggregate_metric_test + size: 0 + body: + query: + term: + metric: + value: 20 + aggs: + value_count_agg: + value_count: + field: metric + + - match: { hits.total.value: 2 } + - match: { aggregations.value_count_agg.value: 40} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/80_raw_agg_metric_aggs.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/80_raw_agg_metric_aggs.yml new file mode 100644 index 0000000000000..61d0a5bd1c3b5 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/aggregate-metrics/80_raw_agg_metric_aggs.yml @@ -0,0 +1,131 @@ +############################################################################################################### +# Test aggregations on an aggregate index containing aggregate_metric_double fields and a raw index containing +# raw data. +############################################################################################################### + +setup: + # Setup aggregate index with field metric of type aggregate_metric_double + - do: + indices.create: + index: test_metric_aggregate + body: + mappings: + properties: + t: + type: long + metric: + type: aggregate_metric_double + metrics: [min, max, sum, value_count] + default_metric: max + - do: + bulk: + index: test_metric_aggregate + refresh: true + body: + - '{"index": {}}' + - '{"t": 10, "metric": {"min": 10.999, "max": 400.11, "sum": 400, "value_count": 6} }' + - '{"index": {}}' + - '{"t": 20, "metric": {"min": 200, "max": 300, "sum": 1000, "value_count": 5} }' + - '{"index": {}}' + - '{"t": 30, "metric": {"min": 10, "max": 50, "sum": 2000, "value_count": 5} }' + - '{"index": {}}' + - '{"t": 50, "metric": {"min": 10, "max": 400, "sum": 3000, "value_count": 5} }' + + # Setup raw index with field metric of type double + - do: + indices.create: + index: test_metric_raw + body: + mappings: + properties: + t: + type: long + metric: + type: double + - do: + bulk: + index: test_metric_raw + refresh: true + body: + - '{"index": {}}' + - '{"t": 10, "metric": 50}' + - '{"index": {}}' + - '{"t": 10, "metric": 100}' + - '{"index": {}}' + - '{"t": 20, "metric": 200}' + - '{"index": {}}' + - '{"t": 30, "metric": 600}' + +--- +"Test mixed aggregations": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: test_metric_* + size: 0 + body: + aggs: + max_agg: + max: + field: metric + min_agg: + min: + field: metric + sum_agg: + sum: + field: metric + avg_agg: + avg: + field: metric + value_count_agg: + value_count: + field: metric + + - match: { hits.total.value: 8 } + - match: { aggregations.max_agg.value: 600} + - match: { aggregations.min_agg.value: 10} + - match: { aggregations.sum_agg.value: 7350} + - match: { aggregations.avg_agg.value: 294} + - match: { aggregations.value_count_agg.value: 25} + +--- +"Test mixed aggregations with query": + - skip: + version: " - 7.99.99" + reason: "Aggregate metric fields are currently only implemented in 8.0." + + - do: + search: + index: test_metric_* + size: 0 + body: + query: + term: + t: + value: 10 + aggs: + max_agg: + max: + field: metric + min_agg: + min: + field: metric + sum_agg: + sum: + field: metric + avg_agg: + avg: + field: metric + value_count_agg: + value_count: + field: metric + + - match: { hits.total.value: 3 } + - match: { aggregations.max_agg.value: 400.11} + - match: { aggregations.min_agg.value: 10.999} + - match: { aggregations.sum_agg.value: 550} + - match: { aggregations.avg_agg.value: 68.75} + - match: { aggregations.value_count_agg.value: 8}