Skip to content

Commit 7fff5df

Browse files
author
Hendrik Muhs
authored
[Transform] add support for top metrics (#71850)
Add support for the stats and top metrics aggregations in transform. This change also makes it easier to add more multi-value aggregations to transform. Limitations: - only the 1st element of top_metrics is consumed by transform[*]. - all values of stats are mapped to double if mapping deduction is used, including count, sum, min and max. Fixes #52236, relates to #51925.
1 parent 12b60f6 commit 7fff5df

File tree

8 files changed

+362
-24
lines changed

8 files changed

+362
-24
lines changed

docs/reference/rest-api/common-parms.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,8 +716,10 @@ currently supported:
716716
* <<search-aggregations-metrics-percentile-aggregation,Percentiles>>
717717
* <<search-aggregations-bucket-rare-terms-aggregation, Rare Terms>>
718718
* <<search-aggregations-metrics-scripted-metric-aggregation,Scripted metric>>
719+
* <<search-aggregations-metrics-stats-aggregation,Stats>>
719720
* <<search-aggregations-metrics-sum-aggregation,Sum>>
720721
* <<search-aggregations-bucket-terms-aggregation, Terms>>
722+
* <<search-aggregations-metrics-top-metrics,Top Metrics>>
721723
* <<search-aggregations-metrics-valuecount-aggregation,Value count>>
722724
* <<search-aggregations-metrics-weight-avg-aggregation,Weighted average>>
723725

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,6 +1412,57 @@ public void testPivotWithWeightedAvgAgg() throws Exception {
14121412
assertEquals(4.47169811, actual.doubleValue(), 0.000001);
14131413
}
14141414

1415+
public void testPivotWithTopMetrics() throws Exception {
1416+
String transformId = "top_metrics_transform";
1417+
String transformIndex = "top_metrics_pivot_reviews";
1418+
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
1419+
1420+
final Request createTransformRequest = createRequestWithAuth(
1421+
"PUT",
1422+
getTransformEndpoint() + transformId,
1423+
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
1424+
);
1425+
1426+
String config = "{"
1427+
+ " \"source\": {\"index\":\""
1428+
+ REVIEWS_INDEX_NAME
1429+
+ "\"},"
1430+
+ " \"dest\": {\"index\":\""
1431+
+ transformIndex
1432+
+ "\"},";
1433+
1434+
config += " \"pivot\": {"
1435+
+ " \"group_by\": {"
1436+
+ " \"reviewer\": {"
1437+
+ " \"terms\": {"
1438+
+ " \"field\": \"user_id\""
1439+
+ " } } },"
1440+
+ " \"aggregations\": {"
1441+
+ " \"top_business\": {"
1442+
+ " \"top_metrics\": {"
1443+
+ " \"metrics\": {\"field\": \"business_id\"},"
1444+
+ " \"sort\": {\"timestamp\": \"desc\"}"
1445+
+ "} } } }"
1446+
+ "}";
1447+
1448+
createTransformRequest.setJsonEntity(config);
1449+
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
1450+
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
1451+
1452+
startAndWaitForTransform(transformId, transformIndex, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS);
1453+
assertTrue(indexExists(transformIndex));
1454+
1455+
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
1456+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
1457+
String actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_business.business_id", searchResult)).get(0);
1458+
assertEquals("business_9", actual);
1459+
1460+
searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_1");
1461+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
1462+
actual = (String) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_business.business_id", searchResult)).get(0);
1463+
assertEquals("business_3", actual);
1464+
}
1465+
14151466
public void testManyBucketsWithSmallPageSize() throws Exception {
14161467
String transformId = "test_with_many_buckets";
14171468
String transformIndex = transformId + "-idx";

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,10 @@ protected void createPivotReviewsTransform(
276276
+ " \"affiliate_missing\": {"
277277
+ " \"missing\": {"
278278
+ " \"field\": \"affiliate_id\""
279-
279+
+ " } },"
280+
+ " \"stats\": {"
281+
+ " \"stats\": {"
282+
+ " \"field\": \"stars\""
280283
+ " } } } },"
281284
+ "\"frequency\":\"1s\""
282285
+ "}";

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
2626
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
2727
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
28+
import org.elasticsearch.search.aggregations.metrics.MultiValueAggregation;
29+
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.MultiValue;
2830
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
2931
import org.elasticsearch.search.aggregations.metrics.Percentile;
3032
import org.elasticsearch.search.aggregations.metrics.Percentiles;
@@ -63,6 +65,8 @@ public final class AggregationResultUtils {
6365
tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
6466
tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
6567
tempMap.put(GeoShapeMetricAggregation.class.getName(), new GeoShapeMetricAggExtractor());
68+
tempMap.put(MultiValue.class.getName(), new NumericMultiValueAggExtractor());
69+
tempMap.put(MultiValueAggregation.class.getName(), new MultiValueAggExtractor());
6670
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
6771
}
6872

@@ -151,8 +155,14 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
151155
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName());
152156
} else if (aggregation instanceof GeoBounds) {
153157
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
158+
// note: percentiles is also a multi value agg, therefore check percentiles first
159+
// TODO: can the Percentiles extractor be removed?
154160
} else if (aggregation instanceof Percentiles) {
155161
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
162+
} else if (aggregation instanceof MultiValue) {
163+
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiValue.class.getName());
164+
} else if (aggregation instanceof MultiValueAggregation) {
165+
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiValueAggregation.class.getName());
156166
} else if (aggregation instanceof SingleBucketAggregation) {
157167
return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
158168
} else if (aggregation instanceof MultiBucketsAggregation) {
@@ -259,6 +269,44 @@ public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lo
259269
}
260270
}
261271

272+
static class MultiValueAggExtractor implements AggValueExtractor {
273+
@Override
274+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
275+
MultiValueAggregation aggregation = (MultiValueAggregation) agg;
276+
Map<String, Object> extracted = new HashMap<>();
277+
for (String valueName : aggregation.valueNames()) {
278+
List<String> valueAsStrings = aggregation.getValuesAsStrings(valueName);
279+
280+
// todo: size > 1 is not supported, requires a refactoring so that `size()` is exposed in the agg builder
281+
if (valueAsStrings.size() > 0) {
282+
extracted.put(valueName, valueAsStrings.get(0));
283+
}
284+
}
285+
286+
return extracted;
287+
}
288+
}
289+
290+
static class NumericMultiValueAggExtractor implements AggValueExtractor {
291+
@Override
292+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
293+
MultiValue aggregation = (MultiValue) agg;
294+
Map<String, Object> extracted = new HashMap<>();
295+
296+
String fieldLookupPrefix = (lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName()) + ".";
297+
for (String valueName : aggregation.valueNames()) {
298+
double value = aggregation.value(valueName);
299+
300+
String fieldType = fieldTypeMap.get(fieldLookupPrefix + valueName);
301+
if (Numbers.isValidDouble(value)) {
302+
extracted.put(valueName, dropFloatingPointComponentIfTypeRequiresIt(fieldType, value));
303+
}
304+
}
305+
306+
return extracted;
307+
}
308+
}
309+
262310
static class PercentilesAggExtractor implements AggValueExtractor {
263311
@Override
264312
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
@@ -402,12 +450,11 @@ static class GeoShapeMetricAggExtractor implements AggValueExtractor {
402450

403451
@Override
404452
public Object value(Aggregation aggregation, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
405-
assert aggregation instanceof GeoShapeMetricAggregation
406-
: "Unexpected type ["
407-
+ aggregation.getClass().getName()
408-
+ "] for aggregation ["
409-
+ aggregation.getName()
410-
+ "]";
453+
assert aggregation instanceof GeoShapeMetricAggregation : "Unexpected type ["
454+
+ aggregation.getClass().getName()
455+
+ "] for aggregation ["
456+
+ aggregation.getName()
457+
+ "]";
411458
return ((GeoShapeMetricAggregation) aggregation).geoJSONGeometry();
412459
}
413460
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/TransformAggregations.java

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Locale;
2121
import java.util.Map;
2222
import java.util.Map.Entry;
23+
import java.util.Optional;
2324
import java.util.Set;
2425
import java.util.stream.Collectors;
2526
import java.util.stream.Stream;
@@ -73,10 +74,8 @@ public final class TransformAggregations {
7374
"sampler",
7475
"significant_terms", // https://github.com/elastic/elasticsearch/issues/51073
7576
"significant_text",
76-
"stats", // https://github.com/elastic/elasticsearch/issues/51925
7777
"string_stats", // https://github.com/elastic/elasticsearch/issues/51925
7878
"top_hits",
79-
"top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
8079
"t_test", // https://github.com/elastic/elasticsearch/issues/54503,
8180
"variable_width_histogram", // https://github.com/elastic/elasticsearch/issues/58140
8281
"rate", // https://github.com/elastic/elasticsearch/issues/61351
@@ -113,7 +112,9 @@ enum AggregationType {
113112
FILTER("filter", LONG),
114113
TERMS("terms", FLATTENED),
115114
RARE_TERMS("rare_terms", FLATTENED),
116-
MISSING("missing", LONG);
115+
MISSING("missing", LONG),
116+
TOP_METRICS("top_metrics", SOURCE),
117+
STATS("stats", DOUBLE);
117118

118119
private final String aggregationType;
119120
private final String targetMapping;
@@ -175,7 +176,32 @@ public static String resolveTargetMapping(String aggregationType, String sourceT
175176
return agg.getTargetMapping();
176177
}
177178

179+
/**
180+
* Checks the aggregation object and returns a tuple with 2 maps:
181+
*
182+
* 1. mapping the name of the agg to the used field
183+
* 2. mapping the name of the agg to the aggregation type
184+
*
185+
* Example:
186+
* {
187+
* "my_agg": {
188+
* "max": {
189+
* "field": "my_field"
190+
* }}}
191+
*
192+
* creates ({ "my_agg": "my_field" }, { "my_agg": "max" })
193+
*
194+
* Both mappings can contain _multiple_ entries, e.g. due to sub aggregations or because of aggregations creating multiple
195+
* values(e.g. percentiles)
196+
*
197+
* Note about order: an aggregation can match in multiple places (e.g. a multi value agg that implements {@link ValuesSourceAggregationBuilder}).
198+
* Be careful changing the order in this method
199+
*
200+
* @param agg the aggregation builder
201+
* @return a tuple with 2 mappings that maps the used field(s) and aggregation type(s)
202+
*/
178203
public static Tuple<Map<String, String>, Map<String, String>> getAggregationInputAndOutputTypes(AggregationBuilder agg) {
204+
// todo: can this be removed?
179205
if (agg instanceof PercentilesAggregationBuilder) {
180206
PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
181207

@@ -185,15 +211,42 @@ public static Tuple<Map<String, String>, Map<String, String>> getAggregationInpu
185211
Collections.emptyMap(),
186212
Arrays.stream(percentilesAgg.percentiles())
187213
.mapToObj(OutputFieldNameConverter::fromDouble)
188-
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1))
214+
.collect(
215+
Collectors.toMap(p -> percentilesAgg.getName() + "." + p, p -> { return percentilesAgg.getType(); }, (p1, p2) -> p1)
216+
)
217+
);
218+
}
219+
220+
// does the agg specify output field names
221+
Optional<Set<String>> outputFieldNames = agg.getOutputFieldNames();
222+
if (outputFieldNames.isPresent()) {
223+
return new Tuple<>(
224+
outputFieldNames.get()
225+
.stream()
226+
.collect(
227+
Collectors.toMap(
228+
outputField -> agg.getName() + "." + outputField,
229+
outputField -> outputField,
230+
(v1, v2) -> v1
231+
)
232+
),
233+
outputFieldNames.get()
234+
.stream()
235+
.collect(
236+
Collectors.toMap(
237+
outputField -> agg.getName() + "." + outputField,
238+
outputField -> agg.getType(),
239+
(v1, v2) -> v1
240+
)
241+
)
189242
);
190243
}
191244

192245
if (agg instanceof ValuesSourceAggregationBuilder) {
193246
ValuesSourceAggregationBuilder<?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?>) agg;
194247
return new Tuple<>(
195248
Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.field()),
196-
Collections.singletonMap(agg.getName(), agg.getType())
249+
Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.getType())
197250
);
198251
}
199252

0 commit comments

Comments
 (0)