Skip to content

Commit d106927

Browse files
committed
Continue registering pipelines after one pipeline parse failure. (#28752)
Ingest has been failing to apply existing pipelines from cluster-state into the in-memory representation that are no longer valid. One example of this is a pipeline with a script processor. If a cluster starts up with scripting disabled, these pipelines will not be loaded. Even though GETing a pipeline worked, indexing operations claimed that this pipeline did not exist. This is because one gets information from cluster-state and the other is from an in-memory data-structure. Now, two things happen 1. suppress the exceptions until after other successful pipelines are loaded 2. replace failed pipelines with a placeholder pipeline If the pipeline execution service encounters the stubbed pipeline, it is known that something went wrong at the time of pipeline creation and an exception was thrown to the user at some point at start-up. closes #28269.
1 parent a445f7f commit d106927

File tree

7 files changed

+142
-14
lines changed

7 files changed

+142
-14
lines changed

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java

+65
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818
*/
1919
package org.elasticsearch.ingest.common;
2020

21+
import org.elasticsearch.ElasticsearchException;
22+
import org.elasticsearch.ExceptionsHelper;
2123
import org.elasticsearch.action.support.WriteRequest;
24+
import org.elasticsearch.common.Strings;
2225
import org.elasticsearch.common.bytes.BytesArray;
2326
import org.elasticsearch.common.bytes.BytesReference;
2427
import org.elasticsearch.common.settings.Settings;
28+
import org.elasticsearch.common.util.set.Sets;
2529
import org.elasticsearch.common.xcontent.XContentType;
2630
import org.elasticsearch.plugins.Plugin;
2731
import org.elasticsearch.script.MockScriptEngine;
@@ -33,6 +37,7 @@
3337
import java.util.Collection;
3438
import java.util.Collections;
3539
import java.util.Map;
40+
import java.util.function.Consumer;
3641
import java.util.function.Function;
3742

3843
import static org.hamcrest.Matchers.equalTo;
@@ -64,6 +69,66 @@ protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
6469
}
6570
}
6671

72+
public void testScriptDisabled() throws Exception {
73+
String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10);
74+
String pipelineIdWithScript = pipelineIdWithoutScript + "_script";
75+
internalCluster().startNode();
76+
77+
BytesReference pipelineWithScript = new BytesArray("{\n" +
78+
" \"processors\" : [\n" +
79+
" {\"script\" : {\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"my_script\"}}\n" +
80+
" ]\n" +
81+
"}");
82+
BytesReference pipelineWithoutScript = new BytesArray("{\n" +
83+
" \"processors\" : [\n" +
84+
" {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" +
85+
" ]\n" +
86+
"}");
87+
88+
Consumer<String> checkPipelineExists = (id) -> assertThat(client().admin().cluster().prepareGetPipeline(id)
89+
.get().pipelines().get(0).getId(), equalTo(id));
90+
91+
client().admin().cluster().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get();
92+
client().admin().cluster().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get();
93+
94+
checkPipelineExists.accept(pipelineIdWithScript);
95+
checkPipelineExists.accept(pipelineIdWithoutScript);
96+
97+
98+
internalCluster().stopCurrentMasterNode();
99+
internalCluster().startNode(Settings.builder().put("script.allowed_types", "none"));
100+
101+
checkPipelineExists.accept(pipelineIdWithoutScript);
102+
checkPipelineExists.accept(pipelineIdWithScript);
103+
104+
client().prepareIndex("index", "doc", "1")
105+
.setSource("x", 0)
106+
.setPipeline(pipelineIdWithoutScript)
107+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
108+
.get();
109+
110+
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
111+
() -> client().prepareIndex("index", "doc", "2")
112+
.setSource("x", 0)
113+
.setPipeline(pipelineIdWithScript)
114+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
115+
.get());
116+
assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type")));
117+
assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown")));
118+
assertThat(exception.getRootCause().getMessage(),
119+
equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " +
120+
"[ElasticsearchParseException[Error updating pipeline with id [" + pipelineIdWithScript + "]]; " +
121+
"nested: ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " +
122+
"nested: IllegalArgumentException[cannot execute [inline] scripts];; " +
123+
"ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " +
124+
"nested: IllegalArgumentException[cannot execute [inline] scripts];; java.lang.IllegalArgumentException: " +
125+
"cannot execute [inline] scripts]"));
126+
127+
Map<String, Object> source = client().prepareGet("index", "doc", "1").get().getSource();
128+
assertThat(source.get("x"), equalTo(0));
129+
assertThat(source.get("y"), equalTo(0));
130+
}
131+
67132
public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception {
68133
internalCluster().startNode();
69134

server/src/main/java/org/elasticsearch/ingest/IngestService.java

-4
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,13 @@
2525
import java.util.List;
2626
import java.util.Map;
2727

28-
import org.elasticsearch.common.settings.ClusterSettings;
29-
import org.elasticsearch.common.settings.Setting;
3028
import org.elasticsearch.common.settings.Settings;
3129
import org.elasticsearch.env.Environment;
3230
import org.elasticsearch.index.analysis.AnalysisRegistry;
3331
import org.elasticsearch.plugins.IngestPlugin;
3432
import org.elasticsearch.script.ScriptService;
3533
import org.elasticsearch.threadpool.ThreadPool;
3634

37-
import static org.elasticsearch.common.settings.Setting.Property;
38-
3935
/**
4036
* Holder class for several ingest related services.
4137
*/

server/src/main/java/org/elasticsearch/ingest/PipelineStore.java

+27-2
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,41 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
8181
}
8282

8383
Map<String, Pipeline> pipelines = new HashMap<>();
84+
List<ElasticsearchParseException> exceptions = new ArrayList<>();
8485
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
8586
try {
8687
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
8788
} catch (ElasticsearchParseException e) {
88-
throw e;
89+
pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e));
90+
exceptions.add(e);
8991
} catch (Exception e) {
90-
throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e);
92+
ElasticsearchParseException parseException = new ElasticsearchParseException(
93+
"Error updating pipeline with id [" + pipeline.getId() + "]", e);
94+
pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException));
95+
exceptions.add(parseException);
9196
}
9297
}
9398
this.pipelines = Collections.unmodifiableMap(pipelines);
99+
ExceptionsHelper.rethrowAndSuppress(exceptions);
100+
}
101+
102+
private Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
103+
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
104+
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
105+
String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
106+
Processor failureProcessor = new AbstractProcessor(tag) {
107+
@Override
108+
public void execute(IngestDocument ingestDocument) {
109+
throw new IllegalStateException(errorMessage);
110+
}
111+
112+
@Override
113+
public String getType() {
114+
return type;
115+
}
116+
};
117+
String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded";
118+
return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
94119
}
95120

96121
/**

server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,12 @@
3636
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
3737
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
3838
import org.elasticsearch.action.ingest.WritePipelineResponse;
39-
import org.elasticsearch.action.support.replication.TransportReplicationActionTests;
4039
import org.elasticsearch.action.update.UpdateRequest;
4140
import org.elasticsearch.client.Requests;
42-
import org.elasticsearch.common.bytes.BytesArray;
4341
import org.elasticsearch.common.bytes.BytesReference;
4442
import org.elasticsearch.common.settings.Settings;
4543
import org.elasticsearch.common.xcontent.XContentType;
4644
import org.elasticsearch.plugins.Plugin;
47-
import org.elasticsearch.script.Script;
48-
import org.elasticsearch.script.ScriptService;
4945
import org.elasticsearch.test.ESIntegTestCase;
5046

5147
import java.util.Arrays;
@@ -130,6 +126,10 @@ public void testSimulate() throws Exception {
130126
IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, source);
131127
assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata()));
132128
assertThat(simulateDocumentBaseResult.getFailure(), nullValue());
129+
130+
// cleanup
131+
WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
132+
assertTrue(deletePipelineResponse.isAcknowledged());
133133
}
134134

135135
public void testBulkWithIngestFailures() throws Exception {
@@ -172,6 +172,10 @@ public void testBulkWithIngestFailures() throws Exception {
172172
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
173173
}
174174
}
175+
176+
// cleanup
177+
WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
178+
assertTrue(deletePipelineResponse.isAcknowledged());
175179
}
176180

177181
public void testBulkWithUpsert() throws Exception {
@@ -271,5 +275,8 @@ public void testPutWithPipelineFactoryError() throws Exception {
271275
assertNotNull(ex);
272276
assertThat(ex.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]"));
273277
}
278+
279+
GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id").get();
280+
assertFalse(response.isFound());
274281
}
275282
}

server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import static org.hamcrest.Matchers.equalTo;
3838
import static org.hamcrest.Matchers.is;
3939
import static org.hamcrest.Matchers.notNullValue;
40-
import static org.hamcrest.Matchers.nullValue;
4140

4241
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
4342
public class IngestProcessorNotInstalledOnAllNodesIT extends ESIntegTestCase {
@@ -104,7 +103,11 @@ public void testFailStartNode() throws Exception {
104103
installPlugin = false;
105104
String node2 = internalCluster().startNode();
106105
pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipelineStore().get("_id");
107-
assertThat(pipeline, nullValue());
106+
107+
assertNotNull(pipeline);
108+
assertThat(pipeline.getId(), equalTo("_id"));
109+
assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, " +
110+
"because pipeline with id [_id] could not be loaded"));
108111
}
109112

110113
}

server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java

+27
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import org.apache.lucene.util.SetOnce;
2223
import org.elasticsearch.ElasticsearchException;
2324
import org.elasticsearch.action.DocWriteRequest;
2425
import org.elasticsearch.action.bulk.BulkRequest;
@@ -91,6 +92,32 @@ public void testExecuteIndexPipelineDoesNotExist() {
9192
verify(completionHandler, never()).accept(anyBoolean());
9293
}
9394

95+
public void testExecuteIndexPipelineExistsButFailedParsing() {
96+
when(store.get("_id")).thenReturn(new Pipeline("_id", "stub", null,
97+
new CompoundProcessor(new AbstractProcessor("mock") {
98+
@Override
99+
public void execute(IngestDocument ingestDocument) {
100+
throw new IllegalStateException("error");
101+
}
102+
103+
@Override
104+
public String getType() {
105+
return null;
106+
}
107+
})));
108+
SetOnce<Boolean> failed = new SetOnce<>();
109+
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
110+
Consumer<Exception> failureHandler = (e) -> {
111+
assertThat(e.getCause().getClass(), equalTo(IllegalArgumentException.class));
112+
assertThat(e.getCause().getCause().getClass(), equalTo(IllegalStateException.class));
113+
assertThat(e.getCause().getCause().getMessage(), equalTo("error"));
114+
failed.set(true);
115+
};
116+
Consumer<Boolean> completionHandler = (e) -> failed.set(false);
117+
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
118+
assertTrue(failed.get());
119+
}
120+
94121
public void testExecuteBulkPipelineDoesNotExist() {
95122
CompoundProcessor processor = mock(CompoundProcessor.class);
96123
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));

server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.cluster.metadata.MetaData;
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
3131
import org.elasticsearch.common.bytes.BytesArray;
32-
import org.elasticsearch.common.settings.ClusterSettings;
3332
import org.elasticsearch.common.settings.Settings;
3433
import org.elasticsearch.common.xcontent.XContentType;
3534
import org.elasticsearch.test.ESTestCase;
@@ -165,7 +164,13 @@ public void testPutWithErrorResponse() {
165164
assertThat(e.getMessage(), equalTo("[processors] required property is missing"));
166165
}
167166
pipeline = store.get(id);
168-
assertThat(pipeline, nullValue());
167+
assertNotNull(pipeline);
168+
assertThat(pipeline.getId(), equalTo("_id"));
169+
assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" +
170+
" id [_id] could not be loaded"));
171+
assertThat(pipeline.getProcessors().size(), equalTo(1));
172+
assertNull(pipeline.getProcessors().get(0).getTag());
173+
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown"));
169174
}
170175

171176
public void testDelete() {

0 commit comments

Comments
 (0)