Skip to content

Commit 87b363a

Browse files
committed
Correctly set doc_count when MovAvg "predicts" values on existing buckets (#24892)
If the bucket already exists, due to non-overlapping series or missing data, the MovAvg creates a merged bucket with the existing aggs + the new prediction. This fixes a small bug where the doc_count was not being set correctly. Relates to #24327
1 parent 977dc29 commit 87b363a

File tree

2 files changed

+68
-1
lines changed

2 files changed

+68
-1
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext
161161
}).collect(Collectors.toList());
162162
aggs.add(new InternalSimpleValue(name(), predictions[i], formatter, new ArrayList<PipelineAggregator>(), metaData()));
163163

164-
Bucket newBucket = factory.createBucket(newKey, 0, new InternalAggregations(aggs));
164+
Bucket newBucket = factory.createBucket(newKey, bucket.getDocCount(), new InternalAggregations(aggs));
165165

166166
// Overwrite the existing bucket with the new version
167167
newBuckets.set(lastValidPosition + i + 1, newBucket);

core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919

2020
package org.elasticsearch.search.aggregations.pipeline.moving.avg;
2121

22+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
23+
import org.elasticsearch.action.bulk.BulkResponse;
2224
import org.elasticsearch.action.index.IndexRequestBuilder;
2325
import org.elasticsearch.action.search.SearchPhaseExecutionException;
2426
import org.elasticsearch.action.search.SearchResponse;
27+
import org.elasticsearch.action.support.WriteRequest;
2528
import org.elasticsearch.common.collect.EvictingQueue;
2629
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
2730
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
@@ -41,6 +44,7 @@
4144
import org.elasticsearch.test.ESIntegTestCase;
4245
import org.hamcrest.Matchers;
4346

47+
import java.io.IOException;
4448
import java.util.ArrayList;
4549
import java.util.Arrays;
4650
import java.util.Collection;
@@ -67,6 +71,7 @@
6771
public class MovAvgIT extends ESIntegTestCase {
6872
private static final String INTERVAL_FIELD = "l_value";
6973
private static final String VALUE_FIELD = "v_value";
74+
private static final String VALUE_FIELD2 = "v_value2";
7075

7176
static int interval;
7277
static int numBuckets;
@@ -1204,6 +1209,68 @@ public void testCheckIfTunableCanBeMinimized() {
12041209
}
12051210
}
12061211

1212+
public void testPredictWithNonEmptyBuckets() throws Exception {
1213+
1214+
createIndex("predict_non_empty");
1215+
BulkRequestBuilder bulkBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
1216+
1217+
for (int i = 0; i < 10; i++) {
1218+
bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource(
1219+
jsonBuilder().startObject().field(INTERVAL_FIELD, i)
1220+
.field(VALUE_FIELD, 10)
1221+
.field(VALUE_FIELD2, 10)
1222+
.endObject()));
1223+
}
1224+
for (int i = 10; i < 20; i++) {
1225+
// Extra so there is a bucket that only has second field
1226+
bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource(
1227+
jsonBuilder().startObject().field(INTERVAL_FIELD, i).field(VALUE_FIELD2, 10).endObject()));
1228+
}
1229+
1230+
bulkBuilder.execute().actionGet();
1231+
ensureSearchable();
1232+
1233+
SearchResponse response = client()
1234+
.prepareSearch("predict_non_empty")
1235+
.setTypes("type")
1236+
.addAggregation(
1237+
histogram("histo")
1238+
.field(INTERVAL_FIELD)
1239+
.interval(1)
1240+
.subAggregation(max("max").field(VALUE_FIELD))
1241+
.subAggregation(max("max2").field(VALUE_FIELD2))
1242+
.subAggregation(
1243+
movingAvg("movavg_values", "max")
1244+
.window(windowSize)
1245+
.modelBuilder(new SimpleModel.SimpleModelBuilder())
1246+
.gapPolicy(BucketHelpers.GapPolicy.SKIP).predict(5))).execute().actionGet();
1247+
1248+
assertSearchResponse(response);
1249+
1250+
Histogram histo = response.getAggregations().get("histo");
1251+
assertThat(histo, notNullValue());
1252+
assertThat(histo.getName(), equalTo("histo"));
1253+
List<? extends Bucket> buckets = histo.getBuckets();
1254+
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(20));
1255+
1256+
SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
1257+
assertThat(current, nullValue());
1258+
1259+
for (int i = 1; i < 20; i++) {
1260+
Bucket bucket = buckets.get(i);
1261+
assertThat(bucket, notNullValue());
1262+
assertThat(bucket.getKey(), equalTo((double)i));
1263+
assertThat(bucket.getDocCount(), equalTo(1L));
1264+
SimpleValue movAvgAgg = bucket.getAggregations().get("movavg_values");
1265+
if (i < 15) {
1266+
assertThat(movAvgAgg, notNullValue());
1267+
assertThat(movAvgAgg.value(), equalTo(10d));
1268+
} else {
1269+
assertThat(movAvgAgg, nullValue());
1270+
}
1271+
}
1272+
}
1273+
12071274
private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) {
12081275
if (!expectedBucketIter.hasNext()) {
12091276
fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch");

0 commit comments

Comments
 (0)