Skip to content

Commit 46bf8ba

Browse files
committed
Switch aggregation registration for push to pull
Adds `getAggregations` to `SearchPlugin` which can be used to register aggregations. Fixup MockNode which wasn't createing MockBigArrays.
1 parent 8d2770c commit 46bf8ba

File tree

55 files changed

+645
-373
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+645
-373
lines changed

core/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,12 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
364364
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
365365
b.bind(IngestService.class).toInstance(ingestService);
366366
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
367+
Class<? extends SearchService> searchServiceImpl = pickSearchServiceImplementation();
368+
if (searchServiceImpl == SearchService.class) {
369+
b.bind(SearchService.class).asEagerSingleton();
370+
} else {
371+
b.bind(SearchService.class).to(searchServiceImpl).asEagerSingleton();
372+
}
367373
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
368374
}
369375
);
@@ -716,6 +722,13 @@ private void writePortsFile(String type, BoundTransportAddress boundAddress) {
716722
}
717723
}
718724

725+
/**
726+
* The {@link PluginsService} used to build this node's components.
727+
*/
728+
protected PluginsService getPluginsService() {
729+
return pluginsService;
730+
}
731+
719732
/**
720733
* Creates a new {@link CircuitBreakerService} based on the settings provided.
721734
* @see #BREAKER_TYPE_KEY
@@ -739,6 +752,13 @@ BigArrays createBigArrays(Settings settings, CircuitBreakerService circuitBreake
739752
return new BigArrays(settings, circuitBreakerService);
740753
}
741754

755+
/**
756+
* Select the search service implementation. Overrided by tests.
757+
*/
758+
protected Class<? extends SearchService> pickSearchServiceImplementation() {
759+
return SearchService.class;
760+
}
761+
742762
/**
743763
* Get Custom Name Resolvers list based on a Discovery Plugins list
744764
* @param discoveryPlugins Discovery plugins list

core/src/main/java/org/elasticsearch/plugins/Plugin.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,29 @@
1919

2020
package org.elasticsearch.plugins;
2121

22-
import java.util.Collection;
23-
import java.util.Collections;
24-
import java.util.List;
25-
import java.util.Map;
26-
2722
import org.elasticsearch.action.ActionModule;
2823
import org.elasticsearch.client.Client;
2924
import org.elasticsearch.cluster.service.ClusterService;
3025
import org.elasticsearch.common.component.LifecycleComponent;
3126
import org.elasticsearch.common.inject.Module;
3227
import org.elasticsearch.common.io.stream.NamedWriteable;
3328
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
34-
import org.elasticsearch.common.io.stream.Writeable;
3529
import org.elasticsearch.common.settings.Setting;
3630
import org.elasticsearch.common.settings.Settings;
3731
import org.elasticsearch.common.settings.SettingsModule;
3832
import org.elasticsearch.index.IndexModule;
3933
import org.elasticsearch.indices.analysis.AnalysisModule;
4034
import org.elasticsearch.script.ScriptModule;
4135
import org.elasticsearch.script.ScriptService;
36+
import org.elasticsearch.search.SearchModule;
4237
import org.elasticsearch.threadpool.ExecutorBuilder;
4338
import org.elasticsearch.threadpool.ThreadPool;
4439
import org.elasticsearch.watcher.ResourceWatcherService;
4540

41+
import java.util.Collection;
42+
import java.util.Collections;
43+
import java.util.List;
44+
4645
/**
4746
* An extension point allowing to plug in custom functionality.
4847
* <p>
@@ -163,6 +162,14 @@ public final void onModule(AnalysisModule module) {}
163162
@Deprecated
164163
public final void onModule(ActionModule module) {}
165164

165+
/**
166+
* Old-style action extension point.
167+
*
168+
* @deprecated implement {@link SearchPlugin} instead
169+
*/
170+
@Deprecated
171+
public final void onModule(SearchModule module) {}
172+
166173
/**
167174
* Provides the list of this plugin's custom thread pools, empty if
168175
* none.

core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java

Lines changed: 160 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,21 @@
2424
import org.elasticsearch.common.io.stream.NamedWriteable;
2525
import org.elasticsearch.common.io.stream.StreamInput;
2626
import org.elasticsearch.common.io.stream.Writeable;
27-
import org.elasticsearch.common.io.stream.Writeable.Reader;
2827
import org.elasticsearch.common.lucene.search.function.ScoreFunction;
2928
import org.elasticsearch.common.xcontent.XContent;
3029
import org.elasticsearch.index.query.QueryBuilder;
3130
import org.elasticsearch.index.query.QueryParser;
3231
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
3332
import org.elasticsearch.index.query.functionscore.ScoreFunctionParser;
33+
import org.elasticsearch.search.aggregations.Aggregation;
34+
import org.elasticsearch.search.aggregations.AggregationBuilder;
35+
import org.elasticsearch.search.aggregations.Aggregator;
36+
import org.elasticsearch.search.aggregations.InternalAggregation;
37+
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
3438
import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms;
3539
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
3640
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParser;
41+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
3742
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
3843
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
3944
import org.elasticsearch.search.fetch.FetchSubPhase;
@@ -42,6 +47,7 @@
4247

4348
import java.util.List;
4449
import java.util.Map;
50+
import java.util.TreeMap;
4551

4652
import static java.util.Collections.emptyList;
4753
import static java.util.Collections.emptyMap;
@@ -94,16 +100,28 @@ default Map<String, Suggester<?>> getSuggesters() {
94100
default List<QuerySpec<?>> getQueries() {
95101
return emptyList();
96102
}
103+
/**
104+
* The new {@link Aggregation}s added by this plugin.
105+
*/
106+
default List<AggregationSpec> getAggregations() {
107+
return emptyList();
108+
}
109+
/**
110+
* The new {@link PipelineAggregator}s added by this plugin.
111+
*/
112+
default List<PipelineAggregationSpec> getPipelineAggregations() {
113+
return emptyList();
114+
}
97115

98116
/**
99117
* Specification of custom {@link ScoreFunction}.
100118
*/
101119
class ScoreFunctionSpec<T extends ScoreFunctionBuilder<T>> extends SearchExtensionSpec<T, ScoreFunctionParser<T>> {
102-
public ScoreFunctionSpec(ParseField name, Reader<T> reader, ScoreFunctionParser<T> parser) {
120+
public ScoreFunctionSpec(ParseField name, Writeable.Reader<T> reader, ScoreFunctionParser<T> parser) {
103121
super(name, reader, parser);
104122
}
105123

106-
public ScoreFunctionSpec(String name, Reader<T> reader, ScoreFunctionParser<T> parser) {
124+
public ScoreFunctionSpec(String name, Writeable.Reader<T> reader, ScoreFunctionParser<T> parser) {
107125
super(name, reader, parser);
108126
}
109127
}
@@ -122,7 +140,7 @@ class QuerySpec<T extends QueryBuilder> extends SearchExtensionSpec<T, QueryPars
122140
* {@link StreamInput}
123141
* @param parser the parser the reads the query builder from xcontent
124142
*/
125-
public QuerySpec(ParseField name, Reader<T> reader, QueryParser<T> parser) {
143+
public QuerySpec(ParseField name, Writeable.Reader<T> reader, QueryParser<T> parser) {
126144
super(name, reader, parser);
127145
}
128146

@@ -135,10 +153,143 @@ public QuerySpec(ParseField name, Reader<T> reader, QueryParser<T> parser) {
135153
* {@link StreamInput}
136154
* @param parser the parser the reads the query builder from xcontent
137155
*/
138-
public QuerySpec(String name, Reader<T> reader, QueryParser<T> parser) {
156+
public QuerySpec(String name, Writeable.Reader<T> reader, QueryParser<T> parser) {
139157
super(name, reader, parser);
140158
}
141159
}
160+
/**
161+
* Specification for an {@link Aggregation}.
162+
*/
163+
public static class AggregationSpec extends SearchExtensionSpec<AggregationBuilder, Aggregator.Parser> {
164+
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
165+
166+
/**
167+
* Specification for an {@link Aggregation}.
168+
*
169+
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
170+
* is the name by under which the reader is registered. So it is the name that the {@link AggregationBuilder} should return
171+
* from {@link NamedWriteable#getWriteableName()}.
172+
* @param reader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
173+
* {@link StreamInput}
174+
* @param parser the parser the reads the aggregation builder from xcontent
175+
*/
176+
public AggregationSpec(ParseField name, Writeable.Reader<? extends AggregationBuilder> reader, Aggregator.Parser parser) {
177+
super(name, reader, parser);
178+
}
179+
180+
/**
181+
* Specification for an {@link Aggregation}.
182+
*
183+
* @param name the name by which this aggregation might be parsed or deserialized. Make sure that the {@link AggregationBuilder}
184+
* returns this from {@link NamedWriteable#getWriteableName()}.
185+
* @param reader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
186+
* {@link StreamInput}
187+
* @param parser the parser the reads the aggregation builder from xcontent
188+
*/
189+
public AggregationSpec(String name, Writeable.Reader<? extends AggregationBuilder> reader, Aggregator.Parser parser) {
190+
super(name, reader, parser);
191+
}
192+
193+
/**
194+
* Add a reader for the shard level results of the aggregation with {@linkplain #getName}'s {@link ParseField#getPreferredName()} as
195+
* the {@link NamedWriteable#getWriteableName()}.
196+
*/
197+
public AggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
198+
return addResultReader(getName().getPreferredName(), resultReader);
199+
}
200+
201+
/**
202+
* Add a reader for the shard level results of the aggregation.
203+
*/
204+
public AggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
205+
resultReaders.put(writeableName, resultReader);
206+
return this;
207+
}
208+
209+
/**
210+
* Get the readers that must be registered for this aggregation's results.
211+
*/
212+
public Map<String, Writeable.Reader<? extends InternalAggregation>> getResultReaders() {
213+
return resultReaders;
214+
}
215+
}
216+
217+
/**
218+
* Specification for a {@link PipelineAggregator}.
219+
*/
220+
public static class PipelineAggregationSpec extends SearchExtensionSpec<PipelineAggregationBuilder, PipelineAggregator.Parser> {
221+
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
222+
private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;
223+
224+
/**
225+
* Specification of a {@link PipelineAggregator}.
226+
*
227+
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
228+
* is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and
229+
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}.
230+
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
231+
* {@link StreamInput}
232+
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
233+
* @param parser reads the aggregation builder from XContent
234+
*/
235+
public PipelineAggregationSpec(ParseField name,
236+
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
237+
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
238+
PipelineAggregator.Parser parser) {
239+
super(name, builderReader, parser);
240+
this.aggregatorReader = aggregatorReader;
241+
}
242+
243+
/**
244+
* Specification of a {@link PipelineAggregator}.
245+
*
246+
* @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the
247+
* {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from
248+
* {@link NamedWriteable#getWriteableName()}.
249+
* @param builderReader the reader registered for this aggregation's builder. Typically a reference to a constructor that takes a
250+
* {@link StreamInput}
251+
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
252+
* @param parser reads the aggregation builder from XContent
253+
*/
254+
public PipelineAggregationSpec(String name,
255+
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
256+
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
257+
PipelineAggregator.Parser parser) {
258+
super(name, builderReader, parser);
259+
this.aggregatorReader = aggregatorReader;
260+
}
261+
262+
/**
263+
* Add a reader for the shard level results of the aggregation with {@linkplain #getName()}'s {@link ParseField#getPreferredName()}
264+
* as the {@link NamedWriteable#getWriteableName()}.
265+
*/
266+
public PipelineAggregationSpec addResultReader(Writeable.Reader<? extends InternalAggregation> resultReader) {
267+
return addResultReader(getName().getPreferredName(), resultReader);
268+
}
269+
270+
/**
271+
* Add a reader for the shard level results of the aggregation.
272+
*/
273+
public PipelineAggregationSpec addResultReader(String writeableName, Writeable.Reader<? extends InternalAggregation> resultReader) {
274+
resultReaders.put(writeableName, resultReader);
275+
return this;
276+
}
277+
278+
/**
279+
* The reader for the {@link PipelineAggregator}.
280+
*/
281+
public Writeable.Reader<? extends PipelineAggregator> getAggregatorReader() {
282+
return aggregatorReader;
283+
}
284+
285+
/**
286+
* Get the readers that must be registered for this aggregation's results.
287+
*/
288+
public Map<String, Writeable.Reader<? extends InternalAggregation>> getResultReaders() {
289+
return resultReaders;
290+
}
291+
}
292+
142293

143294
/**
144295
* Specification of search time behavior extension like a custom {@link MovAvgModel} or {@link ScoreFunction}.
@@ -150,7 +301,7 @@ public QuerySpec(String name, Reader<T> reader, QueryParser<T> parser) {
150301
*/
151302
class SearchExtensionSpec<W extends NamedWriteable, P> {
152303
private final ParseField name;
153-
private final Writeable.Reader<W> reader;
304+
private final Writeable.Reader<? extends W> reader;
154305
private final P parser;
155306

156307
/**
@@ -162,7 +313,7 @@ class SearchExtensionSpec<W extends NamedWriteable, P> {
162313
* @param reader reader that reads the behavior from the internode protocol
163314
* @param parser parser that read the behavior from a REST request
164315
*/
165-
public SearchExtensionSpec(ParseField name, Writeable.Reader<W> reader, P parser) {
316+
public SearchExtensionSpec(ParseField name, Writeable.Reader<? extends W> reader, P parser) {
166317
this.name = name;
167318
this.reader = reader;
168319
this.parser = parser;
@@ -176,7 +327,7 @@ public SearchExtensionSpec(ParseField name, Writeable.Reader<W> reader, P parser
176327
* @param reader reader that reads the behavior from the internode protocol
177328
* @param parser parser that read the behavior from a REST request
178329
*/
179-
public SearchExtensionSpec(String name, Writeable.Reader<W> reader, P parser) {
330+
public SearchExtensionSpec(String name, Writeable.Reader<? extends W> reader, P parser) {
180331
this(new ParseField(name), reader, parser);
181332
}
182333

@@ -190,7 +341,7 @@ public ParseField getName() {
190341
/**
191342
* The reader responsible for reading the behavior from the internode protocol.
192343
*/
193-
public Writeable.Reader<W> getReader() {
344+
public Writeable.Reader<? extends W> getReader() {
194345
return reader;
195346
}
196347

0 commit comments

Comments
 (0)