Lower contention on requests with many aggs #66895

Merged · 17 commits · Jan 4, 2021
@@ -0,0 +1,342 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.benchmark.search.aggregations;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.BucketedSort.ExtraData;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * Benchmarks the overhead of constructing {@link Aggregator}s in many
 * parallel threads. Machines with different numbers of cores will see
 * wildly different results, with more cores seeing more benefit from
 * preallocation.
 */
@Fork(2)
@Warmup(iterations = 10)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@Threads(Threads.MAX)
public class AggConstructionContentionBenchmark {
    private final SearchModule searchModule = new SearchModule(Settings.EMPTY, List.of());
    private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    private final PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY);
    private final Index index = new Index("test", "uuid");
    private final IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(
        Settings.EMPTY,
        new IndexFieldDataCache.Listener() {
        }
    );

    private CircuitBreakerService breakerService;
    private BigArrays bigArrays;
    private boolean preallocateBreaker;

    @Param({ "noop", "real", "preallocate" })
    private String breaker;

    @Setup
    public void setup() {
        switch (breaker) {
            case "real":
                breakerService = new HierarchyCircuitBreakerService(Settings.EMPTY, List.of(), clusterSettings);
                break;
            case "preallocate":
                preallocateBreaker = true;
                breakerService = new HierarchyCircuitBreakerService(Settings.EMPTY, List.of(), clusterSettings);
                break;
            case "noop":
                breakerService = new NoneCircuitBreakerService();
                break;
            default:
                throw new UnsupportedOperationException();
        }
        bigArrays = new BigArrays(recycler, breakerService, "request");
    }

    @Benchmark
    public void sum() throws IOException {
        buildFactories(new AggregatorFactories.Builder().addAggregator(new SumAggregationBuilder("s").field("int_1")));
    }

    @Benchmark
    public void termsSum() throws IOException {
        buildFactories(
            new AggregatorFactories.Builder().addAggregator(
                new TermsAggregationBuilder("t").field("int_1").subAggregation(new SumAggregationBuilder("s").field("int_2"))
            )
        );
    }

    @Benchmark
    public void termsSixtySums() throws IOException {
        TermsAggregationBuilder b = new TermsAggregationBuilder("t").field("int_1");
        for (int i = 0; i < 60; i++) {
            b.subAggregation(new SumAggregationBuilder("s" + i).field("int_" + i));
        }
        buildFactories(new AggregatorFactories.Builder().addAggregator(b));
    }

    private void buildFactories(AggregatorFactories.Builder factories) throws IOException {
        try (DummyAggregationContext context = new DummyAggregationContext(factories.bytesToPreallocate())) {
            factories.build(context, null).createTopLevelAggregators();
        }
    }

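    /**
     * A stripped-down {@link AggregationContext} implementing only what
     * aggregator construction needs; everything else throws
     * {@link UnsupportedOperationException}. When {@code preallocateBreaker}
     * is set, it wraps the shared breaker service in a
     * {@link PreallocatedCircuitBreakerService} so the request reserves its
     * circuit breaker bytes once, up front, instead of contending with other
     * threads on the shared breaker for every small allocation.
     */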
    private class DummyAggregationContext extends AggregationContext {
        private final Query query = new MatchAllDocsQuery();
        private final List<Releasable> releaseMe = new ArrayList<>();

        private final CircuitBreaker breaker;
        private final PreallocatedCircuitBreakerService preallocated;
        private final MultiBucketConsumer multiBucketConsumer;

        DummyAggregationContext(long bytesToPreallocate) {
            CircuitBreakerService breakerService;
            if (preallocateBreaker) {
                breakerService = preallocated = new PreallocatedCircuitBreakerService(
                    AggConstructionContentionBenchmark.this.breakerService,
                    CircuitBreaker.REQUEST,
                    bytesToPreallocate,
                    "aggregations"
                );
            } else {
                breakerService = AggConstructionContentionBenchmark.this.breakerService;
                preallocated = null;
            }
            breaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
            multiBucketConsumer = new MultiBucketConsumer(Integer.MAX_VALUE, breaker);
        }

        @Override
        public Query query() {
            return query;
        }

        @Override
        public Aggregator profileIfEnabled(Aggregator agg) throws IOException {
            return agg;
        }

        @Override
        public boolean profiling() {
            return false;
        }

        @Override
        public long nowInMillis() {
            return 0;
        }

        @Override
        protected IndexFieldData<?> buildFieldData(MappedFieldType ft) {
            IndexFieldDataCache indexFieldDataCache = indicesFieldDataCache.buildIndexFieldDataCache(new IndexFieldDataCache.Listener() {
            }, index, ft.name());
            return ft.fielddataBuilder("test", this::lookup).build(indexFieldDataCache, breakerService);
        }

        @Override
        public MappedFieldType getFieldType(String path) {
            if (path.startsWith("int")) {
                return new NumberFieldMapper.NumberFieldType(path, NumberType.INTEGER);
            }
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isFieldMapped(String field) {
            return field.startsWith("int");
        }

        @Override
        public <FactoryType> FactoryType compile(Script script, ScriptContext<FactoryType> context) {
            throw new UnsupportedOperationException();
        }

        @Override
        public SearchLookup lookup() {
            throw new UnsupportedOperationException();
        }

        @Override
        public ValuesSourceRegistry getValuesSourceRegistry() {
            return searchModule.getValuesSourceRegistry();
        }

        @Override
        public BigArrays bigArrays() {
            return bigArrays;
        }

        @Override
        public IndexSearcher searcher() {
            return null;
        }

        @Override
        public Query buildQuery(QueryBuilder builder) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public IndexSettings getIndexSettings() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sortBuilders) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public ObjectMapper getObjectMapper(String path) {
            throw new UnsupportedOperationException();
        }

        @Override
        public NestedScope nestedScope() {
            throw new UnsupportedOperationException();
        }

        @Override
        public SubSearchContext subSearchContext() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addReleasable(Aggregator aggregator) {
            releaseMe.add(aggregator);
        }

        @Override
        public MultiBucketConsumer multiBucketConsumer() {
            return multiBucketConsumer;
        }

        @Override
        public BitsetFilterCache bitsetFilterCache() {
            throw new UnsupportedOperationException();
        }

        @Override
        public BucketedSort buildBucketedSort(SortBuilder<?> sort, int size, ExtraData values) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public int shardRandomSeed() {
            return 0;
        }

        @Override
        public long getRelativeTimeInMillis() {
            return 0;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public CircuitBreaker breaker() {
            return breaker;
        }

        @Override
        public Analyzer getIndexAnalyzer(Function<String, NamedAnalyzer> unindexedFieldAnalyzer) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isCacheable() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {
            List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
            releaseMe.add(preallocated);
            Releasables.close(releaseMe);
        }
    }
}
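
For reference, here is one hypothetical way to drive this benchmark through the plain JMH runner API (the benchmark class and its "breaker" parameter come from the file above; the runner class itself is not part of this PR):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class AggBenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
            // Select the benchmark class defined above by name.
            .include("AggConstructionContentionBenchmark")
            // Compare the shared "real" breaker against the preallocated one.
            .param("breaker", "real", "preallocate")
            .build();
        new Runner(options).run();
    }
}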
@@ -533,6 +533,11 @@ public BucketCardinality bucketCardinality() {
public String getType() {
return "test";
}

@Override
public long bytesToPreallocate() {
return 0;
}
}
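
The test builder above opts out by returning 0. For illustration only, a builder that knows roughly how much request-breaker memory its construction will use could return an estimate instead (the constant below is hypothetical, not from this PR):

@Override
public long bytesToPreallocate() {
    // Hypothetical rough estimate of the bytes this aggregator will
    // ask the request circuit breaker for while it is being built.
    return 1024;
}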

/**
@@ -567,13 +572,13 @@ public Aggregator subAggregator(String name) {
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return new InternalAggregation[] {
-new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap())
+buildEmptyAggregation()
};
}

@Override
public InternalAggregation buildEmptyAggregation() {
-return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap());
+return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, null);
Member

Why do we change this to null here?

Member Author
I don't think I had to, and on reflection I'll change it back. null here is actually more like what we do in production; emptyMap is more like when a user sends an empty meta.
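
To illustrate the convention being discussed (a sketch reusing the constructor from the diff above, not code from this PR):

// null metadata: no meta was sent with the request, the common production case.
new InternalMax("max", Double.NaN, DocValueFormat.RAW, null);
// Empty map: the user explicitly sent an empty meta object.
new InternalMax("max", Double.NaN, DocValueFormat.RAW, Collections.emptyMap());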

}

@Override
@@ -604,7 +604,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
.addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get());
logger.info("--> got an expected exception", e);
assertThat(e.getCause(), notNullValue());
-assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [<agg [foo_terms]>]"));
+assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [preallocate[aggregations]]"));

client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
@@ -252,7 +252,7 @@ public void testRequestBreaker() throws Exception {
}
}

-public void testBucketBreaker() throws Exception {
+public void testAggTookTooMuch() throws Exception {
if (noopBreakerUsed()) {
logger.info("--> noop breakers used, skipping test");
return;
@@ -285,7 +285,7 @@ public void testBucketBreaker() throws Exception {
} catch (Exception e) {
Throwable cause = e.getCause();
assertThat(cause, instanceOf(CircuitBreakingException.class));
-assertThat(cause.toString(), containsString("[request] Data too large, data for [<agg [my_terms]>] would be"));
+assertThat(cause.toString(), containsString("[request] Data too large, data for [preallocate[aggregations]] would be"));
assertThat(cause.toString(), containsString("which is larger than the limit of [100/100b]"));
}
}