From 3af2f779bacb234448b9c7aeae180521458ae86d Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Mon, 5 Sep 2022 14:59:16 +0300 Subject: [PATCH 01/20] Use XContentBuilder for index mapping --- .../v2/RollupActionSingleNodeTests.java | 114 +++++++++++------- 1 file changed, 71 insertions(+), 43 deletions(-) diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java index 55e21d1c4c282..20aad0fcd2591 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/v2/RollupActionSingleNodeTests.java @@ -100,6 +100,7 @@ import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; public class RollupActionSingleNodeTests extends ESSingleNodeTestCase { @@ -110,6 +111,7 @@ public class RollupActionSingleNodeTests extends ESSingleNodeTestCase { public static final String FIELD_DIMENSION_2 = "dimension_long"; public static final String FIELD_NUMERIC_1 = "numeric_1"; public static final String FIELD_NUMERIC_2 = "numeric_2"; + public static final String FIELD_AGG_METRIC = "agg_metric_1"; public static final String FIELD_METRIC_LABEL_DOUBLE = "metric_label_double"; public static final String FIELD_LABEL_DOUBLE = "label_double"; public static final String FIELD_LABEL_INTEGER = "label_integer"; @@ -144,7 +146,7 @@ protected Collection> getPlugins() { } @Before - public void setup() { + public void setup() throws IOException { sourceIndex = randomAlphaOfLength(14).toLowerCase(Locale.ROOT); rollupIndex = "rollup-" + sourceIndex; startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020 @@ -179,47 +181,74 @@ public void setup() { if (randomBoolean()) { settings.put(IndexMetadata.SETTING_INDEX_HIDDEN, randomBoolean()); } - assertAcked( - client().admin() - .indices() - .prepareCreate(sourceIndex) - .setSettings(settings.build()) - .setMapping( - FIELD_TIMESTAMP, - "type=date", - FIELD_DIMENSION_1, - "type=keyword,time_series_dimension=true", - FIELD_DIMENSION_2, - "type=long,time_series_dimension=true", - FIELD_NUMERIC_1, - "type=long,time_series_metric=gauge", - FIELD_NUMERIC_2, - "type=double,time_series_metric=counter", - FIELD_LABEL_DOUBLE, - "type=double", - FIELD_LABEL_INTEGER, - "type=integer", - FIELD_LABEL_KEYWORD, - "type=keyword", - FIELD_LABEL_TEXT, - "type=text", - FIELD_LABEL_BOOLEAN, - "type=boolean", - FIELD_METRIC_LABEL_DOUBLE, /* numeric label indexed as a metric */ - "type=double,time_series_metric=counter", - FIELD_LABEL_IPv4_ADDRESS, - "type=ip", - FIELD_LABEL_IPv6_ADDRESS, - "type=ip", - FIELD_LABEL_DATE, - "type=date,format=date_optional_time", - FIELD_LABEL_KEYWORD_ARRAY, - "type=keyword", - FIELD_LABEL_DOUBLE_ARRAY, - "type=double" - ) - .get() - ); + + XContentBuilder mapping = jsonBuilder().startObject() + .startObject("_doc") + .startObject("properties") + .startObject(FIELD_TIMESTAMP) + .field("type", "date") + .endObject() + .startObject(FIELD_DIMENSION_1) + .field("type", "keyword") + .field("time_series_dimension", true) + .endObject() + .startObject(FIELD_DIMENSION_2) + .field("type", "long") + .field("time_series_dimension", true) + .endObject() + .startObject(FIELD_NUMERIC_1) + .field("type", "long") + .field("time_series_metric", "gauge") + .endObject() + .startObject(FIELD_NUMERIC_2) + .field("type", "double") + .field("time_series_metric", "counter") + .endObject() + // .startObject(FIELD_AGG_METRIC) + // .field("type", "aggregate_metric_double") + // .field("time_series_metric", "counter") + // .array("metrics", new String[] { "min", "max", "sum", "value_count" }) + // .endObject() + .startObject(FIELD_LABEL_DOUBLE) + .field("type", "double") + .endObject() + .startObject(FIELD_LABEL_INTEGER) + .field("type", "integer") + .endObject() + .startObject(FIELD_LABEL_KEYWORD) + .field("type", "keyword") + .endObject() + .startObject(FIELD_LABEL_TEXT) + .field("type", "text") + .endObject() + .startObject(FIELD_LABEL_BOOLEAN) + .field("type", "boolean") + .endObject() + .startObject(FIELD_METRIC_LABEL_DOUBLE) + .field("type", "double") /* numeric label indexed as a metric */ + .field("time_series_metric", "counter") + .endObject() + .startObject(FIELD_LABEL_IPv4_ADDRESS) + .field("type", "ip") + .endObject() + .startObject(FIELD_LABEL_IPv6_ADDRESS) + .field("type", "ip") + .endObject() + .startObject(FIELD_LABEL_DATE) + .field("type", "date") + .field("format", "date_optional_time") + .endObject() + .startObject(FIELD_LABEL_KEYWORD_ARRAY) + .field("type", "keyword") + .endObject() + .startObject(FIELD_LABEL_DOUBLE_ARRAY) + .field("type", "double") + .endObject() + .endObject() + .endObject() + .endObject(); + + assertAcked(client().admin().indices().prepareCreate(sourceIndex).setSettings(settings.build()).setMapping(mapping).get()); } public void testRollupIndex() throws IOException { @@ -471,7 +500,6 @@ public void onFailure(Exception e) { assertBusy(() -> assertTrue("In progress rollup did not complete", rollupListener.success), 60, TimeUnit.SECONDS); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/88800") public void testRollupDatastream() throws Exception { RollupActionConfig config = new RollupActionConfig(randomInterval()); String dataStreamName = createDataStream(); From 695da28372105a513bcfa6314c3921ffadb140cd Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Thu, 8 Sep 2022 16:01:28 +0300 Subject: [PATCH 02/20] Added aggregate_metric_double field to the test index mapping --- .../DownsampleActionSingleNodeTests.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index d4b0de218e61b..48ba56242a6a1 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -124,6 +124,7 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase { public static final String FIELD_LABEL_UNMAPPED = "label_unmapped"; public static final String FIELD_LABEL_KEYWORD_ARRAY = "label_keyword_array"; public static final String FIELD_LABEL_DOUBLE_ARRAY = "label_double_array"; + public static final String FIELD_LABEL_AGG_METRIC = "label_agg_metric"; private static final int MAX_DIM_VALUES = 5; private static final long MAX_NUM_BUCKETS = 10; @@ -204,11 +205,12 @@ public void setup() throws IOException { .field("type", "double") .field("time_series_metric", "counter") .endObject() - // .startObject(FIELD_AGG_METRIC) - // .field("type", "aggregate_metric_double") - // .field("time_series_metric", "counter") - // .array("metrics", new String[] { "min", "max", "sum", "value_count" }) - // .endObject() + .startObject(FIELD_AGG_METRIC) + .field("type", "aggregate_metric_double") + .field("time_series_metric", "gauge") + .array("metrics", new String[] { "min", "max", "sum", "value_count" }) + .field("default_metric", "value_count") + .endObject() .startObject(FIELD_LABEL_DOUBLE) .field("type", "double") .endObject() @@ -244,6 +246,11 @@ public void setup() throws IOException { .startObject(FIELD_LABEL_DOUBLE_ARRAY) .field("type", "double") .endObject() + .startObject(FIELD_LABEL_AGG_METRIC) + .field("type", "aggregate_metric_double") + .array("metrics", new String[] { "min", "max", "sum", "value_count" }) + .field("default_metric", "value_count") + .endObject() .endObject() .endObject() .endObject(); From 977eeb264d0a07079cd7892c77ba03f3fa4c58c7 Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Thu, 8 Sep 2022 16:01:54 +0300 Subject: [PATCH 03/20] initial support for rollup of rollups --- .../mapper/AggregateDoubleMetricFieldMapper.java | 4 ++++ .../xpack/downsample/FieldValueFetcher.java | 12 ++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java index 46a22ab4ca850..158050bdb67f3 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java @@ -317,6 +317,10 @@ private void setMetricFields(EnumMap this.metricFields = metricFields; } + public Map getMetricFields() { + return Collections.unmodifiableMap(metricFields); + } + public void addMetricField(Metric m, NumberFieldMapper.NumberFieldType subfield) { if (metricFields == null) { metricFields = new EnumMap<>(AggregateDoubleMetricFieldMapper.Metric.class); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java index 8bf75cdf873ae..627c7a5cb22c0 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java @@ -12,8 +12,10 @@ import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; import java.io.IOException; import java.math.BigInteger; @@ -118,9 +120,15 @@ private static List build(SearchExecutionContext context, Str MappedFieldType fieldType = context.getFieldType(field); if (fieldType == null) { throw new IllegalArgumentException("Unknown field: [" + field + "]"); + } else if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { + for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) { + IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); + fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field, validTypes))); + } + } else { + IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); + fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field, validTypes))); } - IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); - fetchers.add(new FieldValueFetcher(field, fieldType, fieldData, getValidator(field, validTypes))); } return Collections.unmodifiableList(fetchers); } From 146c924c0ff32aa18ba81f4ac99b19380dbbfc99 Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Tue, 13 Sep 2022 00:38:21 +0300 Subject: [PATCH 04/20] Add fetcher support for aggregate_double_metric Change how we serialize metrics to doc --- .../AbstractRollupFieldProducer.java | 12 +- .../xpack/downsample/FieldValueFetcher.java | 44 ++++---- .../xpack/downsample/LabelFieldProducer.java | 15 ++- .../xpack/downsample/MetricFieldProducer.java | 104 +++++++++++++----- .../xpack/downsample/RollupShardIndexer.java | 61 +++++----- .../DownsampleActionSingleNodeTests.java | 6 + .../downsample/MetricFieldProducerTests.java | 23 ++-- 7 files changed, 174 insertions(+), 91 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java index 21afa1859a190..f2db8526a402c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java @@ -7,6 +7,10 @@ package org.elasticsearch.xpack.downsample; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; + /** * Base class for classes that read metric and label fields. */ @@ -34,14 +38,14 @@ public String name() { } /** - * @return the value of the field. + * Resets the collected value to the specific subclass reset value. */ - public abstract Object value(); + public abstract void reset(); /** - * Resets the collected value to the specific subclass reset value. + * Serialize the downsampled value. */ - public abstract void reset(); + public abstract void writeTo(XContentBuilder builder) throws IOException; /** * @return true if the field has not collected any value. diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java index 802923789c728..a67456aac63d6 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java @@ -17,9 +17,9 @@ import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.HashMap; +import java.util.Map; /** * Utility class used for fetching field values by reading field data @@ -71,33 +71,39 @@ Object format(Object value) { } } + private static Map build(SearchExecutionContext context, String field) { + MappedFieldType fieldType = context.getFieldType(field); + assert fieldType != null : "Unknown field type for field: [" + field + "]"; + Map fetchers = new HashMap<>(); + + if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { + for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) { + IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); + fetchers.put(metricSubField.name(), new FieldValueFetcher(metricSubField.name(), fieldType, fieldData)); + } + } else { + IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); + fetchers.put(field, new FieldValueFetcher(field, fieldType, fieldData)); + } + return Collections.unmodifiableMap(fetchers); + } + /** * Retrieve field fetchers for a list of fields. */ - private static List build(SearchExecutionContext context, String[] fields) { - List fetchers = new ArrayList<>(fields.length); + private static Map build(SearchExecutionContext context, String[] fields) { + Map fetchers = new HashMap<>(); for (String field : fields) { - MappedFieldType fieldType = context.getFieldType(field); - if (fieldType == null) { - throw new IllegalArgumentException("Unknown field: [" + field + "]"); - } else if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { - for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) { - IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); - fetchers.add(new FieldValueFetcher(field, fieldType, fieldData)); - } - } else { - IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); - fetchers.add(new FieldValueFetcher(field, fieldType, fieldData)); - } + fetchers.putAll(build(context, field)); } - return Collections.unmodifiableList(fetchers); + return Collections.unmodifiableMap(fetchers); } - static List forMetrics(SearchExecutionContext context, String[] metricFields) { + static Map forMetrics(SearchExecutionContext context, String[] metricFields) { return build(context, metricFields); } - static List forLabels(SearchExecutionContext context, String[] labelFields) { + static Map forLabels(SearchExecutionContext context, String[] labelFields) { return build(context, labelFields); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java index c10eeaec5dc91..292fd82640161 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java @@ -8,7 +8,9 @@ package org.elasticsearch.xpack.downsample; import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.xcontent.XContentBuilder; +import java.io.IOException; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -114,12 +116,23 @@ static class LabelLastValueFieldProducer extends LabelFieldProducer { public Object value() { return label().get(); } + + @Override + public void writeTo(XContentBuilder builder) throws IOException { + if (isEmpty() == false) { + builder.field(name(), value()); + } + } } /** * Produce a collection of label field producers. */ - static Map buildLabelFieldProducers(SearchExecutionContext context, String[] labelFields) { + static Map buildLabelFieldProducers( + SearchExecutionContext context, + String[] labelFields, + Map fieldFetchers + ) { final Map fields = new LinkedHashMap<>(); for (String field : labelFields) { LabelFieldProducer producer = new LabelLastValueFieldProducer(field); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java index 5164e6d8518df..d88a792629d65 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java @@ -8,11 +8,15 @@ package org.elasticsearch.xpack.downsample; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.aggregations.metrics.CompensatedSum; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -61,17 +65,11 @@ public void collect(Number value) { isEmpty = false; } - /** - * Return the downsampled value as computed after collecting all raw values. - * @return - */ - public abstract Object value(); - abstract static class Metric { final String name; /** - * Abstract class that defines the how a metric is computed. + * Abstract class that defines how a metric is computed. * @param name */ protected Metric(String name) { @@ -227,14 +225,20 @@ void reset() { static class CounterMetricFieldProducer extends MetricFieldProducer { CounterMetricFieldProducer(String name) { - super(name, List.of(new LastValue())); + super(name, Collections.singletonList(new LastValue())); } - @Override public Object value() { - assert metrics().size() == 1 : "Counters have only one metric"; + assert metrics().size() == 1 : "Single value producers must have only one metric"; return metrics().get(0).get(); } + + @Override + public void writeTo(XContentBuilder builder) throws IOException { + if (isEmpty() == false) { + builder.field(name(), value()); + } + } } /** @@ -243,18 +247,41 @@ public Object value() { static class GaugeMetricFieldProducer extends MetricFieldProducer { GaugeMetricFieldProducer(String name) { - super(name, List.of(new Min(), new Max(), new Sum(), new ValueCount())); + this(name, List.of(new Min(), new Max(), new Sum(), new ValueCount())); + } + + GaugeMetricFieldProducer(String name, List metrics) { + super(name, metrics); } @Override - public Object value() { - Map metricValues = new HashMap<>(); - for (MetricFieldProducer.Metric metric : metrics()) { - if (metric.get() != null) { - metricValues.put(metric.name, metric.get()); + public void writeTo(XContentBuilder builder) throws IOException { + if (isEmpty() == false) { + builder.startObject(name()); + for (MetricFieldProducer.Metric metric : metrics()) { + if (metric.get() != null) { + builder.field(metric.name, metric.get()); + } } + builder.endObject(); } - return Collections.unmodifiableMap(metricValues); + } + } + + static class SilentMetricFieldProducer extends MetricFieldProducer { + + SilentMetricFieldProducer(String name, Metric metric) { + super(name, Collections.singletonList(metric)); + } + + public Object value() { + assert metrics().size() == 1 : "Single value producers must have only one metric"; + return metrics().get(0).get(); + } + + @Override + public void writeTo(XContentBuilder builder) throws IOException { + // No output. It's silent after all } } @@ -262,19 +289,40 @@ public Object value() { * Produce a collection of metric field producers based on the metric_type mapping parameter in the field * mapping. */ - static Map buildMetricFieldProducers(SearchExecutionContext context, String[] metricFields) { + static Map buildMetricFieldProducers( + SearchExecutionContext context, + String[] metricFields, + Map fieldFetchers + ) { final Map fields = new LinkedHashMap<>(); for (String field : metricFields) { MappedFieldType fieldType = context.getFieldType(field); - assert fieldType.getMetricType() != null; - - MetricFieldProducer producer = switch (fieldType.getMetricType()) { - case gauge -> new GaugeMetricFieldProducer(field); - case counter -> new CounterMetricFieldProducer(field); - default -> throw new IllegalArgumentException("Unsupported metric type [" + fieldType.getMetricType() + "]"); - }; - - fields.put(field, producer); + assert fieldType != null : "Unknown field type for field: [" + field + "]"; + assert fieldType.getMetricType() != null : "Unknown metric type for metric field: [" + field + "]"; + + if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { + List metricOperations = new ArrayList<>(aggMetricFieldType.getMetricFields().size()); + for (var e : aggMetricFieldType.getMetricFields().entrySet()) { + AggregateDoubleMetricFieldMapper.Metric metric = e.getKey(); + NumberFieldMapper.NumberFieldType metricSubField = e.getValue(); + Metric metricOperation = switch (metric) { + case max -> new Max(); + case min -> new Min(); + case sum, value_count -> new Sum(); // To aggregate value_count summary, we must sum all field values + }; + metricOperations.add(metricOperation); + MetricFieldProducer producer = new SilentMetricFieldProducer(metricSubField.name(), metricOperation); + fields.put(metricSubField.name(), producer); + } + fields.put(field, new GaugeMetricFieldProducer(field, metricOperations)); + } else { + MetricFieldProducer producer = switch (fieldType.getMetricType()) { + case gauge -> new GaugeMetricFieldProducer(field); + case counter -> new CounterMetricFieldProducer(field); + default -> throw new IllegalArgumentException("Unsupported metric type [" + fieldType.getMetricType() + "]"); + }; + fields.put(field, producer); + } } return Collections.unmodifiableMap(fields); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index 639525d51e8c3..e567dc83d6256 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -61,13 +61,12 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; /** - * An indexer for rollups that iterates documents collected by {@link TimeSeriesIndexSearcher}, - * computes the rollup buckets and stores the buckets in the rollup index. + * An indexer for downsampling that iterates documents collected by {@link TimeSeriesIndexSearcher}, + * computes the rollup buckets and stores the buckets in the downsampled index. * * The documents collected by the {@link TimeSeriesIndexSearcher} are expected to be sorted * by _tsid in ascending order and @timestamp in descending order. @@ -91,8 +90,11 @@ class RollupShardIndexer { private final String[] dimensionFields; private final String[] metricFields; private final String[] labelFields; - private final List metricFieldFetchers; - private final List labelFieldFetchers; + private final Map metricFieldFetchers; + private final Map labelFieldFetchers; + + private final Map metricFieldProducers; + private final Map labelFieldProducers; private final AtomicLong numSent = new AtomicLong(); private final AtomicLong numIndexed = new AtomicLong(); @@ -116,7 +118,7 @@ class RollupShardIndexer { this.metricFields = metricFields; this.labelFields = labelFields; - this.searcher = indexShard.acquireSearcher("rollup"); + this.searcher = indexShard.acquireSearcher("downsampling"); Closeable toClose = searcher; try { this.searchExecutionContext = indexService.newSearchExecutionContext( @@ -130,8 +132,16 @@ class RollupShardIndexer { this.timestampField = searchExecutionContext.getFieldType(DataStreamTimestampFieldMapper.DEFAULT_PATH); this.timestampFormat = timestampField.docValueFormat(null, null); this.rounding = config.createRounding(); + this.metricFieldFetchers = FieldValueFetcher.forMetrics(searchExecutionContext, metricFields); this.labelFieldFetchers = FieldValueFetcher.forLabels(searchExecutionContext, labelFields); + this.metricFieldProducers = MetricFieldProducer.buildMetricFieldProducers( + searchExecutionContext, + metricFields, + metricFieldFetchers + ); + this.labelFieldProducers = LabelFieldProducer.buildLabelFieldProducers(searchExecutionContext, labelFields, labelFieldFetchers); + toClose = null; } finally { IOUtils.closeWhileHandlingException(toClose); @@ -234,13 +244,13 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag final LeafReaderContext ctx = aggCtx.getLeafReaderContext(); final DocCountProvider docCountProvider = new DocCountProvider(); docCountProvider.setLeafReaderContext(ctx); - final Map metricsFieldLeaves = new HashMap<>(); - for (FieldValueFetcher fetcher : metricFieldFetchers) { + final Map metricsFieldLeaves = new HashMap<>(metricFieldFetchers.size()); + for (FieldValueFetcher fetcher : metricFieldFetchers.values()) { metricsFieldLeaves.put(fetcher.name(), fetcher.getLeaf(ctx)); } - final Map labelFieldLeaves = new HashMap<>(); - for (FieldValueFetcher fetcher : labelFieldFetchers) { + final Map labelFieldLeaves = new HashMap<>(labelFieldFetchers.size()); + for (FieldValueFetcher fetcher : labelFieldFetchers.values()) { labelFieldLeaves.put(fetcher.name(), fetcher.getLeaf(ctx)); } @@ -313,6 +323,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException { final int docCount = docCountProvider.getDocCount(docId); rollupBucketBuilder.collectDocCount(docCount); + for (Map.Entry e : fieldFetchers) { final String fieldName = e.getKey(); final FormattedDocValues leafField = e.getValue(); @@ -372,12 +383,9 @@ private class RollupBucketBuilder { private BytesRef tsid; private long timestamp; private int docCount; - private final Map metricFieldProducers; - private final Map labelFieldProducers; RollupBucketBuilder() { - this.metricFieldProducers = MetricFieldProducer.buildMetricFieldProducers(searchExecutionContext, metricFields); - this.labelFieldProducers = LabelFieldProducer.buildLabelFieldProducers(searchExecutionContext, labelFields); + } /** @@ -394,8 +402,8 @@ public RollupBucketBuilder resetTsid(BytesRef tsid, long timestamp) { public RollupBucketBuilder resetTimestamp(long timestamp) { this.timestamp = timestamp; this.docCount = 0; - this.metricFieldProducers.values().forEach(MetricFieldProducer::reset); - this.labelFieldProducers.values().forEach(LabelFieldProducer::reset); + metricFieldProducers.values().forEach(MetricFieldProducer::reset); + labelFieldProducers.values().forEach(LabelFieldProducer::reset); if (logger.isTraceEnabled()) { logger.trace( "New bucket for _tsid: [{}], @timestamp: [{}]", @@ -451,7 +459,7 @@ public void collectDocCount(int docCount) { } public XContentBuilder buildRollupDocument() throws IOException { - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE); + final XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE); builder.startObject(); if (isEmpty()) { builder.endObject(); @@ -468,17 +476,14 @@ public XContentBuilder buildRollupDocument() throws IOException { builder.field(e.getKey(), e.getValue()); } - for (AbstractRollupFieldProducer fieldProducer : Stream.concat( - metricFieldProducers.values().stream(), - labelFieldProducers.values().stream() - ).toList()) { - if (fieldProducer.isEmpty() == false) { - String field = fieldProducer.name(); - Object value = fieldProducer.value(); - if (value != null) { - builder.field(field, value); - } - } + // Serialize all metric fields + for (var producer : metricFieldProducers.values()) { + producer.writeTo(builder); + } + + // Serialize all label fields + for (var producer : labelFieldProducers.values()) { + producer.writeTo(builder); } builder.endObject(); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index c270cc7ffa743..8541f3e756371 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -286,6 +286,12 @@ public void testRollupIndex() throws IOException { // .field(FIELD_DIMENSION_2, randomIntBetween(1, 10)) //TODO: Fix _tsid format issue and then enable this .field(FIELD_NUMERIC_1, randomInt()) .field(FIELD_NUMERIC_2, DATE_FORMATTER.parseMillis(ts)) + .startObject(FIELD_AGG_METRIC) + .field("min", 1) + .field("max", 8) + .field("sum", 120) + .field("value_count", 40) + .endObject() .field(FIELD_LABEL_DOUBLE, labelDoubleValue) .field(FIELD_METRIC_LABEL_DOUBLE, labelDoubleValue) .field(FIELD_LABEL_INTEGER, labelIntegerValue) diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java index d27008729ae2c..cf4404b99cf1a 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java @@ -120,7 +120,7 @@ public void testLastValueMetric() { } public void testCounterMetricFieldProducer() { - MetricFieldProducer producer = new MetricFieldProducer.CounterMetricFieldProducer("field"); + var producer = new MetricFieldProducer.CounterMetricFieldProducer("field"); assertTrue(producer.isEmpty()); producer.collect(55.0); producer.collect(12.2); @@ -139,15 +139,16 @@ public void testGaugeMetricFieldProducer() { producer.collect(12.2); producer.collect(5.5); - assertFalse(producer.isEmpty()); - Object o = producer.value(); - if (o instanceof Map) { - Map m = (Map) o; - assertMap(m, matchesMap().entry("min", 5.5).entry("max", 55.0).entry("value_count", 3L).entry("sum", 72.7)); - assertEquals(4, m.size()); - } else { - fail("Value is not a Map"); - } + //TODO: Fix this +// assertFalse(producer.isEmpty()); +// Object o = producer.value(); +// if (o instanceof Map) { +// Map m = (Map) o; +// assertMap(m, matchesMap().entry("min", 5.5).entry("max", 55.0).entry("value_count", 3L).entry("sum", 72.7)); +// assertEquals(4, m.size()); +// } else { +// fail("Value is not a Map"); +// } assertEquals("field", producer.name()); } @@ -213,7 +214,7 @@ public MappedFieldType getFieldType(String name) { Map producers = MetricFieldProducer.buildMetricFieldProducers( searchExecutionContext, - new String[] { "gauge_field", "counter_field" } + new String[] { "gauge_field", "counter_field" }, null ); assertTrue(producers.get("gauge_field") instanceof MetricFieldProducer.GaugeMetricFieldProducer); assertTrue(producers.get("counter_field") instanceof MetricFieldProducer.CounterMetricFieldProducer); From c10fd78d34629922874ad6ed4567289afb29a754 Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Tue, 13 Sep 2022 13:24:44 +0300 Subject: [PATCH 05/20] Collect aggregate_metric_double fields for computing metrics --- .../AbstractRollupFieldProducer.java | 6 +- .../xpack/downsample/LabelFieldProducer.java | 10 +-- .../xpack/downsample/MetricFieldProducer.java | 69 ++++++++++++------- .../xpack/downsample/RollupShardIndexer.java | 30 +++----- .../downsample/LabelFieldProducerTests.java | 2 +- .../downsample/MetricFieldProducerTests.java | 22 +++--- 6 files changed, 76 insertions(+), 63 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java index f2db8526a402c..988a58ed2b7db 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java @@ -26,9 +26,11 @@ abstract class AbstractRollupFieldProducer { /** * Collect a value for the field applying the specific subclass collection strategy. + * + * @param field the name of the field to collect * @param value the value to collect. */ - public abstract void collect(T value); + public abstract void collect(String field, T value); /** * @return the name of the field. @@ -43,7 +45,7 @@ public String name() { public abstract void reset(); /** - * Serialize the downsampled value. + * Serialize the downsampled value of the field. */ public abstract void writeTo(XContentBuilder builder) throws IOException; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java index 292fd82640161..90434c42246ff 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java @@ -33,7 +33,7 @@ public String name() { /** Collect the value of a raw field */ @Override - public void collect(Object value) { + public void collect(String field, Object value) { label.collect(value); isEmpty = false; } @@ -57,7 +57,7 @@ abstract static class Label { final String name; /** - * Abstract class that defines the how a label is computed. + * Abstract class that defines how a label is computed. * @param name */ protected Label(String name) { @@ -128,11 +128,7 @@ public void writeTo(XContentBuilder builder) throws IOException { /** * Produce a collection of label field producers. */ - static Map buildLabelFieldProducers( - SearchExecutionContext context, - String[] labelFields, - Map fieldFetchers - ) { + static Map buildLabelFieldProducers(SearchExecutionContext context, String[] labelFields) { final Map fields = new LinkedHashMap<>(); for (String field : labelFields) { LabelFieldProducer producer = new LabelLastValueFieldProducer(field); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java index d88a792629d65..23b8b7ffb6c95 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java @@ -15,7 +15,7 @@ import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -41,7 +41,7 @@ abstract class MetricFieldProducer extends AbstractRollupFieldProducer { * Reset all values collected for the field */ public void reset() { - for (Metric metric : metrics) { + for (Metric metric : metrics()) { metric.reset(); } isEmpty = true; @@ -52,14 +52,14 @@ public String name() { } /** return the list of metrics that are computed for the field */ - public List metrics() { + public Collection metrics() { return metrics; } /** Collect the value of a raw field and compute all downsampled metrics */ @Override - public void collect(Number value) { - for (MetricFieldProducer.Metric metric : metrics) { + public void collect(String field, Number value) { + for (MetricFieldProducer.Metric metric : metrics()) { metric.collect(value); } isEmpty = false; @@ -70,7 +70,7 @@ abstract static class Metric { /** * Abstract class that defines how a metric is computed. - * @param name + * @param name the name of the metric as it will be output in the downsampled document */ protected Metric(String name) { this.name = name; @@ -145,6 +145,10 @@ static class Sum extends Metric { super("sum"); } + Sum(String name) { + super(name); + } + @Override void collect(Number value) { kahanSummation.add(value.doubleValue()); @@ -230,7 +234,7 @@ static class CounterMetricFieldProducer extends MetricFieldProducer { public Object value() { assert metrics().size() == 1 : "Single value producers must have only one metric"; - return metrics().get(0).get(); + return metrics().iterator().next().get(); } @Override @@ -268,20 +272,40 @@ public void writeTo(XContentBuilder builder) throws IOException { } } - static class SilentMetricFieldProducer extends MetricFieldProducer { + static class AggregateMetricFieldProducer extends MetricFieldProducer { + + private Map metricsByField = new LinkedHashMap<>(); - SilentMetricFieldProducer(String name, Metric metric) { - super(name, Collections.singletonList(metric)); + AggregateMetricFieldProducer(String name) { + super(name, Collections.emptyList()); } - public Object value() { - assert metrics().size() == 1 : "Single value producers must have only one metric"; - return metrics().get(0).get(); + public void addMetric(String field, Metric metric) { + metricsByField.put(field, metric); + } + + @Override + public void collect(String field, Number value) { + metricsByField.get(field).collect(value); + isEmpty = false; } @Override public void writeTo(XContentBuilder builder) throws IOException { - // No output. It's silent after all + if (isEmpty() == false) { + builder.startObject(name()); + for (MetricFieldProducer.Metric metric : metrics()) { + if (metric.get() != null) { + builder.field(metric.name, metric.get()); + } + } + builder.endObject(); + } + } + + @Override + public Collection metrics() { + return metricsByField.values(); } } @@ -289,11 +313,7 @@ public void writeTo(XContentBuilder builder) throws IOException { * Produce a collection of metric field producers based on the metric_type mapping parameter in the field * mapping. */ - static Map buildMetricFieldProducers( - SearchExecutionContext context, - String[] metricFields, - Map fieldFetchers - ) { + static Map buildMetricFieldProducers(SearchExecutionContext context, String[] metricFields) { final Map fields = new LinkedHashMap<>(); for (String field : metricFields) { MappedFieldType fieldType = context.getFieldType(field); @@ -301,20 +321,21 @@ static Map buildMetricFieldProducers( assert fieldType.getMetricType() != null : "Unknown metric type for metric field: [" + field + "]"; if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { - List metricOperations = new ArrayList<>(aggMetricFieldType.getMetricFields().size()); + AggregateMetricFieldProducer producer = new AggregateMetricFieldProducer(field); + for (var e : aggMetricFieldType.getMetricFields().entrySet()) { AggregateDoubleMetricFieldMapper.Metric metric = e.getKey(); NumberFieldMapper.NumberFieldType metricSubField = e.getValue(); Metric metricOperation = switch (metric) { case max -> new Max(); case min -> new Min(); - case sum, value_count -> new Sum(); // To aggregate value_count summary, we must sum all field values + case sum -> new Sum(); + case value_count -> new Sum("value_count"); // To aggregate value_count summary, we must sum all field values }; - metricOperations.add(metricOperation); - MetricFieldProducer producer = new SilentMetricFieldProducer(metricSubField.name(), metricOperation); + producer.addMetric(metricSubField.name(), metricOperation); fields.put(metricSubField.name(), producer); } - fields.put(field, new GaugeMetricFieldProducer(field, metricOperations)); + fields.put(field, producer); } else { MetricFieldProducer producer = switch (fieldType.getMetricType()) { case gauge -> new GaugeMetricFieldProducer(field); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index e567dc83d6256..b472c3398e20e 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -54,6 +54,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -93,9 +94,6 @@ class RollupShardIndexer { private final Map metricFieldFetchers; private final Map labelFieldFetchers; - private final Map metricFieldProducers; - private final Map labelFieldProducers; - private final AtomicLong numSent = new AtomicLong(); private final AtomicLong numIndexed = new AtomicLong(); private final AtomicLong numFailed = new AtomicLong(); @@ -132,16 +130,8 @@ class RollupShardIndexer { this.timestampField = searchExecutionContext.getFieldType(DataStreamTimestampFieldMapper.DEFAULT_PATH); this.timestampFormat = timestampField.docValueFormat(null, null); this.rounding = config.createRounding(); - this.metricFieldFetchers = FieldValueFetcher.forMetrics(searchExecutionContext, metricFields); this.labelFieldFetchers = FieldValueFetcher.forLabels(searchExecutionContext, labelFields); - this.metricFieldProducers = MetricFieldProducer.buildMetricFieldProducers( - searchExecutionContext, - metricFields, - metricFieldFetchers - ); - this.labelFieldProducers = LabelFieldProducer.buildLabelFieldProducers(searchExecutionContext, labelFields, labelFieldFetchers); - toClose = null; } finally { IOUtils.closeWhileHandlingException(toClose); @@ -323,7 +313,6 @@ public void collect(int docId, long owningBucketOrd) throws IOException { final int docCount = docCountProvider.getDocCount(docId); rollupBucketBuilder.collectDocCount(docCount); - for (Map.Entry e : fieldFetchers) { final String fieldName = e.getKey(); final FormattedDocValues leafField = e.getValue(); @@ -383,9 +372,12 @@ private class RollupBucketBuilder { private BytesRef tsid; private long timestamp; private int docCount; + private final Map metricFieldProducers; + private final Map labelFieldProducers; RollupBucketBuilder() { - + this.metricFieldProducers = MetricFieldProducer.buildMetricFieldProducers(searchExecutionContext, metricFields); + this.labelFieldProducers = LabelFieldProducer.buildLabelFieldProducers(searchExecutionContext, labelFields); } /** @@ -402,8 +394,8 @@ public RollupBucketBuilder resetTsid(BytesRef tsid, long timestamp) { public RollupBucketBuilder resetTimestamp(long timestamp) { this.timestamp = timestamp; this.docCount = 0; - metricFieldProducers.values().forEach(MetricFieldProducer::reset); - labelFieldProducers.values().forEach(LabelFieldProducer::reset); + this.metricFieldProducers.values().forEach(MetricFieldProducer::reset); + this.labelFieldProducers.values().forEach(LabelFieldProducer::reset); if (logger.isTraceEnabled()) { logger.trace( "New bucket for _tsid: [{}], @timestamp: [{}]", @@ -439,13 +431,13 @@ public void collect(final String field, int docValueCount, final Function(metricFieldProducers.values())) { producer.writeTo(builder); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java index ee88dc83bbb18..641cbe228e02a 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/LabelFieldProducerTests.java @@ -66,7 +66,7 @@ public void testLabelFieldProducer() { assertTrue(producer.isEmpty()); assertEquals("dummy", producer.name()); assertEquals("last_value", producer.label().name); - producer.collect("aaaa"); + producer.collect("dummy", "aaaa"); assertFalse(producer.isEmpty()); assertEquals("aaaa", producer.value()); producer.reset(); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java index cf4404b99cf1a..eb647dd0e4855 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java @@ -120,11 +120,12 @@ public void testLastValueMetric() { } public void testCounterMetricFieldProducer() { - var producer = new MetricFieldProducer.CounterMetricFieldProducer("field"); + final String field = "field"; + var producer = new MetricFieldProducer.CounterMetricFieldProducer(field); assertTrue(producer.isEmpty()); - producer.collect(55.0); - producer.collect(12.2); - producer.collect(5.5); + producer.collect(field, 55.0); + producer.collect(field, 12.2); + producer.collect(field, 5.5); assertFalse(producer.isEmpty()); Object o = producer.value(); @@ -133,11 +134,12 @@ public void testCounterMetricFieldProducer() { } public void testGaugeMetricFieldProducer() { - MetricFieldProducer producer = new MetricFieldProducer.GaugeMetricFieldProducer("field"); + final String field = "field"; + MetricFieldProducer producer = new MetricFieldProducer.GaugeMetricFieldProducer(field); assertTrue(producer.isEmpty()); - producer.collect(55.0); - producer.collect(12.2); - producer.collect(5.5); + producer.collect(field, 55.0); + producer.collect(field, 12.2); + producer.collect(field, 5.5); //TODO: Fix this // assertFalse(producer.isEmpty()); @@ -149,7 +151,7 @@ public void testGaugeMetricFieldProducer() { // } else { // fail("Value is not a Map"); // } - assertEquals("field", producer.name()); + assertEquals(field, producer.name()); } public void testBuildMetricProducers() { @@ -214,7 +216,7 @@ public MappedFieldType getFieldType(String name) { Map producers = MetricFieldProducer.buildMetricFieldProducers( searchExecutionContext, - new String[] { "gauge_field", "counter_field" }, null + new String[] { "gauge_field", "counter_field" } ); assertTrue(producers.get("gauge_field") instanceof MetricFieldProducer.GaugeMetricFieldProducer); assertTrue(producers.get("counter_field") instanceof MetricFieldProducer.CounterMetricFieldProducer); From cebdc917bc3ff2fc083b36b0627899fe72a589ce Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Tue, 13 Sep 2022 15:33:37 +0300 Subject: [PATCH 06/20] Cleanup --- .../AbstractRollupFieldProducer.java | 2 +- .../xpack/downsample/LabelFieldProducer.java | 2 +- .../xpack/downsample/MetricFieldProducer.java | 6 ++-- .../xpack/downsample/RollupShardIndexer.java | 4 +-- .../downsample/MetricFieldProducerTests.java | 32 +++++++++++-------- 5 files changed, 25 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java index 988a58ed2b7db..08af6d89ba947 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java @@ -47,7 +47,7 @@ public String name() { /** * Serialize the downsampled value of the field. */ - public abstract void writeTo(XContentBuilder builder) throws IOException; + public abstract void write(XContentBuilder builder) throws IOException; /** * @return true if the field has not collected any value. diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java index 90434c42246ff..cdf3cfae59975 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java @@ -118,7 +118,7 @@ public Object value() { } @Override - public void writeTo(XContentBuilder builder) throws IOException { + public void write(XContentBuilder builder) throws IOException { if (isEmpty() == false) { builder.field(name(), value()); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java index 23b8b7ffb6c95..a37d8cf0072ab 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java @@ -238,7 +238,7 @@ public Object value() { } @Override - public void writeTo(XContentBuilder builder) throws IOException { + public void write(XContentBuilder builder) throws IOException { if (isEmpty() == false) { builder.field(name(), value()); } @@ -259,7 +259,7 @@ static class GaugeMetricFieldProducer extends MetricFieldProducer { } @Override - public void writeTo(XContentBuilder builder) throws IOException { + public void write(XContentBuilder builder) throws IOException { if (isEmpty() == false) { builder.startObject(name()); for (MetricFieldProducer.Metric metric : metrics()) { @@ -291,7 +291,7 @@ public void collect(String field, Number value) { } @Override - public void writeTo(XContentBuilder builder) throws IOException { + public void write(XContentBuilder builder) throws IOException { if (isEmpty() == false) { builder.startObject(name()); for (MetricFieldProducer.Metric metric : metrics()) { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index b472c3398e20e..511361489689f 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -470,12 +470,12 @@ public XContentBuilder buildRollupDocument() throws IOException { // Serialize all metric fields for (MetricFieldProducer producer : new HashSet<>(metricFieldProducers.values())) { - producer.writeTo(builder); + producer.write(builder); } // Serialize all label fields for (var producer : labelFieldProducers.values()) { - producer.writeTo(builder); + producer.write(builder); } builder.endObject(); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java index eb647dd0e4855..ff01b619b9820 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java @@ -7,18 +7,20 @@ package org.elasticsearch.xpack.downsample; +import org.elasticsearch.common.Strings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.IOException; import java.util.Map; import static java.util.Collections.emptyMap; -import static org.elasticsearch.test.MapMatcher.assertMap; -import static org.elasticsearch.test.MapMatcher.matchesMap; public class MetricFieldProducerTests extends AggregatorTestCase { @@ -119,7 +121,7 @@ public void testLastValueMetric() { assertNull(metric.get()); } - public void testCounterMetricFieldProducer() { + public void testCounterMetricFieldProducer() throws IOException { final String field = "field"; var producer = new MetricFieldProducer.CounterMetricFieldProducer(field); assertTrue(producer.isEmpty()); @@ -131,9 +133,14 @@ public void testCounterMetricFieldProducer() { Object o = producer.value(); assertEquals(55.0, o); assertEquals("field", producer.name()); + + XContentBuilder builder = JsonXContent.contentBuilder().startObject(); + producer.write(builder); + builder.endObject(); + assertEquals("{\"field\":55.0}", Strings.toString(builder)); } - public void testGaugeMetricFieldProducer() { + public void testGaugeMetricFieldProducer() throws IOException { final String field = "field"; MetricFieldProducer producer = new MetricFieldProducer.GaugeMetricFieldProducer(field); assertTrue(producer.isEmpty()); @@ -141,16 +148,13 @@ public void testGaugeMetricFieldProducer() { producer.collect(field, 12.2); producer.collect(field, 5.5); - //TODO: Fix this -// assertFalse(producer.isEmpty()); -// Object o = producer.value(); -// if (o instanceof Map) { -// Map m = (Map) o; -// assertMap(m, matchesMap().entry("min", 5.5).entry("max", 55.0).entry("value_count", 3L).entry("sum", 72.7)); -// assertEquals(4, m.size()); -// } else { -// fail("Value is not a Map"); -// } + assertFalse(producer.isEmpty()); + + XContentBuilder builder = JsonXContent.contentBuilder().startObject(); + producer.write(builder); + builder.endObject(); + assertEquals("{\"field\":{\"min\":5.5,\"max\":55.0,\"sum\":72.7,\"value_count\":3}}", Strings.toString(builder)); + assertEquals(field, producer.name()); } From 6c4e297ab51b625dfbee03df081085386f24bcea Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Tue, 13 Sep 2022 19:29:43 +0300 Subject: [PATCH 07/20] Fix gradle dependencies --- x-pack/plugin/rollup/build.gradle | 5 ++--- x-pack/plugin/rollup/qa/rest/build.gradle | 2 +- .../xpack/downsample/DownsampleActionSingleNodeTests.java | 2 -- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/rollup/build.gradle b/x-pack/plugin/rollup/build.gradle index c258602938b1d..7aa7d41be6944 100644 --- a/x-pack/plugin/rollup/build.gradle +++ b/x-pack/plugin/rollup/build.gradle @@ -13,10 +13,9 @@ archivesBaseName = 'x-pack-rollup' dependencies { compileOnly project(":server") compileOnly project(path: xpackModule('core')) - compileOnly project(path: xpackModule('analytics')) - compileOnly project(path: xpackModule('mapper-aggregate-metric')) - compileOnly project(path: xpackModule('ilm')) compileOnly project(':modules:data-streams') + compileOnly project(path: xpackModule('ilm')) + api project(path: xpackModule('mapper-aggregate-metric')) testImplementation(testArtifact(project(xpackModule('core')))) } diff --git a/x-pack/plugin/rollup/qa/rest/build.gradle b/x-pack/plugin/rollup/qa/rest/build.gradle index 95438bc08f1eb..ee634f386f09d 100644 --- a/x-pack/plugin/rollup/qa/rest/build.gradle +++ b/x-pack/plugin/rollup/qa/rest/build.gradle @@ -17,7 +17,7 @@ dependencies { restResources { restApi { - include '_common', 'bulk', 'cluster', 'indices', 'search', 'downsample' + include '_common', 'bulk', 'cluster', 'indices', 'search' } } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index 8541f3e756371..cb08d71800bc9 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -69,7 +69,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; -import org.elasticsearch.xpack.analytics.AnalyticsPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.downsample.DownsampleAction; import org.elasticsearch.xpack.core.downsample.DownsampleConfig; @@ -140,7 +139,6 @@ protected Collection> getPlugins() { return List.of( LocalStateCompositeXPackPlugin.class, Rollup.class, - AnalyticsPlugin.class, AggregateMetricMapperPlugin.class, DataStreamsPlugin.class, IndexLifecycle.class From 07d02d257a7958d342ea7b0955fc2f59d8bdc55c Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Tue, 13 Sep 2022 22:04:11 +0300 Subject: [PATCH 08/20] Added testRollupOfRollups test --- .../xpack/downsample/MetricFieldProducer.java | 1 - .../DownsampleActionSingleNodeTests.java | 43 +++++++++++++++++-- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java index a37d8cf0072ab..03e02cfbaec63 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java @@ -335,7 +335,6 @@ static Map buildMetricFieldProducers(SearchExecutio producer.addMetric(metricSubField.name(), metricOperation); fields.put(metricSubField.name(), producer); } - fields.put(field, producer); } else { MetricFieldProducer producer = switch (fieldType.getMetricType()) { case gauge -> new GaugeMetricFieldProducer(field); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index cb08d71800bc9..a437364df0713 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -285,10 +285,10 @@ public void testRollupIndex() throws IOException { .field(FIELD_NUMERIC_1, randomInt()) .field(FIELD_NUMERIC_2, DATE_FORMATTER.parseMillis(ts)) .startObject(FIELD_AGG_METRIC) - .field("min", 1) - .field("max", 8) - .field("sum", 120) - .field("value_count", 40) + .field("min", randomDoubleBetween(-1000, 1000, true)) + .field("max", randomDoubleBetween(-1000, 1000, true)) + .field("sum", randomIntBetween(100, 10000)) + .field("value_count", randomIntBetween(100, 1000)) .endObject() .field(FIELD_LABEL_DOUBLE, labelDoubleValue) .field(FIELD_METRIC_LABEL_DOUBLE, labelDoubleValue) @@ -310,6 +310,41 @@ public void testRollupIndex() throws IOException { assertRollupIndex(sourceIndex, rollupIndex, config); } + public void testRollupOfRollups() throws IOException { + int intervalMinutes = randomIntBetween(10, 120); + DownsampleConfig config = new DownsampleConfig(DateHistogramInterval.minutes(intervalMinutes)); + SourceSupplier sourceSupplier = () -> { + String ts = randomDateForInterval(config.getInterval()); + double labelDoubleValue = DATE_FORMATTER.parseMillis(ts); + + return XContentFactory.jsonBuilder() + .startObject() + .field(FIELD_TIMESTAMP, ts) + .field(FIELD_DIMENSION_1, randomFrom(dimensionValues)) + .field(FIELD_NUMERIC_1, randomInt()) + .field(FIELD_NUMERIC_2, DATE_FORMATTER.parseMillis(ts)) + .startObject(FIELD_AGG_METRIC) + .field("min", randomDoubleBetween(-1000, 1000, true)) + .field("max", randomDoubleBetween(-1000, 1000, true)) + .field("sum", randomIntBetween(100, 10000)) + .field("value_count", randomIntBetween(100, 1000)) + .endObject() + .field(FIELD_LABEL_DOUBLE, labelDoubleValue) + .field(FIELD_METRIC_LABEL_DOUBLE, labelDoubleValue) + .endObject(); + }; + bulkIndex(sourceSupplier); + prepareSourceIndex(sourceIndex); + rollup(sourceIndex, rollupIndex, config); + assertRollupIndex(sourceIndex, rollupIndex, config); + + // Downsample the rollupIndex + String rollupIndex2 = rollupIndex + "-2"; + DownsampleConfig config2 = new DownsampleConfig(DateHistogramInterval.minutes(intervalMinutes * randomIntBetween(2, 50))); + rollup(rollupIndex, rollupIndex2, config2); + assertRollupIndex(sourceIndex, rollupIndex2, config2); + } + private Date randomDate() { int randomYear = randomIntBetween(1970, 2020); int randomMonth = randomIntBetween(1, 12); From fde224f518053a1f1bb8b1183283f9f17b5468b4 Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Tue, 13 Sep 2022 22:54:55 +0300 Subject: [PATCH 09/20] Format test --- .../DownsampleActionSingleNodeTests.java | 93 +++++++------------ 1 file changed, 31 insertions(+), 62 deletions(-) diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index a437364df0713..7dd26f9f6ba9f 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -182,78 +182,45 @@ public void setup() throws IOException { settings.put(IndexMetadata.SETTING_INDEX_HIDDEN, randomBoolean()); } - XContentBuilder mapping = jsonBuilder().startObject() - .startObject("_doc") - .startObject("properties") - .startObject(FIELD_TIMESTAMP) - .field("type", "date") - .endObject() - .startObject(FIELD_DIMENSION_1) - .field("type", "keyword") - .field("time_series_dimension", true) - .endObject() - .startObject(FIELD_DIMENSION_2) - .field("type", "long") - .field("time_series_dimension", true) - .endObject() - .startObject(FIELD_NUMERIC_1) - .field("type", "long") - .field("time_series_metric", "gauge") - .endObject() - .startObject(FIELD_NUMERIC_2) - .field("type", "double") - .field("time_series_metric", "counter") - .endObject() - .startObject(FIELD_AGG_METRIC) + XContentBuilder mapping = jsonBuilder().startObject().startObject("_doc").startObject("properties"); + mapping.startObject(FIELD_TIMESTAMP).field("type", "date").endObject(); + + // Dimensions + mapping.startObject(FIELD_DIMENSION_1).field("type", "keyword").field("time_series_dimension", true).endObject(); + mapping.startObject(FIELD_DIMENSION_2).field("type", "long").field("time_series_dimension", true).endObject(); + + // Metrics + mapping.startObject(FIELD_NUMERIC_1).field("type", "long").field("time_series_metric", "gauge").endObject(); + mapping.startObject(FIELD_NUMERIC_2).field("type", "double").field("time_series_metric", "counter").endObject(); + mapping.startObject(FIELD_AGG_METRIC) .field("type", "aggregate_metric_double") .field("time_series_metric", "gauge") .array("metrics", new String[] { "min", "max", "sum", "value_count" }) .field("default_metric", "value_count") - .endObject() - .startObject(FIELD_LABEL_DOUBLE) - .field("type", "double") - .endObject() - .startObject(FIELD_LABEL_INTEGER) - .field("type", "integer") - .endObject() - .startObject(FIELD_LABEL_KEYWORD) - .field("type", "keyword") - .endObject() - .startObject(FIELD_LABEL_TEXT) - .field("type", "text") - .endObject() - .startObject(FIELD_LABEL_BOOLEAN) - .field("type", "boolean") - .endObject() - .startObject(FIELD_METRIC_LABEL_DOUBLE) + .endObject(); + mapping.startObject(FIELD_METRIC_LABEL_DOUBLE) .field("type", "double") /* numeric label indexed as a metric */ .field("time_series_metric", "counter") - .endObject() - .startObject(FIELD_LABEL_IPv4_ADDRESS) - .field("type", "ip") - .endObject() - .startObject(FIELD_LABEL_IPv6_ADDRESS) - .field("type", "ip") - .endObject() - .startObject(FIELD_LABEL_DATE) - .field("type", "date") - .field("format", "date_optional_time") - .endObject() - .startObject(FIELD_LABEL_KEYWORD_ARRAY) - .field("type", "keyword") - .endObject() - .startObject(FIELD_LABEL_DOUBLE_ARRAY) - .field("type", "double") - .endObject() - .startObject(FIELD_LABEL_AGG_METRIC) + .endObject(); + + // Labels + mapping.startObject(FIELD_LABEL_DOUBLE).field("type", "double").endObject(); + mapping.startObject(FIELD_LABEL_INTEGER).field("type", "integer").endObject(); + mapping.startObject(FIELD_LABEL_KEYWORD).field("type", "keyword").endObject(); + mapping.startObject(FIELD_LABEL_TEXT).field("type", "text").endObject(); + mapping.startObject(FIELD_LABEL_BOOLEAN).field("type", "boolean").endObject(); + mapping.startObject(FIELD_LABEL_IPv4_ADDRESS).field("type", "ip").endObject(); + mapping.startObject(FIELD_LABEL_IPv6_ADDRESS).field("type", "ip").endObject(); + mapping.startObject(FIELD_LABEL_DATE).field("type", "date").field("format", "date_optional_time").endObject(); + mapping.startObject(FIELD_LABEL_KEYWORD_ARRAY).field("type", "keyword").endObject(); + mapping.startObject(FIELD_LABEL_DOUBLE_ARRAY).field("type", "double").endObject(); + mapping.startObject(FIELD_LABEL_AGG_METRIC) .field("type", "aggregate_metric_double") .array("metrics", new String[] { "min", "max", "sum", "value_count" }) .field("default_metric", "value_count") - .endObject() - .endObject() - .endObject() .endObject(); + mapping.endObject().endObject().endObject(); assertAcked(client().admin().indices().prepareCreate(sourceIndex).setSettings(settings.build()).setMapping(mapping).get()); } @@ -334,11 +301,13 @@ public void testRollupOfRollups() throws IOException { .endObject(); }; bulkIndex(sourceSupplier); + + // Downsample the source indexe prepareSourceIndex(sourceIndex); rollup(sourceIndex, rollupIndex, config); assertRollupIndex(sourceIndex, rollupIndex, config); - // Downsample the rollupIndex + // Downsample the rollupIndex. The downsampling interval is a multiple of the previous downsampling interval. String rollupIndex2 = rollupIndex + "-2"; DownsampleConfig config2 = new DownsampleConfig(DateHistogramInterval.minutes(intervalMinutes * randomIntBetween(2, 50))); rollup(rollupIndex, rollupIndex2, config2); From 32927a319f1e1bb494921812069d5ed93d538fca Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Tue, 13 Sep 2022 23:02:11 +0300 Subject: [PATCH 10/20] javadoc --- .../xpack/downsample/AbstractRollupFieldProducer.java | 2 +- .../elasticsearch/xpack/downsample/LabelFieldProducer.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java index 08af6d89ba947..44bf433ada13c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/AbstractRollupFieldProducer.java @@ -40,7 +40,7 @@ public String name() { } /** - * Resets the collected value to the specific subclass reset value. + * Resets the producer to an empty value. */ public abstract void reset(); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java index cdf3cfae59975..d29d85c9cb78a 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java @@ -57,8 +57,8 @@ abstract static class Label { final String name; /** - * Abstract class that defines how a label is computed. - * @param name + * Abstract class that defines how a label is downsampled. + * @param name the name of the field as it will be stored in the downsampled document */ protected Label(String name) { this.name = name; From bf4ded9390645648ac64fac0dd211daea370a574 Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Thu, 15 Sep 2022 15:26:12 +0300 Subject: [PATCH 11/20] Create fetcher only if mapped field exists in the index. --- .../xpack/downsample/FieldValueFetcher.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java index a67456aac63d6..0c1651bc2c7ae 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java @@ -78,12 +78,16 @@ private static Map build(SearchExecutionContext conte if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) { - IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); - fetchers.put(metricSubField.name(), new FieldValueFetcher(metricSubField.name(), fieldType, fieldData)); + if (context.fieldExistsInIndex(metricSubField.name())) { + IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); + fetchers.put(metricSubField.name(), new FieldValueFetcher(metricSubField.name(), fieldType, fieldData)); + } } } else { - IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); - fetchers.put(field, new FieldValueFetcher(field, fieldType, fieldData)); + if (context.fieldExistsInIndex(field)) { + IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); + fetchers.put(field, new FieldValueFetcher(field, fieldType, fieldData)); + } } return Collections.unmodifiableMap(fetchers); } From fe63511e66697a2d8e6d28ee72d44c35796bc435 Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Thu, 15 Sep 2022 15:47:27 +0300 Subject: [PATCH 12/20] Removed fuction that was not used --- .../xpack/downsample/FieldValueFetcher.java | 55 ++++++------------- .../xpack/downsample/LabelFieldProducer.java | 4 +- .../xpack/downsample/MetricFieldProducer.java | 8 +-- .../xpack/downsample/RollupShardIndexer.java | 9 ++- .../downsample/MetricFieldProducerTests.java | 2 +- 5 files changed, 29 insertions(+), 49 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java index 0c1651bc2c7ae..d56ad4c14ea04 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.downsample; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.mapper.MappedFieldType; @@ -57,48 +56,30 @@ FormattedDocValues getLeaf(LeafReaderContext context) { return fieldData.load(context).getFormattedValues(format); } - Object format(Object value) { - if (value instanceof Long l) { - return format.format(l); - } else if (value instanceof Double d) { - return format.format(d); - } else if (value instanceof BytesRef b) { - return format.format(b); - } else if (value instanceof String s) { - return s; - } else { - throw new IllegalArgumentException("Invalid type: [" + value.getClass() + "]"); - } - } - - private static Map build(SearchExecutionContext context, String field) { - MappedFieldType fieldType = context.getFieldType(field); - assert fieldType != null : "Unknown field type for field: [" + field + "]"; - Map fetchers = new HashMap<>(); - - if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { - for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) { - if (context.fieldExistsInIndex(metricSubField.name())) { - IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); - fetchers.put(metricSubField.name(), new FieldValueFetcher(metricSubField.name(), fieldType, fieldData)); - } - } - } else { - if (context.fieldExistsInIndex(field)) { - IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); - fetchers.put(field, new FieldValueFetcher(field, fieldType, fieldData)); - } - } - return Collections.unmodifiableMap(fetchers); - } - /** * Retrieve field fetchers for a list of fields. */ private static Map build(SearchExecutionContext context, String[] fields) { Map fetchers = new HashMap<>(); for (String field : fields) { - fetchers.putAll(build(context, field)); + MappedFieldType fieldType = context.getFieldType(field); + assert fieldType != null : "Unknown field type for field: [" + field + "]"; + + if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { + // If the field is an aggregate_metric_double field, we should load all its subfields + // This is a rollup-of-rollup case + for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) { + if (context.fieldExistsInIndex(metricSubField.name())) { + IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); + fetchers.put(metricSubField.name(), new FieldValueFetcher(metricSubField.name(), fieldType, fieldData)); + } + } + } else { + if (context.fieldExistsInIndex(field)) { + IndexFieldData fieldData = context.getForField(fieldType, MappedFieldType.FielddataOperation.SEARCH); + fetchers.put(field, new FieldValueFetcher(field, fieldType, fieldData)); + } + } } return Collections.unmodifiableMap(fetchers); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java index d29d85c9cb78a..20d24b21e7fb9 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java @@ -126,9 +126,9 @@ public void write(XContentBuilder builder) throws IOException { } /** - * Produce a collection of label field producers. + * Create a collection of label field producers. */ - static Map buildLabelFieldProducers(SearchExecutionContext context, String[] labelFields) { + static Map createLabelFieldProducers(SearchExecutionContext context, String[] labelFields) { final Map fields = new LinkedHashMap<>(); for (String field : labelFields) { LabelFieldProducer producer = new LabelLastValueFieldProducer(field); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java index 03e02cfbaec63..1c903bed6e504 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/MetricFieldProducer.java @@ -310,10 +310,10 @@ public Collection metrics() { } /** - * Produce a collection of metric field producers based on the metric_type mapping parameter in the field + * Create a collection of metric field producers based on the metric_type mapping parameter in the field * mapping. */ - static Map buildMetricFieldProducers(SearchExecutionContext context, String[] metricFields) { + static Map createMetricFieldProducers(SearchExecutionContext context, String[] metricFields) { final Map fields = new LinkedHashMap<>(); for (String field : metricFields) { MappedFieldType fieldType = context.getFieldType(field); @@ -321,8 +321,9 @@ static Map buildMetricFieldProducers(SearchExecutio assert fieldType.getMetricType() != null : "Unknown metric type for metric field: [" + field + "]"; if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { + // If the field is an aggregate_metric_double field, we should use the correct subfields + // for each aggregation. This is a rollup-of-rollup case AggregateMetricFieldProducer producer = new AggregateMetricFieldProducer(field); - for (var e : aggMetricFieldType.getMetricFields().entrySet()) { AggregateDoubleMetricFieldMapper.Metric metric = e.getKey(); NumberFieldMapper.NumberFieldType metricSubField = e.getValue(); @@ -339,7 +340,6 @@ static Map buildMetricFieldProducers(SearchExecutio MetricFieldProducer producer = switch (fieldType.getMetricType()) { case gauge -> new GaugeMetricFieldProducer(field); case counter -> new CounterMetricFieldProducer(field); - default -> throw new IllegalArgumentException("Unsupported metric type [" + fieldType.getMetricType() + "]"); }; fields.put(field, producer); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index 5856e148fa2b3..ff4b5a6dc7824 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -327,7 +327,6 @@ public void collect(int docId, long owningBucketOrd) throws IOException { } catch (IOException ex) { throw new ElasticsearchException("Failed to read values for field [" + fieldName + "]"); } - } return values; }); @@ -378,8 +377,8 @@ private class RollupBucketBuilder { private final Map labelFieldProducers; RollupBucketBuilder() { - this.metricFieldProducers = MetricFieldProducer.buildMetricFieldProducers(searchExecutionContext, metricFields); - this.labelFieldProducers = LabelFieldProducer.buildLabelFieldProducers(searchExecutionContext, labelFields); + this.metricFieldProducers = MetricFieldProducer.createMetricFieldProducers(searchExecutionContext, metricFields); + this.labelFieldProducers = LabelFieldProducer.createLabelFieldProducers(searchExecutionContext, labelFields); } /** @@ -472,12 +471,12 @@ public XContentBuilder buildRollupDocument() throws IOException { } // Serialize all metric fields - for (MetricFieldProducer producer : new HashSet<>(metricFieldProducers.values())) { + for (var producer : new HashSet<>(metricFieldProducers.values())) { producer.write(builder); } // Serialize all label fields - for (var producer : labelFieldProducers.values()) { + for (var producer : new HashSet<>(labelFieldProducers.values())) { producer.write(builder); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java index ff01b619b9820..0c93f010ec99e 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/MetricFieldProducerTests.java @@ -218,7 +218,7 @@ public MappedFieldType getFieldType(String name) { } }; - Map producers = MetricFieldProducer.buildMetricFieldProducers( + Map producers = MetricFieldProducer.createMetricFieldProducers( searchExecutionContext, new String[] { "gauge_field", "counter_field" } ); From ee82647fd645adf3d6419973883b8d3eb8253b78 Mon Sep 17 00:00:00 2001 From: Christos Soulios Date: Thu, 15 Sep 2022 21:41:58 +0300 Subject: [PATCH 13/20] Support aggregate_metric_double label fields --- .../xpack/downsample/LabelFieldProducer.java | 78 ++++++++++++++++++- .../xpack/downsample/MetricFieldProducer.java | 11 ++- .../DownsampleActionSingleNodeTests.java | 26 ++++++- 3 files changed, 108 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java index 20d24b21e7fb9..cfaeeae7ca9af 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/LabelFieldProducer.java @@ -7,10 +7,14 @@ package org.elasticsearch.xpack.downsample; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -64,6 +68,10 @@ protected Label(String name) { this.name = name; } + public String name() { + return name; + } + abstract void collect(Object value); abstract Object get(); @@ -81,6 +89,10 @@ protected Label(String name) { static class LastValueLabel extends Label { private Object lastValue; + LastValueLabel(String name) { + super(name); + } + LastValueLabel() { super("last_value"); } @@ -125,14 +137,76 @@ public void write(XContentBuilder builder) throws IOException { } } + static class AggregateMetricFieldProducer extends LabelFieldProducer { + + private Map labelsByField = new LinkedHashMap<>(); + + AggregateMetricFieldProducer(String name) { + super(name, null); + } + + public void addLabel(String field, Label label) { + labelsByField.put(field, label); + } + + @Override + public void collect(String field, Object value) { + labelsByField.get(field).collect(value); + isEmpty = false; + } + + @Override + public void write(XContentBuilder builder) throws IOException { + if (isEmpty() == false) { + builder.startObject(name()); + for (Label label : labels()) { + if (label.get() != null) { + builder.field(label.name(), label.get()); + } + } + builder.endObject(); + } + } + + public Collection