Skip to content

Commit 08c0a87

Browse files
author
Hendrik Muhs
authored
[Transform] improve bucket key normalization (#64196)
Refactor bucket key normalization so that it depends on the source type and the mapped type. Remove the floating-point component of numeric keys if they are mapped to an integer type (long, unsigned long, integer, ...). Fixes #64070.
1 parent 46a2e28 commit 08c0a87

File tree

8 files changed

+398
-61
lines changed

8 files changed

+398
-61
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,16 @@
88

99
import org.elasticsearch.common.ParseField;
1010
import org.elasticsearch.common.geo.GeoBoundingBox;
11-
import org.elasticsearch.common.geo.GeoPoint;
12-
import org.elasticsearch.common.geo.builders.PolygonBuilder;
13-
import org.elasticsearch.common.geo.parsers.ShapeParser;
1411
import org.elasticsearch.common.io.stream.StreamInput;
1512
import org.elasticsearch.common.io.stream.StreamOutput;
1613
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
1714
import org.elasticsearch.common.xcontent.ObjectParser;
1815
import org.elasticsearch.common.xcontent.XContentBuilder;
1916
import org.elasticsearch.common.xcontent.XContentParser;
20-
import org.elasticsearch.geometry.Rectangle;
2117
import org.elasticsearch.index.mapper.GeoShapeFieldMapper;
22-
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
23-
import org.elasticsearch.index.query.QueryBuilders;
2418
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
2519

2620
import java.io.IOException;
27-
import java.util.Arrays;
28-
import java.util.Collections;
29-
import java.util.HashMap;
30-
import java.util.Map;
3121
import java.util.Objects;
3222

3323
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
@@ -148,32 +138,4 @@ public String getMappingType() {
148138
return GeoShapeFieldMapper.CONTENT_TYPE;
149139
}
150140

151-
@Override
152-
public Object transformBucketKey(Object key) {
153-
assert key instanceof String;
154-
Rectangle rectangle = GeoTileUtils.toBoundingBox(key.toString());
155-
final Map<String, Object> geoShape = new HashMap<>();
156-
geoShape.put(ShapeParser.FIELD_TYPE.getPreferredName(), PolygonBuilder.TYPE.shapeName());
157-
geoShape.put(
158-
ShapeParser.FIELD_COORDINATES.getPreferredName(),
159-
Collections.singletonList(
160-
Arrays.asList(
161-
new Double[] { rectangle.getMaxLon(), rectangle.getMinLat() },
162-
new Double[] { rectangle.getMinLon(), rectangle.getMinLat() },
163-
new Double[] { rectangle.getMinLon(), rectangle.getMaxLat() },
164-
new Double[] { rectangle.getMaxLon(), rectangle.getMaxLat() },
165-
new Double[] { rectangle.getMaxLon(), rectangle.getMinLat() }
166-
)
167-
)
168-
);
169-
return geoShape;
170-
}
171-
172-
private GeoBoundingBoxQueryBuilder toGeoQuery(Rectangle rectangle) {
173-
return QueryBuilders.geoBoundingBoxQuery(field)
174-
.setCorners(
175-
new GeoPoint(rectangle.getMaxLat(), rectangle.getMinLon()),
176-
new GeoPoint(rectangle.getMinLat(), rectangle.getMaxLon())
177-
);
178-
}
179141
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,4 @@ public String getMappingType() {
187187
return null;
188188
}
189189

190-
/**
191-
* This will transform a composite aggregation bucket key into the desired format for indexing.
192-
*
193-
* @param key The bucket key for this group source
194-
* @return the transformed bucket key for indexing
195-
*/
196-
public Object transformBucketKey(Object key) {
197-
return key;
198-
}
199190
}

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ public void testHistogramPivot() throws Exception {
510510
// we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0
511511
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
512512
assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
513-
assertOnePivotValue(transformIndex + "/_search?q=every_2:0.0", 1.0);
513+
assertOnePivotValue(transformIndex + "/_search?q=every_2:0", 1.0);
514514
}
515515

516516
public void testContinuousPivotHistogram() throws Exception {
@@ -556,15 +556,15 @@ public void testContinuousPivotHistogram() throws Exception {
556556
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
557557
assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
558558

559-
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
559+
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:0");
560560
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
561561
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19));
562562

563-
searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
563+
searchResult = getAsMap(transformIndex + "/_search?q=every_2:2");
564564
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
565565
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
566566

567-
searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
567+
searchResult = getAsMap(transformIndex + "/_search?q=every_2:4");
568568
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
569569
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
570570

@@ -608,18 +608,20 @@ public void testContinuousPivotHistogram() throws Exception {
608608
indexStats = getAsMap(transformIndex + "/_stats");
609609
assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
610610

611-
searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
611+
searchResult = getAsMap(transformIndex + "/_search?q=every_2:0");
612612
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
613613
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19));
614614

615-
searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
615+
searchResult = getAsMap(transformIndex + "/_search?q=every_2:2");
616616
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
617617
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30));
618618

619-
searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
619+
searchResult = getAsMap(transformIndex + "/_search?q=every_2:4");
620620
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
621621
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));
622622

623+
deleteTransform(transformId);
624+
deleteIndex(indexName);
623625
}
624626

625627
public void testBiggerPivot() throws Exception {

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ protected void putReviewsIndex(String indexName, String dateType, boolean isData
185185
.field("type", "keyword")
186186
.endObject()
187187
.startObject("stars")
188-
.field("type", "integer")
188+
.field("type", randomFrom("integer", "long")) // gh#64347 unsigned_long disabled
189189
.endObject()
190190
.startObject("location")
191191
.field("type", "geo_point")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package org.elasticsearch.xpack.transform.integration.continuous;
2+
3+
import org.elasticsearch.action.search.SearchRequest;
4+
import org.elasticsearch.action.search.SearchResponse;
5+
import org.elasticsearch.action.support.IndicesOptions;
6+
import org.elasticsearch.client.transform.transforms.DestConfig;
7+
import org.elasticsearch.client.transform.transforms.SourceConfig;
8+
import org.elasticsearch.client.transform.transforms.TransformConfig;
9+
import org.elasticsearch.client.transform.transforms.pivot.GroupConfig;
10+
import org.elasticsearch.client.transform.transforms.pivot.HistogramGroupSource;
11+
import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
12+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
13+
import org.elasticsearch.search.SearchHit;
14+
import org.elasticsearch.search.aggregations.AggregatorFactories;
15+
import org.elasticsearch.search.aggregations.BucketOrder;
16+
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
17+
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
18+
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
19+
import org.elasticsearch.search.builder.SearchSourceBuilder;
20+
21+
import java.io.IOException;
22+
import java.util.Iterator;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
import static org.hamcrest.Matchers.equalTo;
27+
28+
public class HistogramGroupByIT extends ContinuousTestCase {
29+
private static final String NAME = "continuous-histogram-pivot-test";
30+
31+
@Override
32+
public String getName() {
33+
return NAME;
34+
}
35+
36+
@Override
37+
public TransformConfig createConfig() {
38+
TransformConfig.Builder transformConfigBuilder = new TransformConfig.Builder();
39+
addCommonBuilderParameters(transformConfigBuilder);
40+
transformConfigBuilder.setSource(new SourceConfig(CONTINUOUS_EVENTS_SOURCE_INDEX));
41+
transformConfigBuilder.setDest(new DestConfig(NAME, INGEST_PIPELINE));
42+
transformConfigBuilder.setId(NAME);
43+
PivotConfig.Builder pivotConfigBuilder = new PivotConfig.Builder();
44+
pivotConfigBuilder.setGroups(
45+
new GroupConfig.Builder().groupBy("metric", new HistogramGroupSource.Builder().setField("metric").setInterval(50.0).build())
46+
.build()
47+
);
48+
AggregatorFactories.Builder aggregations = new AggregatorFactories.Builder();
49+
addCommonAggregations(aggregations);
50+
51+
pivotConfigBuilder.setAggregations(aggregations);
52+
transformConfigBuilder.setPivotConfig(pivotConfigBuilder.build());
53+
return transformConfigBuilder.build();
54+
}
55+
56+
@Override
57+
public void testIteration(int iteration) throws IOException {
58+
SearchRequest searchRequestSource = new SearchRequest(CONTINUOUS_EVENTS_SOURCE_INDEX).allowPartialSearchResults(false)
59+
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
60+
SearchSourceBuilder sourceBuilderSource = new SearchSourceBuilder().size(0);
61+
HistogramAggregationBuilder metricBuckets = new HistogramAggregationBuilder("metric").field("metric")
62+
.interval(50.0)
63+
.order(BucketOrder.key(true));
64+
sourceBuilderSource.aggregation(metricBuckets);
65+
searchRequestSource.source(sourceBuilderSource);
66+
SearchResponse responseSource = search(searchRequestSource);
67+
68+
SearchRequest searchRequestDest = new SearchRequest(NAME).allowPartialSearchResults(false)
69+
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
70+
SearchSourceBuilder sourceBuilderDest = new SearchSourceBuilder().size(10000).sort("metric");
71+
searchRequestDest.source(sourceBuilderDest);
72+
SearchResponse responseDest = search(searchRequestDest);
73+
74+
List<? extends Bucket> buckets = ((Histogram) responseSource.getAggregations().get("metric")).getBuckets();
75+
76+
Iterator<? extends Bucket> sourceIterator = buckets.iterator();
77+
Iterator<SearchHit> destIterator = responseDest.getHits().iterator();
78+
79+
while (sourceIterator.hasNext() && destIterator.hasNext()) {
80+
Bucket bucket = sourceIterator.next();
81+
SearchHit searchHit = destIterator.next();
82+
Map<String, Object> source = searchHit.getSourceAsMap();
83+
84+
Long transformBucketKey = ((Integer) XContentMapValues.extractValue("metric", source)).longValue();
85+
86+
// aggs return buckets with 0 doc_count while composite aggs skip over them
87+
while (bucket.getDocCount() == 0L) {
88+
assertTrue(sourceIterator.hasNext());
89+
bucket = sourceIterator.next();
90+
}
91+
long bucketKey = ((Double) bucket.getKey()).longValue();
92+
93+
// test correctness, the results from the aggregation and the results from the transform should be the same
94+
assertThat(
95+
"Buckets did not match, source: " + source + ", expected: " + bucketKey + ", iteration: " + iteration,
96+
transformBucketKey,
97+
equalTo(bucketKey)
98+
);
99+
assertThat(
100+
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
101+
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
102+
equalTo(bucket.getDocCount())
103+
);
104+
105+
// TODO: gh#63801 transform is not optimized for histogram; it should only rewrite documents that require it
106+
}
107+
108+
assertFalse(sourceIterator.hasNext());
109+
assertFalse(destIterator.hasNext());
110+
}
111+
112+
}

0 commit comments

Comments
 (0)