Skip to content

Commit 2466938

Browse files
authored
Wire Percentiles aggregator into new VS framework (#51639)
This required a bit of a refactor to percentiles itself. Before, the Builder would switch on the chosen algo to generate an algo-specific factory. This doesn't work (or at least, would be difficult) in the new VS framework. This refactor consolidates both factories together and introduces a PercentilesConfig object to act as a standardized way to pass algo-specific parameters through the factory. This object is then used when deciding which kind of aggregator to create Note: CoreValuesSourceType.HISTOGRAM still lives in core, and will be moved in a subsequent PR.
1 parent 362d6fa commit 2466938

6 files changed

+234
-158
lines changed

server/src/main/java/org/elasticsearch/search/SearchModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,8 @@ private void registerAggregations(List<SearchPlugin> plugins) {
349349
registerAggregation(new AggregationSpec(PercentilesAggregationBuilder.NAME, PercentilesAggregationBuilder::new,
350350
PercentilesAggregationBuilder::parse)
351351
.addResultReader(InternalTDigestPercentiles.NAME, InternalTDigestPercentiles::new)
352-
.addResultReader(InternalHDRPercentiles.NAME, InternalHDRPercentiles::new));
352+
.addResultReader(InternalHDRPercentiles.NAME, InternalHDRPercentiles::new)
353+
.setAggregatorRegistrar(PercentilesAggregationBuilder::registerAggregators));
353354
registerAggregation(new AggregationSpec(PercentileRanksAggregationBuilder.NAME, PercentileRanksAggregationBuilder::new,
354355
PercentileRanksAggregationBuilder::parse)
355356
.addResultReader(InternalTDigestPercentileRanks.NAME, InternalTDigestPercentileRanks::new)

server/src/main/java/org/elasticsearch/search/aggregations/metrics/HDRPercentilesAggregatorFactory.java

Lines changed: 0 additions & 77 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/search/aggregations/metrics/PercentilesAggregationBuilder.java

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@
3535
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
3636
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
3737
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
38+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
3839
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
3940

4041
import java.io.IOException;
4142
import java.util.Arrays;
4243
import java.util.Map;
4344
import java.util.Objects;
45+
import java.util.concurrent.atomic.AtomicBoolean;
4446
import java.util.function.Consumer;
4547

4648
public class PercentilesAggregationBuilder extends LeafOnly<ValuesSource, PercentilesAggregationBuilder> {
@@ -117,6 +119,13 @@ public static AggregationBuilder parse(String aggregationName, XContentParser pa
117119
return returnedAgg;
118120
}
119121

122+
private static AtomicBoolean wasRegistered = new AtomicBoolean(false);
123+
public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
124+
if (wasRegistered.compareAndSet(false, true) == true) {
125+
PercentilesAggregatorFactory.registerAggregators(valuesSourceRegistry);
126+
}
127+
}
128+
120129
private static <T> void setIfNotNull(Consumer<T> consumer, T value) {
121130
if (value != null) {
122131
consumer.accept(value);
@@ -270,16 +279,18 @@ protected ValuesSourceAggregatorFactory innerBuild(QueryShardContext queryShardC
270279
ValuesSourceConfig config,
271280
AggregatorFactory parent,
272281
Builder subFactoriesBuilder) throws IOException {
273-
switch (method) {
274-
case TDIGEST:
275-
return new TDigestPercentilesAggregatorFactory(name, config, percents, compression, keyed, queryShardContext, parent,
276-
subFactoriesBuilder, metaData);
277-
case HDR:
278-
return new HDRPercentilesAggregatorFactory(name, config, percents,
279-
numberOfSignificantValueDigits, keyed, queryShardContext, parent, subFactoriesBuilder, metaData);
280-
default:
282+
PercentilesConfig percentilesConfig;
283+
if (method.equals(PercentilesMethod.TDIGEST)) {
284+
percentilesConfig = new PercentilesConfig.TDigestConfig(compression);
285+
} else if (method.equals(PercentilesMethod.HDR)) {
286+
percentilesConfig = new PercentilesConfig.HdrHistoConfig(numberOfSignificantValueDigits);
287+
} else {
281288
throw new IllegalStateException("Illegal method [" + method + "]");
282289
}
290+
291+
return new PercentilesAggregatorFactory(name, config, percents, percentilesConfig, keyed, queryShardContext, parent,
292+
subFactoriesBuilder, metaData);
293+
283294
}
284295

285296
@Override
@@ -364,4 +375,47 @@ public InternalBuilder method(PercentilesMethod method) {
364375
}
365376
}
366377
}
378+
379+
/**
380+
* A small config object that carries algo-specific settings. This allows the factory to have
381+
* a single unified constructor for both algos, but internally switch execution
382+
* depending on which algo is selected
383+
*/
384+
abstract static class PercentilesConfig {
385+
private final PercentilesMethod method;
386+
387+
PercentilesConfig(PercentilesMethod method) {
388+
this.method = method;
389+
}
390+
391+
public PercentilesMethod getMethod() {
392+
return method;
393+
}
394+
395+
static class TDigestConfig extends PercentilesConfig {
396+
private final double compression;
397+
398+
TDigestConfig(double compression) {
399+
super(PercentilesMethod.TDIGEST);
400+
this.compression = compression;
401+
}
402+
403+
public double getCompression() {
404+
return compression;
405+
}
406+
}
407+
408+
static class HdrHistoConfig extends PercentilesConfig {
409+
private final int numberOfSignificantValueDigits;
410+
411+
HdrHistoConfig(int numberOfSignificantValueDigits) {
412+
super(PercentilesMethod.HDR);
413+
this.numberOfSignificantValueDigits = numberOfSignificantValueDigits;
414+
}
415+
416+
public int getNumberOfSignificantValueDigits() {
417+
return numberOfSignificantValueDigits;
418+
}
419+
}
420+
}
367421
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations.metrics;
21+
22+
import org.elasticsearch.index.query.QueryShardContext;
23+
import org.elasticsearch.search.DocValueFormat;
24+
import org.elasticsearch.search.aggregations.AggregationExecutionException;
25+
import org.elasticsearch.search.aggregations.Aggregator;
26+
import org.elasticsearch.search.aggregations.AggregatorFactories;
27+
import org.elasticsearch.search.aggregations.AggregatorFactory;
28+
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder.PercentilesConfig;
29+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
30+
import org.elasticsearch.search.aggregations.support.AggregatorSupplier;
31+
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
32+
import org.elasticsearch.search.aggregations.support.ValuesSource;
33+
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
34+
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
35+
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
36+
import org.elasticsearch.search.internal.SearchContext;
37+
38+
import java.io.IOException;
39+
import java.util.List;
40+
import java.util.Map;
41+
42+
/**
43+
* This factory is used to generate both TDigest and HDRHisto aggregators, depending
44+
* on the selected method
45+
*/
46+
class PercentilesAggregatorFactory extends ValuesSourceAggregatorFactory {
47+
48+
private final double[] percents;
49+
private final PercentilesConfig percentilesConfig;
50+
private final boolean keyed;
51+
52+
static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
53+
valuesSourceRegistry.register(PercentilesAggregationBuilder.NAME,
54+
List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.HISTOGRAM),
55+
new PercentilesAggregatorSupplier() {
56+
@Override
57+
public Aggregator build(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
58+
double[] percents, PercentilesConfig percentilesConfig, boolean keyed, DocValueFormat formatter,
59+
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
60+
61+
if (percentilesConfig.getMethod().equals(PercentilesMethod.TDIGEST)) {
62+
double compression = ((PercentilesConfig.TDigestConfig)percentilesConfig).getCompression();
63+
return new TDigestPercentilesAggregator(name, valuesSource, context, parent, percents, compression, keyed,
64+
formatter, pipelineAggregators, metaData);
65+
} else if (percentilesConfig.getMethod().equals(PercentilesMethod.HDR)) {
66+
int numSigFig = ((PercentilesConfig.HdrHistoConfig)percentilesConfig).getNumberOfSignificantValueDigits();
67+
return new HDRPercentilesAggregator(name, valuesSource, context, parent, percents, numSigFig, keyed,
68+
formatter, pipelineAggregators, metaData);
69+
}
70+
71+
// This should already have thrown but just in case
72+
throw new IllegalStateException("Unknown percentiles method: [" + percentilesConfig.getMethod().toString() + "]");
73+
}
74+
}
75+
);
76+
}
77+
78+
PercentilesAggregatorFactory(String name, ValuesSourceConfig config, double[] percents,
79+
PercentilesConfig percentilesConfig, boolean keyed, QueryShardContext queryShardContext,
80+
AggregatorFactory parent, AggregatorFactories.Builder subFactoriesBuilder,
81+
Map<String, Object> metaData) throws IOException {
82+
super(name, config, queryShardContext, parent, subFactoriesBuilder, metaData);
83+
this.percents = percents;
84+
this.percentilesConfig = percentilesConfig;
85+
this.keyed = keyed;
86+
}
87+
88+
@Override
89+
protected Aggregator createUnmapped(SearchContext searchContext,
90+
Aggregator parent,
91+
List<PipelineAggregator> pipelineAggregators,
92+
Map<String, Object> metaData) throws IOException {
93+
if (percentilesConfig.getMethod().equals(PercentilesMethod.TDIGEST)) {
94+
double compression = ((PercentilesConfig.TDigestConfig)percentilesConfig).getCompression();
95+
return new TDigestPercentilesAggregator(name, null, searchContext, parent, percents, compression, keyed, config.format(),
96+
pipelineAggregators, metaData);
97+
} else if (percentilesConfig.getMethod().equals(PercentilesMethod.HDR)) {
98+
int numSigFig = ((PercentilesConfig.HdrHistoConfig)percentilesConfig).getNumberOfSignificantValueDigits();
99+
return new HDRPercentilesAggregator(name, null, searchContext, parent, percents, numSigFig, keyed,
100+
config.format(), pipelineAggregators, metaData);
101+
}
102+
103+
// This should already have thrown but just in case
104+
throw new IllegalStateException("Unknown percentiles method: [" + percentilesConfig.getMethod().toString() + "]");
105+
}
106+
107+
@Override
108+
protected Aggregator doCreateInternal(ValuesSource valuesSource,
109+
SearchContext searchContext,
110+
Aggregator parent,
111+
boolean collectsFromSingleBucket,
112+
List<PipelineAggregator> pipelineAggregators,
113+
Map<String, Object> metaData) throws IOException {
114+
115+
AggregatorSupplier aggregatorSupplier = ValuesSourceRegistry.getInstance().getAggregator(config.valueSourceType(),
116+
PercentilesAggregationBuilder.NAME);
117+
118+
if (aggregatorSupplier instanceof PercentilesAggregatorSupplier == false) {
119+
throw new AggregationExecutionException("Registry miss-match - expected PercentilesAggregatorSupplier, found [" +
120+
aggregatorSupplier.getClass().toString() + "]");
121+
}
122+
PercentilesAggregatorSupplier percentilesAggregatorSupplier = (PercentilesAggregatorSupplier) aggregatorSupplier;
123+
return percentilesAggregatorSupplier.build(name, valuesSource, searchContext, parent, percents, percentilesConfig, keyed,
124+
config.format(), pipelineAggregators, metaData);
125+
}
126+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.search.aggregations.metrics;
20+
21+
import org.elasticsearch.search.DocValueFormat;
22+
import org.elasticsearch.search.aggregations.Aggregator;
23+
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder.PercentilesConfig;
24+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
25+
import org.elasticsearch.search.aggregations.support.AggregatorSupplier;
26+
import org.elasticsearch.search.aggregations.support.ValuesSource;
27+
import org.elasticsearch.search.internal.SearchContext;
28+
29+
import java.io.IOException;
30+
import java.util.List;
31+
import java.util.Map;
32+
33+
public interface PercentilesAggregatorSupplier extends AggregatorSupplier {
34+
Aggregator build(String name,
35+
ValuesSource valuesSource,
36+
SearchContext context,
37+
Aggregator parent,
38+
double[] percents,
39+
PercentilesConfig percentilesConfig,
40+
boolean keyed,
41+
DocValueFormat formatter,
42+
List<PipelineAggregator> pipelineAggregators,
43+
Map<String, Object> metaData) throws IOException;
44+
}

0 commit comments

Comments
 (0)