Skip to content

Commit 2b2935f

Browse files
authored
Add pipeline name to ingest metadata (#50467)
This commit adds the name of the current pipeline to ingest metadata. This pipeline name is accessible under the following key: '_ingest.pipeline'. Example usage in pipeline: PUT /_ingest/pipeline/2 { "processors": [ { "set": { "field": "pipeline_name", "value": "{{_ingest.pipeline}}" } } ] } Closes #42106
1 parent 9370186 commit 2b2935f

File tree

10 files changed

+198
-41
lines changed

10 files changed

+198
-41
lines changed

docs/reference/ingest/apis/simulate-pipeline.asciidoc

+8-4
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,8 @@ The API returns the following response:
350350
"foo": "bar"
351351
},
352352
"_ingest": {
353-
"timestamp": "2017-05-04T22:46:09.674Z"
353+
"timestamp": "2017-05-04T22:46:09.674Z",
354+
"pipeline": "_simulate_pipeline"
354355
}
355356
}
356357
},
@@ -364,7 +365,8 @@ The API returns the following response:
364365
"foo": "bar"
365366
},
366367
"_ingest": {
367-
"timestamp": "2017-05-04T22:46:09.675Z"
368+
"timestamp": "2017-05-04T22:46:09.675Z",
369+
"pipeline": "_simulate_pipeline"
368370
}
369371
}
370372
}
@@ -381,7 +383,8 @@ The API returns the following response:
381383
"foo": "rab"
382384
},
383385
"_ingest": {
384-
"timestamp": "2017-05-04T22:46:09.676Z"
386+
"timestamp": "2017-05-04T22:46:09.676Z",
387+
"pipeline": "_simulate_pipeline"
385388
}
386389
}
387390
},
@@ -395,7 +398,8 @@ The API returns the following response:
395398
"foo": "rab"
396399
},
397400
"_ingest": {
398-
"timestamp": "2017-05-04T22:46:09.677Z"
401+
"timestamp": "2017-05-04T22:46:09.677Z",
402+
"pipeline": "_simulate_pipeline"
399403
}
400404
}
401405
}

docs/reference/ingest/processors/pipeline.asciidoc

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ include::common-options.asciidoc[]
2121
--------------------------------------------------
2222
// NOTCONSOLE
2323

24+
The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key.
25+
2426
An example of using this processor for nesting pipelines would be:
2527

2628
Define an inner pipeline:

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

+78
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,81 @@ teardown:
202202
}
203203
- match: { error.root_cause.0.type: "illegal_state_exception" }
204204
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [legal-department]" }
205+
206+
---
207+
"Test _ingest.pipeline metadata":
208+
- do:
209+
ingest.put_pipeline:
210+
id: "pipeline1"
211+
body: >
212+
{
213+
"processors" : [
214+
{
215+
"append" : {
216+
"field": "pipelines",
217+
"value": "{{_ingest.pipeline}}"
218+
}
219+
},
220+
{
221+
"pipeline" : {
222+
"name": "another_pipeline"
223+
}
224+
}
225+
]
226+
}
227+
- match: { acknowledged: true }
228+
229+
- do:
230+
ingest.put_pipeline:
231+
id: "another_pipeline"
232+
body: >
233+
{
234+
"processors" : [
235+
{
236+
"append" : {
237+
"field": "pipelines",
238+
"value": "{{_ingest.pipeline}}"
239+
}
240+
},
241+
{
242+
"pipeline" : {
243+
"name": "another_pipeline2"
244+
}
245+
}
246+
]
247+
}
248+
- match: { acknowledged: true }
249+
250+
- do:
251+
ingest.put_pipeline:
252+
id: "another_pipeline2"
253+
body: >
254+
{
255+
"processors" : [
256+
{
257+
"append" : {
258+
"field": "pipelines",
259+
"value": "{{_ingest.pipeline}}"
260+
}
261+
}
262+
]
263+
}
264+
- match: { acknowledged: true }
265+
266+
- do:
267+
index:
268+
index: test
269+
id: 1
270+
pipeline: "pipeline1"
271+
body: >
272+
{
273+
}
274+
275+
- do:
276+
get:
277+
index: test
278+
id: 1
279+
- length: { _source.pipelines: 3 }
280+
- match: { _source.pipelines.0: "pipeline1" }
281+
- match: { _source.pipelines.1: "another_pipeline" }
282+
- match: { _source.pipelines.2: "another_pipeline2" }

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml

+12-6
Original file line numberDiff line numberDiff line change
@@ -284,26 +284,30 @@ teardown:
284284
- length: { docs.0.processor_results.0.doc._source: 2 }
285285
- match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" }
286286
- match: { docs.0.processor_results.0.doc._source.field2.value: "_value" }
287-
- length: { docs.0.processor_results.0.doc._ingest: 1 }
287+
- length: { docs.0.processor_results.0.doc._ingest: 2 }
288288
- is_true: docs.0.processor_results.0.doc._ingest.timestamp
289+
- is_true: docs.0.processor_results.0.doc._ingest.pipeline
289290
- length: { docs.0.processor_results.1.doc._source: 3 }
290291
- match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" }
291292
- match: { docs.0.processor_results.1.doc._source.field2.value: "_value" }
292293
- match: { docs.0.processor_results.1.doc._source.field3: "third_val" }
293-
- length: { docs.0.processor_results.1.doc._ingest: 1 }
294+
- length: { docs.0.processor_results.1.doc._ingest: 2 }
294295
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
296+
- is_true: docs.0.processor_results.1.doc._ingest.pipeline
295297
- length: { docs.0.processor_results.2.doc._source: 3 }
296298
- match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" }
297299
- match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" }
298300
- match: { docs.0.processor_results.2.doc._source.field3: "third_val" }
299-
- length: { docs.0.processor_results.2.doc._ingest: 1 }
301+
- length: { docs.0.processor_results.2.doc._ingest: 2 }
300302
- is_true: docs.0.processor_results.2.doc._ingest.timestamp
303+
- is_true: docs.0.processor_results.2.doc._ingest.pipeline
301304
- length: { docs.0.processor_results.3.doc._source: 3 }
302305
- match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" }
303306
- match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" }
304307
- match: { docs.0.processor_results.3.doc._source.field3: "third_val" }
305-
- length: { docs.0.processor_results.3.doc._ingest: 1 }
308+
- length: { docs.0.processor_results.3.doc._ingest: 2 }
306309
- is_true: docs.0.processor_results.3.doc._ingest.timestamp
310+
- is_true: docs.0.processor_results.3.doc._ingest.pipeline
307311

308312
---
309313
"Test simulate with exception thrown":
@@ -393,12 +397,14 @@ teardown:
393397
- match: { docs.1.processor_results.0.doc._index: "index" }
394398
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
395399
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
396-
- length: { docs.1.processor_results.0.doc._ingest: 1 }
400+
- length: { docs.1.processor_results.0.doc._ingest: 2 }
397401
- is_true: docs.1.processor_results.0.doc._ingest.timestamp
402+
- is_true: docs.1.processor_results.0.doc._ingest.pipeline
398403
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
399404
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
400-
- length: { docs.1.processor_results.1.doc._ingest: 1 }
405+
- length: { docs.1.processor_results.1.doc._ingest: 2 }
401406
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
407+
- is_true: docs.1.processor_results.1.doc._ingest.pipeline
402408

403409
---
404410
"Test verbose simulate with on_failure":

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

+6
Original file line numberDiff line numberDiff line change
@@ -646,8 +646,14 @@ private static Object deepCopy(Object value) {
646646
*/
647647
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
648648
if (executedPipelines.add(pipeline.getId())) {
649+
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
649650
pipeline.execute(this, (result, e) -> {
650651
executedPipelines.remove(pipeline.getId());
652+
if (previousPipeline != null) {
653+
ingestMetadata.put("pipeline", previousPipeline);
654+
} else {
655+
ingestMetadata.remove("pipeline");
656+
}
651657
handler.accept(result, e);
652658
});
653659
} else {

server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor {
3131
private final TemplateScript.Factory pipelineTemplate;
3232
private final IngestService ingestService;
3333

34-
private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
34+
PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
3535
super(tag);
3636
this.pipelineTemplate = pipelineTemplate;
3737
this.ingestService = ingestService;

server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java

+27-29
Original file line numberDiff line numberDiff line change
@@ -91,23 +91,17 @@ public void testExecuteVerboseItem() throws Exception {
9191
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
9292
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
9393
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
94-
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
95-
IngestDocument firstProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument();
96-
assertThat(firstProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
97-
assertIngestDocument(firstProcessorIngestDocument, this.ingestDocument);
98-
assertThat(firstProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
9994

95+
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
96+
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
10097
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
98+
10199
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("test-id"));
102-
IngestDocument secondProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument();
103-
assertThat(secondProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
104-
assertIngestDocument(secondProcessorIngestDocument, this.ingestDocument);
105-
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
106-
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(),
107-
not(sameInstance(firstProcessorIngestDocument.getSourceAndMetadata())));
100+
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(), ingestDocument);
101+
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument().getSourceAndMetadata(),
102+
not(sameInstance(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata())));
108103
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
109104
}
110-
111105
public void testExecuteItem() throws Exception {
112106
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
113107
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
@@ -147,10 +141,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
147141
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
148142
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
149143
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
150-
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
151-
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
152-
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
153-
not(sameInstance(ingestDocument.getSourceAndMetadata())));
144+
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
154145
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
155146
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
156147
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class));
@@ -191,14 +182,12 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {
191182
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
192183
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
193184
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
194-
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(),
195-
ingestDocumentWithOnFailureMetadata);
196-
185+
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(),
186+
ingestDocumentWithOnFailureMetadata);
197187
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
198188

199189
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2"));
200-
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument)));
201-
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), ingestDocument);
190+
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(2), pipeline.getId(), ingestDocument);
202191
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
203192
}
204193

@@ -221,10 +210,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception
221210
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
222211
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
223212
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception));
224-
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
225-
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
226-
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
227-
not(sameInstance(ingestDocument.getSourceAndMetadata())));
213+
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
228214
}
229215

230216
public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
@@ -245,10 +231,7 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws
245231
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
246232
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
247233
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
248-
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
249-
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
250-
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
251-
not(sameInstance(ingestDocument.getSourceAndMetadata())));
234+
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
252235
}
253236

254237
public void testExecuteItemWithFailure() throws Exception {
@@ -392,4 +375,19 @@ public String getType() {
392375
}
393376
}
394377

378+
private static void assertVerboseResult(SimulateProcessorResult result,
379+
String expectedPipelineId,
380+
IngestDocument expectedIngestDocument) {
381+
IngestDocument simulateVerboseIngestDocument = result.getIngestDocument();
382+
// Remove and compare pipeline key. It is always in the verbose result,
383+
// since that is a snapshot of how the ingest doc looks during pipeline execution, but not in the final ingestDocument.
384+
// The key gets added and removed during pipeline execution.
385+
String actualPipelineId = (String) simulateVerboseIngestDocument.getIngestMetadata().remove("pipeline");
386+
assertThat(actualPipelineId, equalTo(expectedPipelineId));
387+
388+
assertThat(simulateVerboseIngestDocument, not(sameInstance(expectedIngestDocument)));
389+
assertIngestDocument(simulateVerboseIngestDocument, expectedIngestDocument);
390+
assertThat(simulateVerboseIngestDocument.getSourceAndMetadata(), not(sameInstance(expectedIngestDocument.getSourceAndMetadata())));
391+
}
392+
395393
}

server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -285,11 +285,12 @@ public void testBreakOnFailure() throws Exception {
285285
public void testFailureProcessorIsInvokedOnFailure() {
286286
TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
287287
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
288-
assertThat(ingestMetadata.entrySet(), hasSize(4));
288+
assertThat(ingestMetadata.entrySet(), hasSize(5));
289289
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
290290
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
291291
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
292292
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
293+
assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
293294
});
294295

295296
Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));

0 commit comments

Comments
 (0)