Skip to content

Commit d1e6e93

Browse files
imotov and not-napoleon authored
Add keep_values gap policy (#73297) (#73927)
Adds a new keep_values gap policy that works like skip, except if the metric calculated on an empty bucket provides a non-null non-NaN value, this value is used for the bucket. Fixes #27377 Co-authored-by: Mark Tozzi <[email protected]>
1 parent 1f94aaf commit d1e6e93

File tree

9 files changed

+70
-24
lines changed

9 files changed

+70
-24
lines changed

docs/reference/aggregations/pipeline.asciidoc

+4
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,10 @@ _insert_zeros_::
268268
This option will replace missing values with a zero (`0`) and pipeline aggregation computation will
269269
proceed as normal.
270270

271+
_keep_values_::
272+
This option is similar to `skip`, except that if the metric provides a non-null, non-NaN value, this value is
273+
used; otherwise the empty bucket is skipped.
274+
271275
include::pipeline/avg-bucket-aggregation.asciidoc[]
272276

273277
include::pipeline/bucket-script-aggregation.asciidoc[]

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/DerivativeIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ public void testSingleValueAggDerivativeWithGaps_random() throws Exception {
558558
checkBucketKeyAndDocCount("InternalBucket " + i, bucket, i, valueCounts_empty_rnd[i]);
559559
Sum sum = bucket.getAggregations().get("sum");
560560
double thisSumValue = sum.value();
561-
if (bucket.getDocCount() == 0) {
561+
if (bucket.getDocCount() == 0 && gapPolicy != GapPolicy.KEEP_VALUES) {
562562
thisSumValue = gapPolicy == GapPolicy.INSERT_ZEROS ? 0 : Double.NaN;
563563
}
564564
SimpleValue sumDeriv = bucket.getAggregations().get("deriv");

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffIT.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void setupSuiteScopeCluster() throws Exception {
127127
numBuckets = randomIntBetween(10, 80);
128128
lag = randomIntBetween(1, numBuckets / 2);
129129

130-
gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.SKIP : BucketHelpers.GapPolicy.INSERT_ZEROS;
130+
gapPolicy = randomFrom(BucketHelpers.GapPolicy.values());
131131
metric = randomMetric("the_metric", VALUE_FIELD);
132132
mockHisto = PipelineAggregationHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble());
133133

@@ -171,6 +171,12 @@ private void setupExpected(MetricTarget target) {
171171
metricValue = 0.0;
172172
} else {
173173
metricValue = PipelineAggregationHelperTests.calculateMetric(docValues, metric);
174+
if (gapPolicy.equals(BucketHelpers.GapPolicy.KEEP_VALUES)) {
175+
if (Double.isInfinite(metricValue) || Double.isNaN(metricValue)) {
176+
// serial diff ignores these values and replaces them with null
177+
metricValue = Double.NaN;
178+
}
179+
}
174180
}
175181

176182
} else {
@@ -204,13 +210,8 @@ private void setupExpected(MetricTarget target) {
204210
}
205211

206212
lagWindow.add(metricValue);
207-
208-
209-
210-
211213
}
212214

213-
214215
testValues.put(target.toString(), values);
215216
}
216217

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketHelpers.java

+41-12
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,40 @@ public class BucketHelpers {
4141
*
4242
* "insert_zeros": empty buckets will be filled with zeros for all metrics
4343
* "skip": empty buckets will simply be ignored
44+
* "keep_values": for empty buckets the values provided by the metrics will still be used if they are available
4445
*/
4546
public enum GapPolicy implements Writeable {
46-
INSERT_ZEROS((byte) 0, "insert_zeros"), SKIP((byte) 1, "skip");
47+
INSERT_ZEROS((byte) 0, "insert_zeros", false) {
48+
/**
 * INSERT_ZEROS handling of a resolved bucket value.
 *
 * Returns {@code 0.0} when the value is infinite, when it is NaN, or when the bucket's
 * document count is zero; otherwise the value is returned unchanged.
 *
 * @param docCount document count of the bucket the value was read from
 * @param value    the resolved metric value; it is auto-unboxed by the checks below, so it must not be null
 * @return {@code 0.0} for a gap, otherwise {@code value}
 */
@Override
49+
public Double processValue(long docCount, Double value) {
50+
if (Double.isInfinite(value) || Double.isNaN(value) || docCount == 0) {
51+
return 0.0;
52+
} else {
53+
return value;
54+
}
55+
}
56+
},
57+
58+
SKIP((byte) 1, "skip", true) {
59+
/**
 * SKIP handling of a resolved bucket value.
 *
 * Returns {@code Double.NaN} when the value is infinite or when the bucket's document count
 * is zero; otherwise the value is returned unchanged. There is no explicit NaN check here:
 * a NaN input on a non-empty bucket falls through to the else branch and is returned as-is.
 *
 * @param docCount document count of the bucket the value was read from
 * @param value    the resolved metric value; it is auto-unboxed by the check below, so it must not be null
 * @return {@code Double.NaN} for a gap, otherwise {@code value}
 */
@Override
60+
public Double processValue(long docCount, Double value) {
61+
if (Double.isInfinite(value) || docCount == 0) {
62+
return Double.NaN;
63+
} else {
64+
return value;
65+
}
66+
}
67+
},
68+
69+
KEEP_VALUES((byte) 2, "keep_values", true) {
70+
/**
 * KEEP_VALUES handling of a resolved bucket value.
 *
 * Returns {@code Double.NaN} when the value is infinite or NaN; otherwise the value is
 * returned unchanged. {@code docCount} is not consulted, so a usable value reported for an
 * empty bucket is kept.
 *
 * NOTE(review): unlike the INSERT_ZEROS and SKIP implementations, this one is not annotated
 * with {@code @Override}; consider adding it for consistency.
 *
 * @param docCount document count of the bucket the value was read from (unused by this policy)
 * @param value    the resolved metric value; it is auto-unboxed by the checks below, so it must not be null
 * @return {@code Double.NaN} if the value is infinite or NaN, otherwise {@code value}
 */
public Double processValue(long docCount, Double value) {
71+
if (Double.isInfinite(value) || Double.isNaN(value)) {
72+
return Double.NaN;
73+
} else {
74+
return value;
75+
}
76+
}
77+
};
4778

4879
/**
4980
* Parse a string GapPolicy into the byte enum
@@ -76,10 +107,12 @@ public static GapPolicy parse(String text, XContentLocation tokenLocation) {
76107

77108
private final byte id;
78109
private final ParseField parseField;
110+
public final boolean isSkippable;
79111

80-
GapPolicy(byte id, String name) {
112+
GapPolicy(byte id, String name, boolean isSkippable) {
81113
this.id = id;
82114
this.parseField = new ParseField(name);
115+
this.isSkippable = isSkippable;
83116
}
84117

85118
/**
@@ -113,6 +146,8 @@ public static GapPolicy readFrom(StreamInput in) throws IOException {
113146
public String getName() {
114147
return parseField.getPreferredName();
115148
}
149+
150+
/**
 * Applies this gap policy to a value resolved from a bucket. Each enum constant supplies its
 * own implementation deciding whether the value is returned unchanged or substituted.
 *
 * @param docCount document count of the bucket the value was read from
 * @param value    the resolved metric value
 * @return the value to use for the bucket under this policy
 */
public abstract Double processValue(long docCount, Double value);
116151
}
117152

118153
/**
@@ -161,18 +196,12 @@ public static Double resolveBucketValue(MultiBucketsAggregation agg,
161196
throw formatResolutionError(agg, aggPathAsList, propertyValue);
162197
}
163198
// doc count never has missing values so gap policy doesn't apply here
164-
boolean isDocCountProperty = aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0));
165-
if (Double.isInfinite(value) || Double.isNaN(value) || (bucket.getDocCount() == 0 && isDocCountProperty == false)) {
166-
switch (gapPolicy) {
167-
case INSERT_ZEROS:
168-
return 0.0;
169-
case SKIP:
170-
default:
171-
return Double.NaN;
172-
}
173-
} else {
199+
if (aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0))) {
174200
return value;
201+
} else {
202+
return gapPolicy.processValue(bucket.getDocCount(), value);
175203
}
204+
176205
}
177206
} catch (InvalidAggregationPathException e) {
178207
return null;

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
8888
String varName = entry.getKey();
8989
String bucketsPath = entry.getValue();
9090
Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy);
91-
if (GapPolicy.SKIP == gapPolicy && (value == null || Double.isNaN(value))) {
91+
if (gapPolicy.isSkippable && (value == null || Double.isNaN(value))) {
9292
skipBucket = true;
9393
break;
9494
}

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketSortPipelineAggregator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private Map<FieldSortBuilder, Comparable<Object>> resolveAndCacheSortValues() {
122122
resolved.put(sort, (Comparable<Object>) internalBucket.getKey());
123123
} else {
124124
Double bucketValue = BucketHelpers.resolveBucketValue(parentAgg, internalBucket, sortField, gapPolicy);
125-
if (GapPolicy.SKIP == gapPolicy && Double.isNaN(bucketValue)) {
125+
if (gapPolicy.isSkippable && Double.isNaN(bucketValue)) {
126126
continue;
127127
}
128128
resolved.put(sort, (Comparable<Object>) (Object) bucketValue);

server/src/test/java/org/elasticsearch/search/aggregations/pipeline/DerivativeAggregatorTests.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,15 @@ public void testSingleValueAggDerivativeWithGaps_random() throws IOException {
586586
Sum sum = bucket.getAggregations().get("sum");
587587
double thisSumValue = sum.value();
588588
if (bucket.getDocCount() == 0) {
589-
thisSumValue = gapPolicy == GapPolicy.INSERT_ZEROS ? 0 : Double.NaN;
589+
switch (gapPolicy) {
590+
case INSERT_ZEROS:
591+
thisSumValue = 0;
592+
break;
593+
case KEEP_VALUES:
594+
break;
595+
default:
596+
thisSumValue = Double.NaN;
597+
}
590598
}
591599
SimpleValue sumDeriv = bucket.getAggregations().get("deriv");
592600
if (i == 0) {

server/src/test/java/org/elasticsearch/search/aggregations/pipeline/GapPolicyTests.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,30 @@ public GapPolicyTests() {
2525
public void testValidOrdinals() {
2626
assertThat(BucketHelpers.GapPolicy.INSERT_ZEROS.ordinal(), equalTo(0));
2727
assertThat(BucketHelpers.GapPolicy.SKIP.ordinal(), equalTo(1));
28+
assertThat(BucketHelpers.GapPolicy.KEEP_VALUES.ordinal(), equalTo(2));
2829
}
2930

3031
@Override
3132
public void testFromString() {
3233
assertThat(BucketHelpers.GapPolicy.parse("insert_zeros", null), equalTo(BucketHelpers.GapPolicy.INSERT_ZEROS));
3334
assertThat(BucketHelpers.GapPolicy.parse("skip", null), equalTo(BucketHelpers.GapPolicy.SKIP));
35+
assertThat(BucketHelpers.GapPolicy.parse("keep_values", null), equalTo(BucketHelpers.GapPolicy.KEEP_VALUES));
3436
ParsingException e = expectThrows(ParsingException.class, () -> BucketHelpers.GapPolicy.parse("does_not_exist", null));
3537
assertThat(e.getMessage(),
36-
equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip]"));
38+
equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip, keep_values]"));
3739
}
3840

3941
@Override
4042
public void testReadFrom() throws IOException {
4143
assertReadFromStream(0, BucketHelpers.GapPolicy.INSERT_ZEROS);
4244
assertReadFromStream(1, BucketHelpers.GapPolicy.SKIP);
45+
assertReadFromStream(2, BucketHelpers.GapPolicy.KEEP_VALUES);
4346
}
4447

4548
@Override
4649
public void testWriteTo() throws IOException {
4750
assertWriteToStream(BucketHelpers.GapPolicy.INSERT_ZEROS, 0);
4851
assertWriteToStream(BucketHelpers.GapPolicy.SKIP, 1);
52+
assertWriteToStream(BucketHelpers.GapPolicy.KEEP_VALUES, 2);
4953
}
5054
}

server/src/test/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregationHelperTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public static double calculateMetric(double[] values, ValuesSourceAggregationBui
129129
for (double value : values) {
130130
accumulator += value;
131131
}
132-
return accumulator / values.length;
132+
return values.length == 0 ? Double.NaN : accumulator / values.length ;
133133
}
134134

135135
return 0.0;

0 commit comments

Comments
 (0)