Skip to content

Commit d51b995

Browse files
author
Hendrik Muhs
authored
[Transform] optimize histogram group_by change detection (#74031)
Implement a simple change-detection optimization for histograms using min and max aggregations. The optimization is not applied if the range cutoff would be too small compared to the overall range seen in previous checkpoints: at least 20% must be cut compared to former checkpoints. Fixes #63801
1 parent 6d52cd6 commit d51b995

File tree

5 files changed

+93
-25
lines changed

5 files changed

+93
-25
lines changed

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
1717
import org.elasticsearch.client.transform.transforms.TransformConfig;
1818
import org.elasticsearch.common.settings.Settings;
19-
import org.elasticsearch.core.TimeValue;
2019
import org.elasticsearch.common.util.concurrent.ThreadContext;
2120
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
21+
import org.elasticsearch.core.TimeValue;
2222
import org.elasticsearch.search.SearchModule;
2323
import org.elasticsearch.search.aggregations.AggregationBuilders;
2424
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -44,6 +44,7 @@
4444
public abstract class ContinuousTestCase extends ESRestTestCase {
4545

4646
public static final TimeValue SYNC_DELAY = new TimeValue(1, TimeUnit.SECONDS);
47+
public static final int METRIC_TREND = 5000;
4748
public static final String CONTINUOUS_EVENTS_SOURCE_INDEX = "test-transform-continuous-events";
4849
public static final String INGEST_PIPELINE = "transform-ingest";
4950
public static final String MAX_RUN_FIELD = "run.max";

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.elasticsearch.xpack.transform.integration.continuous;
22

3-
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
43
import org.elasticsearch.action.search.SearchRequest;
54
import org.elasticsearch.action.search.SearchResponse;
65
import org.elasticsearch.action.support.IndicesOptions;
@@ -10,7 +9,6 @@
109
import org.elasticsearch.client.transform.transforms.pivot.GroupConfig;
1110
import org.elasticsearch.client.transform.transforms.pivot.HistogramGroupSource;
1211
import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
13-
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1412
import org.elasticsearch.search.SearchHit;
1513
import org.elasticsearch.search.aggregations.AggregatorFactories;
1614
import org.elasticsearch.search.aggregations.BucketOrder;
@@ -25,9 +23,11 @@
2523
import java.util.Map;
2624
import java.util.Set;
2725

26+
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
2827
import static org.hamcrest.Matchers.equalTo;
28+
import static org.hamcrest.Matchers.is;
29+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2930

30-
@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/67887")
3131
public class HistogramGroupByIT extends ContinuousTestCase {
3232
private static final String NAME = "continuous-histogram-pivot-test";
3333

@@ -90,7 +90,7 @@ public void testIteration(int iteration, Set<String> modifiedEvents) throws IOEx
9090
SearchHit searchHit = destIterator.next();
9191
Map<String, Object> source = searchHit.getSourceAsMap();
9292

93-
Long transformBucketKey = ((Integer) XContentMapValues.extractValue("metric", source)).longValue();
93+
Long transformBucketKey = ((Integer) extractValue("metric", source)).longValue();
9494

9595
// aggs return buckets with 0 doc_count while composite aggs skip over them
9696
while (bucket.getDocCount() == 0L) {
@@ -107,11 +107,26 @@ public void testIteration(int iteration, Set<String> modifiedEvents) throws IOEx
107107
);
108108
assertThat(
109109
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
110-
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
110+
((Integer) extractValue("count", source)).longValue(),
111111
equalTo(bucket.getDocCount())
112112
);
113113

114-
// TODO: gh#63801 transform is not optimized for histogram it, it should only rewrite documents that require it
114+
// test optimization, transform should only rewrite documents that require it
115+
// we artificially created a trend, that's why smaller buckets should not get rewritten
116+
if (transformBucketKey < iteration * METRIC_TREND) {
117+
assertThat(
118+
"Ingest run: "
119+
+ extractValue(INGEST_RUN_FIELD, source)
120+
+ " did not match max run: "
121+
+ extractValue(MAX_RUN_FIELD, source)
122+
+ ", iteration: "
123+
+ iteration
124+
+ " full source: "
125+
+ source,
126+
(Integer) extractValue(INGEST_RUN_FIELD, source) - (Integer) extractValue(MAX_RUN_FIELD, source),
127+
is(lessThanOrEqualTo(1))
128+
);
129+
}
115130
}
116131

117132
assertFalse(sourceIterator.hasNext());

x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@
3333
import org.elasticsearch.client.transform.transforms.TransformStats;
3434
import org.elasticsearch.common.Strings;
3535
import org.elasticsearch.common.bytes.BytesReference;
36-
import org.elasticsearch.core.Tuple;
3736
import org.elasticsearch.common.settings.Settings;
38-
import org.elasticsearch.core.TimeValue;
3937
import org.elasticsearch.common.util.concurrent.ThreadContext;
4038
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
4139
import org.elasticsearch.common.xcontent.XContentBuilder;
4240
import org.elasticsearch.common.xcontent.XContentType;
4341
import org.elasticsearch.common.xcontent.support.XContentMapValues;
42+
import org.elasticsearch.core.TimeValue;
43+
import org.elasticsearch.core.Tuple;
4444
import org.elasticsearch.search.SearchModule;
4545
import org.elasticsearch.test.rest.ESRestTestCase;
4646
import org.junit.After;
@@ -88,7 +88,7 @@
8888
* - sync config for continuous mode
8989
* - page size 10 to trigger paging
9090
* - count field to test how many buckets
91-
* - max run field to check what was the hight run field, see below for more details
91+
* - max run field to check what was the highest run field, see below for more details
9292
* - a test ingest pipeline
9393
* - execute 10 rounds ("run"):
9494
* - set run = #round
@@ -228,7 +228,7 @@ public void testContinousEvents() throws Exception {
228228
Integer metric = metric_bucket.get((numDoc + randomIntBetween(0, 50)) % 50);
229229
if (metric != null) {
230230
// randomize, but ensure it falls into the same bucket
231-
int randomizedMetric = metric + randomIntBetween(0, 99);
231+
int randomizedMetric = run * ContinuousTestCase.METRIC_TREND + metric + randomIntBetween(0, 99);
232232
source.append("\"metric\":").append(randomizedMetric).append(",");
233233
}
234234

@@ -517,7 +517,7 @@ private void waitUntilTransformsProcessedNewData(TimeValue delay, int iteration)
517517
stats.getCheckpointingInfo().getLastSearchTime(),
518518
greaterThan(waitUntil)
519519
);
520-
}, 20, TimeUnit.SECONDS);
520+
}, 30, TimeUnit.SECONDS);
521521
}
522522
}
523523

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/CompositeBucketsChangeCollector.java

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99

1010
import org.apache.lucene.search.BooleanQuery;
1111
import org.elasticsearch.action.search.SearchResponse;
12-
import org.elasticsearch.core.Nullable;
1312
import org.elasticsearch.common.Rounding;
1413
import org.elasticsearch.common.geo.GeoPoint;
14+
import org.elasticsearch.core.Nullable;
1515
import org.elasticsearch.geometry.Rectangle;
1616
import org.elasticsearch.index.query.BoolQueryBuilder;
1717
import org.elasticsearch.index.query.ExistsQueryBuilder;
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
3434
import org.elasticsearch.search.builder.SearchSourceBuilder;
3535
import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource;
36+
import org.elasticsearch.xpack.core.transform.transforms.pivot.HistogramGroupSource;
3637
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
3738
import org.elasticsearch.xpack.transform.transforms.Function.ChangeCollector;
3839

@@ -382,8 +383,7 @@ public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheck
382383
if (missingBucket) {
383384
return null;
384385
}
385-
// we only need to round the lower bound, because the checkpoint will not contain new data for the upper bound
386-
return new RangeQueryBuilder(sourceFieldName).gte(rounding.round(lowerBound)).lte(upperBound).format("epoch_millis");
386+
return new RangeQueryBuilder(sourceFieldName).gte(lowerBound).lte(upperBound).format("epoch_millis");
387387
}
388388

389389
@Override
@@ -401,7 +401,8 @@ public boolean collectChangesFromAggregations(Aggregations aggregations) {
401401
final SingleValue upperBoundResult = aggregations.get(maxAggregationOutputName);
402402

403403
if (lowerBoundResult != null && upperBoundResult != null) {
404-
lowerBound = (long) lowerBoundResult.value();
404+
// we only need to round the lower bound, because the checkpoint will not contain new data for the upper bound
405+
lowerBound = rounding.round((long) lowerBoundResult.value());
405406
upperBound = (long) upperBoundResult.value();
406407

407408
return false;
@@ -425,14 +426,40 @@ public boolean queryForChanges() {
425426

426427
static class HistogramFieldCollector implements FieldCollector {
427428

429+
// cutoff is calculated with max_range/current_range, current_range must be smaller
430+
// the optimization is only applied if max_range is at least 1.2 times current_range, i.e. at least 20% larger
431+
private static final double MIN_CUT_OFF = 1.2;
428432
private final String sourceFieldName;
429-
private final String targetFieldName;
430433
private final boolean missingBucket;
434+
private final double interval;
435+
private final Collection<AggregationBuilder> histogramFieldAggregations;
436+
private final String minAggregationOutputName;
437+
private final String maxAggregationOutputName;
438+
439+
private double minLowerBound;
440+
private double maxUpperBound;
431441

432-
HistogramFieldCollector(final String sourceFieldName, final String targetFieldName, final boolean missingBucket) {
442+
private double lowerBound;
443+
private double upperBound;
444+
445+
HistogramFieldCollector(
446+
final String sourceFieldName,
447+
final String targetFieldName,
448+
final boolean missingBucket,
449+
final double interval
450+
) {
451+
assert sourceFieldName != null;
433452
this.sourceFieldName = sourceFieldName;
434-
this.targetFieldName = targetFieldName;
435453
this.missingBucket = missingBucket;
454+
455+
this.interval = interval;
456+
457+
minAggregationOutputName = COMPOSITE_AGGREGATION_NAME + "." + targetFieldName + ".min";
458+
maxAggregationOutputName = COMPOSITE_AGGREGATION_NAME + "." + targetFieldName + ".max";
459+
460+
histogramFieldAggregations = new ArrayList<>();
461+
histogramFieldAggregations.add(AggregationBuilders.min(minAggregationOutputName).field(sourceFieldName));
462+
histogramFieldAggregations.add(AggregationBuilders.max(maxAggregationOutputName).field(sourceFieldName));
436463
}
437464

438465
@Override
@@ -452,30 +479,54 @@ public boolean collectChangesFromCompositeBuckets(Collection<? extends Bucket> b
452479

453480
@Override
454481
public QueryBuilder filterByChanges(long lastCheckpointTimestamp, long nextcheckpointTimestamp) {
455-
return null;
482+
if (missingBucket) {
483+
return null;
484+
}
485+
486+
// (upperBound - lowerBound) >= interval, so never 0
487+
if ((maxUpperBound - minLowerBound) / (upperBound - lowerBound) < MIN_CUT_OFF) {
488+
return null;
489+
}
490+
491+
return new RangeQueryBuilder(sourceFieldName).gte(lowerBound).lt(upperBound);
456492
}
457493

458494
@Override
459495
public void clear() {}
460496

461497
@Override
462498
public Collection<AggregationBuilder> aggregateChanges() {
463-
return Collections.emptyList();
499+
// optimization can't be applied for missing bucket
500+
return missingBucket ? Collections.emptyList() : histogramFieldAggregations;
464501
}
465502

466503
@Override
467504
public boolean collectChangesFromAggregations(Aggregations aggregations) {
505+
final SingleValue lowerBoundResult = aggregations.get(minAggregationOutputName);
506+
final SingleValue upperBoundResult = aggregations.get(maxAggregationOutputName);
507+
508+
if (lowerBoundResult != null && upperBoundResult != null) {
509+
lowerBound = interval * (Math.floor(lowerBoundResult.value() / interval));
510+
upperBound = interval * (1 + Math.floor(upperBoundResult.value() / interval));
511+
512+
minLowerBound = Math.min(minLowerBound, lowerBound);
513+
maxUpperBound = Math.max(maxUpperBound, upperBound);
514+
return false;
515+
}
516+
468517
return true;
469518
}
470519

471520
@Override
472521
public boolean isOptimized() {
473-
return false;
522+
// not optimized if missing bucket is true
523+
return missingBucket == false;
474524
}
475525

476526
@Override
477527
public boolean queryForChanges() {
478-
return false;
528+
// not optimized if missing bucket is true
529+
return missingBucket == false;
479530
}
480531
}
481532

@@ -774,7 +825,8 @@ static Map<String, FieldCollector> createFieldCollectors(Map<String, SingleGroup
774825
new CompositeBucketsChangeCollector.HistogramFieldCollector(
775826
entry.getValue().getField(),
776827
entry.getKey(),
777-
entry.getValue().getMissingBucket()
828+
entry.getValue().getMissingBucket(),
829+
((HistogramGroupSource) entry.getValue()).getInterval()
778830
)
779831
);
780832
break;

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformConfigLinterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void testGetWarnings_Pivot_CouldNotFindAnyOptimization() {
9595
new PivotConfig(
9696
GroupConfigTests.randomGroupConfig(
9797
() -> new HistogramGroupSource(
98-
randomAlphaOfLengthBetween(1, 20), null, false, randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false))),
98+
randomAlphaOfLengthBetween(1, 20), null, true, randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false))),
9999
AggregationConfigTests.randomAggregationConfig(),
100100
null);
101101
Function function = new Pivot(pivotConfig, new SettingsConfig(), Version.CURRENT);

0 commit comments

Comments
 (0)