Skip to content

Commit b2956b3

Browse files
authored
Identify cancelled tasks in list tasks API (#72931)
This commit adds a `cancelled` flag to each cancellable task in the response to the list tasks API, allowing users to see that a task has been properly cancelled and will complete as soon as possible. Closes #72907
1 parent eabe2d1 commit b2956b3

File tree

16 files changed

+365
-52
lines changed

16 files changed

+365
-52
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/tasks/TaskInfo.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public class TaskInfo {
2929
private long startTime;
3030
private long runningTimeNanos;
3131
private boolean cancellable;
32+
private boolean cancelled;
3233
private TaskId parentTaskId;
3334
private final Map<String, Object> status = new HashMap<>();
3435
private final Map<String, String> headers = new HashMap<>();
@@ -93,6 +94,14 @@ void setCancellable(boolean cancellable) {
9394
this.cancellable = cancellable;
9495
}
9596

97+
public boolean isCancelled() {
98+
return cancelled;
99+
}
100+
101+
void setCancelled(boolean cancelled) {
102+
this.cancelled = cancelled;
103+
}
104+
96105
public TaskId getParentTaskId() {
97106
return parentTaskId;
98107
}
@@ -134,6 +143,7 @@ private void noOpParse(Object s) {}
134143
parser.declareLong(TaskInfo::setStartTime, new ParseField("start_time_in_millis"));
135144
parser.declareLong(TaskInfo::setRunningTimeNanos, new ParseField("running_time_in_nanos"));
136145
parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable"));
146+
parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled"));
137147
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
138148
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
139149
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
@@ -147,6 +157,7 @@ public boolean equals(Object o) {
147157
return getStartTime() == taskInfo.getStartTime() &&
148158
getRunningTimeNanos() == taskInfo.getRunningTimeNanos() &&
149159
isCancellable() == taskInfo.isCancellable() &&
160+
isCancelled() == taskInfo.isCancelled() &&
150161
Objects.equals(getTaskId(), taskInfo.getTaskId()) &&
151162
Objects.equals(getType(), taskInfo.getType()) &&
152163
Objects.equals(getAction(), taskInfo.getAction()) &&
@@ -159,8 +170,17 @@ public boolean equals(Object o) {
159170
@Override
160171
public int hashCode() {
161172
return Objects.hash(
162-
getTaskId(), getType(), getAction(), getDescription(), getStartTime(),
163-
getRunningTimeNanos(), isCancellable(), getParentTaskId(), status, getHeaders()
173+
getTaskId(),
174+
getType(),
175+
getAction(),
176+
getDescription(),
177+
getStartTime(),
178+
getRunningTimeNanos(),
179+
isCancellable(),
180+
isCancelled(),
181+
getParentTaskId(),
182+
status,
183+
getHeaders()
164184
);
165185
}
166186

@@ -175,6 +195,7 @@ public String toString() {
175195
", startTime=" + startTime +
176196
", runningTimeNanos=" + runningTimeNanos +
177197
", cancellable=" + cancellable +
198+
", cancelled=" + cancelled +
178199
", parentTaskId=" + parentTaskId +
179200
", status=" + status +
180201
", headers=" + headers +

client/rest-high-level/src/test/java/org/elasticsearch/client/core/tasks/GetTaskResponseTests.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,23 @@ static TaskInfo randomTaskInfo() {
6969
long startTime = randomLong();
7070
long runningTimeNanos = randomLong();
7171
boolean cancellable = randomBoolean();
72+
boolean cancelled = cancellable && randomBoolean();
7273
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
7374
Map<String, String> headers = randomBoolean() ?
7475
Collections.emptyMap() :
7576
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
76-
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
77+
return new TaskInfo(
78+
taskId,
79+
type,
80+
action,
81+
description,
82+
status,
83+
startTime,
84+
runningTimeNanos,
85+
cancellable,
86+
cancelled,
87+
parentTaskId,
88+
headers);
7789
}
7890

7991
private static TaskId randomTaskId() {

client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/StatsResponseTests.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,22 @@ private static TaskInfo randomTaskInfo() {
7878
long startTime = randomLong();
7979
long runningTimeNanos = randomNonNegativeLong();
8080
boolean cancellable = randomBoolean();
81+
boolean cancelled = cancellable && randomBoolean();
8182
TaskId parentTaskId = TaskId.EMPTY_TASK_ID;
8283
Map<String, String> headers = randomBoolean() ?
8384
Collections.emptyMap() :
8485
Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
85-
return new TaskInfo(taskId, type, action, description, null, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
86+
return new TaskInfo(
87+
taskId,
88+
type,
89+
action,
90+
description,
91+
null,
92+
startTime,
93+
runningTimeNanos,
94+
cancellable,
95+
cancelled,
96+
parentTaskId,
97+
headers);
8698
}
8799
}

client/rest-high-level/src/test/java/org/elasticsearch/client/tasks/CancelTasksResponseTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
5757
}
5858

5959
for (int i = 0; i < 4; i++) {
60+
boolean isCancellable = randomBoolean();
6061
tasks.add(new org.elasticsearch.tasks.TaskInfo(
6162
new TaskId(NODE_ID, (long) i),
6263
randomAlphaOfLength(4),
@@ -65,7 +66,8 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
6566
new FakeTaskStatus(randomAlphaOfLength(4), randomInt()),
6667
randomLongBetween(1, 3),
6768
randomIntBetween(5, 10),
68-
false,
69+
isCancellable,
70+
isCancellable && randomBoolean(),
6971
new TaskId("node1", randomLong()),
7072
Map.of("x-header-of", "some-value")));
7173
}
@@ -99,6 +101,7 @@ protected void assertInstances(ByNodeCancelTasksResponse serverTestInstance,
99101
assertEquals(ti.getStartTime(), taskInfo.getStartTime());
100102
assertEquals(ti.getRunningTimeNanos(), taskInfo.getRunningTimeNanos());
101103
assertEquals(ti.isCancellable(), taskInfo.isCancellable());
104+
assertEquals(ti.isCancelled(), taskInfo.isCancelled());
102105
assertEquals(ti.getParentTaskId().getNodeId(), taskInfo.getParentTaskId().getNodeId());
103106
assertEquals(ti.getParentTaskId().getId(), taskInfo.getParentTaskId().getId());
104107
FakeTaskStatus status = (FakeTaskStatus) ti.getStatus();

docs/reference/cluster/tasks.asciidoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,8 @@ The API returns the following result:
171171
"description" : "indices[test], types[test], search_type[QUERY_THEN_FETCH], source[{\"query\":...}]",
172172
"start_time_in_millis" : 1483478610008,
173173
"running_time_in_nanos" : 13991383,
174-
"cancellable" : true
174+
"cancellable" : true,
175+
"cancelled" : false
175176
}
176177
}
177178
}
@@ -243,6 +244,13 @@ nodes `nodeId1` and `nodeId2`.
243244
POST _tasks/_cancel?nodes=nodeId1,nodeId2&actions=*reindex
244245
--------------------------------------------------
245246

247+
A task may continue to run for some time after it has been cancelled because it
248+
may not be able to safely stop its current activity straight away. The list
249+
tasks API will continue to list these cancelled tasks until they complete. The
250+
`cancelled` flag in the response to the list tasks API indicates that the
251+
cancellation command has been processed and the task will stop as soon as
252+
possible.
253+
246254
===== Task Grouping
247255

248256
The task lists returned by task API commands can be grouped either by nodes

modules/reindex/src/test/java/org/elasticsearch/index/reindex/TransportRethrottleActionTests.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,18 @@ public void testRethrottleSuccessfulResponse() {
9191
List<BulkByScrollTask.StatusOrException> sliceStatuses = new ArrayList<>(slices);
9292
for (int i = 0; i < slices; i++) {
9393
BulkByScrollTask.Status status = believeableInProgressStatus(i);
94-
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
95-
Collections.emptyMap()));
94+
tasks.add(new TaskInfo(
95+
new TaskId("test", 123),
96+
"test",
97+
"test",
98+
"test",
99+
status,
100+
0,
101+
0,
102+
true,
103+
false,
104+
new TaskId("test", task.getId()),
105+
Collections.emptyMap()));
96106
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
97107
}
98108
rethrottleTestCase(slices,
@@ -112,8 +122,18 @@ public void testRethrottleWithSomeSucceeded() {
112122
List<TaskInfo> tasks = new ArrayList<>();
113123
for (int i = succeeded; i < slices; i++) {
114124
BulkByScrollTask.Status status = believeableInProgressStatus(i);
115-
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
116-
Collections.emptyMap()));
125+
tasks.add(new TaskInfo(
126+
new TaskId("test", 123),
127+
"test",
128+
"test",
129+
"test",
130+
status,
131+
0,
132+
0,
133+
true,
134+
false,
135+
new TaskId("test", task.getId()),
136+
Collections.emptyMap()));
117137
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
118138
}
119139
rethrottleTestCase(slices - succeeded,

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -819,8 +819,19 @@ public void testNodeNotFoundButTaskFound() throws Exception {
819819
CyclicBarrier b = new CyclicBarrier(2);
820820
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
821821
resultsService.storeResult(new TaskResult(
822-
new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
823-
new RuntimeException("test")),
822+
new TaskInfo(
823+
new TaskId("fake", 1),
824+
"test",
825+
"test",
826+
"",
827+
null,
828+
0,
829+
0,
830+
false,
831+
false,
832+
TaskId.EMPTY_TASK_ID,
833+
Collections.emptyMap()),
834+
new RuntimeException("test")),
824835
new ActionListener<Void>() {
825836
@Override
826837
public void onResponse(Void response) {

server/src/main/java/org/elasticsearch/tasks/Task.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,18 @@ public final TaskInfo taskInfo(String localNodeId, boolean detailed) {
9090
* Build a proper {@link TaskInfo} for this task.
9191
*/
9292
protected final TaskInfo taskInfo(String localNodeId, String description, Status status) {
93-
return new TaskInfo(new TaskId(localNodeId, getId()), getType(), getAction(), description, status, startTime,
94-
System.nanoTime() - startTimeNanos, this instanceof CancellableTask, parentTask, headers);
93+
return new TaskInfo(
94+
new TaskId(localNodeId, getId()),
95+
getType(),
96+
getAction(),
97+
description,
98+
status,
99+
startTime,
100+
System.nanoTime() - startTimeNanos,
101+
this instanceof CancellableTask,
102+
this instanceof CancellableTask && ((CancellableTask)this).isCancelled(),
103+
parentTask,
104+
headers);
95105
}
96106

97107
/**

0 commit comments

Comments
 (0)