diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index bd1b0284a84fb..37c1c357007b0 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -72,6 +72,7 @@ POST /_search } -------------------------------------------------- // CONSOLE +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] <1> The metric is called `"the_sum"` <2> The `buckets_path` refers to the metric via a relative path `"the_sum"` @@ -136,6 +137,7 @@ POST /_search } -------------------------------------------------- // CONSOLE +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] <1> By using `_count` instead of a metric name, we can calculate the moving average of document counts in the histogram The `buckets_path` can also use `"_bucket_count"` and path to a multi-bucket aggregation to use the number of buckets @@ -231,6 +233,7 @@ include::pipeline/stats-bucket-aggregation.asciidoc[] include::pipeline/extended-stats-bucket-aggregation.asciidoc[] include::pipeline/percentiles-bucket-aggregation.asciidoc[] include::pipeline/movavg-aggregation.asciidoc[] +include::pipeline/movfn-aggregation.asciidoc[] include::pipeline/cumulative-sum-aggregation.asciidoc[] include::pipeline/bucket-script-aggregation.asciidoc[] include::pipeline/bucket-selector-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/pipeline/movavg-aggregation.asciidoc b/docs/reference/aggregations/pipeline/movavg-aggregation.asciidoc index db73510216be0..39a8255c90705 100644 --- a/docs/reference/aggregations/pipeline/movavg-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/movavg-aggregation.asciidoc @@ -1,6 +1,10 @@ [[search-aggregations-pipeline-movavg-aggregation]] === Moving Average Aggregation +deprecated[6.4.0, The Moving Average aggregation has been deprecated in favor of the more general +<>. 
The new Moving Function aggregation provides +all the same functionality as the Moving Average aggregation, but also provides more flexibility.] + Given an ordered series of data, the Moving Average aggregation will slide a window across the data and emit the average value of that window. For example, given the data `[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]`, we can calculate a simple moving average with windows size of `5` as follows: @@ -74,6 +78,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] <1> A `date_histogram` named "my_date_histo" is constructed on the "timestamp" field, with one-day intervals <2> A `sum` metric is used to calculate the sum of a field. This could be any metric (sum, min, max, etc) @@ -180,6 +185,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] A `simple` model has no special settings to configure @@ -233,6 +239,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] A `linear` model has no special settings to configure @@ -295,7 +302,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] - +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] [[single_0.2alpha]] .EWMA with window of size 10, alpha = 0.2 @@ -355,6 +362,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] 
In practice, the `alpha` value behaves very similarly in `holt` as `ewma`: small values produce more smoothing and more lag, while larger values produce closer tracking and less lag. The value of `beta` is often difficult @@ -446,7 +454,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] - +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] [[holt_winters_add]] .Holt-Winters moving average with window of size 120, alpha = 0.5, beta = 0.7, gamma = 0.3, period = 30 @@ -508,6 +516,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] ==== Prediction @@ -550,6 +559,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] The `simple`, `linear` and `ewma` models all produce "flat" predictions: they essentially converge on the mean of the last value in the series, producing a flat: @@ -631,6 +641,7 @@ POST /_search -------------------------------------------------- // CONSOLE // TEST[setup:sales] +// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.] 
<1> Minimization is enabled with the `minimize` parameter diff --git a/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc b/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc new file mode 100644 index 0000000000000..b05c56b880560 --- /dev/null +++ b/docs/reference/aggregations/pipeline/movfn-aggregation.asciidoc @@ -0,0 +1,633 @@ +[[search-aggregations-pipeline-movfn-aggregation]] +=== Moving Function Aggregation + +Given an ordered series of data, the Moving Function aggregation will slide a window across the data and allow the user to specify a custom +script that is executed on each window of data. For convenience, a number of common functions are predefined such as min/max, moving averages, +etc. + +This is conceptually very similar to the <> pipeline aggregation, except +it provides more functionality. +==== Syntax + +A `moving_fn` aggregation looks like this in isolation: + +[source,js] +-------------------------------------------------- +{ + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.min(values)" + } +} +-------------------------------------------------- +// NOTCONSOLE + +.`moving_fn` Parameters +|=== +|Parameter Name |Description |Required |Default Value +|`buckets_path` |Path to the metric of interest (see <> for more details) |Required | +|`window` |The size of window to "slide" across the histogram. |Required | +|`script` |The script that should be executed on each window of data |Required | +|=== + +`moving_fn` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation.
They can be +embedded like any other metric aggregation: + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ <1> + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } <2> + }, + "the_movfn": { + "moving_fn": { + "buckets_path": "the_sum", <3> + "window": 10, + "script": "MovingFunctions.unweightedAvg(values)" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +<1> A `date_histogram` named "my_date_histo" is constructed on the "timestamp" field, with one-day intervals +<2> A `sum` metric is used to calculate the sum of a field. This could be any numeric metric (sum, min, max, etc) +<3> Finally, we specify a `moving_fn` aggregation which uses "the_sum" metric as its input. + +Moving averages are built by first specifying a `histogram` or `date_histogram` over a field. You can then optionally +add numeric metrics, such as a `sum`, inside of that histogram. Finally, the `moving_fn` is embedded inside the histogram. +The `buckets_path` parameter is then used to "point" at one of the sibling metrics inside of the histogram (see +<> for a description of the syntax for `buckets_path`. 
+ +An example response from the above aggregation may look like: + +[source,js] +-------------------------------------------------- +{ + "took": 11, + "timed_out": false, + "_shards": ..., + "hits": ..., + "aggregations": { + "my_date_histo": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "the_sum": { + "value": 550.0 + }, + "the_movfn": { + "value": null + } + }, + { + "key_as_string": "2015/02/01 00:00:00", + "key": 1422748800000, + "doc_count": 2, + "the_sum": { + "value": 60.0 + }, + "the_movfn": { + "value": 550.0 + } + }, + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "the_sum": { + "value": 375.0 + }, + "the_movfn": { + "value": 305.0 + } + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"took": 11/"took": $body.took/] +// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/] +// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/] + + +==== Custom user scripting + +The Moving Function aggregation allows the user to specify any arbitrary script to define custom logic. The script is invoked each time a +new window of data is collected. These values are provided to the script in the `values` variable. The script should then perform some +kind of calculation and emit a single `double` as the result. Emitting `null` is not permitted, although `NaN` and +/- `Inf` are allowed. + +For example, this script will simply return the first value from the window, or `NaN` if no values are available: + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movavg": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "return values.length > 0 ? 
values[0] : Double.NaN" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +==== Pre-built Functions + +For convenience, a number of functions have been prebuilt and are available inside the `moving_fn` script context: + +- `max()` +- `min()` +- `sum()` +- `stdDev()` +- `unweightedAvg()` +- `linearWeightedAvg()` +- `ewma()` +- `holt()` +- `holtWinters()` + +The functions are available from the `MovingFunctions` namespace. E.g. `MovingFunctions.max()` + +===== max Function + +This function accepts a collection of doubles and returns the maximum value in that window. `null` and `NaN` values are ignored; the maximum +is only calculated over the real values. If the window is empty, or all values are `null`/`NaN`, `NaN` is returned as the result. + +.`max(double[] values)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the maximum +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_moving_max": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.max(values)" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +===== min Function + +This function accepts a collection of doubles and returns the minimum value in that window. `null` and `NaN` values are ignored; the minimum +is only calculated over the real values. If the window is empty, or all values are `null`/`NaN`, `NaN` is returned as the result. 
+ +.`min(double[] values)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the minimum +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_moving_min": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.min(values)" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +===== sum Function + +This function accepts a collection of doubles and returns the sum of the values in that window. `null` and `NaN` values are ignored; +the sum is only calculated over the real values. If the window is empty, or all values are `null`/`NaN`, `0.0` is returned as the result. + +.`sum(double[] values)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the sum of +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_moving_sum": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.sum(values)" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +===== stdDev Function + +This function accepts a collection of doubles and an average, then returns the standard deviation of the values in that window. +`null` and `NaN` values are ignored; the standard deviation is only calculated over the real values. If the window is empty, or all values are +`null`/`NaN`, `0.0` is returned as the result.
+ +.`stdDev(double[] values, double avg)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the standard deviation of +|`avg` |The average of the window +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_moving_sum": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +The `avg` parameter must be provided to the standard deviation function because different styles of averages can be computed on the window +(simple, linearly weighted, etc). The various moving averages that are detailed below can be used to calculate the average for the +standard deviation function. + +===== unweightedAvg Function + +The `unweightedAvg` function calculates the sum of all values in the window, then divides by the size of the window. It is effectively +a simple arithmetic mean of the window. The simple moving average does not perform any time-dependent weighting, which means +the values from a `simple` moving average tend to "lag" behind the real data. + +`null` and `NaN` values are ignored; the average is only calculated over the real values. If the window is empty, or all values are +`null`/`NaN`, `NaN` is returned as the result. This means that the count used in the average calculation is count of non-`null`,non-`NaN` +values.
+ +.`unweightedAvg(double[] values)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the sum of +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movavg": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.unweightedAvg(values)" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +==== linearWeightedAvg Function + +The `linearWeightedAvg` function assigns a linear weighting to points in the series, such that "older" datapoints (e.g. those at +the beginning of the window) contribute a linearly less amount to the total average. The linear weighting helps reduce +the "lag" behind the data's mean, since older points have less influence. + +If the window is empty, or all values are `null`/`NaN`, `NaN` is returned as the result. + +.`linearWeightedAvg(double[] values)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the sum of +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movavg": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.linearWeightedAvg(values)" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +==== ewma Function + +The `ewma` function (aka "single-exponential") is similar to the `linearMovAvg` function, +except older data-points become exponentially less important, +rather than linearly less important. 
The speed at which the importance decays can be controlled with an `alpha` +setting. Small values make the weight decay slowly, which provides greater smoothing and takes into account a larger +portion of the window. Larger values make the weight decay quickly, which reduces the impact of older values on the +moving average. This tends to make the moving average track the data more closely but with less smoothing. + +`null` and `NaN` values are ignored; the average is only calculated over the real values. If the window is empty, or all values are +`null`/`NaN`, `NaN` is returned as the result. This means that the count used in the average calculation is count of non-`null`,non-`NaN` +values. + +.`ewma(double[] values, double alpha)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the sum of +|`alpha` |Exponential decay +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movavg": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.ewma(values, 0.3)" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + + +==== holt Function + +The `holt` function (aka "double exponential") incorporates a second exponential term which +tracks the data's trend. Single exponential does not perform well when the data has an underlying linear trend. The +double exponential model calculates two values internally: a "level" and a "trend". + +The level calculation is similar to `ewma`, and is an exponentially weighted view of the data. The difference is +that the previously smoothed value is used instead of the raw value, which allows it to stay close to the original series.
+The trend calculation looks at the difference between the current and last value (e.g. the slope, or trend, of the +smoothed data). The trend value is also exponentially weighted. + +Values are produced by multiplying the level and trend components. + +`null` and `NaN` values are ignored; the average is only calculated over the real values. If the window is empty, or all values are +`null`/`NaN`, `NaN` is returned as the result. This means that the count used in the average calculation is count of non-`null`,non-`NaN` +values. + +.`holt(double[] values, double alpha, double beta)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the sum of +|`alpha` |Level decay value +|`beta` |Trend decay value +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movavg": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "MovingFunctions.holt(values, 0.3, 0.1)" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +In practice, the `alpha` value behaves very similarly in `holt` as `ewma`: small values produce more smoothing +and more lag, while larger values produce closer tracking and less lag. The value of `beta` is often difficult +to see. Small values emphasize long-term trends (such as a constant linear trend in the whole series), while larger +values emphasize short-term trends. + +==== holtWinters Function + +The `holtWinters` function (aka "triple exponential") incorporates a third exponential term which +tracks the seasonal aspect of your data. This aggregation therefore smooths based on three components: "level", "trend" +and "seasonality".
+ +The level and trend calculation is identical to `holt`. The seasonal calculation looks at the difference between +the current point, and the point one period earlier. + +Holt-Winters requires a little more handholding than the other moving averages. You need to specify the "periodicity" +of your data: e.g. if your data has cyclic trends every 7 days, you would set `period = 7`. Similarly if there was +a monthly trend, you would set it to `30`. There is currently no periodicity detection, although that is planned +for future enhancements. + +`null` and `NaN` values are ignored; the average is only calculated over the real values. If the window is empty, or all values are +`null`/`NaN`, `NaN` is returned as the result. This means that the count used in the average calculation is count of non-`null`,non-`NaN` +values. + +.`holtWinters(double[] values, double alpha, double beta, double gamma, int period, boolean multiplicative)` Parameters +|=== +|Parameter Name |Description +|`values` |The window of values to find the sum of +|`alpha` |Level decay value +|`beta` |Trend decay value +|`gamma` |Seasonality decay value +|`period` |The periodicity of the data +|`multiplicative` |True if you wish to use multiplicative holt-winters, false to use additive +|=== + +[source,js] +-------------------------------------------------- +POST /_search +{ + "size": 0, + "aggs": { + "my_date_histo":{ + "date_histogram":{ + "field":"date", + "interval":"1M" + }, + "aggs":{ + "the_sum":{ + "sum":{ "field": "price" } + }, + "the_movavg": { + "moving_fn": { + "buckets_path": "the_sum", + "window": 10, + "script": "if (values.length > 5*2) {MovingFunctions.holtWinters(values, 0.3, 0.1, 0.1, 5, false)}" + } + } + } + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:sales] + +[WARNING] +====== +Multiplicative Holt-Winters works by dividing each data point by the seasonal value. This is problematic if any of +your data is zero, or if there are gaps in the data (since this results in a divide-by-zero).
To combat this, the +`mult` Holt-Winters pads all values by a very small amount (1*10^-10^) so that all values are non-zero. This affects +the result, but only minimally. If your data is non-zero, or you prefer to see `NaN` when zero's are encountered, +you can disable this behavior with `pad: false` +====== + +===== "Cold Start" + +Unfortunately, due to the nature of Holt-Winters, it requires two periods of data to "bootstrap" the algorithm. This +means that your `window` must always be *at least* twice the size of your period. An exception will be thrown if it +isn't. It also means that Holt-Winters will not emit a value for the first `2 * period` buckets; the current algorithm +does not backcast. + +You'll notice in the above example we have an `if ()` statement checking the size of values. This is checking to make sure +we have two periods worth of data (`5 * 2`, where 5 is the period specified in the `holtWintersMovAvg` function) before calling +the holt-winters function. diff --git a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessPlugin.java b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessPlugin.java index 0364ad667efc7..4ebcf8bfb82d2 100644 --- a/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessPlugin.java +++ b/modules/lang-painless/src/main/java/org/elasticsearch/painless/PainlessPlugin.java @@ -32,6 +32,7 @@ import org.elasticsearch.painless.spi.PainlessExtension; import org.elasticsearch.painless.spi.Whitelist; import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.painless.spi.WhitelistLoader; import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ScriptPlugin; @@ -39,6 +40,7 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctionScript; 
import java.util.ArrayList; import java.util.Arrays; @@ -55,18 +57,34 @@ */ public final class PainlessPlugin extends Plugin implements ScriptPlugin, ExtensiblePlugin, ActionPlugin { - private final Map, List> extendedWhitelists = new HashMap<>(); + private static final Map, List> whitelists; + + /* + * Contexts from Core that need custom whitelists can add them to the map below. + * Whitelist resources should be added as appropriately named, separate files + * under Painless' resources + */ + static { + Map, List> map = new HashMap<>(); + + // Moving Function Pipeline Agg + List movFn = new ArrayList<>(Whitelist.BASE_WHITELISTS); + movFn.add(WhitelistLoader.loadFromResourceFiles(Whitelist.class, "org.elasticsearch.aggs.movfn.txt")); + map.put(MovingFunctionScript.CONTEXT, movFn); + + whitelists = map; + } @Override public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { Map, List> contextsWithWhitelists = new HashMap<>(); for (ScriptContext context : contexts) { // we might have a context that only uses the base whitelists, so would not have been filled in by reloadSPI - List whitelists = extendedWhitelists.get(context); - if (whitelists == null) { - whitelists = new ArrayList<>(Whitelist.BASE_WHITELISTS); + List contextWhitelists = whitelists.get(context); + if (contextWhitelists == null) { + contextWhitelists = new ArrayList<>(Whitelist.BASE_WHITELISTS); } - contextsWithWhitelists.put(context, whitelists); + contextsWithWhitelists.put(context, contextWhitelists); } return new PainlessScriptEngine(settings, contextsWithWhitelists); } @@ -80,7 +98,7 @@ public List> getSettings() { public void reloadSPI(ClassLoader loader) { for (PainlessExtension extension : ServiceLoader.load(PainlessExtension.class, loader)) { for (Map.Entry, List> entry : extension.getContextWhitelists().entrySet()) { - List existing = extendedWhitelists.computeIfAbsent(entry.getKey(), + List existing = whitelists.computeIfAbsent(entry.getKey(), c -> new 
ArrayList<>(Whitelist.BASE_WHITELISTS)); existing.addAll(entry.getValue()); } diff --git a/modules/lang-painless/src/main/resources/org/elasticsearch/painless/spi/org.elasticsearch.aggs.movfn.txt b/modules/lang-painless/src/main/resources/org/elasticsearch/painless/spi/org.elasticsearch.aggs.movfn.txt new file mode 100644 index 0000000000000..a120b73820ada --- /dev/null +++ b/modules/lang-painless/src/main/resources/org/elasticsearch/painless/spi/org.elasticsearch.aggs.movfn.txt @@ -0,0 +1,32 @@ +# +# Licensed to Elasticsearch under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +# This file contains a whitelist for the Moving Function pipeline aggregator in core + +class org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions { + double max(double[]) + double min(double[]) + double sum(double[]) + double stdDev(double[], double) + double unweightedAvg(double[]) + double linearWeightedAvg(double[]) + double ewma(double[], double) + double holt(double[], double, double) + double holtWinters(double[], double, double, double, int, boolean) +} \ No newline at end of file diff --git a/modules/lang-painless/src/test/resources/rest-api-spec/test/painless/70_mov_fn_agg.yml b/modules/lang-painless/src/test/resources/rest-api-spec/test/painless/70_mov_fn_agg.yml new file mode 100644 index 0000000000000..039b54aab01d1 --- /dev/null +++ b/modules/lang-painless/src/test/resources/rest-api-spec/test/painless/70_mov_fn_agg.yml @@ -0,0 +1,315 @@ +# Sanity integration test to make sure the custom context and whitelist work for moving_fn pipeline agg +# +setup: + - skip: + version: " - 6.4.0" + reason: "moving_fn added in 6.4.0" + - do: + indices.create: + index: test + body: + mappings: + _doc: + properties: + value_field: + type: integer + date: + type: date + + - do: + bulk: + refresh: true + body: + - index: + _index: test + _type: _doc + _id: 1 + - date: "2017-01-01T00:00:00" + value_field: 1 + - index: + _index: test + _type: _doc + _id: 2 + - date: "2017-01-02T00:00:00" + value_field: 2 + - index: + _index: test + _type: _doc + _id: 3 + - date: "2017-01-03T00:00:00" + value_field: 3 + - index: + _index: test + _type: _doc + _id: 4 + - date: "2017-01-04T00:00:00" + value_field: 4 + - index: + _index: test + _type: _doc + _id: 5 + - date: "2017-01-05T00:00:00" + value_field: 5 + - index: + _index: test + _type: _doc + _id: 6 + - date: "2017-01-06T00:00:00" + value_field: 6 + + - do: + indices.refresh: + index: [test] + +--- +"max": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: 
"1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 3 + script: "MovingFunctions.max(values)" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + - is_false: aggregations.the_histo.buckets.0.the_mov_fn.value + - match: { aggregations.the_histo.buckets.1.the_mov_fn.value: 1.0 } + - match: { aggregations.the_histo.buckets.2.the_mov_fn.value: 2.0 } + - match: { aggregations.the_histo.buckets.3.the_mov_fn.value: 3.0 } + - match: { aggregations.the_histo.buckets.4.the_mov_fn.value: 4.0 } + - match: { aggregations.the_histo.buckets.5.the_mov_fn.value: 5.0 } + +--- +"min": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 3 + script: "MovingFunctions.min(values)" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + - is_false: aggregations.the_histo.buckets.0.the_mov_fn.value + - match: { aggregations.the_histo.buckets.1.the_mov_fn.value: 1.0 } + - match: { aggregations.the_histo.buckets.2.the_mov_fn.value: 1.0 } + - match: { aggregations.the_histo.buckets.3.the_mov_fn.value: 1.0 } + - match: { aggregations.the_histo.buckets.4.the_mov_fn.value: 2.0 } + - match: { aggregations.the_histo.buckets.5.the_mov_fn.value: 3.0 } + +--- +"sum": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 3 + script: "MovingFunctions.sum(values)" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + - match: { aggregations.the_histo.buckets.0.the_mov_fn.value: 0.0 } + - match: { aggregations.the_histo.buckets.1.the_mov_fn.value: 1.0 } + - match: { aggregations.the_histo.buckets.2.the_mov_fn.value: 3.0 } + - match: { aggregations.the_histo.buckets.3.the_mov_fn.value: 
6.0 } + - match: { aggregations.the_histo.buckets.4.the_mov_fn.value: 9.0 } + - match: { aggregations.the_histo.buckets.5.the_mov_fn.value: 12.0 } + +--- +"unweightedAvg": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 3 + script: "MovingFunctions.unweightedAvg(values)" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + + +--- +"linearWeightedAvg": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 3 + script: "MovingFunctions.linearWeightedAvg(values)" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + + +--- +"ewma": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 3 + script: "MovingFunctions.ewma(values, 0.1)" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + + +--- +"holt": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 3 + script: "MovingFunctions.holt(values, 0.1, 0.1)" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + + +--- +"holtWinters": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 1 + script: "if (values.length > 1) { MovingFunctions.holtWinters(values, 0.1, 0.1, 0.1, 1, true)}" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + +--- 
+"stdDev": + + - do: + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: 3 + script: "MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))" + + - match: { hits.total: 6 } + - length: { hits.hits: 0 } + + + + + + diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/250_moving_fn.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/250_moving_fn.yml new file mode 100644 index 0000000000000..9dd54811fabaa --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/250_moving_fn.yml @@ -0,0 +1,46 @@ +setup: + - skip: + version: " - 6.4.0" + reason: "moving_fn added in 6.4.0" + +--- +"Bad window": + + - do: + catch: /\[window\] must be a positive, non-zero integer\./ + search: + body: + size: 0 + aggs: + the_histo: + date_histogram: + field: "date" + interval: "1d" + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: -1 + script: "MovingFunctions.windowMax(values)" + +--- +"Not under date_histo": + + - do: + catch: /\[window\] must be a positive, non-zero integer\./ + search: + body: + size: 0 + aggs: + the_avg: + avg: + field: "value_field" + the_mov_fn: + moving_fn: + buckets_path: "the_avg" + window: -1 + script: "MovingFunctions.windowMax(values)" + diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/80_typed_keys.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/80_typed_keys.yml index 841d5cf611bab..19593decb6533 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/80_typed_keys.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/80_typed_keys.yml @@ -195,7 +195,13 @@ setup: --- "Test typed keys parameter for date_histogram 
aggregation and max_bucket pipeline aggregation": + - skip: + features: warnings + version: " - 6.4.0" + reason: "deprecation added in 6.4.0" - do: + warnings: + - 'The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.' search: typed_keys: true body: diff --git a/server/src/main/java/org/elasticsearch/script/ScriptModule.java b/server/src/main/java/org/elasticsearch/script/ScriptModule.java index 727651be6a565..5afb6ad28d7ab 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptModule.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptModule.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctionScript; /** * Manages building {@link ScriptService}. @@ -48,7 +49,8 @@ public class ScriptModule { FilterScript.CONTEXT, SimilarityScript.CONTEXT, SimilarityWeightScript.CONTEXT, - TemplateScript.CONTEXT + TemplateScript.CONTEXT, + MovingFunctionScript.CONTEXT ).collect(Collectors.toMap(c -> c.name, Function.identity())); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index b401ff5da1dba..66ea407f42afd 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -220,6 +220,8 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregator; import 
org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator; import org.elasticsearch.search.fetch.FetchPhase; @@ -514,6 +516,11 @@ private void registerPipelineAggregations(List plugins) { SerialDiffPipelineAggregationBuilder::new, SerialDiffPipelineAggregator::new, SerialDiffPipelineAggregationBuilder::parse)); + registerPipelineAggregation(new PipelineAggregationSpec( + MovFnPipelineAggregationBuilder.NAME, + MovFnPipelineAggregationBuilder::new, + MovFnPipelineAggregator::new, + MovFnPipelineAggregationBuilder::parse)); registerFromPlugin(plugins, SearchPlugin::getPipelineAggregations, this::registerPipelineAggregation); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java index e827275a2182d..ce87dd797d6e0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java @@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -78,6 +79,10 @@ public static PercentilesBucketPipelineAggregationBuilder percentilesBucket(Stri return new PercentilesBucketPipelineAggregationBuilder(name, bucketsPath); } + /** + * 
@deprecated use {@link #movingFunction(String, Script, String, int)} instead + */ + @Deprecated public static MovAvgPipelineAggregationBuilder movingAvg(String name, String bucketsPath) { return new MovAvgPipelineAggregationBuilder(name, bucketsPath); } @@ -114,4 +119,9 @@ public static CumulativeSumPipelineAggregationBuilder cumulativeSum(String name, public static SerialDiffPipelineAggregationBuilder diff(String name, String bucketsPath) { return new SerialDiffPipelineAggregationBuilder(name, bucketsPath); } + + public static MovFnPipelineAggregationBuilder movingFunction(String name, Script script, + String bucketsPaths, int window) { + return new MovFnPipelineAggregationBuilder(name, bucketsPaths, script, window); + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java index d2210e1da322c..8fdc6d3eb62b3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregationBuilder.java @@ -23,6 +23,8 @@ import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.ParseFieldRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -59,6 +61,8 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregatio public static final ParseField SETTINGS = new ParseField("settings"); private static final ParseField PREDICT = new ParseField("predict"); private static final 
ParseField MINIMIZE = new ParseField("minimize"); + private static final DeprecationLogger DEPRECATION_LOGGER + = new DeprecationLogger(Loggers.getLogger(MovAvgPipelineAggregationBuilder.class)); private String format; private GapPolicy gapPolicy = GapPolicy.SKIP; @@ -318,6 +322,8 @@ public static MovAvgPipelineAggregationBuilder parse( Integer predict = null; Boolean minimize = null; + DEPRECATION_LOGGER.deprecated("The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation."); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java index 26fb0333b188b..027536854ccfb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions; import java.io.IOException; import java.text.ParseException; @@ -90,7 +91,7 @@ public MovAvgModel clone() { } @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; // EWMA just emits the same final prediction repeatedly. 
@@ -100,19 +101,8 @@ protected double[] doPredict(Collection values, int numPre } @Override - public double next(Collection values) { - double avg = 0; - boolean first = true; - - for (T v : values) { - if (first) { - avg = v.doubleValue(); - first = false; - } else { - avg = (v.doubleValue() * alpha) + (avg * (1 - alpha)); - } - } - return avg; + public double next(Collection values) { + return MovingFunctions.ewma(values.stream().mapToDouble(Double::doubleValue).toArray(), alpha); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java index 1819333738502..d029bde29ad95 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions; import java.io.IOException; import java.text.ParseException; @@ -116,16 +117,15 @@ public MovAvgModel clone() { * * @param values Collection of numerics to movingAvg, usually windowed * @param numPredictions Number of newly generated predictions to return - * @param Type of numeric * @return Returns an array of doubles, since most smoothing methods operate on floating points */ @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { return next(values, numPredictions); } @Override - public double next(Collection values) { + public double next(Collection values) { return next(values, 1)[0]; } @@ -135,47 
+135,13 @@ public double next(Collection values) { * @param values Collection of values to calculate avg for * @param numForecasts number of forecasts into the future to return * - * @param Type T extending Number * @return Returns a Double containing the moving avg for the window */ - public double[] next(Collection values, int numForecasts) { - + public double[] next(Collection values, int numForecasts) { if (values.size() == 0) { return emptyPredictions(numForecasts); } - - // Smoothed value - double s = 0; - double last_s = 0; - - // Trend value - double b = 0; - double last_b = 0; - - int counter = 0; - - T last; - for (T v : values) { - last = v; - if (counter == 1) { - s = v.doubleValue(); - b = v.doubleValue() - last.doubleValue(); - } else { - s = alpha * v.doubleValue() + (1.0d - alpha) * (last_s + last_b); - b = beta * (s - last_s) + (1 - beta) * last_b; - } - - counter += 1; - last_s = s; - last_b = b; - } - - double[] forecastValues = new double[numForecasts]; - for (int i = 0; i < numForecasts; i++) { - forecastValues[i] = s + (i * b); - } - - return forecastValues; + return MovingFunctions.holtForecast(values.stream().mapToDouble(Double::doubleValue).toArray(), alpha, beta, numForecasts); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java index a750145e5f9ab..e7c2007955fd2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.AggregationExecutionException; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; +import 
org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions; import java.io.IOException; import java.text.ParseException; @@ -259,16 +260,15 @@ public boolean hasValue(int valuesAvailable) { * * @param values Collection of numerics to movingAvg, usually windowed * @param numPredictions Number of newly generated predictions to return - * @param Type of numeric * @return Returns an array of doubles, since most smoothing methods operate on floating points */ @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { return next(values, numPredictions); } @Override - public double next(Collection values) { + public double next(Collection values) { return next(values, 1)[0]; } @@ -278,88 +278,11 @@ public double next(Collection values) { * @param values Collection of values to calculate avg for * @param numForecasts number of forecasts into the future to return * - * @param Type T extending Number * @return Returns a Double containing the moving avg for the window */ - public double[] next(Collection values, int numForecasts) { - - if (values.size() < period * 2) { - // We need at least two full "seasons" to use HW - // This should have been caught earlier, we can't do anything now...bail - throw new AggregationExecutionException("Holt-Winters aggregation requires at least (2 * period == 2 * " - + period + " == "+(2 * period)+") data-points to function. 
Only [" + values.size() + "] were provided."); - } - - // Smoothed value - double s = 0; - double last_s; - - // Trend value - double b = 0; - double last_b = 0; - - // Seasonal value - double[] seasonal = new double[values.size()]; - - int counter = 0; - double[] vs = new double[values.size()]; - for (T v : values) { - vs[counter] = v.doubleValue() + padding; - counter += 1; - } - - // Initial level value is average of first season - // Calculate the slopes between first and second season for each period - for (int i = 0; i < period; i++) { - s += vs[i]; - b += (vs[i + period] - vs[i]) / period; - } - s /= period; - b /= period; - last_s = s; - - // Calculate first seasonal - if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) { - Arrays.fill(seasonal, 0.0); - } else { - for (int i = 0; i < period; i++) { - seasonal[i] = vs[i] / s; - } - } - - for (int i = period; i < vs.length; i++) { - // TODO if perf is a problem, we can specialize a subclass to avoid conditionals on each iteration - if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) { - s = alpha * (vs[i] / seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b); - } else { - s = alpha * (vs[i] - seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b); - } - - b = beta * (s - last_s) + (1 - beta) * last_b; - - if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) { - seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; - } else { - seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period]; - } - - last_s = s; - last_b = b; - } - - double[] forecastValues = new double[numForecasts]; - for (int i = 1; i <= numForecasts; i++) { - int idx = values.size() - period + ((i - 1) % period); - - // TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo? 
- if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) { - forecastValues[i-1] = (s + (i * b)) * seasonal[idx]; - } else { - forecastValues[i-1] = s + (i * b) + seasonal[idx]; - } - } - - return forecastValues; + public double[] next(Collection values, int numForecasts) { + return MovingFunctions.holtWintersForecast(values.stream().mapToDouble(Double::doubleValue).toArray(), + alpha, beta, gamma, period, padding, seasonalityType.equals(SeasonalityType.MULTIPLICATIVE), numForecasts); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java index 3eed0bf603baa..3859405218286 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions; import java.io.IOException; import java.text.ParseException; @@ -74,7 +75,7 @@ public MovAvgModel clone() { } @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; // EWMA just emits the same final prediction repeatedly. 
@@ -84,17 +85,8 @@ protected double[] doPredict(Collection values, int numPr } @Override - public double next(Collection values) { - double avg = 0; - long totalWeight = 1; - long current = 1; - - for (T v : values) { - avg += v.doubleValue() * current; - totalWeight += current; - current += 1; - } - return avg / totalWeight; + public double next(Collection values) { + return MovingFunctions.linearWeightedAvg(values.stream().mapToDouble(Double::doubleValue).toArray()); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java index 354434b65205f..f826c01adced1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java @@ -68,20 +68,18 @@ public boolean hasValue(int valuesAvailable) { * Returns the next value in the series, according to the underlying smoothing model * * @param values Collection of numerics to movingAvg, usually windowed - * @param Type of numeric * @return Returns a double, since most smoothing methods operate on floating points */ - public abstract double next(Collection values); + public abstract double next(Collection values); /** * Predicts the next `n` values in the series. * * @param values Collection of numerics to movingAvg, usually windowed * @param numPredictions Number of newly generated predictions to return - * @param Type of numeric * @return Returns an array of doubles, since most smoothing methods operate on floating points */ - public double[] predict(Collection values, int numPredictions) { + public double[] predict(Collection values, int numPredictions) { assert(numPredictions >= 1); // If there are no values, we can't do anything. Return an array of NaNs. 
@@ -97,10 +95,9 @@ public double[] predict(Collection values, int numPredicti * * @param values Collection of numerics to movingAvg, usually windowed * @param numPredictions Number of newly generated predictions to return - * @param Type of numeric * @return Returns an array of doubles, since most smoothing methods operate on floating points */ - protected abstract double[] doPredict(Collection values, int numPredictions); + protected abstract double[] doPredict(Collection values, int numPredictions); /** * Returns an empty set of predictions, filled with NaNs diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java index e30a59d288711..b54dba242f9f9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions; import java.io.IOException; import java.text.ParseException; @@ -72,7 +73,7 @@ public MovAvgModel clone() { } @Override - protected double[] doPredict(Collection values, int numPredictions) { + protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; // Simple just emits the same final prediction repeatedly. 
@@ -82,12 +83,8 @@ protected double[] doPredict(Collection values, int numPre } @Override - public double next(Collection values) { - double avg = 0; - for (T v : values) { - avg += v.doubleValue(); - } - return avg / values.size(); + public double next(Collection values) { + return MovingFunctions.unweightedAvg(values.stream().mapToDouble(Double::doubleValue).toArray()); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregationBuilder.java new file mode 100644 index 0000000000000..d49da4658ae2d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregationBuilder.java @@ -0,0 +1,264 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.movfn; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY; + +public class MovFnPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { + public static final String NAME = "moving_fn"; + private static final ParseField WINDOW = new ParseField("window"); + + private final Script script; + private final String bucketsPathString; + 
private String format = null; + private GapPolicy gapPolicy = GapPolicy.SKIP; + private int window; + + private static final Function> PARSER + = name -> { + + @SuppressWarnings("unchecked") + ConstructingObjectParser parser = new ConstructingObjectParser<>( + MovFnPipelineAggregationBuilder.NAME, + false, + o -> new MovFnPipelineAggregationBuilder(name, (String) o[0], (Script) o[1], (int)o[2])); + + parser.declareString(ConstructingObjectParser.constructorArg(), BUCKETS_PATH_FIELD); + parser.declareField(ConstructingObjectParser.constructorArg(), + (p, c) -> Script.parse(p), Script.SCRIPT_PARSE_FIELD, ObjectParser.ValueType.OBJECT_OR_STRING); + parser.declareInt(ConstructingObjectParser.constructorArg(), WINDOW); + + parser.declareString(MovFnPipelineAggregationBuilder::format, FORMAT); + parser.declareField(MovFnPipelineAggregationBuilder::gapPolicy, p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return GapPolicy.parse(p.text().toLowerCase(Locale.ROOT), p.getTokenLocation()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, GAP_POLICY, ObjectParser.ValueType.STRING); + + return parser; + }; + + + public MovFnPipelineAggregationBuilder(String name, String bucketsPath, Script script, int window) { + super(name, NAME, new String[]{bucketsPath}); + this.bucketsPathString = bucketsPath; + this.script = script; + if (window <= 0) { + throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer."); + } + this.window = window; + } + + public MovFnPipelineAggregationBuilder(StreamInput in) throws IOException { + super(in, NAME); + bucketsPathString = in.readString(); + script = new Script(in); + format = in.readOptionalString(); + gapPolicy = GapPolicy.readFrom(in); + window = in.readInt(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + out.writeString(bucketsPathString); + script.writeTo(out); + 
out.writeOptionalString(format); + gapPolicy.writeTo(out); + out.writeInt(window); + } + + /** + * Sets the format to use on the output of this aggregation. + */ + public MovFnPipelineAggregationBuilder format(String format) { + if (Strings.isNullOrEmpty(format)) { + throw new IllegalArgumentException("[" + FORMAT.getPreferredName() + "] must not be null or an empty string."); + } + this.format = format; + return this; + } + + /** + * Gets the format to use on the output of this aggregation. + */ + public String format() { + return format; + } + + protected DocValueFormat formatter() { + if (format != null) { + return new DocValueFormat.Decimal(format); + } + return DocValueFormat.RAW; + } + + /** + * Sets the gap policy to use for this aggregation. + */ + public MovFnPipelineAggregationBuilder gapPolicy(GapPolicy gapPolicy) { + if (gapPolicy == null) { + throw new IllegalArgumentException("[" + GAP_POLICY.getPreferredName() + "] must not be null."); + } + this.gapPolicy = gapPolicy; + return this; + } + + /** + * Gets the gap policy to use for this aggregation. 
+ */ + public GapPolicy gapPolicy() { + return gapPolicy; + } + + /** + * Returns the window size for this aggregation + */ + public int getWindow() { + return window; + } + + /** + * Sets the window size for this aggregation + */ + public void setWindow(int window) { + if (window <= 0) { + throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer."); + } + this.window = window; + } + + @Override + public void doValidate(AggregatorFactory parent, List aggFactories, + List pipelineAggregatoractories) { + if (window <= 0) { + throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer."); + } + if (parent instanceof HistogramAggregatorFactory) { + HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent; + if (histoParent.minDocCount() != 0) { + throw new IllegalStateException("parent histogram of moving_function aggregation [" + name + + "] must have min_doc_count of 0"); + } + } else if (parent instanceof DateHistogramAggregatorFactory) { + DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent; + if (histoParent.minDocCount() != 0) { + throw new IllegalStateException("parent histogram of moving_function aggregation [" + name + + "] must have min_doc_count of 0"); + } + } else { + throw new IllegalStateException("moving_function aggregation [" + name + + "] must have a histogram or date_histogram as parent"); + } + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new MovFnPipelineAggregator(name, bucketsPathString, script, window, formatter(), gapPolicy, metaData); + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(BUCKETS_PATH.getPreferredName(), bucketsPathString); + builder.field(Script.SCRIPT_PARSE_FIELD.getPreferredName(), script); + if (format != null) { + 
builder.field(FORMAT.getPreferredName(), format); + } + builder.field(GAP_POLICY.getPreferredName(), gapPolicy.getName()); + builder.field(WINDOW.getPreferredName(), window); + return builder; + } + + public static MovFnPipelineAggregationBuilder parse(String aggName, XContentParser parser) { + return PARSER.apply(aggName).apply(parser, null); + } + + /** + * Used for serialization testing, since pipeline aggs serialize themselves as a named object but are parsed + * as a regular object with the name passed in. + */ + static MovFnPipelineAggregationBuilder parse(XContentParser parser) throws IOException { + parser.nextToken(); + if (parser.currentToken().equals(XContentParser.Token.START_OBJECT)) { + parser.nextToken(); + if (parser.currentToken().equals(XContentParser.Token.FIELD_NAME)) { + String aggName = parser.currentName(); + parser.nextToken(); // "moving_fn" + parser.nextToken(); // start_object + return PARSER.apply(aggName).apply(parser, null); + } + } + + throw new IllegalStateException("Expected aggregation name but none found"); + } + + @Override + protected boolean overrideBucketsPath() { + return true; + } + + @Override + protected int doHashCode() { + return Objects.hash(bucketsPathString, script, format, gapPolicy, window); + } + + @Override + protected boolean doEquals(Object obj) { + MovFnPipelineAggregationBuilder other = (MovFnPipelineAggregationBuilder) obj; + return Objects.equals(bucketsPathString, other.bucketsPathString) + && Objects.equals(script, other.script) + && Objects.equals(format, other.format) + && Objects.equals(gapPolicy, other.gapPolicy) + && Objects.equals(window, other.window); + } + + @Override + public String getWriteableName() { + return NAME; + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregator.java new file mode 100644 index 0000000000000..fc0ba7afac065 
--- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregator.java @@ -0,0 +1,149 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.movfn; + +import org.elasticsearch.common.collect.EvictingQueue; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.script.Script; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; 
+import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; + +/** + * This pipeline aggregation gives the user the ability to script functions that "move" across a window + * of data, instead of single data points. It is the scripted version of MovingAvg pipeline agg. + * + * Through custom script contexts, we expose a number of convenience methods: + * + * - max + * - min + * - sum + * - unweightedAvg + * - linearWeightedAvg + * - ewma + * - holt + * - holtWintersMovAvg + * + * The user can also define any arbitrary logic via their own scripting, or combine with the above methods. + */ +public class MovFnPipelineAggregator extends PipelineAggregator { + private final DocValueFormat formatter; + private final BucketHelpers.GapPolicy gapPolicy; + private final Script script; + private final String bucketsPath; + private final int window; + + MovFnPipelineAggregator(String name, String bucketsPath, Script script, int window, DocValueFormat formatter, + BucketHelpers.GapPolicy gapPolicy, Map metadata) { + super(name, new String[]{bucketsPath}, metadata); + this.bucketsPath = bucketsPath; + this.script = script; + this.formatter = formatter; + this.gapPolicy = gapPolicy; + this.window = window; + } + + public MovFnPipelineAggregator(StreamInput in) throws IOException { + super(in); + script = new Script(in); + formatter = in.readNamedWriteable(DocValueFormat.class); + gapPolicy = BucketHelpers.GapPolicy.readFrom(in); + bucketsPath = in.readString(); + window = in.readInt(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + script.writeTo(out); + out.writeNamedWriteable(formatter); + gapPolicy.writeTo(out); + out.writeString(bucketsPath); + out.writeInt(window); + } + + @Override + public String getWriteableName() { + return MovFnPipelineAggregationBuilder.NAME; + } + + @Override + public InternalAggregation 
reduce(InternalAggregation aggregation, InternalAggregation.ReduceContext reduceContext) { + InternalMultiBucketAggregation + histo = (InternalMultiBucketAggregation) aggregation; + List buckets = histo.getBuckets(); + HistogramFactory factory = (HistogramFactory) histo; + + List newBuckets = new ArrayList<>(); + EvictingQueue values = new EvictingQueue<>(this.window); + + // Initialize the script + MovingFunctionScript.Factory scriptFactory = reduceContext.scriptService().compile(script, MovingFunctionScript.CONTEXT); + Map vars = new HashMap<>(); + if (script.getParams() != null) { + vars.putAll(script.getParams()); + } + + MovingFunctionScript executableScript = scriptFactory.newInstance(); + + for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) { + Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy); + + // Default is to reuse existing bucket. Simplifies the rest of the logic, + // since we only change newBucket if we can add to it + MultiBucketsAggregation.Bucket newBucket = bucket; + + if (thisBucketValue != null && thisBucketValue.equals(Double.NaN) == false) { + + // The custom context mandates that the script returns a double (not Double) so we + // don't need null checks, etc. 
+ double movavg = executableScript.execute(vars, values.stream().mapToDouble(Double::doubleValue).toArray()); + + List aggs = StreamSupport + .stream(bucket.getAggregations().spliterator(), false) + .map(InternalAggregation.class::cast) + .collect(Collectors.toList()); + aggs.add(new InternalSimpleValue(name(), movavg, formatter, new ArrayList<>(), metaData())); + newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs)); + values.offer(thisBucketValue); + } + newBuckets.add(newBucket); + } + + return factory.createAggregation(newBuckets); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovingFunctionScript.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovingFunctionScript.java new file mode 100644 index 0000000000000..131f6eb0fab58 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovingFunctionScript.java @@ -0,0 +1,45 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.movfn; + +import org.elasticsearch.script.ScriptContext; + +import java.util.Collection; +import java.util.Map; + +/** + * This class provides a custom script context for the Moving Function pipeline aggregation, + * so that we can expose a number of pre-baked moving functions like min, max, movavg, etc + */ +public abstract class MovingFunctionScript { + /** + * @param params The user-provided parameters + * @param values The values in the window that we are moving a function across + * @return A double representing the value from this particular window + */ + public abstract double execute(Map params, double[] values); + + public interface Factory { + MovingFunctionScript newInstance(); + } + + public static final String[] PARAMETERS = new String[] {"params", "values"}; + public static final ScriptContext CONTEXT = new ScriptContext<>("moving-function", Factory.class); +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovingFunctions.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovingFunctions.java new file mode 100644 index 0000000000000..4261271d185c3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovingFunctions.java @@ -0,0 +1,359 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.movfn; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Provides a collection of static utility methods that can be referenced from MovingFunction script contexts + */ +public class MovingFunctions { + + /** + * Find the maximum value in a window of values. + * If all values are missing/null/NaN, the return value will be NaN + */ + public static double max(double[] values) { + return Arrays.stream(values).max().orElse(Double.NaN); + } + + /** + * Find the minimum value in a window of values + * If all values are missing/null/NaN, the return value will be NaN + */ + public static double min(double[] values) { + return Arrays.stream(values).min().orElse(Double.NaN); + } + + /** + * Find the sum of a window of values + * If all values are missing/null/NaN, the return value will be 0.0 + */ + public static double sum(double[] values) { + if (values.length == 0) { + return 0.0; + } + return Arrays.stream(values).map(value -> { + if (Double.isNaN(value) == false) { + return value; + } + return 0.0; + }).sum(); + } + + /** + * Calculate a simple unweighted (arithmetic) moving average. + * + * Only finite values are averaged. NaN or null are ignored. + * If all values are missing/null/NaN, the return value will be NaN. + * The average is based on the count of non-null, non-NaN values. 
+ */ + public static double unweightedAvg(double[] values) { + double avg = 0.0; + long count = 0; + for (double v : values) { + if (Double.isNaN(v) == false) { + avg += v; + count += 1; + } + } + return count == 0 ? Double.NaN : avg / count; + } + + /** + * Calculate a standard deviation over the values using the provided average. + * + * Only finite values are averaged. NaN or null are ignored. + * If all values are missing/null/NaN, the return value will be NaN. + * The average is based on the count of non-null, non-NaN values. + */ + public static double stdDev(double[] values, double avg) { + if (avg == Double.NaN) { + return Double.NaN; + } else { + long count = 0; + double squaredMean = 0; + for (double v : values) { + if (Double.isNaN(v) == false) { + squaredMean += Math.pow(v - avg, 2); + count += 1; + } + } + return Math.sqrt(squaredMean / count); + } + } + + /** + * Calculate a linearly weighted moving average, such that older values are + * linearly less important. "Time" is determined by position in collection + * + * Only finite values are averaged. NaN or null are ignored. + * If all values are missing/null/NaN, the return value will be NaN + * The average is based on the count of non-null, non-NaN values. + */ + public static double linearWeightedAvg(double[] values) { + double avg = 0; + long totalWeight = 1; + long current = 1; + + for (double v : values) { + if (Double.isNaN(v) == false) { + avg += v * current; + totalWeight += current; + current += 1; + } + } + return totalWeight == 1 ? Double.NaN : avg / totalWeight; + } + + /** + * + * Calculate a exponentially weighted moving average. + * + * Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values + * (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g. + * the series mean). Useful values are somewhere in between. Defaults to 0.5. + * + * Only finite values are averaged. NaN or null are ignored. 
+ * If all values are missing/null/NaN, the return value will be NaN + * The average is based on the count of non-null, non-NaN values. + * + * @param alpha A double between 0-1 inclusive, controls data smoothing + */ + public static double ewma(double[] values, double alpha) { + double avg = Double.NaN; + boolean first = true; + + for (double v : values) { + if (Double.isNaN(v) == false) { + if (first) { + avg = v; + first = false; + } else { + avg = (v * alpha) + (avg * (1 - alpha)); + } + } + } + return avg; + } + + /** + * Calculate a doubly exponential weighted moving average + * + * Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values + * (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g. + * the series mean). Useful values are somewhere in between. Defaults to 0.5. + * + * Beta is equivalent to alpha, but controls the smoothing of the trend instead of the data + * + * Only finite values are averaged. NaN or null are ignored. + * If all values are missing/null/NaN, the return value will be NaN + * The average is based on the count of non-null, non-NaN values. + * + * @param alpha A double between 0-1 inclusive, controls data smoothing + * @param beta a double between 0-1 inclusive, controls trend smoothing + */ + public static double holt(double[] values, double alpha, double beta) { + if (values.length == 0) { + return Double.NaN; + } + + return holtForecast(values, alpha, beta, 1)[0]; + } + + /** + * Version of holt that can "forecast", not exposed as a whitelisted function for moving_fn scripts, but + * here as compatibility/code sharing for existing moving_avg agg. Can be removed when moving_avg is gone. 
+ */ + public static double[] holtForecast(double[] values, double alpha, double beta, int numForecasts) { + + // Smoothed value + double s = 0; + double last_s = 0; + + // Trend value + double b = 0; + double last_b = 0; + + int counter = 0; + + Double last; + for (double v : values) { + if (Double.isNaN(v) == false) { + last = v; + if (counter == 0) { + s = v; + b = v - last; + } else { + s = alpha * v + (1.0d - alpha) * (last_s + last_b); + b = beta * (s - last_s) + (1 - beta) * last_b; + } + + counter += 1; + last_s = s; + last_b = b; + } + } + + if (counter == 0) { + return emptyPredictions(numForecasts); + } + + double[] forecastValues = new double[numForecasts]; + for (int i = 0; i < numForecasts; i++) { + forecastValues[i] = s + (i * b); + } + + return forecastValues; + } + + /** + * Calculate a triple exponential weighted moving average + * + * Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values + * (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g. + * the series mean). Useful values are somewhere in between. Defaults to 0.5. + * + * Beta is equivalent to alpha, but controls the smoothing of the trend instead of the data. + * Gamma is equivalent to alpha, but controls the smoothing of the seasonality instead of the data + * + * Only finite values are averaged. NaN or null are ignored. + * If all values are missing/null/NaN, the return value will be NaN + * The average is based on the count of non-null, non-NaN values. + * + * @param alpha A double between 0-1 inclusive, controls data smoothing + * @param beta a double between 0-1 inclusive, controls trend smoothing + * @param gamma a double between 0-1 inclusive, controls seasonality smoothing + * @param period the expected periodicity of the data + * @param multiplicative true if multiplicative HW should be used. 
False for additive + */ + public static double holtWinters(double[] values, double alpha, double beta, double gamma, + int period, boolean multiplicative) { + + if (values.length == 0) { + return Double.NaN; + } + + double padding = multiplicative ? 0.0000000001 : 0.0; + return holtWintersForecast(values, alpha, beta, gamma, period, padding, multiplicative, 1)[0]; + } + + /** + * Version of holt-winters that can "forecast", not exposed as a whitelisted function for moving_fn scripts, but + * here as compatibility/code sharing for existing moving_avg agg. Can be removed when moving_avg is gone. + */ + public static double[] holtWintersForecast(double[] values, double alpha, double beta, double gamma, + int period, double padding, boolean multiplicative, int numForecasts) { + if (values.length < period * 2) { + // We need at least two full "seasons" to use HW + // This should have been caught earlier, we can't do anything now...bail + throw new IllegalArgumentException("Holt-Winters aggregation requires at least (2 * period == 2 * " + + period + " == "+(2 * period)+") data-points to function. 
Only [" + values.length + "] were provided."); + } + + // Smoothed value + double s = 0; + double last_s; + + // Trend value + double b = 0; + double last_b = 0; + + // Seasonal value + double[] seasonal = new double[values.length]; + + int counter = 0; + double[] vs = new double[values.length]; + for (double v : values) { + if (Double.isNaN(v) == false) { + vs[counter] = v + padding; + counter += 1; + } + } + + if (counter == 0) { + return emptyPredictions(numForecasts); + } + + // Initial level value is average of first season + // Calculate the slopes between first and second season for each period + for (int i = 0; i < period; i++) { + s += vs[i]; + b += (vs[i + period] - vs[i]) / period; + } + s /= period; + b /= period; + last_s = s; + + // Calculate first seasonal + if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) { + Arrays.fill(seasonal, 0.0); + } else { + for (int i = 0; i < period; i++) { + seasonal[i] = vs[i] / s; + } + } + + for (int i = period; i < vs.length; i++) { + // TODO if perf is a problem, we can specialize a subclass to avoid conditionals on each iteration + if (multiplicative) { + s = alpha * (vs[i] / seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b); + } else { + s = alpha * (vs[i] - seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b); + } + + b = beta * (s - last_s) + (1 - beta) * last_b; + + if (multiplicative) { + seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; + } else { + seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period]; + } + + last_s = s; + last_b = b; + } + + double[] forecastValues = new double[numForecasts]; + for (int i = 1; i <= numForecasts; i++) { + int idx = values.length - period + ((i - 1) % period); + + // TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo? 
+ if (multiplicative) { + forecastValues[i-1] = (s + (i * b)) * seasonal[idx]; + } else { + forecastValues[i-1] = s + (i * b) + seasonal[idx]; + } + } + + return forecastValues; + } + + /** + * Returns an empty set of predictions, filled with NaNs + * @param numPredictions Number of empty predictions to generate + */ + private static double[] emptyPredictions(int numPredictions) { + double[] predictions = new double[numPredictions]; + Arrays.fill(predictions, Double.NaN); + return predictions; + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 9b5bc7541f2c2..e89e15c631082 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -446,7 +446,7 @@ private void executeTestCase(boolean reduced, Query query, List dataset, InternalDateHistogram histogram; if (reduced) { - histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, maxBucket, fieldType); + histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, maxBucket, null, fieldType); } else { histogram = search(indexSearcher, query, aggregationBuilder, maxBucket, fieldType); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregationBuilderSerializationTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregationBuilderSerializationTests.java new file mode 100644 index 0000000000000..218cbdf62ca05 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregationBuilderSerializationTests.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more 
contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.movfn; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.script.Script; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class MovFnPipelineAggregationBuilderSerializationTests extends AbstractSerializingTestCase { + + @Override + protected MovFnPipelineAggregationBuilder createTestInstance() { + return new MovFnPipelineAggregationBuilder(randomAlphaOfLength(10), "foo", new Script("foo"), randomIntBetween(1, 10)); + } + + @Override + protected Writeable.Reader instanceReader() { + return MovFnPipelineAggregationBuilder::new; + } + + @Override + protected MovFnPipelineAggregationBuilder doParseInstance(XContentParser parser) throws IOException { + return MovFnPipelineAggregationBuilder.parse(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} + diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnUnitTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnUnitTests.java new file mode 100644 index 
0000000000000..4f9e653a20df6 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnUnitTests.java @@ -0,0 +1,164 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.movfn; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; 
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; +import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class MovFnUnitTests extends AggregatorTestCase { + + private static final String DATE_FIELD = "date"; + private static final String INSTANT_FIELD = "instant"; + private static final String VALUE_FIELD = "value_field"; + + private static final List datasetTimes = Arrays.asList( + "2017-01-01T01:07:45", + "2017-01-02T03:43:34", + "2017-01-03T04:11:00", + "2017-01-04T05:11:31", + "2017-01-05T08:24:05", + "2017-01-06T13:09:32", + "2017-01-07T13:47:43", + "2017-01-08T16:14:34", + "2017-01-09T17:09:50", + "2017-01-10T22:55:46"); + + private static final List datasetValues = Arrays.asList(1,2,3,4,5,6,7,8,9,10); + + public void testMatchAllDocs() throws IOException { + Query query = new MatchAllDocsQuery(); + Script script = new Script(Script.DEFAULT_SCRIPT_TYPE, "painless", "test", Collections.emptyMap()); + + DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo"); + aggBuilder.dateHistogramInterval(DateHistogramInterval.DAY).field(DATE_FIELD); + aggBuilder.subAggregation(new AvgAggregationBuilder("avg").field(VALUE_FIELD)); + aggBuilder.subAggregation(new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 3)); + + executeTestCase(query, aggBuilder, histogram -> { + assertEquals(10, 
histogram.getBuckets().size()); + List buckets = histogram.getBuckets(); + for (int i = 0; i < buckets.size(); i++) { + if (i == 0) { + assertThat(((InternalSimpleValue)(buckets.get(i).getAggregations().get("mov_fn"))).value(), equalTo(Double.NaN)); + } else { + assertThat(((InternalSimpleValue)(buckets.get(i).getAggregations().get("mov_fn"))).value(), equalTo(((double) i))); + } + + } + }, 1000, script); + } + + + @SuppressWarnings("unchecked") + private void executeTestCase(Query query, + DateHistogramAggregationBuilder aggBuilder, + Consumer verify, + int maxBucket, Script script) throws IOException { + + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + int counter = 0; + for (String date : datasetTimes) { + if (frequently()) { + indexWriter.commit(); + } + + long instant = asLong(date); + document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); + document.add(new LongPoint(INSTANT_FIELD, instant)); + document.add(new NumericDocValuesField(VALUE_FIELD, datasetValues.get(counter))); + indexWriter.addDocument(document); + document.clear(); + counter += 1; + } + } + + ScriptService scriptService = mock(ScriptService.class); + MovingFunctionScript.Factory factory = mock(MovingFunctionScript.Factory.class); + when(scriptService.compile(script, MovingFunctionScript.CONTEXT)).thenReturn(factory); + + MovingFunctionScript scriptInstance = new MovingFunctionScript() { + @Override + public double execute(Map params, double[] values) { + assertNotNull(values); + return MovingFunctions.max(values); + } + }; + + when(factory.newInstance()).thenReturn(scriptInstance); + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + + DateFieldMapper.Builder builder = new DateFieldMapper.Builder("_name"); + DateFieldMapper.DateFieldType fieldType = builder.fieldType(); + 
fieldType.setHasDocValues(true); + fieldType.setName(aggBuilder.field()); + + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + valueFieldType.setHasDocValues(true); + valueFieldType.setName("value_field"); + + InternalDateHistogram histogram; + histogram = searchAndReduce(indexSearcher, query, aggBuilder, maxBucket, scriptService, + new MappedFieldType[]{fieldType, valueFieldType}); + verify.accept(histogram); + } + } + } + + private static long asLong(String dateTime) { + return DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parser().parseDateTime(dateTime).getMillis(); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnWhitelistedFunctionTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnWhitelistedFunctionTests.java new file mode 100644 index 0000000000000..0a0f9d6ae3759 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnWhitelistedFunctionTests.java @@ -0,0 +1,684 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.movfn; + +import org.elasticsearch.common.collect.EvictingQueue; +import org.elasticsearch.test.ESTestCase; + +import java.util.Arrays; + +import static org.hamcrest.Matchers.equalTo; + +public class MovFnWhitelistedFunctionTests extends ESTestCase { + + public void testWindowMax() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + double randValue = randomDouble(); + double expected = -Double.MAX_VALUE; + + if (i == 0) { + window.offer(randValue); + continue; + } + + for (double value : window) { + expected = Math.max(expected, value); + } + + double actual = MovingFunctions.max(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + window.offer(randValue); + } + } + + public void testNullWindowMax() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + Double randValue = randomBoolean() ? 
Double.NaN : null; + + if (i == 0) { + if (randValue != null) { + window.offer(randValue); + } + continue; + } + + double actual = MovingFunctions.max(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(Double.NaN)); + if (randValue != null) { + window.offer(randValue); + } + } + } + + public void testEmptyWindowMax() { + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.max(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(Double.NaN)); + } + + public void testWindowMin() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + double randValue = randomDouble(); + double expected = Double.MAX_VALUE; + + if (i == 0) { + window.offer(randValue); + continue; + } + + for (double value : window) { + expected = Math.min(expected, value); + } + + double actual = MovingFunctions.min(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + window.offer(randValue); + } + } + + public void testNullWindowMin() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + Double randValue = randomBoolean() ? 
Double.NaN : null; + + if (i == 0) { + if (randValue != null) { + window.offer(randValue); + } + continue; + } + + double actual = MovingFunctions.min(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(Double.NaN)); + if (randValue != null) { + window.offer(randValue); + } + } + } + + public void testEmptyWindowMin() { + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.min(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(Double.NaN)); + } + + public void testWindowSum() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + double randValue = randomDouble(); + double expected = 0; + + if (i == 0) { + window.offer(randValue); + continue; + } + + for (double value : window) { + expected += value; + } + + double actual = MovingFunctions.sum(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + window.offer(randValue); + } + } + + public void testNullWindowSum() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + Double randValue = randomBoolean() ? 
Double.NaN : null; + + if (i == 0) { + if (randValue != null) { + window.offer(randValue); + } + continue; + } + + double actual = MovingFunctions.sum(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(0.0)); + if (randValue != null) { + window.offer(randValue); + } + } + } + + public void testEmptyWindowSum() { + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.sum(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(0.0)); + } + + public void testSimpleMovAvg() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + double randValue = randomDouble(); + double expected = 0; + + if (i == 0) { + window.offer(randValue); + continue; + } + + for (double value : window) { + expected += value; + } + expected /= window.size(); + + double actual = MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + window.offer(randValue); + } + } + + public void testNullSimpleMovAvg() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + Double randValue = randomBoolean() ? 
Double.NaN : null; + + if (i == 0) { + if (randValue != null) { + window.offer(randValue); + } + continue; + } + + double actual = MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(Double.NaN)); + if (randValue != null) { + window.offer(randValue); + } + } + } + + public void testEmptySimpleMovAvg() { + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(Double.NaN)); + } + + public void testSimpleMovStdDev() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + double randValue = randomDouble(); + double mean = 0; + + if (i == 0) { + window.offer(randValue); + continue; + } + + for (double value : window) { + mean += value; + } + mean /= window.size(); + + double expected = 0.0; + for (double value : window) { + expected += Math.pow(value - mean, 2); + } + expected = Math.sqrt(expected / window.size()); + + double actual = MovingFunctions.stdDev(window.stream().mapToDouble(Double::doubleValue).toArray(), mean); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + window.offer(randValue); + } + } + + public void testNullSimpleStdDev() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + Double randValue = randomBoolean() ? 
Double.NaN : null; + + if (i == 0) { + if (randValue != null) { + window.offer(randValue); + } + continue; + } + + double actual = MovingFunctions.stdDev(window.stream().mapToDouble(Double::doubleValue).toArray(), + MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray())); + assertThat(actual, equalTo(Double.NaN)); + if (randValue != null) { + window.offer(randValue); + } + } + } + + public void testEmptySimpleStdDev() { + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.stdDev(window.stream().mapToDouble(Double::doubleValue).toArray(), + MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray())); + assertThat(actual, equalTo(Double.NaN)); + } + + public void testLinearMovAvg() { + + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + double randValue = randomDouble(); + + if (i == 0) { + window.offer(randValue); + continue; + } + + double avg = 0; + long totalWeight = 1; + long current = 1; + + for (double value : window) { + avg += value * current; + totalWeight += current; + current += 1; + } + double expected = avg / totalWeight; + double actual = MovingFunctions.linearWeightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + window.offer(randValue); + } + } + + public void testNullLinearMovAvg() { + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + Double randValue = randomBoolean() ? 
Double.NaN : null; + + if (i == 0) { + if (randValue != null) { + window.offer(randValue); + } + continue; + } + + double actual = MovingFunctions.linearWeightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(Double.NaN)); + if (randValue != null) { + window.offer(randValue); + } + } + } + + public void testEmptyLinearMovAvg() { + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.linearWeightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray()); + assertThat(actual, equalTo(Double.NaN)); + } + + public void testEWMAMovAvg() { + double alpha = randomDouble(); + + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + double randValue = randomDouble(); + + if (i == 0) { + window.offer(randValue); + continue; + } + + double avg = 0; + boolean first = true; + + for (double value : window) { + if (first) { + avg = value; + first = false; + } else { + avg = (value * alpha) + (avg * (1 - alpha)); + } + } + double expected = avg; + double actual = MovingFunctions.ewma(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + window.offer(randValue); + } + } + + public void testNullEwmaMovAvg() { + double alpha = randomDouble(); + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + Double randValue = randomBoolean() ? 
Double.NaN : null; + + if (i == 0) { + if (randValue != null) { + window.offer(randValue); + } + continue; + } + + double actual = MovingFunctions.ewma(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha); + assertThat(actual, equalTo(Double.NaN)); + if (randValue != null) { + window.offer(randValue); + } + } + } + + public void testEmptyEwmaMovAvg() { + double alpha = randomDouble(); + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.ewma(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha); + assertThat(actual, equalTo(Double.NaN)); + } + + public void testHoltLinearMovAvg() { + double alpha = randomDouble(); + double beta = randomDouble(); + + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + double randValue = randomDouble(); + + if (i == 0) { + window.offer(randValue); + continue; + } + + double s = 0; + double last_s = 0; + + // Trend value + double b = 0; + double last_b = 0; + int counter = 0; + + double last; + for (double value : window) { + last = value; + if (counter == 0) { + s = value; + b = value - last; + } else { + s = alpha * value + (1.0d - alpha) * (last_s + last_b); + b = beta * (s - last_s) + (1 - beta) * last_b; + } + + counter += 1; + last_s = s; + last_b = b; + } + + double expected = s + (0 * b) ; + double actual = MovingFunctions.holt(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha, beta); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + window.offer(randValue); + } + } + + public void testNullHoltMovAvg() { + double alpha = randomDouble(); + double beta = randomDouble(); + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(1, 50); + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < numValues; i++) { + + Double randValue = randomBoolean() ? 
Double.NaN : null; + + if (i == 0) { + if (randValue != null) { + window.offer(randValue); + } + continue; + } + + double actual = MovingFunctions.holt(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha, beta); + assertThat(actual, equalTo(Double.NaN)); + if (randValue != null) { + window.offer(randValue); + } + } + } + + public void testEmptyHoltMovAvg() { + double alpha = randomDouble(); + double beta = randomDouble(); + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.holt(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha, beta); + assertThat(actual, equalTo(Double.NaN)); + } + + public void testHoltWintersMultiplicative() { + double alpha = randomDouble(); + double beta = randomDouble(); + double gamma = randomDouble(); + int period = randomIntBetween(1,10); + int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < windowSize; i++) { + window.offer(randomDouble()); + } + + // Smoothed value + double s = 0; + double last_s = 0; + + // Trend value + double b = 0; + double last_b = 0; + + // Seasonal value + double[] seasonal = new double[windowSize]; + + int counter = 0; + double[] vs = new double[windowSize]; + for (double v : window) { + vs[counter] = v + 0.0000000001; + counter += 1; + } + + // Initial level value is average of first season + // Calculate the slopes between first and second season for each period + for (int i = 0; i < period; i++) { + s += vs[i]; + b += (vs[i + period] - vs[i]) / period; + } + s /= period; + b /= period; + last_s = s; + + // Calculate first seasonal + if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) { + Arrays.fill(seasonal, 0.0); + } else { + for (int i = 0; i < period; i++) { + seasonal[i] = vs[i] / s; + } + } + + for (int i = period; i < vs.length; i++) { + s = alpha * (vs[i] / seasonal[i - period]) + (1.0d - alpha) * (last_s + 
last_b); + b = beta * (s - last_s) + (1 - beta) * last_b; + + seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; + last_s = s; + last_b = b; + } + + int idx = window.size() - period + (0 % period); + double expected = (s + (1 * b)) * seasonal[idx]; + double actual = MovingFunctions.holtWinters(window.stream().mapToDouble(Double::doubleValue).toArray(), + alpha, beta, gamma, period, true); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + } + + public void testNullHoltWintersMovAvg() { + double alpha = randomDouble(); + double beta = randomDouble(); + double gamma = randomDouble(); + int period = randomIntBetween(1,10); + int numValues = randomIntBetween(1, 100); + int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < windowSize; i++) { + window.offer(Double.NaN); + } + + for (int i = 0; i < numValues; i++) { + double actual = MovingFunctions.holtWinters(window.stream().mapToDouble(Double::doubleValue).toArray(), + alpha, beta, gamma, period, false); + assertThat(actual, equalTo(Double.NaN)); + } + } + + public void testEmptyHoltWintersMovAvg() { + double alpha = randomDouble(); + double beta = randomDouble(); + double gamma = randomDouble(); + int period = randomIntBetween(1,10); + EvictingQueue window = new EvictingQueue<>(0); + double actual = MovingFunctions.holtWinters(window.stream().mapToDouble(Double::doubleValue).toArray(), + alpha, beta, gamma, period, false); + assertThat(actual, equalTo(Double.NaN)); + } + + public void testHoltWintersAdditive() { + double alpha = randomDouble(); + double beta = randomDouble(); + double gamma = randomDouble(); + int period = randomIntBetween(1,10); + + int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data + + EvictingQueue window = new EvictingQueue<>(windowSize); + for (int i = 0; i < windowSize; i++) { + 
window.offer(randomDouble()); + } + + // Smoothed value + double s = 0; + double last_s = 0; + + // Trend value + double b = 0; + double last_b = 0; + + // Seasonal value + double[] seasonal = new double[windowSize]; + + int counter = 0; + double[] vs = new double[windowSize]; + for (double v : window) { + vs[counter] = v; + counter += 1; + } + + // Initial level value is average of first season + // Calculate the slopes between first and second season for each period + for (int i = 0; i < period; i++) { + s += vs[i]; + b += (vs[i + period] - vs[i]) / period; + } + s /= period; + b /= period; + last_s = s; + + // Calculate first seasonal + if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) { + Arrays.fill(seasonal, 0.0); + } else { + for (int i = 0; i < period; i++) { + seasonal[i] = vs[i] / s; + } + } + + for (int i = period; i < vs.length; i++) { + s = alpha * (vs[i] - seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b); + b = beta * (s - last_s) + (1 - beta) * last_b; + + seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period]; + last_s = s; + last_b = b; + } + + int idx = window.size() - period + (0 % period); + double expected = s + (1 * b) + seasonal[idx]; + double actual = MovingFunctions.holtWinters(window.stream().mapToDouble(Double::doubleValue).toArray(), + alpha, beta, gamma, period, false); + assertEquals(expected, actual, 0.01 * Math.abs(expected)); + } + +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java index 73a3c553b4d1a..01af64d26deca 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java @@ -312,7 +312,7 @@ private double holt(Collection window) { double last; for (double value : window) { last = 
value; - if (counter == 1) { + if (counter == 0) { s = value; b = value - last; } else { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java index 869a7cd58ed8e..659fad3f45ce6 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java @@ -31,6 +31,8 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel; import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel; +import java.io.IOException; + public class MovAvgTests extends BasePipelineAggregationTestCase { @Override @@ -94,6 +96,12 @@ protected MovAvgPipelineAggregationBuilder createTestAggregatorFactory() { return factory; } + @Override + public void testFromXContent() throws IOException { + super.testFromXContent(); + assertWarnings("The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation."); + } + public void testDefaultParsing() throws Exception { MovAvgPipelineAggregationBuilder expected = new MovAvgPipelineAggregationBuilder("commits_moving_avg", "commits"); String json = "{" + @@ -104,6 +112,7 @@ public void testDefaultParsing() throws Exception { " }" + "}"; PipelineAggregationBuilder newAgg = parse(createParser(JsonXContent.jsonXContent, json)); + assertWarnings("The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation."); assertNotSame(newAgg, expected); assertEquals(expected, newAgg); assertEquals(expected.hashCode(), newAgg.hashCode()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java index 34d203443604a..55c31013fd9de 100644 --- 
a/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgUnitTests.java @@ -246,7 +246,7 @@ public void testHoltLinearMovAvgModel() { double last; for (double value : window) { last = value; - if (counter == 1) { + if (counter == 0) { s = value; b = value - last; } else { @@ -292,7 +292,7 @@ public void testHoltLinearPredictionModel() { double last; for (double value : window) { last = value; - if (counter == 1) { + if (counter == 0) { s = value; b = value - last; } else { diff --git a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java index da3757d77b46e..00303b344b92a 100644 --- a/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/script/MockScriptEngine.java @@ -26,6 +26,8 @@ import org.elasticsearch.index.similarity.ScriptedSimilarity.Query; import org.elasticsearch.index.similarity.ScriptedSimilarity.Term; import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctionScript; +import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions; import org.elasticsearch.search.lookup.LeafSearchLookup; import org.elasticsearch.search.lookup.SearchLookup; @@ -109,6 +111,9 @@ public String execute() { } else if (context.instanceClazz.equals(SimilarityWeightScript.class)) { SimilarityWeightScript.Factory factory = mockCompiled::createSimilarityWeightScript; return context.factoryClazz.cast(factory); + } else if (context.instanceClazz.equals(MovingFunctionScript.class)) { + MovingFunctionScript.Factory factory = mockCompiled::createMovingFunctionScript; + return context.factoryClazz.cast(factory); } throw new IllegalArgumentException("mock script engine does not know how to 
handle context [" + context.name + "]"); } @@ -169,6 +174,10 @@ public SimilarityScript createSimilarityScript() { public SimilarityWeightScript createSimilarityWeightScript() { return new MockSimilarityWeightScript(script != null ? script : ctx -> 42d); } + + public MovingFunctionScript createMovingFunctionScript() { + return new MockMovingFunctionScript(); + } } public class MockExecutableScript implements ExecutableScript { @@ -327,4 +336,11 @@ public static Script mockInlineScript(final String script) { return new Script(ScriptType.INLINE, "mock", script, emptyMap()); } + public class MockMovingFunctionScript extends MovingFunctionScript { + @Override + public double execute(Map params, double[] values) { + return MovingFunctions.unweightedAvg(values); + } + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 73ac501ec1dfc..ea2b5d2ad5343 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -61,8 +61,11 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.mock.orig.Mockito; -import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders; +import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase; 
import org.elasticsearch.search.fetch.subphase.FetchSourceSubPhase; @@ -302,7 +305,7 @@ protected A searchAndReduc Query query, AggregationBuilder builder, MappedFieldType... fieldTypes) throws IOException { - return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, null, fieldTypes); } /** @@ -314,6 +317,7 @@ protected A searchAndReduc Query query, AggregationBuilder builder, int maxBucket, + ScriptService scriptService, MappedFieldType... fieldTypes) throws IOException { final IndexReaderContext ctx = searcher.getTopReaderContext(); @@ -368,7 +372,7 @@ protected A searchAndReduc // now do the final reduce MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket); InternalAggregation.ReduceContext context = - new InternalAggregation.ReduceContext(root.context().bigArrays(), null, reduceBucketConsumer, true); + new InternalAggregation.ReduceContext(root.context().bigArrays(), scriptService, reduceBucketConsumer, true); @SuppressWarnings("unchecked") A internalAgg = (A) aggs.get(0).doReduce(aggs, context);