Skip to content

Gap policy for scripted subaggregates #28077

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -158,33 +158,43 @@ public static Double resolveBucketValue(MultiBucketsAggregation agg,
if (propertyValue == null) {
throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+ " must reference either a number value or a single value numeric metric aggregation");
} else {
double value;
if (propertyValue instanceof Number) {
value = ((Number) propertyValue).doubleValue();
} else if (propertyValue instanceof InternalNumericMetricsAggregation.SingleValue) {
value = ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value();
} else {
throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+ " must reference either a number value or a single value numeric metric aggregation, got: "
+ propertyValue.getClass().getCanonicalName());
}
// doc count never has missing values so gap policy doesn't apply here
boolean isDocCountProperty = aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0));
if (Double.isInfinite(value) || Double.isNaN(value) || (bucket.getDocCount() == 0 && !isDocCountProperty)) {
switch (gapPolicy) {
case INSERT_ZEROS:
return 0.0;
case SKIP:
default:
return Double.NaN;
}
} else {
return value;
}
}
boolean isDocCountProperty = aggPathAsList.size() == 1 && "_count".equals(aggPathAsList.get(0));
if (GapPolicy.SKIP == gapPolicy && bucket.getDocCount() == 0 && !isDocCountProperty) {
return Double.NaN;
}

double value = getBucketPropertyValue(agg, bucket, aggPathAsList);
if (Double.isFinite(value)) {
return value;
}
return GapPolicy.INSERT_ZEROS == gapPolicy ? 0.0 : Double.NaN;
} catch (InvalidAggregationPathException e) {
return null;
}
}

public static Double getBucketPropertyValue(MultiBucketsAggregation agg,
InternalMultiBucketAggregation.InternalBucket bucket, String aggPath) {
return getBucketPropertyValue(agg, bucket, AggregationPath.parse(aggPath).getPathElementsAsStringList());
}

private static double getBucketPropertyValue(MultiBucketsAggregation agg,
InternalMultiBucketAggregation.InternalBucket bucket, List<String> aggPathAsList) {
Object propertyValue = bucket.getProperty(agg.getName(), aggPathAsList);
if (propertyValue == null) {
throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+ " must reference either a number value or a single value numeric metric aggregation");
}

if (propertyValue instanceof Number) {
return ((Number) propertyValue).doubleValue();
} else if (propertyValue instanceof InternalNumericMetricsAggregation.SingleValue) {
return ((InternalNumericMetricsAggregation.SingleValue) propertyValue).value();
} else {
throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+ " must reference either a number value or a single value numeric metric aggregation, got: "
+ propertyValue.getClass().getCanonicalName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
Expand Down Expand Up @@ -96,37 +97,24 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
if (script.getParams() != null) {
vars.putAll(script.getParams());
}
boolean skipBucket = false;
for (Map.Entry<String, String> entry : bucketsPathsMap.entrySet()) {
String varName = entry.getKey();
String bucketsPath = entry.getValue();
Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy);
if (GapPolicy.SKIP == gapPolicy && (value == null || Double.isNaN(value))) {
skipBucket = true;
break;
}
vars.put(varName, value);
}
if (skipBucket) {
bucketsPathsMap.forEach((varName, bucketsPath) -> vars.put(
varName, BucketHelpers.getBucketPropertyValue(originalAgg, bucket, bucketsPath)));
ExecutableScript executableScript = factory.newInstance(vars);
Object returned = executableScript.run();
if (returned == null) {
newBuckets.add(bucket);
} else {
ExecutableScript executableScript = factory.newInstance(vars);
Object returned = executableScript.run();
if (returned == null) {
newBuckets.add(bucket);
} else {
if (!(returned instanceof Number)) {
throw new AggregationExecutionException("series_arithmetic script for reducer [" + name()
+ "] must return a Number");
}
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map(
(p) -> (InternalAggregation) p).collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), ((Number) returned).doubleValue(), formatter,
new ArrayList<>(), metaData()));
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs),
bucket);
newBuckets.add(newBucket);
if (!(returned instanceof Number)) {
throw new AggregationExecutionException("series_arithmetic script for reducer [" + name()
+ "] must return a Number");
}
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map(
(p) -> (InternalAggregation) p).collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), ((Number) returned).doubleValue(), formatter,
new ArrayList<>(), metaData()));
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs),
bucket);
newBuckets.add(newBucket);
}
}
return originalAgg.create(newBuckets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

Expand Down Expand Up @@ -89,12 +90,8 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
if (script.getParams() != null) {
vars.putAll(script.getParams());
}
for (Map.Entry<String, String> entry : bucketsPathsMap.entrySet()) {
String varName = entry.getKey();
String bucketsPath = entry.getValue();
Double value = resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy);
vars.put(varName, value);
}
bucketsPathsMap.forEach((varName, bucketsPath) -> vars.put(varName,
BucketHelpers.getBucketPropertyValue(originalAgg, bucket, bucketsPath)));
// TODO: can we use one instance of the script for all buckets? it should be stateless?
ExecutableScript executableScript = factory.newInstance(vars);
Object scriptReturnValue = executableScript.run();
Expand Down