
Commit f403c0c

Same memory when rare_terms is not on top
This uses the optimization that we started making in elastic#55873 for `rare_terms` to save a bit of memory when that aggregation is not on the top level.
1 parent b1e28d9 commit f403c0c
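
The heart of the change: instead of a single LongHash keyed by term value alone (valid only when the aggregator is on the top level and there is exactly one owning bucket), the aggregators now route bucket ordinals through LongKeyedBucketOrds, which picks a compact value-only implementation when `collectsFromSingleBucket` is true and an (owning ordinal, value) implementation otherwise. A minimal sketch of that idea, using plain HashMaps rather than the BigArrays-backed structures Elasticsearch actually uses (names here are illustrative, not the real classes):

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the idea behind LongKeyedBucketOrds.build(bigArrays, collectsFromSingleBucket):
 * a top-level rare_terms only ever sees owning bucket ordinal 0, so its ords can be
 * keyed by value alone; a sub-aggregation needs the (owningOrd, value) pair.
 */
class BucketOrdsSketch {

    /** Used when collectsFromSingleBucket == true: the owning ordinal is always 0. */
    static final class SingleBucketOrds {
        private final Map<Long, Long> ords = new HashMap<>(); // value -> ord

        long add(long owningOrd, long value) {
            assert owningOrd == 0 : "single-bucket ords only ever see owning ordinal 0";
            long next = ords.size();
            Long existing = ords.putIfAbsent(value, next);
            return existing == null ? next : -1 - existing; // -1-ord signals "already seen"
        }
    }

    /** Used when the aggregation runs under another bucketing aggregation. */
    static final class MultiBucketOrds {
        private final Map<Long, Map<Long, Long>> ords = new HashMap<>(); // owningOrd -> (value -> ord)
        private long nextOrd = 0;

        long add(long owningOrd, long value) {
            Map<Long, Long> perOwner = ords.computeIfAbsent(owningOrd, k -> new HashMap<>());
            Long existing = perOwner.putIfAbsent(value, nextOrd);
            return existing == null ? nextOrd++ : -1 - existing;
        }
    }
}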

File tree

8 files changed, +373 -284 lines


server/src/main/java/org/elasticsearch/common/util/SetBackedScalingCuckooFilter.java

Lines changed: 3 additions & 3 deletions
@@ -39,13 +39,13 @@
  * An approximate set membership datastructure that scales as more unique values are inserted.
  * Can definitively say if a member does not exist (no false negatives), but may say an item exists
  * when it does not (has false positives). Similar in usage to a Bloom Filter.
- *
+ * <p>
  * Internally, the datastructure maintains a Set of hashes up to a specified threshold. This provides
  * 100% accurate membership queries.
- *
+ * <p>
  * When the threshold is breached, a list of CuckooFilters are created and used to track membership.
  * These filters are approximate similar to Bloom Filters.
- *
+ * <p>
  * This datastructure scales as more values are inserted by growing the list of CuckooFilters.
  * Final size is dependent on the cardinality of data inserted, and the precision specified.
  */
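
The Javadoc above states the filter's contract; roughly, it behaves like this. A hedged sketch: the constructor arguments mirror the newFilter() call in AbstractRareTermsAggregator below, while the mightContain query method is an assumption about the class's API rather than something shown in this diff:

import java.util.Random;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;

public class CuckooFilterSketch {
    public static void main(String[] args) {
        // Exact Set of hashes up to the 10000 threshold, then scales out to CuckooFilters.
        SetBackedScalingCuckooFilter filter =
            new SetBackedScalingCuckooFilter(10000, new Random(42), 0.01);
        filter.add(123L);
        // No false negatives: anything added is always reported as present.
        assert filter.mightContain(123L);
        // False positives are possible: usually false for unseen values, but not guaranteed.
        System.out.println(filter.mightContain(456L));
    }
}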

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 0 additions & 17 deletions
@@ -236,23 +236,6 @@ protected final <B> void buildSubAggsForAllBuckets(B[][] buckets,
         }
     }
 
-    /**
-     * Build the sub aggregation results for a list of buckets and set them on
-     * the buckets. This is usually used by aggregations that are selective
-     * in which bucket they build. They use some mechanism of selecting a list
-     * of buckets to build use this method to "finish" building the results.
-     * @param buckets the buckets to finish building
-     * @param bucketToOrd how to convert a bucket into an ordinal
-     * @param setAggs how to set the sub-aggregation results on a bucket
-     */
-    protected final <B> void buildSubAggsForBuckets(List<B> buckets,
-            ToLongFunction<B> bucketToOrd, BiConsumer<B, InternalAggregations> setAggs) throws IOException {
-        InternalAggregations[] results = buildSubAggsForBuckets(buckets.stream().mapToLong(bucketToOrd).toArray());
-        for (int i = 0; i < buckets.size(); i++) {
-            setAggs.accept(buckets.get(i), results[i]);
-        }
-    }
-
     /**
      * Build aggregation results for an aggregator that has a fixed number of buckets per owning ordinal.
      * @param <B> the type of the bucket
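
With the list-based helper deleted, selective aggregators hand their buckets to the surviving buildSubAggsForAllBuckets, grouped per owning bucket ordinal. A sketch of the calling pattern, mirroring the LongRareTermsAggregator change further down (not runnable on its own, since it depends on the aggregator's fields):

// One row per owning bucket ordinal, one entry per surviving bucket.
LongRareTerms.Bucket[][] rarestPerOrd = new LongRareTerms.Bucket[owningBucketOrds.length][];
// ... fill rarestPerOrd[ordIdx] while walking bucketOrds ...
buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);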

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractRareTermsAggregator.java

Lines changed: 27 additions & 37 deletions
@@ -24,46 +24,47 @@
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator;
 import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector;
 import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
 import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregator;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
 import java.util.Map;
 import java.util.Random;
 
-public abstract class AbstractRareTermsAggregator<T extends ValuesSource,
-    U extends IncludeExclude.Filter, V> extends DeferableBucketAggregator {
+public abstract class AbstractRareTermsAggregator extends DeferableBucketAggregator {
 
     static final BucketOrder ORDER = BucketOrder.compound(BucketOrder.count(true), BucketOrder.key(true)); // sort by count ascending
 
     protected final long maxDocCount;
-    protected final double precision;
+    private final double precision;
     protected final DocValueFormat format;
-    protected final T valuesSource;
-    protected final U includeExclude;
-
-    MergingBucketsDeferringCollector deferringCollector;
-    final SetBackedScalingCuckooFilter filter;
-
-    AbstractRareTermsAggregator(String name, AggregatorFactories factories, SearchContext context,
-                                Aggregator parent, Map<String, Object> metadata, long maxDocCount, double precision,
-                                DocValueFormat format, T valuesSource, U includeExclude) throws IOException {
+    protected final boolean collectsFromSingleBucket;
+    private final int filterSeed;
+
+    protected MergingBucketsDeferringCollector deferringCollector;
+
+    AbstractRareTermsAggregator(
+        String name,
+        AggregatorFactories factories,
+        SearchContext context,
+        Aggregator parent,
+        Map<String, Object> metadata,
+        long maxDocCount,
+        double precision,
+        DocValueFormat format,
+        boolean collectsFromSingleBucket
+    ) throws IOException {
         super(name, factories, context, parent, metadata);
 
-        // We seed the rng with the ShardID so results are deterministic and don't change randomly
-        this.filter = new SetBackedScalingCuckooFilter(10000, new Random(context.indexShard().shardId().hashCode()), precision);
-        this.filter.registerBreaker(this::addRequestCircuitBreakerBytes);
-
         this.maxDocCount = maxDocCount;
         this.precision = precision;
         this.format = format;
-        this.valuesSource = valuesSource;
-        this.includeExclude = includeExclude;
+        this.collectsFromSingleBucket = collectsFromSingleBucket;
+        // We seed the rng with the ShardID so results are deterministic and don't change randomly
+        this.filterSeed = context.indexShard().shardId().hashCode();
         String scoringAgg = subAggsNeedScore();
         String nestedAgg = descendsFromNestedAggregator(parent);
         if (scoringAgg != null && nestedAgg != null) {
@@ -81,6 +82,12 @@ public abstract class AbstractRareTermsAggregator<T extends ValuesSource,
         }
     }
 
+    protected SetBackedScalingCuckooFilter newFilter() {
+        SetBackedScalingCuckooFilter filter = new SetBackedScalingCuckooFilter(10000, new Random(filterSeed), precision);
+        filter.registerBreaker(this::addRequestCircuitBreakerBytes);
+        return filter;
+    }
+
     @Override
     protected boolean shouldDefer(Aggregator aggregator) {
         return true;
@@ -110,21 +117,4 @@ private String descendsFromNestedAggregator(Aggregator parent) {
         }
         return null;
     }
-
-    protected void doCollect(LeafBucketCollector subCollector, V val, int docId) throws IOException {
-        long bucketOrdinal = addValueToOrds(val);
-
-        if (bucketOrdinal < 0) { // already seen
-            bucketOrdinal = -1 - bucketOrdinal;
-            collectExistingBucket(subCollector, docId, bucketOrdinal);
-        } else {
-            collectBucket(subCollector, docId, bucketOrdinal);
-        }
-    }
-
-    /**
-     * Add's the value to the ordinal map. Return the newly allocated id if it wasn't in the ordinal map yet,
-     * or <code>-1-id</code> if it was already present
-     */
-    abstract long addValueToOrds(V value);
 }
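
Note the shape of the refactor: the class no longer holds a single filter instance; it holds the ingredients (filterSeed and precision) and mints one filter per owning bucket via newFilter(). Because every filter is seeded with the same shard-derived value, results stay deterministic no matter how many owning buckets there are. A sketch of that property (the seed value is illustrative):

// Two filters built from the same seed make identical hashing decisions,
// so filters built for different owning ordinals behave consistently.
int filterSeed = 42; // stands in for context.indexShard().shardId().hashCode()
SetBackedScalingCuckooFilter a = new SetBackedScalingCuckooFilter(10000, new Random(filterSeed), 0.001);
SetBackedScalingCuckooFilter b = new SetBackedScalingCuckooFilter(10000, new Random(filterSeed), 0.001);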

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java

Lines changed: 114 additions & 80 deletions
@@ -20,9 +20,9 @@
 
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedNumericDocValues;
-import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -34,6 +34,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -42,111 +43,144 @@
 /**
  * An aggregator that finds "rare" string values (e.g. terms agg that orders ascending)
  */
-public class LongRareTermsAggregator extends AbstractRareTermsAggregator<ValuesSource.Numeric, IncludeExclude.LongFilter, Long> {
-
-    protected LongHash bucketOrds;
-
-    LongRareTermsAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource, DocValueFormat format,
-                            SearchContext aggregationContext, Aggregator parent, IncludeExclude.LongFilter longFilter,
-                            int maxDocCount, double precision, Map<String, Object> metadata) throws IOException {
-        super(name, factories, aggregationContext, parent, metadata, maxDocCount, precision, format, valuesSource, longFilter);
-        this.bucketOrds = new LongHash(1, aggregationContext.bigArrays());
+public class LongRareTermsAggregator extends AbstractRareTermsAggregator {
+    private final ValuesSource.Numeric valuesSource;
+    private final IncludeExclude.LongFilter filter;
+    private final LongKeyedBucketOrds bucketOrds;
+
+    LongRareTermsAggregator(
+        String name,
+        AggregatorFactories factories,
+        ValuesSource.Numeric valuesSource,
+        DocValueFormat format,
+        SearchContext aggregationContext,
+        Aggregator parent,
+        IncludeExclude.LongFilter filter,
+        int maxDocCount,
+        double precision,
+        boolean collectsFromSingleBucket,
+        Map<String, Object> metadata
+    ) throws IOException {
+        super(
+            name,
+            factories,
+            aggregationContext,
+            parent,
+            metadata,
+            maxDocCount,
+            precision,
+            format,
+            collectsFromSingleBucket
+        );
+        this.valuesSource = valuesSource;
+        this.filter = filter;
+        this.bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
     }
 
     protected SortedNumericDocValues getValues(ValuesSource.Numeric valuesSource, LeafReaderContext ctx) throws IOException {
         return valuesSource.longValues(ctx);
    }
 
     @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-                                                final LeafBucketCollector sub) throws IOException {
-        final SortedNumericDocValues values = getValues(valuesSource, ctx);
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        SortedNumericDocValues values = getValues(valuesSource, ctx);
         return new LeafBucketCollectorBase(sub, values) {
-
             @Override
-            public void collect(int docId, long owningBucketOrdinal) throws IOException {
-                if (values.advanceExact(docId)) {
-                    final int valuesCount = values.docValueCount();
-                    long previous = Long.MAX_VALUE;
-                    for (int i = 0; i < valuesCount; ++i) {
-                        final long val = values.nextValue();
-                        if (previous != val || i == 0) {
-                            if ((includeExclude == null) || (includeExclude.accept(val))) {
-                                doCollect(sub, val, docId);
-                            }
-                            previous = val;
-                        }
+            public void collect(int docId, long owningBucketOrd) throws IOException {
+                if (false == values.advanceExact(docId)) {
+                    return;
+                }
+                int valuesCount = values.docValueCount();
+                long previous = Long.MAX_VALUE;
+                for (int i = 0; i < valuesCount; ++i) {
+                    long val = values.nextValue();
+                    if (i == 0 && previous == val) {
+                        continue;
+                    }
+                    previous = val;
+                    if (filter != null && false == filter.accept(val)) {
+                        continue;
+                    }
+                    long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
+                    if (bucketOrdinal < 0) { // already seen
+                        bucketOrdinal = -1 - bucketOrdinal;
+                        collectExistingBucket(sub, docId, bucketOrdinal);
+                    } else {
+                        collectBucket(sub, docId, bucketOrdinal);
                     }
                 }
             }
         };
     }
 
     @Override
-    long addValueToOrds(Long value) {
-        return bucketOrds.add(value);
-    }
-
-    /**
-     * Merges the ordinals to a minimal set, populates the CuckooFilter and
-     * generates a final set of buckets.
-     *
-     * If a term is below the maxDocCount, it is turned into a Bucket. Otherwise,
-     * the term is added to the filter, and pruned from the ordinal map. If
-     * necessary the ordinal map is merged down to a minimal set to remove deletions
-     */
-    private List<LongRareTerms.Bucket> buildSketch() {
-        long deletionCount = 0;
-        LongHash newBucketOrds = new LongHash(1, context.bigArrays());
-        List<LongRareTerms.Bucket> buckets = new ArrayList<>();
-        try (LongHash oldBucketOrds = bucketOrds) {
-
-            long[] mergeMap = new long[(int) oldBucketOrds.size()];
-            for (int i = 0; i < oldBucketOrds.size(); i++) {
-                long oldKey = oldBucketOrds.get(i);
-                long newBucketOrd = -1;
-
-                long docCount = bucketDocCount(i);
-                // if the key is below threshold, reinsert into the new ords
-                if (docCount <= maxDocCount) {
-                    newBucketOrd = newBucketOrds.add(oldKey);
-                    LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(oldKey, docCount, null, format);
-                    bucket.bucketOrd = newBucketOrd;
-                    buckets.add(bucket);
-                } else {
-                    // Make a note when one of the ords has been deleted
-                    deletionCount += 1;
-                    filter.add(oldKey);
+    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        /*
+         * Collect the list of buckets, populate the filter with terms
+         * that are too frequent, and figure out how to merge sub-buckets.
+         */
+        LongRareTerms.Bucket[][] rarestPerOrd = new LongRareTerms.Bucket[owningBucketOrds.length][];
+        SetBackedScalingCuckooFilter[] filters = new SetBackedScalingCuckooFilter[owningBucketOrds.length];
+        long keepCount = 0;
+        long[] mergeMap = new long[(int) bucketOrds.size()];
+        Arrays.fill(mergeMap, -1);
+        long size = 0;
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            try (LongHash ordsToCollect = new LongHash(1, context.bigArrays())) {
+                filters[ordIdx] = newFilter();
+                List<LongRareTerms.Bucket> buckets = new ArrayList<>();
+                LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+                while (ordsEnum.next()) {
+                    long docCount = bucketDocCount(ordsEnum.ord());
+                    // if the key is below threshold, reinsert into the new ords
+                    if (docCount <= maxDocCount) {
+                        LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(ordsEnum.value(), docCount, null, format);
+                        bucket.bucketOrd = mergeMap[(int) ordsEnum.ord()] = size + ordsToCollect.add(ordsEnum.value());
+                        buckets.add(bucket);
+                        keepCount++;
+                    } else {
+                        filters[ordIdx].add(ordsEnum.value());
+                    }
                 }
-                mergeMap[i] = newBucketOrd;
+                rarestPerOrd[ordIdx] = buckets.toArray(LongRareTerms.Bucket[]::new);
+                size += ordsToCollect.size();
             }
+        }
 
-            // Only merge/delete the ordinals if we have actually deleted one,
-            // to save on some redundant work
-            if (deletionCount > 0) {
-                mergeBuckets(mergeMap, newBucketOrds.size());
-                if (deferringCollector != null) {
-                    deferringCollector.mergeBuckets(mergeMap);
-                }
+        /*
+         * Only merge/delete the ordinals if we have actually deleted one,
+         * to save on some redundant work.
+         */
+        if (keepCount != mergeMap.length) {
+            mergeBuckets(mergeMap, size);
+            if (deferringCollector != null) {
+                deferringCollector.mergeBuckets(mergeMap);
            }
         }
-        bucketOrds = newBucketOrds;
-        return buckets;
-    }
 
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        List<LongRareTerms.Bucket> buckets = buildSketch();
-        buildSubAggsForBuckets(buckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-
-        CollectionUtil.introSort(buckets, ORDER.comparator());
-        return new InternalAggregation[] {new LongRareTerms(name, ORDER, metadata(), format, buckets, maxDocCount, filter)};
+        /*
+         * Now build the results!
+         */
+        buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator());
+            result[ordIdx] = new LongRareTerms(
+                name,
+                ORDER,
+                metadata(),
+                format,
+                Arrays.asList(rarestPerOrd[ordIdx]),
+                maxDocCount,
+                filters[ordIdx]
+            );
+        }
+        return result;
     }
 
     @Override
     public InternalAggregation buildEmptyAggregation() {
-        return new LongRareTerms(name, ORDER, metadata(), format, emptyList(), 0, filter);
+        return new LongRareTerms(name, ORDER, metadata(), format, emptyList(), 0, newFilter());
     }
 
     @Override
0 commit comments
