Skip to content

[TSDB] Add support for downsampling aggregate_metric_double fields #90029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3af2f77
Use XContentBuilder for index mapping
csoulios Sep 5, 2022
88ce0d2
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 7, 2022
5bde2a6
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 8, 2022
695da28
Added aggregate_metric_double field
csoulios Sep 8, 2022
977eeb2
initial support for rollup of rollups
csoulios Sep 8, 2022
d582542
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 9, 2022
d2befc7
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 12, 2022
146c924
Add fetcher support for aggregate_double_metric
csoulios Sep 12, 2022
c10fd78
Collect aggregate_metric_double fields
csoulios Sep 13, 2022
cebdc91
Cleanup
csoulios Sep 13, 2022
6c4e297
Fix gradle dependencies
csoulios Sep 13, 2022
07d02d2
Added testRollupOfRollups test
csoulios Sep 13, 2022
fde224f
Format test
csoulios Sep 13, 2022
32927a3
javadoc
csoulios Sep 13, 2022
548c96c
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 14, 2022
fc27e34
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 15, 2022
bf4ded9
Create fetcher only if mapped field
csoulios Sep 15, 2022
fe63511
Removed function that was not used
csoulios Sep 15, 2022
ee82647
Support aggregate_metric_double label fields
csoulios Sep 15, 2022
dd390af
cleanup
csoulios Sep 15, 2022
253c88a
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 19, 2022
5bc0cb4
Validate downsampling interval
csoulios Sep 19, 2022
08c8fe5
Add tests for rollups of rollups
csoulios Sep 19, 2022
b39ee34
Use aggregate_metric field mapper as plugin
csoulios Sep 19, 2022
113bf91
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 20, 2022
6c1597e
Removed unused dependency
csoulios Sep 20, 2022
69664b0
Move FieldValueFetcher into server module
csoulios Sep 20, 2022
f52f2d0
Revert "Move FieldValueFetcher into server module"
csoulios Sep 20, 2022
48e7bf9
Merge branch 'main' into tsdb-rollup-of-rollups
csoulios Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ private void setMetricFields(EnumMap<Metric, NumberFieldMapper.NumberFieldType>
this.metricFields = metricFields;
}

/**
 * Returns a read-only view of the metric sub-fields of this aggregate_metric_double
 * field, keyed by metric type (min/max/sum/value_count).
 *
 * @return an unmodifiable map of metric to its backing number field type; an empty
 *         map if no metric field has been registered yet
 */
public Map<Metric, NumberFieldMapper.NumberFieldType> getMetricFields() {
    // metricFields is initialized lazily by addMetricField(), so it may still be
    // null here; Collections.unmodifiableMap(null) would throw an NPE.
    return metricFields == null ? Collections.emptyMap() : Collections.unmodifiableMap(metricFields);
}

public void addMetricField(Metric m, NumberFieldMapper.NumberFieldType subfield) {
if (metricFields == null) {
metricFields = new EnumMap<>(AggregateDoubleMetricFieldMapper.Metric.class);
Expand Down
5 changes: 2 additions & 3 deletions x-pack/plugin/rollup/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ archivesBaseName = 'x-pack-rollup'
dependencies {
compileOnly project(":server")
compileOnly project(path: xpackModule('core'))
compileOnly project(path: xpackModule('analytics'))
compileOnly project(path: xpackModule('mapper-aggregate-metric'))
compileOnly project(path: xpackModule('ilm'))
compileOnly project(':modules:data-streams')
compileOnly project(path: xpackModule('ilm'))
api project(path: xpackModule('mapper-aggregate-metric'))
testImplementation(testArtifact(project(xpackModule('core'))))
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugin/rollup/qa/rest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {

restResources {
restApi {
include '_common', 'bulk', 'cluster', 'indices', 'search', 'downsample'
include '_common', 'bulk', 'cluster', 'indices', 'search'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ setup:
index.blocks.write: true

---
"Rollup index":
"Downsample index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand Down Expand Up @@ -141,6 +141,24 @@ setup:
- match: { rollup-test.settings.index.number_of_shards: "1" }
- match: { rollup-test.settings.index.number_of_replicas: "0" }

# Assert rollup index mapping
- do:
indices.get_mapping:
index: rollup-test

- match: { [email protected]: date }
- match: { [email protected]_interval: 1h }
- match: { [email protected]_zone: UTC }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.type: aggregate_metric_double }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.metrics: [ "min", "max", "sum", "value_count" ] }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.default_metric: max }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.time_series_metric: gauge }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-counter.type: long }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-counter.time_series_metric: counter }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.uid.type: keyword }
- match: { rollup-test.mappings.properties.k8s.properties.pod.properties.uid.time_series_dimension: true }


# Assert source index has not been deleted
- do:
indices.get:
Expand All @@ -156,7 +174,7 @@ setup:
- match: { indices.rollup-test.shards.0.0.num_search_segments: 1}

---
"Rollup non-existing index":
"Downsample non-existing index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand All @@ -172,7 +190,7 @@ setup:
}

---
"Rollup to existing rollup index":
"Downsample to existing index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand All @@ -192,7 +210,7 @@ setup:
}

---
"Rollup not time_series index":
"Downsample not time_series index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand All @@ -213,7 +231,7 @@ setup:


---
"Rollup no metric index":
"Downsample no metric index":
- skip:
version: " - 8.4.99"
reason: "rollup renamed to downsample in 8.5.0"
Expand Down Expand Up @@ -254,3 +272,138 @@ setup:
{
"fixed_interval": "1h"
}

---
"Downsample a downsampled index":
- skip:
version: " - 8.4.99"
reason: "Rollup of rollups introduced in 8.5.0"

- do:
indices.downsample:
index: test
target_index: rollup-test
body: >
{
"fixed_interval": "1h"
}
- is_true: acknowledged

- do:
indices.downsample:
index: rollup-test
target_index: rollup-test-2
body: >
{
"fixed_interval": "2h"
}
- is_true: acknowledged


# Assert rollup index mapping
- do:
indices.get_mapping:
index: rollup-test-2

- match: { [email protected]: date }
- match: { [email protected]_interval: 2h }
- match: { [email protected]_zone: UTC }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.type: aggregate_metric_double }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.metrics: [ "min", "max", "sum", "value_count" ] }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.default_metric: max }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.time_series_metric: gauge }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-counter.type: long }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-counter.time_series_metric: counter }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.uid.type: keyword }
- match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.uid.time_series_dimension: true }

- do:
search:
index: rollup-test-2
body:
sort: [ "_tsid", "@timestamp" ]

- length: { hits.hits: 3 }
- match: { hits.hits.0._source._doc_count: 2 }
- match: { hits.hits.0._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 }
- match: { hits.hits.0._source.metricset: pod }
- match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z }
- match: { hits.hits.0._source.k8s\.pod\.multi-counter: 21 }
# - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.min: 90 }
# - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.max: 200 }
# - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.sum: 726 }
# - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.value_count: 6 }
# - match: { hits.hits.0._source.k8s\.pod\.network\.tx.min: 2001818691 }
# - match: { hits.hits.0._source.k8s\.pod\.network\.tx.max: 2005177954 }
# - match: { hits.hits.0._source.k8s\.pod\.network\.tx.value_count: 2 }
- match: { hits.hits.0._source.k8s\.pod\.ip: "10.10.55.26" }
- match: { hits.hits.0._source.k8s\.pod\.created_at: "2021-04-28T19:35:00.000Z" }
- match: { hits.hits.0._source.k8s\.pod\.number_of_containers: 2 }
- match: { hits.hits.0._source.k8s\.pod\.tags: [ "backend", "prod", "us-west1" ] }
- match: { hits.hits.0._source.k8s\.pod\.values: [ 1, 1, 3 ] }

- match: { hits.hits.1._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 }
- match: { hits.hits.1._source.metricset: pod }
- match: { hits.hits.1._source.@timestamp: 2021-04-28T20:00:00.000Z }
- match: { hits.hits.1._source._doc_count: 2 }

- match: { hits.hits.2._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 }
- match: { hits.hits.2._source.metricset: pod }
- match: { hits.hits.2._source.@timestamp: 2021-04-28T18:00:00.000Z }
- match: { hits.hits.2._source._doc_count: 4 }

- do:
indices.downsample:
index: rollup-test
target_index: rollup-test-3
body: >
{
"fixed_interval": "180m"
}
- is_true: acknowledged

---
"Downsample a downsampled index with wrong intervals":
- skip:
version: " - 8.4.99"
reason: "Rollup of rollups introduced in 8.5.0"

- do:
indices.downsample:
index: test
target_index: rollup-test
body: >
{
"fixed_interval": "1h"
}
- is_true: acknowledged

- do:
catch: /Downsampling interval \[1h\] must be greater than the the source index interval \[1h\]/
indices.downsample:
index: rollup-test
target_index: rollup-test-2
body: >
{
"fixed_interval": "1h"
}

- do:
catch: /Downsampling interval \[30m\] must be greater than the the source index interval \[1h\]/
indices.downsample:
index: rollup-test
target_index: rollup-test-2
body: >
{
"fixed_interval": "30m"
}

- do:
catch: /Downsampling interval \[90m\] must be a multiple of the source index interval \[1h\]/
indices.downsample:
index: rollup-test
target_index: rollup-test-2
body: >
{
"fixed_interval": "90m"
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Base class for classes that read metric and label fields.
*/
Expand All @@ -22,9 +26,11 @@ abstract class AbstractRollupFieldProducer<T> {

/**
* Collect a value for the field applying the specific subclass collection strategy.
*
* @param field the name of the field to collect
* @param value the value to collect.
*/
public abstract void collect(T value);
public abstract void collect(String field, T value);

/**
* @return the name of the field.
Expand All @@ -34,14 +40,14 @@ public String name() {
}

/**
* @return the value of the field.
* Resets the producer to an empty value.
*/
public abstract Object value();
public abstract void reset();

/**
* Resets the collected value to the specific subclass reset value.
* Serialize the downsampled value of the field.
*/
public abstract void reset();
public abstract void write(XContentBuilder builder) throws IOException;

/**
* @return true if the field has not collected any value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
package org.elasticsearch.xpack.downsample;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;

/**
* Utility class used for fetching field values by reading field data
Expand Down Expand Up @@ -55,42 +56,39 @@ FormattedDocValues getLeaf(LeafReaderContext context) {
return fieldData.load(context).getFormattedValues(format);
}

/**
 * Formats a raw doc value using this fetcher's {@code DocValueFormat}.
 * Strings pass through unchanged; any other type is rejected.
 */
Object format(Object value) {
    // String values need no formatting.
    if (value instanceof String s) {
        return s;
    }
    if (value instanceof Long longValue) {
        return format.format(longValue);
    }
    if (value instanceof Double doubleValue) {
        return format.format(doubleValue);
    }
    if (value instanceof BytesRef bytes) {
        return format.format(bytes);
    }
    throw new IllegalArgumentException("Invalid type: [" + value.getClass() + "]");
}

/**
* Retrieve field fetchers for a list of fields.
* Retrieve field value fetchers for a list of fields.
*/
private static List<FieldValueFetcher> build(SearchExecutionContext context, String[] fields) {
List<FieldValueFetcher> fetchers = new ArrayList<>(fields.length);
static Map<String, FieldValueFetcher> create(SearchExecutionContext context, String[] fields) {
Map<String, FieldValueFetcher> fetchers = new LinkedHashMap<>();
for (String field : fields) {
MappedFieldType fieldType = context.getFieldType(field);
if (fieldType == null) {
throw new IllegalArgumentException("Unknown field: [" + field + "]");
assert fieldType != null : "Unknown field type for field: [" + field + "]";

if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) {
// If the field is an aggregate_metric_double field, we should load all its subfields
// This is a rollup-of-rollup case
for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) {
if (context.fieldExistsInIndex(metricSubField.name())) {
IndexFieldData<?> fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH);
fetchers.put(metricSubField.name(), new FieldValueFetcher(metricSubField.name(), fieldType, fieldData));
}
}
} else {
if (context.fieldExistsInIndex(field)) {
IndexFieldData<?> fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH);
fetchers.put(field, new FieldValueFetcher(field, fieldType, fieldData));
}
}
IndexFieldData<?> fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH);
fetchers.add(new FieldValueFetcher(field, fieldType, fieldData));
}
return Collections.unmodifiableList(fetchers);
return Collections.unmodifiableMap(fetchers);
}

static List<FieldValueFetcher> forMetrics(SearchExecutionContext context, String[] metricFields) {
return build(context, metricFields);
}

static List<FieldValueFetcher> forLabels(SearchExecutionContext context, String[] labelFields) {
return build(context, labelFields);
/**
 * Builds per-segment doc-value readers for the given field value fetchers.
 * Iteration order of the input map is preserved (LinkedHashMap).
 *
 * @param ctx the leaf (segment) reader context to load doc values from
 * @param fieldValueFetchers the fetchers to create segment-level readers for
 * @return an unmodifiable map from field name to its formatted doc values
 */
static Map<String, FormattedDocValues> docValuesFetchers(LeafReaderContext ctx, Map<String, FieldValueFetcher> fieldValueFetchers) {
    final Map<String, FormattedDocValues> readers = new LinkedHashMap<>(fieldValueFetchers.size());
    fieldValueFetchers.values().forEach(fetcher -> readers.put(fetcher.name(), fetcher.getLeaf(ctx)));
    return Collections.unmodifiableMap(readers);
}

}
Loading