Skip to content

Commit 481b219

Browse files
author
Hendrik Muhs
committed
make transform ready for multi value aggregations and add support for percentile
1 parent ff00052 commit 481b219

File tree

7 files changed

+165
-6
lines changed

7 files changed

+165
-6
lines changed

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

+64
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,70 @@ public void testContinuousDateNanos() throws Exception {
11091109
deleteIndex(indexName);
11101110
}
11111111

1112+
public void testPivotWithPercentile() throws Exception {
1113+
String transformId = "percentile_pivot";
1114+
String transformIndex = "percentile_pivot_reviews";
1115+
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
1116+
final Request createTransformRequest = createRequestWithAuth(
1117+
"PUT",
1118+
getTransformEndpoint() + transformId,
1119+
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
1120+
);
1121+
String config = "{"
1122+
+ " \"source\": {\"index\":\""
1123+
+ REVIEWS_INDEX_NAME
1124+
+ "\"},"
1125+
+ " \"dest\": {\"index\":\""
1126+
+ transformIndex
1127+
+ "\"},"
1128+
+ " \"frequency\": \"1s\","
1129+
+ " \"pivot\": {"
1130+
+ " \"group_by\": {"
1131+
+ " \"reviewer\": {"
1132+
+ " \"terms\": {"
1133+
+ " \"field\": \"user_id\""
1134+
+ " } } },"
1135+
+ " \"aggregations\": {"
1136+
+ " \"avg_rating\": {"
1137+
+ " \"avg\": {"
1138+
+ " \"field\": \"stars\""
1139+
+ " } },"
1140+
+ " \"p\": {"
1141+
+ " \"percentiles\" : {"
1142+
+ " \"field\": \"stars\", "
1143+
+ " \"percents\": [5, 50, 90, 99.9]"
1144+
+ " }"
1145+
+ " } } }"
1146+
+ "}";
1147+
createTransformRequest.setJsonEntity(config);
1148+
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
1149+
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
1150+
1151+
startAndWaitForTransform(transformId, transformIndex);
1152+
assertTrue(indexExists(transformIndex));
1153+
// get and check some users
1154+
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
1155+
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
1156+
1157+
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
1158+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
1159+
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
1160+
assertEquals(1, actual.longValue());
1161+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
1162+
assertEquals(5, actual.longValue());
1163+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
1164+
assertEquals(5, actual.longValue());
1165+
1166+
searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_11");
1167+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
1168+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
1169+
assertEquals(1, actual.longValue());
1170+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
1171+
assertEquals(4, actual.longValue());
1172+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
1173+
assertEquals(5, actual.longValue());
1174+
}
1175+
11121176
private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
11131177
// create mapping
11141178
try (XContentBuilder builder = jsonBuilder()) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

+21
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
2121
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
2222
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
23+
import org.elasticsearch.search.aggregations.metrics.Percentile;
24+
import org.elasticsearch.search.aggregations.metrics.Percentiles;
2325
import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
2426
import org.elasticsearch.xpack.core.transform.TransformField;
2527
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
2628
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
2729
import org.elasticsearch.xpack.transform.transforms.IDGenerator;
30+
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
2831

2932
import java.util.Arrays;
3033
import java.util.Collection;
@@ -46,6 +49,7 @@ public final class AggregationResultUtils {
4649
tempMap.put(ScriptedMetric.class.getName(), new ScriptedMetricAggExtractor());
4750
tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor());
4851
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
52+
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
4953
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
5054
}
5155

@@ -111,6 +115,8 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
111115
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName());
112116
} else if (aggregation instanceof GeoBounds) {
113117
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
118+
} else if (aggregation instanceof Percentiles) {
119+
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
114120
} else {
115121
// Execution should never reach this point!
116122
// Creating transforms with unsupported aggregations shall not be possible
@@ -190,6 +196,21 @@ public Object value(Aggregation agg, String fieldType) {
190196
}
191197
}
192198

199+
static class PercentilesAggExtractor implements AggValueExtractor {
200+
@Override
201+
public Object value(Aggregation agg, String fieldType) {
202+
Percentiles aggregation = (Percentiles) agg;
203+
204+
HashMap<String, Double> percentiles = new HashMap<>();
205+
206+
for (Percentile p : aggregation) {
207+
percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue());
208+
}
209+
210+
return percentiles;
211+
}
212+
}
213+
193214
static class ScriptedMetricAggExtractor implements AggValueExtractor {
194215
@Override
195216
public Object value(Aggregation agg, String fieldType) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@
66

77
package org.elasticsearch.xpack.transform.transforms.pivot;
88

9+
import org.elasticsearch.search.aggregations.AggregationBuilder;
10+
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
11+
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
12+
13+
import java.util.Arrays;
14+
import java.util.Collections;
915
import java.util.Locale;
16+
import java.util.Map;
1017
import java.util.Set;
1118
import java.util.stream.Collectors;
1219
import java.util.stream.Stream;
@@ -17,6 +24,7 @@ public final class Aggregations {
1724
private static final String DYNAMIC = "_dynamic";
1825
// the field mapping should be determined explicitly from the source field mapping if possible.
1926
private static final String SOURCE = "_source";
27+
2028
private Aggregations() {}
2129

2230
/**
@@ -40,7 +48,8 @@ enum AggregationType {
4048
SCRIPTED_METRIC("scripted_metric", DYNAMIC),
4149
WEIGHTED_AVG("weighted_avg", DYNAMIC),
4250
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
43-
BUCKET_SCRIPT("bucket_script", DYNAMIC);
51+
BUCKET_SCRIPT("bucket_script", DYNAMIC),
52+
PERCENTILES("percentiles", "double");
4453

4554
private final String aggregationType;
4655
private final String targetMapping;
@@ -59,8 +68,9 @@ public String getTargetMapping() {
5968
}
6069
}
6170

62-
private static Set<String> aggregationSupported = Stream.of(AggregationType.values()).map(AggregationType::name)
63-
.collect(Collectors.toSet());
71+
private static Set<String> aggregationSupported = Stream.of(AggregationType.values())
72+
.map(AggregationType::name)
73+
.collect(Collectors.toSet());
6474

6575
public static boolean isSupportedByTransform(String aggregationType) {
6676
return aggregationSupported.contains(aggregationType.toUpperCase(Locale.ROOT));
@@ -74,4 +84,19 @@ public static String resolveTargetMapping(String aggregationType, String sourceT
7484
AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT));
7585
return agg.getTargetMapping().equals(SOURCE) ? sourceType : agg.getTargetMapping();
7686
}
87+
88+
public static Map<String, String> getAggregationOutputTypes(AggregationBuilder agg) {
89+
if (agg instanceof PercentilesAggregationBuilder) {
90+
PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
91+
92+
// note: eclipse does not like p -> agg.getType()
93+
// the merge function (p1, p2) -> p1 ignores duplicates
94+
return Arrays.stream(percentilesAgg.percentiles())
95+
.mapToObj(OutputFieldNameConverter::fromDouble)
96+
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1));
97+
}
98+
// catch all
99+
return Collections.singletonMap(agg.getName(), agg.getType());
100+
}
101+
77102
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public static void deduceMappings(
6666
) {
6767
// collects the fieldnames used as source for aggregations
6868
Map<String, String> aggregationSourceFieldNames = new HashMap<>();
69-
// collects the aggregation types by source name
69+
// collects the aggregation types by output field name
7070
Map<String, String> aggregationTypes = new HashMap<>();
7171
// collects the fieldnames and target fieldnames used for grouping
7272
Map<String, String> fieldNamesForGrouping = new HashMap<>();
@@ -79,9 +79,9 @@ public static void deduceMappings(
7979
if (agg instanceof ValuesSourceAggregationBuilder) {
8080
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
8181
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
82-
aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType());
82+
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(valueSourceAggregation));
8383
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
84-
aggregationTypes.put(agg.getName(), agg.getType());
84+
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(agg));
8585
} else {
8686
// execution should not reach this point
8787
listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]"));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform.utils;
8+
9+
/**
 * Converts values into strings that can be used as (sub) field names in transform output.
 */
public final class OutputFieldNameConverter {

    /**
     * 2^63, the first double above the range of {@code long}. Casting it to {@code long}
     * saturates to {@code Long.MAX_VALUE}, which widens back to 2^63 when compared as a double,
     * so an equality check alone would wrongly treat it as an exact long.
     */
    private static final double LONG_RANGE_END = 0x1p63;

    private OutputFieldNameConverter() {}

    /**
     * Converts a double into a field name without a dot.
     *
     * Integral values that fit into a {@code long} are rendered without the fraction
     * ({@code 42.0} becomes {@code "42"}). All other values use {@link String#valueOf(double)}
     * with the '.' replaced by '_' ({@code 99.9} becomes {@code "99_9"}). NaN and the infinities
     * are returned as their plain string form.
     *
     * @param d the value to convert
     * @return the field name representation of {@code d}
     */
    public static String fromDouble(double d) {
        // the range guard must come first, see LONG_RANGE_END; NaN fails the comparison and falls through
        if (d < LONG_RANGE_END && d == (long) d) {
            return String.valueOf((long) d);
        }
        return String.valueOf(d).replace('.', '_');
    }
}

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationsTests.java

+4
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,9 @@ public void testResolveTargetMapping() {
6161
// weighted_avg
6262
assertEquals("_dynamic", Aggregations.resolveTargetMapping("weighted_avg", null));
6363
assertEquals("_dynamic", Aggregations.resolveTargetMapping("weighted_avg", "double"));
64+
65+
// percentile
66+
assertEquals("double", Aggregations.resolveTargetMapping("percentiles", null));
67+
assertEquals("double", Aggregations.resolveTargetMapping("percentiles", "int"));
6468
}
6569
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform.utils;
8+
9+
import org.elasticsearch.test.ESTestCase;
10+
11+
public class OutputFieldNameConverterTests extends ESTestCase {
12+
13+
public void testFromDouble() {
14+
assertEquals("42_42", OutputFieldNameConverter.fromDouble(42.42));
15+
// remove '.0' if possible
16+
assertEquals("42", OutputFieldNameConverter.fromDouble(42.0));
17+
// digit limit
18+
assertEquals("42_42424242424242", OutputFieldNameConverter.fromDouble(42.4242424242424242424242424242424242));
19+
// scientific notation keep the '.0'
20+
assertEquals("1_0E-100", OutputFieldNameConverter.fromDouble(1.0E-100));
21+
// scientific with digits
22+
assertEquals("1_12345E-100", OutputFieldNameConverter.fromDouble(1.12345E-100));
23+
// NaN (OutputFieldNameConverter clients should disallow that)
24+
assertEquals("NaN", OutputFieldNameConverter.fromDouble(Double.NaN));
25+
// infinity
26+
assertEquals("-Infinity", OutputFieldNameConverter.fromDouble(Double.NEGATIVE_INFINITY));
27+
}
28+
}

0 commit comments

Comments
 (0)