Skip to content

Commit d3a9d7d

Browse files
DaveCTurner authored and ywangd committed
Include reason in cancellation exceptions (elastic#75332)
Today, when a task is cancelled, we record the reason for the cancellation, but this information is very rarely exposed to users. This commit centralises the construction of the `TaskCancelledException` and includes the reason in the exception message. Closes elastic#74825.
1 parent 656b7af commit d3a9d7d

File tree

22 files changed

+244
-163
lines changed

22 files changed

+244
-163
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java

+6-9
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,6 @@
6666
import java.util.stream.StreamSupport;
6767

6868
import static org.hamcrest.Matchers.anyOf;
69-
import static org.hamcrest.Matchers.containsString;
7069
import static org.hamcrest.Matchers.empty;
7170
import static org.hamcrest.Matchers.equalTo;
7271
import static org.hamcrest.Matchers.hasSize;
@@ -260,7 +259,7 @@ public void testFailedToStartChildTaskAfterCancelled() throws Exception {
260259
beforeSendLatches.get(subRequest).countDown();
261260
mainAction.startSubTask(taskId, subRequest, future);
262261
TaskCancelledException te = expectThrows(TaskCancelledException.class, future::actionGet);
263-
assertThat(te.getMessage(), equalTo("The parent task was cancelled, shouldn't start any child tasks"));
262+
assertThat(te.getMessage(), equalTo("parent task was cancelled [by user request]"));
264263
allowEntireRequest(rootRequest);
265264
waitForRootTask(rootTaskFuture);
266265
ensureAllBansRemoved();
@@ -330,7 +329,7 @@ public void testRemoveBanParentsOnDisconnect() throws Exception {
330329
TaskManager taskManager = internalCluster().getInstance(TransportService.class, node.getName()).getTaskManager();
331330
for (TaskId bannedParent : bannedParents) {
332331
if (bannedParent.getNodeId().equals(node.getId()) && randomBoolean()) {
333-
Collection<Transport.Connection> childConns = taskManager.startBanOnChildTasks(bannedParent.getId(), () -> {});
332+
Collection<Transport.Connection> childConns = taskManager.startBanOnChildTasks(bannedParent.getId(), "", () -> {});
334333
for (Transport.Connection connection : randomSubsetOf(childConns)) {
335334
connection.close();
336335
}
@@ -365,9 +364,9 @@ static void waitForRootTask(ActionFuture<TestResponse> rootTask) {
365364
final Throwable cause = ExceptionsHelper.unwrap(e, TaskCancelledException.class);
366365
assertNotNull(cause);
367366
assertThat(cause.getMessage(), anyOf(
368-
equalTo("The parent task was cancelled, shouldn't start any child tasks"),
369-
containsString("Task cancelled before it started:"),
370-
equalTo("Task was cancelled while executing")));
367+
equalTo("parent task was cancelled [by user request]"),
368+
equalTo("task cancelled before starting [by user request]"),
369+
equalTo("task cancelled [by user request]")));
371370
}
372371
}
373372

@@ -475,9 +474,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
475474
new GroupedActionListener<>(listener.map(r -> new TestResponse()), subRequests.size() + 1);
476475
transportService.getThreadPool().generic().execute(ActionRunnable.supply(groupedListener, () -> {
477476
assertTrue(beforeExecuteLatches.get(request).await(60, TimeUnit.SECONDS));
478-
if (((CancellableTask) task).isCancelled()) {
479-
throw new TaskCancelledException("Task was cancelled while executing");
480-
}
477+
((CancellableTask)task).ensureNotCancelled();
481478
return new TestResponse();
482479
}));
483480
for (TestRequest subRequest : subRequests) {

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java

+2-5
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,6 @@
4343
import org.elasticsearch.snapshots.SnapshotsService;
4444
import org.elasticsearch.tasks.CancellableTask;
4545
import org.elasticsearch.tasks.Task;
46-
import org.elasticsearch.tasks.TaskCancelledException;
4746
import org.elasticsearch.threadpool.ThreadPool;
4847
import org.elasticsearch.transport.TransportService;
4948

@@ -282,8 +281,7 @@ private void loadSnapshotInfos(
282281
SortOrder order,
283282
ActionListener<SnapshotsInRepo> listener
284283
) {
285-
if (task.isCancelled()) {
286-
listener.onFailure(new TaskCancelledException("task cancelled"));
284+
if (task.notifyIfCancelled(listener)) {
287285
return;
288286
}
289287

@@ -371,8 +369,7 @@ private void snapshots(
371369
SortOrder order,
372370
ActionListener<SnapshotsInRepo> listener
373371
) {
374-
if (task.isCancelled()) {
375-
listener.onFailure(new TaskCancelledException("task cancelled"));
372+
if (task.notifyIfCancelled(listener)) {
376373
return;
377374
}
378375
final Set<SnapshotInfo> snapshotSet = new HashSet<>();

server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java

+3-10
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@
4646
import org.elasticsearch.snapshots.SnapshotsService;
4747
import org.elasticsearch.tasks.CancellableTask;
4848
import org.elasticsearch.tasks.Task;
49-
import org.elasticsearch.tasks.TaskCancelledException;
5049
import org.elasticsearch.threadpool.ThreadPool;
5150
import org.elasticsearch.transport.TransportService;
5251

@@ -293,7 +292,7 @@ private void loadRepositoryData(
293292
repositoriesService.getRepositoryData(repositoryName, repositoryDataListener);
294293
final Collection<SnapshotId> snapshotIdsToLoad = new ArrayList<>();
295294
repositoryDataListener.whenComplete(repositoryData -> {
296-
ensureNotCancelled(task);
295+
task.ensureNotCancelled();
297296
final Map<String, SnapshotId> matchedSnapshotIds = repositoryData.getSnapshotIds()
298297
.stream()
299298
.filter(s -> requestedSnapshotNames.contains(s.getName()))
@@ -398,7 +397,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
398397
final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
399398
for (String index : snapshotInfo.indices()) {
400399
IndexId indexId = repositoryData.resolveIndexId(index);
401-
ensureNotCancelled(task);
400+
task.ensureNotCancelled();
402401
IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId);
403402
if (indexMetadata != null) {
404403
int numberOfShards = indexMetadata.getNumberOfShards();
@@ -419,7 +418,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
419418
// could not be taken due to partial being set to false.
420419
shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped");
421420
} else {
422-
ensureNotCancelled(task);
421+
task.ensureNotCancelled();
423422
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), indexId, shardId);
424423
}
425424
shardStatus.put(shardId, shardSnapshotStatus);
@@ -430,12 +429,6 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
430429
return unmodifiableMap(shardStatus);
431430
}
432431

433-
private static void ensureNotCancelled(CancellableTask task) {
434-
if (task.isCancelled()) {
435-
throw new TaskCancelledException("task cancelled");
436-
}
437-
}
438-
439432
private static SnapshotShardFailure findShardFailure(List<SnapshotShardFailure> shardFailures, ShardId shardId) {
440433
for (SnapshotShardFailure shardFailure : shardFailures) {
441434
if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) {

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

+6-7
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.node.NodeClosedException;
3333
import org.elasticsearch.tasks.CancellableTask;
3434
import org.elasticsearch.tasks.Task;
35-
import org.elasticsearch.tasks.TaskCancelledException;
3635
import org.elasticsearch.threadpool.ThreadPool;
3736
import org.elasticsearch.transport.TransportService;
3837

@@ -84,9 +83,11 @@ protected void masterOperation(Task task, final ClusterStateRequest request, fin
8483

8584
@Override
8685
public void onNewClusterState(ClusterState newState) {
87-
if (cancellableTask.isCancelled()) {
88-
listener.onFailure(new TaskCancelledException("task cancelled"));
89-
} else if (acceptableClusterStatePredicate.test(newState)) {
86+
if (cancellableTask.notifyIfCancelled(listener)) {
87+
return;
88+
}
89+
90+
if (acceptableClusterStatePredicate.test(newState)) {
9091
ActionListener.completeWith(listener, () -> buildResponse(request, newState));
9192
} else {
9293
listener.onFailure(new NotMasterException(
@@ -102,9 +103,7 @@ public void onClusterServiceClose() {
102103
@Override
103104
public void onTimeout(TimeValue timeout) {
104105
try {
105-
if (cancellableTask.isCancelled()) {
106-
listener.onFailure(new TaskCancelledException("task cancelled"));
107-
} else {
106+
if (cancellableTask.notifyIfCancelled(listener) == false) {
108107
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
109108
}
110109
} catch (Exception e) {

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

+1-4
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,6 @@
3737
import org.elasticsearch.node.NodeService;
3838
import org.elasticsearch.tasks.CancellableTask;
3939
import org.elasticsearch.tasks.Task;
40-
import org.elasticsearch.tasks.TaskCancelledException;
4140
import org.elasticsearch.tasks.TaskId;
4241
import org.elasticsearch.threadpool.ThreadPool;
4342
import org.elasticsearch.transport.TransportRequest;
@@ -134,9 +133,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
134133
List<ShardStats> shardsStats = new ArrayList<>();
135134
for (IndexService indexService : indicesService) {
136135
for (IndexShard indexShard : indexService) {
137-
if (cancellableTask.isCancelled()) {
138-
throw new TaskCancelledException("task cancelled");
139-
}
136+
cancellableTask.ensureNotCancelled();
140137
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
141138
// only report on fully started shards
142139
CommitStats commitStats;

server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java

+1-7
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.indices.IndicesService;
3030
import org.elasticsearch.tasks.CancellableTask;
3131
import org.elasticsearch.tasks.Task;
32-
import org.elasticsearch.tasks.TaskCancelledException;
3332
import org.elasticsearch.threadpool.ThreadPool;
3433
import org.elasticsearch.transport.TransportService;
3534

@@ -75,12 +74,7 @@ protected AnalyzeDiskUsageShardResponse shardOperation(AnalyzeDiskUsageShardRequ
7574
final ShardId shardId = request.shardId();
7675
assert task instanceof CancellableTask : "AnalyzeDiskUsageShardRequest must create a cancellable task";
7776
final CancellableTask cancellableTask = (CancellableTask) task;
78-
final Runnable checkForCancellation = () -> {
79-
if (cancellableTask.isCancelled()) {
80-
final String reason = cancellableTask.getReasonCancelled();
81-
throw new TaskCancelledException(reason != null ? reason : "Task was cancelled");
82-
}
83-
};
77+
final Runnable checkForCancellation = cancellableTask::ensureNotCancelled;
8478
final IndexShard shard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
8579
try (Engine.IndexCommitRef commitRef = shard.acquireLastIndexCommit(request.flush)) {
8680
final IndexDiskUsageStats stats = IndexDiskUsageAnalyzer.analyze(shardId, commitRef.getIndexCommit(), checkForCancellation);

server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

+12-9
Original file line number | Diff line number | Diff line change
@@ -372,8 +372,7 @@ protected void onNodeFailure(DiscoveryNode node, int nodeIndex, Throwable t) {
372372
}
373373

374374
protected void onCompletion() {
375-
if (task instanceof CancellableTask && ((CancellableTask)task).isCancelled()) {
376-
listener.onFailure(new TaskCancelledException("task cancelled"));
375+
if (task instanceof CancellableTask && ((CancellableTask)task).notifyIfCancelled(listener)) {
377376
return;
378377
}
379378

@@ -433,13 +432,18 @@ public void onFailure(Exception e) {
433432

434433
private void finishHim(NodeRequest request, TransportChannel channel, Task task,
435434
AtomicArray<Object> shardResultOrExceptions) {
436-
if (task instanceof CancellableTask && ((CancellableTask)task).isCancelled()) {
435+
if (task instanceof CancellableTask) {
437436
try {
438-
channel.sendResponse(new TaskCancelledException("task cancelled"));
439-
} catch (IOException e) {
440-
logger.warn("failed to send response", e);
437+
((CancellableTask) task).ensureNotCancelled();
438+
} catch (TaskCancelledException e) {
439+
try {
440+
channel.sendResponse(e);
441+
} catch (IOException ioException) {
442+
e.addSuppressed(ioException);
443+
logger.warn("failed to send response", e);
444+
}
445+
return;
441446
}
442-
return;
443447
}
444448
List<BroadcastShardOperationFailedException> accumulatedExceptions = new ArrayList<>();
445449
List<ShardOperationResult> results = new ArrayList<>();
@@ -461,8 +465,7 @@ private void finishHim(NodeRequest request, TransportChannel channel, Task task,
461465

462466
private void onShardOperation(final NodeRequest request, final ShardRouting shardRouting, final Task task,
463467
final ActionListener<ShardOperationResult> listener) {
464-
if (task instanceof CancellableTask && ((CancellableTask)task).isCancelled()) {
465-
listener.onFailure(new TaskCancelledException("task cancelled"));
468+
if (task instanceof CancellableTask && ((CancellableTask)task).notifyIfCancelled(listener)) {
466469
return;
467470
}
468471
if (logger.isTraceEnabled()) {

server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

+3-10
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
import org.elasticsearch.common.io.stream.Writeable;
2121
import org.elasticsearch.tasks.CancellableTask;
2222
import org.elasticsearch.tasks.Task;
23-
import org.elasticsearch.tasks.TaskCancelledException;
2423
import org.elasticsearch.threadpool.ThreadPool;
2524
import org.elasticsearch.transport.TransportChannel;
2625
import org.elasticsearch.transport.TransportException;
@@ -254,8 +253,7 @@ private void onFailure(int idx, String nodeId, Throwable t) {
254253
}
255254

256255
private void finishHim() {
257-
if (isCancelled(task)) {
258-
listener.onFailure(new TaskCancelledException("task cancelled"));
256+
if (task instanceof CancellableTask && ((CancellableTask) task).notifyIfCancelled(listener)) {
259257
return;
260258
}
261259

@@ -264,17 +262,12 @@ private void finishHim() {
264262
}
265263
}
266264

267-
private boolean isCancelled(Task task) {
268-
return task instanceof CancellableTask && ((CancellableTask) task).isCancelled();
269-
}
270-
271265
class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
272266
@Override
273267
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
274-
if (isCancelled(task)) {
275-
throw new TaskCancelledException("task cancelled");
268+
if (task instanceof CancellableTask) {
269+
((CancellableTask) task).ensureNotCancelled();
276270
}
277-
278271
channel.sendResponse(nodeOperation(request, task));
279272
}
280273
}

server/src/main/java/org/elasticsearch/rest/action/RestCancellableNodeClient.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ public void onFailure(Exception e) {
106106
private void cancelTask(TaskId taskId) {
107107
CancelTasksRequest req = new CancelTasksRequest()
108108
.setTaskId(taskId)
109-
.setReason("channel closed");
109+
.setReason("http channel [" + httpChannel + "] closed");
110110
// force the origin to execute the cancellation as a system user
111111
new OriginSettingClient(client, TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.wrap(() -> {}));
112112
}

server/src/main/java/org/elasticsearch/search/SearchService.java

+4-2
Original file line number | Diff line number | Diff line change
@@ -624,9 +624,11 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
624624

625625
protected void checkCancelled(SearchShardTask task) {
626626
// check cancellation as early as possible, as it avoids opening up a Lucene reader on FrozenEngine
627-
if (task.isCancelled()) {
627+
try {
628+
task.ensureNotCancelled();
629+
} catch (TaskCancelledException e) {
628630
logger.trace("task cancelled [id: {}, action: {}]", task.getId(), task.getAction());
629-
throw new TaskCancelledException("cancelled");
631+
throw e;
630632
}
631633
}
632634

server/src/main/java/org/elasticsearch/search/query/QueryPhase.java

+6-7
Original file line number | Diff line number | Diff line change
@@ -35,12 +35,12 @@
3535
import org.apache.lucene.search.TotalHits;
3636
import org.apache.lucene.search.Weight;
3737
import org.elasticsearch.action.search.SearchShardTask;
38-
import org.elasticsearch.core.Booleans;
39-
import org.elasticsearch.core.CheckedConsumer;
4038
import org.elasticsearch.common.lucene.Lucene;
4139
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
4240
import org.elasticsearch.common.util.concurrent.EWMATrackingEsThreadPoolExecutor;
4341
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
42+
import org.elasticsearch.core.Booleans;
43+
import org.elasticsearch.core.CheckedConsumer;
4444
import org.elasticsearch.index.IndexSortConfig;
4545
import org.elasticsearch.index.mapper.DateFieldMapper.DateFieldType;
4646
import org.elasticsearch.index.mapper.MappedFieldType;
@@ -59,7 +59,6 @@
5959
import org.elasticsearch.search.sort.FieldSortBuilder;
6060
import org.elasticsearch.search.sort.SortAndFormats;
6161
import org.elasticsearch.search.suggest.SuggestPhase;
62-
import org.elasticsearch.tasks.TaskCancelledException;
6362
import org.elasticsearch.threadpool.ThreadPool;
6463

6564
import java.io.IOException;
@@ -103,8 +102,8 @@ public void preProcess(SearchContext context) {
103102
if (context.lowLevelCancellation()) {
104103
cancellation = context.searcher().addQueryCancellation(() -> {
105104
SearchShardTask task = context.getTask();
106-
if (task != null && task.isCancelled()) {
107-
throw new TaskCancelledException("cancelled");
105+
if (task != null) {
106+
task.ensureNotCancelled();
108107
}
109108
});
110109
} else {
@@ -272,8 +271,8 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
272271
if (searchContext.lowLevelCancellation()) {
273272
searcher.addQueryCancellation(() -> {
274273
SearchShardTask task = searchContext.getTask();
275-
if (task != null && task.isCancelled()) {
276-
throw new TaskCancelledException("cancelled");
274+
if (task != null) {
275+
task.ensureNotCancelled();
277276
}
278277
});
279278
}

0 commit comments

Comments
 (0)