From e67b11cfdb26462c8ad8938dc59ba9bd8809d3a1 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 23 Sep 2019 14:00:59 -0700 Subject: [PATCH 1/6] init --- .../xpack/core/enrich/EnrichPolicy.java | 6 +- .../xpack/enrich/EnrichPolicyRunner.java | 2 + .../xpack/enrich/EnrichProcessorFactory.java | 3 + .../xpack/enrich/GeoMatchProcessor.java | 183 ++++++++++++++++++ .../xpack/enrich/GeoMatchProcessorTests.java | 153 +++++++++++++++ 5 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java create mode 100644 x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java index aec1f0e7b803a..bd0dc7b574308 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java @@ -36,7 +36,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment { public static final String ENRICH_INDEX_NAME_BASE = ".enrich-"; public static final String MATCH_TYPE = "match"; - public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE}; + public static final String GEO_MATCH_TYPE = "geo_match"; + public static final String[] SUPPORTED_POLICY_TYPES = new String[]{ + MATCH_TYPE, + GEO_MATCH_TYPE + }; private static final ParseField QUERY = new ParseField("query"); private static final ParseField INDICES = new ParseField("indices"); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 3eac4b3f10d46..29bab261ec612 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -200,6 +200,8 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) { keyType = "keyword"; // No need to also configure index_options, because keyword type defaults to 'docs'. + } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) { + keyType = "geo_shape"; } else { throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 03c5f3dd58c6f..1f636b14c3b14 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -59,6 +59,9 @@ public Processor create(Map processorFactories, Strin case EnrichPolicy.MATCH_TYPE: return new MatchProcessor(tag, client, policyName, field, targetField, matchField, ignoreMissing, overrideEnabled, maxMatches); + case EnrichPolicy.GEO_MATCH_TYPE: + return new GeoMatchProcessor(tag, client, policyName, field, targetField, matchField, + ignoreMissing, overrideEnabled, maxMatches); default: throw new IllegalArgumentException("unsupported policy type [" + policyType + "]"); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java new file mode 100644 index 0000000000000..6ed83cb7b02a8 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.routing.Preference; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.geo.GeoUtils; +import org.elasticsearch.common.geo.ShapeRelation; +import org.elasticsearch.geometry.Point; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.GeoShapeQueryBuilder; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +public final class GeoMatchProcessor extends AbstractEnrichProcessor { + + private final BiConsumer> searchRunner; + private final String field; + private final String targetField; + private final String matchField; + private final boolean ignoreMissing; + private final boolean overrideEnabled; + private final int maxMatches; + + GeoMatchProcessor(String tag, + Client client, + String policyName, + String field, + String targetField, + String matchField, + boolean ignoreMissing, + boolean overrideEnabled, + int maxMatches) { + this( + tag, + createSearchRunner(client), + policyName, + field, + targetField, + matchField, + ignoreMissing, + overrideEnabled, + maxMatches + ); + } + + GeoMatchProcessor(String tag, + BiConsumer> searchRunner, + String policyName, + String field, + String targetField, + String matchField, + boolean ignoreMissing, + boolean overrideEnabled, + int maxMatches) { + super(tag, policyName); + this.searchRunner = searchRunner; + this.field = field; + this.targetField = targetField; + this.matchField = matchField; + this.ignoreMissing = ignoreMissing; + this.overrideEnabled = overrideEnabled; + this.maxMatches = maxMatches; + } + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + try { + // If a document does not have the enrich key, return the unchanged document + final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); + if (value == null) { + handler.accept(ingestDocument, null); + return; + } + + GeoPoint point = GeoUtils.parseGeoPoint(value, true); + + GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, new Point(point.lon(), point.lat())); + shapeQuery.relation(ShapeRelation.INTERSECTS); + ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(shapeQuery); + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.from(0); + searchBuilder.size(maxMatches); + searchBuilder.trackScores(false); + searchBuilder.fetchSource(true); + searchBuilder.query(constantScore); + + SearchRequest req = new SearchRequest(); + req.indices(EnrichPolicy.getBaseName(getPolicyName())); + req.preference(Preference.LOCAL.type()); + req.source(searchBuilder); + + searchRunner.accept(req, (searchResponse, e) -> { + if (e != null) { + handler.accept(null, e); + return; + } + + // If the index is empty, return the unchanged document + // If the enrich key does not exist in the index, throw an error + // If no documents match the key, return the unchanged document + SearchHit[] searchHits = searchResponse.getHits().getHits(); + if (searchHits.length < 1) { + handler.accept(ingestDocument, null); + return; + } + + if (overrideEnabled || ingestDocument.hasField(targetField) == false) { + List> enrichDocuments = new ArrayList<>(searchHits.length); + for (SearchHit searchHit : searchHits) { + Map enrichDocument = searchHit.getSourceAsMap(); + enrichDocuments.add(enrichDocument); + } + ingestDocument.setFieldValue(targetField, enrichDocuments); + } + handler.accept(ingestDocument, null); + }); + } catch (Exception e) { + handler.accept(null, e); + } + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); + } + + @Override + public String getType() { + return EnrichProcessorFactory.TYPE; + } + + String getField() { + return field; + } + + public String getTargetField() { + return targetField; + } + + public String getMatchField() { + return matchField; + } + + boolean isIgnoreMissing() { + return ignoreMissing; + } + + boolean isOverrideEnabled() { + return overrideEnabled; + } + + int getMaxMatches() { + return maxMatches; + } + + private static BiConsumer> createSearchRunner(Client client) { + return (req, handler) -> { + client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap( + resp -> { + handler.accept(resp, null); + }, + e -> { + handler.accept(null, e); + })); + }; + } +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java new file mode 100644 index 0000000000000..97d5c836ac9b8 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.Preference; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.geometry.Point; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.GeoShapeQueryBuilder; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.test.ESTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.hamcrest.Matchers.emptyArray; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; + +public class GeoMatchProcessorTests extends ESTestCase { + + public void testBasics() throws Exception { + int maxMatches = randomIntBetween(1, 8); + MockSearchFunction mockSearch = mockedSearchFunction(Map.of("key", Map.of("shape", "object", "zipcode",94040))); + GeoMatchProcessor processor = new GeoMatchProcessor("_tag", mockSearch, "_name", "location", "entry", + "shape", false, true, maxMatches); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Map.of("location", "37.386637, -122.084110")); + // Run + IngestDocument[] holder = new IngestDocument[1]; + processor.execute(ingestDocument, (result, e) -> holder[0] = result); + assertThat(holder[0], notNullValue()); + // Check request + SearchRequest request = mockSearch.getCapturedRequest(); + assertThat(request.indices().length, equalTo(1)); + assertThat(request.indices()[0], equalTo(".enrich-_name")); + assertThat(request.preference(), equalTo(Preference.LOCAL.type())); + assertThat(request.source().size(), equalTo(maxMatches)); + assertThat(request.source().trackScores(), equalTo(false)); + assertThat(request.source().fetchSource().fetchSource(), equalTo(true)); + assertThat(request.source().fetchSource().excludes(), emptyArray()); + assertThat(request.source().fetchSource().includes(), emptyArray()); + assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class)); + assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(GeoShapeQueryBuilder.class)); + GeoShapeQueryBuilder shapeQueryBuilder = (GeoShapeQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); + assertThat(shapeQueryBuilder.fieldName(), equalTo("shape")); + assertThat(shapeQueryBuilder.shape(), equalTo(new Point(-122.084110, 37.386637))); + o + + shapeQueryBuilder.toQuery() + // Check result + List entries = ingestDocument.getFieldValue("entry", List.class); + Map entry = (Map) entries.get(0); + assertThat(entry.size(), equalTo(2)); + assertThat(entry.get("zipcode"), equalTo(94040)); + } + + private static final class MockSearchFunction implements BiConsumer> { + private final SearchResponse mockResponse; + private final SetOnce capturedRequest; + private final Exception exception; + + MockSearchFunction(SearchResponse mockResponse) { + this.mockResponse = mockResponse; + this.exception = null; + this.capturedRequest = new SetOnce<>(); + } + + MockSearchFunction(Exception exception) { + this.mockResponse = null; + this.exception = exception; + this.capturedRequest = new SetOnce<>(); + } + + @Override + public void accept(SearchRequest request, BiConsumer handler) { + capturedRequest.set(request); + if (exception != null) { + handler.accept(null, exception); + } else { + handler.accept(mockResponse, null); + } + } + + SearchRequest getCapturedRequest() { + return capturedRequest.get(); + } + } + + public MockSearchFunction mockedSearchFunction() { + return new MockSearchFunction(mockResponse(Collections.emptyMap())); + } + + public MockSearchFunction mockedSearchFunction(Exception exception) { + return new MockSearchFunction(exception); + } + + public MockSearchFunction mockedSearchFunction(Map> documents) { + return new MockSearchFunction(mockResponse(documents)); + } + + public SearchResponse mockResponse(Map> documents) { + + final QueryShardContext context = new QueryShardContext(0, new IndexSettings(new IndexMetaData(), Settings.EMPTY), null, null, null, mapperService, + null, null, xContentRegistry(), writableRegistry(), client, leaf.reader(), () -> nowInMillis, null); + + SearchHit[] searchHits = documents.entrySet().stream().map(e -> { + SearchHit searchHit = new SearchHit(randomInt(100), e.getKey(), new Text(MapperService.SINGLE_MAPPING_NAME), + Collections.emptyMap()); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) { + builder.map(e.getValue()); + builder.flush(); + ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream(); + searchHit.sourceRef(new BytesArray(outputStream.toByteArray())); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + return searchHit; + }).toArray(SearchHit[]::new); + return new SearchResponse(new SearchResponseSections( + new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f), + new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()), + false, false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0)); + } +} From 840273e3ae461e4c9926c2f5b227aacfd6594dfc Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Fri, 27 Sep 2019 11:08:06 -0700 Subject: [PATCH 2/6] Geo-Match Enrich Processor this commit introduces a geo-match enrich processor that looks up a specific `geo_point` field in the enrich-index for all entries that have a geo_shape match field that meets some specific relation criteria with the input field. For example, the enrich index may contain documents with zipcodes and their respective geo_shape. Ingesting documents with a geo_point field can be enriched with which zipcode they associate according to which shape they are contained within. this commit also refactors some of the MatchProcessor by moving a lot of the shared code to AbstractEnrichProcessor. Closes #42639. --- .../xpack/enrich/AbstractEnrichProcessor.java | 135 +++++++++++++- .../xpack/enrich/EnrichPolicyRunner.java | 21 ++- .../xpack/enrich/EnrichProcessorFactory.java | 9 +- .../xpack/enrich/GeoMatchProcessor.java | 165 +++--------------- .../xpack/enrich/MatchProcessor.java | 156 ++--------------- .../xpack/enrich/BasicEnrichTests.java | 63 ++++++- .../xpack/enrich/GeoMatchProcessorTests.java | 20 +-- .../xpack/enrich/MatchProcessorTests.java | 14 +- 8 files changed, 265 insertions(+), 318 deletions(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java index a2afd0ce17f66..f219cfc63e172 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java @@ -5,18 +5,151 @@ */ package org.elasticsearch.xpack.enrich; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; public abstract class AbstractEnrichProcessor extends AbstractProcessor { private final String policyName; + private final BiConsumer> searchRunner; + private final String field; + private final String targetField; + private final boolean ignoreMissing; + private final boolean overrideEnabled; + protected final String matchField; + protected final int maxMatches; + + + protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField, + boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) { + this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); + } - protected AbstractEnrichProcessor(String tag, String policyName) { + protected AbstractEnrichProcessor(String tag, + BiConsumer> searchRunner, + String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled, + String matchField, int maxMatches) { super(tag); this.policyName = policyName; + this.searchRunner = searchRunner; + this.field = field; + this.targetField = targetField; + this.ignoreMissing = ignoreMissing; + this.overrideEnabled = overrideEnabled; + this.matchField = matchField; + this.maxMatches = maxMatches; + } + + public abstract SearchSourceBuilder getSearchSourceBuilder(Object fieldValue); + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + try { + // If a document does not have the enrich key, return the unchanged document + final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); + if (value == null) { + handler.accept(ingestDocument, null); + return; + } + + SearchSourceBuilder searchSourceBuilder = getSearchSourceBuilder(value); + + SearchRequest req = new SearchRequest(); + req.indices(EnrichPolicy.getBaseName(getPolicyName())); + req.preference(Preference.LOCAL.type()); + req.source(searchSourceBuilder); + + searchRunner.accept(req, (searchResponse, e) -> { + if (e != null) { + handler.accept(null, e); + return; + } + + // If the index is empty, return the unchanged document + // If the enrich key does not exist in the index, throw an error + // If no documents match the key, return the unchanged document + SearchHit[] searchHits = searchResponse.getHits().getHits(); + if (searchHits.length < 1) { + handler.accept(ingestDocument, null); + return; + } + + if (overrideEnabled || ingestDocument.hasField(targetField) == false) { + List> enrichDocuments = new ArrayList<>(searchHits.length); + for (SearchHit searchHit : searchHits) { + Map enrichDocument = searchHit.getSourceAsMap(); + enrichDocuments.add(enrichDocument); + } + ingestDocument.setFieldValue(targetField, enrichDocuments); + } + handler.accept(ingestDocument, null); + }); + } catch (Exception e) { + handler.accept(null, e); + } + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } public String getPolicyName() { return policyName; } + + @Override + public String getType() { + return EnrichProcessorFactory.TYPE; + } + + String getField() { + return field; + } + + public String getTargetField() { + return targetField; + } + + boolean isIgnoreMissing() { + return ignoreMissing; + } + + boolean isOverrideEnabled() { + return overrideEnabled; + } + + public String getMatchField() { + return matchField; + } + + int getMaxMatches() { + return maxMatches; + } + + private static BiConsumer> createSearchRunner(Client client) { + return (req, handler) -> { + client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap( + resp -> { + handler.accept(resp, null); + }, + e -> { + handler.accept(null, e); + })); + }; + } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 29bab261ec612..74a5e60380c63 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -196,12 +196,13 @@ private void validateField(Map properties, String fieldName, boolean field private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { // Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type - String keyType; + final String keyType; + final MatchFieldMapping matchFieldMapping; if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) { - keyType = "keyword"; + matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false); // No need to also configure index_options, because keyword type defaults to 'docs'. } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) { - keyType = "geo_shape"; + matchFieldMapping = (builder) -> builder.field("type", "geo_shape"); } else { throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); } @@ -209,18 +210,15 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { // Enable _source on enrich index. Explicitly mark key mapping type. try { XContentBuilder builder = JsonXContent.contentBuilder(); - builder.startObject() + builder = builder.startObject() .startObject(MapperService.SINGLE_MAPPING_NAME) .field("dynamic", false) .startObject("_source") .field("enabled", true) .endObject() .startObject("properties") - .startObject(policy.getMatchField()) - .field("type", keyType) - .field("doc_values", false) - .endObject() - .endObject() + .startObject(policy.getMatchField()); + builder = matchFieldMapping.build(builder).endObject().endObject() .startObject("_meta") .field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT) .field(ENRICH_POLICY_NAME_FIELD_NAME, policyName) @@ -413,4 +411,9 @@ public void onFailure(Exception e) { } }); } + + @FunctionalInterface + private interface MatchFieldMapping { + XContentBuilder build(XContentBuilder builder) throws IOException; + } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 1f636b14c3b14..d58c559dfe0fa 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.Processor; @@ -57,11 +58,11 @@ public Processor create(Map processorFactories, Strin switch (policyType) { case EnrichPolicy.MATCH_TYPE: - return new MatchProcessor(tag, client, policyName, field, targetField, matchField, - ignoreMissing, overrideEnabled, maxMatches); + return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField, + maxMatches); case EnrichPolicy.GEO_MATCH_TYPE: - return new GeoMatchProcessor(tag, client, policyName, field, targetField, matchField, - ignoreMissing, overrideEnabled, maxMatches); + return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField, + maxMatches, ShapeRelation.INTERSECTS); default: throw new IllegalArgumentException("unsupported policy type [" + policyType + "]"); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java index 6ed83cb7b02a8..ae24cc7ce9443 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java @@ -5,179 +5,66 @@ */ package org.elasticsearch.xpack.enrich; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.geo.GeoUtils; import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.geometry.Point; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.GeoShapeQueryBuilder; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.function.BiConsumer; public final class GeoMatchProcessor extends AbstractEnrichProcessor { - private final BiConsumer> searchRunner; - private final String field; - private final String targetField; - private final String matchField; - private final boolean ignoreMissing; - private final boolean overrideEnabled; - private final int maxMatches; + private ShapeRelation shapeRelation; GeoMatchProcessor(String tag, Client client, String policyName, String field, String targetField, - String matchField, - boolean ignoreMissing, boolean overrideEnabled, - int maxMatches) { - this( - tag, - createSearchRunner(client), - policyName, - field, - targetField, - matchField, - ignoreMissing, - overrideEnabled, - maxMatches - ); + boolean ignoreMissing, + String matchField, + int maxMatches, ShapeRelation shapeRelation) { + super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); + this.shapeRelation = shapeRelation; } + /** used in tests **/ GeoMatchProcessor(String tag, BiConsumer> searchRunner, String policyName, String field, String targetField, - String matchField, - boolean ignoreMissing, boolean overrideEnabled, - int maxMatches) { - super(tag, policyName); - this.searchRunner = searchRunner; - this.field = field; - this.targetField = targetField; - this.matchField = matchField; - this.ignoreMissing = ignoreMissing; - this.overrideEnabled = overrideEnabled; - this.maxMatches = maxMatches; - } - - @Override - public void execute(IngestDocument ingestDocument, BiConsumer handler) { - try { - // If a document does not have the enrich key, return the unchanged document - final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); - if (value == null) { - handler.accept(ingestDocument, null); - return; - } - - GeoPoint point = GeoUtils.parseGeoPoint(value, true); - - GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, new Point(point.lon(), point.lat())); - shapeQuery.relation(ShapeRelation.INTERSECTS); - ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(shapeQuery); - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.from(0); - searchBuilder.size(maxMatches); - searchBuilder.trackScores(false); - searchBuilder.fetchSource(true); - searchBuilder.query(constantScore); - - SearchRequest req = new SearchRequest(); - req.indices(EnrichPolicy.getBaseName(getPolicyName())); - req.preference(Preference.LOCAL.type()); - req.source(searchBuilder); - - searchRunner.accept(req, (searchResponse, e) -> { - if (e != null) { - handler.accept(null, e); - return; - } - - // If the index is empty, return the unchanged document - // If the enrich key does not exist in the index, throw an error - // If no documents match the key, return the unchanged document - SearchHit[] searchHits = searchResponse.getHits().getHits(); - if (searchHits.length < 1) { - handler.accept(ingestDocument, null); - return; - } - - if (overrideEnabled || ingestDocument.hasField(targetField) == false) { - List> enrichDocuments = new ArrayList<>(searchHits.length); - for (SearchHit searchHit : searchHits) { - Map enrichDocument = searchHit.getSourceAsMap(); - enrichDocuments.add(enrichDocument); - } - ingestDocument.setFieldValue(targetField, enrichDocuments); - } - handler.accept(ingestDocument, null); - }); - } catch (Exception e) { - handler.accept(null, e); - } - } - - @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - throw new UnsupportedOperationException("this method should not get executed"); + boolean ignoreMissing, + String matchField, + int maxMatches, ShapeRelation shapeRelation) { + super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); + this.shapeRelation = shapeRelation; } @Override - public String getType() { - return EnrichProcessorFactory.TYPE; - } - - String getField() { - return field; - } - - public String getTargetField() { - return targetField; - } - - public String getMatchField() { - return matchField; - } - - boolean isIgnoreMissing() { - return ignoreMissing; - } - - boolean isOverrideEnabled() { - return overrideEnabled; - } - - int getMaxMatches() { - return maxMatches; + public SearchSourceBuilder getSearchSourceBuilder(Object fieldValue) { + GeoPoint point = GeoUtils.parseGeoPoint(fieldValue, true); + GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, new Point(point.lon(), point.lat())); + shapeQuery.relation(shapeRelation); + ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(shapeQuery); + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.from(0); + searchBuilder.size(maxMatches); + searchBuilder.trackScores(false); + searchBuilder.fetchSource(true); + searchBuilder.query(constantScore); + return searchBuilder; } - private static BiConsumer> createSearchRunner(Client client) { - return (req, handler) -> { - client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap( - resp -> { - handler.accept(resp, null); - }, - e -> { - handler.accept(null, e); - })); - }; + public ShapeRelation getShapeRelation() { + return shapeRelation; } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java index 3e69690626760..c320fd3ae52fd 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java @@ -5,172 +5,52 @@ */ package org.elasticsearch.xpack.enrich; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.function.BiConsumer; -public final class MatchProcessor extends AbstractEnrichProcessor { - - private final BiConsumer> searchRunner; - private final String field; - private final String targetField; - private final String matchField; - private final boolean ignoreMissing; - private final boolean overrideEnabled; - private final int maxMatches; +public class MatchProcessor extends AbstractEnrichProcessor { MatchProcessor(String tag, Client client, String policyName, String field, String targetField, - String matchField, - boolean ignoreMissing, boolean overrideEnabled, + boolean ignoreMissing, + String matchField, int maxMatches) { - this( - tag, - createSearchRunner(client), - policyName, - field, - targetField, - matchField, - ignoreMissing, - overrideEnabled, - maxMatches - ); + super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); } + /** used in tests **/ MatchProcessor(String tag, BiConsumer> searchRunner, String policyName, String field, String targetField, - String matchField, - boolean ignoreMissing, boolean overrideEnabled, + boolean ignoreMissing, + String matchField, int maxMatches) { - super(tag, policyName); - this.searchRunner = searchRunner; - this.field = field; - this.targetField = targetField; - this.matchField = matchField; - this.ignoreMissing = ignoreMissing; - this.overrideEnabled = overrideEnabled; - this.maxMatches = maxMatches; - } - - @Override - public void execute(IngestDocument ingestDocument, BiConsumer handler) { - try { - // If a document does not have the enrich key, return the unchanged document - final String value = ingestDocument.getFieldValue(field, String.class, ignoreMissing); - if (value == null) { - handler.accept(ingestDocument, null); - return; - } - - TermQueryBuilder termQuery = new TermQueryBuilder(matchField, value); - ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.from(0); - searchBuilder.size(maxMatches); - searchBuilder.trackScores(false); - searchBuilder.fetchSource(true); - searchBuilder.query(constantScore); - - SearchRequest req = new SearchRequest(); - req.indices(EnrichPolicy.getBaseName(getPolicyName())); - req.preference(Preference.LOCAL.type()); - req.source(searchBuilder); - - searchRunner.accept(req, (searchResponse, e) -> { - if (e != null) { - handler.accept(null, e); - return; - } - - // If the index is empty, return the unchanged document - // If the enrich key does not exist in the index, throw an error - // If no documents match the key, return the unchanged document - SearchHit[] searchHits = searchResponse.getHits().getHits(); - if (searchHits.length < 1) { - handler.accept(ingestDocument, null); - return; - } - - if (overrideEnabled || ingestDocument.hasField(targetField) == false) { - List> enrichDocuments = new ArrayList<>(searchHits.length); - for (SearchHit searchHit : searchHits) { - Map enrichDocument = searchHit.getSourceAsMap(); - enrichDocuments.add(enrichDocument); - } - ingestDocument.setFieldValue(targetField, enrichDocuments); - } - handler.accept(ingestDocument, null); - }); - } catch (Exception e) { - handler.accept(null, e); - } + super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - throw new UnsupportedOperationException("this method should not get executed"); - } - - @Override - public String getType() { - return EnrichProcessorFactory.TYPE; - } - - String getField() { - return field; - } - - public String getTargetField() { - return targetField; - } - - public String getMatchField() { - return matchField; - } - - boolean isIgnoreMissing() { - return ignoreMissing; - } - - boolean isOverrideEnabled() { - return overrideEnabled; - } - - int getMaxMatches() { - return maxMatches; - } - - private static BiConsumer> createSearchRunner(Client client) { - return (req, handler) -> { - client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap( - resp -> { - handler.accept(resp, null); - }, - e -> { - handler.accept(null, e); - })); - }; + public SearchSourceBuilder getSearchSourceBuilder(Object fieldValue) { + TermQueryBuilder termQuery = new TermQueryBuilder(matchField, fieldValue); + ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.from(0); + searchBuilder.size(maxMatches); + searchBuilder.trackScores(false); + searchBuilder.fetchSource(true); + searchBuilder.query(constantScore); + return searchBuilder; } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index 5e6d686292c86..cba5bceb23e8b 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -51,10 +51,10 @@ protected boolean resetNodeAfterTest() { return true; } - public void testIngestDataWithEnrichProcessor() { + public void testIngestDataWithMatchProcessor() { int numDocs = 32; int maxMatches = randomIntBetween(2, 8); - List keys = createSourceIndex(numDocs, maxMatches); + List keys = createSourceMatchIndex(numDocs, maxMatches); String policyName = "my-policy"; EnrichPolicy enrichPolicy = @@ -110,6 +110,62 @@ public void testIngestDataWithEnrichProcessor() { assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo((long) numDocs)); } + public void testIngestDataWithGeoMatchProcessor() { + String matchField = "location"; + String enrichField = "zipcode"; + // create enrich index + { + IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME); + indexRequest.source(Map.of(matchField, "POLYGON((" + + "-122.08592534065245 37.38501746624134," + + "-122.08193421363829 37.38501746624134," + + "-122.08193421363829 37.3879329075567," + + "-122.08592534065245 37.3879329075567," + + "-122.08592534065245 37.38501746624134))", + "zipcode", "94040")); + client().index(indexRequest).actionGet(); + client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); + } + + String policyName = "my-policy"; + EnrichPolicy enrichPolicy = + new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), matchField, List.of(enrichField)); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + + String pipelineName = "my-pipeline"; + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName + + "\", \"field\": \"" + matchField + "\", \"target_field\": \"enriched\", \"max_matches\": 1 }}]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); + + BulkRequest bulkRequest = new BulkRequest("my-index"); + IndexRequest indexRequest = new IndexRequest(); + indexRequest.id("_id"); + indexRequest.setPipeline(pipelineName); + indexRequest.source(Map.of(matchField, "37.386444, -122.083863")); // point within match boundary + bulkRequest.add(indexRequest); + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false)); + assertThat(bulkResponse.getItems().length, equalTo(1)); + assertThat(bulkResponse.getItems()[0].getId(), equalTo("_id")); + + GetResponse getResponse = client().get(new GetRequest("my-index", "_id")).actionGet(); + Map source = getResponse.getSourceAsMap(); + List entries = (List) source.get("enriched"); + assertThat(entries, notNullValue()); + assertThat(entries.size(), equalTo(1)); + + EnrichStatsAction.Response statsResponse = + client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet(); + assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1)); + String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId(); + assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId)); + assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L)); + assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo(1L)); + } + public void testMultiplePolicies() { int numPolicies = 8; for (int i = 0; i < numPolicies; i++) { @@ -152,7 +208,7 @@ public void testMultiplePolicies() { } } - private List createSourceIndex(int numKeys, int numDocsPerKey) { + private List createSourceMatchIndex(int numKeys, int numDocsPerKey) { Set keys = new HashSet<>(); for (int id = 0; id < numKeys; id++) { String key; @@ -170,5 +226,4 @@ private List createSourceIndex(int numKeys, int numDocsPerKey) { client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); return List.copyOf(keys); } - } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java index 97d5c836ac9b8..c7e631809c110 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java @@ -11,20 +11,15 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponseSections; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.Preference; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.text.Text; +import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.geometry.Point; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.GeoShapeQueryBuilder; -import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; @@ -47,11 +42,11 @@ public class GeoMatchProcessorTests extends ESTestCase { - public void testBasics() throws Exception { + public void testBasics() { int maxMatches = randomIntBetween(1, 8); MockSearchFunction mockSearch = mockedSearchFunction(Map.of("key", Map.of("shape", "object", "zipcode",94040))); GeoMatchProcessor processor = new GeoMatchProcessor("_tag", mockSearch, "_name", "location", "entry", - "shape", false, true, maxMatches); + false, true, "shape", maxMatches, ShapeRelation.INTERSECTS); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("location", "37.386637, -122.084110")); // Run @@ -73,9 +68,7 @@ public void testBasics() throws Exception { GeoShapeQueryBuilder shapeQueryBuilder = (GeoShapeQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); assertThat(shapeQueryBuilder.fieldName(), equalTo("shape")); assertThat(shapeQueryBuilder.shape(), equalTo(new Point(-122.084110, 37.386637))); - o - shapeQueryBuilder.toQuery() // Check result List entries = ingestDocument.getFieldValue("entry", List.class); Map entry = (Map) entries.get(0); @@ -128,13 +121,8 @@ public MockSearchFunction mockedSearchFunction(Map> docum } public SearchResponse mockResponse(Map> documents) { - - final QueryShardContext context = new QueryShardContext(0, new IndexSettings(new IndexMetaData(), Settings.EMPTY), null, null, null, mapperService, - null, null, xContentRegistry(), writableRegistry(), client, leaf.reader(), () -> nowInMillis, null); - SearchHit[] searchHits = documents.entrySet().stream().map(e -> { - SearchHit searchHit = new SearchHit(randomInt(100), e.getKey(), new Text(MapperService.SINGLE_MAPPING_NAME), - Collections.emptyMap()); + SearchHit searchHit = new SearchHit(randomInt(100), e.getKey(), Collections.emptyMap()); try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) { builder.map(e.getValue()); builder.flush(); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java index c7e1010a046b4..e7a7a2a99a2ed 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java @@ -46,7 +46,7 @@ public class MatchProcessorTests extends ESTestCase { public void testBasics() throws Exception { int maxMatches = randomIntBetween(1, 8); MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, maxMatches); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", maxMatches); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.co")); // Run @@ -79,7 +79,7 @@ public void testBasics() throws Exception { public void testNoMatch() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", 1); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.com")); int numProperties = ingestDocument.getSourceAndMetadata().size(); @@ -109,7 +109,7 @@ public void testNoMatch() throws Exception { public void testSearchFailure() throws Exception { String indexName = ".enrich-_name"; MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName)); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", 1); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.com")); // Run @@ -144,7 +144,7 @@ public void testSearchFailure() throws Exception { public void testIgnoreKeyMissing() throws Exception { { MatchProcessor processor = - new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true, 1); + new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", true, true, "domain", 1); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); @@ -155,7 +155,7 @@ public void testIgnoreKeyMissing() throws Exception { } { MatchProcessor processor = - new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true, 1); + new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", true, false, "domain", 1); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); IngestDocument[] resultHolder = new IngestDocument[1]; Exception[] exceptionHolder = new Exception[1]; @@ -171,7 +171,7 @@ public void testIgnoreKeyMissing() throws Exception { public void testExistingFieldWithOverrideDisabled() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, false, "domain", 1); IngestDocument ingestDocument = new IngestDocument(new HashMap<>(Map.of("domain", "elastic.co", "tld", "tld")), Map.of()); IngestDocument[] resultHolder = new IngestDocument[1]; @@ -187,7 +187,7 @@ public void testExistingFieldWithOverrideDisabled() throws Exception { public void testExistingNullFieldWithOverrideDisabled() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, false, "domain", 1); Map source = new HashMap<>(); source.put("domain", "elastic.co"); From cdce1786458fce72de53a922be00bff1643fcd7e Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 30 Sep 2019 13:50:44 -0700 Subject: [PATCH 3/6] respond to review --- .../mapper/AbstractGeometryFieldMapper.java | 2 + .../mapper/GeoShapeFieldMapperTests.java | 2 + .../xpack/enrich/AbstractEnrichProcessor.java | 17 ++-- .../xpack/enrich/EnrichPolicyRunner.java | 12 +-- .../xpack/enrich/EnrichProcessorFactory.java | 4 +- .../xpack/enrich/GeoMatchProcessor.java | 17 ++-- .../xpack/enrich/MatchProcessor.java | 15 +--- .../xpack/enrich/EnrichPolicyRunnerTests.java | 84 +++++++++++++++++++ 8 files changed, 115 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/AbstractGeometryFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/AbstractGeometryFieldMapper.java index 006af39b8f59b..6c3a8068d771f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/AbstractGeometryFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/AbstractGeometryFieldMapper.java @@ -252,6 +252,8 @@ public Mapper.Builder parse(String name, Map node, ParserContext } Builder builder = newBuilder(name, params); + TypeParsers.parseField(builder, name, node, parserContext); + if (params.containsKey(Names.COERCE.getPreferredName())) { builder.coerce((Boolean)params.get(Names.COERCE.getPreferredName())); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java index 65485bd5f9782..3c645ee6912bb 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java @@ -49,6 +49,7 @@ public void testDefaultConfiguration() throws IOException { String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("location") .field("type", "geo_shape") + .field("doc_values", true) .endObject().endObject() .endObject().endObject()); @@ -60,6 +61,7 @@ public void testDefaultConfiguration() throws IOException { GeoShapeFieldMapper geoShapeFieldMapper = (GeoShapeFieldMapper) fieldMapper; assertThat(geoShapeFieldMapper.fieldType().orientation(), equalTo(GeoShapeFieldMapper.Defaults.ORIENTATION.value())); + assertThat(geoShapeFieldMapper.fieldType.hasDocValues(), equalTo(false)); } /** diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java index f219cfc63e172..5cedec045919f 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java @@ -10,6 +10,8 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.routing.Preference; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.search.SearchHit; @@ -33,7 +35,6 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor { protected final String matchField; protected final int maxMatches; - protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) { this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); @@ -54,7 +55,7 @@ protected AbstractEnrichProcessor(String tag, this.maxMatches = maxMatches; } - public abstract SearchSourceBuilder getSearchSourceBuilder(Object fieldValue); + public abstract QueryBuilder getQueryBuilder(Object fieldValue); @Override public void execute(IngestDocument ingestDocument, BiConsumer handler) { @@ -66,12 +67,18 @@ public void execute(IngestDocument ingestDocument, BiConsumer { if (e != null) { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 74a5e60380c63..7a4d2dd81fd82 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -197,12 +198,12 @@ private void validateField(Map properties, String fieldName, boolean field private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { // Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type final String keyType; - final MatchFieldMapping matchFieldMapping; + final CheckedFunction matchFieldMapping; if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) { matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false); // No need to also configure index_options, because keyword type defaults to 'docs'. } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) { - matchFieldMapping = (builder) -> builder.field("type", "geo_shape"); + matchFieldMapping = (builder) -> builder.field("type", "geo_shape").field("doc_values", false); } else { throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); } @@ -218,7 +219,7 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { .endObject() .startObject("properties") .startObject(policy.getMatchField()); - builder = matchFieldMapping.build(builder).endObject().endObject() + builder = matchFieldMapping.apply(builder).endObject().endObject() .startObject("_meta") .field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT) .field(ENRICH_POLICY_NAME_FIELD_NAME, policyName) @@ -411,9 +412,4 @@ public void onFailure(Exception e) { } }); } - - @FunctionalInterface - private interface MatchFieldMapping { - XContentBuilder build(XContentBuilder builder) throws IOException; - } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index d58c559dfe0fa..a4251d9f17083 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -61,8 +61,10 @@ public Processor create(Map processorFactories, Strin return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField, maxMatches); case EnrichPolicy.GEO_MATCH_TYPE: + String relationStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "shape_relation", "intersects"); + ShapeRelation shapeRelation = ShapeRelation.getRelationByName(relationStr); return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField, - maxMatches, ShapeRelation.INTERSECTS); + maxMatches, shapeRelation); default: throw new IllegalArgumentException("unsupported policy type [" + policyType + "]"); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java index ae24cc7ce9443..ad4b7796eb1d5 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java @@ -12,9 +12,8 @@ import org.elasticsearch.common.geo.GeoUtils; import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.geometry.Point; -import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.GeoShapeQueryBuilder; -import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.index.query.QueryBuilder; import java.util.function.BiConsumer; @@ -30,7 +29,8 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor { boolean overrideEnabled, boolean ignoreMissing, String matchField, - int maxMatches, ShapeRelation shapeRelation) { + int maxMatches, + ShapeRelation shapeRelation) { super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); this.shapeRelation = shapeRelation; } @@ -50,18 +50,11 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor { } @Override - public SearchSourceBuilder getSearchSourceBuilder(Object fieldValue) { + public QueryBuilder getQueryBuilder(Object fieldValue) { GeoPoint point = GeoUtils.parseGeoPoint(fieldValue, true); GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, new Point(point.lon(), point.lat())); shapeQuery.relation(shapeRelation); - ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(shapeQuery); - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.from(0); - searchBuilder.size(maxMatches); - searchBuilder.trackScores(false); - searchBuilder.fetchSource(true); - searchBuilder.query(constantScore); - return searchBuilder; + return shapeQuery; } public ShapeRelation getShapeRelation() { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java index c320fd3ae52fd..55a3b2275e9c8 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java @@ -8,9 +8,8 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; -import org.elasticsearch.search.builder.SearchSourceBuilder; import java.util.function.BiConsumer; @@ -42,15 +41,7 @@ public class MatchProcessor extends AbstractEnrichProcessor { } @Override - public SearchSourceBuilder getSearchSourceBuilder(Object fieldValue) { - TermQueryBuilder termQuery = new TermQueryBuilder(matchField, fieldValue); - ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.from(0); - searchBuilder.size(maxMatches); - searchBuilder.trackScores(false); - searchBuilder.fetchSource(true); - searchBuilder.query(constantScore); - return searchBuilder; + public QueryBuilder getQueryBuilder(Object fieldValue) { + return new TermQueryBuilder(matchField, fieldValue); } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 853386c65c6d5..49085f8bfb89f 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -150,6 +150,90 @@ public void testRunner() throws Exception { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } + public void testRunnerGeoMatchType() throws Exception { + final String sourceIndex = "source-index"; + IndexResponse indexRequest = client().index(new IndexRequest() + .index(sourceIndex) + .id("id") + .source( + "{" + + "\"location\":" + + "\"POINT(10.0 10.0)\"," + + "\"zipcode\":90210" + + "}", + XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex) + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("location"), is(equalTo("POINT(10.0 10.0)"))); + assertThat(sourceDocMap.get("zipcode"), is(equalTo(90210))); + + List enrichFields = List.of("zipcode"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, List.of(sourceIndex), "location", enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map field1 = (Map) properties.get("location"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("geo_shape"))); + assertNull(field1.get("doc_values")); + + // Validate document structure + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1") + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(2))); + assertThat(enrichDocument.get("location"), is(equalTo("POINT(10.0 10.0)"))); + assertThat(enrichDocument.get("zipcode"), is(equalTo(90210))); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + public void testRunnerMultiSource() throws Exception { String baseSourceName = "source-index-"; int numberOfSourceIndices = 3; From 3cc42e29288ef84cc3d1f96a947869cf3843b448 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 30 Sep 2019 14:39:11 -0700 Subject: [PATCH 4/6] undo field mapper changes to geo_shape. unintentional --- .../elasticsearch/index/mapper/AbstractGeometryFieldMapper.java | 2 -- .../elasticsearch/index/mapper/GeoShapeFieldMapperTests.java | 1 - 2 files changed, 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/mapper/AbstractGeometryFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/AbstractGeometryFieldMapper.java index 6c3a8068d771f..006af39b8f59b 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/AbstractGeometryFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/AbstractGeometryFieldMapper.java @@ -252,8 +252,6 @@ public Mapper.Builder parse(String name, Map node, ParserContext } Builder builder = newBuilder(name, params); - TypeParsers.parseField(builder, name, node, parserContext); - if (params.containsKey(Names.COERCE.getPreferredName())) { builder.coerce((Boolean)params.get(Names.COERCE.getPreferredName())); } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java index 3c645ee6912bb..e14d04eb7d664 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java @@ -49,7 +49,6 @@ public void testDefaultConfiguration() throws IOException { String mapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type1") .startObject("properties").startObject("location") .field("type", "geo_shape") - .field("doc_values", true) .endObject().endObject() .endObject().endObject()); From 0947a8acb80771ee067c299276c88caf8898d3b5 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 30 Sep 2019 15:07:18 -0700 Subject: [PATCH 5/6] oh my, forgot another miscode --- .../java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 7a4d2dd81fd82..c454e653093dd 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -203,7 +203,7 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false); // No need to also configure index_options, because keyword type defaults to 'docs'. } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) { - matchFieldMapping = (builder) -> builder.field("type", "geo_shape").field("doc_values", false); + matchFieldMapping = (builder) -> builder.field("type", "geo_shape"); } else { throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); } From 1c473cc6b0f9c569899fdd1ae9a8eb1df766f996 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 2 Oct 2019 13:28:06 -0700 Subject: [PATCH 6/6] support multiple geo-points as input --- .../xpack/enrich/GeoMatchProcessor.java | 32 +++++++++++++++++-- .../xpack/enrich/GeoMatchProcessorTests.java | 29 ++++++++++++++--- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java index ad4b7796eb1d5..ebe05772bf073 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java @@ -11,10 +11,14 @@ import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.geo.GeoUtils; import org.elasticsearch.common.geo.ShapeRelation; +import org.elasticsearch.geometry.Geometry; +import org.elasticsearch.geometry.MultiPoint; import org.elasticsearch.geometry.Point; import org.elasticsearch.index.query.GeoShapeQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; +import java.util.ArrayList; +import java.util.List; import java.util.function.BiConsumer; public final class GeoMatchProcessor extends AbstractEnrichProcessor { @@ -49,10 +53,34 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor { this.shapeRelation = shapeRelation; } + @SuppressWarnings("unchecked") @Override public QueryBuilder getQueryBuilder(Object fieldValue) { - GeoPoint point = GeoUtils.parseGeoPoint(fieldValue, true); - GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, new Point(point.lon(), point.lat())); + List points = new ArrayList<>(); + if (fieldValue instanceof List) { + List values = (List) fieldValue; + if (values.size() == 2 && values.get(0) instanceof Number) { + GeoPoint geoPoint = GeoUtils.parseGeoPoint(values, true); + points.add(new Point(geoPoint.lon(), geoPoint.lat())); + } else { + for (Object value : values) { + GeoPoint geoPoint = GeoUtils.parseGeoPoint(value, true); + points.add(new Point(geoPoint.lon(), geoPoint.lat())); + } + } + } else { + GeoPoint geoPoint = GeoUtils.parseGeoPoint(fieldValue, true); + points.add(new Point(geoPoint.lon(), geoPoint.lat())); + } + final Geometry queryGeometry; + if (points.isEmpty()) { + throw new IllegalArgumentException("no geopoints found"); + } else if (points.size() == 1) { + queryGeometry = points.get(0); + } else { + queryGeometry = new MultiPoint(points); + } + GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, queryGeometry); shapeQuery.relation(shapeRelation); return shapeQuery; } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java index c7e631809c110..52817dca83948 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java @@ -16,6 +16,8 @@ import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.geometry.Geometry; +import org.elasticsearch.geometry.MultiPoint; import org.elasticsearch.geometry.Point; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; @@ -39,20 +41,38 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class GeoMatchProcessorTests extends ESTestCase { public void testBasics() { + Point expectedPoint = new Point(-122.084110, 37.386637); + testBasicsForFieldValue(Map.of("lat", 37.386637, "lon", -122.084110), expectedPoint); + testBasicsForFieldValue("37.386637, -122.084110", expectedPoint); + testBasicsForFieldValue("POINT (-122.084110 37.386637)", expectedPoint); + testBasicsForFieldValue(List.of(-122.084110, 37.386637), expectedPoint); + testBasicsForFieldValue(List.of(List.of(-122.084110, 37.386637), "37.386637, -122.084110", "POINT (-122.084110 37.386637)"), + new MultiPoint(List.of(expectedPoint, expectedPoint, expectedPoint))); + + testBasicsForFieldValue("not a point", null); + } + + private void testBasicsForFieldValue(Object fieldValue, Geometry expectedGeometry) { int maxMatches = randomIntBetween(1, 8); MockSearchFunction mockSearch = mockedSearchFunction(Map.of("key", Map.of("shape", "object", "zipcode",94040))); GeoMatchProcessor processor = new GeoMatchProcessor("_tag", mockSearch, "_name", "location", "entry", - false, true, "shape", maxMatches, ShapeRelation.INTERSECTS); + false, false, "shape", maxMatches, ShapeRelation.INTERSECTS); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, - Map.of("location", "37.386637, -122.084110")); + Map.of("location", fieldValue)); // Run IngestDocument[] holder = new IngestDocument[1]; processor.execute(ingestDocument, (result, e) -> holder[0] = result); - assertThat(holder[0], notNullValue()); + if (expectedGeometry == null) { + assertThat(holder[0], nullValue()); + return; + } else { + assertThat(holder[0], notNullValue()); + } // Check request SearchRequest request = mockSearch.getCapturedRequest(); assertThat(request.indices().length, equalTo(1)); @@ -67,13 +87,14 @@ public void testBasics() { assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(GeoShapeQueryBuilder.class)); GeoShapeQueryBuilder shapeQueryBuilder = (GeoShapeQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); assertThat(shapeQueryBuilder.fieldName(), equalTo("shape")); - assertThat(shapeQueryBuilder.shape(), equalTo(new Point(-122.084110, 37.386637))); + assertThat(shapeQueryBuilder.shape(), equalTo(expectedGeometry)); // Check result List entries = ingestDocument.getFieldValue("entry", List.class); Map entry = (Map) entries.get(0); assertThat(entry.size(), equalTo(2)); assertThat(entry.get("zipcode"), equalTo(94040)); + } private static final class MockSearchFunction implements BiConsumer> {