Skip to content

Commit d615add

Browse files
jklancicpolyfractal
authored andcommitted
Add pipeline parent validation for auto date histogram (#35670)
Allow `auto_date_histogram` as a valid parent agg for derivative, cumulative sum, moving average, moving function and serial differencing pipeline aggregations. Since all these aggs share the same requirement (sequentially ordered parent aggs), this commit also refactors to share the same validation code so that any newly added aggs won't be forgotten. Closes #35578
1 parent 01afeff commit d615add

16 files changed

+317
-107
lines changed

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public final class HistogramAggregatorFactory extends ValuesSourceAggregatorFact
4242
private final long minDocCount;
4343
private final double minBound, maxBound;
4444

45-
HistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config, double interval, double offset,
45+
public HistogramAggregatorFactory(String name, ValuesSourceConfig<Numeric> config, double interval, double offset,
4646
BucketOrder order, boolean keyed, long minDocCount, double minBound, double maxBound,
4747
SearchContext context, AggregatorFactory<?> parent,
4848
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/AbstractPipelineAggregationBuilder.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.elasticsearch.search.aggregations.AggregationBuilder;
2626
import org.elasticsearch.search.aggregations.AggregatorFactory;
2727
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
28+
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregatorFactory;
29+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
30+
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
2831

2932
import java.io.IOException;
3033
import java.util.Arrays;
@@ -102,6 +105,28 @@ public final PipelineAggregator create() throws IOException {
102105
public void doValidate(AggregatorFactory<?> parent, Collection<AggregationBuilder> factories,
103106
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
104107
}
108+
109+
/**
110+
* Validates pipeline aggregations that need sequentially ordered data.
111+
*/
112+
public static void validateSequentiallyOrderedParentAggs(AggregatorFactory<?> parent, String type, String name) {
113+
if ((parent instanceof HistogramAggregatorFactory || parent instanceof DateHistogramAggregatorFactory
114+
|| parent instanceof AutoDateHistogramAggregatorFactory) == false) {
115+
throw new IllegalStateException(
116+
type + " aggregation [" + name + "] must have a histogram, date_histogram or auto_date_histogram as parent");
117+
}
118+
if (parent instanceof HistogramAggregatorFactory) {
119+
HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent;
120+
if (histoParent.minDocCount() != 0) {
121+
throw new IllegalStateException("parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
122+
}
123+
} else if (parent instanceof DateHistogramAggregatorFactory) {
124+
DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent;
125+
if (histoParent.minDocCount() != 0) {
126+
throw new IllegalStateException("parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
127+
}
128+
}
129+
}
105130

106131
@SuppressWarnings("unchecked")
107132
@Override

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregationBuilder.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.elasticsearch.search.aggregations.AggregationBuilder;
2929
import org.elasticsearch.search.aggregations.AggregatorFactory;
3030
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
31-
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
32-
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
3331

3432
import java.io.IOException;
3533
import java.util.ArrayList;
@@ -101,22 +99,8 @@ public void doValidate(AggregatorFactory<?> parent, Collection<AggregationBuilde
10199
throw new IllegalStateException(BUCKETS_PATH.getPreferredName()
102100
+ " must contain a single entry for aggregation [" + name + "]");
103101
}
104-
if (parent instanceof HistogramAggregatorFactory) {
105-
HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent;
106-
if (histoParent.minDocCount() != 0) {
107-
throw new IllegalStateException("parent histogram of cumulative sum aggregation [" + name
108-
+ "] must have min_doc_count of 0");
109-
}
110-
} else if (parent instanceof DateHistogramAggregatorFactory) {
111-
DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent;
112-
if (histoParent.minDocCount() != 0) {
113-
throw new IllegalStateException("parent histogram of cumulative sum aggregation [" + name
114-
+ "] must have min_doc_count of 0");
115-
}
116-
} else {
117-
throw new IllegalStateException("cumulative sum aggregation [" + name
118-
+ "] must have a histogram or date_histogram as parent");
119-
}
102+
103+
validateSequentiallyOrderedParentAggs(parent, NAME, name);
120104
}
121105

122106
@Override

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/DerivativePipelineAggregationBuilder.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@
3232
import org.elasticsearch.search.aggregations.AggregatorFactory;
3333
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
3434
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
35-
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
3635
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
37-
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
3836
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
3937
import org.joda.time.DateTimeZone;
4038

@@ -161,22 +159,8 @@ public void doValidate(AggregatorFactory<?> parent, Collection<AggregationBuilde
161159
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
162160
+ " must contain a single entry for aggregation [" + name + "]");
163161
}
164-
if (parent instanceof HistogramAggregatorFactory) {
165-
HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent;
166-
if (histoParent.minDocCount() != 0) {
167-
throw new IllegalStateException("parent histogram of derivative aggregation [" + name
168-
+ "] must have min_doc_count of 0");
169-
}
170-
} else if (parent instanceof DateHistogramAggregatorFactory) {
171-
DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent;
172-
if (histoParent.minDocCount() != 0) {
173-
throw new IllegalStateException("parent histogram of derivative aggregation [" + name
174-
+ "] must have min_doc_count of 0");
175-
}
176-
} else {
177-
throw new IllegalStateException("derivative aggregation [" + name
178-
+ "] must have a histogram or date_histogram as parent");
179-
}
162+
163+
validateSequentiallyOrderedParentAggs(parent, NAME, name);
180164
}
181165

182166
@Override

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovAvgPipelineAggregationBuilder.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import org.elasticsearch.search.aggregations.AggregationBuilder;
3333
import org.elasticsearch.search.aggregations.AggregatorFactory;
3434
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
35-
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
36-
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
3735
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
3836

3937
import java.io.IOException;
@@ -267,22 +265,8 @@ public void doValidate(AggregatorFactory<?> parent, Collection<AggregationBuilde
267265
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
268266
+ " must contain a single entry for aggregation [" + name + "]");
269267
}
270-
if (parent instanceof HistogramAggregatorFactory) {
271-
HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent;
272-
if (histoParent.minDocCount() != 0) {
273-
throw new IllegalStateException("parent histogram of moving average aggregation [" + name
274-
+ "] must have min_doc_count of 0");
275-
}
276-
} else if (parent instanceof DateHistogramAggregatorFactory) {
277-
DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent;
278-
if (histoParent.minDocCount() != 0) {
279-
throw new IllegalStateException("parent histogram of moving average aggregation [" + name
280-
+ "] must have min_doc_count of 0");
281-
}
282-
} else {
283-
throw new IllegalStateException("moving average aggregation [" + name
284-
+ "] must have a histogram or date_histogram as parent");
285-
}
268+
269+
validateSequentiallyOrderedParentAggs(parent, NAME, name);
286270
}
287271

288272
@Override

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/MovFnPipelineAggregationBuilder.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import org.elasticsearch.search.aggregations.AggregationBuilder;
3333
import org.elasticsearch.search.aggregations.AggregatorFactory;
3434
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
35-
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
36-
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
3735
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
3836

3937
import java.io.IOException;
@@ -176,22 +174,8 @@ public void doValidate(AggregatorFactory<?> parent, Collection<AggregationBuilde
176174
if (window <= 0) {
177175
throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer.");
178176
}
179-
if (parent instanceof HistogramAggregatorFactory) {
180-
HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent;
181-
if (histoParent.minDocCount() != 0) {
182-
throw new IllegalStateException("parent histogram of moving_function aggregation [" + name
183-
+ "] must have min_doc_count of 0");
184-
}
185-
} else if (parent instanceof DateHistogramAggregatorFactory) {
186-
DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent;
187-
if (histoParent.minDocCount() != 0) {
188-
throw new IllegalStateException("parent histogram of moving_function aggregation [" + name
189-
+ "] must have min_doc_count of 0");
190-
}
191-
} else {
192-
throw new IllegalStateException("moving_function aggregation [" + name
193-
+ "] must have a histogram or date_histogram as parent");
194-
}
177+
178+
validateSequentiallyOrderedParentAggs(parent, NAME, name);
195179
}
196180

197181
@Override

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffPipelineAggregationBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,14 @@
2626
import org.elasticsearch.common.xcontent.XContentBuilder;
2727
import org.elasticsearch.common.xcontent.XContentParser;
2828
import org.elasticsearch.search.DocValueFormat;
29+
import org.elasticsearch.search.aggregations.AggregationBuilder;
30+
import org.elasticsearch.search.aggregations.AggregatorFactory;
31+
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
2932
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
3033

3134
import java.io.IOException;
3235
import java.util.ArrayList;
36+
import java.util.Collection;
3337
import java.util.List;
3438
import java.util.Map;
3539
import java.util.Objects;
@@ -134,6 +138,12 @@ protected DocValueFormat formatter() {
134138
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
135139
return new SerialDiffPipelineAggregator(name, bucketsPaths, formatter(), gapPolicy, lag, metaData);
136140
}
141+
142+
@Override
143+
public void doValidate(AggregatorFactory<?> parent, Collection<AggregationBuilder> aggFactories,
144+
Collection<PipelineAggregationBuilder> pipelineAggregatoractories) {
145+
validateSequentiallyOrderedParentAggs(parent, NAME, name);
146+
}
137147

138148
@Override
139149
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {

server/src/test/java/org/elasticsearch/search/aggregations/MultiBucketAggregatorWrapperTests.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,14 @@
3030
import org.elasticsearch.search.internal.SearchContext;
3131
import org.elasticsearch.test.ESTestCase;
3232

33-
import java.io.IOException;
3433
import java.util.Collections;
35-
import java.util.List;
36-
import java.util.Map;
3734

35+
import static org.mockito.Matchers.same;
3836
import static org.mockito.Mockito.mock;
39-
import static org.mockito.Mockito.when;
40-
import static org.mockito.Mockito.verify;
4137
import static org.mockito.Mockito.reset;
42-
import static org.mockito.Mockito.same;
38+
import static org.mockito.Mockito.verify;
4339
import static org.mockito.Mockito.verifyNoMoreInteractions;
40+
import static org.mockito.Mockito.when;
4441

4542
public class MultiBucketAggregatorWrapperTests extends ESTestCase {
4643

@@ -74,21 +71,4 @@ public void testNoNullScorerIsDelegated() throws Exception {
7471
verifyNoMoreInteractions(wrappedCollector);
7572
wrapper.close();
7673
}
77-
78-
static class TestAggregatorFactory extends AggregatorFactory {
79-
80-
private final Aggregator aggregator;
81-
82-
TestAggregatorFactory(SearchContext context, Aggregator aggregator) throws IOException {
83-
super("_name", context, null, new AggregatorFactories.Builder(), Collections.emptyMap());
84-
this.aggregator = aggregator;
85-
}
86-
87-
@Override
88-
protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List list,
89-
Map metaData) throws IOException {
90-
return aggregator;
91-
}
92-
}
93-
9474
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.search.aggregations;
21+
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.util.BigArrays;
24+
import org.elasticsearch.common.util.MockBigArrays;
25+
import org.elasticsearch.common.util.MockPageCacheRecycler;
26+
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
27+
import org.elasticsearch.search.internal.SearchContext;
28+
29+
import java.io.IOException;
30+
import java.util.Collections;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.when;
36+
37+
/**
38+
* Test implementation for AggregatorFactory.
39+
*/
40+
public class TestAggregatorFactory extends AggregatorFactory {
41+
42+
private final Aggregator aggregator;
43+
44+
TestAggregatorFactory(SearchContext context, Aggregator aggregator) throws IOException {
45+
super("_name", context, null, new AggregatorFactories.Builder(), Collections.emptyMap());
46+
this.aggregator = aggregator;
47+
}
48+
49+
@Override
50+
protected Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List list,
51+
Map metaData) throws IOException {
52+
return aggregator;
53+
}
54+
55+
public static TestAggregatorFactory createInstance() throws IOException {
56+
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
57+
SearchContext searchContext = mock(SearchContext.class);
58+
when(searchContext.bigArrays()).thenReturn(bigArrays);
59+
60+
Aggregator aggregator = mock(Aggregator.class);
61+
62+
return new TestAggregatorFactory(searchContext, aggregator);
63+
}
64+
}

server/src/test/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumAggregatorTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.elasticsearch.search.aggregations.AggregationBuilder;
3838
import org.elasticsearch.search.aggregations.AggregatorTestCase;
3939
import org.elasticsearch.search.aggregations.InternalAggregation;
40+
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
41+
import org.elasticsearch.search.aggregations.TestAggregatorFactory;
4042
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
4143
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
4244
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
@@ -48,7 +50,10 @@
4850

4951
import java.io.IOException;
5052
import java.util.Arrays;
53+
import java.util.Collections;
54+
import java.util.HashSet;
5155
import java.util.List;
56+
import java.util.Set;
5257
import java.util.function.Consumer;
5358

5459
import static org.hamcrest.Matchers.equalTo;
@@ -259,6 +264,34 @@ public void testNoBuckets() throws IOException {
259264
}
260265
});
261266
}
267+
268+
/**
269+
* The validation should verify the parent aggregation is allowed.
270+
*/
271+
public void testValidate() throws IOException {
272+
final Set<PipelineAggregationBuilder> aggBuilders = new HashSet<>();
273+
aggBuilders.add(new CumulativeSumPipelineAggregationBuilder("cusum", "sum"));
274+
275+
final CumulativeSumPipelineAggregationBuilder builder = new CumulativeSumPipelineAggregationBuilder("name", "valid");
276+
builder.validate(PipelineAggregationHelperTests.getRandomSequentiallyOrderedParentAgg(), Collections.emptySet(), aggBuilders);
277+
}
278+
279+
/**
280+
* The validation should throw an IllegalArgumentException, since parent
281+
* aggregation is not a type of HistogramAggregatorFactory,
282+
* DateHistogramAggregatorFactory or AutoDateHistogramAggregatorFactory.
283+
*/
284+
public void testValidateException() throws IOException {
285+
final Set<PipelineAggregationBuilder> aggBuilders = new HashSet<>();
286+
aggBuilders.add(new CumulativeSumPipelineAggregationBuilder("cusum", "sum"));
287+
TestAggregatorFactory parentFactory = TestAggregatorFactory.createInstance();
288+
289+
final CumulativeSumPipelineAggregationBuilder builder = new CumulativeSumPipelineAggregationBuilder("name", "invalid_agg>metric");
290+
IllegalStateException ex = expectThrows(IllegalStateException.class,
291+
() -> builder.validate(parentFactory, Collections.emptySet(), aggBuilders));
292+
assertEquals("cumulative_sum aggregation [name] must have a histogram, date_histogram or auto_date_histogram as parent",
293+
ex.getMessage());
294+
}
262295

263296
private void executeTestCase(Query query, AggregationBuilder aggBuilder, Consumer<InternalAggregation> verify) throws IOException {
264297
executeTestCase(query, aggBuilder, verify, indexWriter -> {

0 commit comments

Comments
 (0)