Skip to content

Commit 406f18f

Browse files
author
Hendrik Muhs
authored
[Transform] add support for percentile aggs (#51808)
make transform ready for multi value aggregations and add support for percentile fixes #51663
1 parent eebd6b3 commit 406f18f

File tree

7 files changed

+223
-51
lines changed

7 files changed

+223
-51
lines changed

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

+64
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,70 @@ public void testContinuousDateNanos() throws Exception {
11091109
deleteIndex(indexName);
11101110
}
11111111

1112+
public void testPivotWithPercentile() throws Exception {
1113+
String transformId = "percentile_pivot";
1114+
String transformIndex = "percentile_pivot_reviews";
1115+
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
1116+
final Request createTransformRequest = createRequestWithAuth(
1117+
"PUT",
1118+
getTransformEndpoint() + transformId,
1119+
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
1120+
);
1121+
String config = "{"
1122+
+ " \"source\": {\"index\":\""
1123+
+ REVIEWS_INDEX_NAME
1124+
+ "\"},"
1125+
+ " \"dest\": {\"index\":\""
1126+
+ transformIndex
1127+
+ "\"},"
1128+
+ " \"frequency\": \"1s\","
1129+
+ " \"pivot\": {"
1130+
+ " \"group_by\": {"
1131+
+ " \"reviewer\": {"
1132+
+ " \"terms\": {"
1133+
+ " \"field\": \"user_id\""
1134+
+ " } } },"
1135+
+ " \"aggregations\": {"
1136+
+ " \"avg_rating\": {"
1137+
+ " \"avg\": {"
1138+
+ " \"field\": \"stars\""
1139+
+ " } },"
1140+
+ " \"p\": {"
1141+
+ " \"percentiles\" : {"
1142+
+ " \"field\": \"stars\", "
1143+
+ " \"percents\": [5, 50, 90, 99.9]"
1144+
+ " }"
1145+
+ " } } }"
1146+
+ "}";
1147+
createTransformRequest.setJsonEntity(config);
1148+
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
1149+
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
1150+
1151+
startAndWaitForTransform(transformId, transformIndex);
1152+
assertTrue(indexExists(transformIndex));
1153+
// get and check some users
1154+
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_11", 3.846153846);
1155+
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
1156+
1157+
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
1158+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
1159+
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
1160+
assertEquals(1, actual.longValue());
1161+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
1162+
assertEquals(5, actual.longValue());
1163+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
1164+
assertEquals(5, actual.longValue());
1165+
1166+
searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_11");
1167+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
1168+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.5", searchResult)).get(0);
1169+
assertEquals(1, actual.longValue());
1170+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.50", searchResult)).get(0);
1171+
assertEquals(4, actual.longValue());
1172+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.p.99_9", searchResult)).get(0);
1173+
assertEquals(5, actual.longValue());
1174+
}
1175+
11121176
private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
11131177
// create mapping
11141178
try (XContentBuilder builder = jsonBuilder()) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

+79-45
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
2121
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
2222
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
23+
import org.elasticsearch.search.aggregations.metrics.Percentile;
24+
import org.elasticsearch.search.aggregations.metrics.Percentiles;
2325
import org.elasticsearch.search.aggregations.metrics.ScriptedMetric;
2426
import org.elasticsearch.xpack.core.transform.TransformField;
2527
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
2628
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
2729
import org.elasticsearch.xpack.transform.transforms.IDGenerator;
30+
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
2831

2932
import java.util.Arrays;
3033
import java.util.Collection;
@@ -46,6 +49,7 @@ public final class AggregationResultUtils {
4649
tempMap.put(ScriptedMetric.class.getName(), new ScriptedMetricAggExtractor());
4750
tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor());
4851
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
52+
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
4953
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
5054
}
5155

@@ -59,12 +63,14 @@ public final class AggregationResultUtils {
5963
* @param stats stats collector
6064
* @return a map containing the results of the aggregation in a consumable way
6165
*/
62-
public static Stream<Map<String, Object>> extractCompositeAggregationResults(CompositeAggregation agg,
63-
GroupConfig groups,
64-
Collection<AggregationBuilder> aggregationBuilders,
65-
Collection<PipelineAggregationBuilder> pipelineAggs,
66-
Map<String, String> fieldTypeMap,
67-
TransformIndexerStats stats) {
66+
public static Stream<Map<String, Object>> extractCompositeAggregationResults(
67+
CompositeAggregation agg,
68+
GroupConfig groups,
69+
Collection<AggregationBuilder> aggregationBuilders,
70+
Collection<PipelineAggregationBuilder> pipelineAggs,
71+
Map<String, String> fieldTypeMap,
72+
TransformIndexerStats stats
73+
) {
6874
return agg.getBuckets().stream().map(bucket -> {
6975
stats.incrementNumDocuments(bucket.getDocCount());
7076
Map<String, Object> document = new HashMap<>();
@@ -82,7 +88,7 @@ public static Stream<Map<String, Object>> extractCompositeAggregationResults(Com
8288
List<String> aggNames = aggregationBuilders.stream().map(AggregationBuilder::getName).collect(Collectors.toList());
8389
aggNames.addAll(pipelineAggs.stream().map(PipelineAggregationBuilder::getName).collect(Collectors.toList()));
8490

85-
for (String aggName: aggNames) {
91+
for (String aggName : aggNames) {
8692
Aggregation aggResult = bucket.getAggregations().get(aggName);
8793
// This indicates not that the value contained in the `aggResult` is null, but that the `aggResult` is not
8894
// present at all in the `bucket.getAggregations`. This could occur in the case of a `bucket_selector` agg, which
@@ -109,16 +115,19 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
109115
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoCentroid.class.getName());
110116
} else if (aggregation instanceof GeoBounds) {
111117
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
118+
} else if (aggregation instanceof Percentiles) {
119+
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
112120
} else {
113121
// Execution should never reach this point!
114122
// Creating transforms with unsupported aggregations shall not be possible
115-
throw new AggregationExtractionException("unsupported aggregation [{}] with name [{}]",
123+
throw new AggregationExtractionException(
124+
"unsupported aggregation [{}] with name [{}]",
116125
aggregation.getType(),
117-
aggregation.getName());
126+
aggregation.getName()
127+
);
118128
}
119129
}
120130

121-
122131
@SuppressWarnings("unchecked")
123132
static void updateDocument(Map<String, Object> document, String fieldName, Object value) {
124133
String[] fieldTokens = fieldName.split("\\.");
@@ -132,23 +141,23 @@ static void updateDocument(Map<String, Object> document, String fieldName, Objec
132141
if (i == fieldTokens.length - 1) {
133142
if (internalMap.containsKey(token)) {
134143
if (internalMap.get(token) instanceof Map) {
135-
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
136-
fieldName);
144+
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]", fieldName);
137145
} else {
138-
throw new AggregationExtractionException("duplicate key value pairs key [{}] old value [{}] duplicate value [{}]",
146+
throw new AggregationExtractionException(
147+
"duplicate key value pairs key [{}] old value [{}] duplicate value [{}]",
139148
fieldName,
140149
internalMap.get(token),
141-
value);
150+
value
151+
);
142152
}
143153
}
144154
internalMap.put(token, value);
145155
} else {
146156
if (internalMap.containsKey(token)) {
147157
if (internalMap.get(token) instanceof Map) {
148-
internalMap = (Map<String, Object>)internalMap.get(token);
158+
internalMap = (Map<String, Object>) internalMap.get(token);
149159
} else {
150-
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
151-
fieldName);
160+
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]", fieldName);
152161
}
153162
} else {
154163
Map<String, Object> newMap = new HashMap<>();
@@ -172,34 +181,48 @@ interface AggValueExtractor {
172181
static class SingleValueAggExtractor implements AggValueExtractor {
173182
@Override
174183
public Object value(Aggregation agg, String fieldType) {
175-
SingleValue aggregation = (SingleValue)agg;
184+
SingleValue aggregation = (SingleValue) agg;
176185
// If the double is invalid, this indicates sparse data
177186
if (Numbers.isValidDouble(aggregation.value()) == false) {
178187
return null;
179188
}
180189
// If the type is numeric or if the formatted string is the same as simply making the value a string,
181-
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
182-
if (isNumericType(fieldType) ||
183-
aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))){
190+
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
191+
if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
184192
return aggregation.value();
185193
} else {
186194
return aggregation.getValueAsString();
187195
}
188196
}
189197
}
190198

199+
static class PercentilesAggExtractor implements AggValueExtractor {
200+
@Override
201+
public Object value(Aggregation agg, String fieldType) {
202+
Percentiles aggregation = (Percentiles) agg;
203+
204+
HashMap<String, Double> percentiles = new HashMap<>();
205+
206+
for (Percentile p : aggregation) {
207+
percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue());
208+
}
209+
210+
return percentiles;
211+
}
212+
}
213+
191214
static class ScriptedMetricAggExtractor implements AggValueExtractor {
192215
@Override
193216
public Object value(Aggregation agg, String fieldType) {
194-
ScriptedMetric aggregation = (ScriptedMetric)agg;
217+
ScriptedMetric aggregation = (ScriptedMetric) agg;
195218
return aggregation.aggregation();
196219
}
197220
}
198221

199222
static class GeoCentroidAggExtractor implements AggValueExtractor {
200223
@Override
201224
public Object value(Aggregation agg, String fieldType) {
202-
GeoCentroid aggregation = (GeoCentroid)agg;
225+
GeoCentroid aggregation = (GeoCentroid) agg;
203226
// if the account is `0` iff there is no contained centroid
204227
return aggregation.count() > 0 ? aggregation.centroid().toString() : null;
205228
}
@@ -208,38 +231,49 @@ public Object value(Aggregation agg, String fieldType) {
208231
static class GeoBoundsAggExtractor implements AggValueExtractor {
209232
@Override
210233
public Object value(Aggregation agg, String fieldType) {
211-
GeoBounds aggregation = (GeoBounds)agg;
234+
GeoBounds aggregation = (GeoBounds) agg;
212235
if (aggregation.bottomRight() == null || aggregation.topLeft() == null) {
213236
return null;
214237
}
215238
final Map<String, Object> geoShape = new HashMap<>();
216239
// If the two geo_points are equal, it is a point
217240
if (aggregation.topLeft().equals(aggregation.bottomRight())) {
218241
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PointBuilder.TYPE.shapeName());
219-
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
220-
Arrays.asList(aggregation.topLeft().getLon(), aggregation.bottomRight().getLat()));
221-
// If only the lat or the lon of the two geo_points are equal, than we know it should be a line
242+
geoShape.put(
243+
ShapeParser.FIELD_COORDINATES.getPreferredName(),
244+
Arrays.asList(aggregation.topLeft().getLon(), aggregation.bottomRight().getLat())
245+
);
246+
// If only the lat or the lon of the two geo_points are equal, than we know it should be a line
222247
} else if (Double.compare(aggregation.topLeft().getLat(), aggregation.bottomRight().getLat()) == 0
223248
|| Double.compare(aggregation.topLeft().getLon(), aggregation.bottomRight().getLon()) == 0) {
224-
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), LineStringBuilder.TYPE.shapeName());
225-
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
226-
Arrays.asList(
227-
new Double[]{aggregation.topLeft().getLon(), aggregation.topLeft().getLat()},
228-
new Double[]{aggregation.bottomRight().getLon(), aggregation.bottomRight().getLat()}));
229-
} else {
230-
// neither points are equal, we have a polygon that is a square
231-
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PolygonBuilder.TYPE.shapeName());
232-
final GeoPoint tl = aggregation.topLeft();
233-
final GeoPoint br = aggregation.bottomRight();
234-
geoShape.put(ShapeParser.FIELD_COORDINATES.getPreferredName(),
235-
Collections.singletonList(Arrays.asList(
236-
new Double[]{tl.getLon(), tl.getLat()},
237-
new Double[]{br.getLon(), tl.getLat()},
238-
new Double[]{br.getLon(), br.getLat()},
239-
new Double[]{tl.getLon(), br.getLat()},
240-
new Double[]{tl.getLon(), tl.getLat()})));
241-
}
249+
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), LineStringBuilder.TYPE.shapeName());
250+
geoShape.put(
251+
ShapeParser.FIELD_COORDINATES.getPreferredName(),
252+
Arrays.asList(
253+
new Double[] { aggregation.topLeft().getLon(), aggregation.topLeft().getLat() },
254+
new Double[] { aggregation.bottomRight().getLon(), aggregation.bottomRight().getLat() }
255+
)
256+
);
257+
} else {
258+
// neither points are equal, we have a polygon that is a square
259+
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PolygonBuilder.TYPE.shapeName());
260+
final GeoPoint tl = aggregation.topLeft();
261+
final GeoPoint br = aggregation.bottomRight();
262+
geoShape.put(
263+
ShapeParser.FIELD_COORDINATES.getPreferredName(),
264+
Collections.singletonList(
265+
Arrays.asList(
266+
new Double[] { tl.getLon(), tl.getLat() },
267+
new Double[] { br.getLon(), tl.getLat() },
268+
new Double[] { br.getLon(), br.getLat() },
269+
new Double[] { tl.getLon(), br.getLat() },
270+
new Double[] { tl.getLon(), tl.getLat() }
271+
)
272+
)
273+
);
274+
}
242275
return geoShape;
243276
}
244277
}
278+
245279
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@
66

77
package org.elasticsearch.xpack.transform.transforms.pivot;
88

9+
import org.elasticsearch.search.aggregations.AggregationBuilder;
10+
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
11+
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
12+
13+
import java.util.Arrays;
14+
import java.util.Collections;
915
import java.util.Locale;
16+
import java.util.Map;
1017
import java.util.Set;
1118
import java.util.stream.Collectors;
1219
import java.util.stream.Stream;
@@ -17,6 +24,7 @@ public final class Aggregations {
1724
private static final String DYNAMIC = "_dynamic";
1825
// the field mapping should be determined explicitly from the source field mapping if possible.
1926
private static final String SOURCE = "_source";
27+
2028
private Aggregations() {}
2129

2230
/**
@@ -40,7 +48,8 @@ enum AggregationType {
4048
SCRIPTED_METRIC("scripted_metric", DYNAMIC),
4149
WEIGHTED_AVG("weighted_avg", DYNAMIC),
4250
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
43-
BUCKET_SCRIPT("bucket_script", DYNAMIC);
51+
BUCKET_SCRIPT("bucket_script", DYNAMIC),
52+
PERCENTILES("percentiles", "double");
4453

4554
private final String aggregationType;
4655
private final String targetMapping;
@@ -59,8 +68,9 @@ public String getTargetMapping() {
5968
}
6069
}
6170

62-
private static Set<String> aggregationSupported = Stream.of(AggregationType.values()).map(AggregationType::name)
63-
.collect(Collectors.toSet());
71+
private static Set<String> aggregationSupported = Stream.of(AggregationType.values())
72+
.map(AggregationType::name)
73+
.collect(Collectors.toSet());
6474

6575
public static boolean isSupportedByTransform(String aggregationType) {
6676
return aggregationSupported.contains(aggregationType.toUpperCase(Locale.ROOT));
@@ -74,4 +84,19 @@ public static String resolveTargetMapping(String aggregationType, String sourceT
7484
AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT));
7585
return agg.getTargetMapping().equals(SOURCE) ? sourceType : agg.getTargetMapping();
7686
}
87+
88+
public static Map<String, String> getAggregationOutputTypes(AggregationBuilder agg) {
89+
if (agg instanceof PercentilesAggregationBuilder) {
90+
PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
91+
92+
// note: eclipse does not like p -> agg.getType()
93+
// the merge function (p1, p2) -> p1 ignores duplicates
94+
return Arrays.stream(percentilesAgg.percentiles())
95+
.mapToObj(OutputFieldNameConverter::fromDouble)
96+
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1));
97+
}
98+
// catch all
99+
return Collections.singletonMap(agg.getName(), agg.getType());
100+
}
101+
77102
}

0 commit comments

Comments
 (0)