From 5204c7706a2e9d32caf88a6bef73d7b62a3701e3 Mon Sep 17 00:00:00 2001 From: Oleksandr Kolomiiets Date: Thu, 13 Jun 2024 11:29:14 -0700 Subject: [PATCH] Simplify ignore_malformed handling for synthetic source in aggregate_metric_double --- .../common/xcontent/XContentHelper.java | 20 -- .../mapper/CompositeSyntheticFieldLoader.java | 174 ++++++++++++++++++ .../xcontent/support/XContentHelperTests.java | 22 --- .../AggregateDoubleMetricFieldMapper.java | 72 ++++---- ...AggregateDoubleMetricFieldMapperTests.java | 40 ++++ .../100_synthetic_source.yml | 25 ++- 6 files changed, 272 insertions(+), 81 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/mapper/CompositeSyntheticFieldLoader.java diff --git a/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java b/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java index 28df2fad32cbb..9998cb55064e3 100644 --- a/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java +++ b/server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java @@ -748,24 +748,4 @@ public static XContentParser mapToXContentParser(XContentParserConfiguration con throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e); } } - - /** - * Drains all data available via this parser into a provided builder. - * Provided parser is closed as a result. 
- * @param parser - * @param destination - */ - public static void drainAndClose(XContentParser parser, XContentBuilder destination) throws IOException { - if (parser.isClosed()) { - throw new IllegalStateException("Can't drain a parser that is closed"); - } - - XContentParser.Token token; - do { - destination.copyCurrentStructure(parser); - token = parser.nextToken(); - } while (token != null); - - parser.close(); - } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/CompositeSyntheticFieldLoader.java b/server/src/main/java/org/elasticsearch/index/mapper/CompositeSyntheticFieldLoader.java new file mode 100644 index 0000000000000..e762f46f17e99 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/mapper/CompositeSyntheticFieldLoader.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.index.mapper; + +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static java.util.Collections.emptyList; + +/** + * A {@link SourceLoader.SyntheticFieldLoader} that uses a set of sub-loaders + * to produce synthetic source for the field. + * Typical use case is to gather field values from doc_values and append malformed values + * stored in a different field in case of ignore_malformed being enabled. 
+ */ +public class CompositeSyntheticFieldLoader implements SourceLoader.SyntheticFieldLoader { + private final String fieldName; + private final String fullFieldName; + private final SyntheticFieldLoaderLayer[] parts; + private boolean hasValue; + + public CompositeSyntheticFieldLoader(String fieldName, String fullFieldName, SyntheticFieldLoaderLayer... parts) { + this.fieldName = fieldName; + this.fullFieldName = fullFieldName; + this.parts = parts; + this.hasValue = false; + } + + @Override + public Stream> storedFieldLoaders() { + return Arrays.stream(parts).flatMap(SyntheticFieldLoaderLayer::storedFieldLoaders).map(e -> Map.entry(e.getKey(), values -> { + hasValue = true; + e.getValue().load(values); + })); + } + + @Override + public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf) throws IOException { + var loaders = new ArrayList(parts.length); + for (var part : parts) { + var partLoader = part.docValuesLoader(leafReader, docIdsInLeaf); + if (partLoader != null) { + loaders.add(partLoader); + } + } + + if (loaders.isEmpty()) { + return null; + } + + return docId -> { + boolean hasDocs = false; + for (var loader : loaders) { + hasDocs |= loader.advanceToDoc(docId); + } + + this.hasValue |= hasDocs; + return hasDocs; + }; + } + + @Override + public boolean hasValue() { + return hasValue; + } + + @Override + public void write(XContentBuilder b) throws IOException { + var totalCount = Arrays.stream(parts).mapToLong(SyntheticFieldLoaderLayer::valueCount).sum(); + + if (totalCount == 0) { + return; + } + + if (totalCount == 1) { + b.field(fieldName); + for (var part : parts) { + part.write(b); + } + return; + } + + b.startArray(fieldName); + for (var part : parts) { + part.write(b); + } + b.endArray(); + } + + @Override + public String fieldName() { + return this.fullFieldName; + } + + /** + * Represents one layer of loading synthetic source values for a field + * as a part of {@link CompositeSyntheticFieldLoader}. + *
+ * Note that the contract of {@link SourceLoader.SyntheticFieldLoader#write(XContentBuilder)} + * is slightly different here since it only needs to write field values without encompassing object or array. + */ + public interface SyntheticFieldLoaderLayer extends SourceLoader.SyntheticFieldLoader { + /** + * Number of values that this loader will write. + * @return + */ + long valueCount(); + } + + /** + * Layer that loads malformed values stored in a dedicated field with a conventional name. + * @see IgnoreMalformedStoredValues + */ + public static class MalformedValuesLayer implements SyntheticFieldLoaderLayer { + private final String fieldName; + private List values; + + public MalformedValuesLayer(String fieldName) { + this.fieldName = fieldName + "._ignore_malformed"; + this.values = emptyList(); + } + + @Override + public long valueCount() { + return values.size(); + } + + @Override + public Stream> storedFieldLoaders() { + return Stream.of(Map.entry(fieldName, values -> this.values = values)); + } + + @Override + public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf) throws IOException { + return null; + } + + @Override + public boolean hasValue() { + return values.isEmpty() == false; + } + + @Override + public void write(XContentBuilder b) throws IOException { + for (Object v : values) { + if (v instanceof BytesRef r) { + XContentDataHelper.decodeAndWrite(b, r); + } else { + b.value(v); + } + } + values = emptyList(); + } + + @Override + public String fieldName() { + return fieldName; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java b/server/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java index a3e11c0645e32..5b50eb63e1489 100644 --- a/server/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java +++ b/server/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java @@ -8,7 +8,6 @@ 
package org.elasticsearch.common.xcontent.support; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressedXContent; @@ -421,25 +420,4 @@ public void testParseToType() throws IOException { assertThat(names, equalTo(Set.of("a", "c"))); } - - public void testDrainAndClose() throws IOException { - String json = """ - { "a": "b", "c": "d", "e": {"f": "g"}, "h": ["i", "j", {"k": "l"}]}"""; - var parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json); - var content = XContentBuilder.builder(XContentType.JSON.xContent()); - XContentHelper.drainAndClose(parser, content); - - assertEquals(json.replace(" ", ""), Strings.toString(content)); - assertTrue(parser.isClosed()); - } - - public void testDrainAndCloseAlreadyClosed() throws IOException { - var parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, "{}"); - parser.close(); - - assertThrows( - IllegalStateException.class, - () -> XContentHelper.drainAndClose(parser, XContentBuilder.builder(XContentType.JSON.xContent())) - ); - } } diff --git a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java index 81abe3dc5c088..33efabf101be7 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java @@ -19,7 +19,6 @@ import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.time.DateMathParser; import org.elasticsearch.common.util.BigArrays; 
-import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.fielddata.FieldDataContext; @@ -28,9 +27,10 @@ import org.elasticsearch.index.fielddata.ScriptDocValues.DoublesSupplier; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; +import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader; import org.elasticsearch.index.mapper.DocumentParserContext; import org.elasticsearch.index.mapper.FieldMapper; -import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper; +import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperBuilderContext; @@ -43,7 +43,6 @@ import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType; import org.elasticsearch.index.mapper.ValueFetcher; -import org.elasticsearch.index.mapper.XContentDataHelper; import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.script.ScriptCompiler; @@ -53,6 +52,7 @@ import org.elasticsearch.search.MultiValueMode; import org.elasticsearch.search.sort.BucketedSort; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.xcontent.CopyingXContentParser; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentSubParser; @@ -592,9 +592,7 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio EnumMap metricsParsed = new EnumMap<>(Metric.class); // Preserves the content of the field in order to be able to construct synthetic source // if field value is malformed. 
- XContentBuilder malformedContentForSyntheticSource = context.mappingLookup().isSourceSynthetic() && ignoreMalformed - ? XContentBuilder.builder(context.parser().contentType().xContent()) - : null; + XContentBuilder malformedDataForSyntheticSource = null; try { token = context.parser().currentToken(); @@ -603,11 +601,14 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio return; } ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser()); - subParser = new XContentSubParser(context.parser()); - token = subParser.nextToken(); - if (malformedContentForSyntheticSource != null) { - malformedContentForSyntheticSource.startObject(); + if (context.mappingLookup().isSourceSynthetic() && ignoreMalformed) { + var copyingParser = new CopyingXContentParser(context.parser()); + malformedDataForSyntheticSource = copyingParser.getBuilder(); + subParser = new XContentSubParser(copyingParser); + } else { + subParser = new XContentSubParser(context.parser()); } + token = subParser.nextToken(); while (token != XContentParser.Token.END_OBJECT) { // should be an object sub-field with name a metric name ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser); @@ -621,9 +622,6 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio } token = subParser.nextToken(); - if (malformedContentForSyntheticSource != null) { - malformedContentForSyntheticSource.field(fieldName); - } // Make sure that the value is a number. 
Probably this will change when // new aggregate metric types are added (histogram, cardinality etc) ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser); @@ -632,9 +630,6 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio try { Number metricValue = delegateFieldMapper.value(context.parser()); metricsParsed.put(metric, metricValue); - if (malformedContentForSyntheticSource != null) { - malformedContentForSyntheticSource.value(metricValue); - } } catch (IllegalArgumentException e) { throw new IllegalArgumentException("failed to parse [" + metric.name() + "] sub field: " + e.getMessage(), e); } @@ -677,24 +672,20 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio } } catch (Exception e) { if (ignoreMalformed) { - if (malformedContentForSyntheticSource != null) { - if (subParser != null) { - // Remaining data in parser needs to be stored as is in order to provide it in synthetic source. - XContentHelper.drainAndClose(subParser, malformedContentForSyntheticSource); - } else { - // We don't use DrainingXContentParser since we don't want to go beyond current field - malformedContentForSyntheticSource.copyCurrentStructure(context.parser()); - } - ; - var nameValue = IgnoredSourceFieldMapper.NameValue.fromContext( - context, - name(), - XContentDataHelper.encodeXContentBuilder(malformedContentForSyntheticSource) - ); - context.addIgnoredField(nameValue); - } else if (subParser != null) { + if (subParser != null) { // close the subParser, so we advance to the end of the object subParser.close(); + } else { + if (context.mappingLookup().isSourceSynthetic()) { + // There is a malformed value, but it is not an object (since subParser is null). + // So we just need to copy this single value. 
+ malformedDataForSyntheticSource = XContentBuilder.builder(context.parser().contentType().xContent()) + .copyCurrentStructure(context.parser()); + } + } + + if (malformedDataForSyntheticSource != null) { + context.doc().add(IgnoreMalformedStoredValues.storedField(name(), malformedDataForSyntheticSource)); } context.addIgnoredField(name()); @@ -724,11 +715,15 @@ protected SyntheticSourceMode syntheticSourceMode() { @Override public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() { - // Note that malformed values are handled via `IgnoredSourceFieldMapper` infrastructure - return new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics); + return new CompositeSyntheticFieldLoader( + simpleName(), + name(), + new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics), + new CompositeSyntheticFieldLoader.MalformedValuesLayer(name()) + ); } - public static class AggregateMetricSyntheticFieldLoader implements SourceLoader.SyntheticFieldLoader { + public static class AggregateMetricSyntheticFieldLoader implements CompositeSyntheticFieldLoader.SyntheticFieldLoaderLayer { private final String name; private final String simpleName; private final EnumSet metrics; @@ -746,6 +741,11 @@ public String fieldName() { return name; } + @Override + public long valueCount() { + return hasValue() ? 
1 : 0; + } + @Override public Stream> storedFieldLoaders() { return Stream.of(); @@ -779,7 +779,7 @@ public void write(XContentBuilder b) throws IOException { if (metricHasValue.isEmpty()) { return; } - b.startObject(simpleName); + b.startObject(); for (Map.Entry entry : metricDocValues.entrySet()) { if (metricHasValue.contains(entry.getKey())) { String metricName = entry.getKey().name(); diff --git a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java index f46508093c4ec..5fbc25eb037a7 100644 --- a/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java +++ b/x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java @@ -8,6 +8,8 @@ import org.apache.lucene.search.FieldExistsQuery; import org.apache.lucene.search.Query; +import org.elasticsearch.common.Strings; +import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentParsingException; import org.elasticsearch.index.mapper.LuceneDocument; @@ -20,6 +22,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric; import org.hamcrest.Matchers; @@ -523,6 +526,43 @@ protected IngestScriptSupport ingestScriptSupport() { throw new AssumptionViolatedException("not supported"); } + public void testArrayValueSyntheticSource() throws 
Exception { + DocumentMapper mapper = createDocumentMapper( + syntheticSourceFieldMapping( + b -> b.field("type", CONTENT_TYPE) + .array("metrics", "min", "max") + .field("default_metric", "min") + .field("ignore_malformed", "true") + ) + ); + + var randomString = randomAlphaOfLength(10); + CheckedConsumer arrayValue = b -> { + b.startArray("field"); + { + b.startObject().field("max", 100).field("min", 10).endObject(); + b.startObject().field("max", 200).field("min", 20).endObject(); + b.value(randomString); + } + b.endArray(); + }; + + var expected = JsonXContent.contentBuilder().startObject(); + // First value comes from synthetic field loader and so is formatted in a specific format (e.g. min always come first). + // Other values are stored as is as part of ignore_malformed logic for synthetic source. + { + expected.startArray("field"); + expected.startObject().field("min", 10.0).field("max", 100.0).endObject(); + expected.startObject().field("max", 200).field("min", 20).endObject(); + expected.value(randomString); + expected.endArray(); + } + expected.endObject(); + + var syntheticSource = syntheticSource(mapper, arrayValue); + assertEquals(Strings.toString(expected), syntheticSource); + } + protected final class AggregateDoubleMetricSyntheticSourceSupport implements SyntheticSourceSupport { private final boolean malformedExample; private final EnumSet storedMetrics; diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/aggregate-metrics/100_synthetic_source.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/aggregate-metrics/100_synthetic_source.yml index b846dbe858f61..cc0e8aff9b239 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/aggregate-metrics/100_synthetic_source.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/aggregate-metrics/100_synthetic_source.yml @@ -76,7 +76,6 @@ aggregate_metric_double with ignore_malformed: index: index: test id: "1" - refresh: true body: metric: 
min: 18.2 @@ -88,11 +87,22 @@ aggregate_metric_double with ignore_malformed: value_count: 50 - do: - search: + index: + index: test + id: "2" + body: + metric: ["hey", {"value_count": 1, "min": 18.2,"max": 100}, [123, 456]] + + - do: + indices.refresh: {} + + - do: + get: index: test + id: "1" - match: - hits.hits.0._source: + _source: metric: min: 18.2 max: 100 @@ -102,3 +112,12 @@ aggregate_metric_double with ignore_malformed: field: "field" value_count: 50 + - do: + get: + index: test + id: "2" + + - match: + _source: + metric: [{"min": 18.2,"max": 100.0, "value_count": 1}, "hey", 123, 456] +