Skip to content

Commit 678802a

Browse files
committed
Fixes multiBucketConsumer accounting
1 parent 33458f6 commit 678802a

File tree

1 file changed

+9
-12
lines changed

1 file changed

+9
-12
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

+9-12
Original file line numberDiff line numberDiff line change
@@ -327,12 +327,8 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
327327
if (reduceRounding.round(top.current.key) != key) {
328328
// the key changes, reduce what we already buffered and reset the buffer for current buckets
329329
final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceRounding, reduceContext);
330-
if (reduceContext.isFinalReduce() == false) {
331-
reduceContext.consumeBucketsAndMaybeBreak(1);
332-
reducedBuckets.add(reduced);
333-
} else {
334-
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
335-
}
330+
reduceContext.consumeBucketsAndMaybeBreak(1);
331+
reducedBuckets.add(reduced);
336332
currentBuckets.clear();
337333
key = reduceRounding.round(top.current.key);
338334
}
@@ -351,12 +347,8 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
351347

352348
if (currentBuckets.isEmpty() == false) {
353349
final Bucket reduced = currentBuckets.get(0).reduce(currentBuckets, reduceRounding, reduceContext);
354-
if (reduceContext.isFinalReduce() == false) {
355-
reduceContext.consumeBucketsAndMaybeBreak(1);
356-
reducedBuckets.add(reduced);
357-
} else {
358-
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reduced));
359-
}
350+
reduceContext.consumeBucketsAndMaybeBreak(1);
351+
reducedBuckets.add(reduced);
360352
}
361353
}
362354

@@ -382,17 +374,22 @@ private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRo
382374
long roundedBucketKey = reduceRounding.round(bucket.key);
383375
if (Double.isNaN(key)) {
384376
key = roundedBucketKey;
377+
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
385378
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
386379
} else if (roundedBucketKey == key) {
380+
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
387381
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
388382
} else {
383+
reduceContext.consumeBucketsAndMaybeBreak(1);
389384
mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, reduceRounding, reduceContext));
390385
sameKeyedBuckets.clear();
391386
key = roundedBucketKey;
387+
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
392388
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
393389
}
394390
}
395391
if (sameKeyedBuckets.isEmpty() == false) {
392+
reduceContext.consumeBucketsAndMaybeBreak(1);
396393
mergedBuckets.add(sameKeyedBuckets.get(0).reduce(sameKeyedBuckets, reduceRounding, reduceContext));
397394
}
398395
reducedBuckets = mergedBuckets;

0 commit comments

Comments
 (0)