Skip to content

Commit 90850f4

Browse files
authored
Backport: Introduce on_failure_pipeline ingest metadata inside on_failure block (#49596)
Backport of #49076 In case an exception occurs inside a pipeline processor, the pipeline stack is kept around as header in the exception. Then in the on_failure processor the id of the pipeline the exception occurred is made accessible via the `on_failure_pipeline` ingest metadata. Closes #44920
1 parent 901c64e commit 90850f4

File tree

8 files changed

+291
-33
lines changed

8 files changed

+291
-33
lines changed

docs/reference/ingest/ingest-node.asciidoc

+4-3
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ The `if` condition can be more then a simple equality check.
378378
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
379379
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].
380380

381-
IMPORTANT: The value of ctx is read-only in `if` conditions.
381+
IMPORTANT: The value of ctx is read-only in `if` conditions.
382382

383383
A more complex `if` condition that drops the document (i.e. not index it)
384384
unless it has a multi-valued tag field with at least one value that contains the characters
@@ -722,8 +722,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`.
722722

723723
You may want to retrieve the actual error message that was thrown
724724
by a failed processor. To do so you can access metadata fields called
725-
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
726-
from within the context of an `on_failure` block.
725+
`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and
726+
`on_failure_pipeline` (in case an error occurred inside a pipeline processor).
727+
These fields are only accessible from within the context of an `on_failure` block.
727728

728729
Here is an updated version of the example that you
729730
saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message`

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,4 @@ teardown:
107107
pipeline: "outer"
108108
body: {}
109109
- match: { error.root_cause.0.type: "ingest_processor_exception" }
110-
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
110+
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
5454
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
5555
});
5656
} else {
57-
pipeline.execute(ingestDocument, (result, e) -> {
57+
ingestDocument.executePipeline(pipeline, (result, e) -> {
5858
if (e == null) {
5959
handler.accept(new SimulateDocumentBaseResult(result), null);
6060
} else {

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor {
4040
public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
4141
public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
4242
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
43+
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
4344

4445
private final boolean ignoreFailure;
4546
private final List<Processor> processors;
@@ -144,7 +145,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
144145
innerExecute(currentProcessor + 1, ingestDocument, handler);
145146
} else {
146147
IngestProcessorException compoundProcessorException =
147-
newCompoundProcessorException(e, processor.getType(), processor.getTag());
148+
newCompoundProcessorException(e, processor, ingestDocument);
148149
if (onFailureProcessors.isEmpty()) {
149150
handler.accept(null, compoundProcessorException);
150151
} else {
@@ -177,7 +178,7 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
177178
onFailureProcessor.execute(ingestDocument, (result, e) -> {
178179
if (e != null) {
179180
removeFailureMetadata(ingestDocument);
180-
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
181+
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument));
181182
return;
182183
}
183184
if (result == null) {
@@ -192,34 +193,46 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
192193
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
193194
List<String> processorTypeHeader = cause.getHeader("processor_type");
194195
List<String> processorTagHeader = cause.getHeader("processor_tag");
196+
List<String> processorOriginHeader = cause.getHeader("pipeline_origin");
195197
String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
196198
String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
199+
String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null;
197200
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
198201
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
199202
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
200203
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
204+
if (failedPipelineId != null) {
205+
ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId);
206+
}
201207
}
202208

203209
private void removeFailureMetadata(IngestDocument ingestDocument) {
204210
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
205211
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
206212
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
207213
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
214+
ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD);
208215
}
209216

210-
private IngestProcessorException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
217+
static IngestProcessorException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
211218
if (e instanceof IngestProcessorException && ((IngestProcessorException) e).getHeader("processor_type") != null) {
212219
return (IngestProcessorException) e;
213220
}
214221

215222
IngestProcessorException exception = new IngestProcessorException(e);
216223

224+
String processorType = processor.getType();
217225
if (processorType != null) {
218226
exception.addHeader("processor_type", processorType);
219227
}
228+
String processorTag = processor.getTag();
220229
if (processorTag != null) {
221230
exception.addHeader("processor_tag", processorTag);
222231
}
232+
List<String> pipelineStack = document.getPipelineStack();
233+
if (pipelineStack.size() > 1) {
234+
exception.addHeader("pipeline_origin", pipelineStack);
235+
}
223236

224237
return exception;
225238
}

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import java.util.Date;
3939
import java.util.EnumMap;
4040
import java.util.HashMap;
41-
import java.util.IdentityHashMap;
41+
import java.util.LinkedHashSet;
4242
import java.util.List;
4343
import java.util.Map;
4444
import java.util.Objects;
@@ -60,7 +60,7 @@ public final class IngestDocument {
6060
private final Map<String, Object> ingestMetadata;
6161

6262
// Contains all pipelines that have been executed for this document
63-
private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
63+
private final Set<String> executedPipelines = new LinkedHashSet<>();
6464

6565
public IngestDocument(String index, String type, String id, String routing,
6666
Long version, VersionType versionType, Map<String, Object> source) {
@@ -647,16 +647,25 @@ private static Object deepCopy(Object value) {
647647
* @param handler handles the result or failure
648648
*/
649649
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
650-
if (executedPipelines.add(pipeline)) {
650+
if (executedPipelines.add(pipeline.getId())) {
651651
pipeline.execute(this, (result, e) -> {
652-
executedPipelines.remove(pipeline);
652+
executedPipelines.remove(pipeline.getId());
653653
handler.accept(result, e);
654654
});
655655
} else {
656656
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
657657
}
658658
}
659659

660+
/**
661+
* @return a pipeline stack; all pipelines that are in execution by this document in reverse order
662+
*/
663+
List<String> getPipelineStack() {
664+
List<String> pipelineStack = new ArrayList<>(executedPipelines);
665+
Collections.reverse(pipelineStack);
666+
return pipelineStack;
667+
}
668+
660669
@Override
661670
public boolean equals(Object obj) {
662671
if (obj == this) { return true; }

server/src/main/java/org/elasticsearch/ingest/IngestService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline
501501
VersionType versionType = indexRequest.versionType();
502502
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
503503
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap);
504-
pipeline.execute(ingestDocument, (result, e) -> {
504+
ingestDocument.executePipeline(pipeline, (result, e) -> {
505505
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
506506
totalMetrics.postIngest(ingestTimeInMillis);
507507
if (e != null) {

0 commit comments

Comments
 (0)