Skip to content

Commit 94cb1e2

Browse files
authored
Save memory when geo aggregations are not on top (#57483)
Saves memory when the `geotile_grid` and `geohash_grid` aggregations are not at the top level by using the `LongKeyedBucketOrds` we built in #55873.
1 parent b072f5f commit 94cb1e2

File tree

13 files changed

+144
-74
lines changed

13 files changed

+144
-74
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
import org.apache.lucene.index.SortedNumericDocValues;
2323
import org.apache.lucene.search.ScoreMode;
2424
import org.elasticsearch.common.lease.Releasables;
25-
import org.elasticsearch.common.util.LongHash;
2625
import org.elasticsearch.search.aggregations.Aggregator;
2726
import org.elasticsearch.search.aggregations.AggregatorFactories;
2827
import org.elasticsearch.search.aggregations.InternalAggregation;
2928
import org.elasticsearch.search.aggregations.LeafBucketCollector;
3029
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
3130
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
31+
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
3232
import org.elasticsearch.search.aggregations.support.ValuesSource;
3333
import org.elasticsearch.search.internal.SearchContext;
3434

@@ -46,17 +46,16 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
4646
protected final int requiredSize;
4747
protected final int shardSize;
4848
protected final ValuesSource.Numeric valuesSource;
49-
protected final LongHash bucketOrds;
50-
protected SortedNumericDocValues values;
49+
protected final LongKeyedBucketOrds bucketOrds;
5150

5251
GeoGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
5352
int requiredSize, int shardSize, SearchContext aggregationContext,
54-
Aggregator parent, Map<String, Object> metadata) throws IOException {
53+
Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
5554
super(name, factories, aggregationContext, parent, metadata);
5655
this.valuesSource = valuesSource;
5756
this.requiredSize = requiredSize;
5857
this.shardSize = shardSize;
59-
bucketOrds = new LongHash(1, aggregationContext.bigArrays());
58+
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
6059
}
6160

6261
@Override
@@ -70,19 +69,18 @@ public ScoreMode scoreMode() {
7069
@Override
7170
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
7271
final LeafBucketCollector sub) throws IOException {
73-
values = valuesSource.longValues(ctx);
72+
SortedNumericDocValues values = valuesSource.longValues(ctx);
7473
return new LeafBucketCollectorBase(sub, null) {
7574
@Override
76-
public void collect(int doc, long bucket) throws IOException {
77-
assert bucket == 0;
75+
public void collect(int doc, long owningBucketOrd) throws IOException {
7876
if (values.advanceExact(doc)) {
7977
final int valuesCount = values.docValueCount();
8078

8179
long previous = Long.MAX_VALUE;
8280
for (int i = 0; i < valuesCount; ++i) {
8381
final long val = values.nextValue();
8482
if (previous != val || i == 0) {
85-
long bucketOrdinal = bucketOrds.add(val);
83+
long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
8684
if (bucketOrdinal < 0) { // already seen
8785
bucketOrdinal = -1 - bucketOrdinal;
8886
collectExistingBucket(sub, doc, bucketOrdinal);
@@ -108,31 +106,38 @@ public void collect(int doc, long bucket) throws IOException {
108106

109107
@Override
110108
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
111-
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
112-
final int size = (int) Math.min(bucketOrds.size(), shardSize);
113-
consumeBucketsAndMaybeBreak(size);
114-
115-
BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
116-
InternalGeoGridBucket spare = null;
117-
for (long i = 0; i < bucketOrds.size(); i++) {
118-
if (spare == null) {
119-
spare = newEmptyBucket();
109+
InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
110+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
111+
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
112+
consumeBucketsAndMaybeBreak(size);
113+
114+
BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
115+
InternalGeoGridBucket spare = null;
116+
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
117+
while (ordsEnum.next()) {
118+
if (spare == null) {
119+
spare = newEmptyBucket();
120+
}
121+
122+
// need a special function to keep the source bucket
123+
// up-to-date so it can get the appropriate key
124+
spare.hashAsLong = ordsEnum.value();
125+
spare.docCount = bucketDocCount(ordsEnum.ord());
126+
spare.bucketOrd = ordsEnum.ord();
127+
spare = ordered.insertWithOverflow(spare);
128+
}
129+
130+
topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
131+
for (int i = ordered.size() - 1; i >= 0; --i) {
132+
topBucketsPerOrd[ordIdx][i] = ordered.pop();
120133
}
121-
122-
// need a special function to keep the source bucket
123-
// up-to-date so it can get the appropriate key
124-
spare.hashAsLong = bucketOrds.get(i);
125-
spare.docCount = bucketDocCount(i);
126-
spare.bucketOrd = i;
127-
spare = ordered.insertWithOverflow(spare);
128134
}
129-
130-
final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
131-
for (int i = ordered.size() - 1; i >= 0; --i) {
132-
list[i] = ordered.pop();
135+
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
136+
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
137+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
138+
results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd[ordIdx]), metadata());
133139
}
134-
buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
135-
return new InternalAggregation[] {buildAggregation(name, requiredSize, Arrays.asList(list), metadata())};
140+
return results;
136141
}
137142

138143
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ public class GeoHashGridAggregator extends GeoGridAggregator<InternalGeoHashGrid
3535

3636
public GeoHashGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
3737
int requiredSize, int shardSize, SearchContext aggregationContext,
38-
Aggregator parent, Map<String, Object> metadata) throws IOException {
39-
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
38+
Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
39+
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
4040
}
4141

4242
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoHashGridAggregatorFactory.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,18 @@ protected Aggregator doCreateInternal(final ValuesSource valuesSource,
8585
throw new AggregationExecutionException("Registry miss-match - expected "
8686
+ GeoGridAggregatorSupplier.class.getName() + ", found [" + aggregatorSupplier.getClass().toString() + "]");
8787
}
88-
if (collectsFromSingleBucket == false) {
89-
return asMultiBucketAggregator(this, searchContext, parent);
90-
}
9188
return ((GeoGridAggregatorSupplier) aggregatorSupplier).build(name, factories, valuesSource, precision, geoBoundingBox,
92-
requiredSize, shardSize, searchContext, parent, metadata);
89+
requiredSize, shardSize, searchContext, parent, collectsFromSingleBucket, metadata);
9390
}
9491

9592
static void registerAggregators(ValuesSourceRegistry.Builder builder) {
9693
builder.register(GeoHashGridAggregationBuilder.NAME, CoreValuesSourceType.GEOPOINT,
9794
(GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
98-
aggregationContext, parent, metadata) -> {
95+
aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
9996
CellIdSource cellIdSource = new CellIdSource((ValuesSource.GeoPoint) valuesSource, precision, geoBoundingBox,
10097
Geohash::longEncode);
10198
return new GeoHashGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, aggregationContext,
102-
parent, metadata);
99+
parent, collectsFromSingleBucket, metadata);
103100
});
104101
}
105102
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoTileGridAggregator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ public class GeoTileGridAggregator extends GeoGridAggregator<InternalGeoTileGrid
3636

3737
public GeoTileGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
3838
int requiredSize, int shardSize, SearchContext aggregationContext,
39-
Aggregator parent, Map<String, Object> metadata) throws IOException {
40-
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
39+
Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
40+
super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
4141
}
4242

4343
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoTileGridAggregatorFactory.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,21 +83,18 @@ protected Aggregator doCreateInternal(final ValuesSource valuesSource,
8383
throw new AggregationExecutionException("Registry miss-match - expected "
8484
+ GeoGridAggregatorSupplier.class.getName() + ", found [" + aggregatorSupplier.getClass().toString() + "]");
8585
}
86-
if (collectsFromSingleBucket == false) {
87-
return asMultiBucketAggregator(this, searchContext, parent);
88-
}
8986
return ((GeoGridAggregatorSupplier) aggregatorSupplier).build(name, factories, valuesSource, precision, geoBoundingBox,
90-
requiredSize, shardSize, searchContext, parent, metadata);
87+
requiredSize, shardSize, searchContext, parent, collectsFromSingleBucket, metadata);
9188
}
9289

9390
static void registerAggregators(ValuesSourceRegistry.Builder builder) {
9491
builder.register(GeoTileGridAggregationBuilder.NAME, CoreValuesSourceType.GEOPOINT,
9592
(GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
96-
aggregationContext, parent, metadata) -> {
93+
aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
9794
CellIdSource cellIdSource = new CellIdSource((ValuesSource.GeoPoint) valuesSource, precision, geoBoundingBox,
9895
GeoTileUtils::longEncode);
9996
return new GeoTileGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, aggregationContext,
100-
parent, metadata);
97+
parent, collectsFromSingleBucket, metadata);
10198
});
10299
}
103100
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoGridAggregatorSupplier.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,17 @@
3333
@FunctionalInterface
3434
public interface GeoGridAggregatorSupplier extends AggregatorSupplier {
3535

36-
GeoGridAggregator build(String name, AggregatorFactories factories, ValuesSource valuesSource,
37-
int precision, GeoBoundingBox geoBoundingBox, int requiredSize, int shardSize,
38-
SearchContext aggregationContext, Aggregator parent, Map<String, Object> metadata) throws IOException;
36+
GeoGridAggregator build(
37+
String name,
38+
AggregatorFactories factories,
39+
ValuesSource valuesSource,
40+
int precision,
41+
GeoBoundingBox geoBoundingBox,
42+
int requiredSize,
43+
int shardSize,
44+
SearchContext aggregationContext,
45+
Aggregator parent,
46+
boolean collectsFromSingleBucket,
47+
Map<String, Object> metadata
48+
) throws IOException;
3949
}

server/src/test/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregatorTestCase.java

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@
1919
package org.elasticsearch.search.aggregations.bucket.geogrid;
2020

2121
import org.apache.lucene.document.LatLonDocValuesField;
22+
import org.apache.lucene.document.SortedSetDocValuesField;
2223
import org.apache.lucene.geo.GeoEncodingUtils;
2324
import org.apache.lucene.index.DirectoryReader;
2425
import org.apache.lucene.index.IndexReader;
26+
import org.apache.lucene.index.IndexableField;
2527
import org.apache.lucene.index.RandomIndexWriter;
2628
import org.apache.lucene.search.IndexSearcher;
2729
import org.apache.lucene.search.MatchAllDocsQuery;
2830
import org.apache.lucene.search.Query;
2931
import org.apache.lucene.store.Directory;
32+
import org.apache.lucene.util.BytesRef;
3033
import org.elasticsearch.common.CheckedConsumer;
3134
import org.elasticsearch.common.geo.GeoBoundingBox;
3235
import org.elasticsearch.common.geo.GeoBoundingBoxTests;
@@ -36,6 +39,8 @@
3639
import org.elasticsearch.search.aggregations.AggregationBuilder;
3740
import org.elasticsearch.search.aggregations.Aggregator;
3841
import org.elasticsearch.search.aggregations.AggregatorTestCase;
42+
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
43+
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
3944
import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
4045
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
4146
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
@@ -48,6 +53,7 @@
4853
import java.util.List;
4954
import java.util.Map;
5055
import java.util.Set;
56+
import java.util.TreeMap;
5157
import java.util.function.Consumer;
5258
import java.util.function.Function;
5359

@@ -122,18 +128,9 @@ public void testWithSeveralDocs() throws IOException {
122128
List<LatLonDocValuesField> points = new ArrayList<>();
123129
Set<String> distinctHashesPerDoc = new HashSet<>();
124130
for (int pointId = 0; pointId < numPoints; pointId++) {
125-
double lat = (180d * randomDouble()) - 90d;
126-
double lng = (360d * randomDouble()) - 180d;
127-
128-
// Precision-adjust longitude/latitude to avoid wrong bucket placement
129-
// Internally, lat/lng get converted to 32 bit integers, losing some precision.
130-
// This does not affect geohashing because geohash uses the same algorithm,
131-
// but it does affect other bucketing algos, thus we need to do the same steps here.
132-
lng = GeoEncodingUtils.decodeLongitude(GeoEncodingUtils.encodeLongitude(lng));
133-
lat = GeoEncodingUtils.decodeLatitude(GeoEncodingUtils.encodeLatitude(lat));
134-
135-
points.add(new LatLonDocValuesField(FIELD_NAME, lat, lng));
136-
String hash = hashAsString(lng, lat, precision);
131+
double[] latLng = randomLatLng();
132+
points.add(new LatLonDocValuesField(FIELD_NAME, latLng[0], latLng[1]));
133+
String hash = hashAsString(latLng[1], latLng[0], precision);
137134
if (distinctHashesPerDoc.contains(hash) == false) {
138135
expectedCountPerGeoHash.put(hash, expectedCountPerGeoHash.getOrDefault(hash, 0) + 1);
139136
}
@@ -150,6 +147,60 @@ public void testWithSeveralDocs() throws IOException {
150147
});
151148
}
152149

150+
public void testAsSubAgg() throws IOException {
151+
int precision = randomPrecision();
152+
Map<String, Map<String, Long>> expectedCountPerTPerGeoHash = new TreeMap<>();
153+
List<List<IndexableField>> docs = new ArrayList<>();
154+
for (int i = 0; i < 30; i++) {
155+
String t = randomAlphaOfLength(1);
156+
double[] latLng = randomLatLng();
157+
158+
List<IndexableField> doc = new ArrayList<>();
159+
docs.add(doc);
160+
doc.add(new LatLonDocValuesField(FIELD_NAME, latLng[0], latLng[1]));
161+
doc.add(new SortedSetDocValuesField("t", new BytesRef(t)));
162+
163+
String hash = hashAsString(latLng[1], latLng[0], precision);
164+
Map<String, Long> expectedCountPerGeoHash = expectedCountPerTPerGeoHash.get(t);
165+
if (expectedCountPerGeoHash == null) {
166+
expectedCountPerGeoHash = new TreeMap<>();
167+
expectedCountPerTPerGeoHash.put(t, expectedCountPerGeoHash);
168+
}
169+
expectedCountPerGeoHash.put(hash, expectedCountPerGeoHash.getOrDefault(hash, 0L) + 1);
170+
}
171+
CheckedConsumer<RandomIndexWriter, IOException> buildIndex = iw -> iw.addDocuments(docs);
172+
TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("t").field("t")
173+
.size(expectedCountPerTPerGeoHash.size())
174+
.subAggregation(createBuilder("gg").field(FIELD_NAME).precision(precision));
175+
Consumer<StringTerms> verify = (terms) -> {
176+
Map<String, Map<String, Long>> actual = new TreeMap<>();
177+
for (StringTerms.Bucket tb: terms.getBuckets()) {
178+
InternalGeoGrid<?> gg = tb.getAggregations().get("gg");
179+
Map<String, Long> sub = new TreeMap<>();
180+
for (InternalGeoGridBucket<?> ggb : gg.getBuckets()) {
181+
sub.put(ggb.getKeyAsString(), ggb.getDocCount());
182+
}
183+
actual.put(tb.getKeyAsString(), sub);
184+
}
185+
assertThat(actual, equalTo(expectedCountPerTPerGeoHash));
186+
};
187+
testCase(aggregationBuilder, new MatchAllDocsQuery(), buildIndex, verify, keywordField("t"), geoPointField(FIELD_NAME));
188+
}
189+
190+
private double[] randomLatLng() {
191+
double lat = (180d * randomDouble()) - 90d;
192+
double lng = (360d * randomDouble()) - 180d;
193+
194+
// Precision-adjust longitude/latitude to avoid wrong bucket placement
195+
// Internally, lat/lng get converted to 32 bit integers, losing some precision.
196+
// This does not affect geohashing because geohash uses the same algorithm,
197+
// but it does affect other bucketing algos, thus we need to do the same steps here.
198+
lng = GeoEncodingUtils.decodeLongitude(GeoEncodingUtils.encodeLongitude(lng));
199+
lat = GeoEncodingUtils.decodeLatitude(GeoEncodingUtils.encodeLatitude(lat));
200+
201+
return new double[] {lat, lng};
202+
}
203+
153204
public void testBounds() throws IOException {
154205
final int numDocs = randomIntBetween(64, 256);
155206
final GeoGridAggregationBuilder builder = createBuilder("_name");

server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/NumericHistogramAggregatorTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,11 @@ public void testIncorrectFieldType() throws Exception {
300300
HistogramAggregationBuilder aggBuilder = new HistogramAggregationBuilder("my_agg")
301301
.field("field")
302302
.interval(5);
303-
MappedFieldType fieldType = keywordField("field");
304-
fieldType.setHasDocValues(true);
305303
try (IndexReader reader = w.getReader()) {
306304
IndexSearcher searcher = new IndexSearcher(reader);
307305

308306
expectThrows(IllegalArgumentException.class, () -> {
309-
search(searcher, new MatchAllDocsQuery(), aggBuilder, fieldType);
307+
search(searcher, new MatchAllDocsQuery(), aggBuilder, keywordField("field"));
310308
});
311309
}
312310
}

0 commit comments

Comments
 (0)