Skip to content

Commit 9009606

Browse files
authored
[Transform] add support for extended_stats (#120340)
Building off of `stats` and multi-value aggregations, including the limitation: - all values of extended_stats will be mapped to `double` if mapping deduction is used Relates #51925
1 parent d108c7a commit 9009606

File tree

10 files changed

+300
-3
lines changed

10 files changed

+300
-3
lines changed

docs/changelog/120340.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120340
2+
summary: Add support for `extended_stats`
3+
area: Transform
4+
type: enhancement
5+
issues: []

docs/reference/rest-api/common-parms.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ currently supported:
808808
* <<search-aggregations-pipeline-bucket-script-aggregation,Bucket script>>
809809
* <<search-aggregations-pipeline-bucket-selector-aggregation,Bucket selector>>
810810
* <<search-aggregations-metrics-cardinality-aggregation,Cardinality>>
811+
* <<search-aggregations-metrics-extendedstats-aggregation,Extended Stats>>
811812
* <<search-aggregations-bucket-filter-aggregation,Filter>>
812813
* <<search-aggregations-metrics-geobounds-aggregation,Geo bounds>>
813814
* <<search-aggregations-metrics-geocentroid-aggregation,Geo centroid>>

server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregationBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.util.Map;
3030
import java.util.Objects;
31+
import java.util.Optional;
3132
import java.util.Set;
3233

3334
public class ExtendedStatsAggregationBuilder extends ValuesSourceAggregationBuilder.MetricsAggregationBuilder<
@@ -87,6 +88,11 @@ public Set<String> metricNames() {
8788
return InternalExtendedStats.METRIC_NAMES;
8889
}
8990

91+
@Override
92+
public Optional<Set<String>> getOutputFieldNames() {
93+
return Optional.of(InternalExtendedStats.Fields.OUTPUT_FORMAT);
94+
}
95+
9096
@Override
9197
protected ValuesSourceType defaultValueSourceType() {
9298
return CoreValuesSourceType.NUMERIC;

server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStats.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
*/
99
package org.elasticsearch.search.aggregations.metrics;
1010

11+
import org.elasticsearch.common.TriConsumer;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.search.DocValueFormat;
@@ -19,6 +20,7 @@
1920

2021
import java.io.IOException;
2122
import java.util.Collections;
23+
import java.util.HashMap;
2224
import java.util.Map;
2325
import java.util.Objects;
2426
import java.util.Set;
@@ -337,6 +339,67 @@ static class Fields {
337339
public static final String LOWER_POPULATION = "lower_population";
338340
public static final String UPPER_SAMPLING = "upper_sampling";
339341
public static final String LOWER_SAMPLING = "lower_sampling";
342+
343+
static final Set<String> OUTPUT_FORMAT = Set.of(
344+
Metrics.count.name(),
345+
Metrics.sum.name(),
346+
Metrics.min.name(),
347+
Metrics.max.name(),
348+
Metrics.avg.name(),
349+
SUM_OF_SQRS,
350+
VARIANCE,
351+
VARIANCE_POPULATION,
352+
VARIANCE_SAMPLING,
353+
STD_DEVIATION,
354+
STD_DEVIATION_POPULATION,
355+
STD_DEVIATION_SAMPLING,
356+
STD_DEVIATION_BOUNDS + "." + UPPER,
357+
STD_DEVIATION_BOUNDS + "." + LOWER,
358+
STD_DEVIATION_BOUNDS + "." + UPPER_POPULATION,
359+
STD_DEVIATION_BOUNDS + "." + LOWER_POPULATION,
360+
STD_DEVIATION_BOUNDS + "." + UPPER_SAMPLING,
361+
STD_DEVIATION_BOUNDS + "." + LOWER_SAMPLING
362+
);
363+
}
364+
365+
public Map<String, Object> asIndexableMap() {
366+
if (count != 0) {
367+
// NumberFieldMapper will invalidate non-finite doubles
368+
TriConsumer<Map<String, Object>, String, Double> putIfValidDouble = (map, key, value) -> {
369+
if (Double.isFinite(value)) {
370+
map.put(key, value);
371+
}
372+
};
373+
var extendedStatsMap = new HashMap<String, Object>(13);
374+
extendedStatsMap.put(Metrics.count.name(), getCount());
375+
putIfValidDouble.apply(extendedStatsMap, Metrics.sum.name(), getSum());
376+
putIfValidDouble.apply(extendedStatsMap, Metrics.min.name(), getMin());
377+
putIfValidDouble.apply(extendedStatsMap, Metrics.max.name(), getMax());
378+
putIfValidDouble.apply(extendedStatsMap, Metrics.avg.name(), getAvg());
379+
380+
putIfValidDouble.apply(extendedStatsMap, Fields.SUM_OF_SQRS, sumOfSqrs);
381+
putIfValidDouble.apply(extendedStatsMap, Fields.VARIANCE, getVariance());
382+
putIfValidDouble.apply(extendedStatsMap, Fields.VARIANCE_POPULATION, getVariancePopulation());
383+
putIfValidDouble.apply(extendedStatsMap, Fields.VARIANCE_SAMPLING, getVarianceSampling());
384+
putIfValidDouble.apply(extendedStatsMap, Fields.STD_DEVIATION, getStdDeviation());
385+
putIfValidDouble.apply(extendedStatsMap, Fields.STD_DEVIATION_POPULATION, getStdDeviationPopulation());
386+
putIfValidDouble.apply(extendedStatsMap, Fields.STD_DEVIATION_SAMPLING, getStdDeviationSampling());
387+
388+
var stdDevBounds = new HashMap<String, Object>(6);
389+
putIfValidDouble.apply(stdDevBounds, Fields.UPPER, getStdDeviationBound(Bounds.UPPER));
390+
putIfValidDouble.apply(stdDevBounds, Fields.LOWER, getStdDeviationBound(Bounds.LOWER));
391+
putIfValidDouble.apply(stdDevBounds, Fields.UPPER_POPULATION, getStdDeviationBound(Bounds.UPPER_POPULATION));
392+
putIfValidDouble.apply(stdDevBounds, Fields.LOWER_POPULATION, getStdDeviationBound(Bounds.LOWER_POPULATION));
393+
putIfValidDouble.apply(stdDevBounds, Fields.UPPER_SAMPLING, getStdDeviationBound(Bounds.UPPER_SAMPLING));
394+
putIfValidDouble.apply(stdDevBounds, Fields.LOWER_SAMPLING, getStdDeviationBound(Bounds.LOWER_SAMPLING));
395+
if (stdDevBounds.isEmpty() == false) {
396+
extendedStatsMap.put(Fields.STD_DEVIATION_BOUNDS, stdDevBounds);
397+
}
398+
399+
return extendedStatsMap;
400+
} else {
401+
return Map.of();
402+
}
340403
}
341404

342405
@Override

server/src/test/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregatorTests.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@
1919
import org.elasticsearch.index.mapper.DateFieldMapper;
2020
import org.elasticsearch.index.mapper.MappedFieldType;
2121
import org.elasticsearch.index.mapper.NumberFieldMapper;
22+
import org.elasticsearch.search.aggregations.AggregationBuilder;
2223
import org.elasticsearch.search.aggregations.AggregatorTestCase;
24+
import org.elasticsearch.search.aggregations.InternalAggregation;
2325
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
2426

2527
import java.io.IOException;
2628
import java.util.Map;
2729
import java.util.function.Consumer;
2830

2931
import static java.util.Collections.singleton;
30-
import static org.elasticsearch.search.aggregations.AggregationBuilders.stats;
32+
import static org.hamcrest.Matchers.is;
3133

3234
public class ExtendedStatsAggregatorTests extends AggregatorTestCase {
3335
private static final double TOLERANCE = 1e-5;
@@ -304,6 +306,13 @@ public void testCase(
304306
testCase(buildIndex, verify, new AggTestConfig(aggBuilder, ft));
305307
}
306308

309+
@Override
310+
protected <T extends AggregationBuilder, V extends InternalAggregation> void verifyOutputFieldNames(T aggregationBuilder, V agg)
311+
throws IOException {
312+
assertTrue(aggregationBuilder.getOutputFieldNames().isPresent());
313+
assertThat(aggregationBuilder.getOutputFieldNames().get(), is(InternalExtendedStats.Fields.OUTPUT_FORMAT));
314+
}
315+
307316
static class ExtendedSimpleStatsAggregator extends StatsAggregatorTests.SimpleStatsAggregator {
308317
double sumOfSqrs = 0;
309318

server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalExtendedStatsTests.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,29 @@
99

1010
package org.elasticsearch.search.aggregations.metrics;
1111

12+
import org.elasticsearch.common.bytes.BytesReference;
1213
import org.elasticsearch.common.util.Maps;
14+
import org.elasticsearch.common.xcontent.XContentHelper;
1315
import org.elasticsearch.search.DocValueFormat;
1416
import org.elasticsearch.search.aggregations.InternalAggregation;
1517
import org.elasticsearch.search.aggregations.support.SamplingContext;
1618
import org.elasticsearch.test.InternalAggregationTestCase;
19+
import org.elasticsearch.xcontent.ToXContent;
20+
import org.elasticsearch.xcontent.XContentFactory;
1721

22+
import java.io.IOException;
1823
import java.util.ArrayList;
1924
import java.util.HashMap;
2025
import java.util.List;
2126
import java.util.Map;
27+
import java.util.function.Predicate;
28+
29+
import static org.hamcrest.Matchers.aMapWithSize;
30+
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.hasKey;
32+
import static org.hamcrest.Matchers.is;
33+
import static org.hamcrest.Matchers.isA;
34+
import static org.hamcrest.Matchers.notNullValue;
2235

2336
public class InternalExtendedStatsTests extends InternalAggregationTestCase<InternalExtendedStats> {
2437

@@ -209,4 +222,75 @@ private void verifySumOfSqrsOfDoubles(double[] values, double expectedSumOfSqrs,
209222
InternalExtendedStats reduced = (InternalExtendedStats) InternalAggregationTestCase.reduce(aggregations, null);
210223
assertEquals(expectedSumOfSqrs, reduced.getSumOfSquares(), delta);
211224
}
225+
226+
@SuppressWarnings(value = "unchecked")
227+
public void testAsMapMatchesXContent() throws IOException {
228+
var stats = new InternalExtendedStats(
229+
"testAsMapIsSameAsXContent",
230+
randomLongBetween(1, 50),
231+
randomDoubleBetween(1, 50, true),
232+
randomDoubleBetween(1, 50, true),
233+
randomDoubleBetween(1, 50, true),
234+
randomDoubleBetween(1, 50, true),
235+
sigma,
236+
DocValueFormat.RAW,
237+
Map.of()
238+
);
239+
240+
var outputMap = stats.asIndexableMap();
241+
assertThat(outputMap, notNullValue());
242+
243+
Map<String, Object> xContentMap;
244+
try (var builder = XContentFactory.jsonBuilder()) {
245+
builder.startObject();
246+
stats.doXContentBody(builder, ToXContent.EMPTY_PARAMS);
247+
builder.endObject();
248+
xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2();
249+
}
250+
assertThat(xContentMap, notNullValue());
251+
252+
// serializing -> deserializing converts the long to an int, so we convert it back to test
253+
var countMetricName = InternalStats.Metrics.count.name();
254+
var xContentCount = xContentMap.get(countMetricName);
255+
assertThat(xContentCount, isA(Integer.class));
256+
assertThat(((Integer) xContentCount).longValue(), equalTo(outputMap.get(countMetricName)));
257+
258+
// verify the entries in the bounds map are similar
259+
var xContentStdDevBounds = (Map<String, Object>) xContentMap.get(InternalExtendedStats.Fields.STD_DEVIATION_BOUNDS);
260+
var outputStdDevBounds = (Map<String, Object>) outputMap.get(InternalExtendedStats.Fields.STD_DEVIATION_BOUNDS);
261+
xContentStdDevBounds.forEach((key, value) -> {
262+
if (value instanceof String == false || Double.isFinite(Double.parseDouble(value.toString()))) {
263+
assertThat(outputStdDevBounds.get(key), equalTo(value));
264+
}
265+
});
266+
267+
// verify all the other entries that are not "std_deviation_bounds" or "count"
268+
Predicate<Map.Entry<String, Object>> notCountOrStdDevBounds = Predicate.not(
269+
e -> e.getKey().equals(countMetricName) || e.getKey().equals(InternalExtendedStats.Fields.STD_DEVIATION_BOUNDS)
270+
);
271+
xContentMap.entrySet().stream().filter(notCountOrStdDevBounds).forEach(e -> {
272+
if (e.getValue() instanceof String == false || Double.isFinite(Double.parseDouble(e.getValue().toString()))) {
273+
assertThat(outputMap.get(e.getKey()), equalTo(e.getValue()));
274+
}
275+
});
276+
}
277+
278+
public void testIndexableMapExcludesNaN() {
279+
var stats = new InternalExtendedStats(
280+
"testAsMapIsSameAsXContent",
281+
randomLongBetween(1, 50),
282+
Double.NaN,
283+
Double.NaN,
284+
Double.NaN,
285+
Double.NaN,
286+
sigma,
287+
DocValueFormat.RAW,
288+
Map.of()
289+
);
290+
291+
var outputMap = stats.asIndexableMap();
292+
assertThat(outputMap, is(aMapWithSize(1)));
293+
assertThat(outputMap, hasKey(InternalStats.Metrics.count.name()));
294+
assertThat(outputMap.get(InternalStats.Metrics.count.name()), is(stats.getCount()));
295+
}
212296
}

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2003,6 +2003,84 @@ public void testPivotWithTopMetrics() throws Exception {
20032003
assertEquals("business_3", actual);
20042004
}
20052005

2006+
@SuppressWarnings(value = "unchecked")
2007+
public void testPivotWithExtendedStats() throws Exception {
2008+
var transformId = "extended_stats_transform";
2009+
var transformIndex = "extended_stats_pivot_reviews";
2010+
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
2011+
2012+
var createTransformRequest = createRequestWithAuth(
2013+
"PUT",
2014+
getTransformEndpoint() + transformId,
2015+
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
2016+
);
2017+
2018+
var config = Strings.format("""
2019+
{
2020+
"source": {
2021+
"index": "%s"
2022+
},
2023+
"dest": {
2024+
"index": "%s"
2025+
},
2026+
"pivot": {
2027+
"group_by": {
2028+
"reviewer": {
2029+
"terms": {
2030+
"field": "user_id"
2031+
}
2032+
}
2033+
},
2034+
"aggregations": {
2035+
"stars": {
2036+
"extended_stats": {
2037+
"field": "stars"
2038+
}
2039+
}
2040+
}
2041+
}
2042+
}""", REVIEWS_INDEX_NAME, transformIndex);
2043+
2044+
createTransformRequest.setJsonEntity(config);
2045+
var createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
2046+
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
2047+
2048+
startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
2049+
assertTrue(indexExists(transformIndex));
2050+
2051+
var searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
2052+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
2053+
var stdDevMap = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue("hits.hits._source.stars", searchResult)).get(0);
2054+
assertThat(stdDevMap.get("count"), equalTo(41));
2055+
assertThat(
2056+
stdDevMap,
2057+
allOf(
2058+
hasEntry("sum", 159.0),
2059+
hasEntry("min", 1.0),
2060+
hasEntry("max", 5.0),
2061+
hasEntry("avg", 3.8780487804878048),
2062+
hasEntry("sum_of_squares", 711.0),
2063+
hasEntry("variance", 2.3022010707911953),
2064+
hasEntry("variance_population", 2.3022010707911953),
2065+
hasEntry("variance_sampling", 2.3597560975609753),
2066+
hasEntry("std_deviation", 1.5173005868288574),
2067+
hasEntry("std_deviation_sampling", 1.5361497640402693),
2068+
hasEntry("std_deviation_population", 1.5173005868288574)
2069+
)
2070+
);
2071+
assertThat(
2072+
(Map<String, ?>) stdDevMap.get("std_deviation_bounds"),
2073+
allOf(
2074+
hasEntry("upper", 6.91264995414552),
2075+
hasEntry("lower", 0.84344760683009),
2076+
hasEntry("upper_population", 6.91264995414552),
2077+
hasEntry("lower_population", 0.84344760683009),
2078+
hasEntry("upper_sampling", 6.950348308568343),
2079+
hasEntry("lower_sampling", 0.8057492524072662)
2080+
)
2081+
);
2082+
}
2083+
20062084
public void testPivotWithBoxplot() throws Exception {
20072085
String transformId = "boxplot_transform";
20082086
String transformIndex = "boxplot_pivot_reviews";

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.search.aggregations.bucket.range.Range;
2424
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
2525
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
26+
import org.elasticsearch.search.aggregations.metrics.InternalExtendedStats;
2627
import org.elasticsearch.search.aggregations.metrics.MultiValueAggregation;
2728
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.MultiValue;
2829
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
@@ -69,6 +70,7 @@ public final class AggregationResultUtils {
6970
tempMap.put(GeoShapeMetricAggregation.class.getName(), new GeoShapeMetricAggExtractor());
7071
tempMap.put(MultiValue.class.getName(), new NumericMultiValueAggExtractor());
7172
tempMap.put(MultiValueAggregation.class.getName(), new MultiValueAggExtractor());
73+
tempMap.put(InternalExtendedStats.class.getName(), new ExtendedStatsExtractor());
7274
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
7375
}
7476

@@ -171,6 +173,9 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
171173
// TODO: can the Range extractor be removed?
172174
} else if (aggregation instanceof Range) {
173175
return TYPE_VALUE_EXTRACTOR_MAP.get(Range.class.getName());
176+
} else if (aggregation instanceof InternalExtendedStats) {
177+
// note: extended stats is also a multi bucket agg, therefore check range first
178+
return TYPE_VALUE_EXTRACTOR_MAP.get(InternalExtendedStats.class.getName());
174179
} else if (aggregation instanceof MultiValue) {
175180
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiValue.class.getName());
176181
} else if (aggregation instanceof MultiValueAggregation) {
@@ -281,6 +286,13 @@ public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lo
281286
}
282287
}
283288

289+
static class ExtendedStatsExtractor implements AggValueExtractor {
290+
@Override
291+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
292+
return ((InternalExtendedStats) agg).asIndexableMap();
293+
}
294+
}
295+
284296
static class MultiValueAggExtractor implements AggValueExtractor {
285297
@Override
286298
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public final class TransformAggregations {
6060
"date_histogram",
6161
"date_range",
6262
"diversified_sampler",
63-
"extended_stats", // https://github.com/elastic/elasticsearch/issues/51925
6463
"filters",
6564
"geo_distance",
6665
"geohash_grid",
@@ -120,7 +119,8 @@ enum AggregationType {
120119
MISSING("missing", LONG),
121120
TOP_METRICS("top_metrics", SOURCE),
122121
STATS("stats", DOUBLE),
123-
BOXPLOT("boxplot", DOUBLE);
122+
BOXPLOT("boxplot", DOUBLE),
123+
EXTENDED_STATS("extended_stats", DOUBLE);
124124

125125
private final String aggregationType;
126126
private final String targetMapping;

0 commit comments

Comments
 (0)