Skip to content

Add keep_values gap policy #73297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 8, 2021
4 changes: 4 additions & 0 deletions docs/reference/aggregations/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ _insert_zeros_::
This option will replace missing values with a zero (`0`) and pipeline aggregation computation will
proceed as normal.

_keep_values_::
This option is similar to skip, except if the metric provides a non-null, non-NaN value this value is
used, otherwise the empty bucket is skipped.

include::pipeline/avg-bucket-aggregation.asciidoc[]

include::pipeline/bucket-script-aggregation.asciidoc[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ public void testSingleValueAggDerivativeWithGaps_random() throws Exception {
checkBucketKeyAndDocCount("InternalBucket " + i, bucket, i, valueCounts_empty_rnd[i]);
Sum sum = bucket.getAggregations().get("sum");
double thisSumValue = sum.value();
if (bucket.getDocCount() == 0) {
if (bucket.getDocCount() == 0 && gapPolicy != GapPolicy.KEEP_VALUES) {
thisSumValue = gapPolicy == GapPolicy.INSERT_ZEROS ? 0 : Double.NaN;
}
SimpleValue sumDeriv = bucket.getAggregations().get("deriv");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void setupSuiteScopeCluster() throws Exception {
numBuckets = randomIntBetween(10, 80);
lag = randomIntBetween(1, numBuckets / 2);

gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.SKIP : BucketHelpers.GapPolicy.INSERT_ZEROS;
gapPolicy = randomFrom(BucketHelpers.GapPolicy.values());
metric = randomMetric("the_metric", VALUE_FIELD);
mockHisto = PipelineAggregationHelperTests.generateHistogram(interval, numBuckets, randomDouble(), randomDouble());

Expand Down Expand Up @@ -171,6 +171,12 @@ private void setupExpected(MetricTarget target) {
metricValue = 0.0;
} else {
metricValue = PipelineAggregationHelperTests.calculateMetric(docValues, metric);
if (gapPolicy.equals(BucketHelpers.GapPolicy.KEEP_VALUES)) {
if (Double.isInfinite(metricValue) || Double.isNaN(metricValue)) {
// serial diff ignores these values and replaces them with null
metricValue = Double.NaN;
}
}
}

} else {
Expand Down Expand Up @@ -204,13 +210,8 @@ private void setupExpected(MetricTarget target) {
}

lagWindow.add(metricValue);




}


testValues.put(target.toString(), values);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,40 @@ public class BucketHelpers {
*
* "insert_zeros": empty buckets will be filled with zeros for all metrics
* "skip": empty buckets will simply be ignored
* "keep_values": for empty buckets the values provided by the metrics will still be used if they are available
*/
public enum GapPolicy implements Writeable {
INSERT_ZEROS((byte) 0, "insert_zeros"), SKIP((byte) 1, "skip");
INSERT_ZEROS((byte) 0, "insert_zeros", false) {
@Override
public Double processValue(long docCount, Double value) {
if (Double.isInfinite(value) || Double.isNaN(value) || docCount == 0) {
return 0.0;
} else {
return value;
}
}
},

SKIP((byte) 1, "skip", true) {
@Override
public Double processValue(long docCount, Double value) {
if (Double.isInfinite(value) || docCount == 0) {
return Double.NaN;
} else {
return value;
}
}
},

KEEP_VALUES((byte) 2, "keep_values", true) {
public Double processValue(long docCount, Double value) {
if (Double.isInfinite(value) || Double.isNaN(value)) {
return Double.NaN;
} else {
return value;
}
}
};

/**
* Parse a string GapPolicy into the byte enum
Expand Down Expand Up @@ -76,10 +107,12 @@ public static GapPolicy parse(String text, XContentLocation tokenLocation) {

private final byte id;
private final ParseField parseField;
public final boolean isSkippable;

GapPolicy(byte id, String name) {
GapPolicy(byte id, String name, boolean isSkippable) {
this.id = id;
this.parseField = new ParseField(name);
this.isSkippable = isSkippable;
}

/**
Expand Down Expand Up @@ -113,6 +146,8 @@ public static GapPolicy readFrom(StreamInput in) throws IOException {
public String getName() {
return parseField.getPreferredName();
}

public abstract Double processValue(long docCount, Double value);
}

/**
Expand Down Expand Up @@ -161,18 +196,12 @@ public static Double resolveBucketValue(MultiBucketsAggregation agg,
throw formatResolutionError(agg, aggPathAsList, propertyValue);
}
// doc count never has missing values so gap policy doesn't apply here
boolean isDocCountProperty = aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0));
if (Double.isInfinite(value) || Double.isNaN(value) || (bucket.getDocCount() == 0 && isDocCountProperty == false)) {
switch (gapPolicy) {
case INSERT_ZEROS:
return 0.0;
case SKIP:
default:
return Double.NaN;
}
} else {
if (aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0))) {
return value;
} else {
return gapPolicy.processValue(bucket.getDocCount(), value);
}

}
} catch (InvalidAggregationPathException e) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
String varName = entry.getKey();
String bucketsPath = entry.getValue();
Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy);
if (GapPolicy.SKIP == gapPolicy && (value == null || Double.isNaN(value))) {
if (gapPolicy.isSkippable && (value == null || Double.isNaN(value))) {
skipBucket = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private Map<FieldSortBuilder, Comparable<Object>> resolveAndCacheSortValues() {
resolved.put(sort, (Comparable<Object>) internalBucket.getKey());
} else {
Double bucketValue = BucketHelpers.resolveBucketValue(parentAgg, internalBucket, sortField, gapPolicy);
if (GapPolicy.SKIP == gapPolicy && Double.isNaN(bucketValue)) {
if (gapPolicy.isSkippable && Double.isNaN(bucketValue)) {
continue;
}
resolved.put(sort, (Comparable<Object>) (Object) bucketValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,15 @@ public void testSingleValueAggDerivativeWithGaps_random() throws IOException {
Sum sum = bucket.getAggregations().get("sum");
double thisSumValue = sum.value();
if (bucket.getDocCount() == 0) {
thisSumValue = gapPolicy == GapPolicy.INSERT_ZEROS ? 0 : Double.NaN;
switch (gapPolicy) {
case INSERT_ZEROS:
thisSumValue = 0;
break;
case KEEP_VALUES:
break;
default:
thisSumValue = Double.NaN;
}
}
SimpleValue sumDeriv = bucket.getAggregations().get("deriv");
if (i == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,30 @@ public GapPolicyTests() {
public void testValidOrdinals() {
assertThat(BucketHelpers.GapPolicy.INSERT_ZEROS.ordinal(), equalTo(0));
assertThat(BucketHelpers.GapPolicy.SKIP.ordinal(), equalTo(1));
assertThat(BucketHelpers.GapPolicy.KEEP_VALUES.ordinal(), equalTo(2));
}

@Override
public void testFromString() {
assertThat(BucketHelpers.GapPolicy.parse("insert_zeros", null), equalTo(BucketHelpers.GapPolicy.INSERT_ZEROS));
assertThat(BucketHelpers.GapPolicy.parse("skip", null), equalTo(BucketHelpers.GapPolicy.SKIP));
assertThat(BucketHelpers.GapPolicy.parse("keep_values", null), equalTo(BucketHelpers.GapPolicy.KEEP_VALUES));
ParsingException e = expectThrows(ParsingException.class, () -> BucketHelpers.GapPolicy.parse("does_not_exist", null));
assertThat(e.getMessage(),
equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip]"));
equalTo("Invalid gap policy: [does_not_exist], accepted values: [insert_zeros, skip, keep_values]"));
}

@Override
public void testReadFrom() throws IOException {
assertReadFromStream(0, BucketHelpers.GapPolicy.INSERT_ZEROS);
assertReadFromStream(1, BucketHelpers.GapPolicy.SKIP);
assertReadFromStream(2, BucketHelpers.GapPolicy.KEEP_VALUES);
}

@Override
public void testWriteTo() throws IOException {
assertWriteToStream(BucketHelpers.GapPolicy.INSERT_ZEROS, 0);
assertWriteToStream(BucketHelpers.GapPolicy.SKIP, 1);
assertWriteToStream(BucketHelpers.GapPolicy.KEEP_VALUES, 2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public static double calculateMetric(double[] values, ValuesSourceAggregationBui
for (double value : values) {
accumulator += value;
}
return accumulator / values.length;
return values.length == 0 ? Double.NaN : accumulator / values.length ;
}

return 0.0;
Expand Down