Skip to content

Commit 400b7ec

Browse files
authored
Child requests proactively cancel children tasks (#92588)
To make this possible we modify the CancellableTasksTracker to track children tasks by the Request ID as well. That way, we can send an Action to cancel a child based on the parent task and the Request ID. This is especially useful when parents' children requests timeout on the parents' side. Fixes #90353 Relates #66992
1 parent f353be2 commit 400b7ec

File tree

25 files changed

+435
-95
lines changed

25 files changed

+435
-95
lines changed

docs/changelog/92588.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 92588
2+
summary: Failed tasks proactively cancel children tasks
3+
area: Snapshot/Restore
4+
type: enhancement
5+
issues:
6+
- 90353

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java

+77-31
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+10
Original file line numberDiff line numberDiff line change
@@ -1335,6 +1335,16 @@ public TaskId getParentTask() {
13351335
return request.getParentTask();
13361336
}
13371337

1338+
@Override
1339+
public void setRequestId(long requestId) {
1340+
request.setRequestId(requestId);
1341+
}
1342+
1343+
@Override
1344+
public long getRequestId() {
1345+
return request.getRequestId();
1346+
}
1347+
13381348
@Override
13391349
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
13401350
return request.createTask(id, type, action, parentTaskId, headers);

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

+3
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ private <T extends ClusterStateTaskListener> void executeAndPublishBatch(
254254
@Override
255255
public void setParentTask(TaskId taskId) {}
256256

257+
@Override
258+
public void setRequestId(long requestId) {}
259+
257260
@Override
258261
public TaskId getParentTask() {
259262
return TaskId.EMPTY_TASK_ID;

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

+5
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ public void setParentTask(TaskId taskId) {
175175
throw new UnsupportedOperationException("parent task if for persistent tasks shouldn't change");
176176
}
177177

178+
@Override
179+
public void setRequestId(long requestId) {
180+
throw new UnsupportedOperationException("does not have a request ID");
181+
}
182+
178183
@Override
179184
public TaskId getParentTask() {
180185
return parentTaskId;

server/src/main/java/org/elasticsearch/tasks/CancellableTasksTracker.java

+76-36
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,45 @@ public CancellableTasksTracker(T[] empty) {
3131
}
3232

3333
private final Map<Long, T> byTaskId = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
34-
private final Map<TaskId, T[]> byParentTaskId = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
34+
private final Map<TaskId, Map<Long, T[]>> byParentTaskId = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
35+
36+
/**
37+
* Gets the cancellable children of a parent task.
38+
*
39+
* Note: children of non-positive request IDs (e.g., -1) may be grouped together.
40+
*/
41+
public Stream<T> getChildrenByRequestId(TaskId parentTaskId, long childRequestId) {
42+
Map<Long, T[]> byRequestId = byParentTaskId.get(parentTaskId);
43+
if (byRequestId != null) {
44+
T[] children = byRequestId.get(childRequestId);
45+
if (children != null) {
46+
return Arrays.stream(children);
47+
}
48+
}
49+
return Stream.empty();
50+
}
3551

3652
/**
3753
* Add an item for the given task. Should only be called once for each task, and {@code item} must be unique per task too.
3854
*/
39-
public void put(Task task, T item) {
55+
public void put(Task task, long requestId, T item) {
4056
final long taskId = task.getId();
4157
if (task.getParentTaskId().isSet()) {
42-
byParentTaskId.compute(task.getParentTaskId(), (ignored, oldValue) -> {
43-
if (oldValue == null) {
44-
oldValue = empty;
58+
byParentTaskId.compute(task.getParentTaskId(), (taskKey, oldRequestIdMap) -> {
59+
if (oldRequestIdMap == null) {
60+
oldRequestIdMap = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
4561
}
46-
final T[] newValue = Arrays.copyOf(oldValue, oldValue.length + 1);
47-
newValue[oldValue.length] = item;
48-
return newValue;
62+
63+
oldRequestIdMap.compute(requestId, (requestIdKey, oldValue) -> {
64+
if (oldValue == null) {
65+
oldValue = empty;
66+
}
67+
final T[] newValue = Arrays.copyOf(oldValue, oldValue.length + 1);
68+
newValue[oldValue.length] = item;
69+
return newValue;
70+
});
71+
72+
return oldRequestIdMap;
4973
});
5074
}
5175
final T oldItem = byTaskId.put(taskId, item);
@@ -60,36 +84,50 @@ public T get(long id) {
6084
}
6185

6286
/**
63-
* Remove (and return) the item that corresponds with the given task. Return {@code null} if not present. Safe to call multiple times
64-
* for each task. However, {@link #getByParent} may return this task even after a call to this method completes, if the removal is
65-
* actually being completed by a concurrent call that's still ongoing.
87+
* Remove (and return) the item that corresponds with the given task and request ID. Return {@code null} if not present. Safe to call
88+
* multiple times for each task. However, {@link #getByParent} may return this task even after a call to this method completes, if
89+
* the removal is actually being completed by a concurrent call that's still ongoing.
6690
*/
6791
public T remove(Task task) {
6892
final long taskId = task.getId();
6993
final T oldItem = byTaskId.remove(taskId);
7094
if (oldItem != null && task.getParentTaskId().isSet()) {
71-
byParentTaskId.compute(task.getParentTaskId(), (ignored, oldValue) -> {
72-
if (oldValue == null) {
95+
byParentTaskId.compute(task.getParentTaskId(), (taskKey, oldRequestIdMap) -> {
96+
if (oldRequestIdMap == null) {
7397
return null;
7498
}
75-
if (oldValue.length == 1) {
76-
if (oldValue[0] == oldItem) {
77-
return null;
78-
} else {
99+
100+
for (Long requestId : oldRequestIdMap.keySet()) {
101+
oldRequestIdMap.compute(requestId, (requestIdKey, oldValue) -> {
102+
if (oldValue == null) {
103+
return null;
104+
}
105+
if (oldValue.length == 1) {
106+
if (oldValue[0] == oldItem) {
107+
return null;
108+
} else {
109+
return oldValue;
110+
}
111+
}
112+
if (oldValue[0] == oldItem) {
113+
return Arrays.copyOfRange(oldValue, 1, oldValue.length);
114+
}
115+
for (int i = 1; i < oldValue.length; i++) {
116+
if (oldValue[i] == oldItem) {
117+
final T[] newValue = Arrays.copyOf(oldValue, oldValue.length - 1);
118+
System.arraycopy(oldValue, i + 1, newValue, i, oldValue.length - i - 1);
119+
return newValue;
120+
}
121+
}
79122
return oldValue;
80-
}
81-
}
82-
if (oldValue[0] == oldItem) {
83-
return Arrays.copyOfRange(oldValue, 1, oldValue.length);
123+
});
84124
}
85-
for (int i = 1; i < oldValue.length; i++) {
86-
if (oldValue[i] == oldItem) {
87-
final T[] newValue = Arrays.copyOf(oldValue, oldValue.length - 1);
88-
System.arraycopy(oldValue, i + 1, newValue, i, oldValue.length - i - 1);
89-
return newValue;
90-
}
125+
126+
if (oldRequestIdMap.keySet().isEmpty()) {
127+
return null;
91128
}
92-
return oldValue;
129+
130+
return oldRequestIdMap;
93131
});
94132
}
95133
return oldItem;
@@ -109,11 +147,11 @@ public Collection<T> values() {
109147
* started before this method was called have not completed.
110148
*/
111149
public Stream<T> getByParent(TaskId parentTaskId) {
112-
final T[] byParent = byParentTaskId.get(parentTaskId);
150+
final Map<Long, T[]> byParent = byParentTaskId.get(parentTaskId);
113151
if (byParent == null) {
114152
return Stream.empty();
115153
}
116-
return Arrays.stream(byParent);
154+
return byParent.values().stream().flatMap(Stream::of);
117155
}
118156

119157
// assertion for tests, not an invariant but should eventually be true
@@ -123,12 +161,14 @@ boolean assertConsistent() {
123161

124162
// every by-parent value must be tracked by task too; the converse isn't true since we don't track values without a parent
125163
final Set<T> byTaskValues = new HashSet<>(byTaskId.values());
126-
for (T[] byParent : byParentTaskId.values()) {
127-
assert byParent.length > 0;
128-
for (T t : byParent) {
129-
assert byTaskValues.contains(t);
130-
}
131-
}
164+
byParentTaskId.values().forEach(byParentMap -> {
165+
byParentMap.forEach((requestId, byParentArray) -> {
166+
assert byParentArray.length > 0;
167+
for (T t : byParentArray) {
168+
assert byTaskValues.contains(t);
169+
}
170+
});
171+
});
132172

133173
return true;
134174
}

server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java

+12
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ default void setParentTask(String parentTaskNode, long parentTaskId) {
2626
*/
2727
void setParentTask(TaskId taskId);
2828

29+
/**
30+
* Gets the request ID. Defaults to -1, meaning "no request ID is set".
31+
*/
32+
default long getRequestId() {
33+
return -1;
34+
}
35+
36+
/**
37+
* Set the request ID related to this task.
38+
*/
39+
void setRequestId(long requestId);
40+
2941
/**
3042
* Get a reference to the task that created this request. Implementers should default to
3143
* {@link TaskId#EMPTY_TASK_ID}, meaning "there is no parent".

server/src/main/java/org/elasticsearch/tasks/TaskCancellationService.java

+74
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.action.support.CountDownActionListener;
2121
import org.elasticsearch.action.support.GroupedActionListener;
2222
import org.elasticsearch.action.support.RefCountingRunnable;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
2526
import org.elasticsearch.threadpool.ThreadPool;
@@ -44,6 +45,8 @@
4445

4546
public class TaskCancellationService {
4647
public static final String BAN_PARENT_ACTION_NAME = "internal:admin/tasks/ban";
48+
public static final String CANCEL_CHILD_ACTION_NAME = "internal:admin/tasks/cancel_child";
49+
public static final TransportVersion VERSION_SUPPORTING_CANCEL_CHILD_ACTION = TransportVersion.V_8_8_0;
4750
private static final Logger logger = LogManager.getLogger(TaskCancellationService.class);
4851
private final TransportService transportService;
4952
private final TaskManager taskManager;
@@ -59,6 +62,12 @@ public TaskCancellationService(TransportService transportService) {
5962
BanParentTaskRequest::new,
6063
new BanParentRequestHandler()
6164
);
65+
transportService.registerRequestHandler(
66+
CANCEL_CHILD_ACTION_NAME,
67+
ThreadPool.Names.SAME,
68+
CancelChildRequest::new,
69+
new CancelChildRequestHandler()
70+
);
6271
}
6372

6473
private String localNodeId() {
@@ -341,4 +350,69 @@ public void messageReceived(final BanParentTaskRequest request, final TransportC
341350
}
342351
}
343352
}
353+
354+
private static class CancelChildRequest extends TransportRequest {
355+
356+
private final TaskId parentTaskId;
357+
private final long childRequestId;
358+
private final String reason;
359+
360+
static CancelChildRequest createCancelChildRequest(TaskId parentTaskId, long childRequestId, String reason) {
361+
return new CancelChildRequest(parentTaskId, childRequestId, reason);
362+
}
363+
364+
private CancelChildRequest(TaskId parentTaskId, long childRequestId, String reason) {
365+
this.parentTaskId = parentTaskId;
366+
this.childRequestId = childRequestId;
367+
this.reason = reason;
368+
}
369+
370+
private CancelChildRequest(StreamInput in) throws IOException {
371+
super(in);
372+
parentTaskId = TaskId.readFromStream(in);
373+
childRequestId = in.readLong();
374+
reason = in.readString();
375+
}
376+
377+
@Override
378+
public void writeTo(StreamOutput out) throws IOException {
379+
super.writeTo(out);
380+
parentTaskId.writeTo(out);
381+
out.writeLong(childRequestId);
382+
out.writeString(reason);
383+
}
384+
}
385+
386+
private class CancelChildRequestHandler implements TransportRequestHandler<CancelChildRequest> {
387+
@Override
388+
public void messageReceived(final CancelChildRequest request, final TransportChannel channel, Task task) throws Exception {
389+
taskManager.cancelChildLocal(request.parentTaskId, request.childRequestId, request.reason);
390+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
391+
}
392+
}
393+
394+
/**
395+
* Sends an action to cancel a child task, associated with the given request ID and parent task.
396+
*/
397+
public void cancelChildRemote(TaskId parentTask, long childRequestId, Transport.Connection childConnection, String reason) {
398+
if (childConnection.getTransportVersion().onOrAfter(VERSION_SUPPORTING_CANCEL_CHILD_ACTION)) {
399+
DiscoveryNode childNode = childConnection.getNode();
400+
logger.debug(
401+
"sending cancellation of child of parent task [{}] with request ID [{}] to node [{}] because of [{}]",
402+
parentTask,
403+
childRequestId,
404+
childNode,
405+
reason
406+
);
407+
final CancelChildRequest request = CancelChildRequest.createCancelChildRequest(parentTask, childRequestId, reason);
408+
transportService.sendRequest(
409+
childNode,
410+
CANCEL_CHILD_ACTION_NAME,
411+
request,
412+
TransportRequestOptions.EMPTY,
413+
EmptyTransportResponseHandler.INSTANCE_SAME
414+
);
415+
}
416+
}
417+
344418
}

0 commit comments

Comments
 (0)