Skip to content

Commit 569dffc

Browse files
authored
Move pipeline agg validation to coordinating node (#53669)
This moves the pipeline aggregation validation from the data node to the coordinating node so that we, eventually, can stop sending pipeline aggregations to the data nodes entirely. In fact, it moves it into the "request validation" stage so multiple errors can be accumulated and sent back to the requester for the entire request. We can't always take advantage of that, but it'll be nice for folks not to have to play whack-a-mole with validation. This is implemented by replacing `PipelineAggregationBuilder#validate` with: ``` protected abstract void validate(ValidationContext context); ``` The `ValidationContext` handles the accumulation of validation failures, provides access to the aggregation's siblings, and implements a few validation utility methods.
1 parent ffbb558 commit 569dffc

File tree

36 files changed

+546
-526
lines changed

36 files changed

+546
-526
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,11 @@ public ActionRequestValidationException validate() {
277277
addValidationError("[request_cache] cannot be used in a scroll context", validationException);
278278
}
279279
}
280+
if (source != null) {
281+
if (source.aggregations() != null) {
282+
validationException = source.aggregations().validate(validationException);
283+
}
284+
}
280285
return validationException;
281286
}
282287

server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.search.aggregations;
2020

21+
import org.elasticsearch.action.ActionRequestValidationException;
2122
import org.elasticsearch.common.ParsingException;
2223
import org.elasticsearch.common.Strings;
2324
import org.elasticsearch.common.io.stream.StreamInput;
@@ -283,8 +284,6 @@ public boolean mustVisitAllDocs() {
283284
return false;
284285
}
285286

286-
287-
288287
public Builder addAggregator(AggregationBuilder factory) {
289288
if (!names.add(factory.name)) {
290289
throw new IllegalArgumentException("Two sibling aggregations cannot have the same name: [" + factory.name + "]");
@@ -298,23 +297,59 @@ public Builder addPipelineAggregator(PipelineAggregationBuilder pipelineAggregat
298297
return this;
299298
}
300299

300+
/**
301+
* Validate the root of the aggregation tree.
302+
*/
303+
public ActionRequestValidationException validate(ActionRequestValidationException e) {
304+
PipelineAggregationBuilder.ValidationContext context =
305+
PipelineAggregationBuilder.ValidationContext.forTreeRoot(aggregationBuilders, pipelineAggregatorBuilders, e);
306+
validatePipelines(context);
307+
return validateChildren(context.getValidationException());
308+
}
309+
310+
/**
311+
* Validate the pipeline aggregations in this factory.
312+
*/
313+
private void validatePipelines(PipelineAggregationBuilder.ValidationContext context) {
314+
List<PipelineAggregationBuilder> orderedPipelineAggregators;
315+
try {
316+
orderedPipelineAggregators = resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
317+
} catch (IllegalArgumentException iae) {
318+
context.addValidationError(iae.getMessage());
319+
return;
320+
}
321+
for (PipelineAggregationBuilder builder : orderedPipelineAggregators) {
322+
builder.validate(context);
323+
}
324+
}
325+
326+
/**
327+
* Validate the children of this factory.
328+
*/
329+
private ActionRequestValidationException validateChildren(ActionRequestValidationException e) {
330+
for (AggregationBuilder agg : aggregationBuilders) {
331+
PipelineAggregationBuilder.ValidationContext context =
332+
PipelineAggregationBuilder.ValidationContext.forInsideTree(agg, e);
333+
agg.factoriesBuilder.validatePipelines(context);
334+
e = agg.factoriesBuilder.validateChildren(context.getValidationException());
335+
}
336+
return e;
337+
}
338+
301339
public AggregatorFactories build(QueryShardContext queryShardContext, AggregatorFactory parent) throws IOException {
302340
if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) {
303341
return EMPTY;
304342
}
305-
List<PipelineAggregationBuilder> orderedpipelineAggregators = null;
306-
orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorBuilders, this.aggregationBuilders);
307-
for (PipelineAggregationBuilder builder : orderedpipelineAggregators) {
308-
builder.validate(parent, aggregationBuilders, pipelineAggregatorBuilders);
309-
}
343+
List<PipelineAggregationBuilder> orderedPipelineAggregators =
344+
resolvePipelineAggregatorOrder(pipelineAggregatorBuilders, aggregationBuilders);
310345
AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()];
311346

312347
int i = 0;
313348
for (AggregationBuilder agg : aggregationBuilders) {
314349
aggFactories[i] = agg.build(queryShardContext, parent);
315350
++i;
316351
}
317-
return new AggregatorFactories(aggFactories, orderedpipelineAggregators);
352+
return new AggregatorFactories(aggFactories, orderedPipelineAggregators);
318353
}
319354

320355
private List<PipelineAggregationBuilder> resolvePipelineAggregatorOrder(

server/src/main/java/org/elasticsearch/search/aggregations/PipelineAggregationBuilder.java

Lines changed: 144 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@
1818
*/
1919
package org.elasticsearch.search.aggregations;
2020

21+
import org.elasticsearch.action.ActionRequestValidationException;
22+
import org.elasticsearch.action.ValidateActions;
2123
import org.elasticsearch.common.Strings;
2224
import org.elasticsearch.common.io.stream.NamedWriteable;
2325
import org.elasticsearch.common.xcontent.ToXContentFragment;
2426
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
27+
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
28+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
29+
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
2530
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2631

2732
import java.util.Collection;
2833
import java.util.Map;
34+
import java.util.Objects;
2935

3036
/**
3137
* A factory that knows how to create an {@link PipelineAggregator} of a
@@ -64,11 +70,145 @@ public final String[] getBucketsPaths() {
6470
}
6571

6672
/**
67-
* Internal: Validates the state of this factory (makes sure the factory is properly
68-
* configured)
73+
* Makes sure this builder is properly configured.
6974
*/
70-
protected abstract void validate(AggregatorFactory parent, Collection<AggregationBuilder> aggregationBuilders,
71-
Collection<PipelineAggregationBuilder> pipelineAggregatorBuilders);
75+
protected abstract void validate(ValidationContext context);
76+
public abstract static class ValidationContext {
77+
/**
78+
* Build the context for the root of the aggregation tree.
79+
*/
80+
public static ValidationContext forTreeRoot(Collection<AggregationBuilder> siblingAggregations,
81+
Collection<PipelineAggregationBuilder> siblingPipelineAggregations,
82+
ActionRequestValidationException validationFailuresSoFar) {
83+
return new ForTreeRoot(siblingAggregations, siblingPipelineAggregations, validationFailuresSoFar);
84+
}
85+
86+
/**
87+
* Build the context for a node inside the aggregation tree.
88+
*/
89+
public static ValidationContext forInsideTree(AggregationBuilder parent,
90+
ActionRequestValidationException validationFailuresSoFar) {
91+
return new ForInsideTree(parent, validationFailuresSoFar);
92+
}
93+
94+
95+
private ActionRequestValidationException e;
96+
97+
private ValidationContext(ActionRequestValidationException validationFailuresSoFar) {
98+
this.e = validationFailuresSoFar;
99+
}
100+
101+
private static class ForTreeRoot extends ValidationContext {
102+
private final Collection<AggregationBuilder> siblingAggregations;
103+
private final Collection<PipelineAggregationBuilder> siblingPipelineAggregations;
104+
105+
ForTreeRoot(Collection<AggregationBuilder> siblingAggregations,
106+
Collection<PipelineAggregationBuilder> siblingPipelineAggregations,
107+
ActionRequestValidationException validationFailuresSoFar) {
108+
super(validationFailuresSoFar);
109+
this.siblingAggregations = Objects.requireNonNull(siblingAggregations);
110+
this.siblingPipelineAggregations = Objects.requireNonNull(siblingPipelineAggregations);
111+
}
112+
113+
@Override
114+
public Collection<AggregationBuilder> getSiblingAggregations() {
115+
return siblingAggregations;
116+
}
117+
118+
@Override
119+
public Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations() {
120+
return siblingPipelineAggregations;
121+
}
122+
123+
@Override
124+
public void validateParentAggSequentiallyOrdered(String type, String name) {
125+
addValidationError(type + " aggregation [" + name
126+
+ "] must have a histogram, date_histogram or auto_date_histogram as parent but doesn't have a parent");
127+
}
128+
}
129+
130+
private static class ForInsideTree extends ValidationContext {
131+
private final AggregationBuilder parent;
132+
133+
ForInsideTree(AggregationBuilder parent, ActionRequestValidationException validationFailuresSoFar) {
134+
super(validationFailuresSoFar);
135+
this.parent = Objects.requireNonNull(parent);
136+
}
137+
138+
@Override
139+
public Collection<AggregationBuilder> getSiblingAggregations() {
140+
return parent.getSubAggregations();
141+
}
142+
143+
@Override
144+
public Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations() {
145+
return parent.getPipelineAggregations();
146+
}
147+
148+
@Override
149+
public void validateParentAggSequentiallyOrdered(String type, String name) {
150+
if (parent instanceof HistogramAggregationBuilder) {
151+
HistogramAggregationBuilder histoParent = (HistogramAggregationBuilder) parent;
152+
if (histoParent.minDocCount() != 0) {
153+
addValidationError(
154+
"parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
155+
}
156+
} else if (parent instanceof DateHistogramAggregationBuilder) {
157+
DateHistogramAggregationBuilder histoParent = (DateHistogramAggregationBuilder) parent;
158+
if (histoParent.minDocCount() != 0) {
159+
addValidationError(
160+
"parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
161+
}
162+
} else if (parent instanceof AutoDateHistogramAggregationBuilder) {
163+
// Nothing to check
164+
} else {
165+
addValidationError(
166+
type + " aggregation [" + name + "] must have a histogram, date_histogram or auto_date_histogram as parent");
167+
}
168+
}
169+
}
170+
171+
/**
172+
* Aggregations that are siblings to the aggregation being validated.
173+
*/
174+
public abstract Collection<AggregationBuilder> getSiblingAggregations();
175+
176+
/**
177+
* Pipeline aggregations that are siblings to the aggregation being validated.
178+
*/
179+
public abstract Collection<PipelineAggregationBuilder> getSiblingPipelineAggregations();
180+
181+
/**
182+
* Add a validation error to this context. All validation errors
183+
* are accumulated in a list and, if there are any, the request
184+
* is not executed and the entire list is returned as the error
185+
* response.
186+
*/
187+
public void addValidationError(String error) {
188+
e = ValidateActions.addValidationError(error, e);
189+
}
190+
191+
/**
192+
* Add a validation error about the {@code buckets_path}.
193+
*/
194+
public void addBucketPathValidationError(String error) {
195+
addValidationError(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() + ' ' + error);
196+
}
197+
198+
/**
199+
* Validates that the parent is sequentially ordered.
200+
*/
201+
public abstract void validateParentAggSequentiallyOrdered(String type, String name);
202+
203+
/**
204+
* The validation exception, if there is one. It'll be {@code null}
205+
* if the context wasn't provided with any exception on creation
206+
* and none were added.
207+
*/
208+
public ActionRequestValidationException getValidationException() {
209+
return e;
210+
}
211+
}
72212

73213
/**
74214
* Creates the pipeline aggregator

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/AbstractPipelineAggregationBuilder.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,10 @@
2222
import org.elasticsearch.common.io.stream.StreamInput;
2323
import org.elasticsearch.common.io.stream.StreamOutput;
2424
import org.elasticsearch.common.xcontent.XContentBuilder;
25-
import org.elasticsearch.search.aggregations.AggregationBuilder;
26-
import org.elasticsearch.search.aggregations.AggregatorFactory;
2725
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
28-
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregatorFactory;
29-
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
30-
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
3126

3227
import java.io.IOException;
3328
import java.util.Arrays;
34-
import java.util.Collection;
3529
import java.util.Map;
3630
import java.util.Objects;
3731

@@ -79,16 +73,6 @@ public String type() {
7973
return type;
8074
}
8175

82-
/**
83-
* Validates the state of this factory (makes sure the factory is properly
84-
* configured)
85-
*/
86-
@Override
87-
public final void validate(AggregatorFactory parent, Collection<AggregationBuilder> factories,
88-
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
89-
doValidate(parent, factories, pipelineAggregatorFactories);
90-
}
91-
9276
protected abstract PipelineAggregator createInternal(Map<String, Object> metaData);
9377

9478
/**
@@ -102,32 +86,6 @@ public final PipelineAggregator create() {
10286
return aggregator;
10387
}
10488

105-
public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> factories,
106-
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
107-
}
108-
109-
/**
110-
* Validates pipeline aggregations that need sequentially ordered data.
111-
*/
112-
public static void validateSequentiallyOrderedParentAggs(AggregatorFactory parent, String type, String name) {
113-
if ((parent instanceof HistogramAggregatorFactory || parent instanceof DateHistogramAggregatorFactory
114-
|| parent instanceof AutoDateHistogramAggregatorFactory) == false) {
115-
throw new IllegalStateException(
116-
type + " aggregation [" + name + "] must have a histogram, date_histogram or auto_date_histogram as parent");
117-
}
118-
if (parent instanceof HistogramAggregatorFactory) {
119-
HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent;
120-
if (histoParent.minDocCount() != 0) {
121-
throw new IllegalStateException("parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
122-
}
123-
} else if (parent instanceof DateHistogramAggregatorFactory) {
124-
DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent;
125-
if (histoParent.minDocCount() != 0) {
126-
throw new IllegalStateException("parent histogram of " + type + " aggregation [" + name + "] must have min_doc_count of 0");
127-
}
128-
}
129-
}
130-
13189
@SuppressWarnings("unchecked")
13290
@Override
13391
public PAB setMetaData(Map<String, Object> metaData) {

server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipelineAggregationBuilder.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,10 @@
2424
import org.elasticsearch.common.xcontent.XContentBuilder;
2525
import org.elasticsearch.search.DocValueFormat;
2626
import org.elasticsearch.search.aggregations.AggregationBuilder;
27-
import org.elasticsearch.search.aggregations.AggregatorFactory;
28-
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
2927
import org.elasticsearch.search.aggregations.bucket.MultiBucketAggregationBuilder;
3028
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
3129

3230
import java.io.IOException;
33-
import java.util.Collection;
3431
import java.util.Map;
3532
import java.util.Objects;
3633
import java.util.Optional;
@@ -107,28 +104,27 @@ public GapPolicy gapPolicy() {
107104
protected abstract PipelineAggregator createInternal(Map<String, Object> metaData);
108105

109106
@Override
110-
public void doValidate(AggregatorFactory parent, Collection<AggregationBuilder> aggBuilders,
111-
Collection<PipelineAggregationBuilder> pipelineAggregatorFactories) {
107+
protected void validate(ValidationContext context) {
112108
if (bucketsPaths.length != 1) {
113-
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
114-
+ " must contain a single entry for aggregation [" + name + "]");
109+
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
110+
return;
115111
}
116112
// Need to find the first agg name in the buckets path to check it's a
117113
// multi bucket agg: aggs are split with '>' and can optionally have a
118114
// metric name after them by using '.' so need to split on both to get
119115
// just the agg name
120116
final String firstAgg = bucketsPaths[0].split("[>\\.]")[0];
121-
Optional<AggregationBuilder> aggBuilder = aggBuilders.stream().filter((builder) -> builder.getName().equals(firstAgg))
117+
Optional<AggregationBuilder> aggBuilder = context.getSiblingAggregations().stream()
118+
.filter(builder -> builder.getName().equals(firstAgg))
122119
.findAny();
123-
if (aggBuilder.isPresent()) {
124-
if ((aggBuilder.get() instanceof MultiBucketAggregationBuilder) == false) {
125-
throw new IllegalArgumentException("The first aggregation in " + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
126-
+ " must be a multi-bucket aggregation for aggregation [" + name + "] found :"
127-
+ aggBuilder.get().getClass().getName() + " for buckets path: " + bucketsPaths[0]);
128-
}
129-
} else {
130-
throw new IllegalArgumentException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
131-
+ " aggregation does not exist for aggregation [" + name + "]: " + bucketsPaths[0]);
120+
if (aggBuilder.isEmpty()) {
121+
context.addBucketPathValidationError("aggregation does not exist for aggregation [" + name + "]: " + bucketsPaths[0]);
122+
return;
123+
}
124+
if ((aggBuilder.get() instanceof MultiBucketAggregationBuilder) == false) {
125+
context.addValidationError("The first aggregation in " + PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
126+
+ " must be a multi-bucket aggregation for aggregation [" + name + "] found :"
127+
+ aggBuilder.get().getClass().getName() + " for buckets path: " + bucketsPaths[0]);
132128
}
133129
}
134130

0 commit comments

Comments
 (0)