From ba81b73d34932ab8cbe652bb24702ff3277e6233 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 4 Jan 2021 11:26:42 -0500 Subject: [PATCH] Lower contention on requests with many aggs (backport of #66895) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This lowers the contention on the `REQUEST` circuit breaker when building many aggregations on many threads by preallocating a chunk of breaker up front. This cuts down on the number of times we enter the busy loop in `ChildMemoryCircuitBreaker.limit`. Now we hit it one time when building aggregations. We still hit the busy loop if we collect many buckets. We let the `AggregationBuilder` pick size of the "chunk" that we preallocate but it doesn't have much to go on - not even the field types. But it is available in a convenient spot and the estimates don't have to be particularly accurate. The benchmarks on my 12 core desktop are interesting: ``` Benchmark (breaker) Mode Cnt Score Error Units sum noop avgt 10 1.672 ± 0.042 us/op sum real avgt 10 4.100 ± 0.027 us/op sum preallocate avgt 10 4.230 ± 0.034 us/op termsSixtySums noop avgt 10 92.658 ± 0.939 us/op termsSixtySums real avgt 10 278.764 ± 39.751 us/op termsSixtySums preallocate avgt 10 120.896 ± 16.097 us/op termsSum noop avgt 10 4.573 ± 0.095 us/op termsSum real avgt 10 9.932 ± 0.211 us/op termsSum preallocate avgt 10 7.695 ± 0.313 us/op ``` They show pretty clearly that not using the circuit breaker at all is faster. But we can't do that because we don't want to bring the node down on bad aggs. When there are many aggs (termsSixtySums) the preallocation claws back much of the performance. It even helps marginally when there are two aggs (termsSum). For a single agg (sum) we see a 130 nanosecond hit. Fine. But these values are all pretty small. At best we're seeing a 160 microsecond savings. Not so on a 160 vCPU machine: ``` Benchmark (breaker) Mode Cnt Score Error Units sum noop avgt 10 44.956 ± 8.851 us/op sum real avgt 10 118.008 ± 19.505 us/op sum preallocate avgt 10 241.234 ± 305.998 us/op termsSixtySums noop avgt 10 1339.802 ± 51.410 us/op termsSixtySums real avgt 10 12077.671 ± 12110.993 us/op termsSixtySums preallocate avgt 10 3804.515 ± 1458.702 us/op termsSum noop avgt 10 59.478 ± 2.261 us/op termsSum real avgt 10 293.756 ± 253.854 us/op termsSum preallocate avgt 10 197.963 ± 41.578 us/op ``` All of these numbers are larger because we're running all the CPUs flat out and we're seeing more contention everywhere. Even the "noop" breaker sees some contention, but I think it is mostly around memory allocation. Anyway, with many many (termsSixtySums) aggs we're looking at 8 milliseconds of savings by preallocating. Just by dodging the busy loop as much as possible. The error in the measurements there are substantial. Here are the runs: ``` real: Iteration 1: 8679.417 ±(99.9%) 273.220 us/op Iteration 2: 5849.538 ±(99.9%) 179.258 us/op Iteration 3: 5953.935 ±(99.9%) 152.829 us/op Iteration 4: 5763.465 ±(99.9%) 150.759 us/op Iteration 5: 14157.592 ±(99.9%) 395.224 us/op Iteration 1: 24857.020 ±(99.9%) 1133.847 us/op Iteration 2: 24730.903 ±(99.9%) 1107.718 us/op Iteration 3: 18894.383 ±(99.9%) 738.706 us/op Iteration 4: 5493.965 ±(99.9%) 120.529 us/op Iteration 5: 6396.493 ±(99.9%) 143.630 us/op preallocate: Iteration 1: 5512.590 ±(99.9%) 110.222 us/op Iteration 2: 3087.771 ±(99.9%) 120.084 us/op Iteration 3: 3544.282 ±(99.9%) 110.373 us/op Iteration 4: 3477.228 ±(99.9%) 107.270 us/op Iteration 5: 4351.820 ±(99.9%) 82.946 us/op Iteration 1: 3185.250 ±(99.9%) 154.102 us/op Iteration 2: 3058.000 ±(99.9%) 143.758 us/op Iteration 3: 3199.920 ±(99.9%) 61.589 us/op Iteration 4: 3163.735 ±(99.9%) 71.291 us/op Iteration 5: 5464.556 ±(99.9%) 59.034 us/op ``` That variability from 5.5ms to 25ms is terrible. It makes me not particularly trust the 8ms savings from the report. But still, the preallocating method has much less variability between runs and almost all the runs are faster than all of the non-preallocated runs. Maybe the savings is more like 2 or 3 milliseconds, but still. Or maybe we should think of hte savings as worst vs worst? If so its 19 milliseconds. Anyway, its hard to measure how much this helps. But, certainly some. Closes #58647 --- .../AggConstructionContentionBenchmark.java | 356 ++++++++++++++++++ .../action/search/TransportSearchIT.java | 9 +- .../index/shard/IndexShardIT.java | 2 +- .../breaker/CircuitBreakerServiceIT.java | 12 +- .../PreallocatedCircuitBreakerService.java | 214 +++++++++++ .../elasticsearch/common/util/BigArrays.java | 9 + .../elasticsearch/search/SearchService.java | 3 +- .../aggregations/AggregationBuilder.java | 13 + .../aggregations/AggregatorFactories.java | 12 + .../support/AggregationContext.java | 52 ++- ...reallocatedCircuitBreakerServiceTests.java | 141 +++++++ .../common/util/BigArraysTests.java | 66 +++- .../common/util/MockBigArrays.java | 4 +- .../index/mapper/MapperServiceTestCase.java | 5 + .../aggregations/AggregatorTestCase.java | 42 ++- .../topmetrics/TopMetricsAggregatorTests.java | 26 +- 16 files changed, 905 insertions(+), 61 deletions(-) create mode 100644 benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java create mode 100644 server/src/main/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerService.java create mode 100644 server/src/test/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerServiceTests.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java new file mode 100644 index 0000000000000..6876ed192cae0 --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java @@ -0,0 +1,356 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.benchmark.search.aggregations; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.Version; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.analysis.NamedAnalyzer; +import org.elasticsearch.index.cache.bitset.BitsetFilterCache; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; +import org.elasticsearch.index.mapper.ObjectMapper; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.support.NestedScope; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.internal.SubSearchContext; +import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.search.sort.BucketedSort; +import org.elasticsearch.search.sort.BucketedSort.ExtraData; +import org.elasticsearch.search.sort.SortAndFormats; +import org.elasticsearch.search.sort.SortBuilder; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * Benchmarks the overhead of constructing {@link Aggregator}s in many + * parallel threads. Machines with different numbers of cores will see + * wildly different results running this from running this with more + * cores seeing more benefits from preallocation. + */ +@Fork(2) +@Warmup(iterations = 10) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +@Threads(Threads.MAX) +public class AggConstructionContentionBenchmark { + private final SearchModule searchModule = new SearchModule(Settings.EMPTY, false, org.elasticsearch.common.collect.List.of()); + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY); + private final Index index = new Index("test", "uuid"); + private final IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache( + Settings.EMPTY, + new IndexFieldDataCache.Listener() { + } + ); + + private CircuitBreakerService breakerService; + private BigArrays bigArrays; + private boolean preallocateBreaker; + + @Param({ "noop", "real", "preallocate" }) + private String breaker; + + @Setup + public void setup() { + switch (breaker) { + case "real": + breakerService = new HierarchyCircuitBreakerService( + Settings.EMPTY, + org.elasticsearch.common.collect.List.of(), + clusterSettings + ); + break; + case "preallocate": + preallocateBreaker = true; + breakerService = new HierarchyCircuitBreakerService( + Settings.EMPTY, + org.elasticsearch.common.collect.List.of(), + clusterSettings + ); + break; + case "noop": + breakerService = new NoneCircuitBreakerService(); + break; + default: + throw new UnsupportedOperationException(); + } + bigArrays = new BigArrays(recycler, breakerService, "request"); + } + + @Benchmark + public void sum() throws IOException { + buildFactories(new AggregatorFactories.Builder().addAggregator(new SumAggregationBuilder("s").field("int_1"))); + } + + @Benchmark + public void termsSum() throws IOException { + buildFactories( + new AggregatorFactories.Builder().addAggregator( + new TermsAggregationBuilder("t").field("int_1").subAggregation(new SumAggregationBuilder("s").field("int_2")) + ) + ); + } + + @Benchmark + public void termsSixtySums() throws IOException { + TermsAggregationBuilder b = new TermsAggregationBuilder("t").field("int_1"); + for (int i = 0; i < 60; i++) { + b.subAggregation(new SumAggregationBuilder("s" + i).field("int_" + i)); + } + buildFactories(new AggregatorFactories.Builder().addAggregator(b)); + } + + private void buildFactories(AggregatorFactories.Builder factories) throws IOException { + try (DummyAggregationContext context = new DummyAggregationContext(factories.bytesToPreallocate())) { + factories.build(context, null).createTopLevelAggregators(); + } + } + + private class DummyAggregationContext extends AggregationContext { + private final Query query = new MatchAllDocsQuery(); + private final List releaseMe = new ArrayList<>(); + + private final CircuitBreaker breaker; + private final PreallocatedCircuitBreakerService preallocated; + private final MultiBucketConsumer multiBucketConsumer; + + DummyAggregationContext(long bytesToPreallocate) { + CircuitBreakerService breakerService; + if (preallocateBreaker) { + breakerService = preallocated = new PreallocatedCircuitBreakerService( + AggConstructionContentionBenchmark.this.breakerService, + CircuitBreaker.REQUEST, + bytesToPreallocate, + "aggregations" + ); + } else { + breakerService = AggConstructionContentionBenchmark.this.breakerService; + preallocated = null; + } + breaker = breakerService.getBreaker(CircuitBreaker.REQUEST); + multiBucketConsumer = new MultiBucketConsumer(Integer.MAX_VALUE, breaker); + } + + @Override + public Query query() { + return query; + } + + @Override + public Aggregator profileIfEnabled(Aggregator agg) throws IOException { + return agg; + } + + @Override + public boolean profiling() { + return false; + } + + @Override + public long nowInMillis() { + return 0; + } + + @Override + protected IndexFieldData buildFieldData(MappedFieldType ft) { + IndexFieldDataCache indexFieldDataCache = indicesFieldDataCache.buildIndexFieldDataCache(new IndexFieldDataCache.Listener() { + }, index, ft.name()); + return ft.fielddataBuilder("test", this::lookup).build(indexFieldDataCache, breakerService); + } + + @Override + public MappedFieldType getFieldType(String path) { + if (path.startsWith("int")) { + return new NumberFieldMapper.NumberFieldType(path, NumberType.INTEGER); + } + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFieldMapped(String field) { + return field.startsWith("int"); + } + + @Override + public FactoryType compile(Script script, ScriptContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public SearchLookup lookup() { + throw new UnsupportedOperationException(); + } + + @Override + public ValuesSourceRegistry getValuesSourceRegistry() { + return searchModule.getValuesSourceRegistry(); + } + + @Override + public BigArrays bigArrays() { + return bigArrays; + } + + @Override + public IndexSearcher searcher() { + return null; + } + + @Override + public Query buildQuery(QueryBuilder builder) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IndexSettings getIndexSettings() { + throw new UnsupportedOperationException(); + } + + @Override + public Optional buildSort(List> sortBuilders) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ObjectMapper getObjectMapper(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public NestedScope nestedScope() { + throw new UnsupportedOperationException(); + } + + @Override + public SubSearchContext subSearchContext() { + throw new UnsupportedOperationException(); + } + + @Override + public void addReleasable(Aggregator aggregator) { + releaseMe.add(aggregator); + } + + @Override + public MultiBucketConsumer multiBucketConsumer() { + return multiBucketConsumer; + } + + @Override + public BitsetFilterCache bitsetFilterCache() { + throw new UnsupportedOperationException(); + } + + @Override + public BucketedSort buildBucketedSort(SortBuilder sort, int size, ExtraData values) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int shardRandomSeed() { + return 0; + } + + @Override + public long getRelativeTimeInMillis() { + return 0; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public CircuitBreaker breaker() { + return breaker; + } + + @Override + public Analyzer getIndexAnalyzer(Function unindexedFieldAnalyzer) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCacheable() { + throw new UnsupportedOperationException(); + } + + @Override + public Version indexVersionCreated() { + return Version.CURRENT; + } + + @Override + public void close() { + List releaseMe = new ArrayList<>(this.releaseMe); + releaseMe.add(preallocated); + Releasables.close(releaseMe); + } + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java index 861f7fa1ab695..951378f9476e5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java @@ -536,6 +536,11 @@ public BucketCardinality bucketCardinality() { public String getType() { return "test"; } + + @Override + public long bytesToPreallocate() { + return 0; + } } /** @@ -570,13 +575,13 @@ public Aggregator subAggregator(String name) { @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { return new InternalAggregation[] { - new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap()) + buildEmptyAggregation() }; } @Override public InternalAggregation buildEmptyAggregation() { - return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap()); + return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, null); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 73220a636562f..c78dd0bbc466e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -624,7 +624,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception { .addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get()); logger.info("--> got an expected exception", e); assertThat(e.getCause(), notNullValue()); - assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for []")); + assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [preallocate[aggregations]]")); client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java index 9d9770e8554fe..d72736d2e65dc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java @@ -259,7 +259,7 @@ public void testRequestBreaker() throws Exception { } } - public void testBucketBreaker() throws Exception { + public void testAggTookTooMuch() throws Exception { if (noopBreakerUsed()) { logger.info("--> noop breakers used, skipping test"); return; @@ -290,12 +290,10 @@ public void testBucketBreaker() throws Exception { assertTrue("there should be shard failures", resp.getFailedShards() > 0); fail("aggregation should have tripped the breaker"); } catch (Exception e) { - String errMsg = "CircuitBreakingException[[request] Data too large, data for [] would be"; - assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", - e.toString(), containsString(errMsg)); - errMsg = "which is larger than the limit of [100/100b]]"; - assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException", - e.toString(), containsString(errMsg)); + Throwable cause = e.getCause(); + assertThat(cause, instanceOf(CircuitBreakingException.class)); + assertThat(cause.toString(), containsString("[request] Data too large, data for [preallocate[aggregations]] would be")); + assertThat(cause.toString(), containsString("which is larger than the limit of [100/100b]")); } } diff --git a/server/src/main/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerService.java new file mode 100644 index 0000000000000..5c185df824636 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerService.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerStats; + +/** + * {@link CircuitBreakerService} that preallocates some bytes on construction. + * Use this when you know you'll be allocating many small things on a + * {@link CircuitBreaker} quickly and there is a definite "finished" time, like + * when aggregations are built. + */ +public class PreallocatedCircuitBreakerService extends CircuitBreakerService implements Releasable { + private final CircuitBreakerService next; + private final PreallocedCircuitBreaker preallocated; + + public PreallocatedCircuitBreakerService( + CircuitBreakerService next, + String breakerToPreallocate, + long bytesToPreallocate, + String label + ) { + if (bytesToPreallocate <= 0) { + throw new IllegalArgumentException("can't preallocate negative or zero bytes but got [" + bytesToPreallocate + "]"); + } + CircuitBreaker nextBreaker = next.getBreaker(breakerToPreallocate); + nextBreaker.addEstimateBytesAndMaybeBreak(bytesToPreallocate, "preallocate[" + label + "]"); + this.next = next; + this.preallocated = new PreallocedCircuitBreaker(nextBreaker, bytesToPreallocate); + } + + @Override + public CircuitBreaker getBreaker(String name) { + if (name.equals(preallocated.getName())) { + return preallocated; + } + return next.getBreaker(name); + } + + @Override + public AllCircuitBreakerStats stats() { + throw new UnsupportedOperationException(); + } + + @Override + public CircuitBreakerStats stats(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + preallocated.close(); + } + + /** + * The preallocated breaker. + *

+ * This breaker operates in two states: + *

    + *
  1. We've used fewer bytes than we've preallocated. + *
  2. We've used all of the preallocated bytes. + *
+ *

+ * If we're in the "used fewer bytes" state than we've allocated then + * allocating new bytes just adds to + * {@link PreallocedCircuitBreaker#preallocationUsed}, maxing out at + * {@link PreallocedCircuitBreaker#preallocated}. If we max + * out we irreversibly switch to "used all" state. In that state any + * additional allocations are passed directly to the underlying breaker. + *

+ * De-allocating is just allocating a negative number of bytes. De-allocating + * can not transition us from the "used all" state back into the + * "used fewer bytes" state. It is a one way trip. Once we're in the + * "used all" state all de-allocates are done directly on the underlying + * breaker. So well behaved callers will naturally de-allocate everything. + *

+ * {@link PreallocedCircuitBreaker#close()} is only used to de-allocate + * bytes from the underlying breaker if we're still in the "used fewer bytes" + * state. There is nothing to de-allocate if we are in the "used all" state. + */ + private static class PreallocedCircuitBreaker implements CircuitBreaker, Releasable { + private final CircuitBreaker next; + private final long preallocated; + private long preallocationUsed; + private boolean closed; + + PreallocedCircuitBreaker(CircuitBreaker next, long preallocated) { + this.next = next; + this.preallocated = preallocated; + } + + @Override + public void circuitBreak(String fieldName, long bytesNeeded) { + next.circuitBreak(fieldName, bytesNeeded); + } + + @Override + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + if (closed) { + throw new IllegalStateException("already closed"); + } + if (preallocationUsed == preallocated) { + // Preallocation buffer was full before this request + return next.addEstimateBytesAndMaybeBreak(bytes, label); + } + long newUsed = preallocationUsed + bytes; + if (newUsed > preallocated) { + // This request filled up the buffer + preallocationUsed = preallocated; + return next.addEstimateBytesAndMaybeBreak(newUsed - preallocated, label); + } + // This is the fast case. No volatile reads or writes here, ma! + preallocationUsed = newUsed; + // We return garbage here but callers never use the result for anything interesting + return 0; + } + + @Override + public long addWithoutBreaking(long bytes) { + if (closed) { + throw new IllegalStateException("already closed"); + } + if (preallocationUsed == preallocated) { + // Preallocation buffer was full before this request + return next.addWithoutBreaking(bytes); + } + long newUsed = preallocationUsed + bytes; + if (newUsed > preallocated) { + // This request filled up the buffer + preallocationUsed = preallocated; + return next.addWithoutBreaking(newUsed - preallocated); + } + // This is the fast case. No volatile reads or writes here, ma! + preallocationUsed = newUsed; + // We return garbage here but callers never use the result for anything interesting + return 0; + } + + @Override + public String getName() { + return next.getName(); + } + + @Override + public void close() { + if (closed) { + return; + } + if (preallocationUsed < preallocated) { + /* + * We only need to give bytes back if we haven't used up + * all of our preallocated bytes. This is because if we + * *have* used up all of our preallcated bytes then all + * operations hit the underlying breaker directly, including + * deallocations. This is using up the bytes is a one way + * transition - as soon as we transition we know all + * deallocations will go directly to the underlying breaker. + */ + next.addWithoutBreaking(-preallocated); + } + closed = true; + } + + @Override + public long getUsed() { + throw new UnsupportedOperationException(); + } + + @Override + public long getLimit() { + throw new UnsupportedOperationException(); + } + + @Override + public double getOverhead() { + throw new UnsupportedOperationException(); + } + + @Override + public long getTrippedCount() { + throw new UnsupportedOperationException(); + } + + @Override + public Durability getDurability() { + throw new UnsupportedOperationException(); + } + + @Override + public void setLimitAndOverhead(long limit, double overhead) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java index d703761f6076c..6e5ffd94d7905 100644 --- a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.recycler.Recycler; @@ -430,6 +431,14 @@ public BigArrays withCircuitBreaking() { return this.circuitBreakingInstance; } + /** + * Creates a new {@link BigArray} pointing at the specified + * {@link CircuitBreakerService}. Use with {@link PreallocatedCircuitBreakerService}. + */ + public BigArrays withBreakerService(CircuitBreakerService breakerService) { + return new BigArrays(recycler, breakerService, breakerName, checkBreaker); + } + public CircuitBreakerService breakerService() { // TODO this feels like it is for tests but it has escaped return this.circuitBreakingInstance.breakerService; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index a151091447f49..b2181a1a8295d 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -938,16 +938,17 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.aggregations() != null && includeAggregations) { AggregationContext aggContext = new ProductionAggregationContext( context.getQueryShardContext(), + source.aggregations().bytesToPreallocate(), context.query() == null ? new MatchAllDocsQuery() : context.query(), context.getProfilers() == null ? null : context.getProfilers().getAggregationProfiler(), multiBucketConsumerService.create(), () -> new SubSearchContext(context).parsedQuery(context.parsedQuery()).fetchFieldsContext(context.fetchFieldsContext()), - context::addReleasable, context.bitsetFilterCache(), context.indexShard().shardId().hashCode(), context::getRelativeTimeInMillis, context::isCancelled ); + context.addReleasable(aggContext); try { AggregatorFactories factories = source.aggregations().build(aggContext, null); context.aggregations(new SearchContextAggregations(factories)); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java index 4f1bc2ccd47a1..e9fe75c017939 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java @@ -39,6 +39,7 @@ */ public abstract class AggregationBuilder implements NamedWriteable, ToXContentFragment, BaseAggregationBuilder, Rewriteable { + public static final long DEFAULT_PREALLOCATION = 1024 * 6; protected final String name; protected AggregatorFactories.Builder factoriesBuilder = AggregatorFactories.builder(); @@ -158,6 +159,18 @@ public enum BucketCardinality { */ public abstract BucketCardinality bucketCardinality(); + /** + * Bytes to preallocate on the "request" breaker for this aggregation. The + * goal is to request a few more bytes than we expect to use at first to + * cut down on contention on the "request" breaker when we are constructing + * the aggs. Underestimating what we allocate up front will fail to + * accomplish the goal. Overestimating will cause requests to fail for no + * reason. + */ + public long bytesToPreallocate() { + return DEFAULT_PREALLOCATION; + } + /** Common xcontent fields shared among aggregator builders */ public static final class CommonFields extends ParseField.CommonFields { public static final ParseField VALUE_TYPE = new ParseField("value_type"); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 4d7183e49bd75..7fc6d4518f98a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -476,6 +476,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + /** + * Bytes to preallocate on the "request" breaker for these aggregations. The + * goal is to request a few more bytes than we expect to use at first to + * cut down on contention on the "request" breaker when we are constructing + * the aggs. Underestimating what we allocate up front will fail to + * accomplish the goal. Overestimating will cause requests to fail for no + * reason. + */ + public long bytesToPreallocate() { + return aggregationBuilders.stream().mapToLong(b -> b.bytesToPreallocate() + b.factoriesBuilder.bytesToPreallocate()).sum(); + } + @Override public String toString() { return Strings.toString(this, true, true); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java index d94c315e2f9a8..0870b55136d6c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java @@ -25,6 +25,9 @@ import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.NamedAnalyzer; @@ -49,9 +52,9 @@ import org.elasticsearch.search.sort.SortBuilder; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -63,8 +66,13 @@ * In production we always use the {@link ProductionAggregationContext} but * this is {@code abstract} so that tests can build it without creating the * massing {@link QueryShardContext}. + *

+ * {@linkplain AggregationContext}s are {@link Releasable} because they track + * the {@link Aggregator}s they build and {@link Aggregator#close} them when + * the request is done. {@linkplain AggregationContext} may also preallocate + * bytes on the "REQUEST" breaker and is responsible for releasing those bytes. */ -public abstract class AggregationContext { +public abstract class AggregationContext implements Releasable { /** * The query at the top level of the search in which these aggregations are running. */ @@ -246,36 +254,55 @@ public final AggregationUsageService getUsageService() { */ public static class ProductionAggregationContext extends AggregationContext { private final QueryShardContext context; + private final PreallocatedCircuitBreakerService breakerService; private final BigArrays bigArrays; private final Query topLevelQuery; private final AggregationProfiler profiler; private final MultiBucketConsumer multiBucketConsumer; private final Supplier subSearchContextBuilder; - private final Consumer addReleasable; private final BitsetFilterCache bitsetFilterCache; private final int randomSeed; private final LongSupplier relativeTimeInMillis; private final Supplier isCancelled; + private final List releaseMe = new ArrayList<>(); + public ProductionAggregationContext( QueryShardContext context, + long bytesToPreallocate, Query topLevelQuery, @Nullable AggregationProfiler profiler, MultiBucketConsumer multiBucketConsumer, Supplier subSearchContextBuilder, - Consumer addReleasable, BitsetFilterCache bitsetFilterCache, int randomSeed, LongSupplier relativeTimeInMillis, Supplier isCancelled ) { this.context = context; - this.bigArrays = context.bigArrays().withCircuitBreaking(); // We can break in searches. + if (bytesToPreallocate == 0) { + /* + * Its possible if a bit strange for the aggregations to ask + * to preallocate 0 bytes. Mostly this is for testing other + * things, but we should honor it and just not preallocate + * anything. Setting the breakerService reference to null will + * cause us to skip it when we close this context. + */ + this.breakerService = null; + this.bigArrays = context.bigArrays().withCircuitBreaking(); + } else { + this.breakerService = new PreallocatedCircuitBreakerService( + context.bigArrays().breakerService(), + CircuitBreaker.REQUEST, + bytesToPreallocate, + "aggregations" + ); + this.bigArrays = context.bigArrays().withBreakerService(breakerService).withCircuitBreaking(); + } this.topLevelQuery = topLevelQuery; this.profiler = profiler; this.multiBucketConsumer = multiBucketConsumer; this.subSearchContextBuilder = subSearchContextBuilder; - this.addReleasable = addReleasable; this.bitsetFilterCache = bitsetFilterCache; this.randomSeed = randomSeed; this.relativeTimeInMillis = relativeTimeInMillis; @@ -377,7 +404,7 @@ public SubSearchContext subSearchContext() { @Override public void addReleasable(Aggregator aggregator) { - addReleasable.accept(aggregator); + releaseMe.add(aggregator); } @Override @@ -429,5 +456,16 @@ public Version indexVersionCreated() { public boolean isCacheable() { return context.isCacheable(); } + + @Override + public void close() { + /* + * Add the breakerService to the end of the list so we release it + * after all the aggregations that allocate bytes on it. + */ + List releaseMe = new ArrayList<>(this.releaseMe); + releaseMe.add(breakerService); + Releasables.close(releaseMe); + } } } diff --git a/server/src/test/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerServiceTests.java new file mode 100644 index 0000000000000..eb7bf3f53b11b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerServiceTests.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + +public class PreallocatedCircuitBreakerServiceTests extends ESTestCase { + public void testUseNotPreallocated() { + try (HierarchyCircuitBreakerService real = real()) { + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, 1024)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.ACCOUNTING); + b.addEstimateBytesAndMaybeBreak(100, "test"); + b.addWithoutBreaking(-100); + } + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + assertThat(real.getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); + } + } + + public void testUseLessThanPreallocated() { + try (HierarchyCircuitBreakerService real = real()) { + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, 1024)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.REQUEST); + b.addEstimateBytesAndMaybeBreak(100, "test"); + b.addWithoutBreaking(-100); + } + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } + } + + public void testCloseIsIdempotent() { + try (HierarchyCircuitBreakerService real = real()) { + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, 1024)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.REQUEST); + b.addEstimateBytesAndMaybeBreak(100, "test"); + b.addWithoutBreaking(-100); + preallocated.close(); + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } // Closes again which should do nothing + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } + } + + public void testUseMoreThanPreallocated() { + try (HierarchyCircuitBreakerService real = real()) { + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, 1024)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.REQUEST); + b.addEstimateBytesAndMaybeBreak(2048, "test"); + b.addWithoutBreaking(-2048); + } + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } + } + + public void testPreallocateMoreThanRemains() { + try (HierarchyCircuitBreakerService real = real()) { + long limit = real.getBreaker(CircuitBreaker.REQUEST).getLimit(); + Exception e = expectThrows(CircuitBreakingException.class, () -> preallocateRequest(real, limit + 1024)); + assertThat(e.getMessage(), startsWith("[request] Data too large, data for [preallocate[test]] would be [")); + } + } + + public void testRandom() { + try (HierarchyCircuitBreakerService real = real()) { + CircuitBreaker realBreaker = real.getBreaker(CircuitBreaker.REQUEST); + long preallocatedBytes = randomLongBetween(1, realBreaker.getLimit()); + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, preallocatedBytes)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.REQUEST); + boolean usedPreallocated = false; + long current = 0; + for (int i = 0; i < 10000; i++) { + if (current >= preallocatedBytes) { + usedPreallocated = true; + } + if (usedPreallocated) { + assertThat(realBreaker.getUsed(), equalTo(current)); + } else { + assertThat(realBreaker.getUsed(), equalTo(preallocatedBytes)); + } + if (current > 0 && randomBoolean()) { + long delta = randomLongBetween(Math.max(-current, -realBreaker.getLimit() / 100), 0); + b.addWithoutBreaking(delta); + current += delta; + continue; + } + long delta = randomLongBetween(0, realBreaker.getLimit() / 100); + if (randomBoolean()) { + b.addWithoutBreaking(delta); + current += delta; + continue; + } + if (current + delta < realBreaker.getLimit()) { + b.addEstimateBytesAndMaybeBreak(delta, "test"); + current += delta; + continue; + } + Exception e = expectThrows(CircuitBreakingException.class, () -> b.addEstimateBytesAndMaybeBreak(delta, "test")); + assertThat(e.getMessage(), startsWith("[request] Data too large, data for [test] would be [")); + } + b.addWithoutBreaking(-current); + } + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } + } + + private HierarchyCircuitBreakerService real() { + return new HierarchyCircuitBreakerService( + Settings.EMPTY, + org.elasticsearch.common.collect.List.of(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + } + + private PreallocatedCircuitBreakerService preallocateRequest(CircuitBreakerService real, long bytes) { + return new PreallocatedCircuitBreakerService(real, CircuitBreaker.REQUEST, bytes, "test"); + } +} diff --git a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 0f150d91368f4..513f2b9516a4c 100644 --- a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -22,16 +22,18 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -41,19 +43,10 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class BigArraysTests extends ESTestCase { - - private BigArrays randombigArrays() { - return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); - } - - private BigArrays bigArrays; - - @Before - public void init() { - bigArrays = randombigArrays(); - } + private final BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); public void testByteArrayGrowth() { final int totalLen = randomIntBetween(1, 4000000); @@ -402,6 +395,55 @@ public void testOverSizeUsesMinPageCount() { assertThat(size - minSize, lessThan((long) pageSize)); } + /** + * Test the pattern we use to pre-allocate space for many {@link BigArray}s. + */ + public void testPreallocate() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + try ( + HierarchyCircuitBreakerService realBreakers = new HierarchyCircuitBreakerService( + Settings.EMPTY, + org.elasticsearch.common.collect.List.of(), + clusterSettings + ) + ) { + BigArrays unPreAllocated = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), realBreakers); + long toPreallocate = randomLongBetween(4000, 10000); + CircuitBreaker realBreaker = realBreakers.getBreaker(CircuitBreaker.REQUEST); + assertThat(realBreaker.getUsed(), equalTo(0L)); + try ( + PreallocatedCircuitBreakerService prealloctedBreakerService = new PreallocatedCircuitBreakerService( + realBreakers, + CircuitBreaker.REQUEST, + toPreallocate, + "test" + ) + ) { + assertThat(realBreaker.getUsed(), equalTo(toPreallocate)); + BigArrays preallocated = unPreAllocated.withBreakerService(prealloctedBreakerService); + + // We don't grab any bytes just making a new BigArrays + assertThat(realBreaker.getUsed(), equalTo(toPreallocate)); + + List arrays = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + // We're well under the preallocation so grabbing a little array doesn't allocate anything + arrays.add(preallocated.newLongArray(1)); + assertThat(realBreaker.getUsed(), equalTo(toPreallocate)); + } + + // Allocating a large array *does* allocate some bytes + arrays.add(preallocated.newLongArray(1024)); + long expectedMin = (PageCacheRecycler.LONG_PAGE_SIZE + arrays.size()) * Long.BYTES; + assertThat(realBreaker.getUsed(), greaterThanOrEqualTo(expectedMin)); + // 64 should be enough room for each BigArray object + assertThat(realBreaker.getUsed(), lessThanOrEqualTo(expectedMin + 64 * arrays.size())); + Releasables.close(arrays); + } + assertThat(realBreaker.getUsed(), equalTo(0L)); + } + } + private List bigArrayCreators(final long maxSize, final boolean withBreaking) { final BigArrays byteBigArrays = newBigArraysInstance(maxSize, withBreaking); BigArraysHelper byteHelper = new BigArraysHelper(byteBigArrays, diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java index 0bb2f05c20035..3ba429880eaf3 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java @@ -628,11 +628,11 @@ public Collection getChildResources() { } } - private static class LimitedBreaker extends NoopCircuitBreaker { + public static class LimitedBreaker extends NoopCircuitBreaker { private final AtomicLong used = new AtomicLong(); private final ByteSizeValue max; - LimitedBreaker(String name, ByteSizeValue max) { + public LimitedBreaker(String name, ByteSizeValue max) { super(name); this.max = max; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java index 324180077d5e7..b7eacc400f458 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java @@ -476,6 +476,11 @@ public Version indexVersionCreated() { public boolean isCacheable() { throw new UnsupportedOperationException(); } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } }; } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 0a33532b3ce3d..a9bc18d93e8e4 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.network.NetworkAddress; @@ -156,7 +157,7 @@ * {@link AggregationBuilder} instance. */ public abstract class AggregatorTestCase extends ESTestCase { - private List releasables = new ArrayList<>(); + private List releasables = new ArrayList<>(); protected ValuesSourceRegistry valuesSourceRegistry; // A list of field types that should not be tested, or are not currently supported @@ -206,21 +207,38 @@ protected A createAggregator(AggregationBuilder builder, } /** - * Create a {@linkplain SearchContext} for testing an {@link Aggregator}. + * Create a {@linkplain AggregationContext} for testing an {@link Aggregator}. + * While {@linkplain AggregationContext} is {@link Releasable} the caller is + * not responsible for releasing it. Instead, it is released automatically in + * in {@link #cleanupReleasables()}. */ protected AggregationContext createAggregationContext( IndexSearcher indexSearcher, Query query, MappedFieldType... fieldTypes ) throws IOException { - CircuitBreakerService breakerService = new NoneCircuitBreakerService(); - return createAggregationContext(indexSearcher, createIndexSettings(), query, breakerService, DEFAULT_MAX_BUCKETS, fieldTypes); + return createAggregationContext( + indexSearcher, + createIndexSettings(), + query, + new NoneCircuitBreakerService(), + AggregationBuilder.DEFAULT_PREALLOCATION * 5, // We don't know how many bytes to preallocate so we grab a hand full + DEFAULT_MAX_BUCKETS, + fieldTypes + ); } + /** + * Create a {@linkplain AggregationContext} for testing an {@link Aggregator}. + * While {@linkplain AggregationContext} is {@link Releasable} the caller is + * not responsible for releasing it. Instead, it is released automatically in + * in {@link #cleanupReleasables()}. + */ protected AggregationContext createAggregationContext(IndexSearcher indexSearcher, IndexSettings indexSettings, Query query, CircuitBreakerService breakerService, + long bytesToPreallocate, int maxBucket, MappedFieldType... fieldTypes) throws IOException { /* @@ -279,18 +297,20 @@ public void onCache(ShardId shardId, Accountable accountable) {} ); MultiBucketConsumer consumer = new MultiBucketConsumer(maxBucket, breakerService.getBreaker(CircuitBreaker.REQUEST)); - return new ProductionAggregationContext( + AggregationContext context = new ProductionAggregationContext( queryShardContext, + bytesToPreallocate, query, null, consumer, () -> buildSubSearchContext(indexSettings, queryShardContext, bitsetFilterCache), - releasables::add, bitsetFilterCache, randomInt(), () -> 0L, () -> false ); + releasables.add(context); + return context; } /** @@ -416,7 +436,15 @@ protected A searchAndReduc List aggs = new ArrayList<>(); Query rewritten = searcher.rewrite(query); CircuitBreakerService breakerService = new NoneCircuitBreakerService(); - AggregationContext context = createAggregationContext(searcher, indexSettings, query, breakerService, maxBucket, fieldTypes); + AggregationContext context = createAggregationContext( + searcher, + indexSettings, + query, + breakerService, + builder.bytesToPreallocate(), + maxBucket, + fieldTypes + ); C root = createAggregator(builder, context); if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) { diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java index 57ca6c6954fcc..de55741fb8b4d 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java @@ -29,10 +29,10 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; -import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.index.fielddata.ScriptDocValues; import org.elasticsearch.index.mapper.GeoPointFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; @@ -332,26 +332,7 @@ public void testTonsOfBucketsTriggersBreaker() throws IOException { // Build a "simple" circuit breaker that trips at 20k CircuitBreakerService breaker = mock(CircuitBreakerService.class); ByteSizeValue max = new ByteSizeValue(20, ByteSizeUnit.KB); - when(breaker.getBreaker(CircuitBreaker.REQUEST)).thenReturn(new NoopCircuitBreaker(CircuitBreaker.REQUEST) { - private long total = 0; - - @Override - public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { - logger.debug("Used {} grabbing {} for {}", total, bytes, label); - total += bytes; - if (total > max.getBytes()) { - throw new CircuitBreakingException("test error", bytes, max.getBytes(), Durability.TRANSIENT); - } - return total; - } - - @Override - public long addWithoutBreaking(long bytes) { - logger.debug("Used {} grabbing {}", total, bytes); - total += bytes; - return total; - } - }); + when(breaker.getBreaker(CircuitBreaker.REQUEST)).thenReturn(new MockBigArrays.LimitedBreaker(CircuitBreaker.REQUEST, max)); // Collect some buckets with it try (Directory directory = newDirectory()) { @@ -361,15 +342,16 @@ public long addWithoutBreaking(long bytes) { try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = newSearcher(indexReader, false, false); + TopMetricsAggregationBuilder builder = simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC)); AggregationContext context = createAggregationContext( indexSearcher, createIndexSettings(), new MatchAllDocsQuery(), breaker, + builder.bytesToPreallocate(), MultiBucketConsumerService.DEFAULT_MAX_BUCKETS, doubleFields() ); - TopMetricsAggregationBuilder builder = simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC)); Aggregator aggregator = builder.build(context, null).create(null, CardinalityUpperBound.ONE); aggregator.preCollection(); assertThat(indexReader.leaves(), hasSize(1));