Skip to content

Commit bea2341

Browse files
authored
Save memory when date_histogram is not on top (#56921)
When `date_histogram` is a sub-aggregator it used to allocate a bunch of objects for every one of it's parent's buckets. This uses the data structures that we built in #55873 rework the `date_histogram` aggregator instead of all of the allocation. Part of #56487
1 parent d77388f commit bea2341

File tree

8 files changed

+265
-63
lines changed

8 files changed

+265
-63
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/10_histogram.yml

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,3 +418,55 @@ setup:
418418
- length: { aggregations.histo.buckets: 1 }
419419
- match: { aggregations.histo.buckets.0.key_as_string: "2015-12-31T17:00:00.000-07:00" }
420420
- match: { aggregations.histo.buckets.0.doc_count: 1 }
421+
422+
---
423+
"profiler":
424+
- skip:
425+
version: " - 7.99.99"
426+
reason: debug info added in 8.0.0 (will backported to 7.9.0)
427+
428+
- do:
429+
indices.create:
430+
index: test_2
431+
body:
432+
settings:
433+
number_of_replicas: 0
434+
number_of_shards: 1
435+
mappings:
436+
properties:
437+
date:
438+
type: date
439+
440+
- do:
441+
bulk:
442+
index: test_2
443+
refresh: true
444+
body:
445+
- '{"index": {}}'
446+
- '{"date": "2016-01-01"}'
447+
- '{"index": {}}'
448+
- '{"date": "2016-01-02"}'
449+
- '{"index": {}}'
450+
- '{"date": "2016-02-01"}'
451+
- '{"index": {}}'
452+
- '{"date": "2016-03-01"}'
453+
454+
- do:
455+
search:
456+
index: test_2
457+
body:
458+
size: 0
459+
profile: true
460+
aggs:
461+
histo:
462+
date_histogram:
463+
field: date
464+
calendar_interval: month
465+
- match: { hits.total.value: 4 }
466+
- length: { aggregations.histo.buckets: 3 }
467+
- match: { aggregations.histo.buckets.0.key_as_string: "2016-01-01T00:00:00.000Z" }
468+
- match: { aggregations.histo.buckets.0.doc_count: 2 }
469+
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
470+
- match: { profile.shards.0.aggregations.0.description: histo }
471+
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
472+
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import org.elasticsearch.common.util.BigArrays;
2323
import org.elasticsearch.common.util.IntArray;
2424
import org.elasticsearch.common.util.LongHash;
25+
import org.elasticsearch.search.aggregations.AggregationExecutionException;
2526
import org.elasticsearch.search.aggregations.Aggregator;
2627
import org.elasticsearch.search.aggregations.AggregatorBase;
2728
import org.elasticsearch.search.aggregations.AggregatorFactories;
2829
import org.elasticsearch.search.aggregations.AggregatorFactory;
2930
import org.elasticsearch.search.aggregations.InternalAggregation;
3031
import org.elasticsearch.search.aggregations.InternalAggregations;
3132
import org.elasticsearch.search.aggregations.LeafBucketCollector;
33+
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
3234
import org.elasticsearch.search.aggregations.support.AggregationPath;
3335
import org.elasticsearch.search.internal.SearchContext;
3436
import org.elasticsearch.search.sort.SortOrder;
@@ -340,6 +342,51 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(lo
340342

341343
return new InternalAggregation[] { resultBuilder.apply(buckets) };
342344
}
345+
346+
/**
347+
* Build aggregation results for an aggregator with a varying number of
348+
* {@code long} keyed buckets that is at the top level or wrapped in
349+
* {@link AggregatorFactory#asMultiBucketAggregator}.
350+
* @param owningBucketOrds owning bucket ordinals for which to build the results
351+
* @param bucketOrds hash of values to the bucket ordinal
352+
*/
353+
protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongKeyedBucketOrds bucketOrds,
354+
BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
355+
long totalOrdsToCollect = 0;
356+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
357+
totalOrdsToCollect += bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
358+
}
359+
if (totalOrdsToCollect > Integer.MAX_VALUE) {
360+
throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
361+
+ "] buckets but attempted [" + totalOrdsToCollect + "]");
362+
}
363+
consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
364+
long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
365+
int b = 0;
366+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
367+
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
368+
while(ordsEnum.next()) {
369+
bucketOrdsToCollect[b++] = ordsEnum.ord();
370+
}
371+
}
372+
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
373+
374+
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
375+
b = 0;
376+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
377+
List<B> buckets = new ArrayList<>((int) bucketOrds.size());
378+
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
379+
while(ordsEnum.next()) {
380+
if (bucketOrdsToCollect[b] != ordsEnum.ord()) {
381+
throw new AggregationExecutionException("Iteration order of [" + bucketOrds + "] changed without mutating. ["
382+
+ ordsEnum.ord() + "] should have been [" + bucketOrdsToCollect[b] + "]");
383+
}
384+
buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults[b++]));
385+
}
386+
results[ordIdx] = resultBuilder.apply(buckets);
387+
}
388+
return results;
389+
}
343390
@FunctionalInterface
344391
protected interface BucketBuilderForVariable<B> {
345392
B build(long bucketValue, int docCount, InternalAggregations subAggregationResults);

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregationSupplier.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,6 @@ Aggregator build(String name,
4646
DocValueFormat formatter,
4747
SearchContext aggregationContext,
4848
Aggregator parent,
49+
boolean collectsFromSingleBucket,
4950
Map<String, Object> metadata) throws IOException;
5051
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.common.Nullable;
2626
import org.elasticsearch.common.Rounding;
2727
import org.elasticsearch.common.lease.Releasables;
28-
import org.elasticsearch.common.util.LongHash;
2928
import org.elasticsearch.search.DocValueFormat;
3029
import org.elasticsearch.search.aggregations.Aggregator;
3130
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -34,12 +33,14 @@
3433
import org.elasticsearch.search.aggregations.LeafBucketCollector;
3534
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
3635
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
36+
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
3737
import org.elasticsearch.search.aggregations.support.ValuesSource;
3838
import org.elasticsearch.search.internal.SearchContext;
3939

4040
import java.io.IOException;
4141
import java.util.Collections;
4242
import java.util.Map;
43+
import java.util.function.BiConsumer;
4344

4445
/**
4546
* An aggregator for date values. Every date is rounded down using a configured
@@ -62,13 +63,13 @@ class DateHistogramAggregator extends BucketsAggregator {
6263
private final long minDocCount;
6364
private final ExtendedBounds extendedBounds;
6465

65-
private final LongHash bucketOrds;
66+
private final LongKeyedBucketOrds bucketOrds;
6667

6768
DateHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding.Prepared preparedRounding,
6869
BucketOrder order, boolean keyed,
6970
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource valuesSource,
7071
DocValueFormat formatter, SearchContext aggregationContext,
71-
Aggregator parent, Map<String, Object> metadata) throws IOException {
72+
Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
7273

7374
super(name, factories, aggregationContext, parent, metadata);
7475
this.rounding = rounding;
@@ -81,7 +82,7 @@ class DateHistogramAggregator extends BucketsAggregator {
8182
this.valuesSource = (ValuesSource.Numeric) valuesSource;
8283
this.formatter = formatter;
8384

84-
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
85+
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
8586
}
8687

8788
@Override
@@ -93,30 +94,26 @@ public ScoreMode scoreMode() {
9394
}
9495

9596
@Override
96-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
97-
final LeafBucketCollector sub) throws IOException {
97+
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
9898
if (valuesSource == null) {
9999
return LeafBucketCollector.NO_OP_COLLECTOR;
100100
}
101-
final SortedNumericDocValues values = valuesSource.longValues(ctx);
101+
SortedNumericDocValues values = valuesSource.longValues(ctx);
102102
return new LeafBucketCollectorBase(sub, values) {
103103
@Override
104-
public void collect(int doc, long bucket) throws IOException {
105-
assert bucket == 0;
104+
public void collect(int doc, long owningBucketOrd) throws IOException {
106105
if (values.advanceExact(doc)) {
107-
final int valuesCount = values.docValueCount();
106+
int valuesCount = values.docValueCount();
108107

109108
long previousRounded = Long.MIN_VALUE;
110109
for (int i = 0; i < valuesCount; ++i) {
111110
long value = values.nextValue();
112-
// We can use shardRounding here, which is sometimes more efficient
113-
// if daylight saving times are involved.
114111
long rounded = preparedRounding.round(value);
115112
assert rounded >= previousRounded;
116113
if (rounded == previousRounded) {
117114
continue;
118115
}
119-
long bucketOrd = bucketOrds.add(rounded);
116+
long bucketOrd = bucketOrds.add(owningBucketOrd, rounded);
120117
if (bucketOrd < 0) { // already seen
121118
bucketOrd = -1 - bucketOrd;
122119
collectExistingBucket(sub, doc, bucketOrd);
@@ -162,4 +159,9 @@ public InternalAggregation buildEmptyAggregation() {
162159
public void doClose() {
163160
Releasables.close(bucketOrds);
164161
}
162+
163+
@Override
164+
public void collectDebugInfo(BiConsumer<String, Object> add) {
165+
add.accept("total_buckets", bucketOrds.size());
166+
}
165167
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorFactory.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,26 +81,36 @@ protected Aggregator doCreateInternal(ValuesSource valuesSource,
8181
Aggregator parent,
8282
boolean collectsFromSingleBucket,
8383
Map<String, Object> metadata) throws IOException {
84-
if (collectsFromSingleBucket == false) {
85-
return asMultiBucketAggregator(this, searchContext, parent);
86-
}
8784
AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config.valueSourceType(),
8885
DateHistogramAggregationBuilder.NAME);
8986
if (aggregatorSupplier instanceof DateHistogramAggregationSupplier == false) {
9087
throw new AggregationExecutionException("Registry miss-match - expected DateHistogramAggregationSupplier, found [" +
9188
aggregatorSupplier.getClass().toString() + "]");
9289
}
9390
Rounding.Prepared preparedRounding = valuesSource.roundingPreparer(queryShardContext.getIndexReader()).apply(shardRounding);
94-
return ((DateHistogramAggregationSupplier) aggregatorSupplier).build(name, factories, rounding, preparedRounding, order, keyed,
95-
minDocCount, extendedBounds, valuesSource, config.format(), searchContext,
96-
parent, metadata);
91+
return ((DateHistogramAggregationSupplier) aggregatorSupplier).build(
92+
name,
93+
factories,
94+
rounding,
95+
preparedRounding,
96+
order,
97+
keyed,
98+
minDocCount,
99+
extendedBounds,
100+
valuesSource,
101+
config.format(),
102+
searchContext,
103+
parent,
104+
collectsFromSingleBucket,
105+
metadata
106+
);
97107
}
98108

99109
@Override
100110
protected Aggregator createUnmapped(SearchContext searchContext,
101111
Aggregator parent,
102112
Map<String, Object> metadata) throws IOException {
103113
return new DateHistogramAggregator(name, factories, rounding, null, order, keyed, minDocCount, extendedBounds,
104-
null, config.format(), searchContext, parent, metadata);
114+
null, config.format(), searchContext, parent, false, metadata);
105115
}
106116
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.common.Nullable;
2626
import org.elasticsearch.common.Rounding;
2727
import org.elasticsearch.common.lease.Releasables;
28-
import org.elasticsearch.common.util.LongHash;
2928
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
3029
import org.elasticsearch.index.mapper.RangeFieldMapper;
3130
import org.elasticsearch.index.mapper.RangeType;
@@ -37,13 +36,15 @@
3736
import org.elasticsearch.search.aggregations.LeafBucketCollector;
3837
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
3938
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
39+
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
4040
import org.elasticsearch.search.aggregations.support.ValuesSource;
4141
import org.elasticsearch.search.internal.SearchContext;
4242

4343
import java.io.IOException;
4444
import java.util.Collections;
4545
import java.util.List;
4646
import java.util.Map;
47+
import java.util.function.BiConsumer;
4748

4849
/**
4950
* An aggregator for date values. Every date is rounded down using a configured
@@ -66,13 +67,13 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
6667
private final long minDocCount;
6768
private final ExtendedBounds extendedBounds;
6869

69-
private final LongHash bucketOrds;
70+
private final LongKeyedBucketOrds bucketOrds;
7071

7172
DateRangeHistogramAggregator(String name, AggregatorFactories factories, Rounding rounding, Rounding.Prepared preparedRounding,
7273
BucketOrder order, boolean keyed,
7374
long minDocCount, @Nullable ExtendedBounds extendedBounds, @Nullable ValuesSource valuesSource,
7475
DocValueFormat formatter, SearchContext aggregationContext,
75-
Aggregator parent, Map<String, Object> metadata) throws IOException {
76+
Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
7677

7778
super(name, factories, aggregationContext, parent, metadata);
7879
this.rounding = rounding;
@@ -89,7 +90,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
8990
+ "]");
9091
}
9192

92-
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
93+
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
9394
}
9495

9596
@Override
@@ -101,21 +102,19 @@ public ScoreMode scoreMode() {
101102
}
102103

103104
@Override
104-
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
105-
final LeafBucketCollector sub) throws IOException {
105+
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
106106
if (valuesSource == null) {
107107
return LeafBucketCollector.NO_OP_COLLECTOR;
108108
}
109-
final SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
110-
final RangeType rangeType = valuesSource.rangeType();
109+
SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
110+
RangeType rangeType = valuesSource.rangeType();
111111
return new LeafBucketCollectorBase(sub, values) {
112112
@Override
113-
public void collect(int doc, long bucket) throws IOException {
114-
assert bucket == 0;
113+
public void collect(int doc, long owningBucketOrd) throws IOException {
115114
if (values.advanceExact(doc)) {
116115
// Is it possible for valuesCount to be > 1 here? Multiple ranges are encoded into the same BytesRef in the binary doc
117116
// values, so it isn't clear what we'd be iterating over.
118-
final int valuesCount = values.docValueCount();
117+
int valuesCount = values.docValueCount();
119118
assert valuesCount == 1 : "Value count for ranges should always be 1";
120119
long previousKey = Long.MIN_VALUE;
121120

@@ -124,7 +123,7 @@ public void collect(int doc, long bucket) throws IOException {
124123
List<RangeFieldMapper.Range> ranges = rangeType.decodeRanges(encodedRanges);
125124
long previousFrom = Long.MIN_VALUE;
126125
for (RangeFieldMapper.Range range : ranges) {
127-
final Long from = (Long) range.getFrom();
126+
Long from = (Long) range.getFrom();
128127
// The encoding should ensure that this assert is always true.
129128
assert from >= previousFrom : "Start of range not >= previous start";
130129
final Long to = (Long) range.getTo();
@@ -136,7 +135,7 @@ public void collect(int doc, long bucket) throws IOException {
136135
continue;
137136
}
138137
// Bucket collection identical to NumericHistogramAggregator, could be refactored
139-
long bucketOrd = bucketOrds.add(key);
138+
long bucketOrd = bucketOrds.add(owningBucketOrd, key);
140139
if (bucketOrd < 0) { // already seen
141140
bucketOrd = -1 - bucketOrd;
142141
collectExistingBucket(sub, doc, bucketOrd);
@@ -187,4 +186,9 @@ public InternalAggregation buildEmptyAggregation() {
187186
public void doClose() {
188187
Releasables.close(bucketOrds);
189188
}
189+
190+
@Override
191+
public void collectDebugInfo(BiConsumer<String, Object> add) {
192+
add.accept("total_buckets", bucketOrds.size());
193+
}
190194
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ private LongKeyedBucketOrds() {}
5757
public abstract long size();
5858

5959
/**
60-
* Build an iterator for buckets inside {@code owningBucketOrd}.
60+
* Build an iterator for buckets inside {@code owningBucketOrd} in order
61+
* of increasing ord.
6162
* <p>
6263
* When this is first returns it is "unpositioned" and you must call
6364
* {@link BucketOrdsEnum#next()} to move it to the first value.

0 commit comments

Comments
 (0)