Skip to content

Implement aggregations on aggregate metrics #53986

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
649b06e
Implemented aggregate_metric field type for storing pre-computed aggr…
csoulios Dec 4, 2019
2a6292c
Addressing code review comments
csoulios Dec 5, 2019
e43595d
Moved AggregateMetricFieldMapper to its own module
csoulios Dec 5, 2019
7d1ef0e
Fix broken doc test
csoulios Dec 11, 2019
8e25466
Fixes to address code review comments
csoulios Dec 11, 2019
415a34f
Fixes to address code review comments.
csoulios Dec 17, 2019
9e10078
Fix broken integration tests
csoulios Dec 17, 2019
9adc268
Added delegate field mappers of NumberFieldType
csoulios Dec 20, 2019
4455be0
Delegate queries to NumberFieldType fields
csoulios Jan 7, 2020
4ef00f8
Style fixes
csoulios Jan 7, 2020
b949b16
Nit: removed blank line
csoulios Jan 10, 2020
61c5f85
Addressed reviewer comments
csoulios Jan 10, 2020
cbb2138
Fixed NPE issue when "metrics" field is missing
csoulios Jan 10, 2020
d15d8ca
Added integration test
csoulios Jan 10, 2020
9b6da4a
Override AggregateDoubleMetricFieldMapper methods
csoulios Jan 13, 2020
fdf47d7
Ensure that a metric field cannot be an array
csoulios Jan 16, 2020
cc2b32b
Ensure that merging two fields with different
csoulios Jan 16, 2020
3128ffd
Checkstyle
csoulios Feb 18, 2020
04ad089
Fix typo
csoulios Feb 18, 2020
a67a73e
Build changes
csoulios Mar 19, 2020
4001c76
First draft of VS based aggs on aggregate metrics
csoulios Mar 20, 2020
bd1d0b6
Moved field data related classes to x-pack plugin
csoulios Mar 20, 2020
505f9d3
Removed AggregateDoubleMetricValues classes.
csoulios Mar 20, 2020
e29531d
Added more tests + code cleanup
csoulios Mar 20, 2020
24841e1
And more tests
csoulios Mar 23, 2020
1d2ad1c
Implemented avg agg for aggregate_metric field
csoulios Mar 23, 2020
7d9a716
Fixed license issues
csoulios Mar 23, 2020
3728d6f
Fix broken tests
csoulios Mar 23, 2020
de264c2
Implemented min/max aggs on aggregate_metric field
csoulios Mar 27, 2020
21da92a
Applied "spotless" to mapper-aggregate-metric
csoulios Mar 27, 2020
24d551a
Added unit tests for AggregateMetricBackedMinAggregator
csoulios Mar 27, 2020
1fe3b22
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios Mar 31, 2020
d7dc204
Addressed review comment
csoulios Mar 31, 2020
d6861d1
Added tests for min aggregator
csoulios Mar 31, 2020
1432ad1
Added tests for max aggregator
csoulios Mar 31, 2020
f0a091f
Removed BKD optimizations for min/max aggs
csoulios Mar 31, 2020
4740d59
Added ValueCount agg backed by aggregate_metric
csoulios Apr 3, 2020
71b9b40
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios Apr 7, 2020
70bbc29
Merged changes from master
csoulios Apr 7, 2020
d497fc7
Fixed bug with partial saving of agg metrics
csoulios Apr 13, 2020
c0cc1d9
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios Apr 14, 2020
1de3913
Removed pipeline aggregators to merge with master
csoulios Apr 14, 2020
2588b06
Fix test
csoulios Apr 14, 2020
61dbae7
Removed multifield and copyTo settings
csoulios Apr 16, 2020
b9bf259
Modified metric subfield separator
csoulios Apr 16, 2020
5d95698
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios Apr 21, 2020
e57aa40
Implemented plugin getMinimalSupportedVersion()
csoulios Apr 21, 2020
8584d5c
Fixed broken integration test
csoulios Apr 21, 2020
022dac2
Implemented sortfield for aggregate_metric mapper
csoulios Apr 21, 2020
157c40d
Added yml tests for aggs on aggregate_metric field
csoulios Apr 22, 2020
8a90594
Added more yml tests
csoulios Apr 22, 2020
496f1d2
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios Apr 27, 2020
92ea8d5
Fixed errors because of merge
csoulios Apr 27, 2020
873db96
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios Apr 28, 2020
4e3538a
Muted failing tests
csoulios Apr 28, 2020
32799ea
Merge branch 'prototype/vs-refactor-aggregate-metrics' into value-cou…
csoulios Apr 28, 2020
abf673f
Prepare merge value_count for aggregate metrics
csoulios Apr 28, 2020
32be5aa
Added yml integration test for value_count
csoulios Apr 28, 2020
5cc2620
Do no allow merging mappers with missing metrics
csoulios May 12, 2020
4bbbc82
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios May 12, 2020
423e81a
Merge with master - fixes
csoulios May 12, 2020
3bafd2f
Fixed broken tests
csoulios May 13, 2020
3180ba4
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios May 13, 2020
6920dcc
Merge branch 'feature/aggregate-metrics' into prototype/vs-refactor-a…
csoulios May 14, 2020
d37d622
Fix build issues from last merge
csoulios May 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class InternalSum extends InternalNumericMetricsAggregation.SingleValue implements Sum {
private final double sum;

public InternalSum(String name, double sum, DocValueFormat formatter, Map<String, Object> metadata) {
public InternalSum(String name, double sum, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, metadata);
this.sum = sum;
this.format = formatter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import java.util.Map;
import java.util.function.Function;

class MinAggregator extends NumericMetricsAggregator.SingleValue {
public class MinAggregator extends NumericMetricsAggregator.SingleValue {
private static final int MAX_BKD_LOOKUPS = 1024;

final ValuesSource.Numeric valuesSource;
Expand Down Expand Up @@ -168,7 +168,7 @@ public void doClose() {
* @param parent The parent aggregator.
* @param config The config for the values source metric.
*/
static Function<byte[], Number> getPointReaderOrNull(SearchContext context, Aggregator parent,
public static Function<byte[], Number> getPointReaderOrNull(SearchContext context, Aggregator parent,
ValuesSourceConfig config) {
if (context.query() != null &&
context.query().getClass() != MatchAllDocsQuery.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
Expand Down
6 changes: 5 additions & 1 deletion x-pack/plugin/mapper-aggregate-metric/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
evaluationDependsOn(xpackModule('core'))

apply plugin: 'elasticsearch.esplugin'

esplugin {
name 'x-pack-aggregate-metric'
description 'Module for the aggregate_metric field type, which allows pre-aggregated fields to be stored a single field.'
Expand All @@ -16,7 +15,12 @@ esplugin {
}
archivesBaseName = 'x-pack-aggregate-metric'

compileJava.options.compilerArgs << "-Xlint:-rawtypes"
compileTestJava.options.compilerArgs << "-Xlint:-rawtypes"

dependencies {
compileOnly project(":server")

compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,21 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.xpack.aggregatemetric.aggregations.metrics.AggregateMetricsAggregatorsRegistrar;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static java.util.Collections.singletonMap;

public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin {
public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin, SearchPlugin {

@Override
public Map<String, Mapper.TypeParser> getMappers() {
Expand All @@ -37,4 +41,14 @@ public Map<String, Mapper.TypeParser> getMappers() {
);
}

@Override
public List<Consumer<ValuesSourceRegistry.Builder>> getAggregationExtentions() {
return List.of(
AggregateMetricsAggregatorsRegistrar::registerSumAggregator,
AggregateMetricsAggregatorsRegistrar::registerAvgAggregator,
AggregateMetricsAggregatorsRegistrar::registerMinAggregator,
AggregateMetricsAggregatorsRegistrar::registerMaxAggregator,
AggregateMetricsAggregatorsRegistrar::registerValueCountAggregator
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric;

import java.io.IOException;
import java.util.Map;

class AggregateMetricBackedAvgAggregator extends NumericMetricsAggregator.SingleValue {

final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource;

LongArray counts;
DoubleArray sums;
DoubleArray compensations;
DocValueFormat format;

AggregateMetricBackedAvgAggregator(
String name,
AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource,
DocValueFormat formatter,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
this.format = formatter;
if (valuesSource != null) {
final BigArrays bigArrays = context.bigArrays();
counts = bigArrays.newLongArray(1, true);
sums = bigArrays.newDoubleArray(1, true);
compensations = bigArrays.newDoubleArray(1, true);
}
}

@Override
public ScoreMode scoreMode() {
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
// Retrieve aggregate values for metrics sum and value_count
final SortedNumericDoubleValues aggregateSums = valuesSource.getAggregateMetricValues(ctx, Metric.sum);
final SortedNumericDoubleValues aggregateValueCounts = valuesSource.getAggregateMetricValues(ctx, Metric.value_count);
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
return new LeafBucketCollectorBase(sub, sums) {
@Override
public void collect(int doc, long bucket) throws IOException {
sums = bigArrays.grow(sums, bucket + 1);
compensations = bigArrays.grow(compensations, bucket + 1);

// Read aggregate values for sums
if (aggregateSums.advanceExact(doc)) {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);

kahanSummation.reset(sum, compensation);
for (int i = 0; i < aggregateSums.docValueCount(); i++) {
double value = aggregateSums.nextValue();
kahanSummation.add(value);
}

sums.set(bucket, kahanSummation.value());
compensations.set(bucket, kahanSummation.delta());
}

counts = bigArrays.grow(counts, bucket + 1);
// Read aggregate values for value_count
if (aggregateValueCounts.advanceExact(doc)) {
for (int i = 0; i < aggregateValueCounts.docValueCount(); i++) {
double d = aggregateValueCounts.nextValue();
long value = Double.valueOf(d).longValue();
counts.increment(bucket, value);
}
}
}
};
}

@Override
public double metric(long owningBucketOrd) {
if (valuesSource == null || owningBucketOrd >= sums.size()) {
return Double.NaN;
}
return sums.get(owningBucketOrd) / counts.get(owningBucketOrd);
}

@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= sums.size()) {
return buildEmptyAggregation();
}
return new InternalAvg(name, sums.get(bucket), counts.get(bucket), format, metadata());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalAvg(name, 0.0, 0L, format, metadata());
}

@Override
public void doClose() {
Releasables.close(counts, sums, compensations);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.aggregatemetric.aggregations.metrics;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.aggregatemetric.aggregations.support.AggregateMetricsValuesSource;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric;

import java.io.IOException;
import java.util.Map;

class AggregateMetricBackedMaxAggregator extends NumericMetricsAggregator.SingleValue {

private final AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource;
final DocValueFormat formatter;
DoubleArray maxes;

AggregateMetricBackedMaxAggregator(
String name,
ValuesSourceConfig config,
AggregateMetricsValuesSource.AggregateDoubleMetric valuesSource,
SearchContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
if (valuesSource != null) {
maxes = context.bigArrays().newDoubleArray(1, false);
maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);
}
this.formatter = config.format();
}

@Override
public ScoreMode scoreMode() {
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
if (parent != null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
// we have no parent and the values source is empty so we can skip collecting hits.
throw new CollectionTerminatedException();
}
}

final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues allValues = valuesSource.getAggregateMetricValues(ctx, Metric.max);
final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {

@Override
public void collect(int doc, long bucket) throws IOException {
if (bucket >= maxes.size()) {
long from = maxes.size();
maxes = bigArrays.grow(maxes, bucket + 1);
maxes.fill(from, maxes.size(), Double.NEGATIVE_INFINITY);
}
if (values.advanceExact(doc)) {
final double value = values.doubleValue();
double max = maxes.get(bucket);
max = Math.max(max, value);
maxes.set(bucket, max);
}
}
};
}

@Override
public double metric(long owningBucketOrd) {
if (valuesSource == null || owningBucketOrd >= maxes.size()) {
return Double.NEGATIVE_INFINITY;
}
return maxes.get(owningBucketOrd);
}

@Override
public InternalAggregation buildAggregation(long bucket) {
if (valuesSource == null || bucket >= maxes.size()) {
return buildEmptyAggregation();
}
return new InternalMax(name, maxes.get(bucket), formatter, metadata());
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalMax(name, Double.NEGATIVE_INFINITY, formatter, metadata());
}

@Override
public void doClose() {
Releasables.close(maxes);
}
}
Loading