diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java new file mode 100644 index 0000000000000..152c0a7f4dbdf --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/AggConstructionContentionBenchmark.java @@ -0,0 +1,342 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.benchmark.search.aggregations; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.analysis.NamedAnalyzer; +import org.elasticsearch.index.cache.bitset.BitsetFilterCache; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; +import org.elasticsearch.index.mapper.ObjectMapper; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.support.NestedScope; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; +import org.elasticsearch.search.internal.SubSearchContext; +import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.search.sort.BucketedSort; +import org.elasticsearch.search.sort.BucketedSort.ExtraData; +import org.elasticsearch.search.sort.SortAndFormats; +import org.elasticsearch.search.sort.SortBuilder; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +/** + * Benchmarks the overhead of constructing {@link Aggregator}s in many + * parallel threads. Machines with different numbers of cores will see + * wildly different results running this from running this with more + * cores seeing more benefits from preallocation. + */ +@Fork(2) +@Warmup(iterations = 10) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Benchmark) +@Threads(Threads.MAX) +public class AggConstructionContentionBenchmark { + private final SearchModule searchModule = new SearchModule(Settings.EMPTY, List.of()); + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + private final PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY); + private final Index index = new Index("test", "uuid"); + private final IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache( + Settings.EMPTY, + new IndexFieldDataCache.Listener() { + } + ); + + private CircuitBreakerService breakerService; + private BigArrays bigArrays; + private boolean preallocateBreaker; + + @Param({ "noop", "real", "preallocate" }) + private String breaker; + + @Setup + public void setup() { + switch (breaker) { + case "real": + breakerService = new HierarchyCircuitBreakerService(Settings.EMPTY, List.of(), clusterSettings); + break; + case "preallocate": + preallocateBreaker = true; + breakerService = new HierarchyCircuitBreakerService(Settings.EMPTY, List.of(), clusterSettings); + break; + case "noop": + breakerService = new NoneCircuitBreakerService(); + break; + default: + throw new UnsupportedOperationException(); + } + bigArrays = new BigArrays(recycler, breakerService, "request"); + } + + @Benchmark + public void sum() throws IOException { + buildFactories(new AggregatorFactories.Builder().addAggregator(new SumAggregationBuilder("s").field("int_1"))); + } + + @Benchmark + public void termsSum() throws IOException { + buildFactories( + new AggregatorFactories.Builder().addAggregator( + new TermsAggregationBuilder("t").field("int_1").subAggregation(new SumAggregationBuilder("s").field("int_2")) + ) + ); + } + + @Benchmark + public void termsSixtySums() throws IOException { + TermsAggregationBuilder b = new TermsAggregationBuilder("t").field("int_1"); + for (int i = 0; i < 60; i++) { + b.subAggregation(new SumAggregationBuilder("s" + i).field("int_" + i)); + } + buildFactories(new AggregatorFactories.Builder().addAggregator(b)); + } + + private void buildFactories(AggregatorFactories.Builder factories) throws IOException { + try (DummyAggregationContext context = new DummyAggregationContext(factories.bytesToPreallocate())) { + factories.build(context, null).createTopLevelAggregators(); + } + } + + private class DummyAggregationContext extends AggregationContext { + private final Query query = new MatchAllDocsQuery(); + private final List releaseMe = new ArrayList<>(); + + private final CircuitBreaker breaker; + private final PreallocatedCircuitBreakerService preallocated; + private final MultiBucketConsumer multiBucketConsumer; + + DummyAggregationContext(long bytesToPreallocate) { + CircuitBreakerService breakerService; + if (preallocateBreaker) { + breakerService = preallocated = new PreallocatedCircuitBreakerService( + AggConstructionContentionBenchmark.this.breakerService, + CircuitBreaker.REQUEST, + bytesToPreallocate, + "aggregations" + ); + } else { + breakerService = AggConstructionContentionBenchmark.this.breakerService; + preallocated = null; + } + breaker = breakerService.getBreaker(CircuitBreaker.REQUEST); + multiBucketConsumer = new MultiBucketConsumer(Integer.MAX_VALUE, breaker); + } + + @Override + public Query query() { + return query; + } + + @Override + public Aggregator profileIfEnabled(Aggregator agg) throws IOException { + return agg; + } + + @Override + public boolean profiling() { + return false; + } + + @Override + public long nowInMillis() { + return 0; + } + + @Override + protected IndexFieldData buildFieldData(MappedFieldType ft) { + IndexFieldDataCache indexFieldDataCache = indicesFieldDataCache.buildIndexFieldDataCache(new IndexFieldDataCache.Listener() { + }, index, ft.name()); + return ft.fielddataBuilder("test", this::lookup).build(indexFieldDataCache, breakerService); + } + + @Override + public MappedFieldType getFieldType(String path) { + if (path.startsWith("int")) { + return new NumberFieldMapper.NumberFieldType(path, NumberType.INTEGER); + } + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFieldMapped(String field) { + return field.startsWith("int"); + } + + @Override + public FactoryType compile(Script script, ScriptContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public SearchLookup lookup() { + throw new UnsupportedOperationException(); + } + + @Override + public ValuesSourceRegistry getValuesSourceRegistry() { + return searchModule.getValuesSourceRegistry(); + } + + @Override + public BigArrays bigArrays() { + return bigArrays; + } + + @Override + public IndexSearcher searcher() { + return null; + } + + @Override + public Query buildQuery(QueryBuilder builder) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IndexSettings getIndexSettings() { + throw new UnsupportedOperationException(); + } + + @Override + public Optional buildSort(List> sortBuilders) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ObjectMapper getObjectMapper(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public NestedScope nestedScope() { + throw new UnsupportedOperationException(); + } + + @Override + public SubSearchContext subSearchContext() { + throw new UnsupportedOperationException(); + } + + @Override + public void addReleasable(Aggregator aggregator) { + releaseMe.add(aggregator); + } + + @Override + public MultiBucketConsumer multiBucketConsumer() { + return multiBucketConsumer; + } + + @Override + public BitsetFilterCache bitsetFilterCache() { + throw new UnsupportedOperationException(); + } + + @Override + public BucketedSort buildBucketedSort(SortBuilder sort, int size, ExtraData values) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int shardRandomSeed() { + return 0; + } + + @Override + public long getRelativeTimeInMillis() { + return 0; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public CircuitBreaker breaker() { + return breaker; + } + + @Override + public Analyzer getIndexAnalyzer(Function unindexedFieldAnalyzer) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCacheable() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + List releaseMe = new ArrayList<>(this.releaseMe); + releaseMe.add(preallocated); + Releasables.close(releaseMe); + } + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java index b5c67ab21b876..ba0ec8fac78ee 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java @@ -533,6 +533,11 @@ public BucketCardinality bucketCardinality() { public String getType() { return "test"; } + + @Override + public long bytesToPreallocate() { + return 0; + } } /** @@ -567,13 +572,13 @@ public Aggregator subAggregator(String name) { @Override public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { return new InternalAggregation[] { - new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap()) + buildEmptyAggregation() }; } @Override public InternalAggregation buildEmptyAggregation() { - return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap()); + return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, null); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 459676087a370..592bc3adfadb2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -604,7 +604,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception { .addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get()); logger.info("--> got an expected exception", e); assertThat(e.getCause(), notNullValue()); - assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for []")); + assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [preallocate[aggregations]]")); client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java index f892e9a31da1a..0fbf3487e0422 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/memory/breaker/CircuitBreakerServiceIT.java @@ -252,7 +252,7 @@ public void testRequestBreaker() throws Exception { } } - public void testBucketBreaker() throws Exception { + public void testAggTookTooMuch() throws Exception { if (noopBreakerUsed()) { logger.info("--> noop breakers used, skipping test"); return; @@ -285,7 +285,7 @@ public void testBucketBreaker() throws Exception { } catch (Exception e) { Throwable cause = e.getCause(); assertThat(cause, instanceOf(CircuitBreakingException.class)); - assertThat(cause.toString(), containsString("[request] Data too large, data for [] would be")); + assertThat(cause.toString(), containsString("[request] Data too large, data for [preallocate[aggregations]] would be")); assertThat(cause.toString(), containsString("which is larger than the limit of [100/100b]")); } } diff --git a/server/src/main/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerService.java b/server/src/main/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerService.java new file mode 100644 index 0000000000000..5c185df824636 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerService.java @@ -0,0 +1,214 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.indices.breaker.AllCircuitBreakerStats; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.CircuitBreakerStats; + +/** + * {@link CircuitBreakerService} that preallocates some bytes on construction. + * Use this when you know you'll be allocating many small things on a + * {@link CircuitBreaker} quickly and there is a definite "finished" time, like + * when aggregations are built. + */ +public class PreallocatedCircuitBreakerService extends CircuitBreakerService implements Releasable { + private final CircuitBreakerService next; + private final PreallocedCircuitBreaker preallocated; + + public PreallocatedCircuitBreakerService( + CircuitBreakerService next, + String breakerToPreallocate, + long bytesToPreallocate, + String label + ) { + if (bytesToPreallocate <= 0) { + throw new IllegalArgumentException("can't preallocate negative or zero bytes but got [" + bytesToPreallocate + "]"); + } + CircuitBreaker nextBreaker = next.getBreaker(breakerToPreallocate); + nextBreaker.addEstimateBytesAndMaybeBreak(bytesToPreallocate, "preallocate[" + label + "]"); + this.next = next; + this.preallocated = new PreallocedCircuitBreaker(nextBreaker, bytesToPreallocate); + } + + @Override + public CircuitBreaker getBreaker(String name) { + if (name.equals(preallocated.getName())) { + return preallocated; + } + return next.getBreaker(name); + } + + @Override + public AllCircuitBreakerStats stats() { + throw new UnsupportedOperationException(); + } + + @Override + public CircuitBreakerStats stats(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + preallocated.close(); + } + + /** + * The preallocated breaker. + *

+ * This breaker operates in two states: + *

    + *
  1. We've used fewer bytes than we've preallocated. + *
  2. We've used all of the preallocated bytes. + *
+ *

+ * If we're in the "used fewer bytes" state than we've allocated then + * allocating new bytes just adds to + * {@link PreallocedCircuitBreaker#preallocationUsed}, maxing out at + * {@link PreallocedCircuitBreaker#preallocated}. If we max + * out we irreversibly switch to "used all" state. In that state any + * additional allocations are passed directly to the underlying breaker. + *

+ * De-allocating is just allocating a negative number of bytes. De-allocating + * can not transition us from the "used all" state back into the + * "used fewer bytes" state. It is a one way trip. Once we're in the + * "used all" state all de-allocates are done directly on the underlying + * breaker. So well behaved callers will naturally de-allocate everything. + *

+ * {@link PreallocedCircuitBreaker#close()} is only used to de-allocate + * bytes from the underlying breaker if we're still in the "used fewer bytes" + * state. There is nothing to de-allocate if we are in the "used all" state. + */ + private static class PreallocedCircuitBreaker implements CircuitBreaker, Releasable { + private final CircuitBreaker next; + private final long preallocated; + private long preallocationUsed; + private boolean closed; + + PreallocedCircuitBreaker(CircuitBreaker next, long preallocated) { + this.next = next; + this.preallocated = preallocated; + } + + @Override + public void circuitBreak(String fieldName, long bytesNeeded) { + next.circuitBreak(fieldName, bytesNeeded); + } + + @Override + public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { + if (closed) { + throw new IllegalStateException("already closed"); + } + if (preallocationUsed == preallocated) { + // Preallocation buffer was full before this request + return next.addEstimateBytesAndMaybeBreak(bytes, label); + } + long newUsed = preallocationUsed + bytes; + if (newUsed > preallocated) { + // This request filled up the buffer + preallocationUsed = preallocated; + return next.addEstimateBytesAndMaybeBreak(newUsed - preallocated, label); + } + // This is the fast case. No volatile reads or writes here, ma! + preallocationUsed = newUsed; + // We return garbage here but callers never use the result for anything interesting + return 0; + } + + @Override + public long addWithoutBreaking(long bytes) { + if (closed) { + throw new IllegalStateException("already closed"); + } + if (preallocationUsed == preallocated) { + // Preallocation buffer was full before this request + return next.addWithoutBreaking(bytes); + } + long newUsed = preallocationUsed + bytes; + if (newUsed > preallocated) { + // This request filled up the buffer + preallocationUsed = preallocated; + return next.addWithoutBreaking(newUsed - preallocated); + } + // This is the fast case. No volatile reads or writes here, ma! + preallocationUsed = newUsed; + // We return garbage here but callers never use the result for anything interesting + return 0; + } + + @Override + public String getName() { + return next.getName(); + } + + @Override + public void close() { + if (closed) { + return; + } + if (preallocationUsed < preallocated) { + /* + * We only need to give bytes back if we haven't used up + * all of our preallocated bytes. This is because if we + * *have* used up all of our preallcated bytes then all + * operations hit the underlying breaker directly, including + * deallocations. This is using up the bytes is a one way + * transition - as soon as we transition we know all + * deallocations will go directly to the underlying breaker. + */ + next.addWithoutBreaking(-preallocated); + } + closed = true; + } + + @Override + public long getUsed() { + throw new UnsupportedOperationException(); + } + + @Override + public long getLimit() { + throw new UnsupportedOperationException(); + } + + @Override + public double getOverhead() { + throw new UnsupportedOperationException(); + } + + @Override + public long getTrippedCount() { + throw new UnsupportedOperationException(); + } + + @Override + public Durability getDurability() { + throw new UnsupportedOperationException(); + } + + @Override + public void setLimitAndOverhead(long limit, double overhead) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java index d703761f6076c..6e5ffd94d7905 100644 --- a/server/src/main/java/org/elasticsearch/common/util/BigArrays.java +++ b/server/src/main/java/org/elasticsearch/common/util/BigArrays.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.recycler.Recycler; @@ -430,6 +431,14 @@ public BigArrays withCircuitBreaking() { return this.circuitBreakingInstance; } + /** + * Creates a new {@link BigArray} pointing at the specified + * {@link CircuitBreakerService}. Use with {@link PreallocatedCircuitBreakerService}. + */ + public BigArrays withBreakerService(CircuitBreakerService breakerService) { + return new BigArrays(recycler, breakerService, breakerName, checkBreaker); + } + public CircuitBreakerService breakerService() { // TODO this feels like it is for tests but it has escaped return this.circuitBreakingInstance.breakerService; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 4a10bb479bb7e..901cc96d06a89 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -940,16 +940,17 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc if (source.aggregations() != null && includeAggregations) { AggregationContext aggContext = new ProductionAggregationContext( context.getQueryShardContext(), + source.aggregations().bytesToPreallocate(), context.query() == null ? new MatchAllDocsQuery() : context.query(), context.getProfilers() == null ? null : context.getProfilers().getAggregationProfiler(), multiBucketConsumerService.create(), () -> new SubSearchContext(context).parsedQuery(context.parsedQuery()).fetchFieldsContext(context.fetchFieldsContext()), - context::addReleasable, context.bitsetFilterCache(), context.indexShard().shardId().hashCode(), context::getRelativeTimeInMillis, context::isCancelled ); + context.addReleasable(aggContext); try { AggregatorFactories factories = source.aggregations().build(aggContext, null); context.aggregations(new SearchContextAggregations(factories)); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java index 4f1bc2ccd47a1..e9fe75c017939 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilder.java @@ -39,6 +39,7 @@ */ public abstract class AggregationBuilder implements NamedWriteable, ToXContentFragment, BaseAggregationBuilder, Rewriteable { + public static final long DEFAULT_PREALLOCATION = 1024 * 6; protected final String name; protected AggregatorFactories.Builder factoriesBuilder = AggregatorFactories.builder(); @@ -158,6 +159,18 @@ public enum BucketCardinality { */ public abstract BucketCardinality bucketCardinality(); + /** + * Bytes to preallocate on the "request" breaker for this aggregation. The + * goal is to request a few more bytes than we expect to use at first to + * cut down on contention on the "request" breaker when we are constructing + * the aggs. Underestimating what we allocate up front will fail to + * accomplish the goal. Overestimating will cause requests to fail for no + * reason. + */ + public long bytesToPreallocate() { + return DEFAULT_PREALLOCATION; + } + /** Common xcontent fields shared among aggregator builders */ public static final class CommonFields extends ParseField.CommonFields { public static final ParseField VALUE_TYPE = new ParseField("value_type"); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index 4d7183e49bd75..7fc6d4518f98a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -476,6 +476,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } + /** + * Bytes to preallocate on the "request" breaker for these aggregations. The + * goal is to request a few more bytes than we expect to use at first to + * cut down on contention on the "request" breaker when we are constructing + * the aggs. Underestimating what we allocate up front will fail to + * accomplish the goal. Overestimating will cause requests to fail for no + * reason. + */ + public long bytesToPreallocate() { + return aggregationBuilders.stream().mapToLong(b -> b.bytesToPreallocate() + b.factoriesBuilder.bytesToPreallocate()).sum(); + } + @Override public String toString() { return Strings.toString(this, true, true); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java index bc57eb0e1fed6..b83b0244ec4e8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java @@ -24,6 +24,9 @@ import org.apache.lucene.search.Query; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.NamedAnalyzer; @@ -48,9 +51,9 @@ import org.elasticsearch.search.sort.SortBuilder; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -62,8 +65,13 @@ * In production we always use the {@link ProductionAggregationContext} but * this is {@code abstract} so that tests can build it without creating the * massing {@link QueryShardContext}. + *

+ * {@linkplain AggregationContext}s are {@link Releasable} because they track + * the {@link Aggregator}s they build and {@link Aggregator#close} them when + * the request is done. {@linkplain AggregationContext} may also preallocate + * bytes on the "REQUEST" breaker and is responsible for releasing those bytes. */ -public abstract class AggregationContext { +public abstract class AggregationContext implements Releasable { /** * The query at the top level of the search in which these aggregations are running. */ @@ -243,36 +251,55 @@ public final AggregationUsageService getUsageService() { */ public static class ProductionAggregationContext extends AggregationContext { private final QueryShardContext context; + private final PreallocatedCircuitBreakerService breakerService; private final BigArrays bigArrays; private final Query topLevelQuery; private final AggregationProfiler profiler; private final MultiBucketConsumer multiBucketConsumer; private final Supplier subSearchContextBuilder; - private final Consumer addReleasable; private final BitsetFilterCache bitsetFilterCache; private final int randomSeed; private final LongSupplier relativeTimeInMillis; private final Supplier isCancelled; + private final List releaseMe = new ArrayList<>(); + public ProductionAggregationContext( QueryShardContext context, + long bytesToPreallocate, Query topLevelQuery, @Nullable AggregationProfiler profiler, MultiBucketConsumer multiBucketConsumer, Supplier subSearchContextBuilder, - Consumer addReleasable, BitsetFilterCache bitsetFilterCache, int randomSeed, LongSupplier relativeTimeInMillis, Supplier isCancelled ) { this.context = context; - this.bigArrays = context.bigArrays().withCircuitBreaking(); // We can break in searches. + if (bytesToPreallocate == 0) { + /* + * Its possible if a bit strange for the aggregations to ask + * to preallocate 0 bytes. Mostly this is for testing other + * things, but we should honor it and just not preallocate + * anything. Setting the breakerService reference to null will + * cause us to skip it when we close this context. + */ + this.breakerService = null; + this.bigArrays = context.bigArrays().withCircuitBreaking(); + } else { + this.breakerService = new PreallocatedCircuitBreakerService( + context.bigArrays().breakerService(), + CircuitBreaker.REQUEST, + bytesToPreallocate, + "aggregations" + ); + this.bigArrays = context.bigArrays().withBreakerService(breakerService).withCircuitBreaking(); + } this.topLevelQuery = topLevelQuery; this.profiler = profiler; this.multiBucketConsumer = multiBucketConsumer; this.subSearchContextBuilder = subSearchContextBuilder; - this.addReleasable = addReleasable; this.bitsetFilterCache = bitsetFilterCache; this.randomSeed = randomSeed; this.relativeTimeInMillis = relativeTimeInMillis; @@ -374,7 +401,7 @@ public SubSearchContext subSearchContext() { @Override public void addReleasable(Aggregator aggregator) { - addReleasable.accept(aggregator); + releaseMe.add(aggregator); } @Override @@ -421,5 +448,16 @@ public Analyzer getIndexAnalyzer(Function unindexedFieldA public boolean isCacheable() { return context.isCacheable(); } + + @Override + public void close() { + /* + * Add the breakerService to the end of the list so we release it + * after all the aggregations that allocate bytes on it. + */ + List releaseMe = new ArrayList<>(this.releaseMe); + releaseMe.add(breakerService); + Releasables.close(releaseMe); + } } } diff --git a/server/src/test/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerServiceTests.java b/server/src/test/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerServiceTests.java new file mode 100644 index 0000000000000..27fe80756a6d7 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/breaker/PreallocatedCircuitBreakerServiceTests.java @@ -0,0 +1,143 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.breaker; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + +public class PreallocatedCircuitBreakerServiceTests extends ESTestCase { + public void testUseNotPreallocated() { + try (HierarchyCircuitBreakerService real = real()) { + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, 1024)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.ACCOUNTING); + b.addEstimateBytesAndMaybeBreak(100, "test"); + b.addWithoutBreaking(-100); + } + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + assertThat(real.getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); + } + } + + public void testUseLessThanPreallocated() { + try (HierarchyCircuitBreakerService real = real()) { + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, 1024)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.REQUEST); + b.addEstimateBytesAndMaybeBreak(100, "test"); + b.addWithoutBreaking(-100); + } + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } + } + + public void testCloseIsIdempotent() { + try (HierarchyCircuitBreakerService real = real()) { + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, 1024)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.REQUEST); + b.addEstimateBytesAndMaybeBreak(100, "test"); + b.addWithoutBreaking(-100); + preallocated.close(); + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } // Closes again which should do nothing + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } + } + + public void testUseMoreThanPreallocated() { + try (HierarchyCircuitBreakerService real = real()) { + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, 1024)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.REQUEST); + b.addEstimateBytesAndMaybeBreak(2048, "test"); + b.addWithoutBreaking(-2048); + } + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } + } + + public void testPreallocateMoreThanRemains() { + try (HierarchyCircuitBreakerService real = real()) { + long limit = real.getBreaker(CircuitBreaker.REQUEST).getLimit(); + Exception e = expectThrows(CircuitBreakingException.class, () -> preallocateRequest(real, limit + 1024)); + assertThat(e.getMessage(), startsWith("[request] Data too large, data for [preallocate[test]] would be [")); + } + } + + public void testRandom() { + try (HierarchyCircuitBreakerService real = real()) { + CircuitBreaker realBreaker = real.getBreaker(CircuitBreaker.REQUEST); + long preallocatedBytes = randomLongBetween(1, realBreaker.getLimit()); + try (PreallocatedCircuitBreakerService preallocated = preallocateRequest(real, preallocatedBytes)) { + CircuitBreaker b = preallocated.getBreaker(CircuitBreaker.REQUEST); + boolean usedPreallocated = false; + long current = 0; + for (int i = 0; i < 10000; i++) { + if (current >= preallocatedBytes) { + usedPreallocated = true; + } + if (usedPreallocated) { + assertThat(realBreaker.getUsed(), equalTo(current)); + } else { + assertThat(realBreaker.getUsed(), equalTo(preallocatedBytes)); + } + if (current > 0 && randomBoolean()) { + long delta = randomLongBetween(Math.max(-current, -realBreaker.getLimit() / 100), 0); + b.addWithoutBreaking(delta); + current += delta; + continue; + } + long delta = randomLongBetween(0, realBreaker.getLimit() / 100); + if (randomBoolean()) { + b.addWithoutBreaking(delta); + current += delta; + continue; + } + if (current + delta < realBreaker.getLimit()) { + b.addEstimateBytesAndMaybeBreak(delta, "test"); + current += delta; + continue; + } + Exception e = expectThrows(CircuitBreakingException.class, () -> b.addEstimateBytesAndMaybeBreak(delta, "test")); + assertThat(e.getMessage(), startsWith("[request] Data too large, data for [test] would be [")); + } + b.addWithoutBreaking(-current); + } + assertThat(real.getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L)); + } + } + + private HierarchyCircuitBreakerService real() { + return new HierarchyCircuitBreakerService( + Settings.EMPTY, + List.of(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + } + + private PreallocatedCircuitBreakerService preallocateRequest(CircuitBreakerService real, long bytes) { + return new PreallocatedCircuitBreakerService(real, CircuitBreaker.REQUEST, bytes, "test"); + } +} diff --git a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java index 0f150d91368f4..a769f596bfed8 100644 --- a/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java @@ -22,16 +22,18 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -41,19 +43,10 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class BigArraysTests extends ESTestCase { - - private BigArrays randombigArrays() { - return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); - } - - private BigArrays bigArrays; - - @Before - public void init() { - bigArrays = randombigArrays(); - } + private final BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); public void testByteArrayGrowth() { final int totalLen = randomIntBetween(1, 4000000); @@ -402,6 +395,49 @@ public void testOverSizeUsesMinPageCount() { assertThat(size - minSize, lessThan((long) pageSize)); } + /** + * Test the pattern we use to pre-allocate space for many {@link BigArray}s. + */ + public void testPreallocate() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + try (HierarchyCircuitBreakerService realBreakers = new HierarchyCircuitBreakerService(Settings.EMPTY, List.of(), clusterSettings)) { + BigArrays unPreAllocated = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), realBreakers); + long toPreallocate = randomLongBetween(4000, 10000); + CircuitBreaker realBreaker = realBreakers.getBreaker(CircuitBreaker.REQUEST); + assertThat(realBreaker.getUsed(), equalTo(0L)); + try ( + PreallocatedCircuitBreakerService prealloctedBreakerService = new PreallocatedCircuitBreakerService( + realBreakers, + CircuitBreaker.REQUEST, + toPreallocate, + "test" + ) + ) { + assertThat(realBreaker.getUsed(), equalTo(toPreallocate)); + BigArrays preallocated = unPreAllocated.withBreakerService(prealloctedBreakerService); + + // We don't grab any bytes just making a new BigArrays + assertThat(realBreaker.getUsed(), equalTo(toPreallocate)); + + List arrays = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + // We're well under the preallocation so grabbing a little array doesn't allocate anything + arrays.add(preallocated.newLongArray(1)); + assertThat(realBreaker.getUsed(), equalTo(toPreallocate)); + } + + // Allocating a large array *does* allocate some bytes + arrays.add(preallocated.newLongArray(1024)); + long expectedMin = (PageCacheRecycler.LONG_PAGE_SIZE + arrays.size()) * Long.BYTES; + assertThat(realBreaker.getUsed(), greaterThanOrEqualTo(expectedMin)); + // 64 should be enough room for each BigArray object + assertThat(realBreaker.getUsed(), lessThanOrEqualTo(expectedMin + 64 * arrays.size())); + Releasables.close(arrays); + } + assertThat(realBreaker.getUsed(), equalTo(0L)); + } + } + private List bigArrayCreators(final long maxSize, final boolean withBreaking) { final BigArrays byteBigArrays = newBigArraysInstance(maxSize, withBreaking); BigArraysHelper byteHelper = new BigArraysHelper(byteBigArrays, diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java index 0bb2f05c20035..3ba429880eaf3 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java @@ -628,11 +628,11 @@ public Collection getChildResources() { } } - private static class LimitedBreaker extends NoopCircuitBreaker { + public static class LimitedBreaker extends NoopCircuitBreaker { private final AtomicLong used = new AtomicLong(); private final ByteSizeValue max; - LimitedBreaker(String name, ByteSizeValue max) { + public LimitedBreaker(String name, ByteSizeValue max) { super(name); this.max = max; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java index befd469a5bf21..0edc1e4b48665 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java @@ -462,6 +462,11 @@ public Analyzer getIndexAnalyzer(Function unindexedFieldA public boolean isCacheable() { throw new UnsupportedOperationException(); } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } }; } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 82ef30ab1fc2f..74fe47e6f0c04 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -55,6 +55,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.network.NetworkAddress; @@ -155,7 +156,7 @@ * {@link AggregationBuilder} instance. */ public abstract class AggregatorTestCase extends ESTestCase { - private List releasables = new ArrayList<>(); + private List releasables = new ArrayList<>(); protected ValuesSourceRegistry valuesSourceRegistry; // A list of field types that should not be tested, or are not currently supported @@ -202,21 +203,38 @@ protected A createAggregator(AggregationBuilder builder, } /** - * Create a {@linkplain SearchContext} for testing an {@link Aggregator}. + * Create a {@linkplain AggregationContext} for testing an {@link Aggregator}. + * While {@linkplain AggregationContext} is {@link Releasable} the caller is + * not responsible for releasing it. Instead, it is released automatically in + * in {@link #cleanupReleasables()}. */ protected AggregationContext createAggregationContext( IndexSearcher indexSearcher, Query query, MappedFieldType... fieldTypes ) throws IOException { - CircuitBreakerService breakerService = new NoneCircuitBreakerService(); - return createAggregationContext(indexSearcher, createIndexSettings(), query, breakerService, DEFAULT_MAX_BUCKETS, fieldTypes); + return createAggregationContext( + indexSearcher, + createIndexSettings(), + query, + new NoneCircuitBreakerService(), + AggregationBuilder.DEFAULT_PREALLOCATION * 5, // We don't know how many bytes to preallocate so we grab a hand full + DEFAULT_MAX_BUCKETS, + fieldTypes + ); } + /** + * Create a {@linkplain AggregationContext} for testing an {@link Aggregator}. + * While {@linkplain AggregationContext} is {@link Releasable} the caller is + * not responsible for releasing it. Instead, it is released automatically in + * in {@link #cleanupReleasables()}. + */ protected AggregationContext createAggregationContext(IndexSearcher indexSearcher, IndexSettings indexSettings, Query query, CircuitBreakerService breakerService, + long bytesToPreallocate, int maxBucket, MappedFieldType... fieldTypes) throws IOException { /* @@ -275,18 +293,20 @@ public void onCache(ShardId shardId, Accountable accountable) {} ); MultiBucketConsumer consumer = new MultiBucketConsumer(maxBucket, breakerService.getBreaker(CircuitBreaker.REQUEST)); - return new ProductionAggregationContext( + AggregationContext context = new ProductionAggregationContext( queryShardContext, + bytesToPreallocate, query, null, consumer, () -> buildSubSearchContext(indexSettings, queryShardContext, bitsetFilterCache), - releasables::add, bitsetFilterCache, randomInt(), () -> 0L, () -> false ); + releasables.add(context); + return context; } /** @@ -411,7 +431,15 @@ protected A searchAndReduc List aggs = new ArrayList<>(); Query rewritten = searcher.rewrite(query); CircuitBreakerService breakerService = new NoneCircuitBreakerService(); - AggregationContext context = createAggregationContext(searcher, indexSettings, query, breakerService, maxBucket, fieldTypes); + AggregationContext context = createAggregationContext( + searcher, + indexSettings, + query, + breakerService, + builder.bytesToPreallocate(), + maxBucket, + fieldTypes + ); C root = createAggregator(builder, context); if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) { diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java index d8aaff5f41e59..4fe02fa649fe0 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java @@ -29,10 +29,10 @@ import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; -import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.index.fielddata.ScriptDocValues; import org.elasticsearch.index.mapper.GeoPointFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; @@ -332,26 +332,7 @@ public void testTonsOfBucketsTriggersBreaker() throws IOException { // Build a "simple" circuit breaker that trips at 20k CircuitBreakerService breaker = mock(CircuitBreakerService.class); ByteSizeValue max = new ByteSizeValue(20, ByteSizeUnit.KB); - when(breaker.getBreaker(CircuitBreaker.REQUEST)).thenReturn(new NoopCircuitBreaker(CircuitBreaker.REQUEST) { - private long total = 0; - - @Override - public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException { - logger.debug("Used {} grabbing {} for {}", total, bytes, label); - total += bytes; - if (total > max.getBytes()) { - throw new CircuitBreakingException("test error", bytes, max.getBytes(), Durability.TRANSIENT); - } - return total; - } - - @Override - public long addWithoutBreaking(long bytes) { - logger.debug("Used {} grabbing {}", total, bytes); - total += bytes; - return total; - } - }); + when(breaker.getBreaker(CircuitBreaker.REQUEST)).thenReturn(new MockBigArrays.LimitedBreaker(CircuitBreaker.REQUEST, max)); // Collect some buckets with it try (Directory directory = newDirectory()) { @@ -361,15 +342,16 @@ public long addWithoutBreaking(long bytes) { try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = newSearcher(indexReader, false, false); + TopMetricsAggregationBuilder builder = simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC)); AggregationContext context = createAggregationContext( indexSearcher, createIndexSettings(), new MatchAllDocsQuery(), breaker, + builder.bytesToPreallocate(), MultiBucketConsumerService.DEFAULT_MAX_BUCKETS, doubleFields() ); - TopMetricsAggregationBuilder builder = simpleBuilder(new FieldSortBuilder("s").order(SortOrder.ASC)); Aggregator aggregator = builder.build(context, null).create(null, CardinalityUpperBound.ONE); aggregator.preCollection(); assertThat(indexReader.leaves(), hasSize(1));