Skip to content

Commit b868109

Browse files
PnPiemartijnvg
authored andcommitted
version set in ingest pipeline (#27573)
Add support version and version_type in ingest pipelines Add support for setting document version and version type in set processor of an ingest pipeline.
1 parent 780cbca commit b868109

File tree

17 files changed

+244
-69
lines changed

17 files changed

+244
-69
lines changed

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/AppendProcessorTests.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.elasticsearch.ingest.common;
2121

22+
import org.elasticsearch.index.VersionType;
2223
import org.elasticsearch.ingest.IngestDocument;
24+
import org.elasticsearch.ingest.IngestDocument.MetaData;
2325
import org.elasticsearch.ingest.Processor;
2426
import org.elasticsearch.ingest.RandomDocumentPicks;
2527
import org.elasticsearch.ingest.TestTemplateService;
@@ -122,10 +124,10 @@ public void testConvertScalarToList() throws Exception {
122124
}
123125
}
124126

125-
public void testAppendMetadata() throws Exception {
126-
//here any metadata field value becomes a list, which won't make sense in most of the cases,
127+
public void testAppendMetadataExceptVersion() throws Exception {
128+
// here any metadata field value becomes a list, which won't make sense in most of the cases,
127129
// but support for append is streamlined like for set so we test it
128-
IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
130+
MetaData randomMetaData = randomFrom(MetaData.INDEX, MetaData.TYPE, MetaData.ID, MetaData.ROUTING, MetaData.PARENT);
129131
List<String> values = new ArrayList<>();
130132
Processor appendProcessor;
131133
if (randomBoolean()) {

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/DateIndexNameProcessorTests.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void testJodaPattern() throws Exception {
3838
"events-", "y", "yyyyMMdd"
3939
);
4040

41-
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
41+
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
4242
Collections.singletonMap("_field", "2016-04-25T12:24:20.101Z"));
4343
processor.execute(document);
4444
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{20160425||/y{yyyyMMdd|UTC}}>"));
@@ -48,7 +48,7 @@ public void testTAI64N()throws Exception {
4848
Function<String, DateTime> function = DateFormat.Tai64n.getFunction(null, DateTimeZone.UTC, null);
4949
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
5050
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
51-
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
51+
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
5252
Collections.singletonMap("_field", (randomBoolean() ? "@" : "") + "4000000050d506482dbdf024"));
5353
dateProcessor.execute(document);
5454
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{20121222||/m{yyyyMMdd|UTC}}>"));
@@ -58,12 +58,12 @@ public void testUnixMs()throws Exception {
5858
Function<String, DateTime> function = DateFormat.UnixMs.getFunction(null, DateTimeZone.UTC, null);
5959
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
6060
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
61-
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
61+
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
6262
Collections.singletonMap("_field", "1000500"));
6363
dateProcessor.execute(document);
6464
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{19700101||/m{yyyyMMdd|UTC}}>"));
6565

66-
document = new IngestDocument("_index", "_type", "_id", null, null,
66+
document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
6767
Collections.singletonMap("_field", 1000500L));
6868
dateProcessor.execute(document);
6969
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{19700101||/m{yyyyMMdd|UTC}}>"));
@@ -73,7 +73,7 @@ public void testUnix()throws Exception {
7373
Function<String, DateTime> function = DateFormat.Unix.getFunction(null, DateTimeZone.UTC, null);
7474
DateIndexNameProcessor dateProcessor = new DateIndexNameProcessor("_tag", "_field", Collections.singletonList(function),
7575
DateTimeZone.UTC, "events-", "m", "yyyyMMdd");
76-
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null,
76+
IngestDocument document = new IngestDocument("_index", "_type", "_id", null, null, null, null,
7777
Collections.singletonMap("_field", "1000.5"));
7878
dateProcessor.execute(document);
7979
assertThat(document.getSourceAndMetadata().get("_index"), equalTo("<events-{19700101||/m{yyyyMMdd|UTC}}>"));

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/ForEachProcessorTests.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void testExecute() throws Exception {
4545
values.add("bar");
4646
values.add("baz");
4747
IngestDocument ingestDocument = new IngestDocument(
48-
"_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
48+
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
4949
);
5050

5151
ForEachProcessor processor = new ForEachProcessor(
@@ -61,7 +61,7 @@ public void testExecute() throws Exception {
6161

6262
public void testExecuteWithFailure() throws Exception {
6363
IngestDocument ingestDocument = new IngestDocument(
64-
"_index", "_type", "_id", null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c"))
64+
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", Arrays.asList("a", "b", "c"))
6565
);
6666

6767
TestProcessor testProcessor = new TestProcessor(id -> {
@@ -101,7 +101,7 @@ public void testMetaDataAvailable() throws Exception {
101101
values.add(new HashMap<>());
102102
values.add(new HashMap<>());
103103
IngestDocument ingestDocument = new IngestDocument(
104-
"_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
104+
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
105105
);
106106

107107
TestProcessor innerProcessor = new TestProcessor(id -> {
@@ -132,7 +132,7 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
132132
document.put("values", values);
133133
document.put("flat_values", new ArrayList<>());
134134
document.put("other", "value");
135-
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, document);
135+
IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null, document);
136136

137137
ForEachProcessor processor = new ForEachProcessor(
138138
"_tag", "values", new SetProcessor("_tag",
@@ -171,7 +171,7 @@ public String getTag() {
171171
values.add("");
172172
}
173173
IngestDocument ingestDocument = new IngestDocument(
174-
"_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
174+
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
175175
);
176176

177177
ForEachProcessor processor = new ForEachProcessor("_tag", "values", innerProcessor);
@@ -190,7 +190,7 @@ public void testModifyFieldsOutsideArray() throws Exception {
190190
values.add(1);
191191
values.add(null);
192192
IngestDocument ingestDocument = new IngestDocument(
193-
"_index", "_type", "_id", null, null, Collections.singletonMap("values", values)
193+
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values", values)
194194
);
195195

196196
TemplateScript.Factory template = new TestTemplateService.MockTemplateScript.Factory("errors");
@@ -220,7 +220,7 @@ public void testScalarValueAllowsUnderscoreValueFieldToRemainAccessible() throws
220220
source.put("_value", "new_value");
221221
source.put("values", values);
222222
IngestDocument ingestDocument = new IngestDocument(
223-
"_index", "_type", "_id", null, null, source
223+
"_index", "_type", "_id", null, null, null, null, source
224224
);
225225

226226
TestProcessor processor = new TestProcessor(doc -> doc.setFieldValue("_ingest._value",
@@ -251,7 +251,7 @@ public void testNestedForEach() throws Exception {
251251
values.add(value);
252252

253253
IngestDocument ingestDocument = new IngestDocument(
254-
"_index", "_type", "_id", null, null, Collections.singletonMap("values1", values)
254+
"_index", "_type", "_id", null, null, null, null, Collections.singletonMap("values1", values)
255255
);
256256

257257
TestProcessor testProcessor = new TestProcessor(

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/SetProcessorTests.java

+20-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.elasticsearch.ingest.common;
2121

22+
import org.elasticsearch.index.VersionType;
2223
import org.elasticsearch.ingest.IngestDocument;
24+
import org.elasticsearch.ingest.IngestDocument.MetaData;
2325
import org.elasticsearch.ingest.Processor;
2426
import org.elasticsearch.ingest.RandomDocumentPicks;
2527
import org.elasticsearch.ingest.TestTemplateService;
@@ -99,14 +101,30 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception {
99101
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue));
100102
}
101103

102-
public void testSetMetadata() throws Exception {
103-
IngestDocument.MetaData randomMetaData = randomFrom(IngestDocument.MetaData.values());
104+
public void testSetMetadataExceptVersion() throws Exception {
105+
MetaData randomMetaData = randomFrom(MetaData.INDEX, MetaData.TYPE, MetaData.ID, MetaData.ROUTING, MetaData.PARENT);
104106
Processor processor = createSetProcessor(randomMetaData.getFieldName(), "_value", true);
105107
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
106108
processor.execute(ingestDocument);
107109
assertThat(ingestDocument.getFieldValue(randomMetaData.getFieldName(), String.class), Matchers.equalTo("_value"));
108110
}
109111

112+
public void testSetMetadataVersion() throws Exception {
113+
long version = randomNonNegativeLong();
114+
Processor processor = createSetProcessor(MetaData.VERSION.getFieldName(), version, true);
115+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
116+
processor.execute(ingestDocument);
117+
assertThat(ingestDocument.getFieldValue(MetaData.VERSION.getFieldName(), Long.class), Matchers.equalTo(version));
118+
}
119+
120+
public void testSetMetadataVersionType() throws Exception {
121+
String versionType = randomFrom("internal", "external", "external_gte");
122+
Processor processor = createSetProcessor(MetaData.VERSION_TYPE.getFieldName(), versionType, true);
123+
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
124+
processor.execute(ingestDocument);
125+
assertThat(ingestDocument.getFieldValue(MetaData.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
126+
}
127+
110128
private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled) {
111129
return new SetProcessor(randomAlphaOfLength(10), new TestTemplateService.MockTemplateScript.Factory(fieldName),
112130
ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "my_pipeline"
6+
ignore: 404
7+
8+
---
9+
"Test set document version & version type":
10+
- do:
11+
cluster.health:
12+
wait_for_status: green
13+
14+
- do:
15+
ingest.put_pipeline:
16+
id: "my_pipeline1"
17+
body: >
18+
{
19+
"description": "_description",
20+
"processors": [
21+
{
22+
"set" : {
23+
"field" : "_version",
24+
"value": 1
25+
}
26+
},
27+
{
28+
"set" : {
29+
"field" : "_version_type",
30+
"value": "internal"
31+
}
32+
}
33+
]
34+
}
35+
- match: { acknowledged: true }
36+
37+
- do:
38+
ingest.put_pipeline:
39+
id: "my_pipeline2"
40+
body: >
41+
{
42+
"description": "_description",
43+
"processors": [
44+
{
45+
"set" : {
46+
"field" : "_version",
47+
"value": 1
48+
}
49+
},
50+
{
51+
"set" : {
52+
"field" : "_version_type",
53+
"value": "external"
54+
}
55+
}
56+
]
57+
}
58+
- match: { acknowledged: true }
59+
60+
- do:
61+
catch: conflict
62+
index:
63+
index: test
64+
type: test
65+
id: 1
66+
pipeline: "my_pipeline1"
67+
body: {}
68+
69+
- do:
70+
index:
71+
index: test
72+
type: test
73+
id: 1
74+
pipeline: "my_pipeline2"
75+
body: {}
76+
- match: { _version: 1 }

qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/IngestDocumentMustacheIT.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class IngestDocumentMustacheIT extends AbstractScriptTestCase {
3333
public void testAccessMetaDataViaTemplate() {
3434
Map<String, Object> document = new HashMap<>();
3535
document.put("foo", "bar");
36-
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
36+
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
3737
ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{foo}}", scriptService));
3838
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 bar"));
3939

@@ -48,7 +48,7 @@ public void testAccessMapMetaDataViaTemplate() {
4848
innerObject.put("baz", "hello baz");
4949
innerObject.put("qux", Collections.singletonMap("fubar", "hello qux and fubar"));
5050
document.put("foo", innerObject);
51-
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
51+
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
5252
ingestDocument.setFieldValue(compile("field1"),
5353
ValueSource.wrap("1 {{foo.bar}} {{foo.baz}} {{foo.qux.fubar}}", scriptService));
5454
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 hello bar hello baz hello qux and fubar"));
@@ -67,7 +67,7 @@ public void testAccessListMetaDataViaTemplate() {
6767
list.add(value);
6868
list.add(null);
6969
document.put("list2", list);
70-
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
70+
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
7171
ingestDocument.setFieldValue(compile("field1"), ValueSource.wrap("1 {{list1.0}} {{list2.0}}", scriptService));
7272
assertThat(ingestDocument.getFieldValue("field1", String.class), equalTo("1 foo {field=value}"));
7373
}
@@ -77,7 +77,7 @@ public void testAccessIngestMetadataViaTemplate() {
7777
Map<String, Object> ingestMap = new HashMap<>();
7878
ingestMap.put("timestamp", "bogus_timestamp");
7979
document.put("_ingest", ingestMap);
80-
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, document);
80+
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, document);
8181
ingestDocument.setFieldValue(compile("ingest_timestamp"),
8282
ValueSource.wrap("{{_ingest.timestamp}} and {{_source._ingest.timestamp}}", scriptService));
8383
assertThat(ingestDocument.getFieldValue("ingest_timestamp", String.class),

qa/smoke-test-ingest-with-all-dependencies/src/test/java/org/elasticsearch/ingest/ValueSourceMustacheIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public void testValueSourceWithTemplates() {
6464
}
6565

6666
public void testAccessSourceViaTemplate() {
67-
IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, new HashMap<>());
67+
IngestDocument ingestDocument = new IngestDocument("marvel", "type", "id", null, null, null, null, new HashMap<>());
6868
assertThat(ingestDocument.hasField("marvel"), is(false));
6969
ingestDocument.setFieldValue(compile("{{_index}}"), ValueSource.wrap("{{_index}}", scriptService));
7070
assertThat(ingestDocument.getFieldValue("marvel", String.class), equalTo("marvel"));

server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.io.stream.StreamOutput;
2828
import org.elasticsearch.common.xcontent.XContentFactory;
2929
import org.elasticsearch.common.xcontent.XContentType;
30+
import org.elasticsearch.index.VersionType;
3031
import org.elasticsearch.ingest.ConfigurationUtils;
3132
import org.elasticsearch.ingest.IngestDocument;
3233
import org.elasticsearch.ingest.Pipeline;
@@ -190,8 +191,17 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
190191
dataMap, MetaData.ROUTING.getFieldName());
191192
String parent = ConfigurationUtils.readOptionalStringOrIntProperty(null, null,
192193
dataMap, MetaData.PARENT.getFieldName());
194+
Long version = null;
195+
if (dataMap.containsKey(MetaData.VERSION.getFieldName())) {
196+
version = (Long) ConfigurationUtils.readObject(null, null, dataMap, MetaData.VERSION.getFieldName());
197+
}
198+
VersionType versionType = null;
199+
if (dataMap.containsKey(MetaData.VERSION_TYPE.getFieldName())) {
200+
versionType = VersionType.fromString(ConfigurationUtils.readStringProperty(null, null, dataMap,
201+
MetaData.VERSION_TYPE.getFieldName()));
202+
}
193203
IngestDocument ingestDocument =
194-
new IngestDocument(index, type, id, routing, parent, document);
204+
new IngestDocument(index, type, id, routing, parent, version, versionType, document);
195205
ingestDocumentList.add(ingestDocument);
196206
}
197207
return ingestDocumentList;

server/src/main/java/org/elasticsearch/action/ingest/WriteableIngestDocument.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ IngestDocument getIngestDocument() {
6868
@Override
6969
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
7070
builder.startObject("doc");
71-
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
72-
for (Map.Entry<IngestDocument.MetaData, String> metadata : metadataMap.entrySet()) {
71+
Map<IngestDocument.MetaData, Object> metadataMap = ingestDocument.extractMetadata();
72+
for (Map.Entry<IngestDocument.MetaData, Object> metadata : metadataMap.entrySet()) {
7373
if (metadata.getValue() != null) {
74-
builder.field(metadata.getKey().getFieldName(), metadata.getValue());
74+
builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString());
7575
}
7676
}
7777
builder.field("_source", ingestDocument.getSourceAndMetadata());

0 commit comments

Comments
 (0)