Skip to content

Commit ae7c071

Browse files
authored
Allow pipeline aggs to select specific buckets from multi-bucket aggs (#44179)
This adjusts the `buckets_path` parser so that pipeline aggs can select specific buckets (via their bucket keys) instead of fetching the entire set of buckets. This is useful for bucket_script in particular, which might want specific buckets for calculations. It's possible to work around this with `filter` aggs, but the workaround is hacky and probably less performant. - Adjusts documentation - Adds a barebones AggregatorTestCase for bucket_script - Tweaks AggTestCase to use getMockScriptService() for reductions and pipelines. Previously pipelines could just pass in a script service for testing, but this didn't work for regular aggs. The new getMockScriptService() method fixes that issue, but needs to be used for pipelines too. This had a knock-on effect of touching MovFn, AvgBucket and ScriptedMetric
1 parent 363a7eb commit ae7c071

File tree

10 files changed

+473
-71
lines changed

10 files changed

+473
-71
lines changed

docs/reference/aggregations/pipeline.asciidoc

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ parameter, which follows a specific format:
3535
// https://en.wikipedia.org/wiki/Extended_Backus%E2%80%93Naur_Form
3636
[source,ebnf]
3737
--------------------------------------------------
38-
AGG_SEPARATOR = '>' ;
39-
METRIC_SEPARATOR = '.' ;
38+
AGG_SEPARATOR = `>` ;
39+
METRIC_SEPARATOR = `.` ;
4040
AGG_NAME = <the name of the aggregation> ;
4141
METRIC = <the name of the metric (in case of multi-value metrics aggregation)> ;
42-
PATH = <AGG_NAME> [ <AGG_SEPARATOR>, <AGG_NAME> ]* [ <METRIC_SEPARATOR>, <METRIC> ] ;
42+
MULTIBUCKET_KEY = `[<KEY_NAME>]` ;
43+
PATH = <AGG_NAME><MULTIBUCKET_KEY>? (<AGG_SEPARATOR>, <AGG_NAME> )* ( <METRIC_SEPARATOR>, <METRIC> )? ;
4344
--------------------------------------------------
4445

4546
For example, the path `"my_bucket>my_stats.avg"` will path to the `avg` value in the `"my_stats"` metric, which is
@@ -110,6 +111,52 @@ POST /_search
110111
<1> `buckets_path` instructs this max_bucket aggregation that we want the maximum value of the `sales` aggregation in the
111112
`sales_per_month` date histogram.
112113

114+
If a pipeline agg references a multi-bucket aggregation, such as a `terms` agg, it also has the option to
115+
select specific keys from the multi-bucket. For example, a `bucket_script` could select two specific buckets (via
116+
their bucket keys) to perform the calculation:
117+
118+
[source,js]
119+
--------------------------------------------------
120+
POST /_search
121+
{
122+
"aggs" : {
123+
"sales_per_month" : {
124+
"date_histogram" : {
125+
"field" : "date",
126+
"calendar_interval" : "month"
127+
},
128+
"aggs": {
129+
"sale_type": {
130+
"terms": {
131+
"field": "type"
132+
},
133+
"aggs": {
134+
"sales": {
135+
"sum": {
136+
"field": "price"
137+
}
138+
}
139+
}
140+
},
141+
"hat_vs_bag_ratio": {
142+
"bucket_script": {
143+
"buckets_path": {
144+
"hats": "sale_type['hat']>sales", <1>
145+
"bags": "sale_type['bag']>sales" <1>
146+
},
147+
"script": "params.hats / params.bags"
148+
}
149+
}
150+
}
151+
}
152+
}
153+
}
154+
--------------------------------------------------
155+
// CONSOLE
156+
// TEST[setup:sales]
157+
<1> `buckets_path` selects the hats and bags buckets (via `['hat']`/`['bag']`) to use in the script specifically,
158+
instead of fetching all the buckets from the `sale_type` aggregation
159+
113160
[float]
114161
=== Special Paths
115162

modules/lang-painless/src/test/resources/rest-api-spec/test/painless/100_terms_agg.yml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,41 @@ setup:
102102
- is_false: aggregations.double_terms.buckets.1.key_as_string
103103
- match: { aggregations.double_terms.buckets.1.doc_count: 1 }
104104

105+
---
106+
"Bucket script with keys":
107+
108+
- do:
109+
search:
110+
rest_total_hits_as_int: true
111+
body:
112+
size: 0
113+
aggs:
114+
placeholder:
115+
filters:
116+
filters:
117+
- match_all: {}
118+
aggs:
119+
str_terms:
120+
terms:
121+
field: "str"
122+
aggs:
123+
the_avg:
124+
avg:
125+
field: "number"
126+
the_bucket_script:
127+
bucket_script:
128+
buckets_path:
129+
foo: "str_terms['bcd']>the_avg.value"
130+
script: "params.foo"
131+
132+
- match: { hits.total: 3 }
133+
134+
- length: { aggregations.placeholder.buckets.0.str_terms.buckets: 2 }
135+
- match: { aggregations.placeholder.buckets.0.str_terms.buckets.0.key: "abc" }
136+
- is_false: aggregations.placeholder.buckets.0.str_terms.buckets.0.key_as_string
137+
- match: { aggregations.placeholder.buckets.0.str_terms.buckets.0.doc_count: 2 }
138+
- match: { aggregations.placeholder.buckets.0.str_terms.buckets.1.key: "bcd" }
139+
- is_false: aggregations.placeholder.buckets.0.str_terms.buckets.1.key_as_string
140+
- match: { aggregations.placeholder.buckets.0.str_terms.buckets.1.doc_count: 1 }
141+
- match: { aggregations.placeholder.buckets.0.the_bucket_script.value: 2.0 }
142+

server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,33 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
7373
public Object getProperty(List<String> path) {
7474
if (path.isEmpty()) {
7575
return this;
76-
} else if (path.get(0).equals("_bucket_count")) {
77-
return getBuckets().size();
78-
} else {
79-
List<? extends InternalBucket> buckets = getBuckets();
80-
Object[] propertyArray = new Object[buckets.size()];
81-
for (int i = 0; i < buckets.size(); i++) {
82-
propertyArray[i] = buckets.get(i).getProperty(getName(), path);
76+
}
77+
return resolvePropertyFromPath(path, getBuckets(), getName());
78+
}
79+
80+
static Object resolvePropertyFromPath(List<String> path, List<? extends InternalBucket> buckets, String name) {
81+
String aggName = path.get(0);
82+
if (aggName.equals("_bucket_count")) {
83+
return buckets.size();
84+
}
85+
86+
// This is a bucket key, look through our buckets and see if we can find a match
87+
if (aggName.startsWith("'") && aggName.endsWith("'")) {
88+
for (InternalBucket bucket : buckets) {
89+
if (bucket.getKeyAsString().equals(aggName.substring(1, aggName.length() - 1))) {
90+
return bucket.getProperty(name, path.subList(1, path.size()));
91+
}
8392
}
84-
return propertyArray;
93+
// No key match, time to give up
94+
throw new InvalidAggregationPathException("Cannot find an key [" + aggName + "] in [" + name + "]");
95+
}
96+
97+
Object[] propertyArray = new Object[buckets.size()];
98+
for (int i = 0; i < buckets.size(); i++) {
99+
propertyArray[i] = buckets.get(i).getProperty(name, path);
85100
}
101+
return propertyArray;
102+
86103
}
87104

88105
/**
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations;
21+
22+
import org.apache.lucene.util.BytesRef;
23+
import org.elasticsearch.search.DocValueFormat;
24+
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
25+
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
26+
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
27+
import org.elasticsearch.search.aggregations.metrics.InternalAvg;
28+
import org.elasticsearch.search.aggregations.support.AggregationPath;
29+
import org.elasticsearch.test.ESTestCase;
30+
31+
import java.nio.charset.StandardCharsets;
32+
import java.util.ArrayList;
33+
import java.util.Collections;
34+
import java.util.List;
35+
36+
import static org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.resolvePropertyFromPath;
37+
import static org.hamcrest.Matchers.equalTo;
38+
39+
public class InternalMultiBucketAggregationTests extends ESTestCase {
40+
41+
public void testResolveToAgg() {
42+
AggregationPath path = AggregationPath.parse("the_avg");
43+
List<LongTerms.Bucket> buckets = new ArrayList<>();
44+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
45+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
46+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(agg));
47+
48+
LongTerms.Bucket bucket = new LongTerms.Bucket(1, 1, internalAggregations, false, 0, DocValueFormat.RAW);
49+
buckets.add(bucket);
50+
51+
Object[] value = (Object[]) resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms");
52+
assertThat(value[0], equalTo(agg));
53+
}
54+
55+
public void testResolveToAggValue() {
56+
AggregationPath path = AggregationPath.parse("the_avg.value");
57+
List<LongTerms.Bucket> buckets = new ArrayList<>();
58+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
59+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
60+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(agg));
61+
62+
LongTerms.Bucket bucket = new LongTerms.Bucket(1, 1, internalAggregations, false, 0, DocValueFormat.RAW);
63+
buckets.add(bucket);
64+
65+
Object[] value = (Object[]) resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms");
66+
assertThat(value[0], equalTo(2.0));
67+
}
68+
69+
public void testResolveToNothing() {
70+
AggregationPath path = AggregationPath.parse("foo.value");
71+
List<LongTerms.Bucket> buckets = new ArrayList<>();
72+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
73+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
74+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(agg));
75+
76+
LongTerms.Bucket bucket = new LongTerms.Bucket(1, 1, internalAggregations, false, 0, DocValueFormat.RAW);
77+
buckets.add(bucket);
78+
79+
InvalidAggregationPathException e = expectThrows(InvalidAggregationPathException.class,
80+
() -> resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms"));
81+
assertThat(e.getMessage(), equalTo("Cannot find an aggregation named [foo] in [the_long_terms]"));
82+
}
83+
84+
public void testResolveToUnknown() {
85+
AggregationPath path = AggregationPath.parse("the_avg.unknown");
86+
List<LongTerms.Bucket> buckets = new ArrayList<>();
87+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
88+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
89+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(agg));
90+
91+
LongTerms.Bucket bucket = new LongTerms.Bucket(1, 1, internalAggregations, false, 0, DocValueFormat.RAW);
92+
buckets.add(bucket);
93+
94+
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
95+
() -> resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms"));
96+
assertThat(e.getMessage(), equalTo("path not supported for [the_avg]: [unknown]"));
97+
}
98+
99+
public void testResolveToBucketCount() {
100+
AggregationPath path = AggregationPath.parse("_bucket_count");
101+
List<LongTerms.Bucket> buckets = new ArrayList<>();
102+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
103+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
104+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(agg));
105+
106+
LongTerms.Bucket bucket = new LongTerms.Bucket(1, 1, internalAggregations, false, 0, DocValueFormat.RAW);
107+
buckets.add(bucket);
108+
109+
Object value = resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms");
110+
assertThat(value, equalTo(1));
111+
}
112+
113+
public void testResolveToCount() {
114+
AggregationPath path = AggregationPath.parse("_count");
115+
List<LongTerms.Bucket> buckets = new ArrayList<>();
116+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
117+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
118+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(agg));
119+
120+
LongTerms.Bucket bucket = new LongTerms.Bucket(1, 1, internalAggregations, false, 0, DocValueFormat.RAW);
121+
buckets.add(bucket);
122+
123+
Object[] value = (Object[]) resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms");
124+
assertThat(value[0], equalTo(1L));
125+
}
126+
127+
public void testResolveToKey() {
128+
AggregationPath path = AggregationPath.parse("_key");
129+
List<LongTerms.Bucket> buckets = new ArrayList<>();
130+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
131+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
132+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(agg));
133+
134+
LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW);
135+
buckets.add(bucket);
136+
137+
Object[] value = (Object[]) resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms");
138+
assertThat(value[0], equalTo(19L));
139+
}
140+
141+
public void testResolveToSpecificBucket() {
142+
AggregationPath path = AggregationPath.parse("string_terms['foo']>the_avg.value");
143+
144+
List<LongTerms.Bucket> buckets = new ArrayList<>();
145+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
146+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
147+
InternalAggregations internalStringAggs = new InternalAggregations(Collections.singletonList(agg));
148+
List<StringTerms.Bucket> stringBuckets = Collections.singletonList(new StringTerms.Bucket(
149+
new BytesRef("foo".getBytes(StandardCharsets.UTF_8), 0, "foo".getBytes(StandardCharsets.UTF_8).length), 1,
150+
internalStringAggs, false, 0, DocValueFormat.RAW));
151+
152+
InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), 1, 0, Collections.emptyList(),
153+
Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, 0);
154+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(termsAgg));
155+
LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW);
156+
buckets.add(bucket);
157+
158+
Object[] value = (Object[]) resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms");
159+
assertThat(value[0], equalTo(2.0));
160+
}
161+
162+
public void testResolveToMissingSpecificBucket() {
163+
AggregationPath path = AggregationPath.parse("string_terms['bar']>the_avg.value");
164+
165+
List<LongTerms.Bucket> buckets = new ArrayList<>();
166+
InternalAggregation agg = new InternalAvg("the_avg", 2, 1,
167+
DocValueFormat.RAW, Collections.emptyList(), Collections.emptyMap());
168+
InternalAggregations internalStringAggs = new InternalAggregations(Collections.singletonList(agg));
169+
List<StringTerms.Bucket> stringBuckets = Collections.singletonList(new StringTerms.Bucket(
170+
new BytesRef("foo".getBytes(StandardCharsets.UTF_8), 0, "foo".getBytes(StandardCharsets.UTF_8).length), 1,
171+
internalStringAggs, false, 0, DocValueFormat.RAW));
172+
173+
InternalTerms termsAgg = new StringTerms("string_terms", BucketOrder.count(false), 1, 0, Collections.emptyList(),
174+
Collections.emptyMap(), DocValueFormat.RAW, 1, false, 0, stringBuckets, 0);
175+
InternalAggregations internalAggregations = new InternalAggregations(Collections.singletonList(termsAgg));
176+
LongTerms.Bucket bucket = new LongTerms.Bucket(19, 1, internalAggregations, false, 0, DocValueFormat.RAW);
177+
buckets.add(bucket);
178+
179+
InvalidAggregationPathException e = expectThrows(InvalidAggregationPathException.class,
180+
() -> resolvePropertyFromPath(path.getPathElementsAsStringList(), buckets, "the_long_terms"));
181+
assertThat(e.getMessage(), equalTo("Cannot find an key ['bar'] in [string_terms]"));
182+
}
183+
}

server/src/test/java/org/elasticsearch/search/aggregations/metrics/ScriptedMetricAggregatorTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,17 @@ public static void initMockScripts() {
173173
});
174174
}
175175

176+
@Override
177+
protected ScriptService getMockScriptService() {
178+
MockScriptEngine scriptEngine = new MockScriptEngine(MockScriptEngine.NAME,
179+
SCRIPTS,
180+
Collections.emptyMap());
181+
Map<String, ScriptEngine> engines = Collections.singletonMap(scriptEngine.getType(), scriptEngine);
182+
183+
return new ScriptService(Settings.EMPTY, engines, ScriptModule.CORE_CONTEXTS);
184+
}
185+
186+
176187
@SuppressWarnings("unchecked")
177188
public void testNoDocs() throws IOException {
178189
try (Directory directory = newDirectory()) {
@@ -311,7 +322,7 @@ public void testAggParamsPassedToReduceScript() throws IOException {
311322
.initScript(INIT_SCRIPT_PARAMS).mapScript(MAP_SCRIPT_PARAMS)
312323
.combineScript(COMBINE_SCRIPT_PARAMS).reduceScript(REDUCE_SCRIPT_PARAMS);
313324
ScriptedMetric scriptedMetric = searchAndReduce(
314-
newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder, 0, scriptService);
325+
newSearcher(indexReader, true, true), new MatchAllDocsQuery(), aggregationBuilder, 0);
315326

316327
// The result value depends on the script params.
317328
assertEquals(4803, scriptedMetric.aggregation());

server/src/test/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketAggregatorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ public void testSameAggNames() throws IOException {
120120
valueFieldType.setName(VALUE_FIELD);
121121
valueFieldType.setHasDocValues(true);
122122

123-
avgResult = searchAndReduce(indexSearcher, query, avgBuilder, 10000, null,
123+
avgResult = searchAndReduce(indexSearcher, query, avgBuilder, 10000,
124124
new MappedFieldType[]{fieldType, valueFieldType});
125-
histogramResult = searchAndReduce(indexSearcher, query, histo, 10000, null,
125+
histogramResult = searchAndReduce(indexSearcher, query, histo, 10000,
126126
new MappedFieldType[]{fieldType, valueFieldType});
127127
}
128128

0 commit comments

Comments
 (0)