Skip to content

Commit 358fd4e

Browse files
committed
ingest: fix on_failure with Drop processor (#36686)
This commit allows a document to be dropped when a Drop processor is used in the on_failure fork of the processor chain. Fixes #36151
1 parent 0cd088b commit 358fd4e

File tree

3 files changed

+81
-4
lines changed

3 files changed

+81
-4
lines changed

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/220_drop_processor.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,44 @@ teardown:
5757
type: test
5858
id: 2
5959
- match: { _source.foo: "blub" }
60+
61+
---
62+
"Test Drop Processor On Failure":
63+
- do:
64+
ingest.put_pipeline:
65+
id: "my_pipeline_with_failure"
66+
body: >
67+
{
68+
"description" : "pipeline with on failure drop",
69+
"processors": [
70+
{
71+
"fail": {
72+
"message": "failed",
73+
"on_failure": [
74+
{
75+
"drop": {}
76+
}
77+
]
78+
}
79+
}
80+
]
81+
}
82+
- match: { acknowledged: true }
83+
84+
- do:
85+
index:
86+
index: test
87+
type: test
88+
id: 3
89+
pipeline: "my_pipeline_with_failure"
90+
body: {
91+
foo: "bar"
92+
}
93+
94+
- do:
95+
catch: missing
96+
get:
97+
index: test
98+
type: test
99+
id: 3
100+
- match: { found: false }

server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
135135
if (onFailureProcessors.isEmpty()) {
136136
throw compoundProcessorException;
137137
} else {
138-
executeOnFailure(ingestDocument, compoundProcessorException);
138+
if (executeOnFailure(ingestDocument, compoundProcessorException) == false) {
139+
return null;
140+
}
139141
break;
140142
}
141143
} finally {
@@ -146,20 +148,25 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
146148
return ingestDocument;
147149
}
148150

149-
150-
void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
151+
/**
152+
* @return true if execution should continue, false if document is dropped.
153+
*/
154+
boolean executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
151155
try {
152156
putFailureMetadata(ingestDocument, exception);
153157
for (Processor processor : onFailureProcessors) {
154158
try {
155-
processor.execute(ingestDocument);
159+
if (processor.execute(ingestDocument) == null) {
160+
return false;
161+
}
156162
} catch (Exception e) {
157163
throw newCompoundProcessorException(e, processor.getType(), processor.getTag());
158164
}
159165
}
160166
} finally {
161167
removeFailureMetadata(ingestDocument);
162168
}
169+
return true;
163170
}
164171

165172
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {

server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,35 @@ public void testSingleProcessorWithOnFailureProcessor() throws Exception {
129129
assertThat(processor2.getInvokedCounter(), equalTo(1));
130130
}
131131

132+
public void testSingleProcessorWithOnFailureDropProcessor() throws Exception {
133+
TestProcessor processor1 = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
134+
Processor processor2 = new Processor() {
135+
@Override
136+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
137+
//Simulates the drop processor
138+
return null;
139+
}
140+
141+
@Override
142+
public String getType() {
143+
return "drop";
144+
}
145+
146+
@Override
147+
public String getTag() {
148+
return null;
149+
}
150+
};
151+
152+
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
153+
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
154+
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.singletonList(processor1),
155+
Collections.singletonList(processor2), relativeTimeProvider);
156+
assertNull(compoundProcessor.execute(ingestDocument));
157+
assertThat(processor1.getInvokedCounter(), equalTo(1));
158+
assertStats(compoundProcessor, 1, 1, 0);
159+
}
160+
132161
public void testSingleProcessorWithNestedFailures() throws Exception {
133162
TestProcessor processor = new TestProcessor("id", "first", ingestDocument -> {throw new RuntimeException("error");});
134163
TestProcessor processorToFail = new TestProcessor("id2", "second", ingestDocument -> {

0 commit comments

Comments
 (0)