Skip to content

Commit 0bf7d2c

Browse files
committed
Geo-Match Enrich Processor (elastic#47243)
This commit introduces a geo-match enrich processor that takes a `geo_point` field from the incoming document and looks up all entries in the enrich index whose `geo_shape` match field satisfies a specified spatial relation with that point. For example, the enrich index may contain documents with zip codes and their respective geo_shapes; ingested documents that have a geo_point field can then be enriched with the zip code of the shape that contains them. This commit also refactors MatchProcessor by moving much of the shared code to AbstractEnrichProcessor. Closes elastic#42639.
1 parent f2f2304 commit 0bf7d2c

File tree

11 files changed

+584
-163
lines changed

11 files changed

+584
-163
lines changed

server/src/test/java/org/elasticsearch/index/mapper/GeoShapeFieldMapperTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public void testDefaultConfiguration() throws IOException {
6060
GeoShapeFieldMapper geoShapeFieldMapper = (GeoShapeFieldMapper) fieldMapper;
6161
assertThat(geoShapeFieldMapper.fieldType().orientation(),
6262
equalTo(GeoShapeFieldMapper.Defaults.ORIENTATION.value()));
63+
assertThat(geoShapeFieldMapper.fieldType.hasDocValues(), equalTo(false));
6364
}
6465

6566
/**

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
3535
public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";
3636

3737
public static final String MATCH_TYPE = "match";
38-
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE};
38+
public static final String GEO_MATCH_TYPE = "geo_match";
39+
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{
40+
MATCH_TYPE,
41+
GEO_MATCH_TYPE
42+
};
3943

4044
private static final ParseField QUERY = new ParseField("query");
4145
private static final ParseField INDICES = new ParseField("indices");

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,158 @@
55
*/
66
package org.elasticsearch.xpack.enrich;
77

8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.search.SearchRequest;
10+
import org.elasticsearch.action.search.SearchResponse;
11+
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.cluster.routing.Preference;
13+
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
14+
import org.elasticsearch.index.query.QueryBuilder;
815
import org.elasticsearch.ingest.AbstractProcessor;
16+
import org.elasticsearch.ingest.IngestDocument;
17+
import org.elasticsearch.search.SearchHit;
18+
import org.elasticsearch.search.builder.SearchSourceBuilder;
19+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
20+
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.function.BiConsumer;
926

1027
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
1128

1229
private final String policyName;
30+
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
31+
private final String field;
32+
private final String targetField;
33+
private final boolean ignoreMissing;
34+
private final boolean overrideEnabled;
35+
protected final String matchField;
36+
protected final int maxMatches;
37+
38+
protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField,
39+
boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) {
40+
this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
41+
}
1342

14-
protected AbstractEnrichProcessor(String tag, String policyName) {
43+
protected AbstractEnrichProcessor(String tag,
44+
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
45+
String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled,
46+
String matchField, int maxMatches) {
1547
super(tag);
1648
this.policyName = policyName;
49+
this.searchRunner = searchRunner;
50+
this.field = field;
51+
this.targetField = targetField;
52+
this.ignoreMissing = ignoreMissing;
53+
this.overrideEnabled = overrideEnabled;
54+
this.matchField = matchField;
55+
this.maxMatches = maxMatches;
56+
}
57+
58+
public abstract QueryBuilder getQueryBuilder(Object fieldValue);
59+
60+
@Override
61+
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
62+
try {
63+
// If a document does not have the enrich key, return the unchanged document
64+
final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
65+
if (value == null) {
66+
handler.accept(ingestDocument, null);
67+
return;
68+
}
69+
70+
QueryBuilder queryBuilder = getQueryBuilder(value);
71+
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
72+
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
73+
searchBuilder.from(0);
74+
searchBuilder.size(maxMatches);
75+
searchBuilder.trackScores(false);
76+
searchBuilder.fetchSource(true);
77+
searchBuilder.query(constantScore);
78+
SearchRequest req = new SearchRequest();
79+
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
80+
req.preference(Preference.LOCAL.type());
81+
req.source(searchBuilder);
82+
83+
searchRunner.accept(req, (searchResponse, e) -> {
84+
if (e != null) {
85+
handler.accept(null, e);
86+
return;
87+
}
88+
89+
// If the index is empty, return the unchanged document
90+
// If the enrich key does not exist in the index, throw an error
91+
// If no documents match the key, return the unchanged document
92+
SearchHit[] searchHits = searchResponse.getHits().getHits();
93+
if (searchHits.length < 1) {
94+
handler.accept(ingestDocument, null);
95+
return;
96+
}
97+
98+
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
99+
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
100+
for (SearchHit searchHit : searchHits) {
101+
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
102+
enrichDocuments.add(enrichDocument);
103+
}
104+
ingestDocument.setFieldValue(targetField, enrichDocuments);
105+
}
106+
handler.accept(ingestDocument, null);
107+
});
108+
} catch (Exception e) {
109+
handler.accept(null, e);
110+
}
111+
}
112+
113+
@Override
114+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
115+
throw new UnsupportedOperationException("this method should not get executed");
17116
}
18117

19118
public String getPolicyName() {
20119
return policyName;
21120
}
121+
122+
@Override
123+
public String getType() {
124+
return EnrichProcessorFactory.TYPE;
125+
}
126+
127+
String getField() {
128+
return field;
129+
}
130+
131+
public String getTargetField() {
132+
return targetField;
133+
}
134+
135+
boolean isIgnoreMissing() {
136+
return ignoreMissing;
137+
}
138+
139+
boolean isOverrideEnabled() {
140+
return overrideEnabled;
141+
}
142+
143+
public String getMatchField() {
144+
return matchField;
145+
}
146+
147+
int getMaxMatches() {
148+
return maxMatches;
149+
}
150+
151+
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
152+
return (req, handler) -> {
153+
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
154+
resp -> {
155+
handler.accept(resp, null);
156+
},
157+
e -> {
158+
handler.accept(null, e);
159+
}));
160+
};
161+
}
22162
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2929
import org.elasticsearch.cluster.metadata.MappingMetaData;
3030
import org.elasticsearch.cluster.service.ClusterService;
31+
import org.elasticsearch.common.CheckedFunction;
3132
import org.elasticsearch.common.Strings;
3233
import org.elasticsearch.common.bytes.BytesArray;
3334
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -196,29 +197,29 @@ private void validateField(Map<?, ?> properties, String fieldName, boolean field
196197

197198
private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
198199
// Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
199-
String keyType;
200+
final String keyType;
201+
final CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping;
200202
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
201-
keyType = "keyword";
203+
matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false);
202204
// No need to also configure index_options, because keyword type defaults to 'docs'.
205+
} else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) {
206+
matchFieldMapping = (builder) -> builder.field("type", "geo_shape");
203207
} else {
204208
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
205209
}
206210

207211
// Enable _source on enrich index. Explicitly mark key mapping type.
208212
try {
209213
XContentBuilder builder = JsonXContent.contentBuilder();
210-
builder.startObject()
214+
builder = builder.startObject()
211215
.startObject(MapperService.SINGLE_MAPPING_NAME)
212216
.field("dynamic", false)
213217
.startObject("_source")
214218
.field("enabled", true)
215219
.endObject()
216220
.startObject("properties")
217-
.startObject(policy.getMatchField())
218-
.field("type", keyType)
219-
.field("doc_values", false)
220-
.endObject()
221-
.endObject()
221+
.startObject(policy.getMatchField());
222+
builder = matchFieldMapping.apply(builder).endObject().endObject()
222223
.startObject("_meta")
223224
.field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT)
224225
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.cluster.metadata.AliasOrIndex;
1111
import org.elasticsearch.cluster.metadata.IndexMetaData;
1212
import org.elasticsearch.cluster.metadata.MetaData;
13+
import org.elasticsearch.common.geo.ShapeRelation;
1314
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1415
import org.elasticsearch.ingest.ConfigurationUtils;
1516
import org.elasticsearch.ingest.Processor;
@@ -57,8 +58,13 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
5758

5859
switch (policyType) {
5960
case EnrichPolicy.MATCH_TYPE:
60-
return new MatchProcessor(tag, client, policyName, field, targetField, matchField,
61-
ignoreMissing, overrideEnabled, maxMatches);
61+
return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
62+
maxMatches);
63+
case EnrichPolicy.GEO_MATCH_TYPE:
64+
String relationStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "shape_relation", "intersects");
65+
ShapeRelation shapeRelation = ShapeRelation.getRelationByName(relationStr);
66+
return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
67+
maxMatches, shapeRelation);
6268
default:
6369
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
6470
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.elasticsearch.action.search.SearchRequest;
9+
import org.elasticsearch.action.search.SearchResponse;
10+
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.common.geo.GeoPoint;
12+
import org.elasticsearch.common.geo.GeoUtils;
13+
import org.elasticsearch.common.geo.ShapeRelation;
14+
import org.elasticsearch.geometry.Geometry;
15+
import org.elasticsearch.geometry.MultiPoint;
16+
import org.elasticsearch.geometry.Point;
17+
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
18+
import org.elasticsearch.index.query.QueryBuilder;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.function.BiConsumer;
23+
24+
public final class GeoMatchProcessor extends AbstractEnrichProcessor {
25+
26+
private ShapeRelation shapeRelation;
27+
28+
GeoMatchProcessor(String tag,
29+
Client client,
30+
String policyName,
31+
String field,
32+
String targetField,
33+
boolean overrideEnabled,
34+
boolean ignoreMissing,
35+
String matchField,
36+
int maxMatches,
37+
ShapeRelation shapeRelation) {
38+
super(tag, client, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
39+
this.shapeRelation = shapeRelation;
40+
}
41+
42+
/** used in tests **/
43+
GeoMatchProcessor(String tag,
44+
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
45+
String policyName,
46+
String field,
47+
String targetField,
48+
boolean overrideEnabled,
49+
boolean ignoreMissing,
50+
String matchField,
51+
int maxMatches, ShapeRelation shapeRelation) {
52+
super(tag, searchRunner, policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
53+
this.shapeRelation = shapeRelation;
54+
}
55+
56+
@SuppressWarnings("unchecked")
57+
@Override
58+
public QueryBuilder getQueryBuilder(Object fieldValue) {
59+
List<Point> points = new ArrayList<>();
60+
if (fieldValue instanceof List) {
61+
List<Object> values = (List<Object>) fieldValue;
62+
if (values.size() == 2 && values.get(0) instanceof Number) {
63+
GeoPoint geoPoint = GeoUtils.parseGeoPoint(values, true);
64+
points.add(new Point(geoPoint.lon(), geoPoint.lat()));
65+
} else {
66+
for (Object value : values) {
67+
GeoPoint geoPoint = GeoUtils.parseGeoPoint(value, true);
68+
points.add(new Point(geoPoint.lon(), geoPoint.lat()));
69+
}
70+
}
71+
} else {
72+
GeoPoint geoPoint = GeoUtils.parseGeoPoint(fieldValue, true);
73+
points.add(new Point(geoPoint.lon(), geoPoint.lat()));
74+
}
75+
final Geometry queryGeometry;
76+
if (points.isEmpty()) {
77+
throw new IllegalArgumentException("no geopoints found");
78+
} else if (points.size() == 1) {
79+
queryGeometry = points.get(0);
80+
} else {
81+
queryGeometry = new MultiPoint(points);
82+
}
83+
GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, queryGeometry);
84+
shapeQuery.relation(shapeRelation);
85+
return shapeQuery;
86+
}
87+
88+
public ShapeRelation getShapeRelation() {
89+
return shapeRelation;
90+
}
91+
}

0 commit comments

Comments
 (0)