Skip to content

Commit 8779f57

Browse files
author
Hendrik Muhs
committed
[Transform] improve continuous transform date_histogram group_by with ingest timestamps (#63315)
Optimize continuous date_histogram group_by for other time fields, independent of the sync field. This allows the usage of ingest timestamps in continuous mode. Fixes #59061
1 parent db50024 commit 8779f57

File tree

14 files changed

+618
-212
lines changed

14 files changed

+618
-212
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ static void register(ValuesSourceRegistry.Builder builder) {
132132
private int precision = GeoTileGridAggregationBuilder.DEFAULT_PRECISION;
133133
private GeoBoundingBox geoBoundingBox = new GeoBoundingBox(new GeoPoint(Double.NaN, Double.NaN), new GeoPoint(Double.NaN, Double.NaN));
134134

135-
GeoTileGridValuesSourceBuilder(String name) {
135+
public GeoTileGridValuesSourceBuilder(String name) {
136136
super(name);
137137
}
138138

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/DateHistogramGroupSource.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,9 +348,4 @@ public boolean equals(Object other) {
348348
public int hashCode() {
349349
return Objects.hash(field, scriptConfig, missingBucket, interval, timeZone);
350350
}
351-
352-
@Override
353-
public boolean supportsIncrementalBucketUpdate() {
354-
return false;
355-
}
356351
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GeoTileGroupSource.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,6 @@ public static GeoTileGroupSource fromXContent(final XContentParser parser, boole
106106
return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
107107
}
108108

109-
@Override
110-
public boolean supportsIncrementalBucketUpdate() {
111-
return true;
112-
}
113-
114109
@Override
115110
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
116111
builder.startObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/HistogramGroupSource.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,4 @@ public boolean equals(Object other) {
101101
public int hashCode() {
102102
return Objects.hash(field, scriptConfig, interval);
103103
}
104-
105-
@Override
106-
public boolean supportsIncrementalBucketUpdate() {
107-
return false;
108-
}
109104
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/PivotConfig.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,16 +110,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
110110
return builder;
111111
}
112112

113-
public void toCompositeAggXContent(XContentBuilder builder, boolean forChangeDetection) throws IOException {
113+
public void toCompositeAggXContent(XContentBuilder builder) throws IOException {
114114
builder.startObject();
115115
builder.field(CompositeAggregationBuilder.SOURCES_FIELD_NAME.getPreferredName());
116116
builder.startArray();
117117

118118
for (Entry<String, SingleGroupSource> groupBy : groups.getGroups().entrySet()) {
119-
// some group source do not implement change detection or not makes no sense, skip those
120-
if (forChangeDetection && groupBy.getValue().supportsIncrementalBucketUpdate() == false) {
121-
continue;
122-
}
123119
builder.startObject();
124120
builder.startObject(groupBy.getKey());
125121
builder.field(groupBy.getValue().getType().value(), groupBy.getValue());

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/SingleGroupSource.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ public void writeTo(StreamOutput out) throws IOException {
140140

141141
public abstract Type getType();
142142

143-
public abstract boolean supportsIncrementalBucketUpdate();
144-
145143
public String getField() {
146144
return field;
147145
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/pivot/TermsGroupSource.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,4 @@ public Type getType() {
5050
public static TermsGroupSource fromXContent(final XContentParser parser, boolean lenient) throws IOException {
5151
return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
5252
}
53-
54-
@Override
55-
public boolean supportsIncrementalBucketUpdate() {
56-
return true;
57-
}
5853
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package org.elasticsearch.xpack.transform.integration.continuous;
2+
3+
import org.elasticsearch.action.search.SearchRequest;
4+
import org.elasticsearch.action.search.SearchResponse;
5+
import org.elasticsearch.action.support.IndicesOptions;
6+
import org.elasticsearch.client.transform.transforms.DestConfig;
7+
import org.elasticsearch.client.transform.transforms.SourceConfig;
8+
import org.elasticsearch.client.transform.transforms.TransformConfig;
9+
import org.elasticsearch.client.transform.transforms.pivot.DateHistogramGroupSource;
10+
import org.elasticsearch.client.transform.transforms.pivot.GroupConfig;
11+
import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
12+
import org.elasticsearch.client.transform.transforms.pivot.TermsGroupSource;
13+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
14+
import org.elasticsearch.search.SearchHit;
15+
import org.elasticsearch.search.aggregations.AggregatorFactories;
16+
import org.elasticsearch.search.aggregations.BucketOrder;
17+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
18+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
19+
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
20+
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
21+
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
22+
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
23+
import org.elasticsearch.search.builder.SearchSourceBuilder;
24+
25+
import java.io.IOException;
26+
import java.time.ZonedDateTime;
27+
import java.util.ArrayList;
28+
import java.util.HashMap;
29+
import java.util.Iterator;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.is;
35+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
36+
37+
/**
 * Testcase for date histogram group_by on _different_ fields than used for sync.
 *
 * The transform groups by a {@code date_histogram} on "metric-timestamp" (e.g. an ingest
 * timestamp) while the continuous sync runs on a different time field, exercising the
 * optimization from #63315. Each iteration re-runs the equivalent aggregation directly on
 * the source index and compares it bucket-by-bucket against the transform's destination index.
 */
public class DateHistogramGroupByOtherTimeFieldIT extends ContinuousTestCase {

    // Transform id, which is also used as the destination index name.
    private static final String NAME = "continuous-date-histogram-pivot-other-timefield-test";

    // Decided randomly once per test instance: whether a secondary terms group_by on
    // "event" is added on top of the date_histogram group_by.
    private final boolean addGroupByTerms;

    public DateHistogramGroupByOtherTimeFieldIT() {
        addGroupByTerms = randomBoolean();
    }

    /**
     * Builds the transform configuration under test: a pivot with a 1-second fixed-interval
     * date_histogram group_by on "metric-timestamp" (NOT the sync field), optionally combined
     * with a terms group_by on "event", plus the aggregations shared by all continuous tests.
     *
     * NOTE(review): addCommonBuilderParameters/addCommonAggregations and the
     * CONTINUOUS_EVENTS_SOURCE_INDEX / INGEST_PIPELINE constants are inherited from
     * ContinuousTestCase, which is not visible here — behavior assumed from usage.
     */
    @Override
    public TransformConfig createConfig() {
        TransformConfig.Builder transformConfigBuilder = new TransformConfig.Builder();
        addCommonBuilderParameters(transformConfigBuilder);
        transformConfigBuilder.setSource(new SourceConfig(CONTINUOUS_EVENTS_SOURCE_INDEX));
        transformConfigBuilder.setDest(new DestConfig(NAME, INGEST_PIPELINE));
        transformConfigBuilder.setId(NAME);
        PivotConfig.Builder pivotConfigBuilder = new PivotConfig.Builder();
        GroupConfig.Builder groups = new GroupConfig.Builder().groupBy(
            "second",
            new DateHistogramGroupSource.Builder().setField("metric-timestamp")
                .setInterval(new DateHistogramGroupSource.FixedInterval(DateHistogramInterval.SECOND))
                .build()
        );
        if (addGroupByTerms) {
            groups.groupBy("event", new TermsGroupSource.Builder().setField("event").build());
        }
        pivotConfigBuilder.setGroups(groups.build());
        AggregatorFactories.Builder aggregations = new AggregatorFactories.Builder();
        addCommonAggregations(aggregations);

        pivotConfigBuilder.setAggregations(aggregations);
        transformConfigBuilder.setPivotConfig(pivotConfigBuilder.build());
        return transformConfigBuilder.build();
    }

    @Override
    public String getName() {
        return NAME;
    }

    /**
     * One verification round: runs the reference aggregation against the source index and a
     * sorted match-all search against the destination index, then delegates to the assertion
     * method matching the randomly chosen group_by layout.
     *
     * @param iteration the current continuous-test iteration, used only in failure messages
     */
    @Override
    public void testIteration(int iteration) throws IOException {
        // Reference query: aggregate the raw events the same way the transform pivots them.
        SearchRequest searchRequestSource = new SearchRequest(CONTINUOUS_EVENTS_SOURCE_INDEX).allowPartialSearchResults(false)
            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        SearchSourceBuilder sourceBuilderSource = new SearchSourceBuilder().size(0);
        DateHistogramAggregationBuilder bySecond = new DateHistogramAggregationBuilder("second").field("metric-timestamp")
            .fixedInterval(DateHistogramInterval.SECOND)
            .order(BucketOrder.key(true));

        if (addGroupByTerms) {
            TermsAggregationBuilder terms = new TermsAggregationBuilder("event").size(1000).field("event").order(BucketOrder.key(true));
            bySecond.subAggregation(terms);
        }
        sourceBuilderSource.aggregation(bySecond);
        searchRequestSource.source(sourceBuilderSource);
        SearchResponse responseSource = search(searchRequestSource);

        // Destination query: fetch the transform's output documents, sorted so they line up
        // with the key-ordered buckets of the reference aggregation.
        SearchRequest searchRequestDest = new SearchRequest(NAME).allowPartialSearchResults(false)
            .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        SearchSourceBuilder sourceBuilderDest = new SearchSourceBuilder().size(10000).sort("second");
        if (addGroupByTerms) {
            sourceBuilderDest.sort("event");
        }

        searchRequestDest.source(sourceBuilderDest);
        SearchResponse responseDest = search(searchRequestDest);

        if (addGroupByTerms) {
            assertResultsGroupByDateHistogramAndTerms(iteration, responseSource, responseDest);
        } else {
            assertResultsGroupByDateHistogram(iteration, responseSource, responseDest);
        }
    }

    /**
     * Compares the date_histogram-only case: walks the reference buckets and the transform
     * documents in lock-step, asserting matching bucket keys and doc counts, and that the
     * transform did not needlessly rewrite documents in later runs.
     */
    private void assertResultsGroupByDateHistogram(int iteration, SearchResponse responseSource, SearchResponse responseDest) {
        List<? extends Bucket> buckets = ((Histogram) responseSource.getAggregations().get("second")).getBuckets();
        Iterator<? extends Bucket> sourceIterator = buckets.iterator();
        Iterator<SearchHit> destIterator = responseDest.getHits().iterator();

        while (sourceIterator.hasNext() && destIterator.hasNext()) {
            Bucket bucket = sourceIterator.next();
            SearchHit searchHit = destIterator.next();
            Map<String, Object> source = searchHit.getSourceAsMap();

            Long transformBucketKey = (Long) XContentMapValues.extractValue("second", source);

            // aggs return buckets with 0 doc_count while composite aggs skip over them
            while (bucket.getDocCount() == 0L) {
                assertTrue(sourceIterator.hasNext());
                bucket = sourceIterator.next();
            }
            // Histogram bucket keys are ZonedDateTime; the transform stores epoch millis.
            long bucketKey = ((ZonedDateTime) bucket.getKey()).toEpochSecond() * 1000;

            // test correctness, the results from the aggregation and the results from the transform should be the same
            assertThat(
                "Buckets did not match, source: " + source + ", expected: " + bucketKey + ", iteration: " + iteration,
                transformBucketKey,
                equalTo(bucketKey)
            );
            assertThat(
                "Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
                XContentMapValues.extractValue("count", source),
                equalTo(Double.valueOf(bucket.getDocCount()))
            );

            // transform should only rewrite documents that require it
            assertThat(
                "Ingest run: "
                    + XContentMapValues.extractValue(INGEST_RUN_FIELD, source)
                    + " did not match max run: "
                    + XContentMapValues.extractValue(MAX_RUN_FIELD, source)
                    + ", iteration: "
                    + iteration,
                // we use a fixed_interval of `1s`, the transform runs every `1s`, a bucket might be recalculated at the next run
                // but should NOT be recalculated for the 2nd/3rd/... run
                Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)) - (Double) XContentMapValues
                    .extractValue(MAX_RUN_FIELD, source),
                is(lessThanOrEqualTo(1.0))
            );

        }
        // Both sides must be fully consumed: no extra buckets, no extra documents.
        assertFalse(sourceIterator.hasNext());
        assertFalse(destIterator.hasNext());
    }

    /**
     * Compares the date_histogram + terms case: first flattens the nested
     * (second, event, count) reference buckets — skipping empty histogram buckets, which
     * composite aggs never emit — then walks both result sets in lock-step.
     */
    private void assertResultsGroupByDateHistogramAndTerms(int iteration, SearchResponse responseSource, SearchResponse responseDest) {
        List<? extends Bucket> buckets = ((Histogram) responseSource.getAggregations().get("second")).getBuckets();

        List<Map<String, Object>> flattenedBuckets = new ArrayList<>();
        for (Bucket b : buckets) {
            if (b.getDocCount() == 0) {
                continue;
            }
            long second = ((ZonedDateTime) b.getKey()).toEpochSecond() * 1000;
            List<? extends Terms.Bucket> terms = ((Terms) b.getAggregations().get("event")).getBuckets();
            for (Terms.Bucket t : terms) {
                flattenedBuckets.add(flattenedResult(second, t.getKeyAsString(), t.getDocCount()));
            }
        }

        Iterator<Map<String, Object>> sourceIterator = flattenedBuckets.iterator();
        Iterator<SearchHit> destIterator = responseDest.getHits().iterator();

        while (sourceIterator.hasNext() && destIterator.hasNext()) {
            Map<String, Object> bucket = sourceIterator.next();

            SearchHit searchHit = destIterator.next();
            Map<String, Object> source = searchHit.getSourceAsMap();

            Long transformBucketKey = (Long) XContentMapValues.extractValue("second", source);

            // test correctness, the results from the aggregation and the results from the transform should be the same
            assertThat(
                "Buckets did not match, source: " + source + ", expected: " + bucket.get("second") + ", iteration: " + iteration,
                transformBucketKey,
                equalTo(bucket.get("second"))
            );
            assertThat(
                "Doc count did not match, source: " + source + ", expected: " + bucket.get("count") + ", iteration: " + iteration,
                XContentMapValues.extractValue("count", source),
                equalTo(Double.valueOf(((Long) bucket.get("count"))))
            );
            assertThat(
                "Term did not match, source: " + source + ", expected: " + bucket.get("event") + ", iteration: " + iteration,
                XContentMapValues.extractValue("event", source),
                equalTo(bucket.get("event"))
            );

            // transform should only rewrite documents that require it
            assertThat(
                "Ingest run: "
                    + XContentMapValues.extractValue(INGEST_RUN_FIELD, source)
                    + " did not match max run: "
                    + XContentMapValues.extractValue(MAX_RUN_FIELD, source)
                    + ", iteration: "
                    + iteration,
                // we use a fixed_interval of `1s`, the transform runs every `1s`, a bucket might be recalculated at the next run
                // but should NOT be recalculated for the 2nd/3rd/... run
                Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)) - (Double) XContentMapValues
                    .extractValue(MAX_RUN_FIELD, source),
                is(lessThanOrEqualTo(1.0))
            );
        }
        // Both sides must be fully consumed: no extra buckets, no extra documents.
        assertFalse(sourceIterator.hasNext());
        assertFalse(destIterator.hasNext());
    }

    /**
     * Builds one flattened expectation entry for the histogram+terms comparison.
     *
     * @param second epoch-millis bucket key of the date_histogram
     * @param event  terms bucket key
     * @param count  doc count of the (second, event) combination
     * @return a map mirroring the shape of a transform destination document
     */
    private static Map<String, Object> flattenedResult(long second, String event, long count) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("second", second);
        doc.put("event", event);
        doc.put("count", count);
        return doc;
    }
}

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public void setClusterSettings() throws IOException {
125125
public void registerTestCases() {
126126
addTestCaseIfNotDisabled(new TermsGroupByIT());
127127
addTestCaseIfNotDisabled(new DateHistogramGroupByIT());
128+
addTestCaseIfNotDisabled(new DateHistogramGroupByOtherTimeFieldIT());
128129
}
129130

130131
@Before
@@ -214,6 +215,11 @@ public void testContinousEvents() throws Exception {
214215
source.append("\"location\":\"").append(randomizedLat + "," + randomizedLon).append("\",");
215216
}
216217

218+
// simulate a different timestamp that is off from the timestamp used for sync, so it can fall into the previous bucket
219+
String metric_date_string = ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC"))
220+
.format(runDate.minusSeconds(randomIntBetween(0, 5)).plusNanos(randomIntBetween(0, 999999)));
221+
source.append("\"metric-timestamp\":\"").append(metric_date_string).append("\",");
222+
217223
String date_string = ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC"))
218224
.format(runDate.plusNanos(randomIntBetween(0, 999999)));
219225

@@ -318,6 +324,9 @@ private void putIndex(String indexName, String dateType, boolean isDataStream) t
318324
.startObject("run")
319325
.field("type", "integer")
320326
.endObject()
327+
.startObject("metric-timestamp")
328+
.field("type", dateType)
329+
.endObject()
321330
.endObject()
322331
.endObject();
323332
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ public interface ChangeCollector {
6666
* TODO: replace the boolean with a more descriptive enum.
6767
*
6868
* @param searchResponse the response after querying for changes
69-
* @return true in case of no more changed buckets, false in case changes buckets have been collected
69+
* @return the position of the change collector, null in case the collector is exhausted
7070
*/
71-
boolean processSearchResponse(SearchResponse searchResponse);
71+
Map<String, Object> processSearchResponse(SearchResponse searchResponse);
7272

7373
/**
7474
* Build the filter query to narrow the result set given the previously collected changes.
@@ -87,18 +87,18 @@ public interface ChangeCollector {
8787
void clear();
8888

8989
/**
90-
* Get the bucket position of the changes collector.
90+
* Whether the collector optimizes change detection by narrowing the required query.
9191
*
92-
* @return the position, null in case the collector is exhausted
92+
* @return true if the collector optimizes change detection
9393
*/
94-
Map<String, Object> getBucketPosition();
94+
boolean isOptimized();
9595

9696
/**
97-
* Whether the collector optimizes change detection by narrowing the required query.
97+
* Whether the collector requires an extra query to identify the changes.
9898
*
99-
* @return true if the collector optimizes change detection
99+
* @return true if collector requires an extra query for identifying changes
100100
*/
101-
boolean isOptimized();
101+
boolean queryForChanges();
102102
}
103103

104104
/**
@@ -182,17 +182,6 @@ void preview(
182182
*/
183183
int getInitialPageSize();
184184

185-
/**
186-
* Whether this function - given its configuration - supports incremental bucket update used in continuous mode.
187-
*
188-
* If so, the indexer uses the change collector to update the continuous transform.
189-
*
190-
* TODO: simplify and remove this method if possible
191-
*
192-
* @return true if incremental bucket update is supported
193-
*/
194-
boolean supportsIncrementalBucketUpdate();
195-
196185
/**
197186
* Build the query for the next iteration
198187
*

0 commit comments

Comments
 (0)