Skip to content

Commit b4b20bd

Browse files
authored
Fix circuit breaker leak in MultiTerms aggregation (elastic#79362) (elastic#79422)
The MultiTermsAggregator creates a BytesKeyedBucketOrds that never gets closed and therefore it might leak the memory allocated into the circuit breaker.
1 parent ef6b6c8 commit b4b20bd

File tree

4 files changed

+110
-2
lines changed

4 files changed

+110
-2
lines changed

test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
459459
indexSettings,
460460
query,
461461
breakerService,
462-
builder.bytesToPreallocate(),
462+
randomBoolean() ? 0 : builder.bytesToPreallocate(),
463463
maxBucket,
464464
fieldTypes
465465
);

x-pack/plugin/analytics/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
apply plugin: 'elasticsearch.internal-es-plugin'
2+
apply plugin: 'elasticsearch.internal-cluster-test'
3+
24
esplugin {
35
name 'x-pack-analytics'
46
description 'Elasticsearch Expanded Pack Plugin - Analytics'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
/*
9+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
10+
* or more contributor license agreements. Licensed under the Elastic License
11+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
12+
* in compliance with, at your election, the Elastic License 2.0 or the Server
13+
* Side Public License, v 1.
14+
*/
15+
16+
package org.elasticsearch.xpack.analytics.multiterms;
17+
18+
import org.elasticsearch.ElasticsearchException;
19+
import org.elasticsearch.ExceptionsHelper;
20+
import org.elasticsearch.action.index.IndexRequestBuilder;
21+
import org.elasticsearch.common.breaker.CircuitBreakingException;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
24+
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
26+
import org.elasticsearch.test.ESIntegTestCase;
27+
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
28+
29+
import java.util.Collection;
30+
import java.util.stream.IntStream;
31+
32+
/**
33+
* test forked from CardinalityWithRequestBreakerIT
34+
*/
35+
public class MultiTermsWithRequestBreakerIT extends ESIntegTestCase {
36+
37+
@Override
38+
protected Collection<Class<? extends Plugin>> nodePlugins() {
39+
return org.elasticsearch.core.List.of(AnalyticsPlugin.class);
40+
}
41+
42+
@Override
43+
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
44+
return org.elasticsearch.core.List.of(AnalyticsPlugin.class);
45+
}
46+
47+
/**
48+
* Test that searches using multiterms aggregations returns all request breaker memory.
49+
*/
50+
public void testRequestBreaker() throws Exception {
51+
final String requestBreaker = randomIntBetween(1, 10000) + "kb";
52+
logger.info("--> Using request breaker setting: {}", requestBreaker);
53+
54+
indexRandom(
55+
true,
56+
IntStream.range(0, randomIntBetween(10, 1000))
57+
.mapToObj(
58+
i -> client().prepareIndex("test", "_doc")
59+
.setId("id_" + i)
60+
.setSource(org.elasticsearch.core.Map.of("field0", randomAlphaOfLength(5), "field1", randomAlphaOfLength(5)))
61+
)
62+
.toArray(IndexRequestBuilder[]::new)
63+
);
64+
65+
client().admin()
66+
.cluster()
67+
.prepareUpdateSettings()
68+
.setPersistentSettings(
69+
Settings.builder().put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), requestBreaker)
70+
)
71+
.get();
72+
73+
try {
74+
client().prepareSearch("test")
75+
.addAggregation(
76+
new MultiTermsAggregationBuilder("xxx").terms(
77+
org.elasticsearch.core.List.of(
78+
new MultiValuesSourceFieldConfig.Builder().setFieldName("field0.keyword").build(),
79+
new MultiValuesSourceFieldConfig.Builder().setFieldName("field1.keyword").build()
80+
)
81+
)
82+
)
83+
.get();
84+
} catch (ElasticsearchException e) {
85+
if (ExceptionsHelper.unwrap(e, CircuitBreakingException.class) == null) {
86+
throw e;
87+
}
88+
}
89+
90+
client().admin()
91+
.cluster()
92+
.prepareUpdateSettings()
93+
.setPersistentSettings(
94+
Settings.builder().putNull(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey())
95+
)
96+
.get();
97+
98+
// validation done by InternalTestCluster.ensureEstimatedStats()
99+
}
100+
}

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414
import org.apache.lucene.util.BytesRefBuilder;
1515
import org.apache.lucene.util.PriorityQueue;
1616
import org.elasticsearch.ExceptionsHelper;
17-
import org.elasticsearch.core.CheckedConsumer;
1817
import org.elasticsearch.common.bytes.BytesArray;
1918
import org.elasticsearch.common.io.stream.BytesStreamOutput;
2019
import org.elasticsearch.common.io.stream.StreamInput;
2120
import org.elasticsearch.common.io.stream.StreamOutput;
21+
import org.elasticsearch.core.CheckedConsumer;
22+
import org.elasticsearch.core.Releasables;
2223
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
2324
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
2425
import org.elasticsearch.search.DocValueFormat;
@@ -220,6 +221,11 @@ public void accept(Integer start) throws IOException {
220221
};
221222
}
222223

224+
@Override
225+
protected void doClose() {
226+
Releasables.close(bucketOrds);
227+
}
228+
223229
@Override
224230
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
225231
InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][];

0 commit comments

Comments
 (0)