Skip to content

Commit 840273e

Browse files
committed
Geo-Match Enrich Processor
this commit introduces a geo-match enrich processor that looks up a specific `geo_point` field in the enrich-index for all entries that have a geo_shape match field that meets some specific relation criteria with the input field. For example, the enrich index may contain documents with zipcodes and their respective geo_shape. Ingesting documents with a geo_point field can be enriched with which zipcode they associate according to which shape they are contained within. this commit also refactors some of the MatchProcessor by moving a lot of the shared code to AbstractEnrichProcessor. Closes elastic#42639.
1 parent e67b11c commit 840273e

File tree

8 files changed

+265
-318
lines changed

8 files changed

+265
-318
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,151 @@
55
*/
66
package org.elasticsearch.xpack.enrich;
77

8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.search.SearchRequest;
10+
import org.elasticsearch.action.search.SearchResponse;
11+
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.cluster.routing.Preference;
813
import org.elasticsearch.ingest.AbstractProcessor;
14+
import org.elasticsearch.ingest.IngestDocument;
15+
import org.elasticsearch.search.SearchHit;
16+
import org.elasticsearch.search.builder.SearchSourceBuilder;
17+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
18+
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.function.BiConsumer;
924

1025
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
1126

1227
private final String policyName;
28+
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
29+
private final String field;
30+
private final String targetField;
31+
private final boolean ignoreMissing;
32+
private final boolean overrideEnabled;
33+
protected final String matchField;
34+
protected final int maxMatches;
35+
36+
37+
protected AbstractEnrichProcessor(String tag, Client client, String policyName, String field, String targetField,
38+
boolean ignoreMissing, boolean overrideEnabled, String matchField, int maxMatches) {
39+
this(tag, createSearchRunner(client), policyName, field, targetField, ignoreMissing, overrideEnabled, matchField, maxMatches);
40+
}
1341

14-
protected AbstractEnrichProcessor(String tag, String policyName) {
42+
protected AbstractEnrichProcessor(String tag,
43+
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
44+
String policyName, String field, String targetField, boolean ignoreMissing, boolean overrideEnabled,
45+
String matchField, int maxMatches) {
1546
super(tag);
1647
this.policyName = policyName;
48+
this.searchRunner = searchRunner;
49+
this.field = field;
50+
this.targetField = targetField;
51+
this.ignoreMissing = ignoreMissing;
52+
this.overrideEnabled = overrideEnabled;
53+
this.matchField = matchField;
54+
this.maxMatches = maxMatches;
55+
}
56+
57+
public abstract SearchSourceBuilder getSearchSourceBuilder(Object fieldValue);
58+
59+
@Override
60+
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
61+
try {
62+
// If a document does not have the enrich key, return the unchanged document
63+
final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
64+
if (value == null) {
65+
handler.accept(ingestDocument, null);
66+
return;
67+
}
68+
69+
SearchSourceBuilder searchSourceBuilder = getSearchSourceBuilder(value);
70+
71+
SearchRequest req = new SearchRequest();
72+
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
73+
req.preference(Preference.LOCAL.type());
74+
req.source(searchSourceBuilder);
75+
76+
searchRunner.accept(req, (searchResponse, e) -> {
77+
if (e != null) {
78+
handler.accept(null, e);
79+
return;
80+
}
81+
82+
// If the index is empty, return the unchanged document
83+
// If the enrich key does not exist in the index, throw an error
84+
// If no documents match the key, return the unchanged document
85+
SearchHit[] searchHits = searchResponse.getHits().getHits();
86+
if (searchHits.length < 1) {
87+
handler.accept(ingestDocument, null);
88+
return;
89+
}
90+
91+
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
92+
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
93+
for (SearchHit searchHit : searchHits) {
94+
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
95+
enrichDocuments.add(enrichDocument);
96+
}
97+
ingestDocument.setFieldValue(targetField, enrichDocuments);
98+
}
99+
handler.accept(ingestDocument, null);
100+
});
101+
} catch (Exception e) {
102+
handler.accept(null, e);
103+
}
104+
}
105+
106+
@Override
107+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
108+
throw new UnsupportedOperationException("this method should not get executed");
17109
}
18110

19111
public String getPolicyName() {
20112
return policyName;
21113
}
114+
115+
@Override
116+
public String getType() {
117+
return EnrichProcessorFactory.TYPE;
118+
}
119+
120+
String getField() {
121+
return field;
122+
}
123+
124+
public String getTargetField() {
125+
return targetField;
126+
}
127+
128+
boolean isIgnoreMissing() {
129+
return ignoreMissing;
130+
}
131+
132+
boolean isOverrideEnabled() {
133+
return overrideEnabled;
134+
}
135+
136+
public String getMatchField() {
137+
return matchField;
138+
}
139+
140+
int getMaxMatches() {
141+
return maxMatches;
142+
}
143+
144+
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
145+
return (req, handler) -> {
146+
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
147+
resp -> {
148+
handler.accept(resp, null);
149+
},
150+
e -> {
151+
handler.accept(null, e);
152+
}));
153+
};
154+
}
22155
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -196,31 +196,29 @@ private void validateField(Map<?, ?> properties, String fieldName, boolean field
196196

197197
private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
198198
// Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
199-
String keyType;
199+
final String keyType;
200+
final MatchFieldMapping matchFieldMapping;
200201
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
201-
keyType = "keyword";
202+
matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false);
202203
// No need to also configure index_options, because keyword type defaults to 'docs'.
203204
} else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) {
204-
keyType = "geo_shape";
205+
matchFieldMapping = (builder) -> builder.field("type", "geo_shape");
205206
} else {
206207
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
207208
}
208209

209210
// Enable _source on enrich index. Explicitly mark key mapping type.
210211
try {
211212
XContentBuilder builder = JsonXContent.contentBuilder();
212-
builder.startObject()
213+
builder = builder.startObject()
213214
.startObject(MapperService.SINGLE_MAPPING_NAME)
214215
.field("dynamic", false)
215216
.startObject("_source")
216217
.field("enabled", true)
217218
.endObject()
218219
.startObject("properties")
219-
.startObject(policy.getMatchField())
220-
.field("type", keyType)
221-
.field("doc_values", false)
222-
.endObject()
223-
.endObject()
220+
.startObject(policy.getMatchField());
221+
builder = matchFieldMapping.build(builder).endObject().endObject()
224222
.startObject("_meta")
225223
.field(ENRICH_README_FIELD_NAME, ENRICH_INDEX_README_TEXT)
226224
.field(ENRICH_POLICY_NAME_FIELD_NAME, policyName)
@@ -413,4 +411,9 @@ public void onFailure(Exception e) {
413411
}
414412
});
415413
}
414+
415+
@FunctionalInterface
416+
private interface MatchFieldMapping {
417+
XContentBuilder build(XContentBuilder builder) throws IOException;
418+
}
416419
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.cluster.metadata.AliasOrIndex;
1111
import org.elasticsearch.cluster.metadata.IndexMetaData;
1212
import org.elasticsearch.cluster.metadata.MetaData;
13+
import org.elasticsearch.common.geo.ShapeRelation;
1314
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1415
import org.elasticsearch.ingest.ConfigurationUtils;
1516
import org.elasticsearch.ingest.Processor;
@@ -57,11 +58,11 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
5758

5859
switch (policyType) {
5960
case EnrichPolicy.MATCH_TYPE:
60-
return new MatchProcessor(tag, client, policyName, field, targetField, matchField,
61-
ignoreMissing, overrideEnabled, maxMatches);
61+
return new MatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
62+
maxMatches);
6263
case EnrichPolicy.GEO_MATCH_TYPE:
63-
return new GeoMatchProcessor(tag, client, policyName, field, targetField, matchField,
64-
ignoreMissing, overrideEnabled, maxMatches);
64+
return new GeoMatchProcessor(tag, client, policyName, field, targetField, overrideEnabled, ignoreMissing, matchField,
65+
maxMatches, ShapeRelation.INTERSECTS);
6566
default:
6667
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
6768
}

0 commit comments

Comments
 (0)