Skip to content

Commit e41a10e

Browse files
authored
Simplify ignore_malformed handling for synthetic souce in aggregate_metric_double (#109888)
1 parent 3faf4ce commit e41a10e

File tree

5 files changed

+98
-81
lines changed

5 files changed

+98
-81
lines changed

server/src/main/java/org/elasticsearch/common/xcontent/XContentHelper.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -748,24 +748,4 @@ public static XContentParser mapToXContentParser(XContentParserConfiguration con
748748
throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
749749
}
750750
}
751-
752-
/**
753-
* Drains all data available via this parser into a provided builder.
754-
* Provided parser is closed as a result.
755-
* @param parser
756-
* @param destination
757-
*/
758-
public static void drainAndClose(XContentParser parser, XContentBuilder destination) throws IOException {
759-
if (parser.isClosed()) {
760-
throw new IllegalStateException("Can't drain a parser that is closed");
761-
}
762-
763-
XContentParser.Token token;
764-
do {
765-
destination.copyCurrentStructure(parser);
766-
token = parser.nextToken();
767-
} while (token != null);
768-
769-
parser.close();
770-
}
771751
}

server/src/test/java/org/elasticsearch/common/xcontent/support/XContentHelperTests.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.elasticsearch.common.xcontent.support;
1010

11-
import org.elasticsearch.common.Strings;
1211
import org.elasticsearch.common.bytes.BytesArray;
1312
import org.elasticsearch.common.bytes.BytesReference;
1413
import org.elasticsearch.common.compress.CompressedXContent;
@@ -421,25 +420,4 @@ public void testParseToType() throws IOException {
421420

422421
assertThat(names, equalTo(Set.of("a", "c")));
423422
}
424-
425-
public void testDrainAndClose() throws IOException {
426-
String json = """
427-
{ "a": "b", "c": "d", "e": {"f": "g"}, "h": ["i", "j", {"k": "l"}]}""";
428-
var parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json);
429-
var content = XContentBuilder.builder(XContentType.JSON.xContent());
430-
XContentHelper.drainAndClose(parser, content);
431-
432-
assertEquals(json.replace(" ", ""), Strings.toString(content));
433-
assertTrue(parser.isClosed());
434-
}
435-
436-
public void testDrainAndCloseAlreadyClosed() throws IOException {
437-
var parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, "{}");
438-
parser.close();
439-
440-
assertThrows(
441-
IllegalStateException.class,
442-
() -> XContentHelper.drainAndClose(parser, XContentBuilder.builder(XContentType.JSON.xContent()))
443-
);
444-
}
445423
}

x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapper.java

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.common.logging.DeprecationLogger;
2020
import org.elasticsearch.common.time.DateMathParser;
2121
import org.elasticsearch.common.util.BigArrays;
22-
import org.elasticsearch.common.xcontent.XContentHelper;
2322
import org.elasticsearch.index.IndexMode;
2423
import org.elasticsearch.index.IndexVersion;
2524
import org.elasticsearch.index.fielddata.FieldDataContext;
@@ -28,9 +27,10 @@
2827
import org.elasticsearch.index.fielddata.ScriptDocValues.DoublesSupplier;
2928
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
3029
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
30+
import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader;
3131
import org.elasticsearch.index.mapper.DocumentParserContext;
3232
import org.elasticsearch.index.mapper.FieldMapper;
33-
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper;
33+
import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues;
3434
import org.elasticsearch.index.mapper.MappedFieldType;
3535
import org.elasticsearch.index.mapper.Mapper;
3636
import org.elasticsearch.index.mapper.MapperBuilderContext;
@@ -43,7 +43,6 @@
4343
import org.elasticsearch.index.mapper.TimeSeriesParams;
4444
import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType;
4545
import org.elasticsearch.index.mapper.ValueFetcher;
46-
import org.elasticsearch.index.mapper.XContentDataHelper;
4746
import org.elasticsearch.index.query.QueryRewriteContext;
4847
import org.elasticsearch.index.query.SearchExecutionContext;
4948
import org.elasticsearch.script.ScriptCompiler;
@@ -53,6 +52,7 @@
5352
import org.elasticsearch.search.MultiValueMode;
5453
import org.elasticsearch.search.sort.BucketedSort;
5554
import org.elasticsearch.search.sort.SortOrder;
55+
import org.elasticsearch.xcontent.CopyingXContentParser;
5656
import org.elasticsearch.xcontent.XContentBuilder;
5757
import org.elasticsearch.xcontent.XContentParser;
5858
import org.elasticsearch.xcontent.XContentSubParser;
@@ -592,9 +592,7 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
592592
EnumMap<Metric, Number> metricsParsed = new EnumMap<>(Metric.class);
593593
// Preserves the content of the field in order to be able to construct synthetic source
594594
// if field value is malformed.
595-
XContentBuilder malformedContentForSyntheticSource = context.mappingLookup().isSourceSynthetic() && ignoreMalformed
596-
? XContentBuilder.builder(context.parser().contentType().xContent())
597-
: null;
595+
XContentBuilder malformedDataForSyntheticSource = null;
598596

599597
try {
600598
token = context.parser().currentToken();
@@ -603,11 +601,14 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
603601
return;
604602
}
605603
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser());
606-
subParser = new XContentSubParser(context.parser());
607-
token = subParser.nextToken();
608-
if (malformedContentForSyntheticSource != null) {
609-
malformedContentForSyntheticSource.startObject();
604+
if (context.mappingLookup().isSourceSynthetic() && ignoreMalformed) {
605+
var copyingParser = new CopyingXContentParser(context.parser());
606+
malformedDataForSyntheticSource = copyingParser.getBuilder();
607+
subParser = new XContentSubParser(copyingParser);
608+
} else {
609+
subParser = new XContentSubParser(context.parser());
610610
}
611+
token = subParser.nextToken();
611612
while (token != XContentParser.Token.END_OBJECT) {
612613
// should be an object sub-field with name a metric name
613614
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser);
@@ -621,9 +622,6 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
621622
}
622623

623624
token = subParser.nextToken();
624-
if (malformedContentForSyntheticSource != null) {
625-
malformedContentForSyntheticSource.field(fieldName);
626-
}
627625
// Make sure that the value is a number. Probably this will change when
628626
// new aggregate metric types are added (histogram, cardinality etc)
629627
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser);
@@ -632,9 +630,6 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
632630
try {
633631
Number metricValue = delegateFieldMapper.value(context.parser());
634632
metricsParsed.put(metric, metricValue);
635-
if (malformedContentForSyntheticSource != null) {
636-
malformedContentForSyntheticSource.value(metricValue);
637-
}
638633
} catch (IllegalArgumentException e) {
639634
throw new IllegalArgumentException("failed to parse [" + metric.name() + "] sub field: " + e.getMessage(), e);
640635
}
@@ -677,24 +672,20 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
677672
}
678673
} catch (Exception e) {
679674
if (ignoreMalformed) {
680-
if (malformedContentForSyntheticSource != null) {
681-
if (subParser != null) {
682-
// Remaining data in parser needs to be stored as is in order to provide it in synthetic source.
683-
XContentHelper.drainAndClose(subParser, malformedContentForSyntheticSource);
684-
} else {
685-
// We don't use DrainingXContentParser since we don't want to go beyond current field
686-
malformedContentForSyntheticSource.copyCurrentStructure(context.parser());
687-
}
688-
;
689-
var nameValue = IgnoredSourceFieldMapper.NameValue.fromContext(
690-
context,
691-
name(),
692-
XContentDataHelper.encodeXContentBuilder(malformedContentForSyntheticSource)
693-
);
694-
context.addIgnoredField(nameValue);
695-
} else if (subParser != null) {
675+
if (subParser != null) {
696676
// close the subParser, so we advance to the end of the object
697677
subParser.close();
678+
} else {
679+
if (context.mappingLookup().isSourceSynthetic()) {
680+
// There is a malformed value, but it is not an object (since subParser is null).
681+
// So we just need to copy this single value.
682+
malformedDataForSyntheticSource = XContentBuilder.builder(context.parser().contentType().xContent())
683+
.copyCurrentStructure(context.parser());
684+
}
685+
}
686+
687+
if (malformedDataForSyntheticSource != null) {
688+
context.doc().add(IgnoreMalformedStoredValues.storedField(name(), malformedDataForSyntheticSource));
698689
}
699690

700691
context.addIgnoredField(name());
@@ -724,11 +715,15 @@ protected SyntheticSourceMode syntheticSourceMode() {
724715

725716
@Override
726717
public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
727-
// Note that malformed values are handled via `IgnoredSourceFieldMapper` infrastructure
728-
return new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics);
718+
return new CompositeSyntheticFieldLoader(
719+
simpleName(),
720+
name(),
721+
new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics),
722+
new CompositeSyntheticFieldLoader.MalformedValuesLayer(name())
723+
);
729724
}
730725

731-
public static class AggregateMetricSyntheticFieldLoader implements SourceLoader.SyntheticFieldLoader {
726+
public static class AggregateMetricSyntheticFieldLoader implements CompositeSyntheticFieldLoader.SyntheticFieldLoaderLayer {
732727
private final String name;
733728
private final String simpleName;
734729
private final EnumSet<Metric> metrics;
@@ -746,6 +741,11 @@ public String fieldName() {
746741
return name;
747742
}
748743

744+
@Override
745+
public long valueCount() {
746+
return hasValue() ? 1 : 0;
747+
}
748+
749749
@Override
750750
public Stream<Map.Entry<String, StoredFieldLoader>> storedFieldLoaders() {
751751
return Stream.of();
@@ -779,7 +779,7 @@ public void write(XContentBuilder b) throws IOException {
779779
if (metricHasValue.isEmpty()) {
780780
return;
781781
}
782-
b.startObject(simpleName);
782+
b.startObject();
783783
for (Map.Entry<Metric, SortedNumericDocValues> entry : metricDocValues.entrySet()) {
784784
if (metricHasValue.contains(entry.getKey())) {
785785
String metricName = entry.getKey().name();

x-pack/plugin/mapper-aggregate-metric/src/test/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateDoubleMetricFieldMapperTests.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import org.apache.lucene.search.FieldExistsQuery;
1010
import org.apache.lucene.search.Query;
11+
import org.elasticsearch.common.Strings;
12+
import org.elasticsearch.core.CheckedConsumer;
1113
import org.elasticsearch.index.mapper.DocumentMapper;
1214
import org.elasticsearch.index.mapper.DocumentParsingException;
1315
import org.elasticsearch.index.mapper.LuceneDocument;
@@ -20,6 +22,7 @@
2022
import org.elasticsearch.plugins.Plugin;
2123
import org.elasticsearch.xcontent.XContentBuilder;
2224
import org.elasticsearch.xcontent.XContentFactory;
25+
import org.elasticsearch.xcontent.json.JsonXContent;
2326
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
2427
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric;
2528
import org.hamcrest.Matchers;
@@ -523,6 +526,43 @@ protected IngestScriptSupport ingestScriptSupport() {
523526
throw new AssumptionViolatedException("not supported");
524527
}
525528

529+
public void testArrayValueSyntheticSource() throws Exception {
530+
DocumentMapper mapper = createDocumentMapper(
531+
syntheticSourceFieldMapping(
532+
b -> b.field("type", CONTENT_TYPE)
533+
.array("metrics", "min", "max")
534+
.field("default_metric", "min")
535+
.field("ignore_malformed", "true")
536+
)
537+
);
538+
539+
var randomString = randomAlphaOfLength(10);
540+
CheckedConsumer<XContentBuilder, IOException> arrayValue = b -> {
541+
b.startArray("field");
542+
{
543+
b.startObject().field("max", 100).field("min", 10).endObject();
544+
b.startObject().field("max", 200).field("min", 20).endObject();
545+
b.value(randomString);
546+
}
547+
b.endArray();
548+
};
549+
550+
var expected = JsonXContent.contentBuilder().startObject();
551+
// First value comes from synthetic field loader and so is formatted in a specific format (e.g. min always come first).
552+
// Other values are stored as is as part of ignore_malformed logic for synthetic source.
553+
{
554+
expected.startArray("field");
555+
expected.startObject().field("min", 10.0).field("max", 100.0).endObject();
556+
expected.startObject().field("max", 200).field("min", 20).endObject();
557+
expected.value(randomString);
558+
expected.endArray();
559+
}
560+
expected.endObject();
561+
562+
var syntheticSource = syntheticSource(mapper, arrayValue);
563+
assertEquals(Strings.toString(expected), syntheticSource);
564+
}
565+
526566
protected final class AggregateDoubleMetricSyntheticSourceSupport implements SyntheticSourceSupport {
527567
private final boolean malformedExample;
528568
private final EnumSet<Metric> storedMetrics;

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/aggregate-metrics/100_synthetic_source.yml

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ aggregate_metric_double with ignore_malformed:
7676
index:
7777
index: test
7878
id: "1"
79-
refresh: true
8079
body:
8180
metric:
8281
min: 18.2
@@ -88,11 +87,22 @@ aggregate_metric_double with ignore_malformed:
8887
value_count: 50
8988

9089
- do:
91-
search:
90+
index:
91+
index: test
92+
id: "2"
93+
body:
94+
metric: ["hey", {"value_count": 1, "min": 18.2,"max": 100}, [123, 456]]
95+
96+
- do:
97+
indices.refresh: {}
98+
99+
- do:
100+
get:
92101
index: test
102+
id: "1"
93103

94104
- match:
95-
hits.hits.0._source:
105+
_source:
96106
metric:
97107
min: 18.2
98108
max: 100
@@ -102,3 +112,12 @@ aggregate_metric_double with ignore_malformed:
102112
field: "field"
103113
value_count: 50
104114

115+
- do:
116+
get:
117+
index: test
118+
id: "2"
119+
120+
- match:
121+
_source:
122+
metric: [{"min": 18.2,"max": 100.0, "value_count": 1}, "hey", 123, 456]
123+

0 commit comments

Comments
 (0)