Skip to content

Extrapolate rate aggregation #126331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 7, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.aggregation;

import org.elasticsearch.compute.operator.DriverContext;

public abstract class TimeSeriesGroupingAggregatorEvaluationContext extends GroupingAggregatorEvaluationContext {
public TimeSeriesGroupingAggregatorEvaluationContext(DriverContext driverContext) {
super(driverContext);
}

public abstract long rangeStartInMillis(int groupId);

public abstract long rangeEndInMillis(int groupId);
}
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ import java.util.Arrays;
value = {
@IntermediateState(name = "timestamps", type = "LONG_BLOCK"),
@IntermediateState(name = "values", type = "$TYPE$_BLOCK"),
@IntermediateState(name = "sampleCounts", type = "INT"),
@IntermediateState(name = "resets", type = "DOUBLE") }
)
public class Rate$Type$Aggregator {
@@ -57,10 +58,11 @@ public class Rate$Type$Aggregator {
int groupId,
LongBlock timestamps,
$Type$Block values,
int sampleCount,
double reset,
int otherPosition
) {
current.combine(groupId, timestamps, values, reset, otherPosition);
current.combine(groupId, timestamps, values, sampleCount, reset, otherPosition);
}

public static void combineStates(
@@ -72,18 +74,16 @@ public class Rate$Type$Aggregator {
current.combineState(currentGroupId, otherState, otherGroupId);
}

public static Block evaluateFinal(
$Type$RateGroupingState state,
IntVector selected,
GroupingAggregatorEvaluationContext evaluatorContext
) {
return state.evaluateFinal(selected, evaluatorContext.blockFactory());
public static Block evaluateFinal($Type$RateGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
return state.evaluateFinal(selected, evalContext);
}

private static class $Type$RateState {
static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject($Type$RateState.class);
final long[] timestamps; // descending order
final $type$[] values;
// the timestamps and values arrays might have collapsed to fewer values than the actual sample count
int sampleCount = 0;
double reset = 0;

$Type$RateState(int initialSize) {
@@ -94,6 +94,7 @@ public class Rate$Type$Aggregator {
$Type$RateState(long[] ts, $type$[] vs) {
this.timestamps = ts;
this.values = vs;
this.sampleCount = values.length;
}

private $type$ dv($type$ v0, $type$ v1) {
@@ -107,6 +108,7 @@ public class Rate$Type$Aggregator {
reset += dv(v, values[1]) + dv(values[1], values[0]) - dv(v, values[0]);
timestamps[1] = t;
values[1] = v;
sampleCount++;
}

int entries() {
@@ -163,7 +165,7 @@ public class Rate$Type$Aggregator {
}
}

void combine(int groupId, LongBlock timestamps, $Type$Block values, double reset, int otherPosition) {
void combine(int groupId, LongBlock timestamps, $Type$Block values, int sampleCount, double reset, int otherPosition) {
final int valueCount = timestamps.getValueCount(otherPosition);
if (valueCount == 0) {
return;
@@ -175,6 +177,7 @@ public class Rate$Type$Aggregator {
adjustBreaker($Type$RateState.bytesUsed(valueCount));
state = new $Type$RateState(valueCount);
state.reset = reset;
state.sampleCount = sampleCount;
states.set(groupId, state);
// TODO: add bulk_copy to Block
for (int i = 0; i < valueCount; i++) {
@@ -185,6 +188,7 @@ public class Rate$Type$Aggregator {
adjustBreaker($Type$RateState.bytesUsed(state.entries() + valueCount));
var newState = new $Type$RateState(state.entries() + valueCount);
newState.reset = state.reset + reset;
newState.sampleCount = state.sampleCount + sampleCount;
states.set(groupId, newState);
merge(state, newState, firstIndex, valueCount, timestamps, values);
adjustBreaker(-$Type$RateState.bytesUsed(state.entries())); // old state
@@ -232,6 +236,7 @@ public class Rate$Type$Aggregator {
adjustBreaker($Type$RateState.bytesUsed(len));
curr = new $Type$RateState(Arrays.copyOf(other.timestamps, len), Arrays.copyOf(other.values, len));
curr.reset = other.reset;
curr.sampleCount = other.sampleCount;
states.set(groupId, curr);
} else {
states.set(groupId, mergeState(curr, other));
@@ -243,6 +248,7 @@ public class Rate$Type$Aggregator {
adjustBreaker($Type$RateState.bytesUsed(newLen));
var dst = new $Type$RateState(newLen);
dst.reset = s1.reset + s2.reset;
dst.sampleCount = s1.sampleCount + s2.sampleCount;
int i = 0, j = 0, k = 0;
while (i < s1.entries() && j < s2.entries()) {
if (s1.timestamps[i] > s2.timestamps[j]) {
@@ -281,6 +287,7 @@ public class Rate$Type$Aggregator {
try (
LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2);
$Type$Block.Builder values = blockFactory.new$Type$BlockBuilder(positionCount * 2);
IntVector.FixedBuilder sampleCounts = blockFactory.newIntVectorFixedBuilder(positionCount);
DoubleVector.FixedBuilder resets = blockFactory.newDoubleVectorFixedBuilder(positionCount)
) {
for (int i = 0; i < positionCount; i++) {
@@ -298,45 +305,103 @@ public class Rate$Type$Aggregator {
values.append$Type$(v);
}
values.endPositionEntry();

sampleCounts.appendInt(i, state.sampleCount);
resets.appendDouble(i, state.reset);
} else {
timestamps.appendNull();
values.appendNull();
sampleCounts.appendInt(i, 0);
resets.appendDouble(i, 0);
}
}
blocks[offset] = timestamps.build();
blocks[offset + 1] = values.build();
blocks[offset + 2] = resets.build().asBlock();
blocks[offset + 2] = sampleCounts.build().asBlock();
blocks[offset + 3] = resets.build().asBlock();
}
}

private static double computeRateWithoutExtrapolate($Type$RateState state, long unitInMillis) {
final int len = state.entries();
assert len >= 2 : "rate requires at least two samples; got " + len;
final long firstTS = state.timestamps[state.timestamps.length - 1];
final long lastTS = state.timestamps[0];
double reset = state.reset;
for (int i = 1; i < len; i++) {
if (state.values[i - 1] < state.values[i]) {
reset += state.values[i];
}
}
final double firstValue = state.values[len - 1];
final double lastValue = state.values[0] + reset;
return (lastValue - firstValue) * unitInMillis / (lastTS - firstTS);
}

/**
* Credit to PromQL for this extrapolation algorithm:
* If samples are close enough to the rangeStart and rangeEnd, we extrapolate the rate all the way to the boundary in question.
* "Close enough" is defined as "up to 10% more than the average duration between samples within the range".
* Essentially, we assume a more or less regular spacing between samples. If we don't see a sample where we would expect one,
* we assume the series does not cover the whole range but starts and/or ends within the range.
* We still extrapolate the rate in this case, but not all the way to the boundary, only by half of the average duration between
* samples (which is our guess for where the series actually starts or ends).
*/
private static double extrapolateRate($Type$RateState state, long rangeStart, long rangeEnd, long unitInMillis) {
final int len = state.entries();
assert len >= 2 : "rate requires at least two samples; got " + len;
final long firstTS = state.timestamps[state.timestamps.length - 1];
final long lastTS = state.timestamps[0];
double reset = state.reset;
for (int i = 1; i < len; i++) {
if (state.values[i - 1] < state.values[i]) {
reset += state.values[i];
}
}
double firstValue = state.values[len - 1];
double lastValue = state.values[0] + reset;
final double sampleTS = lastTS - firstTS;
final double averageSampleInterval = sampleTS / state.sampleCount;
final double slope = (lastValue - firstValue) / sampleTS;
double startGap = firstTS - rangeStart;
if (startGap > 0) {
if (startGap > averageSampleInterval * 1.1) {
startGap = averageSampleInterval / 2.0;
}
firstValue = Math.max(0.0, firstValue - startGap * slope);
}
double endGap = rangeEnd - lastTS;
if (endGap > 0) {
if (endGap > averageSampleInterval * 1.1) {
endGap = averageSampleInterval / 2.0;
}
lastValue = lastValue + endGap * slope;
}
return (lastValue - firstValue) * unitInMillis / (rangeEnd - rangeStart);
}

Block evaluateFinal(IntVector selected, BlockFactory blockFactory) {
Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
int positionCount = selected.getPositionCount();
try (DoubleBlock.Builder rates = blockFactory.newDoubleBlockBuilder(positionCount)) {
try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
for (int p = 0; p < positionCount; p++) {
final var groupId = selected.getInt(p);
final var state = groupId < states.size() ? states.get(groupId) : null;
if (state == null) {
if (state == null || state.sampleCount < 2) {
rates.appendNull();
continue;
}
int len = state.entries();
long dt = state.timestamps[0] - state.timestamps[len - 1];
if (dt == 0) {
// TODO: maybe issue warning when we don't have enough sample?
rates.appendNull();
final double rate;
if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) {
rate = extrapolateRate(
state,
tsContext.rangeStartInMillis(groupId),
tsContext.rangeEndInMillis(groupId),
unitInMillis
);
} else {
double reset = state.reset;
for (int i = 1; i < len; i++) {
if (state.values[i - 1] < state.values[i]) {
reset += state.values[i];
}
}
double dv = state.values[0] - state.values[len - 1] + reset;
rates.appendDouble(dv * unitInMillis / dt);
rate = computeRateWithoutExtrapolate(state, unitInMillis);
}
rates.appendDouble(rate);
}
return rates.build();
}
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ public String describe() {

private final List<GroupingAggregator> aggregators;

private final DriverContext driverContext;
protected final DriverContext driverContext;

/**
* Nanoseconds this operator has spent hashing grouping keys.
@@ -226,7 +226,7 @@ public void finish() {
blocks = new Block[keys.length + Arrays.stream(aggBlockCounts).sum()];
System.arraycopy(keys, 0, blocks, 0, keys.length);
int offset = keys.length;
var evaluationContext = new GroupingAggregatorEvaluationContext(driverContext);
var evaluationContext = evaluationContext(keys);
for (int i = 0; i < aggregators.size(); i++) {
var aggregator = aggregators.get(i);
aggregator.evaluate(blocks, offset, selected, evaluationContext);
@@ -245,6 +245,10 @@ public void finish() {
}
}

protected GroupingAggregatorEvaluationContext evaluationContext(Block[] keys) {
return new GroupingAggregatorEvaluationContext(driverContext);
}

@Override
public boolean isFinished() {
return finished && output == null;
Original file line number Diff line number Diff line change
@@ -11,7 +11,12 @@
import org.elasticsearch.compute.Describable;
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.aggregation.GroupingAggregator;
import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
import org.elasticsearch.compute.aggregation.TimeSeriesGroupingAggregatorEvaluationContext;
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.LongBlock;

import java.util.List;
import java.util.function.Supplier;
@@ -67,4 +72,23 @@ public TimeSeriesAggregationOperator(
super(aggregators, blockHash, driverContext);
this.timeBucket = timeBucket;
}

@Override
protected GroupingAggregatorEvaluationContext evaluationContext(Block[] keys) {
if (keys.length < 2) {
return super.evaluationContext(keys);
}
final LongBlock timestamps = keys[0].elementType() == ElementType.LONG ? (LongBlock) keys[0] : (LongBlock) keys[1];
return new TimeSeriesGroupingAggregatorEvaluationContext(driverContext) {
@Override
public long rangeStartInMillis(int groupId) {
return timestamps.getLong(groupId);
}

@Override
public long rangeEndInMillis(int groupId) {
return timeBucket.nextRoundingValue(timestamps.getLong(groupId));
}
};
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -77,53 +77,81 @@ required_capability: metrics_command
TS k8s | STATS max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute) | SORT time_bucket DESC | LIMIT 2;

max(rate(network.total_bytes_in)): double | time_bucket:date
10.594594594594595 | 2024-05-10T00:20:00.000Z
6.980660660660663 | 2024-05-10T00:20:00.000Z
23.702205882352942 | 2024-05-10T00:15:00.000Z
;

twoRatesWithBucket
required_capability: metrics_command
TS k8s | STATS max(rate(network.total_bytes_in)), sum(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute) | SORT time_bucket DESC | LIMIT 3;

max(rate(network.total_bytes_in)): double | sum(rate(network.total_bytes_in)): double | time_bucket:date
10.594594594594595 | 42.70864495221802 | 2024-05-10T00:20:00.000Z
23.702205882352942 | 112.36715680313907 | 2024-05-10T00:15:00.000Z
17.90625 | 85.18387414067914 | 2024-05-10T00:10:00.000Z
max(rate(network.total_bytes_in)):double | sum(rate(network.total_bytes_in)):double | time_bucket:datetime
6.980660660660663 | 23.959973363184154 | 2024-05-10T00:20:00.000Z
23.702205882352942 | 94.9517511187984 | 2024-05-10T00:15:00.000Z
14.97039381153305 | 63.00652190262822 | 2024-05-10T00:10:00.000Z
;


oneRateWithBucketAndCluster
required_capability: metrics_command
TS k8s | STATS max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6;

max(rate(network.total_bytes_in)): double | time_bucket:date | cluster: keyword
10.594594594594595 | 2024-05-10T00:20:00.000Z | prod
5.586206896551724 | 2024-05-10T00:20:00.000Z | qa
5.37037037037037 | 2024-05-10T00:20:00.000Z | staging
15.913978494623656 | 2024-05-10T00:15:00.000Z | prod
23.702205882352942 | 2024-05-10T00:15:00.000Z | qa
9.823232323232324 | 2024-05-10T00:15:00.000Z | staging
max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:keyword
6.980660660660663 | 2024-05-10T00:20:00.000Z | prod
4.05 | 2024-05-10T00:20:00.000Z | qa
3.19 | 2024-05-10T00:20:00.000Z | staging
11.860805860805861 | 2024-05-10T00:15:00.000Z | prod
23.702205882352942 | 2024-05-10T00:15:00.000Z | qa
7.784911616161616 | 2024-05-10T00:15:00.000Z | staging
;

BytesAndCostByBucketAndCluster
required_capability: metrics_command
TS k8s | STATS max(rate(network.total_bytes_in)), max(network.cost) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6;

max(rate(network.total_bytes_in)): double | max(network.cost): double | time_bucket:date | cluster: keyword
10.594594594594595 | 10.75 | 2024-05-10T00:20:00.000Z | prod
5.586206896551724 | 11.875 | 2024-05-10T00:20:00.000Z | qa
5.37037037037037 | 9.5 | 2024-05-10T00:20:00.000Z | staging
15.913978494623656 | 12.375 | 2024-05-10T00:15:00.000Z | prod
6.980660660660663 | 10.75 | 2024-05-10T00:20:00.000Z | prod
4.05 | 11.875 | 2024-05-10T00:20:00.000Z | qa
3.19 | 9.5 | 2024-05-10T00:20:00.000Z | staging
11.860805860805861 | 12.375 | 2024-05-10T00:15:00.000Z | prod
23.702205882352942 | 12.125 | 2024-05-10T00:15:00.000Z | qa
9.823232323232324 | 11.5 | 2024-05-10T00:15:00.000Z | staging
7.784911616161616 | 11.5 | 2024-05-10T00:15:00.000Z | staging
;

oneRateWithBucketAndClusterThenFilter
required_capability: metrics_command
TS k8s | WHERE cluster=="prod" | STATS max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC | LIMIT 3;

max(rate(network.total_bytes_in)): double | time_bucket:date | cluster: keyword
10.594594594594595 | 2024-05-10T00:20:00.000Z | prod
15.913978494623656 | 2024-05-10T00:15:00.000Z | prod
11.562737642585551 | 2024-05-10T00:10:00.000Z | prod
max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:keyword
6.980660660660663 | 2024-05-10T00:20:00.000Z | prod
11.860805860805861 | 2024-05-10T00:15:00.000Z | prod
11.562737642585551 | 2024-05-10T00:10:00.000Z | prod
;


oneRateWithBucketAndClusterThenFilter
required_capability: metrics_command
TS k8s | WHERE cluster=="prod" | STATS max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC | LIMIT 3;

max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:keyword
6.980660660660663 | 2024-05-10T00:20:00.000Z | prod
11.860805860805861 | 2024-05-10T00:15:00.000Z | prod
11.562737642585551 | 2024-05-10T00:10:00.000Z | prod
;

notEnoughSamples
required_capability: metrics_command
TS k8s | WHERE @timestamp <= "2024-05-10T00:06:14.000Z" | STATS max(rate(network.total_bytes_in)) BY pod, time_bucket = bucket(@timestamp,1minute) | SORT pod, time_bucket DESC | LIMIT 10;

max(rate(network.total_bytes_in)):double | pod:keyword | time_bucket:datetime
null | one | 2024-05-10T00:06:00.000Z
0.075 | one | 2024-05-10T00:05:00.000Z
null | one | 2024-05-10T00:04:00.000Z
16.45 | one | 2024-05-10T00:03:00.000Z
null | one | 2024-05-10T00:01:00.000Z
null | three | 2024-05-10T00:06:00.000Z
null | three | 2024-05-10T00:05:00.000Z
1.534413580246913 | three | 2024-05-10T00:03:00.000Z
null | three | 2024-05-10T00:02:00.000Z
null | three | 2024-05-10T00:01:00.000Z
;
Original file line number Diff line number Diff line change
@@ -365,6 +365,7 @@ record RateKey(String cluster, String host) {
}
}

@AwaitsFix(bugUrl = "removed?")
public void testRateWithTimeBucket() {
var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown();
record RateKey(String host, String cluster, long interval) {}
@@ -459,6 +460,7 @@ record RateKey(String host, String cluster, long interval) {}
}
}

@AwaitsFix(bugUrl = "removed?")
public void testRateWithTimeBucketAndCluster() {
var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown();
record RateKey(String host, String cluster, long interval) {}