Skip to content

Commit de8b39e

Browse files
authored
Lower contention on requests with many aggs (#66895)
This lowers the contention on the `REQUEST` circuit breaker when building many aggregations on many threads by preallocating a chunk of breaker up front. This cuts down on the number of times we enter the busy loop in `ChildMemoryCircuitBreaker.limit`. Now we hit it one time when building aggregations. We still hit the busy loop if we collect many buckets. We let the `AggregationBuilder` pick size of the "chunk" that we preallocate but it doesn't have much to go on - not even the field types. But it is available in a convenient spot and the estimates don't have to be particularly accurate. The benchmarks on my 12 core desktop are interesting: ``` Benchmark (breaker) Mode Cnt Score Error Units sum noop avgt 10 1.672 ± 0.042 us/op sum real avgt 10 4.100 ± 0.027 us/op sum preallocate avgt 10 4.230 ± 0.034 us/op termsSixtySums noop avgt 10 92.658 ± 0.939 us/op termsSixtySums real avgt 10 278.764 ± 39.751 us/op termsSixtySums preallocate avgt 10 120.896 ± 16.097 us/op termsSum noop avgt 10 4.573 ± 0.095 us/op termsSum real avgt 10 9.932 ± 0.211 us/op termsSum preallocate avgt 10 7.695 ± 0.313 us/op ``` They show pretty clearly that not using the circuit breaker at all is faster. But we can't do that because we don't want to bring the node down on bad aggs. When there are many aggs (termsSixtySums) the preallocation claws back much of the performance. It even helps marginally when there are two aggs (termsSum). For a single agg (sum) we see a 130 nanosecond hit. Fine. But these values are all pretty small. At best we're seeing a 160 microsecond savings. Not so on a 160 vCPU machine: ``` Benchmark (breaker) Mode Cnt Score Error Units sum noop avgt 10 44.956 ± 8.851 us/op sum real avgt 10 118.008 ± 19.505 us/op sum preallocate avgt 10 241.234 ± 305.998 us/op termsSixtySums noop avgt 10 1339.802 ± 51.410 us/op termsSixtySums real avgt 10 12077.671 ± 12110.993 us/op termsSixtySums preallocate avgt 10 3804.515 ± 1458.702 us/op termsSum noop avgt 10 59.478 ± 2.261 us/op termsSum real avgt 10 293.756 ± 253.854 us/op termsSum preallocate avgt 10 197.963 ± 41.578 us/op ``` All of these numbers are larger because we're running all the CPUs flat out and we're seeing more contention everywhere. Even the "noop" breaker sees some contention, but I think it is mostly around memory allocation. Anyway, with many many (termsSixtySums) aggs we're looking at 8 milliseconds of savings by preallocating. Just by dodging the busy loop as much as possible. The error in the measurements there are substantial. Here are the runs: ``` real: Iteration 1: 8679.417 ±(99.9%) 273.220 us/op Iteration 2: 5849.538 ±(99.9%) 179.258 us/op Iteration 3: 5953.935 ±(99.9%) 152.829 us/op Iteration 4: 5763.465 ±(99.9%) 150.759 us/op Iteration 5: 14157.592 ±(99.9%) 395.224 us/op Iteration 1: 24857.020 ±(99.9%) 1133.847 us/op Iteration 2: 24730.903 ±(99.9%) 1107.718 us/op Iteration 3: 18894.383 ±(99.9%) 738.706 us/op Iteration 4: 5493.965 ±(99.9%) 120.529 us/op Iteration 5: 6396.493 ±(99.9%) 143.630 us/op preallocate: Iteration 1: 5512.590 ±(99.9%) 110.222 us/op Iteration 2: 3087.771 ±(99.9%) 120.084 us/op Iteration 3: 3544.282 ±(99.9%) 110.373 us/op Iteration 4: 3477.228 ±(99.9%) 107.270 us/op Iteration 5: 4351.820 ±(99.9%) 82.946 us/op Iteration 1: 3185.250 ±(99.9%) 154.102 us/op Iteration 2: 3058.000 ±(99.9%) 143.758 us/op Iteration 3: 3199.920 ±(99.9%) 61.589 us/op Iteration 4: 3163.735 ±(99.9%) 71.291 us/op Iteration 5: 5464.556 ±(99.9%) 59.034 us/op ``` That variability from 5.5ms to 25ms is terrible. It makes me not particularly trust the 8ms savings from the report. But still, the preallocating method has much less variability between runs and almost all the runs are faster than all of the non-preallocated runs. Maybe the savings is more like 2 or 3 milliseconds, but still. Or maybe we should think of hte savings as worst vs worst? If so its 19 milliseconds. Anyway, its hard to measure how much this helps. But, certainly some. Closes #58647
1 parent a184486 commit de8b39e

File tree

16 files changed

+884
-56
lines changed

16 files changed

+884
-56
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.benchmark.search.aggregations;
21+
22+
import org.apache.lucene.analysis.Analyzer;
23+
import org.apache.lucene.search.IndexSearcher;
24+
import org.apache.lucene.search.MatchAllDocsQuery;
25+
import org.apache.lucene.search.Query;
26+
import org.elasticsearch.common.breaker.CircuitBreaker;
27+
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
28+
import org.elasticsearch.common.lease.Releasable;
29+
import org.elasticsearch.common.lease.Releasables;
30+
import org.elasticsearch.common.settings.ClusterSettings;
31+
import org.elasticsearch.common.settings.Settings;
32+
import org.elasticsearch.common.util.BigArrays;
33+
import org.elasticsearch.common.util.PageCacheRecycler;
34+
import org.elasticsearch.index.Index;
35+
import org.elasticsearch.index.IndexSettings;
36+
import org.elasticsearch.index.analysis.NamedAnalyzer;
37+
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
38+
import org.elasticsearch.index.fielddata.IndexFieldData;
39+
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
40+
import org.elasticsearch.index.mapper.MappedFieldType;
41+
import org.elasticsearch.index.mapper.NumberFieldMapper;
42+
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
43+
import org.elasticsearch.index.mapper.ObjectMapper;
44+
import org.elasticsearch.index.query.QueryBuilder;
45+
import org.elasticsearch.index.query.support.NestedScope;
46+
import org.elasticsearch.indices.breaker.CircuitBreakerService;
47+
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
48+
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
49+
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
50+
import org.elasticsearch.script.Script;
51+
import org.elasticsearch.script.ScriptContext;
52+
import org.elasticsearch.search.SearchModule;
53+
import org.elasticsearch.search.aggregations.Aggregator;
54+
import org.elasticsearch.search.aggregations.AggregatorFactories;
55+
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
56+
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
57+
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
58+
import org.elasticsearch.search.aggregations.support.AggregationContext;
59+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
60+
import org.elasticsearch.search.internal.SubSearchContext;
61+
import org.elasticsearch.search.lookup.SearchLookup;
62+
import org.elasticsearch.search.sort.BucketedSort;
63+
import org.elasticsearch.search.sort.BucketedSort.ExtraData;
64+
import org.elasticsearch.search.sort.SortAndFormats;
65+
import org.elasticsearch.search.sort.SortBuilder;
66+
import org.openjdk.jmh.annotations.Benchmark;
67+
import org.openjdk.jmh.annotations.BenchmarkMode;
68+
import org.openjdk.jmh.annotations.Fork;
69+
import org.openjdk.jmh.annotations.Measurement;
70+
import org.openjdk.jmh.annotations.Mode;
71+
import org.openjdk.jmh.annotations.OutputTimeUnit;
72+
import org.openjdk.jmh.annotations.Param;
73+
import org.openjdk.jmh.annotations.Scope;
74+
import org.openjdk.jmh.annotations.Setup;
75+
import org.openjdk.jmh.annotations.State;
76+
import org.openjdk.jmh.annotations.Threads;
77+
import org.openjdk.jmh.annotations.Warmup;
78+
79+
import java.io.IOException;
80+
import java.util.ArrayList;
81+
import java.util.List;
82+
import java.util.Optional;
83+
import java.util.concurrent.TimeUnit;
84+
import java.util.function.Function;
85+
86+
/**
87+
* Benchmarks the overhead of constructing {@link Aggregator}s in many
88+
* parallel threads. Machines with different numbers of cores will see
89+
* wildly different results running this from running this with more
90+
* cores seeing more benefits from preallocation.
91+
*/
92+
@Fork(2)
93+
@Warmup(iterations = 10)
94+
@Measurement(iterations = 5)
95+
@BenchmarkMode(Mode.AverageTime)
96+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
97+
@State(Scope.Benchmark)
98+
@Threads(Threads.MAX)
99+
public class AggConstructionContentionBenchmark {
100+
private final SearchModule searchModule = new SearchModule(Settings.EMPTY, List.of());
101+
private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
102+
private final PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY);
103+
private final Index index = new Index("test", "uuid");
104+
private final IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(
105+
Settings.EMPTY,
106+
new IndexFieldDataCache.Listener() {
107+
}
108+
);
109+
110+
private CircuitBreakerService breakerService;
111+
private BigArrays bigArrays;
112+
private boolean preallocateBreaker;
113+
114+
@Param({ "noop", "real", "preallocate" })
115+
private String breaker;
116+
117+
@Setup
118+
public void setup() {
119+
switch (breaker) {
120+
case "real":
121+
breakerService = new HierarchyCircuitBreakerService(Settings.EMPTY, List.of(), clusterSettings);
122+
break;
123+
case "preallocate":
124+
preallocateBreaker = true;
125+
breakerService = new HierarchyCircuitBreakerService(Settings.EMPTY, List.of(), clusterSettings);
126+
break;
127+
case "noop":
128+
breakerService = new NoneCircuitBreakerService();
129+
break;
130+
default:
131+
throw new UnsupportedOperationException();
132+
}
133+
bigArrays = new BigArrays(recycler, breakerService, "request");
134+
}
135+
136+
@Benchmark
137+
public void sum() throws IOException {
138+
buildFactories(new AggregatorFactories.Builder().addAggregator(new SumAggregationBuilder("s").field("int_1")));
139+
}
140+
141+
@Benchmark
142+
public void termsSum() throws IOException {
143+
buildFactories(
144+
new AggregatorFactories.Builder().addAggregator(
145+
new TermsAggregationBuilder("t").field("int_1").subAggregation(new SumAggregationBuilder("s").field("int_2"))
146+
)
147+
);
148+
}
149+
150+
@Benchmark
151+
public void termsSixtySums() throws IOException {
152+
TermsAggregationBuilder b = new TermsAggregationBuilder("t").field("int_1");
153+
for (int i = 0; i < 60; i++) {
154+
b.subAggregation(new SumAggregationBuilder("s" + i).field("int_" + i));
155+
}
156+
buildFactories(new AggregatorFactories.Builder().addAggregator(b));
157+
}
158+
159+
private void buildFactories(AggregatorFactories.Builder factories) throws IOException {
160+
try (DummyAggregationContext context = new DummyAggregationContext(factories.bytesToPreallocate())) {
161+
factories.build(context, null).createTopLevelAggregators();
162+
}
163+
}
164+
165+
private class DummyAggregationContext extends AggregationContext {
166+
private final Query query = new MatchAllDocsQuery();
167+
private final List<Releasable> releaseMe = new ArrayList<>();
168+
169+
private final CircuitBreaker breaker;
170+
private final PreallocatedCircuitBreakerService preallocated;
171+
private final MultiBucketConsumer multiBucketConsumer;
172+
173+
DummyAggregationContext(long bytesToPreallocate) {
174+
CircuitBreakerService breakerService;
175+
if (preallocateBreaker) {
176+
breakerService = preallocated = new PreallocatedCircuitBreakerService(
177+
AggConstructionContentionBenchmark.this.breakerService,
178+
CircuitBreaker.REQUEST,
179+
bytesToPreallocate,
180+
"aggregations"
181+
);
182+
} else {
183+
breakerService = AggConstructionContentionBenchmark.this.breakerService;
184+
preallocated = null;
185+
}
186+
breaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
187+
multiBucketConsumer = new MultiBucketConsumer(Integer.MAX_VALUE, breaker);
188+
}
189+
190+
@Override
191+
public Query query() {
192+
return query;
193+
}
194+
195+
@Override
196+
public Aggregator profileIfEnabled(Aggregator agg) throws IOException {
197+
return agg;
198+
}
199+
200+
@Override
201+
public boolean profiling() {
202+
return false;
203+
}
204+
205+
@Override
206+
public long nowInMillis() {
207+
return 0;
208+
}
209+
210+
@Override
211+
protected IndexFieldData<?> buildFieldData(MappedFieldType ft) {
212+
IndexFieldDataCache indexFieldDataCache = indicesFieldDataCache.buildIndexFieldDataCache(new IndexFieldDataCache.Listener() {
213+
}, index, ft.name());
214+
return ft.fielddataBuilder("test", this::lookup).build(indexFieldDataCache, breakerService);
215+
}
216+
217+
@Override
218+
public MappedFieldType getFieldType(String path) {
219+
if (path.startsWith("int")) {
220+
return new NumberFieldMapper.NumberFieldType(path, NumberType.INTEGER);
221+
}
222+
throw new UnsupportedOperationException();
223+
}
224+
225+
@Override
226+
public boolean isFieldMapped(String field) {
227+
return field.startsWith("int");
228+
}
229+
230+
@Override
231+
public <FactoryType> FactoryType compile(Script script, ScriptContext<FactoryType> context) {
232+
throw new UnsupportedOperationException();
233+
}
234+
235+
@Override
236+
public SearchLookup lookup() {
237+
throw new UnsupportedOperationException();
238+
}
239+
240+
@Override
241+
public ValuesSourceRegistry getValuesSourceRegistry() {
242+
return searchModule.getValuesSourceRegistry();
243+
}
244+
245+
@Override
246+
public BigArrays bigArrays() {
247+
return bigArrays;
248+
}
249+
250+
@Override
251+
public IndexSearcher searcher() {
252+
return null;
253+
}
254+
255+
@Override
256+
public Query buildQuery(QueryBuilder builder) throws IOException {
257+
throw new UnsupportedOperationException();
258+
}
259+
260+
@Override
261+
public IndexSettings getIndexSettings() {
262+
throw new UnsupportedOperationException();
263+
}
264+
265+
@Override
266+
public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sortBuilders) throws IOException {
267+
throw new UnsupportedOperationException();
268+
}
269+
270+
@Override
271+
public ObjectMapper getObjectMapper(String path) {
272+
throw new UnsupportedOperationException();
273+
}
274+
275+
@Override
276+
public NestedScope nestedScope() {
277+
throw new UnsupportedOperationException();
278+
}
279+
280+
@Override
281+
public SubSearchContext subSearchContext() {
282+
throw new UnsupportedOperationException();
283+
}
284+
285+
@Override
286+
public void addReleasable(Aggregator aggregator) {
287+
releaseMe.add(aggregator);
288+
}
289+
290+
@Override
291+
public MultiBucketConsumer multiBucketConsumer() {
292+
return multiBucketConsumer;
293+
}
294+
295+
@Override
296+
public BitsetFilterCache bitsetFilterCache() {
297+
throw new UnsupportedOperationException();
298+
}
299+
300+
@Override
301+
public BucketedSort buildBucketedSort(SortBuilder<?> sort, int size, ExtraData values) throws IOException {
302+
throw new UnsupportedOperationException();
303+
}
304+
305+
@Override
306+
public int shardRandomSeed() {
307+
return 0;
308+
}
309+
310+
@Override
311+
public long getRelativeTimeInMillis() {
312+
return 0;
313+
}
314+
315+
@Override
316+
public boolean isCancelled() {
317+
return false;
318+
}
319+
320+
@Override
321+
public CircuitBreaker breaker() {
322+
return breaker;
323+
}
324+
325+
@Override
326+
public Analyzer getIndexAnalyzer(Function<String, NamedAnalyzer> unindexedFieldAnalyzer) {
327+
throw new UnsupportedOperationException();
328+
}
329+
330+
@Override
331+
public boolean isCacheable() {
332+
throw new UnsupportedOperationException();
333+
}
334+
335+
@Override
336+
public void close() {
337+
List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
338+
releaseMe.add(preallocated);
339+
Releasables.close(releaseMe);
340+
}
341+
}
342+
}

server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,11 @@ public BucketCardinality bucketCardinality() {
533533
public String getType() {
534534
return "test";
535535
}
536+
537+
@Override
538+
public long bytesToPreallocate() {
539+
return 0;
540+
}
536541
}
537542

538543
/**
@@ -567,13 +572,13 @@ public Aggregator subAggregator(String name) {
567572
@Override
568573
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
569574
return new InternalAggregation[] {
570-
new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap())
575+
buildEmptyAggregation()
571576
};
572577
}
573578

574579
@Override
575580
public InternalAggregation buildEmptyAggregation() {
576-
return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap());
581+
return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, null);
577582
}
578583

579584
@Override

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
604604
.addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get());
605605
logger.info("--> got an expected exception", e);
606606
assertThat(e.getCause(), notNullValue());
607-
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [<agg [foo_terms]>]"));
607+
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [preallocate[aggregations]]"));
608608

609609
client().admin().cluster().prepareUpdateSettings()
610610
.setTransientSettings(Settings.builder()

server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ public void testRequestBreaker() throws Exception {
252252
}
253253
}
254254

255-
public void testBucketBreaker() throws Exception {
255+
public void testAggTookTooMuch() throws Exception {
256256
if (noopBreakerUsed()) {
257257
logger.info("--> noop breakers used, skipping test");
258258
return;
@@ -285,7 +285,7 @@ public void testBucketBreaker() throws Exception {
285285
} catch (Exception e) {
286286
Throwable cause = e.getCause();
287287
assertThat(cause, instanceOf(CircuitBreakingException.class));
288-
assertThat(cause.toString(), containsString("[request] Data too large, data for [<agg [my_terms]>] would be"));
288+
assertThat(cause.toString(), containsString("[request] Data too large, data for [preallocate[aggregations]] would be"));
289289
assertThat(cause.toString(), containsString("which is larger than the limit of [100/100b]"));
290290
}
291291
}

0 commit comments

Comments
 (0)