Skip to content

Commit 993c782

Browse files
committed
Collects more buckets than needed on shards
This gives us more options at reduce time in terms of how we do the final merge of the buckeets to produce the final result
1 parent 678802a commit 993c782

File tree

4 files changed

+55
-24
lines changed

4 files changed

+55
-24
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator {
6161
private LongHash bucketOrds;
6262
private int targetBuckets;
6363
private MergingBucketsDeferringCollector deferringCollector;
64+
private int numCollectedValues = 0;
6465

6566
AutoDateHistogramAggregator(String name, AggregatorFactories factories, int numBuckets, Rounding[] roundings,
6667
@Nullable ValuesSource.Numeric valuesSource, DocValueFormat formatter, SearchContext aggregationContext, Aggregator parent,
@@ -109,6 +110,7 @@ public void collect(int doc, long bucket) throws IOException {
109110
long previousRounded = Long.MIN_VALUE;
110111
for (int i = 0; i < valuesCount; ++i) {
111112
long value = values.nextValue();
113+
numCollectedValues++;
112114
long rounded = roundings[roundingIdx].round(value);
113115
assert rounded >= previousRounded;
114116
if (rounded == previousRounded) {
@@ -120,7 +122,8 @@ public void collect(int doc, long bucket) throws IOException {
120122
collectExistingBucket(sub, doc, bucketOrd);
121123
} else {
122124
collectBucket(sub, doc, bucketOrd);
123-
while (bucketOrds.size() > targetBuckets) {
125+
double maxBuckets = Math.max(targetBuckets, targetBuckets * Math.log(numCollectedValues));
126+
while (bucketOrds.size() > maxBuckets) {
124127
increaseRounding();
125128
}
126129
}
@@ -179,14 +182,15 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
179182
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundings, roundingIdx,
180183
buildEmptySubAggregations());
181184

182-
return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, pipelineAggregators(), metaData());
185+
return new InternalAutoDateHistogram(name, buckets, targetBuckets, numCollectedValues, emptyBucketInfo, formatter,
186+
pipelineAggregators(), metaData());
183187
}
184188

185189
@Override
186190
public InternalAggregation buildEmptyAggregation() {
187191
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundings, roundingIdx,
188192
buildEmptySubAggregations());
189-
return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter,
193+
return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, numCollectedValues, emptyBucketInfo, formatter,
190194
pipelineAggregators(), metaData());
191195
}
192196

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java

+29-11
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.ListIterator;
4444
import java.util.Map;
4545
import java.util.Objects;
46+
import java.util.stream.Collectors;
4647

4748
/**
4849
* Implementation of {@link Histogram}.
@@ -207,15 +208,17 @@ public int hashCode() {
207208
private final DocValueFormat format;
208209
private final BucketInfo bucketInfo;
209210
private final int targetBuckets;
211+
private final long numValuesCollected;
210212

211213

212-
InternalAutoDateHistogram(String name, List<Bucket> buckets, int targetBuckets, BucketInfo emptyBucketInfo, DocValueFormat formatter,
213-
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
214+
InternalAutoDateHistogram(String name, List<Bucket> buckets, int targetBuckets, long numValuesCollected, BucketInfo emptyBucketInfo,
215+
DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
214216
super(name, pipelineAggregators, metaData);
215217
this.buckets = buckets;
216218
this.bucketInfo = emptyBucketInfo;
217219
this.format = formatter;
218220
this.targetBuckets = targetBuckets;
221+
this.numValuesCollected = numValuesCollected;
219222
}
220223

221224
/**
@@ -227,6 +230,7 @@ public InternalAutoDateHistogram(StreamInput in) throws IOException {
227230
format = in.readNamedWriteable(DocValueFormat.class);
228231
buckets = in.readList(stream -> new Bucket(stream, format));
229232
this.targetBuckets = in.readVInt();
233+
this.numValuesCollected = in.readVLong();
230234
}
231235

232236
@Override
@@ -235,6 +239,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
235239
out.writeNamedWriteable(format);
236240
out.writeList(buckets);
237241
out.writeVInt(targetBuckets);
242+
out.writeVLong(numValuesCollected);
238243
}
239244

240245
@Override
@@ -255,13 +260,18 @@ public int getTargetBuckets() {
255260
return targetBuckets;
256261
}
257262

263+
public long getNumValuesCollected() {
264+
return numValuesCollected;
265+
}
266+
258267
public BucketInfo getBucketInfo() {
259268
return bucketInfo;
260269
}
261270

262271
@Override
263272
public InternalAutoDateHistogram create(List<Bucket> buckets) {
264-
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators(), metaData);
273+
return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators(),
274+
metaData);
265275
}
266276

267277
@Override
@@ -365,7 +375,8 @@ private BucketReduceResult mergeBucketsIfNeeded(List<Bucket> reducedBuckets, int
365375
return new BucketReduceResult(reducedBuckets, reduceRounding, reduceRoundingIdx);
366376
}
367377

368-
private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) {
378+
private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding,
379+
ReduceContext reduceContext) {
369380
List<Bucket> mergedBuckets = new ArrayList<>();
370381

371382
List<Bucket> sameKeyedBuckets = new ArrayList<>();
@@ -409,12 +420,13 @@ private static class BucketReduceResult {
409420
}
410421
}
411422

412-
private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) {
423+
private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, long numValuesCollected, ReduceContext reduceContext) {
413424
List<Bucket> list = currentResult.buckets;
414425
if (list.isEmpty()) {
415426
return currentResult;
416427
}
417-
int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx,
428+
double maxBuckets = Math.max(targetBuckets, targetBuckets * Math.log(numValuesCollected));
429+
int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, maxBuckets, currentResult.roundingIdx,
418430
bucketInfo.roundings);
419431
Rounding rounding = bucketInfo.roundings[roundingIdx];
420432
// merge buckets using the new rounding
@@ -443,7 +455,7 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
443455
return new BucketReduceResult(list, rounding, roundingIdx);
444456
}
445457

446-
private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, Rounding[] roundings) {
458+
private int getAppropriateRounding(long minKey, long maxKey, double maxBuckets, int roundingIdx, Rounding[] roundings) {
447459
if (roundingIdx == roundings.length - 1) {
448460
return roundingIdx;
449461
}
@@ -458,18 +470,21 @@ private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, Ro
458470
currentKey = currentRounding.nextRoundingValue(currentKey);
459471
}
460472
currentRoundingIdx++;
461-
} while (requiredBuckets > targetBuckets && currentRoundingIdx < roundings.length);
473+
} while (requiredBuckets > maxBuckets && currentRoundingIdx < roundings.length);
462474
// The loop will increase past the correct rounding index here so we
463475
// need to subtract one to get the rounding index we need
464476
return currentRoundingIdx - 1;
465477
}
466478

467479
@Override
468480
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
481+
long numValuesCollected = aggregations.stream()
482+
.collect(Collectors.summingLong(agg -> ((InternalAutoDateHistogram) agg).getNumValuesCollected()));
483+
469484
BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);
470485

471486
// adding empty buckets if needed
472-
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
487+
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, numValuesCollected, reduceContext);
473488

474489
// Adding empty buckets may have tipped us over the target so merge the buckets again if needed
475490
reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
@@ -478,7 +493,7 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
478493
BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundings, reducedBucketsResult.roundingIdx,
479494
this.bucketInfo.emptySubAggregations);
480495

481-
return new InternalAutoDateHistogram(getName(), reducedBucketsResult.buckets, targetBuckets, bucketInfo, format,
496+
return new InternalAutoDateHistogram(getName(), reducedBucketsResult.buckets, targetBuckets, numValuesCollected, bucketInfo, format,
482497
pipelineAggregators(), getMetaData());
483498
}
484499

@@ -512,7 +527,8 @@ public InternalAggregation createAggregation(List<MultiBucketsAggregation.Bucket
512527
buckets2.add((Bucket) b);
513528
}
514529
buckets2 = Collections.unmodifiableList(buckets2);
515-
return new InternalAutoDateHistogram(name, buckets2, targetBuckets, bucketInfo, format, pipelineAggregators(), getMetaData());
530+
return new InternalAutoDateHistogram(name, buckets2, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators(),
531+
getMetaData());
516532
}
517533

518534
@Override
@@ -524,6 +540,8 @@ public Bucket createBucket(Number key, long docCount, InternalAggregations aggre
524540
protected boolean doEquals(Object obj) {
525541
InternalAutoDateHistogram that = (InternalAutoDateHistogram) obj;
526542
return Objects.equals(buckets, that.buckets)
543+
&& Objects.equals(targetBuckets, that.targetBuckets)
544+
&& Objects.equals(numValuesCollected, that.numValuesCollected)
527545
&& Objects.equals(format, that.format)
528546
&& Objects.equals(bucketInfo, that.bucketInfo);
529547
}

server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testMatchAllDocs() throws IOException {
6969
Query query = new MatchAllDocsQuery();
7070

7171
testSearchCase(query, dataset,
72-
aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD),
72+
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
7373
histogram -> assertEquals(6, histogram.getBuckets().size())
7474
);
7575
testSearchAndReduceCase(query, dataset,
@@ -82,7 +82,7 @@ public void testSubAggregations() throws IOException {
8282
Query query = new MatchAllDocsQuery();
8383

8484
testSearchCase(query, dataset,
85-
aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD)
85+
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD)
8686
.subAggregation(AggregationBuilders.stats("stats").field(DATE_FIELD)),
8787
histogram -> {
8888
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
@@ -231,7 +231,7 @@ public void testAggregateWrongField() throws IOException {
231231

232232
public void testIntervalYear() throws IOException {
233233
testBothCases(LongPoint.newRangeQuery(INSTANT_FIELD, asLong("2015-01-01"), asLong("2017-12-31")), dataset,
234-
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
234+
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
235235
histogram -> {
236236
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
237237
assertEquals(3, buckets.size());
@@ -254,7 +254,7 @@ public void testIntervalYear() throws IOException {
254254
public void testIntervalMonth() throws IOException {
255255
testBothCases(new MatchAllDocsQuery(),
256256
Arrays.asList("2017-01-01", "2017-02-02", "2017-02-03", "2017-03-04", "2017-03-05", "2017-03-06"),
257-
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
257+
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
258258
histogram -> {
259259
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
260260
assertEquals(3, buckets.size());
@@ -349,7 +349,7 @@ public void testIntervalHour() throws IOException {
349349
"2017-02-01T16:48:00.000Z",
350350
"2017-02-01T16:59:00.000Z"
351351
),
352-
aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD),
352+
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
353353
histogram -> {
354354
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
355355
assertEquals(6, buckets.size());
@@ -441,7 +441,7 @@ public void testIntervalMinute() throws IOException {
441441
"2017-02-01T09:16:04.000Z",
442442
"2017-02-01T09:16:42.000Z"
443443
),
444-
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
444+
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
445445
histogram -> {
446446
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
447447
assertEquals(3, buckets.size());

server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ protected InternalAutoDateHistogram createTestInstance(String name,
7979
}
8080
InternalAggregations subAggregations = new InternalAggregations(Collections.emptyList());
8181
BucketInfo bucketInfo = new BucketInfo(roundings, randomIntBetween(0, roundings.length - 1), subAggregations);
82-
83-
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
82+
long numValuesCollected = randomNonNegativeLong();
83+
return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators,
84+
metaData);
8485
}
8586

8687
@Override
@@ -121,10 +122,11 @@ protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram ins
121122
String name = instance.getName();
122123
List<InternalAutoDateHistogram.Bucket> buckets = instance.getBuckets();
123124
int targetBuckets = instance.getTargetBuckets();
125+
long numValuesCollected = instance.getNumValuesCollected();
124126
BucketInfo bucketInfo = instance.getBucketInfo();
125127
List<PipelineAggregator> pipelineAggregators = instance.pipelineAggregators();
126128
Map<String, Object> metaData = instance.getMetaData();
127-
switch (between(0, 3)) {
129+
switch (between(0, 5)) {
128130
case 0:
129131
name += randomAlphaOfLength(5);
130132
break;
@@ -145,9 +147,16 @@ protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram ins
145147
}
146148
metaData.put(randomAlphaOfLength(15), randomInt());
147149
break;
150+
case 4:
151+
targetBuckets += between(1, 100);
152+
break;
153+
case 5:
154+
numValuesCollected += between(1, 100);
155+
break;
148156
default:
149157
throw new AssertionError("Illegal randomisation branch");
150158
}
151-
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
159+
return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators,
160+
metaData);
152161
}
153162
}

0 commit comments

Comments
 (0)