Skip to content

Commit 7929ec1

Browse files
committed
Introduce on_failure_pipeline ingest metadata
In case an exception occurs inside a pipeline processor, the pipeline stack is kept around as header in the exception. Then in the on_failure processor the id of the pipeline the exception occurred is made accessible via the `on_failure_pipeline` ingest metadata. Closes elastic#44920
1 parent c4e113e commit 7929ec1

File tree

8 files changed

+226
-15
lines changed

8 files changed

+226
-15
lines changed

docs/reference/ingest/ingest-node.asciidoc

+4-3
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ The `if` condition can be more then a simple equality check.
376376
The full power of the <<modules-scripting-painless, Painless Scripting Language>> is available and
377377
running in the {painless}/painless-ingest-processor-context.html[ingest processor context].
378378

379-
IMPORTANT: The value of ctx is read-only in `if` conditions.
379+
IMPORTANT: The value of ctx is read-only in `if` conditions.
380380

381381
A more complex `if` condition that drops the document (i.e. not index it)
382382
unless it has a multi-valued tag field with at least one value that contains the characters
@@ -718,8 +718,9 @@ The `ignore_failure` can be set on any processor and defaults to `false`.
718718

719719
You may want to retrieve the actual error message that was thrown
720720
by a failed processor. To do so you can access metadata fields called
721-
`on_failure_message`, `on_failure_processor_type`, and `on_failure_processor_tag`. These fields are only accessible
722-
from within the context of an `on_failure` block.
721+
`on_failure_message`, `on_failure_processor_type`, `on_failure_processor_tag` and
722+
`on_failure_pipeline` (in case an error occurred inside a pipeline processor).
723+
These fields are only accessible from within the context of an `on_failure` block.
723724

724725
Here is an updated version of the example that you
725726
saw earlier. But instead of setting the error message manually, the example leverages the `on_failure_message`

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,4 @@ teardown:
107107
pipeline: "outer"
108108
body: {}
109109
- match: { error.root_cause.0.type: "exception" }
110-
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
110+
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Cycle detected for pipeline: outer" }

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
5454
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
5555
});
5656
} else {
57-
pipeline.execute(ingestDocument, (result, e) -> {
57+
ingestDocument.executePipeline(pipeline, (result, e) -> {
5858
if (e == null) {
5959
handler.accept(new SimulateDocumentBaseResult(result), null);
6060
} else {

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class CompoundProcessor implements Processor {
4040
public static final String ON_FAILURE_MESSAGE_FIELD = "on_failure_message";
4141
public static final String ON_FAILURE_PROCESSOR_TYPE_FIELD = "on_failure_processor_type";
4242
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
43+
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";
4344

4445
private final boolean ignoreFailure;
4546
private final List<Processor> processors;
@@ -144,7 +145,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
144145
innerExecute(currentProcessor + 1, ingestDocument, handler);
145146
} else {
146147
ElasticsearchException compoundProcessorException =
147-
newCompoundProcessorException(e, processor.getType(), processor.getTag());
148+
newCompoundProcessorException(e, processor, ingestDocument);
148149
if (onFailureProcessors.isEmpty()) {
149150
handler.accept(null, compoundProcessorException);
150151
} else {
@@ -177,7 +178,7 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
177178
onFailureProcessor.execute(ingestDocument, (result, e) -> {
178179
if (e != null) {
179180
removeFailureMetadata(ingestDocument);
180-
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor.getType(), onFailureProcessor.getTag()));
181+
handler.accept(null, newCompoundProcessorException(e, onFailureProcessor, ingestDocument));
181182
return;
182183
}
183184
if (result == null) {
@@ -192,34 +193,46 @@ void executeOnFailureAsync(int currentOnFailureProcessor, IngestDocument ingestD
192193
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
193194
List<String> processorTypeHeader = cause.getHeader("processor_type");
194195
List<String> processorTagHeader = cause.getHeader("processor_tag");
196+
List<String> processorOriginHeader = cause.getHeader("pipeline_origin");
195197
String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
196198
String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
199+
String failedPipelineId = (processorOriginHeader != null) ? processorOriginHeader.get(0) : null;
197200
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
198201
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
199202
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
200203
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
204+
if (failedPipelineId != null) {
205+
ingestMetadata.put(ON_FAILURE_PIPELINE_FIELD, failedPipelineId);
206+
}
201207
}
202208

203209
private void removeFailureMetadata(IngestDocument ingestDocument) {
204210
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
205211
ingestMetadata.remove(ON_FAILURE_MESSAGE_FIELD);
206212
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
207213
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
214+
ingestMetadata.remove(ON_FAILURE_PIPELINE_FIELD);
208215
}
209216

210-
private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
217+
private ElasticsearchException newCompoundProcessorException(Exception e, Processor processor, IngestDocument document) {
211218
if (e instanceof ElasticsearchException && ((ElasticsearchException) e).getHeader("processor_type") != null) {
212219
return (ElasticsearchException) e;
213220
}
214221

215222
ElasticsearchException exception = new ElasticsearchException(e);
216223

224+
String processorType = processor.getType();
217225
if (processorType != null) {
218226
exception.addHeader("processor_type", processorType);
219227
}
228+
String processorTag = processor.getTag();
220229
if (processorTag != null) {
221230
exception.addHeader("processor_tag", processorTag);
222231
}
232+
List<String> pipelineStack = document.getPipelineStack();
233+
if (pipelineStack.size() > 1) {
234+
exception.addHeader("pipeline_origin", pipelineStack);
235+
}
223236

224237
return exception;
225238
}

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import java.util.Date;
3939
import java.util.EnumMap;
4040
import java.util.HashMap;
41-
import java.util.IdentityHashMap;
41+
import java.util.LinkedHashSet;
4242
import java.util.List;
4343
import java.util.Map;
4444
import java.util.Objects;
@@ -60,7 +60,7 @@ public final class IngestDocument {
6060
private final Map<String, Object> ingestMetadata;
6161

6262
// Contains all pipelines that have been executed for this document
63-
private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
63+
private final Set<String> executedPipelines = new LinkedHashSet<>();
6464

6565
public IngestDocument(String index, String id, String routing,
6666
Long version, VersionType versionType, Map<String, Object> source) {
@@ -646,16 +646,25 @@ private static Object deepCopy(Object value) {
646646
* @param handler handles the result or failure
647647
*/
648648
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
649-
if (executedPipelines.add(pipeline)) {
649+
if (executedPipelines.add(pipeline.getId())) {
650650
pipeline.execute(this, (result, e) -> {
651-
executedPipelines.remove(pipeline);
651+
executedPipelines.remove(pipeline.getId());
652652
handler.accept(result, e);
653653
});
654654
} else {
655655
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
656656
}
657657
}
658658

659+
/**
660+
* @return a pipeline stack; all pipelines that are in execution by this document in reverse order
661+
*/
662+
List<String> getPipelineStack() {
663+
List<String> pipelineStack = new ArrayList<>(executedPipelines);
664+
Collections.reverse(pipelineStack);
665+
return pipelineStack;
666+
}
667+
659668
@Override
660669
public boolean equals(Object obj) {
661670
if (obj == this) { return true; }

server/src/main/java/org/elasticsearch/ingest/IngestService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline
470470
VersionType versionType = indexRequest.versionType();
471471
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
472472
IngestDocument ingestDocument = new IngestDocument(index, id, routing, version, versionType, sourceAsMap);
473-
pipeline.execute(ingestDocument, (result, e) -> {
473+
ingestDocument.executePipeline(pipeline, (result, e) -> {
474474
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos);
475475
totalMetrics.postIngest(ingestTimeInMillis);
476476
if (e != null) {

server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java

+38
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@
2626
import java.util.Arrays;
2727
import java.util.Collections;
2828
import java.util.HashMap;
29+
import java.util.List;
2930
import java.util.Map;
3031
import java.util.concurrent.TimeUnit;
32+
import java.util.function.BiConsumer;
3133
import java.util.function.Consumer;
3234
import java.util.function.LongSupplier;
3335

3436
import static org.hamcrest.CoreMatchers.equalTo;
3537
import static org.hamcrest.Matchers.hasSize;
3638
import static org.hamcrest.Matchers.is;
39+
import static org.hamcrest.Matchers.notNullValue;
3740
import static org.hamcrest.Matchers.nullValue;
3841
import static org.hamcrest.Matchers.sameInstance;
3942
import static org.mockito.Mockito.mock;
@@ -279,6 +282,41 @@ public void testBreakOnFailure() throws Exception {
279282
assertStats(pipeline, 1, 1, 0);
280283
}
281284

285+
public void testFailurePipelineField() {
286+
TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
287+
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
288+
assertThat(ingestMetadata.entrySet(), hasSize(4));
289+
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
290+
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
291+
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
292+
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
293+
});
294+
295+
Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
296+
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false, List.of(new AbstractProcessor(null) {
297+
@Override
298+
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
299+
ingestDocument.executePipeline(pipeline2, handler);
300+
}
301+
302+
@Override
303+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
304+
throw new AssertionError();
305+
}
306+
307+
@Override
308+
public String getType() {
309+
return "pipeline";
310+
}
311+
}), List.of(onFailureProcessor)));
312+
313+
ingestDocument.executePipeline(pipeline1, (document, e) -> {
314+
assertThat(document, notNullValue());
315+
assertThat(e, nullValue());
316+
});
317+
assertThat(onFailureProcessor.getInvokedCounter(), equalTo(1));
318+
}
319+
282320
private void assertStats(CompoundProcessor compoundProcessor, long count, long failed, long time) {
283321
assertStats(0, compoundProcessor, 0L, count, failed, time);
284322
}

0 commit comments

Comments
 (0)