Skip to content

Commit e67b11c

Browse files
committed
init
1 parent 69cad3d commit e67b11c

File tree

5 files changed

+346
-1
lines changed

5 files changed

+346
-1
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {
3636
public static final String ENRICH_INDEX_NAME_BASE = ".enrich-";
3737

3838
public static final String MATCH_TYPE = "match";
39-
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{MATCH_TYPE};
39+
public static final String GEO_MATCH_TYPE = "geo_match";
40+
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{
41+
MATCH_TYPE,
42+
GEO_MATCH_TYPE
43+
};
4044

4145
private static final ParseField QUERY = new ParseField("query");
4246
private static final ParseField INDICES = new ParseField("indices");

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
200200
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
201201
keyType = "keyword";
202202
// No need to also configure index_options, because keyword type defaults to 'docs'.
203+
} else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) {
204+
keyType = "geo_shape";
203205
} else {
204206
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
205207
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
5959
case EnrichPolicy.MATCH_TYPE:
6060
return new MatchProcessor(tag, client, policyName, field, targetField, matchField,
6161
ignoreMissing, overrideEnabled, maxMatches);
62+
case EnrichPolicy.GEO_MATCH_TYPE:
63+
return new GeoMatchProcessor(tag, client, policyName, field, targetField, matchField,
64+
ignoreMissing, overrideEnabled, maxMatches);
6265
default:
6366
throw new IllegalArgumentException("unsupported policy type [" + policyType + "]");
6467
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.search.SearchRequest;
10+
import org.elasticsearch.action.search.SearchResponse;
11+
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.cluster.routing.Preference;
13+
import org.elasticsearch.common.geo.GeoPoint;
14+
import org.elasticsearch.common.geo.GeoUtils;
15+
import org.elasticsearch.common.geo.ShapeRelation;
16+
import org.elasticsearch.geometry.Point;
17+
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
18+
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
19+
import org.elasticsearch.ingest.IngestDocument;
20+
import org.elasticsearch.search.SearchHit;
21+
import org.elasticsearch.search.builder.SearchSourceBuilder;
22+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
23+
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.function.BiConsumer;
29+
30+
public final class GeoMatchProcessor extends AbstractEnrichProcessor {
31+
32+
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
33+
private final String field;
34+
private final String targetField;
35+
private final String matchField;
36+
private final boolean ignoreMissing;
37+
private final boolean overrideEnabled;
38+
private final int maxMatches;
39+
40+
GeoMatchProcessor(String tag,
41+
Client client,
42+
String policyName,
43+
String field,
44+
String targetField,
45+
String matchField,
46+
boolean ignoreMissing,
47+
boolean overrideEnabled,
48+
int maxMatches) {
49+
this(
50+
tag,
51+
createSearchRunner(client),
52+
policyName,
53+
field,
54+
targetField,
55+
matchField,
56+
ignoreMissing,
57+
overrideEnabled,
58+
maxMatches
59+
);
60+
}
61+
62+
GeoMatchProcessor(String tag,
63+
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
64+
String policyName,
65+
String field,
66+
String targetField,
67+
String matchField,
68+
boolean ignoreMissing,
69+
boolean overrideEnabled,
70+
int maxMatches) {
71+
super(tag, policyName);
72+
this.searchRunner = searchRunner;
73+
this.field = field;
74+
this.targetField = targetField;
75+
this.matchField = matchField;
76+
this.ignoreMissing = ignoreMissing;
77+
this.overrideEnabled = overrideEnabled;
78+
this.maxMatches = maxMatches;
79+
}
80+
81+
@Override
82+
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
83+
try {
84+
// If a document does not have the enrich key, return the unchanged document
85+
final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
86+
if (value == null) {
87+
handler.accept(ingestDocument, null);
88+
return;
89+
}
90+
91+
GeoPoint point = GeoUtils.parseGeoPoint(value, true);
92+
93+
GeoShapeQueryBuilder shapeQuery = new GeoShapeQueryBuilder(matchField, new Point(point.lon(), point.lat()));
94+
shapeQuery.relation(ShapeRelation.INTERSECTS);
95+
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(shapeQuery);
96+
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
97+
searchBuilder.from(0);
98+
searchBuilder.size(maxMatches);
99+
searchBuilder.trackScores(false);
100+
searchBuilder.fetchSource(true);
101+
searchBuilder.query(constantScore);
102+
103+
SearchRequest req = new SearchRequest();
104+
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
105+
req.preference(Preference.LOCAL.type());
106+
req.source(searchBuilder);
107+
108+
searchRunner.accept(req, (searchResponse, e) -> {
109+
if (e != null) {
110+
handler.accept(null, e);
111+
return;
112+
}
113+
114+
// If the index is empty, return the unchanged document
115+
// If the enrich key does not exist in the index, throw an error
116+
// If no documents match the key, return the unchanged document
117+
SearchHit[] searchHits = searchResponse.getHits().getHits();
118+
if (searchHits.length < 1) {
119+
handler.accept(ingestDocument, null);
120+
return;
121+
}
122+
123+
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
124+
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
125+
for (SearchHit searchHit : searchHits) {
126+
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
127+
enrichDocuments.add(enrichDocument);
128+
}
129+
ingestDocument.setFieldValue(targetField, enrichDocuments);
130+
}
131+
handler.accept(ingestDocument, null);
132+
});
133+
} catch (Exception e) {
134+
handler.accept(null, e);
135+
}
136+
}
137+
138+
@Override
139+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
140+
throw new UnsupportedOperationException("this method should not get executed");
141+
}
142+
143+
@Override
144+
public String getType() {
145+
return EnrichProcessorFactory.TYPE;
146+
}
147+
148+
String getField() {
149+
return field;
150+
}
151+
152+
public String getTargetField() {
153+
return targetField;
154+
}
155+
156+
public String getMatchField() {
157+
return matchField;
158+
}
159+
160+
boolean isIgnoreMissing() {
161+
return ignoreMissing;
162+
}
163+
164+
boolean isOverrideEnabled() {
165+
return overrideEnabled;
166+
}
167+
168+
int getMaxMatches() {
169+
return maxMatches;
170+
}
171+
172+
private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
173+
return (req, handler) -> {
174+
client.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(
175+
resp -> {
176+
handler.accept(resp, null);
177+
},
178+
e -> {
179+
handler.accept(null, e);
180+
}));
181+
};
182+
}
183+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.apache.lucene.search.TotalHits;
9+
import org.apache.lucene.util.SetOnce;
10+
import org.elasticsearch.action.search.SearchRequest;
11+
import org.elasticsearch.action.search.SearchResponse;
12+
import org.elasticsearch.action.search.SearchResponseSections;
13+
import org.elasticsearch.action.search.ShardSearchFailure;
14+
import org.elasticsearch.cluster.metadata.IndexMetaData;
15+
import org.elasticsearch.cluster.routing.Preference;
16+
import org.elasticsearch.common.bytes.BytesArray;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.text.Text;
19+
import org.elasticsearch.common.xcontent.XContentBuilder;
20+
import org.elasticsearch.common.xcontent.XContentType;
21+
import org.elasticsearch.geometry.Point;
22+
import org.elasticsearch.index.IndexSettings;
23+
import org.elasticsearch.index.VersionType;
24+
import org.elasticsearch.index.mapper.MapperService;
25+
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
26+
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
27+
import org.elasticsearch.index.query.QueryShardContext;
28+
import org.elasticsearch.ingest.IngestDocument;
29+
import org.elasticsearch.search.SearchHit;
30+
import org.elasticsearch.search.SearchHits;
31+
import org.elasticsearch.search.aggregations.Aggregations;
32+
import org.elasticsearch.search.suggest.Suggest;
33+
import org.elasticsearch.test.ESTestCase;
34+
35+
import java.io.ByteArrayOutputStream;
36+
import java.io.IOException;
37+
import java.io.UncheckedIOException;
38+
import java.util.Collections;
39+
import java.util.List;
40+
import java.util.Map;
41+
import java.util.function.BiConsumer;
42+
43+
import static org.hamcrest.Matchers.emptyArray;
44+
import static org.hamcrest.Matchers.equalTo;
45+
import static org.hamcrest.Matchers.instanceOf;
46+
import static org.hamcrest.Matchers.notNullValue;
47+
48+
public class GeoMatchProcessorTests extends ESTestCase {
49+
50+
public void testBasics() throws Exception {
51+
int maxMatches = randomIntBetween(1, 8);
52+
MockSearchFunction mockSearch = mockedSearchFunction(Map.of("key", Map.of("shape", "object", "zipcode",94040)));
53+
GeoMatchProcessor processor = new GeoMatchProcessor("_tag", mockSearch, "_name", "location", "entry",
54+
"shape", false, true, maxMatches);
55+
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", "_routing", 1L, VersionType.INTERNAL,
56+
Map.of("location", "37.386637, -122.084110"));
57+
// Run
58+
IngestDocument[] holder = new IngestDocument[1];
59+
processor.execute(ingestDocument, (result, e) -> holder[0] = result);
60+
assertThat(holder[0], notNullValue());
61+
// Check request
62+
SearchRequest request = mockSearch.getCapturedRequest();
63+
assertThat(request.indices().length, equalTo(1));
64+
assertThat(request.indices()[0], equalTo(".enrich-_name"));
65+
assertThat(request.preference(), equalTo(Preference.LOCAL.type()));
66+
assertThat(request.source().size(), equalTo(maxMatches));
67+
assertThat(request.source().trackScores(), equalTo(false));
68+
assertThat(request.source().fetchSource().fetchSource(), equalTo(true));
69+
assertThat(request.source().fetchSource().excludes(), emptyArray());
70+
assertThat(request.source().fetchSource().includes(), emptyArray());
71+
assertThat(request.source().query(), instanceOf(ConstantScoreQueryBuilder.class));
72+
assertThat(((ConstantScoreQueryBuilder) request.source().query()).innerQuery(), instanceOf(GeoShapeQueryBuilder.class));
73+
GeoShapeQueryBuilder shapeQueryBuilder = (GeoShapeQueryBuilder) ((ConstantScoreQueryBuilder) request.source().query()).innerQuery();
74+
assertThat(shapeQueryBuilder.fieldName(), equalTo("shape"));
75+
assertThat(shapeQueryBuilder.shape(), equalTo(new Point(-122.084110, 37.386637)));
76+
o
77+
78+
shapeQueryBuilder.toQuery()
79+
// Check result
80+
List<?> entries = ingestDocument.getFieldValue("entry", List.class);
81+
Map<?, ?> entry = (Map<?, ?>) entries.get(0);
82+
assertThat(entry.size(), equalTo(2));
83+
assertThat(entry.get("zipcode"), equalTo(94040));
84+
}
85+
86+
private static final class MockSearchFunction implements BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> {
87+
private final SearchResponse mockResponse;
88+
private final SetOnce<SearchRequest> capturedRequest;
89+
private final Exception exception;
90+
91+
MockSearchFunction(SearchResponse mockResponse) {
92+
this.mockResponse = mockResponse;
93+
this.exception = null;
94+
this.capturedRequest = new SetOnce<>();
95+
}
96+
97+
MockSearchFunction(Exception exception) {
98+
this.mockResponse = null;
99+
this.exception = exception;
100+
this.capturedRequest = new SetOnce<>();
101+
}
102+
103+
@Override
104+
public void accept(SearchRequest request, BiConsumer<SearchResponse, Exception> handler) {
105+
capturedRequest.set(request);
106+
if (exception != null) {
107+
handler.accept(null, exception);
108+
} else {
109+
handler.accept(mockResponse, null);
110+
}
111+
}
112+
113+
SearchRequest getCapturedRequest() {
114+
return capturedRequest.get();
115+
}
116+
}
117+
118+
public MockSearchFunction mockedSearchFunction() {
119+
return new MockSearchFunction(mockResponse(Collections.emptyMap()));
120+
}
121+
122+
public MockSearchFunction mockedSearchFunction(Exception exception) {
123+
return new MockSearchFunction(exception);
124+
}
125+
126+
public MockSearchFunction mockedSearchFunction(Map<String, Map<String, ?>> documents) {
127+
return new MockSearchFunction(mockResponse(documents));
128+
}
129+
130+
public SearchResponse mockResponse(Map<String, Map<String, ?>> documents) {
131+
132+
final QueryShardContext context = new QueryShardContext(0, new IndexSettings(new IndexMetaData(), Settings.EMPTY), null, null, null, mapperService,
133+
null, null, xContentRegistry(), writableRegistry(), client, leaf.reader(), () -> nowInMillis, null);
134+
135+
SearchHit[] searchHits = documents.entrySet().stream().map(e -> {
136+
SearchHit searchHit = new SearchHit(randomInt(100), e.getKey(), new Text(MapperService.SINGLE_MAPPING_NAME),
137+
Collections.emptyMap());
138+
try (XContentBuilder builder = XContentBuilder.builder(XContentType.SMILE.xContent())) {
139+
builder.map(e.getValue());
140+
builder.flush();
141+
ByteArrayOutputStream outputStream = (ByteArrayOutputStream) builder.getOutputStream();
142+
searchHit.sourceRef(new BytesArray(outputStream.toByteArray()));
143+
} catch (IOException ex) {
144+
throw new UncheckedIOException(ex);
145+
}
146+
return searchHit;
147+
}).toArray(SearchHit[]::new);
148+
return new SearchResponse(new SearchResponseSections(
149+
new SearchHits(searchHits, new TotalHits(documents.size(), TotalHits.Relation.EQUAL_TO), 1.0f),
150+
new Aggregations(Collections.emptyList()), new Suggest(Collections.emptyList()),
151+
false, false, null, 1), null, 1, 1, 0, 1, ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(1, 1, 0));
152+
}
153+
}

0 commit comments

Comments
 (0)