Skip to content

Commit 748a0e7

Browse files
iverase and alexey-ivanov-es
authored and committed
Use LongArray instead of long[] for owning ordinals when building Internal aggregations (elastic#116874)
This commit changes the signature of Aggregator#buildAggregations(long[]) to Aggregator#buildAggregations(LongArray) to avoid allocations of humongous arrays.
1 parent d9404ee commit 748a0e7

File tree

49 files changed

+877
-741
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+877
-741
lines changed

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.io.stream.Writeable;
1717
import org.elasticsearch.common.lucene.Lucene;
18+
import org.elasticsearch.common.util.LongArray;
1819
import org.elasticsearch.index.query.QueryBuilder;
1920
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
2021
import org.elasticsearch.search.aggregations.Aggregator;
@@ -177,65 +178,66 @@ public void collect(int doc, long bucket) throws IOException {
177178
}
178179

179180
@Override
180-
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
181+
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
181182
// Buckets are ordered into groups - [keyed filters] [key1&key2 intersects]
182-
int maxOrd = owningBucketOrds.length * totalNumKeys;
183-
int totalBucketsToBuild = 0;
184-
for (int ord = 0; ord < maxOrd; ord++) {
183+
long maxOrd = owningBucketOrds.size() * totalNumKeys;
184+
long totalBucketsToBuild = 0;
185+
for (long ord = 0; ord < maxOrd; ord++) {
185186
if (bucketDocCount(ord) > 0) {
186187
totalBucketsToBuild++;
187188
}
188189
}
189-
long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
190-
int builtBucketIndex = 0;
191-
for (int ord = 0; ord < maxOrd; ord++) {
192-
if (bucketDocCount(ord) > 0) {
193-
bucketOrdsToBuild[builtBucketIndex++] = ord;
194-
}
195-
}
196-
assert builtBucketIndex == totalBucketsToBuild;
197-
builtBucketIndex = 0;
198-
var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
199-
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
200-
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < owningBucketOrds.length; owningBucketOrdIdx++) {
201-
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
202-
for (int i = 0; i < keys.length; i++) {
203-
long bucketOrd = bucketOrd(owningBucketOrds[owningBucketOrdIdx], i);
204-
long docCount = bucketDocCount(bucketOrd);
205-
// Empty buckets are not returned because this aggregation will commonly be used under
206-
// a date-histogram where we will look for transactions over time and can expect many
207-
// empty buckets.
208-
if (docCount > 0) {
209-
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
210-
keys[i],
211-
docCount,
212-
bucketSubAggs.apply(builtBucketIndex++)
213-
);
214-
buckets.add(bucket);
190+
try (LongArray bucketOrdsToBuild = bigArrays().newLongArray(totalBucketsToBuild)) {
191+
int builtBucketIndex = 0;
192+
for (int ord = 0; ord < maxOrd; ord++) {
193+
if (bucketDocCount(ord) > 0) {
194+
bucketOrdsToBuild.set(builtBucketIndex++, ord);
215195
}
216196
}
217-
int pos = keys.length;
218-
for (int i = 0; i < keys.length; i++) {
219-
for (int j = i + 1; j < keys.length; j++) {
220-
long bucketOrd = bucketOrd(owningBucketOrds[owningBucketOrdIdx], pos);
197+
assert builtBucketIndex == totalBucketsToBuild;
198+
builtBucketIndex = 0;
199+
var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
200+
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
201+
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < results.length; owningBucketOrdIdx++) {
202+
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
203+
for (int i = 0; i < keys.length; i++) {
204+
long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), i);
221205
long docCount = bucketDocCount(bucketOrd);
222-
// Empty buckets are not returned due to potential for very sparse matrices
206+
// Empty buckets are not returned because this aggregation will commonly be used under
207+
// a date-histogram where we will look for transactions over time and can expect many
208+
// empty buckets.
223209
if (docCount > 0) {
224-
String intersectKey = keys[i] + separator + keys[j];
225210
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
226-
intersectKey,
211+
keys[i],
227212
docCount,
228213
bucketSubAggs.apply(builtBucketIndex++)
229214
);
230215
buckets.add(bucket);
231216
}
232-
pos++;
233217
}
218+
int pos = keys.length;
219+
for (int i = 0; i < keys.length; i++) {
220+
for (int j = i + 1; j < keys.length; j++) {
221+
long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), pos);
222+
long docCount = bucketDocCount(bucketOrd);
223+
// Empty buckets are not returned due to potential for very sparse matrices
224+
if (docCount > 0) {
225+
String intersectKey = keys[i] + separator + keys[j];
226+
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
227+
intersectKey,
228+
docCount,
229+
bucketSubAggs.apply(builtBucketIndex++)
230+
);
231+
buckets.add(bucket);
232+
}
233+
pos++;
234+
}
235+
}
236+
results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata());
234237
}
235-
results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata());
238+
assert builtBucketIndex == totalBucketsToBuild;
239+
return results;
236240
}
237-
assert builtBucketIndex == totalBucketsToBuild;
238-
return results;
239241
}
240242

241243
@Override

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public final LeafBucketCollector getLeafCollector(AggregationExecutionContext ag
141141
protected final InternalAggregation[] buildAggregations(
142142
LongKeyedBucketOrds bucketOrds,
143143
LongToIntFunction roundingIndexFor,
144-
long[] owningBucketOrds
144+
LongArray owningBucketOrds
145145
) throws IOException {
146146
return buildAggregationsForVariableBuckets(
147147
owningBucketOrds,
@@ -324,7 +324,7 @@ private void increaseRoundingIfNeeded(long rounded) {
324324
}
325325

326326
@Override
327-
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
327+
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
328328
return buildAggregations(bucketOrds, l -> roundingIdx, owningBucketOrds);
329329
}
330330

@@ -594,7 +594,7 @@ private void rebucket() {
594594
}
595595

596596
@Override
597-
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
597+
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
598598
/*
599599
* Rebucket before building the aggregation to build as small a result
600600
* as possible.

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.apache.lucene.index.SortedNumericDocValues;
1313
import org.apache.lucene.util.BytesRef;
14+
import org.elasticsearch.common.util.LongArray;
15+
import org.elasticsearch.common.util.ObjectArray;
1416
import org.elasticsearch.core.Releasables;
1517
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
1618
import org.elasticsearch.index.mapper.RoutingPathFields;
@@ -30,6 +32,7 @@
3032

3133
import java.io.IOException;
3234
import java.util.ArrayList;
35+
import java.util.Arrays;
3336
import java.util.Comparator;
3437
import java.util.List;
3538
import java.util.Map;
@@ -67,42 +70,43 @@ public TimeSeriesAggregator(
6770
}
6871

6972
@Override
70-
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
73+
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
7174
BytesRef spare = new BytesRef();
72-
InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
73-
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
74-
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
75-
List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
76-
while (ordsEnum.next()) {
77-
long docCount = bucketDocCount(ordsEnum.ord());
78-
ordsEnum.readValue(spare);
79-
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
80-
BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here.
81-
docCount,
82-
null,
83-
keyed
84-
);
85-
bucket.bucketOrd = ordsEnum.ord();
86-
buckets.add(bucket);
87-
if (buckets.size() >= size) {
88-
break;
75+
try (ObjectArray<InternalTimeSeries.InternalBucket[]> allBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) {
76+
for (long ordIdx = 0; ordIdx < allBucketsPerOrd.size(); ordIdx++) {
77+
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
78+
List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
79+
while (ordsEnum.next()) {
80+
long docCount = bucketDocCount(ordsEnum.ord());
81+
ordsEnum.readValue(spare);
82+
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
83+
BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here.
84+
docCount,
85+
null,
86+
keyed
87+
);
88+
bucket.bucketOrd = ordsEnum.ord();
89+
buckets.add(bucket);
90+
if (buckets.size() >= size) {
91+
break;
92+
}
8993
}
94+
// NOTE: after introducing _tsid hashing time series are sorted by (_tsid hash, @timestamp) instead of (_tsid, timestamp).
95+
// _tsid hash and _tsid might sort differently, and out of order data might result in incorrect buckets due to _tsid value
96+
// changes not matching _tsid hash changes. Changes in _tsid hash are handled creating a new bucket as a result of making
97+
// the assumption that sorting data results in new buckets whenever there is a change in _tsid hash. This is no longer true
98+
// because we collect data sorted on (_tsid hash, timestamp) but build aggregation results sorted by (_tsid, timestamp).
99+
buckets.sort(Comparator.comparing(bucket -> bucket.key));
100+
allBucketsPerOrd.set(ordIdx, buckets.toArray(new InternalTimeSeries.InternalBucket[0]));
90101
}
91-
// NOTE: after introducing _tsid hashing time series are sorted by (_tsid hash, @timestamp) instead of (_tsid, timestamp).
92-
// _tsid hash and _tsid might sort differently, and out of order data might result in incorrect buckets due to _tsid value
93-
// changes not matching _tsid hash changes. Changes in _tsid hash are handled creating a new bucket as a result of making
94-
// the assumption that sorting data results in new buckets whenever there is a change in _tsid hash. This is no longer true
95-
// because we collect data sorted on (_tsid hash, timestamp) but build aggregation results sorted by (_tsid, timestamp).
96-
buckets.sort(Comparator.comparing(bucket -> bucket.key));
97-
allBucketsPerOrd[ordIdx] = buckets.toArray(new InternalTimeSeries.InternalBucket[0]);
98-
}
99-
buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
102+
buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
100103

101-
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
102-
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
103-
result[ordIdx] = buildResult(allBucketsPerOrd[ordIdx]);
104+
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(allBucketsPerOrd.size())];
105+
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
106+
result[ordIdx] = buildResult(allBucketsPerOrd.get(ordIdx));
107+
}
108+
return result;
104109
}
105-
return result;
106110
}
107111

108112
@Override
@@ -185,7 +189,7 @@ public void collect(int doc, long bucket) throws IOException {
185189
}
186190

187191
InternalTimeSeries buildResult(InternalTimeSeries.InternalBucket[] topBuckets) {
188-
return new InternalTimeSeries(name, List.of(topBuckets), keyed, metadata());
192+
return new InternalTimeSeries(name, Arrays.asList(topBuckets), keyed, metadata());
189193
}
190194

191195
@FunctionalInterface

modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.join.aggregations;
1010

1111
import org.apache.lucene.search.Query;
12+
import org.elasticsearch.common.util.LongArray;
1213
import org.elasticsearch.search.aggregations.Aggregator;
1314
import org.elasticsearch.search.aggregations.AggregatorFactories;
1415
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
@@ -44,7 +45,7 @@ public ChildrenToParentAggregator(
4445
}
4546

4647
@Override
47-
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
48+
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
4849
return buildAggregationsForSingleBucket(
4950
owningBucketOrds,
5051
(owningBucketOrd, subAggregationResults) -> new InternalParent(

modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.lucene.Lucene;
2222
import org.elasticsearch.common.util.BigArrays;
2323
import org.elasticsearch.common.util.BitArray;
24+
import org.elasticsearch.common.util.LongArray;
2425
import org.elasticsearch.core.Releasable;
2526
import org.elasticsearch.core.Releasables;
2627
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -115,7 +116,7 @@ public void postCollection() throws IOException {
115116
}
116117

117118
@Override
118-
protected void prepareSubAggs(long[] ordsToCollect) throws IOException {
119+
protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {
119120
IndexReader indexReader = searcher().getIndexReader();
120121
for (LeafReaderContext ctx : indexReader.leaves()) {
121122
Scorer childDocsScorer = outFilter.scorer(ctx);
@@ -153,9 +154,10 @@ public float score() {
153154
* structure that maps a primitive long to a list of primitive
154155
* longs.
155156
*/
156-
for (long owningBucketOrd : ordsToCollect) {
157-
if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) {
158-
collectBucket(sub, docId, owningBucketOrd);
157+
for (long ord = 0; ord < ordsToCollect.size(); ord++) {
158+
long ordToCollect = ordsToCollect.get(ord);
159+
if (collectionStrategy.exists(ordToCollect, globalOrdinal)) {
160+
collectBucket(sub, docId, ordToCollect);
159161
}
160162
}
161163
}

modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.join.aggregations;
1010

1111
import org.apache.lucene.search.Query;
12+
import org.elasticsearch.common.util.LongArray;
1213
import org.elasticsearch.search.aggregations.Aggregator;
1314
import org.elasticsearch.search.aggregations.AggregatorFactories;
1415
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
@@ -40,7 +41,7 @@ public ParentToChildrenAggregator(
4041
}
4142

4243
@Override
43-
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
44+
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
4445
return buildAggregationsForSingleBucket(
4546
owningBucketOrds,
4647
(owningBucketOrd, subAggregationResults) -> new InternalChildren(

server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.io.stream.StreamInput;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
2929
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.common.util.LongArray;
3031
import org.elasticsearch.common.util.concurrent.AtomicArray;
3132
import org.elasticsearch.core.TimeValue;
3233
import org.elasticsearch.index.IndexSettings;
@@ -669,7 +670,7 @@ public Aggregator subAggregator(String aggregatorName) {
669670
}
670671

671672
@Override
672-
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
673+
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) {
673674
return new InternalAggregation[] { buildEmptyAggregation() };
674675
}
675676

server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.search.aggregations;
1111

1212
import org.apache.lucene.search.ScoreMode;
13+
import org.elasticsearch.common.util.LongArray;
1314
import org.elasticsearch.core.CheckedFunction;
1415
import org.elasticsearch.search.profile.aggregation.InternalAggregationProfileTree;
1516

@@ -98,10 +99,10 @@ public final void postCollection() throws IOException {
9899
}
99100

100101
@Override
101-
public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
102+
public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
102103
InternalAggregation[] delegateResults = delegate.buildAggregations(owningBucketOrds);
103-
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
104-
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
104+
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
105+
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
105106
result[ordIdx] = adapt(delegateResults[ordIdx]);
106107
}
107108
return result;

server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.io.stream.Writeable;
16+
import org.elasticsearch.common.util.BigArrays;
17+
import org.elasticsearch.common.util.LongArray;
1618
import org.elasticsearch.core.Releasable;
1719
import org.elasticsearch.search.aggregations.support.AggregationPath;
1820
import org.elasticsearch.search.sort.SortOrder;
@@ -142,7 +144,7 @@ public interface BucketComparator {
142144
* @return the results for each ordinal, in the same order as the array
143145
* of ordinals
144146
*/
145-
public abstract InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException;
147+
public abstract InternalAggregation[] buildAggregations(LongArray ordsToCollect) throws IOException;
146148

147149
/**
148150
* Release this aggregation and its sub-aggregations.
@@ -153,11 +155,11 @@ public interface BucketComparator {
153155
* Build the result of this aggregation if it is at the "top level"
154156
* of the aggregation tree. If, instead, it is a sub-aggregation of
155157
* another aggregation then the aggregation that contains it will call
156-
* {@link #buildAggregations(long[])}.
158+
* {@link #buildAggregations(LongArray)}.
157159
*/
158160
public final InternalAggregation buildTopLevel() throws IOException {
159161
assert parent() == null;
160-
return buildAggregations(new long[] { 0 })[0];
162+
return buildAggregations(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true))[0];
161163
}
162164

163165
/**

0 commit comments

Comments
 (0)