diff --git a/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java index 65485bd5f9782..e14d04eb7d664 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java @@ -60,6 +60,7 @@ public void testDefaultConfiguration() throws IOException { GeoShapeFieldMapper geoShapeFieldMapper = (GeoShapeFieldMapper) fieldMapper; assertThat(geoShapeFieldMapper.fieldType().orientation(), equalTo(GeoShapeFieldMapper.Defaults.ORIENTATION.value())); + assertThat(geoShapeFieldMapper.fieldType.hasDocValues(), equalTo(false)); } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java index 34ee034326b12..4b56c5b59a554 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java @@ -35,7 +35,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment { public static final String ENRICH_INDEX_NAME_BASE = ".enrich-"; public static final String MATCH_TYPE = "match"; - public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE}; + public static final String GEO_MATCH_TYPE = "geo_match"; + public static final String[] SUPPORTED_POLICY_TYPES = new String[]{ + MATCH_TYPE, + GEO_MATCH_TYPE + }; private static final ParseField QUERY = new ParseField("query"); private static final ParseField INDICES = new ParseField("indices"); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java index a2afd0ce17f66..5cedec045919f 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java @@ -5,18 +5,158 @@ */ package org.elasticsearch.xpack.enrich; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.routing.Preference; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; public abstract class AbstractEnrichProcessor extends AbstractProcessor { private final String policyName; + private final BiConsumer> searchRunner; + private final String field; + private final String targetField; + private final boolean ignoreMissing; + private final boolean overrideEnabled; + protected final String matchField; + protected final int maxMatches; + + protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField, + boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) { + this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); + } - protected AbstractEnrichProcessor(String tag, String policyName) { + protected AbstractEnrichProcessor(String tag, + BiConsumer> searchRunner, + String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled, + String matchField, int maxMatches) { super(tag); this.policyName = policyName; + this.searchRunner = searchRunner; + this.field = field; + this.targetField = targetField; + this.ignoreMissing = ignoreMissing; + this.overrideEnabled = overrideEnabled; + this.matchField = matchField; + this.maxMatches = maxMatches; + } + + public abstract QueryBuilder getQueryBuilder(Object fieldValue); + + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + try { + // If a document does not have the enrich key, return the unchanged document + final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); + if (value == null) { + handler.accept(ingestDocument, null); + return; + } + + QueryBuilder queryBuilder = getQueryBuilder(value); + ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder); + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.from(0); + searchBuilder.size(maxMatches); + searchBuilder.trackScores(false); + searchBuilder.fetchSource(true); + searchBuilder.query(constantScore); + SearchRequest req = new SearchRequest(); + req.indices(EnrichPolicy.getBaseName(getPolicyName())); + req.preference(Preference.LOCAL.type()); + req.source(searchBuilder); + + searchRunner.accept(req, (searchResponse, e) -> { + if (e != null) { + handler.accept(null, e); + return; + } + + // If the index is empty, return the unchanged document + // If the enrich key does not exist in the index, throw an error + // If no documents match the key, return the unchanged document + SearchHit[] searchHits = searchResponse.getHits().getHits(); + if (searchHits.length < 1) { + handler.accept(ingestDocument, null); + return; + } + + if (overrideEnabled || ingestDocument.hasField(targetField) == false) { + List> enrichDocuments = new ArrayList<>(searchHits.length); + for (SearchHit searchHit : searchHits) { + Map enrichDocument = searchHit.getSourceAsMap(); + enrichDocuments.add(enrichDocument); + } + ingestDocument.setFieldValue(targetField, enrichDocuments); + } + handler.accept(ingestDocument, null); + }); + } catch (Exception e) { + handler.accept(null, e); + } + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + throw new UnsupportedOperationException("this method should not get executed"); } public String getPolicyName() { return policyName; } + + @Override + public String getType() { + return EnrichProcessorFactory.TYPE; + } + + String getField() { + return field; + } + + public String getTargetField() { + return targetField; + } + + boolean isIgnoreMissing() { + return ignoreMissing; + } + + boolean isOverrideEnabled() { + return overrideEnabled; + } + + public String getMatchField() { + return matchField; + } + + int getMaxMatches() { + return maxMatches; + } + + private static BiConsumer> createSearchRunner(Client client) { + return (req, handler) -> { + client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap( + resp -> { + handler.accept(resp, null); + }, + e -> { + handler.accept(null, e); + })); + }; + } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index 0e8d30c2c4d1b..e239cf0a515bd 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -203,10 +204,13 @@ private void validateField(Map properties, String fieldName, boolean field private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { // Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type - String keyType; + final String keyType; + final CheckedFunction matchFieldMapping; if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) { - keyType = "keyword"; + matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false); // No need to also configure index_options, because keyword type defaults to 'docs'. + } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) { + matchFieldMapping = (builder) -> builder.field("type", "geo_shape"); } else { throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); } @@ -214,18 +218,15 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { // Enable _source on enrich index. Explicitly mark key mapping type. try { XContentBuilder builder = JsonXContent.contentBuilder(); - builder.startObject() + builder = builder.startObject() .startObject(MapperService.SINGLE_MAPPING_NAME) .field("dynamic", false) .startObject("_source") .field("enabled", true) .endObject() .startObject("properties") - .startObject(policy.getMatchField()) - .field("type", keyType) - .field("doc_values", false) - .endObject() - .endObject() + .startObject(policy.getMatchField()); + builder = matchFieldMapping.apply(builder).endObject().endObject() .startObject("_meta") .field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT) .field(ENRICH_POLICY_NAME_FIELD_NAME, policyName) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 03c5f3dd58c6f..a4251d9f17083 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -10,6 +10,7 @@ import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.Processor; @@ -57,8 +58,13 @@ public Processor create(Map processorFactories, Strin switch (policyType) { case EnrichPolicy.MATCH_TYPE: - return new MatchProcessor(tag, client, policyName, field, targetField, matchField, - ignoreMissing, overrideEnabled, maxMatches); + return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField, + maxMatches); + case EnrichPolicy.GEO_MATCH_TYPE: + String relationStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "shape_relation", "intersects"); + ShapeRelation shapeRelation = ShapeRelation.getRelationByName(relationStr); + return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField, + maxMatches, shapeRelation); default: throw new IllegalArgumentException("unsupported policy type [" + policyType + "]"); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java new file mode 100644 index 0000000000000..ebe05772bf073 --- /dev/null +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.geo.GeoUtils; +import org.elasticsearch.common.geo.ShapeRelation; +import org.elasticsearch.geometry.Geometry; +import org.elasticsearch.geometry.MultiPoint; +import org.elasticsearch.geometry.Point; +import org.elasticsearch.index.query.GeoShapeQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; + +public final class GeoMatchProcessor extends AbstractEnrichProcessor { + + private ShapeRelation shapeRelation; + + GeoMatchProcessor(String tag, + Client client, + String policyName, + String field, + String targetField, + boolean overrideEnabled, + boolean ignoreMissing, + String matchField, + int maxMatches, + ShapeRelation shapeRelation) { + super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); + this.shapeRelation = shapeRelation; + } + + /** used in tests **/ + GeoMatchProcessor(String tag, + BiConsumer> searchRunner, + String policyName, + String field, + String targetField, + boolean overrideEnabled, + boolean ignoreMissing, + String matchField, + int maxMatches, ShapeRelation shapeRelation) { + super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); + this.shapeRelation = shapeRelation; + } + + @SuppressWarnings("unchecked") + @Override + public QueryBuilder getQueryBuilder(Object fieldValue) { + List points = new ArrayList<>(); + if (fieldValue instanceof List) { + List values = (List) fieldValue; + if (values.size() == 2 && values.get(0) instanceof Number) { + GeoPoint geoPoint = GeoUtils.parseGeoPoint(values, true); + points.add(new Point(geoPoint.lon(), geoPoint.lat())); + } else { + for (Object value : values) { + GeoPoint geoPoint = GeoUtils.parseGeoPoint(value, true); + points.add(new Point(geoPoint.lon(), geoPoint.lat())); + } + } + } else { + GeoPoint geoPoint = GeoUtils.parseGeoPoint(fieldValue, true); + points.add(new Point(geoPoint.lon(), geoPoint.lat())); + } + final Geometry queryGeometry; + if (points.isEmpty()) { + throw new IllegalArgumentException("no geopoints found"); + } else if (points.size() == 1) { + queryGeometry = points.get(0); + } else { + queryGeometry = new MultiPoint(points); + } + GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, queryGeometry); + shapeQuery.relation(shapeRelation); + return shapeQuery; + } + + public ShapeRelation getShapeRelation() { + return shapeRelation; + } +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java index 3e69690626760..55a3b2275e9c8 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java @@ -5,172 +5,43 @@ */ package org.elasticsearch.xpack.enrich; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.routing.Preference; -import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.function.BiConsumer; -public final class MatchProcessor extends AbstractEnrichProcessor { - - private final BiConsumer> searchRunner; - private final String field; - private final String targetField; - private final String matchField; - private final boolean ignoreMissing; - private final boolean overrideEnabled; - private final int maxMatches; +public class MatchProcessor extends AbstractEnrichProcessor { MatchProcessor(String tag, Client client, String policyName, String field, String targetField, - String matchField, - boolean ignoreMissing, boolean overrideEnabled, + boolean ignoreMissing, + String matchField, int maxMatches) { - this( - tag, - createSearchRunner(client), - policyName, - field, - targetField, - matchField, - ignoreMissing, - overrideEnabled, - maxMatches - ); + super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); } + /** used in tests **/ MatchProcessor(String tag, BiConsumer> searchRunner, String policyName, String field, String targetField, - String matchField, - boolean ignoreMissing, boolean overrideEnabled, + boolean ignoreMissing, + String matchField, int maxMatches) { - super(tag, policyName); - this.searchRunner = searchRunner; - this.field = field; - this.targetField = targetField; - this.matchField = matchField; - this.ignoreMissing = ignoreMissing; - this.overrideEnabled = overrideEnabled; - this.maxMatches = maxMatches; - } - - @Override - public void execute(IngestDocument ingestDocument, BiConsumer handler) { - try { - // If a document does not have the enrich key, return the unchanged document - final String value = ingestDocument.getFieldValue(field, String.class, ignoreMissing); - if (value == null) { - handler.accept(ingestDocument, null); - return; - } - - TermQueryBuilder termQuery = new TermQueryBuilder(matchField, value); - ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(termQuery); - SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); - searchBuilder.from(0); - searchBuilder.size(maxMatches); - searchBuilder.trackScores(false); - searchBuilder.fetchSource(true); - searchBuilder.query(constantScore); - - SearchRequest req = new SearchRequest(); - req.indices(EnrichPolicy.getBaseName(getPolicyName())); - req.preference(Preference.LOCAL.type()); - req.source(searchBuilder); - - searchRunner.accept(req, (searchResponse, e) -> { - if (e != null) { - handler.accept(null, e); - return; - } - - // If the index is empty, return the unchanged document - // If the enrich key does not exist in the index, throw an error - // If no documents match the key, return the unchanged document - SearchHit[] searchHits = searchResponse.getHits().getHits(); - if (searchHits.length < 1) { - handler.accept(ingestDocument, null); - return; - } - - if (overrideEnabled || ingestDocument.hasField(targetField) == false) { - List> enrichDocuments = new ArrayList<>(searchHits.length); - for (SearchHit searchHit : searchHits) { - Map enrichDocument = searchHit.getSourceAsMap(); - enrichDocuments.add(enrichDocument); - } - ingestDocument.setFieldValue(targetField, enrichDocuments); - } - handler.accept(ingestDocument, null); - }); - } catch (Exception e) { - handler.accept(null, e); - } + super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches); } @Override - public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - throw new UnsupportedOperationException("this method should not get executed"); - } - - @Override - public String getType() { - return EnrichProcessorFactory.TYPE; - } - - String getField() { - return field; - } - - public String getTargetField() { - return targetField; - } - - public String getMatchField() { - return matchField; - } - - boolean isIgnoreMissing() { - return ignoreMissing; - } - - boolean isOverrideEnabled() { - return overrideEnabled; - } - - int getMaxMatches() { - return maxMatches; - } - - private static BiConsumer> createSearchRunner(Client client) { - return (req, handler) -> { - client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap( - resp -> { - handler.accept(resp, null); - }, - e -> { - handler.accept(null, e); - })); - }; + public QueryBuilder getQueryBuilder(Object fieldValue) { + return new TermQueryBuilder(matchField, fieldValue); } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index 5e6d686292c86..cba5bceb23e8b 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -51,10 +51,10 @@ protected boolean resetNodeAfterTest() { return true; } - public void testIngestDataWithEnrichProcessor() { + public void testIngestDataWithMatchProcessor() { int numDocs = 32; int maxMatches = randomIntBetween(2, 8); - List keys = createSourceIndex(numDocs, maxMatches); + List keys = createSourceMatchIndex(numDocs, maxMatches); String policyName = "my-policy"; EnrichPolicy enrichPolicy = @@ -110,6 +110,62 @@ public void testIngestDataWithEnrichProcessor() { assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo((long) numDocs)); } + public void testIngestDataWithGeoMatchProcessor() { + String matchField = "location"; + String enrichField = "zipcode"; + // create enrich index + { + IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME); + indexRequest.source(Map.of(matchField, "POLYGON((" + + "-122.08592534065245 37.38501746624134," + + "-122.08193421363829 37.38501746624134," + + "-122.08193421363829 37.3879329075567," + + "-122.08592534065245 37.3879329075567," + + "-122.08592534065245 37.38501746624134))", + "zipcode", "94040")); + client().index(indexRequest).actionGet(); + client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); + } + + String policyName = "my-policy"; + EnrichPolicy enrichPolicy = + new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), matchField, List.of(enrichField)); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + + String pipelineName = "my-pipeline"; + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName + + "\", \"field\": \"" + matchField + "\", \"target_field\": \"enriched\", \"max_matches\": 1 }}]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); + + BulkRequest bulkRequest = new BulkRequest("my-index"); + IndexRequest indexRequest = new IndexRequest(); + indexRequest.id("_id"); + indexRequest.setPipeline(pipelineName); + indexRequest.source(Map.of(matchField, "37.386444, -122.083863")); // point within match boundary + bulkRequest.add(indexRequest); + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false)); + assertThat(bulkResponse.getItems().length, equalTo(1)); + assertThat(bulkResponse.getItems()[0].getId(), equalTo("_id")); + + GetResponse getResponse = client().get(new GetRequest("my-index", "_id")).actionGet(); + Map source = getResponse.getSourceAsMap(); + List entries = (List) source.get("enriched"); + assertThat(entries, notNullValue()); + assertThat(entries.size(), equalTo(1)); + + EnrichStatsAction.Response statsResponse = + client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()).actionGet(); + assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1)); + String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId(); + assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId)); + assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L)); + assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo(1L)); + } + public void testMultiplePolicies() { int numPolicies = 8; for (int i = 0; i < numPolicies; i++) { @@ -152,7 +208,7 @@ public void testMultiplePolicies() { } } - private List createSourceIndex(int numKeys, int numDocsPerKey) { + private List createSourceMatchIndex(int numKeys, int numDocsPerKey) { Set keys = new HashSet<>(); for (int id = 0; id < numKeys; id++) { String key; @@ -170,5 +226,4 @@ private List createSourceIndex(int numKeys, int numDocsPerKey) { client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); return List.copyOf(keys); } - } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 3c14d100580e5..00f4bc16d8e65 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -152,6 +152,90 @@ public void testRunner() throws Exception { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } + public void testRunnerGeoMatchType() throws Exception { + final String sourceIndex = "source-index"; + IndexResponse indexRequest = client().index(new IndexRequest() + .index(sourceIndex) + .id("id") + .source( + "{" + + "\"location\":" + + "\"POINT(10.0 10.0)\"," + + "\"zipcode\":90210" + + "}", + XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex) + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("location"), is(equalTo("POINT(10.0 10.0)"))); + assertThat(sourceDocMap.get("zipcode"), is(equalTo(90210))); + + List enrichFields = List.of("zipcode"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, List.of(sourceIndex), "location", enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).get("_doc").sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map field1 = (Map) properties.get("location"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("geo_shape"))); + assertNull(field1.get("doc_values")); + + // Validate document structure + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1") + .source(SearchSourceBuilder.searchSource() + .query(QueryBuilders.matchAllQuery()))).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(2))); + assertThat(enrichDocument.get("location"), is(equalTo("POINT(10.0 10.0)"))); + assertThat(enrichDocument.get("zipcode"), is(equalTo(90210))); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + public void testRunnerMultiSource() throws Exception { String baseSourceName = "source-index-"; int numberOfSourceIndices = 3; diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java new file mode 100644 index 0000000000000..52817dca83948 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.enrich; + +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.cluster.routing.Preference; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.geo.ShapeRelation; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.geometry.Geometry; +import org.elasticsearch.geometry.MultiPoint; +import org.elasticsearch.geometry.Point; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.query.ConstantScoreQueryBuilder; +import org.elasticsearch.index.query.GeoShapeQueryBuilder; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.test.ESTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.hamcrest.Matchers.emptyArray; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class GeoMatchProcessorTests extends ESTestCase { + + public void testBasics() { + Point expectedPoint = new Point(-122.084110, 37.386637); + testBasicsForFieldValue(Map.of("lat", 37.386637, "lon", -122.084110), expectedPoint); + testBasicsForFieldValue("37.386637, -122.084110", expectedPoint); + testBasicsForFieldValue("POINT (-122.084110 37.386637)", expectedPoint); + testBasicsForFieldValue(List.of(-122.084110, 37.386637), expectedPoint); + testBasicsForFieldValue(List.of(List.of(-122.084110, 37.386637), "37.386637, -122.084110", "POINT (-122.084110 37.386637)"), + new MultiPoint(List.of(expectedPoint, expectedPoint, expectedPoint))); + + testBasicsForFieldValue("not a point", null); + } + + private void testBasicsForFieldValue(Object fieldValue, Geometry expectedGeometry) { + int maxMatches = randomIntBetween(1, 8); + MockSearchFunction mockSearch = mockedSearchFunction(Map.of("key", Map.of("shape", "object", "zipcode",94040))); + GeoMatchProcessor processor = new GeoMatchProcessor("_tag", mockSearch, "_name", "location", "entry", + false, false, "shape", maxMatches, ShapeRelation.INTERSECTS); + IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, + Map.of("location", fieldValue)); + // Run + IngestDocument[] holder = new IngestDocument[1]; + processor.execute(ingestDocument, (result, e) -> holder[0] = result); + if (expectedGeometry == null) { + assertThat(holder[0], nullValue()); + return; + } else { + assertThat(holder[0], notNullValue()); + } + // Check request + SearchRequest request = mockSearch.getCapturedRequest(); + assertThat(request.indices().length, equalTo(1)); + assertThat(request.indices()[0], equalTo(".enrich-_name")); + assertThat(request.preference(), equalTo(Preference.LOCAL.type())); + assertThat(request.source().size(), equalTo(maxMatches)); + assertThat(request.source().trackScores(), equalTo(false)); + assertThat(request.source().fetchSource().fetchSource(), equalTo(true)); + assertThat(request.source().fetchSource().excludes(), emptyArray()); + assertThat(request.source().fetchSource().includes(), emptyArray()); + assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class)); + assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(GeoShapeQueryBuilder.class)); + GeoShapeQueryBuilder shapeQueryBuilder = (GeoShapeQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery(); + assertThat(shapeQueryBuilder.fieldName(), equalTo("shape")); + assertThat(shapeQueryBuilder.shape(), equalTo(expectedGeometry)); + + // Check result + List entries = ingestDocument.getFieldValue("entry", List.class); + Map entry = (Map) entries.get(0); + assertThat(entry.size(), equalTo(2)); + assertThat(entry.get("zipcode"), equalTo(94040)); + + } + + private static final class MockSearchFunction implements BiConsumer> { + private final SearchResponse mockResponse; + private final SetOnce capturedRequest; + private final Exception exception; + + MockSearchFunction(SearchResponse mockResponse) { + this.mockResponse = mockResponse; + this.exception = null; + this.capturedRequest = new SetOnce<>(); + } + + MockSearchFunction(Exception exception) { + this.mockResponse = null; + this.exception = exception; + this.capturedRequest = new SetOnce<>(); + } + + @Override + public void accept(SearchRequest request, BiConsumer handler) { + capturedRequest.set(request); + if (exception != null) { + handler.accept(null, exception); + } else { + handler.accept(mockResponse, null); + } + } + + SearchRequest getCapturedRequest() { + return capturedRequest.get(); + } + } + + public MockSearchFunction mockedSearchFunction() { + return new MockSearchFunction(mockResponse(Collections.emptyMap())); + } + + public MockSearchFunction mockedSearchFunction(Exception exception) { + return new MockSearchFunction(exception); + } + + public MockSearchFunction mockedSearchFunction(Map> documents) { + return new MockSearchFunction(mockResponse(documents)); + } + + public SearchResponse mockResponse(Map> documents) { + SearchHit[] searchHits = documents.entrySet().stream().map(e -> { + SearchHit searchHit = new SearchHit(randomInt(100), e.getKey(), Collections.emptyMap()); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) { + builder.map(e.getValue()); + builder.flush(); + ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream(); + searchHit.sourceRef(new BytesArray(outputStream.toByteArray())); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + return searchHit; + }).toArray(SearchHit[]::new); + return new SearchResponse(new SearchResponseSections( + new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f), + new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()), + false, false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0)); + } +} diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java index c7e1010a046b4..e7a7a2a99a2ed 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java @@ -46,7 +46,7 @@ public class MatchProcessorTests extends ESTestCase { public void testBasics() throws Exception { int maxMatches = randomIntBetween(1, 8); MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, maxMatches); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", maxMatches); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.co")); // Run @@ -79,7 +79,7 @@ public void testBasics() throws Exception { public void testNoMatch() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", 1); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.com")); int numProperties = ingestDocument.getSourceAndMetadata().size(); @@ -109,7 +109,7 @@ public void testNoMatch() throws Exception { public void testSearchFailure() throws Exception { String indexName = ".enrich-_name"; MockSearchFunction mockSearch = mockedSearchFunction(new IndexNotFoundException(indexName)); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, true, 1); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", true, false, "domain", 1); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of("domain", "elastic.com")); // Run @@ -144,7 +144,7 @@ public void testSearchFailure() throws Exception { public void testIgnoreKeyMissing() throws Exception { { MatchProcessor processor = - new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", true, true, 1); + new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", true, true, "domain", 1); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); assertThat(ingestDocument.getSourceAndMetadata().size(), equalTo(6)); @@ -155,7 +155,7 @@ public void testIgnoreKeyMissing() throws Exception { } { MatchProcessor processor = - new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", "domain", false, true, 1); + new MatchProcessor("_tag", mockedSearchFunction(), "_name", "domain", "entry", true, false, "domain", 1); IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL, Map.of()); IngestDocument[] resultHolder = new IngestDocument[1]; Exception[] exceptionHolder = new Exception[1]; @@ -171,7 +171,7 @@ public void testIgnoreKeyMissing() throws Exception { public void testExistingFieldWithOverrideDisabled() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, false, "domain", 1); IngestDocument ingestDocument = new IngestDocument(new HashMap<>(Map.of("domain", "elastic.co", "tld", "tld")), Map.of()); IngestDocument[] resultHolder = new IngestDocument[1]; @@ -187,7 +187,7 @@ public void testExistingFieldWithOverrideDisabled() throws Exception { public void testExistingNullFieldWithOverrideDisabled() throws Exception { MockSearchFunction mockSearch = mockedSearchFunction(Map.of("elastic.co", Map.of("globalRank", 451, "tldRank",23, "tld", "co"))); - MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", "domain", false, false, 1); + MatchProcessor processor = new MatchProcessor("_tag", mockSearch, "_name", "domain", "entry", false, false, "domain", 1); Map source = new HashMap<>(); source.put("domain", "elastic.co");