Skip to content

Commit 96bb116

Browse files
committed
Support hierarchical task cancellation (elastic#54757)
With this change, when a task is canceled, the task manager will cancel not only its direct child tasks but all also its descendant tasks. Closes elastic#50990
1 parent 51c6f69 commit 96bb116

File tree

6 files changed

+288
-260
lines changed

6 files changed

+288
-260
lines changed

docs/reference/cluster/tasks.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ nodes `nodeId1` and `nodeId2`.
234234

235235
`wait_for_completion`::
236236
(Optional, boolean) If `true`, the request blocks until the cancellation of the
237-
task and its child tasks is completed. Otherwise, the request can return soon
237+
task and its descendant tasks is completed. Otherwise, the request can return soon
238238
after the cancellation is started. Defaults to `false`.
239239

240240
[source,console]

rest-api-spec/src/main/resources/rest-api-spec/api/tasks.cancel.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
},
4343
"wait_for_completion": {
4444
"type":"boolean",
45-
"description":"Should the request block until the cancellation of the task and its child tasks is completed. Defaults to false"
45+
"description":"Should the request block until the cancellation of the task and its descendant tasks is completed. Defaults to false"
4646
}
4747
}
4848
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public String getReason() {
7979
}
8080

8181
/**
82-
* If {@code true}, the request blocks until the cancellation of the task and its child tasks is completed.
82+
* If {@code true}, the request blocks until the cancellation of the task and its descendant tasks is completed.
8383
* Otherwise, the request can return soon after the cancellation is started. Defaults to {@code false}.
8484
*/
8585
public void setWaitForCompletion(boolean waitForCompletion) {

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
2121

2222
import org.elasticsearch.ResourceNotFoundException;
23+
import org.elasticsearch.Version;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.FailedNodeException;
2526
import org.elasticsearch.action.StepListener;
2627
import org.elasticsearch.action.TaskOperationFailure;
2728
import org.elasticsearch.action.support.ActionFilters;
29+
import org.elasticsearch.action.support.ChannelActionListener;
2830
import org.elasticsearch.action.support.GroupedActionListener;
2931
import org.elasticsearch.action.support.tasks.TransportTasksAction;
3032
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -104,34 +106,43 @@ protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask
104106
@Override
105107
protected void taskOperation(CancelTasksRequest request, CancellableTask cancellableTask, ActionListener<TaskInfo> listener) {
106108
String nodeId = clusterService.localNode().getId();
107-
if (cancellableTask.shouldCancelChildrenOnCancellation()) {
109+
cancelTaskAndDescendants(cancellableTask, request.getReason(), request.waitForCompletion(),
110+
ActionListener.map(listener, r -> cancellableTask.taskInfo(nodeId, false)));
111+
}
112+
113+
void cancelTaskAndDescendants(CancellableTask task, String reason, boolean waitForCompletion, ActionListener<Void> listener) {
114+
if (task.shouldCancelChildrenOnCancellation()) {
108115
StepListener<Void> completedListener = new StepListener<>();
109116
GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(completedListener, r -> null), 3);
110117
Collection<DiscoveryNode> childrenNodes =
111-
taskManager.startBanOnChildrenNodes(cancellableTask.getId(), () -> groupedListener.onResponse(null));
112-
taskManager.cancel(cancellableTask, request.getReason(), () -> groupedListener.onResponse(null));
118+
taskManager.startBanOnChildrenNodes(task.getId(), () -> groupedListener.onResponse(null));
119+
taskManager.cancel(task, reason, () -> groupedListener.onResponse(null));
113120

114121
StepListener<Void> banOnNodesListener = new StepListener<>();
115-
setBanOnNodes(request.getReason(), cancellableTask, childrenNodes, banOnNodesListener);
122+
setBanOnNodes(reason, waitForCompletion, task, childrenNodes, banOnNodesListener);
116123
banOnNodesListener.whenComplete(groupedListener::onResponse, groupedListener::onFailure);
117124
// We remove bans after all child tasks are completed although in theory we can do it on a per-node basis.
118-
completedListener.whenComplete(
119-
r -> removeBanOnNodes(cancellableTask, childrenNodes),
120-
e -> removeBanOnNodes(cancellableTask, childrenNodes));
121-
// if wait_for_child_tasks is true, then only return when (1) bans are placed on child nodes, (2) child tasks are
125+
completedListener.whenComplete(r -> removeBanOnNodes(task, childrenNodes), e -> removeBanOnNodes(task, childrenNodes));
126+
// if wait_for_completion is true, then only return when (1) bans are placed on child nodes, (2) child tasks are
122127
// completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child nodes.
123-
if (request.waitForCompletion()) {
124-
completedListener.whenComplete(r -> listener.onResponse(cancellableTask.taskInfo(nodeId, false)), listener::onFailure);
128+
if (waitForCompletion) {
129+
completedListener.whenComplete(r -> listener.onResponse(null), listener::onFailure);
125130
} else {
126-
banOnNodesListener.whenComplete(r -> listener.onResponse(cancellableTask.taskInfo(nodeId, false)), listener::onFailure);
131+
banOnNodesListener.whenComplete(r -> listener.onResponse(null), listener::onFailure);
127132
}
128133
} else {
129-
logger.trace("task {} doesn't have any children that should be cancelled", cancellableTask.getId());
130-
taskManager.cancel(cancellableTask, request.getReason(), () -> listener.onResponse(cancellableTask.taskInfo(nodeId, false)));
134+
logger.trace("task {} doesn't have any children that should be cancelled", task.getId());
135+
if (waitForCompletion) {
136+
taskManager.cancel(task, reason, () -> listener.onResponse(null));
137+
} else {
138+
taskManager.cancel(task, reason, () -> {});
139+
listener.onResponse(null);
140+
}
131141
}
132142
}
133143

134-
private void setBanOnNodes(String reason, CancellableTask task, Collection<DiscoveryNode> childNodes, ActionListener<Void> listener) {
144+
private void setBanOnNodes(String reason, boolean waitForCompletion, CancellableTask task,
145+
Collection<DiscoveryNode> childNodes, ActionListener<Void> listener) {
135146
if (childNodes.isEmpty()) {
136147
listener.onResponse(null);
137148
return;
@@ -140,7 +151,7 @@ private void setBanOnNodes(String reason, CancellableTask task, Collection<Disco
140151
GroupedActionListener<Void> groupedListener =
141152
new GroupedActionListener<>(ActionListener.map(listener, r -> null), childNodes.size());
142153
final BanParentTaskRequest banRequest = BanParentTaskRequest.createSetBanParentTaskRequest(
143-
new TaskId(clusterService.localNode().getId(), task.getId()), reason);
154+
new TaskId(clusterService.localNode().getId(), task.getId()), reason, waitForCompletion);
144155
for (DiscoveryNode node : childNodes) {
145156
transportService.sendRequest(node, BAN_PARENT_ACTION_NAME, banRequest,
146157
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@@ -171,33 +182,41 @@ private static class BanParentTaskRequest extends TransportRequest {
171182

172183
private final TaskId parentTaskId;
173184
private final boolean ban;
185+
private final boolean waitForCompletion;
174186
private final String reason;
175187

176-
static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason) {
177-
return new BanParentTaskRequest(parentTaskId, reason);
188+
static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) {
189+
return new BanParentTaskRequest(parentTaskId, reason, waitForCompletion);
178190
}
179191

180192
static BanParentTaskRequest createRemoveBanParentTaskRequest(TaskId parentTaskId) {
181193
return new BanParentTaskRequest(parentTaskId);
182194
}
183195

184-
private BanParentTaskRequest(TaskId parentTaskId, String reason) {
196+
private BanParentTaskRequest(TaskId parentTaskId, String reason, boolean waitForCompletion) {
185197
this.parentTaskId = parentTaskId;
186198
this.ban = true;
187199
this.reason = reason;
200+
this.waitForCompletion = waitForCompletion;
188201
}
189202

190203
private BanParentTaskRequest(TaskId parentTaskId) {
191204
this.parentTaskId = parentTaskId;
192205
this.ban = false;
193206
this.reason = null;
207+
this.waitForCompletion = false;
194208
}
195209

196210
private BanParentTaskRequest(StreamInput in) throws IOException {
197211
super(in);
198212
parentTaskId = TaskId.readFromStream(in);
199213
ban = in.readBoolean();
200214
reason = ban ? in.readString() : null;
215+
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
216+
waitForCompletion = in.readBoolean();
217+
} else {
218+
waitForCompletion = false;
219+
}
201220
}
202221

203222
@Override
@@ -208,6 +227,9 @@ public void writeTo(StreamOutput out) throws IOException {
208227
if (ban) {
209228
out.writeString(reason);
210229
}
230+
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
231+
out.writeBoolean(waitForCompletion);
232+
}
211233
}
212234
}
213235

@@ -217,13 +239,20 @@ public void messageReceived(final BanParentTaskRequest request, final TransportC
217239
if (request.ban) {
218240
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
219241
clusterService.localNode().getId(), request.reason);
220-
taskManager.setBan(request.parentTaskId, request.reason);
242+
final List<CancellableTask> childTasks = taskManager.setBan(request.parentTaskId, request.reason);
243+
final GroupedActionListener<Void> listener = new GroupedActionListener<>(ActionListener.map(
244+
new ChannelActionListener<>(channel, BAN_PARENT_ACTION_NAME, request), r -> TransportResponse.Empty.INSTANCE),
245+
childTasks.size() + 1);
246+
for (CancellableTask childTask : childTasks) {
247+
cancelTaskAndDescendants(childTask, request.reason, request.waitForCompletion, listener);
248+
}
249+
listener.onResponse(null);
221250
} else {
222251
logger.debug("Removing ban for the parent [{}] on the node [{}]", request.parentTaskId,
223252
clusterService.localNode().getId());
224253
taskManager.removeBan(request.parentTaskId);
254+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
225255
}
226-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
227256
}
228257
}
229258

server/src/main/java/org/elasticsearch/tasks/TaskManager.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,9 @@ public int getBanCount() {
333333
* Bans all tasks with the specified parent task from execution, cancels all tasks that are currently executing.
334334
* <p>
335335
* This method is called when a parent task that has children is cancelled.
336+
* @return a list of pending cancellable child tasks
336337
*/
337-
public void setBan(TaskId parentTaskId, String reason) {
338+
public List<CancellableTask> setBan(TaskId parentTaskId, String reason) {
338339
logger.trace("setting ban for the parent task {} {}", parentTaskId, reason);
339340

340341
// Set the ban first, so the newly created tasks cannot be registered
@@ -344,14 +345,10 @@ public void setBan(TaskId parentTaskId, String reason) {
344345
banedParents.put(parentTaskId, reason);
345346
}
346347
}
347-
348-
// Now go through already running tasks and cancel them
349-
for (Map.Entry<Long, CancellableTaskHolder> taskEntry : cancellableTasks.entrySet()) {
350-
CancellableTaskHolder holder = taskEntry.getValue();
351-
if (holder.hasParent(parentTaskId)) {
352-
holder.cancel(reason);
353-
}
354-
}
348+
return cancellableTasks.values().stream()
349+
.filter(t -> t.hasParent(parentTaskId))
350+
.map(t -> t.task)
351+
.collect(Collectors.toList());
355352
}
356353

357354
/**
@@ -365,11 +362,8 @@ public void removeBan(TaskId parentTaskId) {
365362
}
366363

367364
// for testing
368-
public boolean childTasksCancelledOrBanned(TaskId parentTaskId) {
369-
if (banedParents.containsKey(parentTaskId)) {
370-
return true;
371-
}
372-
return cancellableTasks.values().stream().noneMatch(task -> task.hasParent(parentTaskId));
365+
public Set<TaskId> getBannedTaskIds() {
366+
return Collections.unmodifiableSet(banedParents.keySet());
373367
}
374368

375369
/**

0 commit comments

Comments
 (0)