Skip to content

Lower contention on requests with many aggs (backport of #66895) #66941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.benchmark.search.aggregations;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.Version;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.BucketedSort.ExtraData;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
* Benchmarks the overhead of constructing {@link Aggregator}s in many
* parallel threads. Machines with different numbers of cores will see
* wildly different results when running this benchmark, with machines
* that have more cores seeing more benefit from preallocation.
*/
@Fork(2)
@Warmup(iterations = 10)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
// Run with as many benchmark threads as the machine has cores so the
// breaker-accounting contention being measured actually shows up.
@Threads(Threads.MAX)
public class AggConstructionContentionBenchmark {
    // Shared, thread-safe fixtures built once for the whole benchmark run.
    private final SearchModule searchModule = new SearchModule(Settings.EMPTY, false, org.elasticsearch.common.collect.List.of());
    private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    private final PageCacheRecycler recycler = new PageCacheRecycler(Settings.EMPTY);
    private final Index index = new Index("test", "uuid");
    // Field data cache with a no-op listener; per-field caches are derived
    // from it in DummyAggregationContext#buildFieldData.
    private final IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(
        Settings.EMPTY,
        new IndexFieldDataCache.Listener() {
        }
    );

    // Populated in setup() based on the "breaker" parameter.
    private CircuitBreakerService breakerService;
    private BigArrays bigArrays;
    // When true, each simulated request wraps breakerService in a
    // PreallocatedCircuitBreakerService (see DummyAggregationContext's ctor).
    private boolean preallocateBreaker;

    // Breaker strategy under test: no-op breaker, the real hierarchy
    // breaker, or the real breaker with per-request preallocation.
    @Param({ "noop", "real", "preallocate" })
    private String breaker;

    /**
     * Builds the breaker service selected by {@link #breaker} and the
     * {@link BigArrays} that charges allocations against it.
     */
    @Setup
    public void setup() {
        switch (breaker) {
            case "real":
                breakerService = new HierarchyCircuitBreakerService(
                    Settings.EMPTY,
                    org.elasticsearch.common.collect.List.of(),
                    clusterSettings
                );
                break;
            case "preallocate":
                // Same real breaker as "real"; the only difference is the
                // preallocateBreaker flag consumed per-request below.
                preallocateBreaker = true;
                breakerService = new HierarchyCircuitBreakerService(
                    Settings.EMPTY,
                    org.elasticsearch.common.collect.List.of(),
                    clusterSettings
                );
                break;
            case "noop":
                breakerService = new NoneCircuitBreakerService();
                break;
            default:
                // Unknown @Param value — fail loudly rather than benchmark garbage.
                throw new UnsupportedOperationException();
        }
        bigArrays = new BigArrays(recycler, breakerService, "request");
    }

    /** Measures constructing a single metric aggregator. */
    @Benchmark
    public void sum() throws IOException {
        buildFactories(new AggregatorFactories.Builder().addAggregator(new SumAggregationBuilder("s").field("int_1")));
    }

    /** Measures constructing a terms aggregator with one sub-aggregator. */
    @Benchmark
    public void termsSum() throws IOException {
        buildFactories(
            new AggregatorFactories.Builder().addAggregator(
                new TermsAggregationBuilder("t").field("int_1").subAggregation(new SumAggregationBuilder("s").field("int_2"))
            )
        );
    }

    /**
     * Measures constructing a terms aggregator with sixty sub-aggregators —
     * the many-aggs case this change targets.
     */
    @Benchmark
    public void termsSixtySums() throws IOException {
        TermsAggregationBuilder b = new TermsAggregationBuilder("t").field("int_1");
        for (int i = 0; i < 60; i++) {
            b.subAggregation(new SumAggregationBuilder("s" + i).field("int_" + i));
        }
        buildFactories(new AggregatorFactories.Builder().addAggregator(b));
    }

    // The measured operation: build factories and construct the top level
    // aggregators inside a per-request context, then release everything.
    private void buildFactories(AggregatorFactories.Builder factories) throws IOException {
        try (DummyAggregationContext context = new DummyAggregationContext(factories.bytesToPreallocate())) {
            factories.build(context, null).createTopLevelAggregators();
        }
    }

    /**
     * {@link AggregationContext} stub providing just enough for aggregator
     * construction. Anything not exercised on the construction path throws
     * {@link UnsupportedOperationException} so accidental use is caught.
     */
    private class DummyAggregationContext extends AggregationContext {
        private final Query query = new MatchAllDocsQuery();
        // Aggregators registered via addReleasable, released in close().
        private final List<Releasable> releaseMe = new ArrayList<>();

        private final CircuitBreaker breaker;
        // Non-null only when preallocateBreaker is set; released in close().
        private final PreallocatedCircuitBreakerService preallocated;
        private final MultiBucketConsumer multiBucketConsumer;

        DummyAggregationContext(long bytesToPreallocate) {
            CircuitBreakerService breakerService;
            if (preallocateBreaker) {
                // Reserve the request's estimated bytes up front so each
                // aggregator's accounting hits the local preallocation
                // instead of contending on the shared breaker.
                breakerService = preallocated = new PreallocatedCircuitBreakerService(
                    AggConstructionContentionBenchmark.this.breakerService,
                    CircuitBreaker.REQUEST,
                    bytesToPreallocate,
                    "aggregations"
                );
            } else {
                breakerService = AggConstructionContentionBenchmark.this.breakerService;
                preallocated = null;
            }
            breaker = breakerService.getBreaker(CircuitBreaker.REQUEST);
            // MAX_VALUE bucket limit: bucket counting is not under test here.
            multiBucketConsumer = new MultiBucketConsumer(Integer.MAX_VALUE, breaker);
        }

        @Override
        public Query query() {
            return query;
        }

        @Override
        public Aggregator profileIfEnabled(Aggregator agg) throws IOException {
            // Profiling disabled — return the aggregator unwrapped.
            return agg;
        }

        @Override
        public boolean profiling() {
            return false;
        }

        @Override
        public long nowInMillis() {
            return 0;
        }

        @Override
        protected IndexFieldData<?> buildFieldData(MappedFieldType ft) {
            // Real field data wiring with a per-field no-op cache listener.
            IndexFieldDataCache indexFieldDataCache = indicesFieldDataCache.buildIndexFieldDataCache(new IndexFieldDataCache.Listener() {
            }, index, ft.name());
            // NOTE(review): lookup() throws UnsupportedOperationException; this
            // relies on fielddataBuilder treating the supplier lazily — confirm.
            return ft.fielddataBuilder("test", this::lookup).build(indexFieldDataCache, breakerService);
        }

        @Override
        public MappedFieldType getFieldType(String path) {
            // The benchmarks only reference fields named "int_*".
            if (path.startsWith("int")) {
                return new NumberFieldMapper.NumberFieldType(path, NumberType.INTEGER);
            }
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isFieldMapped(String field) {
            return field.startsWith("int");
        }

        @Override
        public <FactoryType> FactoryType compile(Script script, ScriptContext<FactoryType> context) {
            throw new UnsupportedOperationException();
        }

        @Override
        public SearchLookup lookup() {
            throw new UnsupportedOperationException();
        }

        @Override
        public ValuesSourceRegistry getValuesSourceRegistry() {
            return searchModule.getValuesSourceRegistry();
        }

        @Override
        public BigArrays bigArrays() {
            return bigArrays;
        }

        @Override
        public IndexSearcher searcher() {
            // No index is opened; construction must not need a searcher.
            return null;
        }

        @Override
        public Query buildQuery(QueryBuilder builder) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public IndexSettings getIndexSettings() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sortBuilders) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public ObjectMapper getObjectMapper(String path) {
            throw new UnsupportedOperationException();
        }

        @Override
        public NestedScope nestedScope() {
            throw new UnsupportedOperationException();
        }

        @Override
        public SubSearchContext subSearchContext() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void addReleasable(Aggregator aggregator) {
            releaseMe.add(aggregator);
        }

        @Override
        public MultiBucketConsumer multiBucketConsumer() {
            return multiBucketConsumer;
        }

        @Override
        public BitsetFilterCache bitsetFilterCache() {
            throw new UnsupportedOperationException();
        }

        @Override
        public BucketedSort buildBucketedSort(SortBuilder<?> sort, int size, ExtraData values) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public int shardRandomSeed() {
            return 0;
        }

        @Override
        public long getRelativeTimeInMillis() {
            return 0;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public CircuitBreaker breaker() {
            return breaker;
        }

        @Override
        public Analyzer getIndexAnalyzer(Function<String, NamedAnalyzer> unindexedFieldAnalyzer) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isCacheable() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Version indexVersionCreated() {
            return Version.CURRENT;
        }

        /**
         * Releases every registered aggregator and the per-request
         * preallocated breaker, returning its reserved bytes.
         */
        @Override
        public void close() {
            List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
            // preallocated is null when not preallocating.
            // NOTE(review): assumes Releasables.close tolerates null entries — confirm.
            releaseMe.add(preallocated);
            Releasables.close(releaseMe);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,11 @@ public BucketCardinality bucketCardinality() {
public String getType() {
return "test";
}

@Override
public long bytesToPreallocate() {
return 0;
}
}

/**
Expand Down Expand Up @@ -570,13 +575,13 @@ public Aggregator subAggregator(String name) {
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return new InternalAggregation[] {
new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap())
buildEmptyAggregation()
};
}

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, Collections.emptyMap());
return new InternalMax(name(), Double.NaN, DocValueFormat.RAW, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
.addAggregation(AggregationBuilders.terms("foo_terms").field("foo.keyword")).get());
logger.info("--> got an expected exception", e);
assertThat(e.getCause(), notNullValue());
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [<agg [foo_terms]>]"));
assertThat(e.getCause().getMessage(), containsString("[parent] Data too large, data for [preallocate[aggregations]]"));

client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
Expand Down
Loading