Skip to content

Commit e6c70f6

Browse files
authored
Add value_count mode to rate agg (#63687)
Adds a new value count mode to the rate aggregation. Closes #63575
1 parent d3bbd7d commit e6c70f6

File tree

10 files changed

+293
-29
lines changed

10 files changed

+293
-29
lines changed

docs/reference/aggregations/metrics/rate-aggregation.asciidoc

+80-2
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ be automatically calculated by multiplying monthly rate by 12.
9393
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
9494

9595
Instead of counting the number of documents, it is also possible to calculate a sum of all values of the fields in the documents in each
96-
bucket. The following request will group all sales records into monthly bucket and than calculate the total monthly sales and convert them
97-
into average daily sales.
96+
bucket or the number of values in each bucket. The following request will group all sales records into monthly bucket and than calculate
97+
the total monthly sales and convert them into average daily sales.
9898

9999
[source,console]
100100
--------------------------------------------------
@@ -164,6 +164,84 @@ The response will contain the average daily sale prices for each month.
164164
--------------------------------------------------
165165
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
166166

167+
By adding the `mode` parameter with the value `value_count`, we can change the calculation from `sum` to the number of values of the field:
168+
169+
[source,console]
170+
--------------------------------------------------
171+
GET sales/_search
172+
{
173+
"size": 0,
174+
"aggs": {
175+
"by_date": {
176+
"date_histogram": {
177+
"field": "date",
178+
"calendar_interval": "month" <1>
179+
},
180+
"aggs": {
181+
"avg_number_of_sales_per_year": {
182+
"rate": {
183+
"field": "price", <2>
184+
"unit": "year", <3>
185+
"mode": "value_count" <4>
186+
}
187+
}
188+
}
189+
}
190+
}
191+
}
192+
--------------------------------------------------
193+
// TEST[setup:sales]
194+
<1> Histogram is grouped by month.
195+
<2> Calculate number of of all sale prices
196+
<3> Convert to annual counts
197+
<4> Changing the mode to value count
198+
199+
The response will contain the average daily sale prices for each month.
200+
201+
[source,console-result]
202+
--------------------------------------------------
203+
{
204+
...
205+
"aggregations" : {
206+
"by_date" : {
207+
"buckets" : [
208+
{
209+
"key_as_string" : "2015/01/01 00:00:00",
210+
"key" : 1420070400000,
211+
"doc_count" : 3,
212+
"avg_number_of_sales_per_year" : {
213+
"value" : 36.0
214+
}
215+
},
216+
{
217+
"key_as_string" : "2015/02/01 00:00:00",
218+
"key" : 1422748800000,
219+
"doc_count" : 2,
220+
"avg_number_of_sales_per_year" : {
221+
"value" : 24.0
222+
}
223+
},
224+
{
225+
"key_as_string" : "2015/03/01 00:00:00",
226+
"key" : 1425168000000,
227+
"doc_count" : 2,
228+
"avg_number_of_sales_per_year" : {
229+
"value" : 24.0
230+
}
231+
}
232+
]
233+
}
234+
}
235+
}
236+
--------------------------------------------------
237+
// TESTRESPONSE[s/\.\.\./"took": $body.took,"timed_out": false,"_shards": $body._shards,"hits": $body.hits,/]
238+
239+
By default `sum` mode is used.
240+
241+
`"mode": "sum"`:: calculate the sum of all values field
242+
`"mode": "value_count"`:: use the number of values in the field
243+
244+
The `mode` parameter can only be used with fields and scripts.
167245

168246
==== Relationship between bucket sizes and rate
169247

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/AbstractRateAggregator.java

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public abstract class AbstractRateAggregator extends NumericMetricsAggregator.Si
2626
protected final ValuesSource valuesSource;
2727
private final DocValueFormat format;
2828
private final Rounding.DateTimeUnit rateUnit;
29+
protected final RateMode rateMode;
2930
private final SizedBucketAggregator sizedBucketAggregator;
3031

3132
protected DoubleArray sums;
@@ -35,6 +36,7 @@ public AbstractRateAggregator(
3536
String name,
3637
ValuesSourceConfig valuesSourceConfig,
3738
Rounding.DateTimeUnit rateUnit,
39+
RateMode rateMode,
3840
SearchContext context,
3941
Aggregator parent,
4042
Map<String, Object> metadata
@@ -45,8 +47,12 @@ public AbstractRateAggregator(
4547
if (valuesSource != null) {
4648
sums = context.bigArrays().newDoubleArray(1, true);
4749
compensations = context.bigArrays().newDoubleArray(1, true);
50+
if (rateMode == null) {
51+
rateMode = RateMode.SUM;
52+
}
4853
}
4954
this.rateUnit = rateUnit;
55+
this.rateMode = rateMode;
5056
this.sizedBucketAggregator = findSizedBucketAncestor();
5157
}
5258

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/HistogramRateAggregator.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ public HistogramRateAggregator(
2727
String name,
2828
ValuesSourceConfig valuesSourceConfig,
2929
Rounding.DateTimeUnit rateUnit,
30+
RateMode rateMode,
3031
SearchContext context,
3132
Aggregator parent,
3233
Map<String, Object> metadata
3334
) throws IOException {
34-
super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
35+
super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata);
3536
}
3637

3738
@Override
@@ -51,7 +52,18 @@ public void collect(int doc, long bucket) throws IOException {
5152
double sum = sums.get(bucket);
5253
double compensation = compensations.get(bucket);
5354
kahanSummation.reset(sum, compensation);
54-
kahanSummation.add(sketch.value());
55+
final double value;
56+
switch (rateMode) {
57+
case SUM:
58+
value = sketch.value();
59+
break;
60+
case VALUE_COUNT:
61+
value = sketch.count();
62+
break;
63+
default:
64+
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
65+
}
66+
kahanSummation.add(value);
5567
compensations.set(bucket, kahanSummation.delta());
5668
sums.set(bucket, kahanSummation.value());
5769
}

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/NumericRateAggregator.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ public NumericRateAggregator(
2626
String name,
2727
ValuesSourceConfig valuesSourceConfig,
2828
Rounding.DateTimeUnit rateUnit,
29+
RateMode rateMode,
2930
SearchContext context,
3031
Aggregator parent,
3132
Map<String, Object> metadata
3233
) throws IOException {
33-
super(name, valuesSourceConfig, rateUnit, context, parent, metadata);
34+
super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata);
3435
}
3536

3637
@Override
@@ -51,10 +52,17 @@ public void collect(int doc, long bucket) throws IOException {
5152
double sum = sums.get(bucket);
5253
double compensation = compensations.get(bucket);
5354
kahanSummation.reset(sum, compensation);
54-
55-
for (int i = 0; i < valuesCount; i++) {
56-
double value = values.nextValue();
57-
kahanSummation.add(value);
55+
switch (rateMode) {
56+
case SUM:
57+
for (int i = 0; i < valuesCount; i++) {
58+
kahanSummation.add(values.nextValue());
59+
}
60+
break;
61+
case VALUE_COUNT:
62+
kahanSummation.add(valuesCount);
63+
break;
64+
default:
65+
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
5866
}
5967

6068
compensations.set(bucket, kahanSummation.delta());

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregationBuilder.java

+44-18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
*/
66
package org.elasticsearch.xpack.analytics.rate;
77

8+
import java.io.IOException;
9+
import java.util.Map;
10+
import java.util.Objects;
11+
12+
import org.elasticsearch.Version;
813
import org.elasticsearch.common.ParseField;
914
import org.elasticsearch.common.Rounding;
1015
import org.elasticsearch.common.io.stream.StreamInput;
@@ -24,13 +29,10 @@
2429
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
2530
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
2631

27-
import java.io.IOException;
28-
import java.util.Map;
29-
import java.util.Objects;
30-
3132
public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource, RateAggregationBuilder> {
3233
public static final String NAME = "rate";
3334
public static final ParseField UNIT_FIELD = new ParseField("unit");
35+
public static final ParseField MODE_FIELD = new ParseField("mode");
3436
public static final ValuesSourceRegistry.RegistryKey<RateAggregatorSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
3537
NAME,
3638
RateAggregatorSupplier.class
@@ -40,9 +42,11 @@ public class RateAggregationBuilder extends ValuesSourceAggregationBuilder.LeafO
4042
static {
4143
ValuesSourceAggregationBuilder.declareFields(PARSER, true, true, false, false);
4244
PARSER.declareString(RateAggregationBuilder::rateUnit, UNIT_FIELD);
45+
PARSER.declareString(RateAggregationBuilder::rateMode, MODE_FIELD);
4346
}
4447

4548
Rounding.DateTimeUnit rateUnit;
49+
RateMode rateMode;
4650

4751
public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
4852
RateAggregatorFactory.registerAggregators(builder);
@@ -58,6 +62,8 @@ protected RateAggregationBuilder(
5862
Map<String, Object> metadata
5963
) {
6064
super(clone, factoriesBuilder, metadata);
65+
this.rateUnit = clone.rateUnit;
66+
this.rateMode = clone.rateMode;
6167
}
6268

6369
@Override
@@ -76,6 +82,11 @@ public RateAggregationBuilder(StreamInput in) throws IOException {
7682
} else {
7783
rateUnit = null;
7884
}
85+
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
86+
if (in.readBoolean()) {
87+
rateMode = in.readEnum(RateMode.class);
88+
}
89+
}
7990
}
8091

8192
@Override
@@ -90,6 +101,14 @@ protected void innerWriteTo(StreamOutput out) throws IOException {
90101
} else {
91102
out.writeByte((byte) 0);
92103
}
104+
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
105+
if (rateMode != null) {
106+
out.writeBoolean(true);
107+
out.writeEnum(rateMode);
108+
} else {
109+
out.writeBoolean(false);
110+
}
111+
}
93112
}
94113

95114
@Override
@@ -104,14 +123,22 @@ protected RateAggregatorFactory innerBuild(
104123
AggregatorFactory parent,
105124
AggregatorFactories.Builder subFactoriesBuilder
106125
) throws IOException {
107-
return new RateAggregatorFactory(name, config, rateUnit, context, parent, subFactoriesBuilder, metadata);
126+
if (field() == null && script() == null) {
127+
if (rateMode != null) {
128+
throw new IllegalArgumentException("The mode parameter is only supported with field or script");
129+
}
130+
}
131+
return new RateAggregatorFactory(name, config, rateUnit, rateMode, context, parent, subFactoriesBuilder, metadata);
108132
}
109133

110134
@Override
111135
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
112136
if (rateUnit != null) {
113137
builder.field(UNIT_FIELD.getPreferredName(), rateUnit.shortName());
114138
}
139+
if (rateMode != null) {
140+
builder.field(MODE_FIELD.getPreferredName(), rateMode.value());
141+
}
115142
return builder;
116143
}
117144

@@ -129,6 +156,15 @@ public RateAggregationBuilder rateUnit(Rounding.DateTimeUnit rateUnit) {
129156
return this;
130157
}
131158

159+
public RateAggregationBuilder rateMode(String rateMode) {
160+
return rateMode(RateMode.resolve(rateMode));
161+
}
162+
163+
public RateAggregationBuilder rateMode(RateMode rateMode) {
164+
this.rateMode = rateMode;
165+
return this;
166+
}
167+
132168
static Rounding.DateTimeUnit parse(String rateUnit) {
133169
Rounding.DateTimeUnit parsedRate = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(rateUnit);
134170
if (parsedRate == null) {
@@ -140,17 +176,7 @@ static Rounding.DateTimeUnit parse(String rateUnit) {
140176
@Override
141177
protected ValuesSourceConfig resolveConfig(AggregationContext context) {
142178
if (field() == null && script() == null) {
143-
return new ValuesSourceConfig(
144-
CoreValuesSourceType.NUMERIC,
145-
null,
146-
true,
147-
null,
148-
null,
149-
1.0,
150-
null,
151-
DocValueFormat.RAW,
152-
context
153-
);
179+
return new ValuesSourceConfig(CoreValuesSourceType.NUMERIC, null, true, null, null, 1.0, null, DocValueFormat.RAW, context);
154180
} else {
155181
return super.resolveConfig(context);
156182
}
@@ -162,11 +188,11 @@ public boolean equals(Object o) {
162188
if (o == null || getClass() != o.getClass()) return false;
163189
if (!super.equals(o)) return false;
164190
RateAggregationBuilder that = (RateAggregationBuilder) o;
165-
return rateUnit == that.rateUnit;
191+
return rateUnit == that.rateUnit && rateMode == that.rateMode;
166192
}
167193

168194
@Override
169195
public int hashCode() {
170-
return Objects.hash(super.hashCode(), rateUnit);
196+
return Objects.hash(super.hashCode(), rateUnit, rateMode);
171197
}
172198
}

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorFactory.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,21 @@ class RateAggregatorFactory extends ValuesSourceAggregatorFactory {
2929

3030
private final Rounding.DateTimeUnit rateUnit;
3131

32+
private final RateMode rateMode;
33+
3234
RateAggregatorFactory(
3335
String name,
3436
ValuesSourceConfig config,
3537
Rounding.DateTimeUnit rateUnit,
38+
RateMode rateMode,
3639
AggregationContext context,
3740
AggregatorFactory parent,
3841
AggregatorFactories.Builder subFactoriesBuilder,
3942
Map<String, Object> metadata
4043
) throws IOException {
4144
super(name, config, context, parent, subFactoriesBuilder, metadata);
4245
this.rateUnit = rateUnit;
46+
this.rateMode = rateMode;
4347
}
4448

4549
static void registerAggregators(ValuesSourceRegistry.Builder builder) {
@@ -59,7 +63,7 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) {
5963

6064
@Override
6165
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
62-
return new AbstractRateAggregator(name, config, rateUnit, searchContext, parent, metadata) {
66+
return new AbstractRateAggregator(name, config, rateUnit, rateMode, searchContext, parent, metadata) {
6367
@Override
6468
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) {
6569
return LeafBucketCollector.NO_OP_COLLECTOR;
@@ -76,6 +80,6 @@ protected Aggregator doCreateInternal(
7680
) throws IOException {
7781
return context.getValuesSourceRegistry()
7882
.getAggregator(RateAggregationBuilder.REGISTRY_KEY, config)
79-
.build(name, config, rateUnit, searchContext, parent, metadata);
83+
.build(name, config, rateUnit, rateMode, searchContext, parent, metadata);
8084
}
8185
}

x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/rate/RateAggregatorSupplier.java

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Aggregator build(
1919
String name,
2020
ValuesSourceConfig valuesSourceConfig,
2121
Rounding.DateTimeUnit rateUnit,
22+
RateMode rateMode,
2223
SearchContext context,
2324
Aggregator parent,
2425
Map<String, Object> metadata

0 commit comments

Comments
 (0)