Skip to content

Support synthetic source for geo_point when ignore_malformed is used #109651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/109651.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 109651
summary: Support synthetic source for `geo_point` when `ignore_malformed` is used
area: Mapping
type: enhancement
issues: []
3 changes: 1 addition & 2 deletions docs/reference/mapping/types/geo-point.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ any issues, but features in technical preview are not subject to the support SLA
of official GA features.

`geo_point` fields support <<synthetic-source,synthetic `_source`>> in their
default configuration. Synthetic `_source` cannot be used together with
<<ignore-malformed,`ignore_malformed`>>, <<copy-to,`copy_to`>>, or with
default configuration. Synthetic `_source` cannot be used together with <<copy-to,`copy_to`>> or with
<<doc-values,`doc_values`>> disabled.

Synthetic source always sorts `geo_point` fields (first by latitude and then
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.xcontent;

import java.io.IOException;

/**
 * A parser that copies every event it reads from the wrapped parser into an {@link XContentBuilder}.
 * This parser naturally has some memory and runtime overhead to perform said copying.
 * Use with {@link XContentSubParser} to preserve the entire object.
 * <p>
 * Only events pulled through {@link #nextToken()} are recorded; the opening
 * {@code START_OBJECT}/{@code START_ARRAY} the delegate is positioned on at construction
 * time is written to the builder by the constructor.
 */
public class CopyingXContentParser extends FilterXContentParserWrapper {
    private final XContentBuilder builder;

    /**
     * Wraps {@code delegate} and starts recording from its current token.
     *
     * @param delegate the parser to read from; must currently point to {@code START_OBJECT} or {@code START_ARRAY}
     * @throws IllegalArgumentException if the delegate points to any other token, or to no token at all
     * @throws IOException if the builder cannot be created or the opening token cannot be written
     */
    public CopyingXContentParser(XContentParser delegate) throws IOException {
        super(delegate);
        // Validate before allocating the builder so that a rejected parser leaves nothing behind, and so that
        // a parser with no current token (null) gets the same descriptive error instead of a bare NPE.
        final XContentParser.Token start = delegate.currentToken();
        if (start != XContentParser.Token.START_OBJECT && start != XContentParser.Token.START_ARRAY) {
            throw new IllegalArgumentException(
                "can only copy parsers pointed to START_OBJECT or START_ARRAY but found: " + start
            );
        }
        this.builder = XContentBuilder.builder(delegate.contentType().xContent());
        if (start == XContentParser.Token.START_OBJECT) {
            builder.startObject();
        } else {
            builder.startArray();
        }
    }

    /**
     * Advances the wrapped parser and mirrors the event it lands on into the builder.
     *
     * @return the token returned by the wrapped parser, possibly {@code null}
     * @throws IOException if advancing the parser or copying the event fails
     */
    @Override
    public Token nextToken() throws IOException {
        XContentParser.Token next = delegate().nextToken();
        // A null token means the delegate produced no new event, so there is nothing to copy.
        if (next != null) {
            builder.copyCurrentEvent(delegate());
        }
        return next;
    }

    /**
     * @return the builder holding everything copied so far
     */
    public XContentBuilder getBuilder() {
        return builder;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,18 @@ public XContentBuilder rawValue(String value) throws IOException {
return this;
}

/**
* Copies current event from parser into this builder.
* The difference with {@link XContentBuilder#copyCurrentStructure(XContentParser)}
* is that this method does not copy sub-objects as a single entity.
* The work is delegated to the underlying generator.
* @param parser the parser whose current event is copied
* @return this builder, so that calls can be chained
* @throws IOException if the underlying generator fails while copying the event
*/
public XContentBuilder copyCurrentEvent(XContentParser parser) throws IOException {
generator.copyCurrentEvent(parser);
return this;
}

public XContentBuilder copyCurrentStructure(XContentParser parser) throws IOException {
generator.copyCurrentStructure(parser);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -379,18 +378,18 @@ private LegacyGeoShapeParser() {}
public void parse(
XContentParser parser,
CheckedConsumer<ShapeBuilder<?, ?, ?>, IOException> consumer,
Consumer<Exception> onMalformed
MalformedValueHandler malformedHandler
) throws IOException {
try {
if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
parse(parser, consumer, onMalformed);
parse(parser, consumer, malformedHandler);
}
} else {
consumer.accept(ShapeParser.parse(parser));
}
} catch (ElasticsearchParseException e) {
onMalformed.accept(e);
malformedHandler.notify(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
package org.elasticsearch.index.mapper;

import org.apache.lucene.search.Query;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Explicit;
import org.elasticsearch.common.geo.GeometryFormatterFactory;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.support.MapXContentParser;
Expand Down Expand Up @@ -52,12 +54,12 @@ public abstract static class Parser<T> {
* Parse the given xContent value to one or more objects of type {@link T}. The value can be
* in any supported format.
*/
public abstract void parse(XContentParser parser, CheckedConsumer<T, IOException> consumer, Consumer<Exception> onMalformed)
public abstract void parse(XContentParser parser, CheckedConsumer<T, IOException> consumer, MalformedValueHandler malformedHandler)
throws IOException;

private void fetchFromSource(Object sourceMap, Consumer<T> consumer) {
try (XContentParser parser = wrapObject(sourceMap)) {
parse(parser, v -> consumer.accept(normalizeFromSource(v)), e -> {}); /* ignore malformed */
parse(parser, v -> consumer.accept(normalizeFromSource(v)), NoopMalformedValueHandler.INSTANCE);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand All @@ -84,6 +86,36 @@ private static XContentParser wrapObject(Object sourceMap) throws IOException {
}
}

/**
* Callback handed to a {@link Parser} and invoked when a value fails to parse.
* Implementations decide what happens next; both methods may throw {@link IOException}.
*/
public interface MalformedValueHandler {
/**
* Reports a parsing failure without any copy of the offending input.
* @param parsingException the exception raised while parsing the value
* @throws IOException if the handler fails while processing the notification
*/
void notify(Exception parsingException) throws IOException;

/**
* Reports a parsing failure together with a builder that carries the malformed input.
* @param parsingException the exception raised while parsing the value
* @param malformedDataForSyntheticSource builder with the malformed data, intended for synthetic source;
*        NOTE(review): callers in this file may pass {@code null} when no copy was made, confirm implementations tolerate it
* @throws IOException if the handler fails while processing the notification
*/
void notify(Exception parsingException, XContentBuilder malformedDataForSyntheticSource) throws IOException;
}

/**
* A {@link MalformedValueHandler} that silently ignores every malformed value.
* It holds no state, so the shared {@link #INSTANCE} can be used everywhere.
*/
public record NoopMalformedValueHandler() implements MalformedValueHandler {
// Single shared instance; the record has no components.
public static final NoopMalformedValueHandler INSTANCE = new NoopMalformedValueHandler();

// Intentionally empty: malformed values are dropped.
@Override
public void notify(Exception parsingException) {}

// Intentionally empty: the malformed data is not retained.
@Override
public void notify(Exception parsingException, XContentBuilder malformedDataForSyntheticSource) {}
}

/**
 * A {@link MalformedValueHandler} that forwards every notification to a single callback.
 * The callback receives the parsing exception and the builder with the malformed data,
 * which is {@code null} when the failure was reported without one.
 *
 * @param consumer callback invoked for each malformed value
 */
public record DefaultMalformedValueHandler(CheckedBiConsumer<Exception, XContentBuilder, IOException> consumer)
    implements
        MalformedValueHandler {

    /** Reports a failure that has no captured input; the callback sees a {@code null} builder. */
    @Override
    public void notify(Exception parsingException) throws IOException {
        notify(parsingException, null);
    }

    /** Hands the failure and the captured malformed input to the callback unchanged. */
    @Override
    public void notify(Exception parsingException, XContentBuilder malformedDataForSyntheticSource) throws IOException {
        consumer.accept(parsingException, malformedDataForSyntheticSource);
    }
}

public abstract static class AbstractGeometryFieldType<T> extends MappedFieldType {

protected final Parser<T> geometryParser;
Expand Down Expand Up @@ -220,17 +252,20 @@ public final void parse(DocumentParserContext context) throws IOException {
new IllegalArgumentException("Cannot index data directly into a field with a [script] parameter")
);
}
parser.parse(context.parser(), v -> index(context, v), e -> {
if (ignoreMalformed()) {
context.addIgnoredField(fieldType().name());
} else {
throw new DocumentParsingException(
context.parser().getTokenLocation(),
"failed to parse field [" + fieldType().name() + "] of type [" + contentType() + "]",
e
);
}
});
parser.parse(context.parser(), v -> index(context, v), new DefaultMalformedValueHandler((e, b) -> onMalformedValue(context, b, e)));
}

/**
 * Reacts to a value that could not be parsed for this field.
 * When malformed values are ignored, the field name is recorded as ignored on the context;
 * otherwise the failure is rethrown as a {@link DocumentParsingException}.
 *
 * @param context the document parsing context
 * @param malformedDataForSyntheticSource builder carrying the malformed data; not used by this implementation
 * @param cause the exception raised while parsing the value
 * @throws DocumentParsingException if malformed values are not ignored
 * @throws IOException declared so that overriding implementations can perform I/O
 */
protected void onMalformedValue(DocumentParserContext context, XContentBuilder malformedDataForSyntheticSource, Exception cause)
    throws IOException {
    if (ignoreMalformed() == false) {
        final String message = "failed to parse field [" + fieldType().name() + "] of type [" + contentType() + "]";
        throw new DocumentParsingException(context.parser().getTokenLocation(), message, cause);
    }
    context.addIgnoredField(fieldType().name());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -71,7 +70,7 @@ public T getNullValue() {
/** A base parser implementation for point formats */
protected abstract static class PointParser<T> extends Parser<T> {
protected final String field;
private final CheckedFunction<XContentParser, T, IOException> objectParser;
protected final CheckedFunction<XContentParser, T, IOException> objectParser;
private final T nullValue;
private final boolean ignoreZValue;
protected final boolean ignoreMalformed;
Expand All @@ -98,7 +97,7 @@ protected PointParser(
protected abstract T createPoint(double x, double y);

@Override
public void parse(XContentParser parser, CheckedConsumer<T, IOException> consumer, Consumer<Exception> onMalformed)
public void parse(XContentParser parser, CheckedConsumer<T, IOException> consumer, MalformedValueHandler malformedHandler)
throws IOException {
if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
XContentParser.Token token = parser.nextToken();
Expand Down Expand Up @@ -132,7 +131,7 @@ public void parse(XContentParser parser, CheckedConsumer<T, IOException> consume
consumer.accept(nullValue);
}
} else {
parseAndConsumeFromObject(parser, consumer, onMalformed);
parseAndConsumeFromObject(parser, consumer, malformedHandler);
}
token = parser.nextToken();
}
Expand All @@ -142,20 +141,20 @@ public void parse(XContentParser parser, CheckedConsumer<T, IOException> consume
consumer.accept(nullValue);
}
} else {
parseAndConsumeFromObject(parser, consumer, onMalformed);
parseAndConsumeFromObject(parser, consumer, malformedHandler);
}
}

private void parseAndConsumeFromObject(
protected void parseAndConsumeFromObject(
XContentParser parser,
CheckedConsumer<T, IOException> consumer,
Consumer<Exception> onMalformed
) {
MalformedValueHandler malformedHandler
) throws IOException {
try {
T point = objectParser.apply(parser);
consumer.accept(validate(point));
} catch (Exception e) {
onMalformed.accept(e);
malformedHandler.notify(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.geo.SimpleVectorTileFormatter;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.IndexMode;
Expand All @@ -45,6 +46,7 @@
import org.elasticsearch.search.lookup.FieldValues;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.search.runtime.GeoPointScriptFieldDistanceFeatureQuery;
import org.elasticsearch.xcontent.CopyingXContentParser;
import org.elasticsearch.xcontent.FilterXContentParserWrapper;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
Expand Down Expand Up @@ -193,13 +195,15 @@ private FieldValues<GeoPoint> scriptValues() {

@Override
public FieldMapper build(MapperBuilderContext context) {
boolean ignoreMalformedEnabled = ignoreMalformed.get().value();
Parser<GeoPoint> geoParser = new GeoPointParser(
name(),
(parser) -> GeoUtils.parseGeoPoint(parser, ignoreZValue.get().value()),
nullValue.get(),
ignoreZValue.get().value(),
ignoreMalformed.get().value(),
metric.get() != TimeSeriesParams.MetricType.POSITION
ignoreMalformedEnabled,
metric.get() != TimeSeriesParams.MetricType.POSITION,
context.isSourceSynthetic() && ignoreMalformedEnabled
);
GeoPointFieldType ft = new GeoPointFieldType(
context.buildFullName(name()),
Expand Down Expand Up @@ -524,16 +528,19 @@ public TimeSeriesParams.MetricType getMetricType() {

/** GeoPoint parser implementation */
private static class GeoPointParser extends PointParser<GeoPoint> {
private final boolean storeMalformedDataForSyntheticSource;

GeoPointParser(
String field,
CheckedFunction<XContentParser, GeoPoint, IOException> objectParser,
GeoPoint nullValue,
boolean ignoreZValue,
boolean ignoreMalformed,
boolean allowMultipleValues
boolean allowMultipleValues,
boolean storeMalformedDataForSyntheticSource
) {
super(field, objectParser, nullValue, ignoreZValue, ignoreMalformed, allowMultipleValues);
this.storeMalformedDataForSyntheticSource = storeMalformedDataForSyntheticSource;
}

protected GeoPoint validate(GeoPoint in) {
Expand Down Expand Up @@ -568,6 +575,45 @@ public GeoPoint normalizeFromSource(GeoPoint point) {
// normalize during parsing
return point;
}

/**
 * Parses one point from {@code parser} and passes the validated result to {@code consumer}.
 * When malformed data has to be kept for synthetic source, the raw input is captured into a
 * builder while parsing, and that builder is handed to {@code malformedHandler} if parsing fails.
 *
 * @param parser the parser positioned on the value to read
 * @param consumer receives the validated point
 * @param malformedHandler notified with the exception and the captured input (or {@code null}) on failure
 * @throws IOException if capturing the input or notifying the handler fails
 */
@Override
protected void parseAndConsumeFromObject(
    XContentParser parser,
    CheckedConsumer<GeoPoint, IOException> consumer,
    MalformedValueHandler malformedHandler
) throws IOException {
    // Without capturing, parse straight from the given parser and report failures with no copy.
    XContentParser effectiveParser = parser;
    XContentBuilder capturedInput = null;

    if (storeMalformedDataForSyntheticSource) {
        final XContentParser.Token current = parser.currentToken();
        final boolean structured = current == XContentParser.Token.START_OBJECT || current == XContentParser.Token.START_ARRAY;
        if (structured) {
            // Objects and arrays are recorded event by event as the point parser consumes them.
            CopyingXContentParser recorder = new CopyingXContentParser(parser);
            capturedInput = recorder.getBuilder();
            effectiveParser = recorder;
        } else {
            // A single value (e.g. a string) is copied up front, before it is known to be malformed.
            XContentBuilder scalarCopy = XContentBuilder.builder(parser.contentType().xContent());
            capturedInput = scalarCopy.copyCurrentStructure(parser);
        }
    }

    try {
        consumer.accept(validate(objectParser.apply(effectiveParser)));
    } catch (Exception e) {
        malformedHandler.notify(e, capturedInput);
    }
}
}

/**
 * Applies the inherited malformed-value handling first, then, if a copy of the malformed
 * input was captured, adds it to the document as a stored field under this field's name.
 *
 * @param context the document parsing context
 * @param malformedDataForSyntheticSource builder with the malformed input, or {@code null} if none was captured
 * @param cause the exception raised while parsing the value
 * @throws IOException if the inherited handling or building the stored field fails
 */
@Override
protected void onMalformedValue(DocumentParserContext context, XContentBuilder malformedDataForSyntheticSource, Exception cause)
    throws IOException {
    // The inherited handling runs first: it may throw, in which case nothing is stored.
    super.onMalformedValue(context, malformedDataForSyntheticSource, cause);
    if (malformedDataForSyntheticSource == null) {
        return;
    }
    var storedCopy = IgnoreMalformedStoredValues.storedField(name(), malformedDataForSyntheticSource);
    context.doc().add(storedCopy);
}

@Override
Expand All @@ -585,11 +631,6 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
"field [" + name() + "] of type [" + typeName() + "] doesn't support synthetic source because it doesn't have doc values"
);
}
if (ignoreMalformed()) {
throw new IllegalArgumentException(
"field [" + name() + "] of type [" + typeName() + "] doesn't support synthetic source because it ignores malformed points"
);
}
if (copyTo.copyToFields().isEmpty() != true) {
throw new IllegalArgumentException(
"field [" + name() + "] of type [" + typeName() + "] doesn't support synthetic source because it declares copy_to"
Expand Down
Loading