Skip to content

Commit e8598a0

Browse files
committed
Support for ScriptedMetric to be used in the order clause of an aggregation.
1 parent 09053f5 commit e8598a0

File tree

3 files changed

+163
-8
lines changed

3 files changed

+163
-8
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@
2323
import org.elasticsearch.common.lease.Releasables;
2424
import org.elasticsearch.common.util.BigArrays;
2525
import org.elasticsearch.common.util.ObjectArray;
26+
import org.elasticsearch.script.*;
27+
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator;
2628
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
2729
import org.elasticsearch.search.aggregations.support.AggregationContext;
2830
import org.elasticsearch.search.internal.SearchContext.Lifetime;
31+
import org.elasticsearch.transport.TransportRequest;
2932

3033
import java.io.IOException;
31-
import java.util.List;
32-
import java.util.Map;
34+
import java.util.*;
3335

3436
/**
3537
* A factory that knows how to create an {@link Aggregator} of a specific type.
@@ -234,4 +236,143 @@ public void close() {
234236
};
235237
}
236238

239+
// Specific implementation used by ScriptedMetricAggregator
240+
// In this context we need to provide the reduce function to implement the metric method inherited
241+
// from the NumericMetricsAggregator class.
242+
protected static Aggregator asMultiBucketAggregator(final AggregatorFactory factory,
243+
final AggregationContext context,
244+
final Aggregator parent,
245+
Script reduceScript) throws IOException {
246+
final Aggregator first = factory.create(context, parent, true);
247+
final BigArrays bigArrays = context.bigArrays();
248+
249+
first.preCollection();
250+
251+
// Returns a NumericMetricsAggregator instead of a simple Aggregator. So the result of the aggregation
252+
// can be used to order the result.
253+
// It's NumericMetricsAggregator.SingleValue because the getProperty method in InternalScriptedMetric
254+
// only supports single value (path: value).
255+
return new NumericMetricsAggregator.SingleValue("",context, parent, new ArrayList<>(), null) {
256+
@Override
257+
public double metric(final long owningBucketOrd) {
258+
try {
259+
Object aggregationObject = buildAggregation(owningBucketOrd).getProperty("value");
260+
List<Object> aggregationObjects = Arrays.asList(aggregationObject);
261+
Map<String, Object> vars = new HashMap<>();
262+
vars.put("_aggs", aggregationObjects);
263+
264+
if (reduceScript.getParams() != null) {
265+
vars.putAll(reduceScript.getParams());
266+
}
267+
ScriptService scriptService = context().searchContext().scriptService();
268+
CompiledScript compiledScript = scriptService.compile(reduceScript,
269+
ScriptContext.Standard.AGGS, new InternalAggregation.ReduceContext(bigArrays, scriptService, new TransportRequest.Empty()));
270+
ExecutableScript script = scriptService.executable(compiledScript, vars);
271+
Object value = script.run();
272+
273+
if(value instanceof Number) {
274+
return ((Number) value).doubleValue();
275+
} else {
276+
throw new AggregationExecutionException("Invalid order path ["+this+
277+
"]. Only numeric result are supported.");
278+
}
279+
} catch (IOException e) {
280+
throw new AggregationExecutionException("Failed to build aggregation [" + name() + "]", e);
281+
}
282+
}
283+
284+
@Override
285+
protected LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
286+
for (long i = 0; i < collectors.size(); ++i) {
287+
collectors.set(i, null);
288+
}
289+
return new LeafBucketCollector() {
290+
Scorer scorer;
291+
292+
@Override
293+
public void setScorer(Scorer scorer) throws IOException {
294+
this.scorer = scorer;
295+
}
296+
297+
@Override
298+
public void collect(int doc, long bucket) throws IOException {
299+
aggregators = bigArrays.grow(aggregators, bucket + 1);
300+
collectors = bigArrays.grow(collectors, bucket + 1);
301+
302+
LeafBucketCollector collector = collectors.get(bucket);
303+
if (collector == null) {
304+
Aggregator aggregator = aggregators.get(bucket);
305+
if (aggregator == null) {
306+
aggregator = factory.create(context, parent, true);
307+
aggregator.preCollection();
308+
aggregators.set(bucket, aggregator);
309+
}
310+
collector = aggregator.getLeafCollector(ctx);
311+
collector.setScorer(scorer);
312+
collectors.set(bucket, collector);
313+
}
314+
collector.collect(doc, 0);
315+
}
316+
317+
};
318+
}
319+
320+
ObjectArray<Aggregator> aggregators;
321+
ObjectArray<LeafBucketCollector> collectors;
322+
323+
{
324+
context.searchContext().addReleasable(this, Lifetime.PHASE);
325+
aggregators = bigArrays.newObjectArray(1);
326+
aggregators.set(0, first);
327+
collectors = bigArrays.newObjectArray(1);
328+
}
329+
330+
@Override
331+
public String name() {
332+
return first.name();
333+
}
334+
335+
@Override
336+
public AggregationContext context() {
337+
return first.context();
338+
}
339+
340+
@Override
341+
public Aggregator parent() {
342+
return first.parent();
343+
}
344+
345+
@Override
346+
public boolean needsScores() {
347+
return first.needsScores();
348+
}
349+
350+
@Override
351+
public Aggregator subAggregator(String name) {
352+
throw new UnsupportedOperationException();
353+
}
354+
355+
@Override
356+
public InternalAggregation buildAggregation(long bucket) throws IOException {
357+
if (bucket < aggregators.size()) {
358+
Aggregator aggregator = aggregators.get(bucket);
359+
if (aggregator != null) {
360+
return aggregator.buildAggregation(0);
361+
}
362+
}
363+
return buildEmptyAggregation();
364+
}
365+
366+
@Override
367+
public InternalAggregation buildEmptyAggregation() {
368+
return first.buildEmptyAggregation();
369+
}
370+
371+
@Override
372+
public void close() {
373+
Releasables.close(aggregators, collectors);
374+
}
375+
};
376+
}
377+
237378
}

core/src/main/java/org/elasticsearch/search/aggregations/metrics/scripted/ScriptedMetricAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public Factory(String name, Script initScript, Script mapScript, Script combineS
127127
public Aggregator createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
128128
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
129129
if (collectsFromSingleBucket == false) {
130-
return asMultiBucketAggregator(this, context, parent);
130+
return asMultiBucketAggregator(this, context, parent, reduceScript);
131131
}
132132
Map<String, Object> params = this.params;
133133
if (params != null) {

core/src/main/java/org/elasticsearch/search/aggregations/support/AggregationPath.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
package org.elasticsearch.search.aggregations.support;
2121

2222
import org.elasticsearch.common.Strings;
23-
import org.elasticsearch.search.aggregations.Aggregation;
24-
import org.elasticsearch.search.aggregations.AggregationExecutionException;
25-
import org.elasticsearch.search.aggregations.Aggregator;
26-
import org.elasticsearch.search.aggregations.HasAggregations;
23+
import org.elasticsearch.search.aggregations.*;
2724
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
2825
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
2926
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
@@ -240,7 +237,24 @@ public double resolveValue(HasAggregations root) {
240237
"]. Missing value key in [" + token + "] which refers to a multi-value metric aggregation");
241238
}
242239
parent = null;
243-
value = ((InternalNumericMetricsAggregation.MultiValue) agg).value(token.key);
240+
if(agg instanceof InternalNumericMetricsAggregation.MultiValue) {
241+
// For NumericMetricsAggregation we use the method value returning a native double
242+
// Optimization to avoid object creation and multiple casts.
243+
value = ((InternalNumericMetricsAggregation.MultiValue) agg).value(token.key);
244+
} else if(agg instanceof InternalAggregation) {
245+
// For a general use case, we use the method getProperty returning an Object
246+
Object propertyValue = agg.getProperty(token.key);
247+
// Only aggregation returning a numeric value are supported.
248+
if(propertyValue instanceof Number) {
249+
value = ((Number) propertyValue).doubleValue();
250+
} else {
251+
throw new AggregationExecutionException("Invalid order path ["+this+
252+
"]. Only numeric result are supported.");
253+
}
254+
} else {
255+
throw new AggregationExecutionException("Invalid aggregation type for order path ["+this+
256+
"]. Only numeric & scripted metric aggregation are supported.");
257+
}
244258
}
245259

246260
return value;

0 commit comments

Comments
 (0)