Skip to content

Commit 288ccae

Browse files
author
Hendrik Muhs
committed
[Transform] add support for filter aggregation (#52483)
Add support for filter aggregations and refactor the mapping deduction code to support sub-aggregations. Fixes #52151.
1 parent 7fe2843 commit 288ccae

File tree

8 files changed

+1052
-567
lines changed

8 files changed

+1052
-567
lines changed

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

+88
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,94 @@ public void testPivotWithPercentile() throws Exception {
11731173
assertEquals(5, actual.longValue());
11741174
}
11751175

1176+
/**
 * Integration test for the {@code filter} aggregation in a pivot transform.
 *
 * Creates the transform {@code filter_pivot}, which groups the reviews index by {@code user_id}
 * (destination field {@code reviewer}) and defines two filter aggregations that both select
 * {@code stars >= 4}:
 * - {@code top_ratings}: no sub-aggregations, read back below as a single number
 * - {@code top_ratings_detail}: sub-aggregations {@code unique_count} (cardinality of
 *   {@code business_id}), {@code max} and {@code min} of {@code stars}, read back below as the
 *   nested fields {@code top_ratings_detail.unique_count}, {@code top_ratings_detail.max} and
 *   {@code top_ratings_detail.min}
 *
 * The expected values are checked for the reviewers {@code user_4} and {@code user_2}.
 */
public void testPivotWithFilter() throws Exception {
1177+
String transformId = "filter_pivot";
1178+
String transformIndex = "filter_pivot_reviews";
1179+
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);
1180+
final Request createTransformRequest = createRequestWithAuth(
1181+
"PUT",
1182+
getTransformEndpoint() + transformId,
1183+
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
1184+
);
1185+
// pivot config: group by user_id, plus two filter aggregations on stars >= 4,
// one without and one with sub-aggregations
String config = "{"
1186+
+ " \"source\": {\"index\":\""
1187+
+ REVIEWS_INDEX_NAME
1188+
+ "\"},"
1189+
+ " \"dest\": {\"index\":\""
1190+
+ transformIndex
1191+
+ "\"},"
1192+
+ " \"frequency\": \"1s\","
1193+
+ " \"pivot\": {"
1194+
+ " \"group_by\": {"
1195+
+ " \"reviewer\": {"
1196+
+ " \"terms\": {"
1197+
+ " \"field\": \"user_id\""
1198+
+ " } } },"
1199+
+ " \"aggregations\": {"
1200+
+ " \"top_ratings\": {"
1201+
+ " \"filter\": {"
1202+
+ " \"range\": {"
1203+
+ " \"stars\": {"
1204+
+ " \"gte\": 4 "
1205+
+ " } } } },"
1206+
+ " \"top_ratings_detail\": {"
1207+
+ " \"filter\": {"
1208+
+ " \"range\": {"
1209+
+ " \"stars\": {"
1210+
+ " \"gte\": 4"
1211+
+ " } } },"
1212+
+ " \"aggregations\": {"
1213+
+ " \"unique_count\": {"
1214+
+ " \"cardinality\": {"
1215+
+ " \"field\": \"business_id\""
1216+
+ " } },"
1217+
+ " \"max\": {"
1218+
+ " \"max\": {"
1219+
+ " \"field\": \"stars\""
1220+
+ " } },"
1221+
+ " \"min\": {"
1222+
+ " \"min\": {"
1223+
+ " \"field\": \"stars\""
1224+
+ " } }"
1225+
+ " } } } }"
1226+
+ "}";
1227+
1228+
createTransformRequest.setJsonEntity(config);
1229+
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
1230+
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
1231+
1232+
startAndWaitForTransform(transformId, transformIndex);
1233+
assertTrue(indexExists(transformIndex));
1234+
// get and check some users
1235+
1236+
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4");
1237+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
1238+
1239+
// filter without sub-aggregations: a single numeric value for the reviewer
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings", searchResult)).get(0);
1240+
assertEquals(29, actual.longValue());
1241+
// filter with sub-aggregations: one nested field per sub-aggregation
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.min", searchResult)).get(0);
1242+
assertEquals(4, actual.longValue());
1243+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.max", searchResult)).get(0);
1244+
assertEquals(5, actual.longValue());
1245+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.unique_count", searchResult)).get(
1246+
0
1247+
);
1248+
assertEquals(4, actual.longValue());
1249+
1250+
// same checks for a second reviewer
searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_2");
1251+
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
1252+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings", searchResult)).get(0);
1253+
assertEquals(19, actual.longValue());
1254+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.min", searchResult)).get(0);
1255+
assertEquals(4, actual.longValue());
1256+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.max", searchResult)).get(0);
1257+
assertEquals(5, actual.longValue());
1258+
actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.top_ratings_detail.unique_count", searchResult)).get(
1259+
0
1260+
);
1261+
assertEquals(3, actual.longValue());
1262+
}
1263+
11761264
private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
11771265
// create mapping
11781266
try (XContentBuilder builder = jsonBuilder()) {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/AggregationResultUtils.java

+38-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.search.aggregations.Aggregation;
1717
import org.elasticsearch.search.aggregations.AggregationBuilder;
1818
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
19+
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
1920
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
2021
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
2122
import org.elasticsearch.search.aggregations.metrics.GeoCentroid;
@@ -50,6 +51,7 @@ public final class AggregationResultUtils {
5051
tempMap.put(GeoCentroid.class.getName(), new GeoCentroidAggExtractor());
5152
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
5253
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
54+
tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
5355
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
5456
}
5557

@@ -94,9 +96,8 @@ public static Stream<Map<String, Object>> extractCompositeAggregationResults(
9496
// present at all in the `bucket.getAggregations`. This could occur in the case of a `bucket_selector` agg, which
9597
// does not calculate a value, but instead manipulates other results.
9698
if (aggResult != null) {
97-
final String fieldType = fieldTypeMap.get(aggName);
9899
AggValueExtractor extractor = getExtractor(aggResult);
99-
updateDocument(document, aggName, extractor.value(aggResult, fieldType));
100+
updateDocument(document, aggName, extractor.value(aggResult, fieldTypeMap, ""));
100101
}
101102
}
102103

@@ -117,6 +118,8 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
117118
return TYPE_VALUE_EXTRACTOR_MAP.get(GeoBounds.class.getName());
118119
} else if (aggregation instanceof Percentiles) {
119120
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
121+
} else if (aggregation instanceof SingleBucketAggregation) {
122+
return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
120123
} else {
121124
// Execution should never reach this point!
122125
// Creating transforms with unsupported aggregations shall not be possible
@@ -175,17 +178,19 @@ public static class AggregationExtractionException extends ElasticsearchExceptio
175178
}
176179

177180
/**
 * Extracts the value that is written into the transform document from a single aggregation result.
 *
 * This hunk replaces the former {@code value(Aggregation, String fieldType)} signature: instead of
 * one pre-resolved field type, implementations receive the whole field type map plus a lookup
 * prefix, so that extractors for nested (sub-)aggregations can resolve their own types.
 */
interface AggValueExtractor {
178-
Object value(Aggregation aggregation, String fieldType);
181+
/**
 * @param aggregation the aggregation result to extract the value from
 * @param fieldTypeMap field types keyed by field name; names of nested aggregations are looked up
 *        as {@code <lookupFieldPrefix>.<aggregation name>}
 * @param lookupFieldPrefix dotted path of the enclosing aggregations, the empty string at top level
 * @return the extracted value; implementations in this file may return {@code null}
 */
Object value(Aggregation aggregation, Map<String, String> fieldTypeMap, String lookupFieldPrefix);
179182
}
180183

181184
static class SingleValueAggExtractor implements AggValueExtractor {
182185
@Override
183-
public Object value(Aggregation agg, String fieldType) {
186+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
184187
SingleValue aggregation = (SingleValue) agg;
185188
// If the double is invalid, this indicates sparse data
186189
if (Numbers.isValidDouble(aggregation.value()) == false) {
187190
return null;
188191
}
192+
193+
String fieldType = fieldTypeMap.get(lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName());
189194
// If the type is numeric or if the formatted string is the same as simply making the value a string,
190195
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
191196
if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
@@ -198,7 +203,7 @@ public Object value(Aggregation agg, String fieldType) {
198203

199204
static class PercentilesAggExtractor implements AggValueExtractor {
200205
@Override
201-
public Object value(Aggregation agg, String fieldType) {
206+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
202207
Percentiles aggregation = (Percentiles) agg;
203208

204209
HashMap<String, Double> percentiles = new HashMap<>();
@@ -211,17 +216,42 @@ public Object value(Aggregation agg, String fieldType) {
211216
}
212217
}
213218

219+
/**
 * Extractor for single bucket aggregations (registered for {@code SingleBucketAggregation}, which
 * is how the {@code filter} aggregation added by this commit is handled).
 *
 * Without sub-aggregations the result is the bucket's document count. With sub-aggregations the
 * result is a map from sub-aggregation name to the value produced by that sub-aggregation's own
 * extractor; the document count is not part of the map in that case.
 */
static class SingleBucketAggExtractor implements AggValueExtractor {
220+
@Override
221+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
222+
SingleBucketAggregation aggregation = (SingleBucketAggregation) agg;
223+
224+
// no sub-aggregations: the only information to report is the number of matching documents
if (aggregation.getAggregations().iterator().hasNext() == false) {
225+
return aggregation.getDocCount();
226+
}
227+
228+
HashMap<String, Object> nested = new HashMap<>();
229+
for (Aggregation subAgg : aggregation.getAggregations()) {
230+
// recurse via getExtractor; the prefix is extended by this aggregation's name so that the
// sub-aggregation can look up its type as "<prefix>.<this name>.<sub name>" in fieldTypeMap
nested.put(
231+
subAgg.getName(),
232+
getExtractor(subAgg).value(
233+
subAgg,
234+
fieldTypeMap,
235+
lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName()
236+
)
237+
);
238+
}
239+
240+
return nested;
241+
}
242+
}
243+
214244
/**
 * Extractor for scripted metric aggregations: returns the object provided by
 * {@code ScriptedMetric#aggregation()} unchanged. The field type map and the lookup prefix are
 * not used by this extractor.
 */
static class ScriptedMetricAggExtractor implements AggValueExtractor {
215245
@Override
216-
public Object value(Aggregation agg, String fieldType) {
246+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
217247
ScriptedMetric aggregation = (ScriptedMetric) agg;
218248
return aggregation.aggregation();
219249
}
220250
}
221251

222252
static class GeoCentroidAggExtractor implements AggValueExtractor {
223253
@Override
224-
public Object value(Aggregation agg, String fieldType) {
254+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
225255
GeoCentroid aggregation = (GeoCentroid) agg;
226256
// the count is `0` iff there is no contained centroid
227257
return aggregation.count() > 0 ? aggregation.centroid().toString() : null;
@@ -230,7 +260,7 @@ public Object value(Aggregation agg, String fieldType) {
230260

231261
static class GeoBoundsAggExtractor implements AggValueExtractor {
232262
@Override
233-
public Object value(Aggregation agg, String fieldType) {
263+
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
234264
GeoBounds aggregation = (GeoBounds) agg;
235265
if (aggregation.bottomRight() == null || aggregation.topLeft() == null) {
236266
return null;

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Aggregations.java

+52-8
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@
66

77
package org.elasticsearch.xpack.transform.transforms.pivot;
88

9+
import org.elasticsearch.common.collect.Tuple;
910
import org.elasticsearch.search.aggregations.AggregationBuilder;
1011
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
12+
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
1113
import org.elasticsearch.xpack.transform.utils.OutputFieldNameConverter;
1214

1315
import java.util.Arrays;
1416
import java.util.Collections;
17+
import java.util.HashMap;
1518
import java.util.List;
1619
import java.util.Locale;
1720
import java.util.Map;
21+
import java.util.Map.Entry;
1822
import java.util.Set;
1923
import java.util.stream.Collectors;
2024
import java.util.stream.Stream;
@@ -52,7 +56,6 @@ public final class Aggregations {
5256
"date_range",
5357
"diversified_sampler",
5458
"extended_stats", // https://github.com/elastic/elasticsearch/issues/51925
55-
"filter", // https://github.com/elastic/elasticsearch/issues/52151
5659
"filters",
5760
"geo_distance",
5861
"geohash_grid",
@@ -102,7 +105,8 @@ enum AggregationType {
102105
WEIGHTED_AVG("weighted_avg", DYNAMIC),
103106
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
104107
BUCKET_SCRIPT("bucket_script", DYNAMIC),
105-
PERCENTILES("percentiles", DOUBLE);
108+
PERCENTILES("percentiles", DOUBLE),
109+
FILTER("filter", LONG);
106110

107111
private final String aggregationType;
108112
private final String targetMapping;
@@ -146,28 +150,68 @@ public static String resolveTargetMapping(String aggregationType, String sourceT
146150
AggregationType agg = AggregationType.valueOf(aggregationType.toUpperCase(Locale.ROOT));
147151

148152
if (agg.getTargetMapping().equals(SOURCE)) {
153+
154+
if (sourceType == null) {
155+
// this should never happen and would mean a bug in the calling code, the error is logged in {@link
156+
// org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil#resolveMappings()}
157+
return null;
158+
}
159+
149160
// scaled float requires an additional parameter "scaling_factor", which we do not know, therefore we fallback to float
150161
if (sourceType.equals(SCALED_FLOAT)) {
151162
return FLOAT;
152163
}
164+
153165
return sourceType;
154166
}
155167

156168
return agg.getTargetMapping();
157169
}
158170

159-
public static Map<String, String> getAggregationOutputTypes(AggregationBuilder agg) {
171+
/**
 * Collects the input fields and the output types of an aggregation, including its sub-aggregations.
 *
 * Replaces the former {@code getAggregationOutputTypes} (removed lines in this hunk).
 *
 * The returned tuple contains:
 * - v1 (input): aggregation name to the source field the aggregation operates on
 * - v2 (output): output field name to aggregation type
 *
 * Cases, checked in this order:
 * - percentiles: no input entry, one output entry per percentile named {@code <agg name>.<percentile>}
 * - values source aggregations: one input entry (name to field) and one output entry (name to type)
 * - aggregations with sub-aggregations: the entries of all sub-aggregations, collected recursively,
 *   with every key prefixed by {@code <agg name>.}; no entry is created for the parent itself
 * - everything else: no input entry, one output entry (name to type)
 *
 * @param agg the aggregation builder to inspect
 * @return tuple of (input field map, output type map)
 */
public static Tuple<Map<String, String>, Map<String, String>> getAggregationInputAndOutputTypes(AggregationBuilder agg) {
160172
if (agg instanceof PercentilesAggregationBuilder) {
161173
PercentilesAggregationBuilder percentilesAgg = (PercentilesAggregationBuilder) agg;
162174

163175
// note: eclipse does not like p -> agg.getType()
164176
// the merge function (p1, p2) -> p1 ignores duplicates
165-
return Arrays.stream(percentilesAgg.percentiles())
166-
.mapToObj(OutputFieldNameConverter::fromDouble)
167-
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1));
177+
return new Tuple<>(
178+
Collections.emptyMap(),
179+
Arrays.stream(percentilesAgg.percentiles())
180+
.mapToObj(OutputFieldNameConverter::fromDouble)
181+
.collect(Collectors.toMap(p -> agg.getName() + "." + p, p -> { return agg.getType(); }, (p1, p2) -> p1))
182+
);
183+
}
184+
185+
if (agg instanceof ValuesSourceAggregationBuilder) {
186+
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
187+
return new Tuple<>(
188+
Collections.singletonMap(valueSourceAggregation.getName(), valueSourceAggregation.field()),
189+
Collections.singletonMap(agg.getName(), agg.getType())
190+
);
191+
}
192+
193+
// does the agg have sub aggregations?
194+
if (agg.getSubAggregations().size() > 0) {
195+
HashMap<String, String> outputTypes = new HashMap<>();
196+
HashMap<String, String> inputTypes = new HashMap<>();
197+
198+
for (AggregationBuilder subAgg : agg.getSubAggregations()) {
199+
Tuple<Map<String, String>, Map<String, String>> subAggregationTypes = getAggregationInputAndOutputTypes(subAgg);
200+
201+
// prefix the keys with the parent name, resulting in dotted paths like "parent.child"
for (Entry<String, String> subAggOutputType : subAggregationTypes.v2().entrySet()) {
202+
outputTypes.put(String.join(".", agg.getName(), subAggOutputType.getKey()), subAggOutputType.getValue());
203+
}
204+
205+
for (Entry<String, String> subAggInputType : subAggregationTypes.v1().entrySet()) {
206+
inputTypes.put(String.join(".", agg.getName(), subAggInputType.getKey()), subAggInputType.getValue());
207+
}
208+
}
209+
210+
return new Tuple<>(inputTypes, outputTypes);
168211
}
169-
// catch all
170-
return Collections.singletonMap(agg.getName(), agg.getType());
212+
213+
// catch all in case no special handling required
214+
return new Tuple<>(Collections.emptyMap(), Collections.singletonMap(agg.getName(), agg.getType()));
171215
}
172216

173217
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/SchemaUtil.java

+4-14
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@
1414
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
1515
import org.elasticsearch.action.support.IndicesOptions;
1616
import org.elasticsearch.client.Client;
17+
import org.elasticsearch.common.collect.Tuple;
1718
import org.elasticsearch.index.mapper.NumberFieldMapper;
1819
import org.elasticsearch.search.aggregations.AggregationBuilder;
1920
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
20-
import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder;
21-
import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder;
22-
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
2321
import org.elasticsearch.xpack.core.ClientHelper;
2422
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
2523

@@ -76,17 +74,9 @@ public static void deduceMappings(
7674
.forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); });
7775

7876
for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
79-
if (agg instanceof ValuesSourceAggregationBuilder) {
80-
ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg;
81-
aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field());
82-
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(valueSourceAggregation));
83-
} else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) {
84-
aggregationTypes.putAll(Aggregations.getAggregationOutputTypes(agg));
85-
} else {
86-
// execution should not reach this point
87-
listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]"));
88-
return;
89-
}
77+
Tuple<Map<String, String>, Map<String, String>> inputAndOutputTypes = Aggregations.getAggregationInputAndOutputTypes(agg);
78+
aggregationSourceFieldNames.putAll(inputAndOutputTypes.v1());
79+
aggregationTypes.putAll(inputAndOutputTypes.v2());
9080
}
9181

9282
// For pipeline aggs, since they are referencing other aggregations in the payload, they do not have any

0 commit comments

Comments
 (0)