diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java index 6201dd936ba19..0a5c25e77cb11 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java @@ -6,15 +6,13 @@ package org.elasticsearch.xpack.dataframe.transforms.pivot; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.Numbers; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.metrics.GeoCentroid; -import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue; import org.elasticsearch.search.aggregations.metrics.ScriptedMetric; import org.elasticsearch.xpack.core.dataframe.DataFrameField; @@ -23,6 +21,7 @@ import org.elasticsearch.xpack.dataframe.transforms.IDGenerator; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -32,7 +31,15 @@ import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType; public final class AggregationResultUtils { - private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class); + + private static final Map TYPE_VALUE_EXTRACTOR_MAP; + static { + Map tempMap = new HashMap<>(); + tempMap.put(SingleValue.class.getName(), new SingleValueAggExtractor()); + tempMap.put(ScriptedMetric.class.getName(), new ScriptedMetricAggExtractor()); + tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor()); + TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap); + } /** * Extracts aggregation results from a composite aggregation and puts it into a map. @@ -73,27 +80,8 @@ public static Stream> extractCompositeAggregationResults(Com // TODO: support other aggregation types Aggregation aggResult = bucket.getAggregations().get(aggName); - if (aggResult instanceof NumericMetricsAggregation.SingleValue) { - NumericMetricsAggregation.SingleValue aggResultSingleValue = (SingleValue) aggResult; - // If the type is numeric or if the formatted string is the same as simply making the value a string, - // gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs. - if (isNumericType(fieldType) || - (aggResultSingleValue.getValueAsString().equals(String.valueOf(aggResultSingleValue.value())))) { - updateDocument(document, aggName, aggResultSingleValue.value()); - } else { - updateDocument(document, aggName, aggResultSingleValue.getValueAsString()); - } - } else if (aggResult instanceof ScriptedMetric) { - updateDocument(document, aggName, ((ScriptedMetric) aggResult).aggregation()); - } else if (aggResult instanceof GeoCentroid) { - updateDocument(document, aggName, ((GeoCentroid) aggResult).centroid().toString()); - } else { - // Execution should never reach this point! - // Creating transforms with unsupported aggregations shall not be possible - throw new AggregationExtractionException("unsupported aggregation [{}] with name [{}]", - aggResult.getType(), - aggResult.getName()); - } + AggValueExtractor extractor = getExtractor(aggResult); + updateDocument(document, aggName, extractor.value(aggResult, fieldType)); } document.put(DataFrameField.DOCUMENT_ID_FIELD, idGen.getID()); @@ -102,6 +90,23 @@ public static Stream> extractCompositeAggregationResults(Com }); } + static AggValueExtractor getExtractor(Aggregation aggregation) { + if (aggregation instanceof SingleValue) { + return TYPE_VALUE_EXTRACTOR_MAP.get(SingleValue.class.getName()); + } else if (aggregation instanceof ScriptedMetric) { + return TYPE_VALUE_EXTRACTOR_MAP.get(ScriptedMetric.class.getName()); + } else if (aggregation instanceof GeoCentroid) { + return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName()); + } else { + // Execution should never reach this point! + // Creating transforms with unsupported aggregations shall not be possible + throw new AggregationExtractionException("unsupported aggregation [{}] with name [{}]", + aggregation.getType(), + aggregation.getName()); + } + } + + @SuppressWarnings("unchecked") static void updateDocument(Map document, String fieldName, Object value) { String[] fieldTokens = fieldName.split("\\."); @@ -147,4 +152,44 @@ public static class AggregationExtractionException extends ElasticsearchExceptio super(msg, args); } } + + private interface AggValueExtractor { + Object value(Aggregation aggregation, String fieldType); + } + + private static class SingleValueAggExtractor implements AggValueExtractor { + @Override + public Object value(Aggregation agg, String fieldType) { + SingleValue aggregation = (SingleValue)agg; + // If the double is invalid, this indicates sparse data + if (Numbers.isValidDouble(aggregation.value()) == false) { + return null; + } + // If the type is numeric or if the formatted string is the same as simply making the value a string, + // gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs. + if (isNumericType(fieldType) || + aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))){ + return aggregation.value(); + } else { + return aggregation.getValueAsString(); + } + } + } + + private static class ScriptedMetricAggExtractor implements AggValueExtractor { + @Override + public Object value(Aggregation agg, String fieldType) { + ScriptedMetric aggregation = (ScriptedMetric)agg; + return aggregation.aggregation(); + } + } + + private static class GeoCentroidAggExtractor implements AggValueExtractor { + @Override + public Object value(Aggregation agg, String fieldType) { + GeoCentroid aggregation = (GeoCentroid)agg; + // if the account is `0` iff there is no contained centroid + return aggregation.count() > 0 ? aggregation.centroid().toString() : null; + } + } } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java index 1a835c9d19b59..34a4b0d80b4e4 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java @@ -135,8 +135,8 @@ aggTypedName, asMap( KEY, asMap( targetField, "ID3"), aggTypedName, asMap( - "value", 12.55), - DOC_COUNT, 9) + "value", Double.NaN), + DOC_COUNT, 0) )); List> expected = asList( @@ -150,14 +150,14 @@ aggTypedName, asMap( ), asMap( targetField, "ID3", - aggName, 12.55 + aggName, null ) ); Map fieldTypeMap = asStringMap( targetField, "keyword", aggName, "double" ); - executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 20); + executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 11); } public void testExtractCompositeAggregationResultsMultipleGroups() throws IOException { @@ -212,8 +212,8 @@ KEY, asMap( targetField2, "ID2_2" ), aggTypedName, asMap( - "value", 12.55), - DOC_COUNT, 4) + "value", Double.NaN), + DOC_COUNT, 0) )); List> expected = asList( @@ -235,7 +235,7 @@ aggTypedName, asMap( asMap( targetField, "ID3", targetField2, "ID2_2", - aggName, 12.55 + aggName, null ) ); Map fieldTypeMap = asStringMap( @@ -243,7 +243,7 @@ aggTypedName, asMap( targetField, "keyword", targetField2, "keyword" ); - executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 10); + executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 6); } public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException { @@ -287,7 +287,7 @@ KEY, asMap( aggTypedName, asMap( "value", 12.55), aggTypedName2, asMap( - "value", -2.44), + "value", Double.NaN), DOC_COUNT, 1) )); @@ -305,7 +305,7 @@ aggTypedName2, asMap( asMap( targetField, "ID3", aggName, 12.55, - aggName2, -2.44 + aggName2, null ) ); Map fieldTypeMap = asStringMap( @@ -383,8 +383,8 @@ KEY, asMap( aggTypedName, asMap( "value", 12.55), aggTypedName2, asMap( - "value", -100.44, - "value_as_string", "-100.44F"), + "value", Double.NaN, + "value_as_string", "NaN"), DOC_COUNT, 4) )); @@ -411,7 +411,7 @@ aggTypedName2, asMap( targetField, "ID3", targetField2, "ID2_2", aggName, 12.55, - aggName2, "-100.44F" + aggName2, null ) ); Map fieldTypeMap = asStringMap( @@ -476,8 +476,8 @@ KEY, asMap( targetField2, "ID2_2" ), aggTypedName, asMap( - "value", asMap("field", 12.0)), - DOC_COUNT, 4) + "value", null), + DOC_COUNT, 0) )); List> expected = asList( @@ -499,14 +499,14 @@ aggName, asMap("field", 2.13) asMap( targetField, "ID3", targetField2, "ID2_2", - aggName, asMap("field", 12.0) + aggName, null ) ); Map fieldTypeMap = asStringMap( targetField, "keyword", targetField2, "keyword" ); - executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 10); + executeTest(groupBy, aggregationBuilders, Collections.emptyList(), input, fieldTypeMap, expected, 6); } public void testExtractCompositeAggregationResultsWithPipelineAggregation() throws IOException { @@ -576,7 +576,7 @@ KEY, asMap( aggTypedName, asMap( "value", 12.0), pipelineAggTypedName, asMap( - "value", 12.0), + "value", Double.NaN), DOC_COUNT, 4) )); @@ -603,7 +603,7 @@ pipelineAggTypedName, asMap( targetField, "ID3", targetField2, "ID2_2", aggName, 12.0, - pipelineAggName, 12.0 + pipelineAggName, null ) ); Map fieldTypeMap = asStringMap(