Skip to content

Commit 09c4269

Browse files
committed
Add templating support to enrich processor (#49093)
Adds support for templating to `field` and `target_field` options.
1 parent 90850f4 commit 09c4269

File tree

12 files changed

+128
-44
lines changed

12 files changed

+128
-44
lines changed

docs/reference/ingest/processors/enrich.asciidoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ See <<ingest-enriching-data,enrich data>> section for more information about how
1212
|======
1313
| Name | Required | Default | Description
1414
| `policy_name` | yes | - | The name of the enrich policy to use.
15-
| `field` | yes | - | The field in the input document that matches the policies match_field used to retrieve the enrichment data.
16-
| `target_field` | yes | - | The field that will be used for the enrichment data.
15+
| `field` | yes | - | The field in the input document that matches the policies match_field used to retrieve the enrichment data. Supports <<accessing-template-fields,template snippets>>.
16+
| `target_field` | yes | - | The field that will be used for the enrichment data. Supports <<accessing-template-fields,template snippets>>.
1717
| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document
1818
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
1919
| `max_matches` | no | 1 | The maximum number of matched documents to include under the configured target field. The `target_field` will be turned into a json array if `max_matches` is higher than 1, otherwise `target_field` will become a json object. In order to avoid documents getting too large, the maximum allowed value is 128.

server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java

+6
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,12 @@ public static List<Processor> readProcessorConfigs(List<Map<String, Object>> pro
348348
return processors;
349349
}
350350

351+
public static TemplateScript.Factory readTemplateProperty(String processorType, String processorTag, Map<String, Object> configuration,
352+
String propertyName, ScriptService scriptService) {
353+
String value = readStringProperty(processorType, processorTag, configuration, propertyName, null);
354+
return compileTemplate(processorType, processorTag, propertyName, value, scriptService);
355+
}
356+
351357
public static TemplateScript.Factory compileTemplate(String processorType, String processorTag, String propertyName,
352358
String propertyValue, ScriptService scriptService) {
353359
try {

x-pack/plugin/enrich/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ dependencies {
1313
compileOnly project(path: xpackModule('core'), configuration: 'default')
1414
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
1515
testCompile project(path: ':modules:ingest-common')
16+
testCompile project(path: ':modules:lang-mustache')
1617
testCompile project(path: xpackModule('monitoring'), configuration: 'testArtifacts')
1718
}
1819

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
import org.elasticsearch.index.query.QueryBuilder;
1515
import org.elasticsearch.ingest.AbstractProcessor;
1616
import org.elasticsearch.ingest.IngestDocument;
17+
import org.elasticsearch.script.TemplateScript;
1718
import org.elasticsearch.search.SearchHit;
1819
import org.elasticsearch.search.builder.SearchSourceBuilder;
1920
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2021
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;
2122

2223
import java.util.ArrayList;
24+
import java.util.Collections;
2325
import java.util.List;
2426
import java.util.Map;
2527
import java.util.function.BiConsumer;
@@ -28,8 +30,8 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
2830

2931
private final String policyName;
3032
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
31-
private final String field;
32-
private final String targetField;
33+
private final TemplateScript.Factory field;
34+
private final TemplateScript.Factory targetField;
3335
private final boolean ignoreMissing;
3436
private final boolean overrideEnabled;
3537
protected final String matchField;
@@ -39,8 +41,8 @@ protected AbstractEnrichProcessor(
3941
String tag,
4042
Client client,
4143
String policyName,
42-
String field,
43-
String targetField,
44+
TemplateScript.Factory field,
45+
TemplateScript.Factory targetField,
4446
boolean ignoreMissing,
4547
boolean overrideEnabled,
4648
String matchField,
@@ -53,8 +55,8 @@ protected AbstractEnrichProcessor(
5355
String tag,
5456
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
5557
String policyName,
56-
String field,
57-
String targetField,
58+
TemplateScript.Factory field,
59+
TemplateScript.Factory targetField,
5860
boolean ignoreMissing,
5961
boolean overrideEnabled,
6062
String matchField,
@@ -77,6 +79,7 @@ protected AbstractEnrichProcessor(
7779
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
7880
try {
7981
// If a document does not have the enrich key, return the unchanged document
82+
String field = ingestDocument.renderTemplate(this.field);
8083
final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
8184
if (value == null) {
8285
handler.accept(ingestDocument, null);
@@ -111,6 +114,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
111114
return;
112115
}
113116

117+
String targetField = ingestDocument.renderTemplate(this.targetField);
114118
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
115119
if (maxMatches == 1) {
116120
Map<String, Object> firstDocument = searchHits[0].getSourceAsMap();
@@ -146,11 +150,13 @@ public String getType() {
146150
}
147151

148152
String getField() {
149-
return field;
153+
// used for testing only:
154+
return field.newInstance(Collections.emptyMap()).execute();
150155
}
151156

152-
public String getTargetField() {
153-
return targetField;
157+
String getTargetField() {
158+
// used for testing only:
159+
return targetField.newInstance(Collections.emptyMap()).execute();
154160
}
155161

156162
boolean isIgnoreMissing() {

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
138138
return emptyMap();
139139
}
140140

141-
EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.client);
141+
EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.client, parameters.scriptService);
142142
parameters.ingestService.addIngestClusterStateListener(factory);
143143
return Collections.singletonMap(EnrichProcessorFactory.TYPE, factory);
144144
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1515
import org.elasticsearch.ingest.ConfigurationUtils;
1616
import org.elasticsearch.ingest.Processor;
17+
import org.elasticsearch.script.ScriptService;
18+
import org.elasticsearch.script.TemplateScript;
1719
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1820

1921
import java.util.Map;
@@ -23,11 +25,13 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste
2325

2426
static final String TYPE = "enrich";
2527
private final Client client;
28+
private final ScriptService scriptService;
2629

2730
volatile MetaData metaData;
2831

29-
EnrichProcessorFactory(Client client) {
32+
EnrichProcessorFactory(Client client, ScriptService scriptService) {
3033
this.client = client;
34+
this.scriptService = scriptService;
3135
}
3236

3337
@Override
@@ -42,17 +46,17 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
4246
assert aliasOrIndex.getIndices().size() == 1;
4347
IndexMetaData imd = aliasOrIndex.getIndices().get(0);
4448

45-
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
4649
Map<String, Object> mappingAsMap = imd.mapping().sourceAsMap();
4750
String policyType = (String) XContentMapValues.extractValue(
4851
"_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME,
4952
mappingAsMap
5053
);
5154
String matchField = (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_MATCH_FIELD_NAME, mappingAsMap);
5255

56+
TemplateScript.Factory field = ConfigurationUtils.readTemplateProperty(TYPE, tag, config, "field", scriptService);
5357
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
5458
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
55-
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
59+
TemplateScript.Factory targetField = ConfigurationUtils.readTemplateProperty(TYPE, tag, config, "target_field", scriptService);
5660
int maxMatches = ConfigurationUtils.readIntProperty(TYPE, tag, config, "max_matches", 1);
5761
if (maxMatches < 1 || maxMatches > 128) {
5862
throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128");

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.geometry.Point;
1717
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
1818
import org.elasticsearch.index.query.QueryBuilder;
19+
import org.elasticsearch.script.TemplateScript;
1920

2021
import java.util.ArrayList;
2122
import java.util.List;
@@ -29,8 +30,8 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor {
2930
String tag,
3031
Client client,
3132
String policyName,
32-
String field,
33-
String targetField,
33+
TemplateScript.Factory field,
34+
TemplateScript.Factory targetField,
3435
boolean overrideEnabled,
3536
boolean ignoreMissing,
3637
String matchField,
@@ -46,8 +47,8 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor {
4647
String tag,
4748
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
4849
String policyName,
49-
String field,
50-
String targetField,
50+
TemplateScript.Factory field,
51+
TemplateScript.Factory targetField,
5152
boolean overrideEnabled,
5253
boolean ignoreMissing,
5354
String matchField,

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.index.query.QueryBuilder;
1212
import org.elasticsearch.index.query.TermQueryBuilder;
1313
import org.elasticsearch.index.query.TermsQueryBuilder;
14+
import org.elasticsearch.script.TemplateScript;
1415

1516
import java.util.List;
1617
import java.util.function.BiConsumer;
@@ -21,8 +22,8 @@ public class MatchProcessor extends AbstractEnrichProcessor {
2122
String tag,
2223
Client client,
2324
String policyName,
24-
String field,
25-
String targetField,
25+
TemplateScript.Factory field,
26+
TemplateScript.Factory targetField,
2627
boolean overrideEnabled,
2728
boolean ignoreMissing,
2829
String matchField,
@@ -36,8 +37,8 @@ public class MatchProcessor extends AbstractEnrichProcessor {
3637
String tag,
3738
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
3839
String policyName,
39-
String field,
40-
String targetField,
40+
TemplateScript.Factory field,
41+
TemplateScript.Factory targetField,
4142
boolean overrideEnabled,
4243
boolean ignoreMissing,
4344
String matchField,

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.index.reindex.ReindexPlugin;
2323
import org.elasticsearch.ingest.common.IngestCommonPlugin;
2424
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.script.mustache.MustachePlugin;
2526
import org.elasticsearch.test.ESSingleNodeTestCase;
2627
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2728
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
@@ -53,7 +54,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {
5354

5455
@Override
5556
protected Collection<Class<? extends Plugin>> getPlugins() {
56-
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
57+
return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class, MustachePlugin.class);
5758
}
5859

5960
@Override
@@ -313,6 +314,43 @@ public void testAsyncTaskExecute() throws Exception {
313314
}
314315
}
315316

317+
public void testTemplating() throws Exception {
318+
List<String> keys = createSourceMatchIndex(1, 1);
319+
String policyName = "my-policy";
320+
EnrichPolicy enrichPolicy = new EnrichPolicy(
321+
EnrichPolicy.MATCH_TYPE,
322+
null,
323+
Collections.singletonList(SOURCE_INDEX_NAME),
324+
MATCH_FIELD,
325+
Arrays.asList(DECORATE_FIELDS)
326+
);
327+
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
328+
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
329+
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
330+
331+
String pipelineName = "my-pipeline";
332+
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
333+
+ policyName
334+
+ "\", \"field\": \"{{indirection1}}\", \"target_field\": \"{{indirection2}}\""
335+
+ "}}]}";
336+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
337+
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
338+
339+
IndexRequest indexRequest = new IndexRequest("my-index").id("1")
340+
.setPipeline(pipelineName)
341+
.source(mapOf("indirection1", MATCH_FIELD, "indirection2", "users", MATCH_FIELD, keys.get(0)));
342+
client().index(indexRequest).get();
343+
GetResponse getResponse = client().get(new GetRequest("my-index", "1")).actionGet();
344+
Map<String, Object> source = getResponse.getSourceAsMap();
345+
Map<?, ?> userEntry = (Map<?, ?>) source.get("users");
346+
assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
347+
for (int j = 0; j < 3; j++) {
348+
String field = DECORATE_FIELDS[j];
349+
assertThat(userEntry.get(field), equalTo(keys.get(0) + j));
350+
}
351+
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
352+
}
353+
316354
private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
317355
Set<String> keys = new HashSet<>();
318356
for (int id = 0; id < numKeys; id++) {

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import org.elasticsearch.cluster.metadata.MetaData;
1313
import org.elasticsearch.common.collect.Tuple;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.script.ScriptService;
1516
import org.elasticsearch.test.ESTestCase;
1617
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
18+
import org.junit.Before;
1719

1820
import java.io.IOException;
1921
import java.util.ArrayList;
@@ -26,9 +28,17 @@
2628
import static org.hamcrest.Matchers.equalTo;
2729
import static org.hamcrest.Matchers.is;
2830
import static org.hamcrest.Matchers.notNullValue;
31+
import static org.mockito.Mockito.mock;
2932

3033
public class EnrichProcessorFactoryTests extends ESTestCase {
3134

35+
private ScriptService scriptService;
36+
37+
@Before
38+
public void initializeScriptService() {
39+
scriptService = mock(ScriptService.class);
40+
}
41+
3242
public void testCreateProcessorInstance() throws Exception {
3343
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
3444
EnrichPolicy policy = new EnrichPolicy(
@@ -38,7 +48,7 @@ public void testCreateProcessorInstance() throws Exception {
3848
"my_key",
3949
enrichValues
4050
);
41-
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
51+
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
4252
factory.metaData = createMetaData("majestic", policy);
4353

4454
Map<String, Object> config = new HashMap<>();
@@ -88,7 +98,7 @@ public void testCreateProcessorInstance() throws Exception {
8898

8999
public void testPolicyDoesNotExist() {
90100
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
91-
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
101+
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
92102
factory.metaData = MetaData.builder().build();
93103

94104
Map<String, Object> config = new HashMap<>();
@@ -120,7 +130,7 @@ public void testPolicyDoesNotExist() {
120130

121131
public void testPolicyNameMissing() {
122132
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
123-
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
133+
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
124134

125135
Map<String, Object> config = new HashMap<>();
126136
config.put("enrich_key", "host");
@@ -151,7 +161,7 @@ public void testPolicyNameMissing() {
151161
public void testUnsupportedPolicy() throws Exception {
152162
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
153163
EnrichPolicy policy = new EnrichPolicy("unsupported", null, Collections.singletonList("source_index"), "my_key", enrichValues);
154-
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
164+
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
155165
factory.metaData = createMetaData("majestic", policy);
156166

157167
Map<String, Object> config = new HashMap<>();
@@ -176,7 +186,7 @@ public void testCompactEnrichValuesFormat() throws Exception {
176186
"host",
177187
enrichValues
178188
);
179-
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
189+
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
180190
factory.metaData = createMetaData("majestic", policy);
181191

182192
Map<String, Object> config = new HashMap<>();
@@ -200,7 +210,7 @@ public void testNoTargetField() throws Exception {
200210
"host",
201211
enrichValues
202212
);
203-
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
213+
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
204214
factory.metaData = createMetaData("majestic", policy);
205215

206216
Map<String, Object> config1 = new HashMap<>();
@@ -214,7 +224,7 @@ public void testNoTargetField() throws Exception {
214224
public void testIllegalMaxMatches() throws Exception {
215225
List<String> enrichValues = Arrays.asList("globalRank", "tldRank", "tld");
216226
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, Arrays.asList("source_index"), "my_key", enrichValues);
217-
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
227+
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
218228
factory.metaData = createMetaData("majestic", policy);
219229

220230
Map<String, Object> config = new HashMap<>();

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.function.BiConsumer;
4242

4343
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.mapOf;
44+
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str;
4445
import static org.hamcrest.Matchers.emptyArray;
4546
import static org.hamcrest.Matchers.equalTo;
4647
import static org.hamcrest.Matchers.instanceOf;
@@ -70,8 +71,8 @@ private void testBasicsForFieldValue(Object fieldValue, Geometry expectedGeometr
7071
"_tag",
7172
mockSearch,
7273
"_name",
73-
"location",
74-
"entry",
74+
str("location"),
75+
str("entry"),
7576
false,
7677
false,
7778
"shape",

0 commit comments

Comments
 (0)