Skip to content

Commit a451dd8

Browse files
Reduce merge map memory overhead in the Variable Width Histogram Aggregation (#59366) (#60171)
When a document which is distant from existing buckets gets collected, the `variable_width_histogram` will create a new bucket and then insert it into the ordered list of buckets. Currently, a new merge map array is created to move this bucket. This is very expensive as there might be thousands of buckets. This PR creates `mergeBuckets(LongUnaryOperator mergeMap)` methods in `BucketsAggregator` and `MergingBucketsDeferringCollector`, and updates the `variable_width_histogram` to use them. This eliminates the need to create an entire merge map array for each new bucket and reduces the memory overhead of the algorithm. Co-authored-by: James Dorfman <[email protected]>
1 parent 95c99ca commit a451dd8

File tree

5 files changed

+394
-20
lines changed

5 files changed

+394
-20
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.function.BiConsumer;
4646
import java.util.function.Function;
4747
import java.util.function.IntConsumer;
48+
import java.util.function.LongUnaryOperator;
4849
import java.util.function.ToLongFunction;
4950

5051
public abstract class BucketsAggregator extends AggregatorBase {
@@ -105,17 +106,35 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
105106
* ordinals and doc ID deltas.
106107
*
107108
* Refer to that method for documentation about the merge map.
109+
*
110+
* @deprecated use {@link #mergeBuckets(long, LongUnaryOperator)}
108111
*/
112+
@Deprecated
109113
public final void mergeBuckets(long[] mergeMap, long newNumBuckets) {
114+
mergeBuckets(newNumBuckets, bucket -> mergeMap[Math.toIntExact(bucket)]);
115+
}
116+
117+
/**
118+
*
119+
* @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
120+
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
121+
*
122+
* This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(LongUnaryOperator)} to
123+
* merge the actual ordinals and doc ID deltas.
124+
*/
125+
public final void mergeBuckets(long newNumBuckets, LongUnaryOperator mergeMap){
110126
try (IntArray oldDocCounts = docCounts) {
111127
docCounts = bigArrays.newIntArray(newNumBuckets, true);
112128
docCounts.fill(0, newNumBuckets, 0);
113-
for (int i = 0; i < oldDocCounts.size(); i++) {
129+
for (long i = 0; i < oldDocCounts.size(); i++) {
114130
int docCount = oldDocCounts.get(i);
115131

132+
if(docCount == 0) continue;
133+
116134
// Skip any in the map which have been "removed", signified with -1
117-
if (docCount != 0 && mergeMap[i] != -1) {
118-
docCounts.increment(mergeMap[i], docCount);
135+
long destinationOrdinal = mergeMap.applyAsLong(i);
136+
if (destinationOrdinal != -1) {
137+
docCounts.increment(destinationOrdinal, docCount);
119138
}
120139
}
121140
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.util.ArrayList;
2727
import java.util.List;
28+
import java.util.function.LongUnaryOperator;
2829

2930
/**
3031
* A specialization of {@link BestBucketsDeferringCollector} that collects all
@@ -51,8 +52,24 @@ public MergingBucketsDeferringCollector(SearchContext context, boolean isGlobal)
5152
*
5253
* This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
5354
* not be called unless there are actually changes to be made, to avoid unnecessary work.
55+
*
56+
* @deprecated use {@link #mergeBuckets(LongUnaryOperator)}
5457
*/
58+
@Deprecated
5559
public void mergeBuckets(long[] mergeMap) {
60+
mergeBuckets(bucket -> mergeMap[Math.toIntExact(bucket)]);
61+
}
62+
63+
/**
64+
* Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap.
65+
*
66+
* @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
67+
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
68+
*
69+
* This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
70+
* not be called unless there are actually changes to be made, to avoid unnecessary work.
71+
*/
72+
public void mergeBuckets(LongUnaryOperator mergeMap){
5673
List<Entry> newEntries = new ArrayList<>(entries.size());
5774
for (Entry sourceEntry : entries) {
5875
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
@@ -66,7 +83,7 @@ public void mergeBuckets(long[] mergeMap) {
6683
long delta = docDeltasItr.next();
6784

6885
// Only merge in the ordinal if it hasn't been "removed", signified with -1
69-
long ordinal = mergeMap[Math.toIntExact(bucket)];
86+
long ordinal = mergeMap.applyAsLong(bucket);
7087

7188
if (ordinal != -1) {
7289
newBuckets.add(ordinal);
@@ -102,7 +119,7 @@ public void mergeBuckets(long[] mergeMap) {
102119
long bucket = itr.next();
103120
assert docDeltasItr.hasNext();
104121
long delta = docDeltasItr.next();
105-
long ordinal = mergeMap[Math.toIntExact(bucket)];
122+
long ordinal = mergeMap.applyAsLong(bucket);
106123

107124
// Only merge in the ordinal if it hasn't been "removed", signified with -1
108125
if (ordinal != -1) {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.List;
5252
import java.util.Map;
5353
import java.util.function.Function;
54+
import java.util.function.LongUnaryOperator;
5455

5556
public class VariableWidthHistogramAggregator extends DeferableBucketAggregator {
5657

@@ -353,22 +354,23 @@ private void moveLastCluster(int index){
353354
clusterSizes.set(index, holdSize);
354355

355356
// Move the underlying buckets
356-
long[] mergeMap = new long[numClusters];
357-
for (int i = 0; i < index; i++) {
358-
// The clusters in range {0 ... idx - 1} don't move
359-
mergeMap[i] = i;
360-
}
361-
for (int i = index; i < numClusters - 1; i++) {
362-
// The clusters in range {index ... numClusters - 1} shift up
363-
mergeMap[i] = i + 1;
364-
}
365-
// Finally, the new cluster moves to index
366-
mergeMap[numClusters - 1] = index;
357+
LongUnaryOperator mergeMap = new LongUnaryOperator() {
358+
@Override
359+
public long applyAsLong(long i) {
360+
if(i < index) {
361+
// The clusters in range {0 ... idx - 1} don't move
362+
return i;
363+
}
364+
if(i == numClusters - 1) {
365+
// The new cluster moves to index
366+
return (long)index;
367+
}
368+
// The clusters in range {index ... numClusters - 1} shift forward
369+
return i + 1;
370+
}
371+
};
367372

368-
// TODO: Create a moveLastCluster() method in BucketsAggregator which is like BucketsAggregator::mergeBuckets,
369-
// except it doesn't require a merge map. This would be more efficient as there would be no need to create a
370-
// merge map on every call.
371-
mergeBuckets(mergeMap, numClusters);
373+
mergeBuckets(numClusters, mergeMap);
372374
if (deferringCollector != null) {
373375
deferringCollector.mergeBuckets(mergeMap);
374376
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations.bucket;
21+
22+
import org.apache.lucene.document.Document;
23+
import org.apache.lucene.document.SortedNumericDocValuesField;
24+
import org.apache.lucene.index.DirectoryReader;
25+
import org.apache.lucene.index.IndexReader;
26+
import org.apache.lucene.index.LeafReaderContext;
27+
import org.apache.lucene.index.IndexReader;
28+
import org.apache.lucene.index.RandomIndexWriter;
29+
import org.apache.lucene.search.IndexSearcher;
30+
import org.apache.lucene.store.Directory;
31+
import org.elasticsearch.common.breaker.CircuitBreaker;
32+
import org.elasticsearch.index.mapper.NumberFieldMapper;
33+
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
34+
import org.elasticsearch.search.aggregations.AggregatorFactories;
35+
import org.elasticsearch.search.aggregations.AggregatorTestCase;
36+
import org.elasticsearch.search.aggregations.InternalAggregation;
37+
import org.elasticsearch.search.aggregations.LeafBucketCollector;
38+
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
39+
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
40+
import org.elasticsearch.search.internal.SearchContext;
41+
42+
import java.io.IOException;
43+
44+
import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;
45+
46+
public class BucketsAggregatorTests extends AggregatorTestCase{
47+
48+
public BucketsAggregator buildMergeAggregator() throws IOException{
49+
try(Directory directory = newDirectory()) {
50+
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
51+
Document document = new Document();
52+
document.add(new SortedNumericDocValuesField("numeric", 0));
53+
indexWriter.addDocument(document);
54+
}
55+
56+
try (IndexReader indexReader = DirectoryReader.open(directory)) {
57+
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
58+
59+
SearchContext searchContext = createSearchContext(
60+
indexSearcher,
61+
createIndexSettings(),
62+
null,
63+
new MultiBucketConsumerService.MultiBucketConsumer(
64+
DEFAULT_MAX_BUCKETS,
65+
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
66+
),
67+
new NumberFieldMapper.NumberFieldType("test", NumberFieldMapper.NumberType.INTEGER)
68+
);
69+
70+
return new BucketsAggregator("test", AggregatorFactories.EMPTY, searchContext, null, null, null) {
71+
@Override
72+
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
73+
return null;
74+
}
75+
76+
@Override
77+
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
78+
return new InternalAggregation[0];
79+
}
80+
81+
@Override
82+
public InternalAggregation buildEmptyAggregation() {
83+
return null;
84+
}
85+
};
86+
}
87+
}
88+
}
89+
90+
public void testBucketMergeNoDelete() throws IOException{
91+
BucketsAggregator mergeAggregator = buildMergeAggregator();
92+
93+
mergeAggregator.grow(10);
94+
for(int i = 0; i < 10; i++){
95+
mergeAggregator.incrementBucketDocCount(i, i);
96+
}
97+
98+
mergeAggregator.mergeBuckets(10, bucket -> bucket % 5);
99+
100+
for(int i=0; i<5; i++) {
101+
// The i'th bucket should now have all docs whose index % 5 = i
102+
// This is buckets i and i + 5
103+
// i + (i+5) = 2*i + 5
104+
assertEquals(mergeAggregator.getDocCounts().get(i), (2 * i) + 5);
105+
}
106+
for(int i=5; i<10; i++){
107+
assertEquals(mergeAggregator.getDocCounts().get(i), 0);
108+
}
109+
}
110+
111+
public void testBucketMergeAndDelete() throws IOException{
112+
BucketsAggregator mergeAggregator = buildMergeAggregator();
113+
114+
mergeAggregator.grow(10);
115+
int sum = 0;
116+
for(int i = 0; i < 20; i++){
117+
mergeAggregator.incrementBucketDocCount(i, i);
118+
if(5 <= i && i < 15) {
119+
sum += i;
120+
}
121+
}
122+
123+
// Put the buckets in indices 5 ... 14 into bucket 5, and delete the rest of the buckets
124+
mergeAggregator.mergeBuckets(10, bucket -> (5 <= bucket && bucket < 15) ? 5 : -1);
125+
126+
assertEquals(mergeAggregator.getDocCounts().size(), 10); // Confirm that the 10 other buckets were deleted
127+
for(int i=0; i<10; i++){
128+
assertEquals(mergeAggregator.getDocCounts().get(i), i == 5 ? sum : 0);
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)