Skip to content

Commit 3c8b46a

Browse files
authored
[7.x] Handle errors when evaluating if conditions in processors (#52892)
1 parent 8785f57 commit 3c8b46a

File tree

3 files changed

+61
-5
lines changed

3 files changed

+61
-5
lines changed

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java

+48-2
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
*/
1919
package org.elasticsearch.ingest.common;
2020

21+
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
2122
import org.elasticsearch.action.support.WriteRequest;
2223
import org.elasticsearch.common.bytes.BytesArray;
2324
import org.elasticsearch.common.bytes.BytesReference;
2425
import org.elasticsearch.common.settings.Settings;
2526
import org.elasticsearch.common.xcontent.XContentType;
27+
import org.elasticsearch.ingest.IngestStats;
2628
import org.elasticsearch.plugins.Plugin;
2729
import org.elasticsearch.script.MockScriptEngine;
2830
import org.elasticsearch.script.MockScriptPlugin;
@@ -31,12 +33,14 @@
3133

3234
import java.util.Arrays;
3335
import java.util.Collection;
34-
import java.util.Collections;
36+
import java.util.HashMap;
37+
import java.util.List;
3538
import java.util.Map;
3639
import java.util.function.Consumer;
3740
import java.util.function.Function;
3841

3942
import static org.hamcrest.Matchers.equalTo;
43+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4044

4145
// Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor
4246
// ends up being copied into this test.
@@ -56,10 +60,52 @@ protected boolean ignoreExternalCluster() {
5660
public static class CustomScriptPlugin extends MockScriptPlugin {
5761
@Override
5862
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
59-
return Collections.singletonMap("my_script", ctx -> {
63+
Map<String, Function<Map<String, Object>, Object>> pluginScripts = new HashMap<>();
64+
pluginScripts.put("my_script", ctx -> {
6065
ctx.put("z", 0);
6166
return null;
6267
});
68+
pluginScripts.put("throwing_script", ctx -> {
69+
throw new RuntimeException("this script always fails");
70+
});
71+
return pluginScripts;
72+
}
73+
}
74+
75+
public void testFailureInConditionalProcessor() {
76+
internalCluster().ensureAtLeastNumDataNodes(1);
77+
internalCluster().startMasterOnlyNode();
78+
final String pipelineId = "foo";
79+
client().admin().cluster().preparePutPipeline(pipelineId,
80+
new BytesArray("{\n" +
81+
" \"processors\" : [\n" +
82+
" {\"set\" : {\"field\": \"any_field\", \"value\": \"any_value\"}},\n" +
83+
" {\"set\" : {" + "" +
84+
" \"if\" : " + "{\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"throwing_script\"}," +
85+
" \"field\": \"any_field2\"," +
86+
" \"value\": \"any_value2\"}" +
87+
" }\n" +
88+
" ]\n" +
89+
"}"), XContentType.JSON).get();
90+
91+
Exception e = expectThrows(
92+
Exception.class,
93+
() ->
94+
client().prepareIndex("index", "doc").setId("1")
95+
.setSource("x", 0)
96+
.setPipeline(pipelineId)
97+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
98+
.get()
99+
);
100+
assertTrue(e.getMessage().contains("this script always fails"));
101+
102+
NodesStatsResponse r = client().admin().cluster().prepareNodesStats(internalCluster().getNodeNames()).setIngest(true).get();
103+
int nodeCount = r.getNodes().size();
104+
for (int k = 0; k < nodeCount; k++) {
105+
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId);
106+
for (IngestStats.ProcessorStat st : stats) {
107+
assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L));
108+
}
63109
}
64110
}
65111

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,15 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
7676

7777
@Override
7878
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
79-
if (evaluate(ingestDocument)) {
79+
final boolean matches;
80+
try {
81+
matches = evaluate(ingestDocument);
82+
} catch (Exception e) {
83+
handler.accept(null, e);
84+
return;
85+
}
86+
87+
if (matches) {
8088
final long startTimeInNanos = relativeTimeProvider.getAsLong();
8189
metric.preIngest();
8290
processor.execute(ingestDocument, (result, e) -> {

server/src/main/java/org/elasticsearch/ingest/Processor.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,14 @@ public interface Processor {
4747
* otherwise just overwrite {@link #execute(IngestDocument)}.
4848
*/
4949
default void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
50+
final IngestDocument result;
5051
try {
51-
IngestDocument result = execute(ingestDocument);
52-
handler.accept(result, null);
52+
result = execute(ingestDocument);
5353
} catch (Exception e) {
5454
handler.accept(null, e);
55+
return;
5556
}
57+
handler.accept(result, null);
5658
}
5759

5860
/**

0 commit comments

Comments
 (0)