Skip to content

Commit af11ec5

Browse files
authored
Remove serialization from pipeline aggregator (#55026)
`master` doesn't need to talk to Elasticsearch versions before 7.8.0 so `PipelineAggregator` doesn't need to be writable *at all* in master. New pipeline aggregations don't need to worry about serializing `PipelineAggregator` at all so this drops all of it. For the most part we don't need to worry about serialization of `PipelineAggregator` at all any more. When backporting a change to an aggregator that is serialized to previous versions of Elasticsearch it *should* be fairly simple to pick a value to serialize. And the compiler *should* tell you that you need to do it. In many cases you this'll be a noop. *Hopefully* all cases. Closes #53742
1 parent 0e55fde commit af11ec5

30 files changed

+23
-811
lines changed

server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -368,46 +368,6 @@ public AggregationSpec setAggregatorRegistrar(Consumer<ValuesSourceRegistry> agg
368368
class PipelineAggregationSpec extends SearchExtensionSpec<PipelineAggregationBuilder,
369369
ContextParser<String, ? extends PipelineAggregationBuilder>> {
370370
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
371-
/**
372-
* Read the aggregator from a stream.
373-
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
374-
*/
375-
@Deprecated
376-
private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;
377-
378-
/**
379-
* Specification of a {@link PipelineAggregator}.
380-
*
381-
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
382-
* is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and
383-
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
384-
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
385-
* {@link StreamInput}
386-
* @param parser reads the aggregation builder from XContent
387-
*/
388-
public PipelineAggregationSpec(ParseField name,
389-
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
390-
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
391-
super(name, builderReader, parser);
392-
this.aggregatorReader = null;
393-
}
394-
395-
/**
396-
* Specification of a {@link PipelineAggregator}.
397-
*
398-
* @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the
399-
* {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from
400-
* {@link NamedWriteable#getWriteableName()}.
401-
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
402-
* {@link StreamInput}
403-
* @param parser reads the aggregation builder from XContent
404-
*/
405-
public PipelineAggregationSpec(String name,
406-
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
407-
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
408-
super(name, builderReader, parser);
409-
this.aggregatorReader = null;
410-
}
411371

412372
/**
413373
* Specification of a {@link PipelineAggregator}.
@@ -417,18 +377,12 @@ public PipelineAggregationSpec(String name,
417377
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
418378
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
419379
* {@link StreamInput}
420-
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
421380
* @param parser reads the aggregation builder from XContent
422-
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for
423-
* pipelines implemented after 7.8.0
424381
*/
425-
@Deprecated
426382
public PipelineAggregationSpec(ParseField name,
427383
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
428-
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
429384
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
430385
super(name, builderReader, parser);
431-
this.aggregatorReader = aggregatorReader;
432386
}
433387

434388
/**
@@ -439,18 +393,12 @@ public PipelineAggregationSpec(ParseField name,
439393
* {@link NamedWriteable#getWriteableName()}.
440394
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
441395
* {@link StreamInput}
442-
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
443396
* @param parser reads the aggregation builder from XContent
444-
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines
445-
* implemented after 7.8.0
446397
*/
447-
@Deprecated
448398
public PipelineAggregationSpec(String name,
449399
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
450-
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
451400
ContextParser<String, ? extends PipelineAggregationBuilder> parser) {
452401
super(name, builderReader, parser);
453-
this.aggregatorReader = aggregatorReader;
454402
}
455403

456404
/**
@@ -461,17 +409,14 @@ public PipelineAggregationSpec(String name,
461409
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
462410
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
463411
* {@link StreamInput}
464-
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
465412
* @param parser reads the aggregation builder from XContent
466413
* @deprecated prefer the ctor that takes a {@link ContextParser}
467414
*/
468415
@Deprecated
469416
public PipelineAggregationSpec(ParseField name,
470417
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
471-
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
472418
PipelineAggregator.Parser parser) {
473419
super(name, builderReader, (p, n) -> parser.parse(n, p));
474-
this.aggregatorReader = aggregatorReader;
475420
}
476421

477422
/**
@@ -482,16 +427,13 @@ public PipelineAggregationSpec(ParseField name,
482427
* {@link NamedWriteable#getWriteableName()}.
483428
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
484429
* {@link StreamInput}
485-
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
486430
* @deprecated prefer the ctor that takes a {@link ContextParser}
487431
*/
488432
@Deprecated
489433
public PipelineAggregationSpec(String name,
490434
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
491-
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
492435
PipelineAggregator.Parser parser) {
493436
super(name, builderReader, (p, n) -> parser.parse(n, p));
494-
this.aggregatorReader = aggregatorReader;
495437
}
496438

497439
/**
@@ -510,15 +452,6 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R
510452
return this;
511453
}
512454

513-
/**
514-
* Read the aggregator from a stream.
515-
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
516-
*/
517-
@Deprecated
518-
public Writeable.Reader<? extends PipelineAggregator> getAggregatorReader() {
519-
return aggregatorReader;
520-
}
521-
522455
/**
523456
* Get the readers that must be registered for this aggregation's results.
524457
*/

server/src/main/java/org/elasticsearch/search/SearchModule.java

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -193,52 +193,37 @@
193193
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
194194
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
195195
import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder;
196-
import org.elasticsearch.search.aggregations.pipeline.AvgBucketPipelineAggregator;
197196
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
198-
import org.elasticsearch.search.aggregations.pipeline.BucketScriptPipelineAggregator;
199197
import org.elasticsearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder;
200-
import org.elasticsearch.search.aggregations.pipeline.BucketSelectorPipelineAggregator;
201198
import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder;
202-
import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregator;
203199
import org.elasticsearch.search.aggregations.pipeline.CumulativeSumPipelineAggregationBuilder;
204-
import org.elasticsearch.search.aggregations.pipeline.CumulativeSumPipelineAggregator;
205200
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
206-
import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregator;
207201
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketParser;
208202
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
209-
import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregator;
210203
import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
211204
import org.elasticsearch.search.aggregations.pipeline.InternalDerivative;
212205
import org.elasticsearch.search.aggregations.pipeline.InternalExtendedStatsBucket;
213206
import org.elasticsearch.search.aggregations.pipeline.InternalPercentilesBucket;
214207
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
215208
import org.elasticsearch.search.aggregations.pipeline.InternalStatsBucket;
216209
import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder;
217-
import org.elasticsearch.search.aggregations.pipeline.MaxBucketPipelineAggregator;
218210
import org.elasticsearch.search.aggregations.pipeline.MinBucketPipelineAggregationBuilder;
219-
import org.elasticsearch.search.aggregations.pipeline.MinBucketPipelineAggregator;
220211
import org.elasticsearch.search.aggregations.pipeline.MovFnPipelineAggregationBuilder;
221-
import org.elasticsearch.search.aggregations.pipeline.MovFnPipelineAggregator;
222212
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
223-
import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregator;
224-
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
225213
import org.elasticsearch.search.aggregations.pipeline.SerialDiffPipelineAggregationBuilder;
226-
import org.elasticsearch.search.aggregations.pipeline.SerialDiffPipelineAggregator;
227214
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
228-
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregator;
229215
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
230-
import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
231216
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
232217
import org.elasticsearch.search.fetch.FetchPhase;
233218
import org.elasticsearch.search.fetch.FetchSubPhase;
234-
import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase;
235219
import org.elasticsearch.search.fetch.subphase.ExplainPhase;
220+
import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase;
221+
import org.elasticsearch.search.fetch.subphase.FetchScorePhase;
236222
import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
223+
import org.elasticsearch.search.fetch.subphase.FetchVersionPhase;
237224
import org.elasticsearch.search.fetch.subphase.MatchedQueriesPhase;
238-
import org.elasticsearch.search.fetch.subphase.FetchScorePhase;
239225
import org.elasticsearch.search.fetch.subphase.ScriptFieldsPhase;
240226
import org.elasticsearch.search.fetch.subphase.SeqNoPrimaryTermPhase;
241-
import org.elasticsearch.search.fetch.subphase.FetchVersionPhase;
242227
import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
243228
import org.elasticsearch.search.fetch.subphase.highlight.HighlightPhase;
244229
import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
@@ -502,82 +487,68 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
502487
registerPipelineAggregation(new PipelineAggregationSpec(
503488
DerivativePipelineAggregationBuilder.NAME,
504489
DerivativePipelineAggregationBuilder::new,
505-
DerivativePipelineAggregator::new,
506490
DerivativePipelineAggregationBuilder::parse)
507491
.addResultReader(InternalDerivative::new));
508492
registerPipelineAggregation(new PipelineAggregationSpec(
509493
MaxBucketPipelineAggregationBuilder.NAME,
510494
MaxBucketPipelineAggregationBuilder::new,
511-
MaxBucketPipelineAggregator::new,
512495
MaxBucketPipelineAggregationBuilder.PARSER)
513496
// This bucket is used by many pipeline aggreations.
514497
.addResultReader(InternalBucketMetricValue.NAME, InternalBucketMetricValue::new));
515498
registerPipelineAggregation(new PipelineAggregationSpec(
516499
MinBucketPipelineAggregationBuilder.NAME,
517500
MinBucketPipelineAggregationBuilder::new,
518-
MinBucketPipelineAggregator::new,
519501
MinBucketPipelineAggregationBuilder.PARSER)
520502
/* Uses InternalBucketMetricValue */);
521503
registerPipelineAggregation(new PipelineAggregationSpec(
522504
AvgBucketPipelineAggregationBuilder.NAME,
523505
AvgBucketPipelineAggregationBuilder::new,
524-
AvgBucketPipelineAggregator::new,
525506
AvgBucketPipelineAggregationBuilder.PARSER)
526507
// This bucket is used by many pipeline aggreations.
527508
.addResultReader(InternalSimpleValue.NAME, InternalSimpleValue::new));
528509
registerPipelineAggregation(new PipelineAggregationSpec(
529510
SumBucketPipelineAggregationBuilder.NAME,
530511
SumBucketPipelineAggregationBuilder::new,
531-
SumBucketPipelineAggregator::new,
532512
SumBucketPipelineAggregationBuilder.PARSER)
533513
/* Uses InternalSimpleValue */);
534514
registerPipelineAggregation(new PipelineAggregationSpec(
535515
StatsBucketPipelineAggregationBuilder.NAME,
536516
StatsBucketPipelineAggregationBuilder::new,
537-
StatsBucketPipelineAggregator::new,
538517
StatsBucketPipelineAggregationBuilder.PARSER)
539518
.addResultReader(InternalStatsBucket::new));
540519
registerPipelineAggregation(new PipelineAggregationSpec(
541520
ExtendedStatsBucketPipelineAggregationBuilder.NAME,
542521
ExtendedStatsBucketPipelineAggregationBuilder::new,
543-
ExtendedStatsBucketPipelineAggregator::new,
544522
new ExtendedStatsBucketParser())
545523
.addResultReader(InternalExtendedStatsBucket::new));
546524
registerPipelineAggregation(new PipelineAggregationSpec(
547525
PercentilesBucketPipelineAggregationBuilder.NAME,
548526
PercentilesBucketPipelineAggregationBuilder::new,
549-
PercentilesBucketPipelineAggregator::new,
550527
PercentilesBucketPipelineAggregationBuilder.PARSER)
551528
.addResultReader(InternalPercentilesBucket::new));
552529
registerPipelineAggregation(new PipelineAggregationSpec(
553530
CumulativeSumPipelineAggregationBuilder.NAME,
554531
CumulativeSumPipelineAggregationBuilder::new,
555-
CumulativeSumPipelineAggregator::new,
556532
CumulativeSumPipelineAggregationBuilder::parse));
557533
registerPipelineAggregation(new PipelineAggregationSpec(
558534
BucketScriptPipelineAggregationBuilder.NAME,
559535
BucketScriptPipelineAggregationBuilder::new,
560-
BucketScriptPipelineAggregator::new,
561536
BucketScriptPipelineAggregationBuilder.PARSER));
562537
registerPipelineAggregation(new PipelineAggregationSpec(
563538
BucketSelectorPipelineAggregationBuilder.NAME,
564539
BucketSelectorPipelineAggregationBuilder::new,
565-
BucketSelectorPipelineAggregator::new,
566540
BucketSelectorPipelineAggregationBuilder::parse));
567541
registerPipelineAggregation(new PipelineAggregationSpec(
568542
BucketSortPipelineAggregationBuilder.NAME,
569543
BucketSortPipelineAggregationBuilder::new,
570-
BucketSortPipelineAggregator::new,
571544
BucketSortPipelineAggregationBuilder::parse));
572545
registerPipelineAggregation(new PipelineAggregationSpec(
573546
SerialDiffPipelineAggregationBuilder.NAME,
574547
SerialDiffPipelineAggregationBuilder::new,
575-
SerialDiffPipelineAggregator::new,
576548
SerialDiffPipelineAggregationBuilder::parse));
577549
registerPipelineAggregation(new PipelineAggregationSpec(
578550
MovFnPipelineAggregationBuilder.NAME,
579551
MovFnPipelineAggregationBuilder::new,
580-
MovFnPipelineAggregator::new,
581552
MovFnPipelineAggregationBuilder.PARSER));
582553

583554
registerFromPlugin(plugins, SearchPlugin::getPipelineAggregations, this::registerPipelineAggregation);
@@ -588,10 +559,6 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) {
588559
(p, c) -> spec.getParser().parse(p, (String) c)));
589560
namedWriteables.add(
590561
new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader()));
591-
if (spec.getAggregatorReader() != null) {
592-
namedWriteables.add(new NamedWriteableRegistry.Entry(
593-
PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader()));
594-
}
595562
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.getResultReaders().entrySet()) {
596563
namedWriteables
597564
.add(new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue()));

server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,7 @@ public void execute(SearchContext context) {
130130
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
131131
}
132132
}
133-
context.queryResult().aggregations(new InternalAggregations(aggregations,
134-
context.request().source().aggregations()::buildPipelineTree));
133+
context.queryResult().aggregations(new InternalAggregations(aggregations));
135134

136135
// disable aggregations so that they don't run on next pages in case of scrolling
137136
context.aggregations(null);

0 commit comments

Comments
 (0)