Skip to content

Geo-Match Enrich Processor #47243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void testDefaultConfiguration() throws IOException {
GeoShapeFieldMapper geoShapeFieldMapper = (GeoShapeFieldMapper) fieldMapper;
assertThat(geoShapeFieldMapper.fieldType().orientation(),
equalTo(GeoShapeFieldMapper.Defaults.ORIENTATION.value()));
assertThat(geoShapeFieldMapper.fieldType.hasDocValues(), equalTo(false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";

public static final String MATCH_TYPE = "match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE};
public static final String GEO_MATCH_TYPE = "geo_match";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{
MATCH_TYPE,
GEO_MATCH_TYPE
};

private static final ParseField QUERY = new ParseField("query");
private static final ParseField INDICES = new ParseField("indices");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,158 @@
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public abstract class AbstractEnrichProcessor extends AbstractProcessor {

private final String policyName;
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String field;
private final String targetField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
protected final String matchField;
protected final int maxMatches;

protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField,
boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) {
this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
}

protected AbstractEnrichProcessor(String tag, String policyName) {
protected AbstractEnrichProcessor(String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled,
String matchField, int maxMatches) {
super(tag);
this.policyName = policyName;
this.searchRunner = searchRunner;
this.field = field;
this.targetField = targetField;
this.ignoreMissing = ignoreMissing;
this.overrideEnabled = overrideEnabled;
this.matchField = matchField;
this.maxMatches = maxMatches;
}

public abstract QueryBuilder getQueryBuilder(Object fieldValue);

@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
try {
// If a document does not have the enrich key, return the unchanged document
final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
if (value == null) {
handler.accept(ingestDocument, null);
return;
}

QueryBuilder queryBuilder = getQueryBuilder(value);
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
searchBuilder.from(0);
searchBuilder.size(maxMatches);
searchBuilder.trackScores(false);
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);
SearchRequest req = new SearchRequest();
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
req.preference(Preference.LOCAL.type());
req.source(searchBuilder);

searchRunner.accept(req, (searchResponse, e) -> {
if (e != null) {
handler.accept(null, e);
return;
}

// If the index is empty, return the unchanged document
// If the enrich key does not exist in the index, throw an error
// If no documents match the key, return the unchanged document
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length < 1) {
handler.accept(ingestDocument, null);
return;
}

if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
for (SearchHit searchHit : searchHits) {
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
enrichDocuments.add(enrichDocument);
}
ingestDocument.setFieldValue(targetField, enrichDocuments);
}
handler.accept(ingestDocument, null);
});
} catch (Exception e) {
handler.accept(null, e);
}
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException("this method should not get executed");
}

public String getPolicyName() {
return policyName;
}

@Override
public String getType() {
return EnrichProcessorFactory.TYPE;
}

String getField() {
return field;
}

public String getTargetField() {
return targetField;
}

boolean isIgnoreMissing() {
return ignoreMissing;
}

boolean isOverrideEnabled() {
return overrideEnabled;
}

public String getMatchField() {
return matchField;
}

int getMaxMatches() {
return maxMatches;
}

private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> {
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
resp -> {
handler.accept(resp, null);
},
e -> {
handler.accept(null, e);
}));
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand Down Expand Up @@ -203,29 +204,29 @@ private void validateField(Map<?, ?> properties, String fieldName, boolean field

private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Supported policy types: EnrichPolicy.MATCH_TYPE (match field mapped as keyword) and EnrichPolicy.GEO_MATCH_TYPE (match field mapped as geo_shape)
String keyType;
final String keyType;
final CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping;
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
keyType = "keyword";
matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false);
// No need to also configure index_options, because keyword type defaults to 'docs'.
} else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) {
matchFieldMapping = (builder) -> builder.field("type", "geo_shape");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe that geo_shape field type uses doc values by default, right?
Do you know whether the default is going to change in the future?
I just want to make sure we never store more than what is needed for enrich. (since doc values are not being used for querying)

Copy link
Contributor Author

@talevy talevy Sep 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that since doc values aren't supported, neither is `doc_values: false`. I was debating whether this should be fixed upstream so that one is allowed to set that. Currently, it errors out if you attempt to set it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will follow-up in another PR to expose doc-values parsing to geo_shape. I think it should understand that parameter, even though it only allows it to be false. For now, I think this is the best that can be done. Thankfully, I know when doc-values on shapes will be supported so I will be sure to update this!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 That sounds good to me.

} else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
}

// Enable _source on enrich index. Explicitly mark key mapping type.
try {
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject()
builder = builder.startObject()
.startObject(MapperService.SINGLE_MAPPING_NAME)
.field("dynamic", false)
.startObject("_source")
.field("enabled", true)
.endObject()
.startObject("properties")
.startObject(policy.getMatchField())
.field("type", keyType)
.field("doc_values", false)
.endObject()
.endObject()
.startObject(policy.getMatchField());
builder = matchFieldMapping.apply(builder).endObject().endObject()
.startObject("_meta")
.field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT)
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
Expand Down Expand Up @@ -57,8 +58,13 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin

switch (policyType) {
case EnrichPolicy.MATCH_TYPE:
return new MatchProcessor(tag, client, policyName, field, targetField, matchField,
ignoreMissing, overrideEnabled, maxMatches);
return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
maxMatches);
case EnrichPolicy.GEO_MATCH_TYPE:
String relationStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "shape_relation", "intersects");
ShapeRelation shapeRelation = ShapeRelation.getRelationByName(relationStr);
return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
maxMatches, shapeRelation);
default:
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.GeoUtils;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.MultiPoint;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/**
 * Enrich processor that looks up enrich documents by geometry: the point value(s) found in the
 * incoming document are turned into a {@link GeoShapeQueryBuilder} on the policy's match field,
 * using the configured {@link ShapeRelation}.
 */
public final class GeoMatchProcessor extends AbstractEnrichProcessor {

    // Assigned once in the constructors and never changed afterwards.
    private final ShapeRelation shapeRelation;

    GeoMatchProcessor(String tag,
                      Client client,
                      String policyName,
                      String field,
                      String targetField,
                      boolean overrideEnabled,
                      boolean ignoreMissing,
                      String matchField,
                      int maxMatches,
                      ShapeRelation shapeRelation) {
        // Note: the super constructor takes ignoreMissing before overrideEnabled.
        super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
        this.shapeRelation = shapeRelation;
    }

    /** used in tests **/
    GeoMatchProcessor(String tag,
                      BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
                      String policyName,
                      String field,
                      String targetField,
                      boolean overrideEnabled,
                      boolean ignoreMissing,
                      String matchField,
                      int maxMatches, ShapeRelation shapeRelation) {
        // Note: the super constructor takes ignoreMissing before overrideEnabled.
        super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
        this.shapeRelation = shapeRelation;
    }

    /**
     * Builds a geo shape query on the match field from the document's value.
     *
     * <p>A list of exactly two elements whose first element is a {@link Number} is parsed as one
     * point; any other list is parsed element by element; a non-list value is parsed as one point.
     * One resulting point is queried as a {@link Point}, several as a {@link MultiPoint}.
     *
     * @param fieldValue the value read from the incoming document
     * @return a {@link GeoShapeQueryBuilder} with the configured shape relation
     * @throws IllegalArgumentException if no point could be derived (for example an empty list)
     */
    @Override
    public QueryBuilder getQueryBuilder(Object fieldValue) {
        List<Point> points = new ArrayList<>();
        if (fieldValue instanceof List) {
            List<?> values = (List<?>) fieldValue;
            if (values.size() == 2 && values.get(0) instanceof Number) {
                // The whole list is handed to the parser as a single point.
                points.add(toPoint(values));
            } else {
                for (Object value : values) {
                    points.add(toPoint(value));
                }
            }
        } else {
            points.add(toPoint(fieldValue));
        }
        final Geometry queryGeometry;
        if (points.isEmpty()) {
            throw new IllegalArgumentException("no geopoints found");
        } else if (points.size() == 1) {
            queryGeometry = points.get(0);
        } else {
            queryGeometry = new MultiPoint(points);
        }
        GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, queryGeometry);
        shapeQuery.relation(shapeRelation);
        return shapeQuery;
    }

    /** Parses a single value with {@link GeoUtils#parseGeoPoint} and converts it to a geometry {@link Point}. */
    private static Point toPoint(Object value) {
        GeoPoint geoPoint = GeoUtils.parseGeoPoint(value, true);
        return new Point(geoPoint.lon(), geoPoint.lat());
    }

    public ShapeRelation getShapeRelation() {
        return shapeRelation;
    }
}
Loading