Skip to content

Commit 4f36d38

Browse files
authored
Remove persistent tasks that failed to initialize from cluster state (#71462)
1 parent 7a8db42 commit 4f36d38

File tree

2 files changed

+151
-2
lines changed

2 files changed

+151
-2
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.persistent;
10+
11+
import org.elasticsearch.Version;
12+
import org.elasticsearch.action.support.PlainActionFuture;
13+
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
15+
import org.elasticsearch.cluster.service.ClusterService;
16+
import org.elasticsearch.common.UUIDs;
17+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
18+
import org.elasticsearch.common.io.stream.StreamInput;
19+
import org.elasticsearch.common.io.stream.StreamOutput;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.settings.SettingsModule;
22+
import org.elasticsearch.common.xcontent.XContentBuilder;
23+
import org.elasticsearch.plugins.PersistentTaskPlugin;
24+
import org.elasticsearch.plugins.Plugin;
25+
import org.elasticsearch.tasks.TaskId;
26+
import org.elasticsearch.tasks.TaskManager;
27+
import org.elasticsearch.test.ESIntegTestCase;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
30+
import java.io.IOException;
31+
import java.util.Collection;
32+
import java.util.List;
33+
import java.util.Map;
34+
35+
import static org.hamcrest.Matchers.empty;
36+
37+
public class PersistentTaskInitializationFailureIT extends ESIntegTestCase {
38+
@Override
39+
protected Collection<Class<? extends Plugin>> nodePlugins() {
40+
return List.of(FailingInitializationPersistentTasksPlugin.class);
41+
}
42+
43+
public void testPersistentTasksThatFailDuringInitializationAreRemovedFromClusterState() throws Exception {
44+
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
45+
PlainActionFuture<PersistentTasksCustomMetadata.PersistentTask<FailingInitializationTaskParams>> startPersistentTaskFuture =
46+
new PlainActionFuture<>();
47+
persistentTasksService.sendStartRequest(UUIDs.base64UUID(),
48+
FailingInitializationPersistentTaskExecutor.TASK_NAME,
49+
new FailingInitializationTaskParams(),
50+
startPersistentTaskFuture
51+
);
52+
startPersistentTaskFuture.actionGet();
53+
54+
assertBusy(() -> {
55+
final ClusterService clusterService = internalCluster().getMasterNodeInstance(ClusterService.class);
56+
final PersistentTasksCustomMetadata persistentTasks =
57+
clusterService.state().metadata().custom(PersistentTasksCustomMetadata.TYPE);
58+
assertThat(persistentTasks.tasks().toString(), persistentTasks.tasks(), empty());
59+
});
60+
}
61+
62+
public static class FailingInitializationPersistentTasksPlugin extends Plugin implements PersistentTaskPlugin {
63+
public FailingInitializationPersistentTasksPlugin(Settings settings) {}
64+
65+
@Override
66+
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
67+
ThreadPool threadPool,
68+
Client client,
69+
SettingsModule settingsModule,
70+
IndexNameExpressionResolver expressionResolver) {
71+
return List.of(new FailingInitializationPersistentTaskExecutor());
72+
}
73+
74+
@Override
75+
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
76+
return List.of(
77+
new NamedWriteableRegistry.Entry(PersistentTaskParams.class,
78+
FailingInitializationPersistentTaskExecutor.TASK_NAME,
79+
FailingInitializationTaskParams::new
80+
)
81+
);
82+
}
83+
}
84+
85+
public static class FailingInitializationTaskParams implements PersistentTaskParams {
86+
public FailingInitializationTaskParams() { }
87+
88+
public FailingInitializationTaskParams(StreamInput in) throws IOException { }
89+
90+
@Override
91+
public String getWriteableName() {
92+
return FailingInitializationPersistentTaskExecutor.TASK_NAME;
93+
}
94+
95+
@Override
96+
public Version getMinimalSupportedVersion() {
97+
return Version.CURRENT;
98+
}
99+
100+
@Override
101+
public void writeTo(StreamOutput out) throws IOException { }
102+
103+
@Override
104+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
105+
builder.startObject();
106+
builder.endObject();
107+
return builder;
108+
}
109+
}
110+
111+
static class FailingInitializationPersistentTaskExecutor extends PersistentTasksExecutor<FailingInitializationTaskParams> {
112+
static final String TASK_NAME = "cluster:admin/persistent/test_init_failure";
113+
static final String EXECUTOR_NAME = "failing_executor";
114+
115+
FailingInitializationPersistentTaskExecutor() {
116+
super(TASK_NAME, EXECUTOR_NAME);
117+
}
118+
119+
@Override
120+
protected AllocatedPersistentTask createTask(long id,
121+
String type,
122+
String action,
123+
TaskId parentTaskId,
124+
PersistentTasksCustomMetadata.PersistentTask<
125+
FailingInitializationTaskParams> taskInProgress,
126+
Map<String, String> headers) {
127+
return new AllocatedPersistentTask(id, type, action, "", parentTaskId, headers) {
128+
@Override
129+
protected void init(PersistentTasksService persistentTasksService,
130+
TaskManager taskManager,
131+
String persistentTaskId,
132+
long allocationId) {
133+
throw new RuntimeException("BOOM");
134+
}
135+
};
136+
}
137+
138+
@Override
139+
protected void nodeOperation(AllocatedPersistentTask task, FailingInitializationTaskParams params, PersistentTaskState state) {
140+
assert false : "Unexpected call";
141+
}
142+
}
143+
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -171,10 +171,11 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
171171
}
172172

173173
boolean processed = false;
174+
Exception initializationException = null;
174175
try {
175176
task.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
176177
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
177-
task.getPersistentTaskId(), task.getAllocationId());
178+
task.getPersistentTaskId(), task.getAllocationId());
178179
try {
179180
runningTasks.put(taskInProgress.getAllocationId(), task);
180181
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getState(), task, executor);
@@ -183,12 +184,17 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
183184
task.markAsFailed(e);
184185
}
185186
processed = true;
187+
} catch (Exception e) {
188+
initializationException = e;
186189
} finally {
187190
if (processed == false) {
188191
// something went wrong - unregistering task
189192
logger.warn("Persistent task [{}] with id [{}] and allocation id [{}] failed to create", task.getAction(),
190-
task.getPersistentTaskId(), task.getAllocationId());
193+
task.getPersistentTaskId(), task.getAllocationId());
191194
taskManager.unregister(task);
195+
if (initializationException != null) {
196+
notifyMasterOfFailedTask(taskInProgress, initializationException);
197+
}
192198
}
193199
}
194200
}

0 commit comments

Comments
 (0)