[WIP] TSDB: add a low-level time series aggregation to support promQL #86097

Draft: wants to merge 58 commits into base: main

Commits (58)
14cd560
add time_series_aggregation
weizijun Apr 7, 2022
54abfa0
Merge branch 'master' into add-time_series-aggregation
weizijun Apr 21, 2022
c0e19d5
add tests and fixed error code
weizijun Apr 21, 2022
8503e18
add comments
weizijun Apr 22, 2022
6133e42
spotless
weizijun Apr 22, 2022
7255f4a
optimize code
weizijun Apr 22, 2022
9568953
support down sample range
weizijun Apr 25, 2022
7577368
support offset
weizijun Apr 26, 2022
984fc2d
fixed last exception and support without group
weizijun Apr 27, 2022
837b32f
fixed empty bucket
weizijun May 10, 2022
23f3a8c
[WIP] add rate support
weizijun May 12, 2022
9caaad6
add rate support
weizijun May 12, 2022
9ad347b
add rate support
weizijun May 12, 2022
354cdb8
add rate support
weizijun May 12, 2022
993f6f7
add irate support
weizijun May 12, 2022
eab24bf
revert test
weizijun May 12, 2022
10ceecd
add rate support
weizijun May 12, 2022
dbfe860
fixed no_op error
weizijun May 13, 2022
96f453d
code improve
weizijun May 16, 2022
4d1285a
fix range boundary
weizijun May 19, 2022
c2737e3
aggregator\function support params
weizijun May 20, 2022
cf82e65
Merge branch 'master' into add-time_series-aggregation
elasticmachine May 20, 2022
7c73c9a
support topk
weizijun May 23, 2022
c1b97b5
support topk
weizijun May 23, 2022
c8ec769
support count_values
weizijun May 23, 2022
d7449ba
support quantile
weizijun May 23, 2022
ca23fa4
support count_values
weizijun May 23, 2022
6374917
support more function: abs,ceil,floor,exp,sqrt,ln,log10,sin,cos,tan,…
weizijun May 24, 2022
7891163
fix failed check
weizijun May 24, 2022
159f32a
fixup
weizijun May 24, 2022
d29c0d5
fixup
weizijun May 25, 2022
0a8113f
function param improve
weizijun May 25, 2022
9eaa330
function param improve
weizijun May 25, 2022
a479a50
return NaN when value is invaild
weizijun May 26, 2022
868098e
Merge branch 'master' into add-time_series-aggregation
elasticmachine May 27, 2022
112c7ab
improve aggregate_metric_double
weizijun May 27, 2022
309ea72
support get origin value
weizijun May 27, 2022
15fc22e
revert
weizijun May 27, 2022
4cd2be4
function add value type
weizijun May 30, 2022
aad754d
improve rounding
weizijun May 31, 2022
6431934
improve rounding
weizijun May 31, 2022
7a26c13
improve rounding
weizijun May 31, 2022
15da69f
improve rounding
weizijun May 31, 2022
6b106db
fix TimeSeriesLineAggreagation NPE
weizijun May 31, 2022
5d0fbd1
fix rate and count
weizijun Jun 1, 2022
8e3956a
fix start_time and end_time
weizijun Jun 1, 2022
0f79448
fix memory leak
weizijun Jun 2, 2022
fdebd07
fixup
weizijun Jun 9, 2022
0113910
memory improve
weizijun Jun 13, 2022
703546d
Merge branch 'master' into add-time_series-aggregation
elasticmachine Jun 15, 2022
04f9186
Merge branch 'main' into add-time_series-aggregation
elasticmachine Aug 12, 2022
dae6fc8
support deferring
weizijun Aug 16, 2022
6e7c4d2
improve fetch tsid
weizijun Aug 18, 2022
4a75da2
fix aggregate_metric_double agg
weizijun Aug 18, 2022
a0f29f5
fixup
weizijun Aug 18, 2022
d506616
rm old aggregator
weizijun Aug 18, 2022
9bf11ac
remove deferring parameter
weizijun Aug 19, 2022
6898e84
fix rounding
weizijun Aug 19, 2022
@@ -14,7 +14,10 @@
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@@ -25,10 +28,14 @@
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.Stats;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.pipeline.SimpleValue;
import org.elasticsearch.search.aggregations.timeseries.TimeSeries;
import org.elasticsearch.search.aggregations.timeseries.aggregation.Function;
import org.elasticsearch.search.aggregations.timeseries.aggregation.InternalTimeSeriesAggregation;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TimeSeriesLineAggreagation;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
@@ -40,6 +47,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -50,6 +58,7 @@
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.AggregationBuilders.timeSeries;
import static org.elasticsearch.search.aggregations.AggregationBuilders.timeSeriesAggregation;
import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
@@ -521,4 +530,212 @@ public void testGetHitsFailure() throws Exception {

}

public void testBasicTimeSeriesAggregations() {
DateHistogramInterval fixedInterval = DateHistogramInterval.days(randomIntBetween(10, 100));
Rounding.Prepared rounding = Rounding.builder(new TimeValue(fixedInterval.estimateMillis())).build().prepareForUnknown();
SearchResponse response = client().prepareSearch("index")
.setSize(0)
.addAggregation(timeSeriesAggregation("by_ts").field("metric_0").interval(fixedInterval).size(data.size()))
.get();
Aggregations aggregations = response.getAggregations();
assertNotNull(aggregations);
InternalTimeSeriesAggregation timeSeries = aggregations.get("by_ts");
assertThat(
timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()),
equalTo(data.keySet())
);
for (InternalTimeSeriesAggregation.InternalBucket bucket : timeSeries.getBuckets()) {
Map<String, Object> key = bucket.getKey();
Map<Long, Map<String, Double>> dataValues = data.get(key);
assertThat((long) dataValues.size(), equalTo(bucket.getDocCount()));
Map<Long, Tuple<Double, Long>> dataBucketValues = new HashMap<>();
dataValues.forEach((timestamp, metrics) -> {
long roundValue = rounding.nextRoundingValue(timestamp);
if (dataBucketValues.containsKey(roundValue)) {
if (timestamp > dataBucketValues.get(roundValue).v2()) {
dataBucketValues.put(roundValue, new Tuple<>(metrics.get("metric_0"), timestamp));
}
} else {
dataBucketValues.put(roundValue, new Tuple<>(metrics.get("metric_0"), timestamp));
}
});

dataBucketValues.forEach((timestamp, value) -> {
assertTrue(((TimeSeriesLineAggreagation) bucket.getMetricAggregation()).getTimeBucketValues().containsKey(timestamp));
InternalAggregation aggregation = ((TimeSeriesLineAggreagation) bucket.getMetricAggregation()).getTimeBucketValues()
.get(timestamp);
assertTrue(aggregation instanceof InternalNumericMetricsAggregation.SingleValue);
assertThat(((InternalNumericMetricsAggregation.SingleValue) aggregation).value(), closeTo(value.v1(), 0.0001d));
});
}
}

public void testTimeSeriesAggregationsDownsample() {
DateHistogramInterval fixedInterval = DateHistogramInterval.days(randomIntBetween(10, 100));
Rounding.Prepared rounding = Rounding.builder(new TimeValue(fixedInterval.estimateMillis())).build().prepareForUnknown();
SearchResponse response = client().prepareSearch("index")
.setSize(0)
.addAggregation(
timeSeriesAggregation("by_ts").field("metric_0")
.interval(fixedInterval)
.downsample(fixedInterval, Function.sum_over_time, null)
.size(data.size())
)
.get();
Aggregations aggregations = response.getAggregations();
assertNotNull(aggregations);
InternalTimeSeriesAggregation timeSeries = aggregations.get("by_ts");
assertThat(
timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()),
equalTo(data.keySet())
);
for (InternalTimeSeriesAggregation.InternalBucket bucket : timeSeries.getBuckets()) {
Map<String, Object> key = bucket.getKey();
Map<Long, Map<String, Double>> dataValues = data.get(key);
assertThat((long) dataValues.size(), equalTo(bucket.getDocCount()));
Map<Long, Double> dataBucketValues = new HashMap<>();
dataValues.forEach((timestamp, metrics) -> {
long roundValue = rounding.nextRoundingValue(timestamp);
if (dataBucketValues.containsKey(roundValue)) {
dataBucketValues.put(roundValue, dataBucketValues.get(roundValue) + metrics.get("metric_0"));
} else {
dataBucketValues.put(roundValue, metrics.get("metric_0"));
}
});

dataBucketValues.forEach((timestamp, value) -> {
assertTrue(((TimeSeriesLineAggreagation) bucket.getMetricAggregation()).getTimeBucketValues().containsKey(timestamp));
InternalAggregation aggregation = ((TimeSeriesLineAggreagation) bucket.getMetricAggregation()).getTimeBucketValues()
.get(timestamp);
assertTrue(aggregation instanceof InternalNumericMetricsAggregation.SingleValue);
assertThat(((InternalNumericMetricsAggregation.SingleValue) aggregation).value(), closeTo(value, 0.0001d));
});
}
}

public void testTimeSeriesAggregationsAggregator() {
DateHistogramInterval fixedInterval = DateHistogramInterval.days(randomIntBetween(10, 100));
Rounding.Prepared rounding = Rounding.builder(new TimeValue(fixedInterval.estimateMillis())).build().prepareForUnknown();
SearchResponse response = client().prepareSearch("index")
.setSize(0)
.addAggregation(timeSeriesAggregation("by_ts").field("metric_0").interval(fixedInterval).aggregator("sum").size(data.size()))
.get();
Aggregations aggregations = response.getAggregations();
assertNotNull(aggregations);
InternalTimeSeriesAggregation timeSeries = aggregations.get("by_ts");
assertThat(timeSeries.getBuckets().size(), equalTo(1));
assertThat(
timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()),
equalTo(Set.of(Map.of()))
);

Map<Long, Double> aggResults = new HashMap<>();
data.forEach((key, value) -> {
Map<Long, Tuple<Double, Long>> downsampleResult = new HashMap<>();
value.forEach((timestamp, metrics) -> {
long roundValue = rounding.nextRoundingValue(timestamp);
if (downsampleResult.containsKey(roundValue)) {
if (timestamp > downsampleResult.get(roundValue).v2()) {
downsampleResult.put(roundValue, new Tuple<>(metrics.get("metric_0"), timestamp));
}
} else {
downsampleResult.put(roundValue, new Tuple<>(metrics.get("metric_0"), timestamp));
}
});

for (Entry<Long, Tuple<Double, Long>> entry : downsampleResult.entrySet()) {
Long timestamp = entry.getKey();
Double downsampleValue = entry.getValue().v1();
if (aggResults.containsKey(timestamp)) {
aggResults.put(timestamp, aggResults.get(timestamp) + downsampleValue);
} else {
aggResults.put(timestamp, downsampleValue);
}
}
});

for (InternalTimeSeriesAggregation.InternalBucket bucket : timeSeries.getBuckets()) {
aggResults.forEach((timestamp, metric) -> {
assertTrue(((TimeSeriesLineAggreagation) bucket.getMetricAggregation()).getTimeBucketValues().containsKey(timestamp));
InternalAggregation aggregation = ((TimeSeriesLineAggreagation) bucket.getMetricAggregation()).getTimeBucketValues()
.get(timestamp);
assertTrue(aggregation instanceof InternalNumericMetricsAggregation.SingleValue);
assertThat(((InternalNumericMetricsAggregation.SingleValue) aggregation).value(), closeTo(metric, 0.0001d));
});
}
}

public void testTimeSeriesAggregationsGroupBy() {
DateHistogramInterval fixedInterval = DateHistogramInterval.days(randomIntBetween(10, 100));
Rounding.Prepared rounding = Rounding.builder(new TimeValue(fixedInterval.estimateMillis())).build().prepareForUnknown();
SearchResponse response = client().prepareSearch("index")
.setSize(0)
.addAggregation(
timeSeriesAggregation("by_ts").field("metric_0")
.group(List.of("dim_0"))
.interval(fixedInterval)
.downsample(fixedInterval, Function.max_over_time, null)
.aggregator("sum")
.size(data.size())
)
.get();
Aggregations aggregations = response.getAggregations();
assertNotNull(aggregations);
InternalTimeSeriesAggregation timeSeries = aggregations.get("by_ts");
assertThat(
timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()),
equalTo(
data.keySet()
.stream()
.map(key -> key.get("dim_0"))
.collect(Collectors.toSet())
.stream()
.map(key -> Map.of("dim_0", key))
.collect(Collectors.toSet())
)
);

Map<Map<String, String>, Map<Long, Double>> aggResults = new HashMap<>();
data.forEach((key, value) -> {
String dim = key.get("dim_0");
Map<String, String> bucketKey = Map.of("dim_0", dim);
Map<Long, Double> bucketResult = aggResults.get(bucketKey);
if (bucketResult == null) {
bucketResult = new HashMap<>();
aggResults.put(bucketKey, bucketResult);
}

Map<Long, Double> downsampleResult = new HashMap<>();
value.forEach((timestamp, metrics) -> {
long roundValue = rounding.nextRoundingValue(timestamp);
if (downsampleResult.containsKey(roundValue)) {
downsampleResult.put(roundValue, Math.max(downsampleResult.get(roundValue), metrics.get("metric_0")));
} else {
downsampleResult.put(roundValue, metrics.get("metric_0"));
}
});

for (Entry<Long, Double> entry : downsampleResult.entrySet()) {
Long timestamp = entry.getKey();
Double downsampleValue = entry.getValue();
if (bucketResult.containsKey(timestamp)) {
bucketResult.put(timestamp, bucketResult.get(timestamp) + downsampleValue);
} else {
bucketResult.put(timestamp, downsampleValue);
}
}
});

for (InternalTimeSeriesAggregation.InternalBucket bucket : timeSeries.getBuckets()) {
Map<String, Object> key = bucket.getKey();
Map<Long, Double> dataValues = aggResults.get(key);
dataValues.forEach((timestamp, metric) -> {
assertTrue(((TimeSeriesLineAggreagation) bucket.getMetricAggregation()).getTimeBucketValues().containsKey(timestamp));
InternalAggregation aggregation = ((TimeSeriesLineAggreagation) bucket.getMetricAggregation()).getTimeBucketValues()
.get(timestamp);
assertTrue(aggregation instanceof InternalNumericMetricsAggregation.SingleValue);
assertThat(((InternalNumericMetricsAggregation.SingleValue) aggregation).value(), closeTo(metric, 0.0001d));
});
}
}
}
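
Taken together, the tests above exercise the PromQL-shaped pipeline this PR introduces: downsample() plays the role of the inner range function, aggregator() the outer vector aggregation, and group() the "by (...)" clause. The snippet below is a minimal sketch of that mapping, assuming only the builder methods visible in this diff; the one-day step, the "metric_0" field, and the "dim_0" dimension are placeholders borrowed from the test fixtures, and the PromQL equivalence is a reading of the tests rather than documented behavior.

// Sketch only: roughly the PromQL expression  sum by (dim_0) (max_over_time(metric_0[1d]))
import java.util.List;

import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.timeseries.aggregation.Function;

import static org.elasticsearch.search.aggregations.AggregationBuilders.timeSeriesAggregation;

public class PromQLMappingSketch {
    public static AggregationBuilder sumByDimOfMaxOverTime() {
        DateHistogramInterval step = DateHistogramInterval.days(1);    // placeholder evaluation step
        return timeSeriesAggregation("by_ts").field("metric_0")        // the series to read, i.e. metric_0[...]
            .group(List.of("dim_0"))                                   // PromQL "by (dim_0)"
            .interval(step)                                            // bucket width of the output series
            .downsample(step, Function.max_over_time, null)            // inner range function: max_over_time
            .aggregator("sum");                                        // outer vector aggregation: sum
    }
}

The builder can then be attached to a search request exactly as in the tests, for example via client().prepareSearch("index").setSize(0).addAggregation(...).
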
28 changes: 28 additions & 0 deletions server/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -215,6 +215,16 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.timeseries.InternalTimeSeries;
import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
import org.elasticsearch.search.aggregations.timeseries.aggregation.InternalTimeSeriesAggregation;
import org.elasticsearch.search.aggregations.timeseries.aggregation.TimeSeriesAggregationAggregationBuilder;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TSIDInternalAggregation;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TimeSeriesCountValues;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TimeSeriesIRate;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TimeSeriesLast;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TimeSeriesLineAggreagation;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TimeSeriesOriginValues;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TimeSeriesRate;
import org.elasticsearch.search.aggregations.timeseries.aggregation.internal.TimeSeriesTopk;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.ExplainPhase;
@@ -666,6 +676,24 @@ private ValuesSourceRegistry registerAggregations(List<SearchPlugin> plugins) {
).addResultReader(InternalTimeSeries::new),
builder
);

registerAggregation(
new AggregationSpec(
TimeSeriesAggregationAggregationBuilder.NAME,
TimeSeriesAggregationAggregationBuilder::new,
TimeSeriesAggregationAggregationBuilder.PARSER
).addResultReader(InternalTimeSeriesAggregation::new)
.addResultReader(TimeSeriesLast.NAME, TimeSeriesLast::new)
.addResultReader(TSIDInternalAggregation.NAME, TSIDInternalAggregation::new)
.addResultReader(TimeSeriesRate.NAME, TimeSeriesRate::new)
.addResultReader(TimeSeriesIRate.NAME, TimeSeriesIRate::new)
.addResultReader(TimeSeriesLineAggreagation.NAME, TimeSeriesLineAggreagation::new)
.addResultReader(TimeSeriesTopk.NAME, TimeSeriesTopk::new)
.addResultReader(TimeSeriesCountValues.NAME, TimeSeriesCountValues::new)
.addResultReader(TimeSeriesOriginValues.NAME, TimeSeriesOriginValues::new)
.setAggregatorRegistrar(TimeSeriesAggregationAggregationBuilder::registerAggregators),
builder
);
}

if (RestApiVersion.minimumSupported() == RestApiVersion.V_7) {
@@ -79,6 +79,7 @@
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder;
import org.elasticsearch.search.aggregations.timeseries.aggregation.TimeSeriesAggregationAggregationBuilder;

import java.util.List;
import java.util.Map;
@@ -373,4 +374,7 @@ public static TimeSeriesAggregationBuilder timeSeries(String name) {
return new TimeSeriesAggregationBuilder(name);
}

public static TimeSeriesAggregationAggregationBuilder timeSeriesAggregation(String name) {
return new TimeSeriesAggregationAggregationBuilder(name);
}
}