Skip to content

Commit 46ada22

Browse files
authored
Expose dynamic_templates parameter in Ingest (#71716)
This change exposes the newly introduced parameter `dynamic_templates` in ingest. This parameter can be set by a set processor or a script processor. Relates #69948
1 parent 06664d5 commit 46ada22

File tree

8 files changed

+234
-2
lines changed

8 files changed

+234
-2
lines changed

docs/reference/ingest.asciidoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ Processors can access the following metadata fields by name:
519519
* `_index`
520520
* `_id`
521521
* `_routing`
522+
* `_dynamic_templates`
522523

523524
[source,console]
524525
----
@@ -539,6 +540,30 @@ PUT _ingest/pipeline/my-pipeline
539540
Use a Mustache template snippet to access metadata field values. For example,
540541
`{{{_routing}}}` retrieves a document's routing value.
541542

543+
[source,console]
544+
----
545+
PUT _ingest/pipeline/my-pipeline
546+
{
547+
"processors": [
548+
{
549+
"set": {
550+
"description": "Use geo_point dynamic template for address field",
551+
"field": "_dynamic_templates",
552+
"value": {
553+
"address": "geo_point"
554+
}
555+
}
556+
}
557+
]
558+
}
559+
----
560+
561+
The set processor above tells ES to use the dynamic template named `geo_point`
562+
for the field `address` if this field is not defined in the mapping of the index
563+
yet. This processor overrides the dynamic template for the field `address` if
564+
already defined in the bulk request, but has no effect on other dynamic
565+
templates defined in the bulk request.
566+
542567
WARNING: If you <<create-document-ids-automatically,automatically generate>>
543568
document IDs, you cannot use `{{{_id}}}` in a processor. {es} assigns
544569
auto-generated `_id` values after ingest.

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ScriptProcessorTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import java.util.Map;
2626

27+
import static org.hamcrest.Matchers.equalTo;
2728
import static org.hamcrest.Matchers.hasKey;
2829
import static org.hamcrest.core.Is.is;
2930

@@ -45,6 +46,7 @@ Script.DEFAULT_SCRIPT_LANG, new MockScriptEngine(
4546
Integer bytesIn = (Integer) ctx.get("bytes_in");
4647
Integer bytesOut = (Integer) ctx.get("bytes_out");
4748
ctx.put("bytes_total", bytesIn + bytesOut);
49+
ctx.put("_dynamic_templates", Map.of("foo", "bar"));
4850
return null;
4951
}
5052
),
@@ -84,5 +86,6 @@ private void assertIngestDocument(IngestDocument ingestDocument) {
8486
assertThat(ingestDocument.getSourceAndMetadata(), hasKey("bytes_total"));
8587
int bytesTotal = ingestDocument.getFieldValue("bytes_in", Integer.class) + ingestDocument.getFieldValue("bytes_out", Integer.class);
8688
assertThat(ingestDocument.getSourceAndMetadata().get("bytes_total"), is(bytesTotal));
89+
assertThat(ingestDocument.getSourceAndMetadata().get("_dynamic_templates"), equalTo(Map.of("foo", "bar")));
8790
}
8891
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.IntStream;
2729

2830
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
2931
import static org.hamcrest.Matchers.equalTo;
@@ -136,6 +138,18 @@ public void testSetMetadataIfPrimaryTerm() throws Exception {
136138
assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
137139
}
138140

141+
public void testSetDynamicTemplates() throws Exception {
142+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
143+
int iters = between(1, 3);
144+
for (int i = 0; i < iters; i++) {
145+
Map<String, String> dynamicTemplates = IntStream.range(0, between(0, 3)).boxed()
146+
.collect(Collectors.toMap(n -> "field-" + n, n -> randomFrom("int", "geo_point", "keyword")));
147+
Processor processor = createSetProcessor(Metadata.DYNAMIC_TEMPLATES.getFieldName(), dynamicTemplates, null, true, false);
148+
processor.execute(ingestDocument);
149+
assertThat(ingestDocument.getFieldValue(Metadata.DYNAMIC_TEMPLATES.getFieldName(), Map.class), equalTo(dynamicTemplates));
150+
}
151+
}
152+
139153
public void testCopyFromOtherField() throws Exception {
140154
Map<String, Object> document = new HashMap<>();
141155
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
---
2+
"Dynamic templates":
3+
- do:
4+
indices.create:
5+
index: test_index
6+
body:
7+
mappings:
8+
dynamic_templates:
9+
- location:
10+
mapping:
11+
type: geo_point
12+
13+
- do:
14+
ingest.put_pipeline:
15+
id: "my_set_pipeline"
16+
body: >
17+
{
18+
"description": "Use location dynamic template for home_address and work_address",
19+
"processors": [
20+
{
21+
"set" : {
22+
"field": "_dynamic_templates",
23+
"value": {
24+
"home_address": "location",
25+
"work_address": "location"
26+
}
27+
}
28+
}
29+
]
30+
}
31+
- match: { acknowledged: true }
32+
33+
- do:
34+
ingest.put_pipeline:
35+
id: "my_script_pipeline"
36+
body: >
37+
{
38+
"description": "Use location dynamic template for home_address and work_address",
39+
"processors": [
40+
{
41+
"script" : {
42+
"source": "ctx._dynamic_templates = [params.f1: params.type, params.f2: params.type]",
43+
"params": {
44+
"f1": "home_address",
45+
"f2": "work_address",
46+
"type": "location"
47+
}
48+
}
49+
}
50+
]
51+
}
52+
- match: { acknowledged: true }
53+
54+
- do:
55+
bulk:
56+
refresh: true
57+
body:
58+
- index:
59+
_index: test_index
60+
_id: id_1
61+
pipeline: my_set_pipeline
62+
- { "home_address": [ -71.34, 41.12 ]}
63+
- index:
64+
_index: test_index
65+
_id: id_2
66+
pipeline: my_script_pipeline
67+
- { "work_address": "41.12,-71.34"}
68+
- match: { errors: false }
69+
- match: { items.0.index.result: created }
70+
- match: { items.1.index.result: created }
71+
72+
- do:
73+
index:
74+
index: test_index
75+
id: id_3
76+
body: { "home_address": [ -71.34, 41.12 ]}
77+
refresh: true
78+
pipeline: my_script_pipeline
79+
80+
- do:
81+
index:
82+
index: test_index
83+
id: id_4
84+
body: { "work_address": [ -71.34, 41.12 ]}
85+
refresh: true
86+
pipeline: my_set_pipeline
87+
88+
- do:
89+
search:
90+
index: test_index
91+
body:
92+
query:
93+
geo_bounding_box:
94+
home_address:
95+
top_left:
96+
lat: 42
97+
lon: -72
98+
bottom_right:
99+
lat: 40
100+
lon: -74
101+
- match: { hits.total.value: 2 }
102+
- match: { hits.hits.0._id: id_1 }
103+
- match: { hits.hits.1._id: id_3 }
104+
105+
- do:
106+
search:
107+
index: test_index
108+
body:
109+
query:
110+
geo_bounding_box:
111+
work_address:
112+
top_left:
113+
lat: 42
114+
lon: -72
115+
bottom_right:
116+
lat: 40
117+
lon: -74
118+
- match: { hits.total.value: 2 }
119+
- match: { hits.hits.0._id: id_2 }
120+
- match: { hits.hits.1._id: id_4 }
121+
122+
- do:
123+
bulk:
124+
refresh: true
125+
body:
126+
- index:
127+
_index: test_index
128+
_id: id_5
129+
pipeline: my_set_pipeline
130+
dynamic_templates:
131+
school_address: location
132+
- { "school_address": [ -71.34, 41.12 ]}
133+
- match: { errors: false }
134+
- match: { items.0.index.result: created }
135+
136+
- do:
137+
search:
138+
index: test_index
139+
body:
140+
query:
141+
geo_bounding_box:
142+
school_address:
143+
top_left:
144+
lat: 42
145+
lon: -72
146+
bottom_right:
147+
lat: 40
148+
lon: -74
149+
- match: { hits.total.value: 1 }
150+
- match: { hits.hits.0._id: id_5 }

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.io.IOException;
2828
import java.util.ArrayList;
2929
import java.util.Collections;
30+
import java.util.HashMap;
3031
import java.util.List;
3132
import java.util.Map;
3233
import java.util.Objects;
@@ -210,6 +211,15 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
210211
throw new IllegalArgumentException("[_if_primary_term] cannot be null");
211212
}
212213
}
214+
if (dataMap.containsKey(Metadata.DYNAMIC_TEMPLATES.getFieldName())) {
215+
Map<String, String> dynamicTemplates = ConfigurationUtils.readMap(
216+
null, null, dataMap, Metadata.DYNAMIC_TEMPLATES.getFieldName());
217+
if (dynamicTemplates != null) {
218+
ingestDocument.setFieldValue(Metadata.DYNAMIC_TEMPLATES.getFieldName(), new HashMap<>(dynamicTemplates));
219+
} else {
220+
throw new IllegalArgumentException("[_dynamic_templates] cannot be null");
221+
}
222+
}
213223
ingestDocumentList.add(ingestDocument);
214224
}
215225
return ingestDocumentList;

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ public IngestDocument(String index, String id, String routing,
6767
if (versionType != null) {
6868
sourceAndMetadata.put(Metadata.VERSION_TYPE.getFieldName(), VersionType.toString(versionType));
6969
}
70-
7170
this.ingestMetadata = new HashMap<>();
7271
this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
7372
}
@@ -816,7 +815,8 @@ public enum Metadata {
816815
VERSION(VersionFieldMapper.NAME),
817816
VERSION_TYPE("_version_type"),
818817
IF_SEQ_NO("_if_seq_no"),
819-
IF_PRIMARY_TERM("_if_primary_term");
818+
IF_PRIMARY_TERM("_if_primary_term"),
819+
DYNAMIC_TEMPLATES("_dynamic_templates");
820820

821821
private final String fieldName;
822822

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,11 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline
652652
indexRequest.setIfPrimaryTerm(((Number) metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM)).longValue());
653653
}
654654
indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
655+
if (metadataMap.get(IngestDocument.Metadata.DYNAMIC_TEMPLATES) != null) {
656+
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
657+
mergedDynamicTemplates.putAll((Map<String, String>) metadataMap.get(IngestDocument.Metadata.DYNAMIC_TEMPLATES));
658+
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
659+
}
655660
handler.accept(null);
656661
}
657662
});

server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import java.util.List;
6767
import java.util.Map;
6868
import java.util.Objects;
69+
import java.util.concurrent.CountDownLatch;
6970
import java.util.concurrent.ExecutorService;
7071
import java.util.concurrent.atomic.AtomicInteger;
7172
import java.util.concurrent.atomic.AtomicReference;
@@ -720,6 +721,28 @@ public void testExecuteSuccess() {
720721
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
721722
}
722723

724+
public void testDynamicTemplates() throws Exception {
725+
IngestService ingestService = createWithProcessors(Collections.singletonMap(
726+
"set", (factories, tag, description, config) ->
727+
new FakeProcessor("set", "", "", (ingestDocument) -> ingestDocument.setFieldValue("_dynamic_templates",
728+
Map.of("foo", "bar", "foo.bar", "baz")))));
729+
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
730+
new BytesArray("{\"processors\": [{\"set\" : {}}]}"), XContentType.JSON);
731+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
732+
ClusterState previousClusterState = clusterState;
733+
clusterState = IngestService.innerPut(putRequest, clusterState);
734+
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
735+
final IndexRequest indexRequest =
736+
new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
737+
CountDownLatch latch = new CountDownLatch(1);
738+
final BiConsumer<Integer, Exception> failureHandler = (v, e) -> { throw new AssertionError("must never fail", e);};
739+
final BiConsumer<Thread, Exception> completionHandler = (t, e) -> latch.countDown();
740+
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler,
741+
indexReq -> {}, Names.WRITE);
742+
latch.await();
743+
assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz")));
744+
}
745+
723746
public void testExecuteEmptyPipeline() throws Exception {
724747
IngestService ingestService = createWithProcessors(emptyMap());
725748
PutPipelineRequest putRequest =
@@ -765,6 +788,8 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception {
765788
ingestDocument.setFieldValue(metadata.getFieldName(), ifSeqNo);
766789
} else if (metadata == IngestDocument.Metadata.IF_PRIMARY_TERM) {
767790
ingestDocument.setFieldValue(metadata.getFieldName(), ifPrimaryTerm);
791+
} else if (metadata == IngestDocument.Metadata.DYNAMIC_TEMPLATES) {
792+
ingestDocument.setFieldValue(metadata.getFieldName(), Map.of("foo", "bar"));
768793
} else {
769794
ingestDocument.setFieldValue(metadata.getFieldName(), "update" + metadata.getFieldName());
770795
}

0 commit comments

Comments
 (0)