From f4e992d54184e250da8d4ad571c7ae15542c333f Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 20 May 2021 09:57:22 -1000 Subject: [PATCH 1/5] Add keep_values gap policy Adds a new keep_values gap policy that works like skip, except if the metric calculated on an empty bucket provides a non-null non-NaN value, this value is used for the bucket. Fixes #27377 --- docs/reference/aggregations/pipeline.asciidoc | 4 ++ .../aggregations/pipeline/SerialDiffIT.java | 2 +- .../aggregations/pipeline/BucketHelpers.java | 53 ++++++++++++++----- .../BucketScriptPipelineAggregator.java | 2 +- .../BucketSortPipelineAggregator.java | 2 +- .../pipeline/DerivativeAggregatorTests.java | 10 +++- .../PipelineAggregationHelperTests.java | 2 +- 7 files changed, 58 insertions(+), 17 deletions(-) diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index 0f56b48830b5e..24f7060afcdfd 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -267,6 +267,10 @@ _insert_zeros_:: This option will replace missing values with a zero (`0`) and pipeline aggregation computation will proceed as normal. +_keep_values_:: + This option is similar to skip, except if the metric provides a non-null, non-NaN value this value is + used, otherwise the empty bucket is skipped. + include::pipeline/avg-bucket-aggregation.asciidoc[] include::pipeline/bucket-script-aggregation.asciidoc[] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java index 113c6beacc2ca..5f1fe413fc085 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java @@ -127,7 +127,7 @@ public void setupSuiteScopeCluster() throws Exception { numBuckets = randomIntBetween(10, 80); lag = randomIntBetween(1, numBuckets / 2); - gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.SKIP : BucketHelpers.GapPolicy.INSERT_ZEROS; + gapPolicy = randomFrom(BucketHelpers.GapPolicy.values()); metric = randomMetric("the_metric", VALUE_FIELD); mockHisto = PipelineAggregationHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java index 2bb5becb6a366..b58b4746754d3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java @@ -41,9 +41,40 @@ public class BucketHelpers { * * "insert_zeros": empty buckets will be filled with zeros for all metrics * "skip": empty buckets will simply be ignored + * "none": for empty buckets the values provided by the metrics will still be used if they are available */ public enum GapPolicy implements Writeable { - INSERT_ZEROS((byte) 0, "insert_zeros"), SKIP((byte) 1, "skip"); + INSERT_ZEROS((byte) 0, "insert_zeros", false) { + @Override + public Double processValue(long docCount, Double value) { + if (Double.isInfinite(value) || Double.isNaN(value) || docCount == 0) { + return 0.0; + } else { + return value; + } + } + }, + + SKIP((byte) 1, "skip", true) { + @Override + public Double processValue(long docCount, Double value) { + if (Double.isInfinite(value) || docCount == 0) { + return Double.NaN; + } else { + return value; + } + } + }, + + KEEP_VALUES((byte) 2, "keep_values", true) { + public Double processValue(long docCount, Double value) { + if (Double.isInfinite(value) || Double.isNaN(value)) { + return Double.NaN; + } else { + return value; + } + } + }; /** * Parse a string GapPolicy into the byte enum @@ -76,10 +107,12 @@ public static GapPolicy parse(String text, XContentLocation tokenLocation) { private final byte id; private final ParseField parseField; + public final boolean isSkippable; - GapPolicy(byte id, String name) { + GapPolicy(byte id, String name, boolean isSkippable) { this.id = id; this.parseField = new ParseField(name); + this.isSkippable = isSkippable; } /** @@ -113,6 +146,8 @@ public static GapPolicy readFrom(StreamInput in) throws IOException { public String getName() { return parseField.getPreferredName(); } + + public abstract Double processValue(long docCount, Double value); } /** @@ -161,18 +196,12 @@ public static Double resolveBucketValue(MultiBucketsAggregation agg, throw formatResolutionError(agg, aggPathAsList, propertyValue); } // doc count never has missing values so gap policy doesn't apply here - boolean isDocCountProperty = aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0)); - if (Double.isInfinite(value) || Double.isNaN(value) || (bucket.getDocCount() == 0 && isDocCountProperty == false)) { - switch (gapPolicy) { - case INSERT_ZEROS: - return 0.0; - case SKIP: - default: - return Double.NaN; - } - } else { + if (aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0))) { return value; + } else { + return gapPolicy.processValue(bucket.getDocCount(), value); } + } } catch (InvalidAggregationPathException e) { return null; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java index d08bb7d4449f8..314e8646de827 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java @@ -60,7 +60,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext String varName = entry.getKey(); String bucketsPath = entry.getValue(); Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy); - if (GapPolicy.SKIP == gapPolicy && (value == null || Double.isNaN(value))) { + if (gapPolicy.isSkippable && (value == null || Double.isNaN(value))) { skipBucket = true; break; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java index 49506d241b31c..e87a3445ca48e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java @@ -95,7 +95,7 @@ private Map> resolveAndCacheSortValues() { resolved.put(sort, (Comparable) internalBucket.getKey()); } else { Double bucketValue = BucketHelpers.resolveBucketValue(parentAgg, internalBucket, sortField, gapPolicy); - if (GapPolicy.SKIP == gapPolicy && Double.isNaN(bucketValue)) { + if (gapPolicy.isSkippable && Double.isNaN(bucketValue)) { continue; } resolved.put(sort, (Comparable) (Object) bucketValue); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeAggregatorTests.java index 971a79c5bf819..076c58135f638 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeAggregatorTests.java @@ -586,7 +586,15 @@ public void testSingleValueAggDerivativeWithGaps_random() throws IOException { Sum sum = bucket.getAggregations().get("sum"); double thisSumValue = sum.value(); if (bucket.getDocCount() == 0) { - thisSumValue = gapPolicy == GapPolicy.INSERT_ZEROS ? 0 : Double.NaN; + switch (gapPolicy) { + case INSERT_ZEROS: + thisSumValue = 0; + break; + case KEEP_VALUES: + break; + default: + thisSumValue = Double.NaN; + } } SimpleValue sumDeriv = bucket.getAggregations().get("deriv"); if (i == 0) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java index 6c8a7783bae14..0e0eaebdcd3d9 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java @@ -129,7 +129,7 @@ public static double calculateMetric(double[] values, ValuesSourceAggregationBui for (double value : values) { accumulator += value; } - return accumulator / values.length; + return values.length == 0 ? Double.NaN : accumulator / values.length ; } return 0.0; From 49340253282edc4bf8afd6a035e85488c218eff8 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 20 May 2021 11:27:25 -1000 Subject: [PATCH 2/5] Fix GapPolicyTests --- .../search/aggregations/pipeline/GapPolicyTests.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/GapPolicyTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/GapPolicyTests.java index 79ddcdc69bd77..f7e2e006035a3 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/GapPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/GapPolicyTests.java @@ -25,26 +25,30 @@ public GapPolicyTests() { public void testValidOrdinals() { assertThat(BucketHelpers.GapPolicy.INSERT_ZEROS.ordinal(), equalTo(0)); assertThat(BucketHelpers.GapPolicy.SKIP.ordinal(), equalTo(1)); + assertThat(BucketHelpers.GapPolicy.KEEP_VALUES.ordinal(), equalTo(2)); } @Override public void testFromString() { assertThat(BucketHelpers.GapPolicy.parse("insert_zeros", null), equalTo(BucketHelpers.GapPolicy.INSERT_ZEROS)); assertThat(BucketHelpers.GapPolicy.parse("skip", null), equalTo(BucketHelpers.GapPolicy.SKIP)); + assertThat(BucketHelpers.GapPolicy.parse("keep_values", null), equalTo(BucketHelpers.GapPolicy.KEEP_VALUES)); ParsingException e = expectThrows(ParsingException.class, () -> BucketHelpers.GapPolicy.parse("does_not_exist", null)); assertThat(e.getMessage(), - equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip]")); + equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip, keep_values]")); } @Override public void testReadFrom() throws IOException { assertReadFromStream(0, BucketHelpers.GapPolicy.INSERT_ZEROS); assertReadFromStream(1, BucketHelpers.GapPolicy.SKIP); + assertReadFromStream(2, BucketHelpers.GapPolicy.KEEP_VALUES); } @Override public void testWriteTo() throws IOException { assertWriteToStream(BucketHelpers.GapPolicy.INSERT_ZEROS, 0); assertWriteToStream(BucketHelpers.GapPolicy.SKIP, 1); + assertWriteToStream(BucketHelpers.GapPolicy.KEEP_VALUES, 2); } } From 448ee5931e61d797ee7ac9f0c8e831c88da50623 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 1 Jun 2021 08:39:36 -1000 Subject: [PATCH 3/5] Fix SerialDiffIT tests --- .../search/aggregations/pipeline/SerialDiffIT.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java index 5f1fe413fc085..7f602cdb614a3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java @@ -171,6 +171,12 @@ private void setupExpected(MetricTarget target) { metricValue = 0.0; } else { metricValue = PipelineAggregationHelperTests.calculateMetric(docValues, metric); + if (gapPolicy.equals(BucketHelpers.GapPolicy.KEEP_VALUES)) { + if (Double.isInfinite(metricValue) || Double.isNaN(metricValue)) { + // serial diff ignores these values and replaces them with null + metricValue = Double.NaN; + } + } } } else { @@ -204,13 +210,8 @@ private void setupExpected(MetricTarget target) { } lagWindow.add(metricValue); - - - - } - testValues.put(target.toString(), values); } From 59bcd4e51a80621c342d0e60f402551a064342e8 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 1 Jun 2021 13:57:23 -1000 Subject: [PATCH 4/5] Fix DerivativeIT --- .../search/aggregations/pipeline/DerivativeIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java index 237c9246109b3..1087bbac9bbb7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java @@ -557,7 +557,7 @@ public void testSingleValueAggDerivativeWithGaps_random() throws Exception { checkBucketKeyAndDocCount("InternalBucket " + i, bucket, i, valueCounts_empty_rnd[i]); Sum sum = bucket.getAggregations().get("sum"); double thisSumValue = sum.value(); - if (bucket.getDocCount() == 0) { + if (bucket.getDocCount() == 0 && gapPolicy != GapPolicy.KEEP_VALUES) { thisSumValue = gapPolicy == GapPolicy.INSERT_ZEROS ? 0 : Double.NaN; } SimpleValue sumDeriv = bucket.getAggregations().get("deriv"); From ac33c5ceaa94d9e906b5ae0d4ed1ebf20f8408a8 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 8 Jun 2021 06:34:45 -1000 Subject: [PATCH 5/5] Update server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java Co-authored-by: Mark Tozzi --- .../search/aggregations/pipeline/BucketHelpers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java index b58b4746754d3..563cf974f333b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java @@ -41,7 +41,7 @@ public class BucketHelpers { * * "insert_zeros": empty buckets will be filled with zeros for all metrics * "skip": empty buckets will simply be ignored - * "none": for empty buckets the values provided by the metrics will still be used if they are available + * "keep_values": for empty buckets the values provided by the metrics will still be used if they are available */ public enum GapPolicy implements Writeable { INSERT_ZEROS((byte) 0, "insert_zeros", false) {