diff --git a/x-pack/plugin/mapper-aggregate-metric/build.gradle b/x-pack/plugin/mapper-aggregate-metric/build.gradle index 6220ee78c62a9..35fc7d60b2afe 100644 --- a/x-pack/plugin/mapper-aggregate-metric/build.gradle +++ b/x-pack/plugin/mapper-aggregate-metric/build.gradle @@ -13,15 +13,13 @@ apply plugin: 'elasticsearch.internal-es-plugin' esplugin { name 'x-pack-aggregate-metric' - description 'Module for the aggregate_metric field type, which allows pre-aggregated fields to be stored a single field.' + description 'Module for the aggregate_metric_double field type, which allows pre-aggregated fields to be stored as a single field' classname 'org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin' extendedPlugins = ['x-pack-core'] } archivesBaseName = 'x-pack-aggregate-metric' dependencies { - compileOnly project(":server") - compileOnly project(path: xpackModule('core')) testImplementation(testArtifact(project(xpackModule('core')))) } diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/AggregateMetricMapperPlugin.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/AggregateMetricMapperPlugin.java index e44b65ee2212b..fea55e793d638 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/AggregateMetricMapperPlugin.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/AggregateMetricMapperPlugin.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; @@ -27,7 +28,7 @@ import static java.util.Collections.singletonMap; -public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin, SearchPlugin { +public class AggregateMetricMapperPlugin extends Plugin implements MapperPlugin, ActionPlugin, SearchPlugin, ExtensiblePlugin { @Override public Map getMappers() { diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java index 02c31e668e7ae..d80da1772197e 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java @@ -307,6 +307,10 @@ private void setMetricFields(EnumMap this.metricFields = metricFields; } + public Map getMetricFields() { + return Collections.unmodifiableMap(metricFields); + } + public void addMetricField(Metric m, NumberFieldMapper.NumberFieldType subfield) { if (metricFields == null) { metricFields = new EnumMap<>(AggregateDoubleMetricFieldMapper.Metric.class); diff --git a/x-pack/plugin/rollup/build.gradle b/x-pack/plugin/rollup/build.gradle index 401e178fafca7..e2da0ba0f866e 100644 --- a/x-pack/plugin/rollup/build.gradle +++ b/x-pack/plugin/rollup/build.gradle @@ -1,22 +1,18 @@ -import org.elasticsearch.gradle.internal.info.BuildParams - apply plugin: 'elasticsearch.internal-es-plugin' esplugin { name 'x-pack-rollup' description 'Elasticsearch Expanded Pack Plugin - Rollup' classname 'org.elasticsearch.xpack.rollup.Rollup' - extendedPlugins = ['x-pack-core'] + extendedPlugins = ['x-pack-aggregate-metric'] } archivesBaseName = 'x-pack-rollup' dependencies { - compileOnly project(":server") compileOnly project(path: xpackModule('core')) - compileOnly project(path: xpackModule('analytics')) - compileOnly project(path: xpackModule('mapper-aggregate-metric')) - compileOnly project(path: xpackModule('ilm')) compileOnly project(':modules:data-streams') + compileOnly project(path: xpackModule('ilm')) + compileOnly project(path: xpackModule('mapper-aggregate-metric')) testImplementation(testArtifact(project(xpackModule('core')))) } diff --git a/x-pack/plugin/rollup/qa/rest/build.gradle b/x-pack/plugin/rollup/qa/rest/build.gradle index 0c794c07b713b..6e69f1e929d8a 100644 --- a/x-pack/plugin/rollup/qa/rest/build.gradle +++ b/x-pack/plugin/rollup/qa/rest/build.gradle @@ -17,7 +17,7 @@ dependencies { restResources { restApi { - include '_common', 'bulk', 'cluster', 'indices', 'search', 'downsample' + include '_common', 'bulk', 'cluster', 'indices', 'search' } } diff --git a/x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml b/x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml index 75d80ec1b28da..8a17dccbf1441 100644 --- a/x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml +++ b/x-pack/plugin/rollup/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/rollup/10_basic.yml @@ -87,7 +87,7 @@ setup: index.blocks.write: true --- -"Rollup index": +"Downsample index": - skip: version: " - 8.4.99" reason: "rollup renamed to downsample in 8.5.0" @@ -141,6 +141,24 @@ setup: - match: { rollup-test.settings.index.number_of_shards: "1" } - match: { rollup-test.settings.index.number_of_replicas: "0" } + # Assert rollup index mapping + - do: + indices.get_mapping: + index: rollup-test + + - match: { rollup-test.mappings.properties.@timestamp.type: date } + - match: { rollup-test.mappings.properties.@timestamp.meta.fixed_interval: 1h } + - match: { rollup-test.mappings.properties.@timestamp.meta.time_zone: UTC } + - match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.type: aggregate_metric_double } + - match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.metrics: [ "min", "max", "sum", "value_count" ] } + - match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.default_metric: max } + - match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-gauge.time_series_metric: gauge } + - match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-counter.type: long } + - match: { rollup-test.mappings.properties.k8s.properties.pod.properties.multi-counter.time_series_metric: counter } + - match: { rollup-test.mappings.properties.k8s.properties.pod.properties.uid.type: keyword } + - match: { rollup-test.mappings.properties.k8s.properties.pod.properties.uid.time_series_dimension: true } + + # Assert source index has not been deleted - do: indices.get: @@ -156,7 +174,7 @@ setup: - match: { indices.rollup-test.shards.0.0.num_search_segments: 1} --- -"Rollup non-existing index": +"Downsample non-existing index": - skip: version: " - 8.4.99" reason: "rollup renamed to downsample in 8.5.0" @@ -172,7 +190,7 @@ setup: } --- -"Rollup to existing rollup index": +"Downsample to existing index": - skip: version: " - 8.4.99" reason: "rollup renamed to downsample in 8.5.0" @@ -192,7 +210,7 @@ setup: } --- -"Rollup not time_series index": +"Downsample not time_series index": - skip: version: " - 8.4.99" reason: "rollup renamed to downsample in 8.5.0" @@ -213,7 +231,7 @@ setup: --- -"Rollup no metric index": +"Downsample no metric index": - skip: version: " - 8.4.99" reason: "rollup renamed to downsample in 8.5.0" @@ -254,3 +272,142 @@ setup: { "fixed_interval": "1h" } + +--- +"Downsample a downsampled index": + - skip: + version: " - 8.4.99" + reason: "Rollup of rollups introduced in 8.5.0" + + - do: + indices.downsample: + index: test + target_index: rollup-test + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + indices.downsample: + index: rollup-test + target_index: rollup-test-2 + body: > + { + "fixed_interval": "2h" + } + - is_true: acknowledged + + + # Assert rollup index mapping + - do: + indices.get_mapping: + index: rollup-test-2 + + - match: { rollup-test-2.mappings.properties.@timestamp.type: date } + - match: { rollup-test-2.mappings.properties.@timestamp.meta.fixed_interval: 2h } + - match: { rollup-test-2.mappings.properties.@timestamp.meta.time_zone: UTC } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.type: aggregate_metric_double } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.metrics: [ "min", "max", "sum", "value_count" ] } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.default_metric: max } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-gauge.time_series_metric: gauge } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-counter.type: long } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.multi-counter.time_series_metric: counter } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.uid.type: keyword } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.uid.time_series_dimension: true } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.network.properties.tx.type: aggregate_metric_double } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.network.properties.tx.metrics: [ "min", "max", "sum", "value_count" ] } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.network.properties.tx.default_metric: max } + - match: { rollup-test-2.mappings.properties.k8s.properties.pod.properties.network.properties.tx.time_series_metric: gauge } + + - do: + search: + index: rollup-test-2 + body: + sort: [ "_tsid", "@timestamp" ] + + - length: { hits.hits: 3 } + - match: { hits.hits.0._source._doc_count: 2 } + - match: { hits.hits.0._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 } + - match: { hits.hits.0._source.metricset: pod } + - match: { hits.hits.0._source.@timestamp: 2021-04-28T18:00:00.000Z } + - match: { hits.hits.0._source.k8s\.pod\.multi-counter: 21 } + - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.min: 90 } + - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.max: 200 } + - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.sum: 726 } + - match: { hits.hits.0._source.k8s\.pod\.multi-gauge.value_count: 6 } + - match: { hits.hits.0._source.k8s\.pod\.network\.tx.min: 2001818691 } + - match: { hits.hits.0._source.k8s\.pod\.network\.tx.max: 2005177954 } + - match: { hits.hits.0._source.k8s\.pod\.network\.tx.value_count: 2 } + - match: { hits.hits.0._source.k8s\.pod\.ip: "10.10.55.26" } + - match: { hits.hits.0._source.k8s\.pod\.created_at: "2021-04-28T19:35:00.000Z" } + - match: { hits.hits.0._source.k8s\.pod\.number_of_containers: 2 } + - match: { hits.hits.0._source.k8s\.pod\.tags: [ "backend", "prod", "us-west1" ] } + - match: { hits.hits.0._source.k8s\.pod\.values: [ 1, 1, 3 ] } + + - match: { hits.hits.1._source.k8s\.pod\.uid: 947e4ced-1786-4e53-9e0c-5c447e959507 } + - match: { hits.hits.1._source.metricset: pod } + - match: { hits.hits.1._source.@timestamp: 2021-04-28T20:00:00.000Z } + - match: { hits.hits.1._source._doc_count: 2 } + + - match: { hits.hits.2._source.k8s\.pod\.uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9 } + - match: { hits.hits.2._source.metricset: pod } + - match: { hits.hits.2._source.@timestamp: 2021-04-28T18:00:00.000Z } + - match: { hits.hits.2._source._doc_count: 4 } + + - do: + indices.downsample: + index: rollup-test + target_index: rollup-test-3 + body: > + { + "fixed_interval": "180m" + } + - is_true: acknowledged + +--- +"Downsample a downsampled index with wrong intervals": + - skip: + version: " - 8.4.99" + reason: "Rollup of rollups introduced in 8.5.0" + + - do: + indices.downsample: + index: test + target_index: rollup-test + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + catch: /Downsampling interval \[1h\] must be greater than the the source index interval \[1h\]/ + indices.downsample: + index: rollup-test + target_index: rollup-test-2 + body: > + { + "fixed_interval": "1h" + } + + - do: + catch: /Downsampling interval \[30m\] must be greater than the the source index interval \[1h\]/ + indices.downsample: + index: rollup-test + target_index: rollup-test-2 + body: > + { + "fixed_interval": "30m" + } + + - do: + catch: /Downsampling interval \[90m\] must be a multiple of the source index interval \[1h\]/ + indices.downsample: + index: rollup-test + target_index: rollup-test-2 + body: > + { + "fixed_interval": "90m" + } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java index 21afa1859a190..44bf433ada13c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java @@ -7,6 +7,10 @@ package org.elasticsearch.xpack.downsample; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; + /** * Base class for classes that read metric and label fields. */ @@ -22,9 +26,11 @@ abstract class AbstractRollupFieldProducer { /** * Collect a value for the field applying the specific subclass collection strategy. + * + * @param field the name of the field to collect * @param value the value to collect. */ - public abstract void collect(T value); + public abstract void collect(String field, T value); /** * @return the name of the field. @@ -34,14 +40,14 @@ public String name() { } /** - * @return the value of the field. + * Resets the producer to an empty value. */ - public abstract Object value(); + public abstract void reset(); /** - * Resets the collected value to the specific subclass reset value. + * Serialize the downsampled value of the field. */ - public abstract void reset(); + public abstract void write(XContentBuilder builder) throws IOException; /** * @return true if the field has not collected any value. diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java index 367c80691ab55..5c99d6135b3e0 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java @@ -8,16 +8,17 @@ package org.elasticsearch.xpack.downsample; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.LinkedHashMap; +import java.util.Map; /** * Utility class used for fetching field values by reading field data @@ -55,42 +56,39 @@ FormattedDocValues getLeaf(LeafReaderContext context) { return fieldData.load(context).getFormattedValues(format); } - Object format(Object value) { - if (value instanceof Long l) { - return format.format(l); - } else if (value instanceof Double d) { - return format.format(d); - } else if (value instanceof BytesRef b) { - return format.format(b); - } else if (value instanceof String s) { - return s; - } else { - throw new IllegalArgumentException("Invalid type: [" + value.getClass() + "]"); - } - } - /** - * Retrieve field fetchers for a list of fields. + * Retrieve field value fetchers for a list of fields. */ - private static List build(SearchExecutionContext context, String[] fields) { - List fetchers = new ArrayList<>(fields.length); + static Map create(SearchExecutionContext context, String[] fields) { + Map fetchers = new LinkedHashMap<>(); for (String field : fields) { MappedFieldType fieldType = context.getFieldType(field); - if (fieldType == null) { - throw new IllegalArgumentException("Unknown field: [" + field + "]"); + assert fieldType != null : "Unknown field type for field: [" + field + "]"; + + if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { + // If the field is an aggregate_metric_double field, we should load all its subfields + // This is a rollup-of-rollup case + for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) { + if (context.fieldExistsInIndex(metricSubField.name())) { + IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); + fetchers.put(metricSubField.name(), new FieldValueFetcher(metricSubField.name(), fieldType, fieldData)); + } + } + } else { + if (context.fieldExistsInIndex(field)) { + IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); + fetchers.put(field, new FieldValueFetcher(field, fieldType, fieldData)); + } } - IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); - fetchers.add(new FieldValueFetcher(field, fieldType, fieldData)); } - return Collections.unmodifiableList(fetchers); + return Collections.unmodifiableMap(fetchers); } - static List forMetrics(SearchExecutionContext context, String[] metricFields) { - return build(context, metricFields); - } - - static List forLabels(SearchExecutionContext context, String[] labelFields) { - return build(context, labelFields); + static Map docValuesFetchers(LeafReaderContext ctx, Map fieldValueFetchers) { + final Map docValuesFetchers = new LinkedHashMap<>(fieldValueFetchers.size()); + for (FieldValueFetcher fetcher : fieldValueFetchers.values()) { + docValuesFetchers.put(fetcher.name(), fetcher.getLeaf(ctx)); + } + return Collections.unmodifiableMap(docValuesFetchers); } - } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java index c10eeaec5dc91..cfaeeae7ca9af 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java @@ -7,8 +7,14 @@ package org.elasticsearch.xpack.downsample; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; +import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -31,7 +37,7 @@ public String name() { /** Collect the value of a raw field */ @Override - public void collect(Object value) { + public void collect(String field, Object value) { label.collect(value); isEmpty = false; } @@ -55,13 +61,17 @@ abstract static class Label { final String name; /** - * Abstract class that defines the how a label is computed. - * @param name + * Abstract class that defines how a label is downsampled. + * @param name the name of the field as it will be stored in the downsampled document */ protected Label(String name) { this.name = name; } + public String name() { + return name; + } + abstract void collect(Object value); abstract Object get(); @@ -79,6 +89,10 @@ protected Label(String name) { static class LastValueLabel extends Label { private Object lastValue; + LastValueLabel(String name) { + super(name); + } + LastValueLabel() { super("last_value"); } @@ -114,16 +128,85 @@ static class LabelLastValueFieldProducer extends LabelFieldProducer { public Object value() { return label().get(); } + + @Override + public void write(XContentBuilder builder) throws IOException { + if (isEmpty() == false) { + builder.field(name(), value()); + } + } + } + + static class AggregateMetricFieldProducer extends LabelFieldProducer { + + private Map labelsByField = new LinkedHashMap<>(); + + AggregateMetricFieldProducer(String name) { + super(name, null); + } + + public void addLabel(String field, Label label) { + labelsByField.put(field, label); + } + + @Override + public void collect(String field, Object value) { + labelsByField.get(field).collect(value); + isEmpty = false; + } + + @Override + public void write(XContentBuilder builder) throws IOException { + if (isEmpty() == false) { + builder.startObject(name()); + for (Label label : labels()) { + if (label.get() != null) { + builder.field(label.name(), label.get()); + } + } + builder.endObject(); + } + } + + public Collection