Skip to content

Commit 2daa984

Browse files
authored
Add support for Rest XPackUsage task cancellation (#72413)
Backport of #72304
1 parent 88ec149 commit 2daa984

File tree

11 files changed

+287
-81
lines changed

11 files changed

+287
-81
lines changed

qa/smoke-test-http/src/test/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131
import org.elasticsearch.indices.IndicesService;
3232
import org.elasticsearch.plugins.EnginePlugin;
3333
import org.elasticsearch.plugins.Plugin;
34-
import org.elasticsearch.tasks.CancellableTask;
35-
import org.elasticsearch.tasks.TaskInfo;
36-
import org.elasticsearch.transport.TransportService;
3734

3835
import java.util.ArrayList;
3936
import java.util.Collection;
@@ -44,6 +41,9 @@
4441
import java.util.function.Function;
4542

4643
import static java.util.Collections.singletonList;
44+
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
45+
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
46+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
4747
import static org.hamcrest.Matchers.empty;
4848
import static org.hamcrest.Matchers.not;
4949

@@ -115,30 +115,12 @@ public void onFailure(Exception exception) {
115115
cancellable.cancel();
116116
expectThrows(CancellationException.class, future::actionGet);
117117

118-
logger.info("--> checking that all tasks are marked as cancelled");
119-
assertBusy(() -> {
120-
boolean foundTask = false;
121-
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
122-
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
123-
if (cancellableTask.getAction().startsWith(actionPrefix)) {
124-
foundTask = true;
125-
assertTrue(
126-
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
127-
cancellableTask.isCancelled());
128-
}
129-
}
130-
}
131-
assertTrue("found no cancellable tasks", foundTask);
132-
});
118+
assertAllCancellableTasksAreCancelled(actionPrefix);
133119
} finally {
134120
Releasables.close(releasables);
135121
}
136122

137-
logger.info("--> checking that all tasks have finished");
138-
assertBusy(() -> {
139-
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
140-
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(actionPrefix)));
141-
});
123+
assertAllTasksHaveFinished(actionPrefix);
142124
}
143125

144126
public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {

qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStateRestCancellationIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.concurrent.CancellationException;
3434
import java.util.function.UnaryOperator;
3535

36+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
37+
3638
public class ClusterStateRestCancellationIT extends HttpSmokeTestCase {
3739

3840
@Override

qa/smoke-test-http/src/test/java/org/elasticsearch/http/ClusterStatsRestCancellationIT.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@
3434
import org.elasticsearch.indices.IndicesService;
3535
import org.elasticsearch.plugins.EnginePlugin;
3636
import org.elasticsearch.plugins.Plugin;
37-
import org.elasticsearch.tasks.CancellableTask;
38-
import org.elasticsearch.tasks.TaskInfo;
39-
import org.elasticsearch.transport.TransportService;
4037

4138
import java.util.ArrayList;
4239
import java.util.Collection;
@@ -47,6 +44,9 @@
4744
import java.util.function.Function;
4845

4946
import static java.util.Collections.singletonList;
47+
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
48+
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
49+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
5050
import static org.hamcrest.Matchers.empty;
5151
import static org.hamcrest.Matchers.not;
5252

@@ -123,28 +123,12 @@ public void onFailure(Exception exception) {
123123
cancellable.cancel();
124124
expectThrows(CancellationException.class, future::actionGet);
125125

126-
logger.info("--> checking that all cluster stats tasks are marked as cancelled");
127-
assertBusy(() -> {
128-
boolean foundTask = false;
129-
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
130-
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
131-
if (cancellableTask.getAction().startsWith(ClusterStatsAction.NAME)) {
132-
foundTask = true;
133-
assertTrue(cancellableTask.isCancelled());
134-
}
135-
}
136-
}
137-
assertTrue(foundTask);
138-
});
126+
assertAllCancellableTasksAreCancelled(ClusterStatsAction.NAME);
139127
} finally {
140128
Releasables.close(releasables);
141129
}
142130

143-
logger.info("--> checking that all cluster stats tasks have finished");
144-
assertBusy(() -> {
145-
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
146-
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(ClusterStatsAction.NAME)));
147-
});
131+
assertAllTasksHaveFinished(ClusterStatsAction.NAME);
148132
}
149133

150134
public static class StatsBlockingPlugin extends Plugin implements EnginePlugin {

qa/smoke-test-http/src/test/java/org/elasticsearch/http/HttpSmokeTestCase.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.plugins.Plugin;
1313
import org.elasticsearch.test.ESIntegTestCase;
1414
import org.elasticsearch.transport.Netty4Plugin;
15-
import org.elasticsearch.transport.TransportService;
1615
import org.elasticsearch.transport.nio.MockNioTransportPlugin;
1716
import org.elasticsearch.transport.nio.NioTransportPlugin;
1817
import org.junit.BeforeClass;
@@ -89,16 +88,4 @@ protected Settings transportClientSettings() {
8988
protected boolean ignoreExternalCluster() {
9089
return true;
9190
}
92-
93-
protected void awaitTaskWithPrefix(String actionPrefix) throws Exception {
94-
logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);
95-
assertBusy(() -> {
96-
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
97-
if (transportService.getTaskManager().getTasks().values().stream().anyMatch(t -> t.getAction().startsWith(actionPrefix))) {
98-
return;
99-
}
100-
}
101-
fail("no task with prefix [" + actionPrefix + "] found");
102-
});
103-
}
10491
}

qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesRecoveryRestCancellationIT.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919
import org.elasticsearch.client.ResponseListener;
2020
import org.elasticsearch.common.lease.Releasable;
2121
import org.elasticsearch.common.lease.Releasables;
22-
import org.elasticsearch.tasks.CancellableTask;
23-
import org.elasticsearch.tasks.TaskInfo;
24-
import org.elasticsearch.transport.TransportService;
2522

2623
import java.util.ArrayList;
2724
import java.util.List;
2825
import java.util.concurrent.CancellationException;
2926
import java.util.concurrent.Semaphore;
3027

28+
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
29+
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
30+
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefix;
3131
import static org.hamcrest.Matchers.empty;
3232
import static org.hamcrest.Matchers.not;
3333

@@ -91,28 +91,12 @@ public void onFailure(Exception exception) {
9191
cancellable.cancel();
9292
expectThrows(CancellationException.class, future::actionGet);
9393

94-
logger.info("--> checking that all tasks are marked as cancelled");
95-
assertBusy(() -> {
96-
boolean foundTask = false;
97-
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
98-
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
99-
if (cancellableTask.getAction().startsWith(RecoveryAction.NAME)) {
100-
foundTask = true;
101-
assertTrue("task " + cancellableTask.getId() + " not cancelled", cancellableTask.isCancelled());
102-
}
103-
}
104-
}
105-
assertTrue("found no cancellable tasks", foundTask);
106-
});
94+
assertAllCancellableTasksAreCancelled(RecoveryAction.NAME);
10795
} finally {
10896
Releasables.close(releasables);
10997
}
11098

111-
logger.info("--> checking that all tasks have finished");
112-
assertBusy(() -> {
113-
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
114-
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(RecoveryAction.NAME)));
115-
});
99+
assertAllTasksHaveFinished(RecoveryAction.NAME);
116100
}
117101

118102
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.test;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.tasks.CancellableTask;
14+
import org.elasticsearch.tasks.TaskInfo;
15+
import org.elasticsearch.transport.TransportService;
16+
17+
import java.util.List;
18+
19+
import static junit.framework.TestCase.assertTrue;
20+
import static junit.framework.TestCase.fail;
21+
import static org.elasticsearch.test.ESIntegTestCase.client;
22+
import static org.elasticsearch.test.ESIntegTestCase.internalCluster;
23+
import static org.elasticsearch.test.ESTestCase.assertBusy;
24+
25+
public class TaskAssertions {
26+
private static final Logger logger = LogManager.getLogger(TaskAssertions.class);
27+
28+
private TaskAssertions() { }
29+
30+
public static void awaitTaskWithPrefix(String actionPrefix) throws Exception {
31+
logger.info("--> waiting for task with prefix [{}] to start", actionPrefix);
32+
33+
assertBusy(() -> {
34+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
35+
if (transportService.getTaskManager().getTasks().values().stream().anyMatch(t -> t.getAction().startsWith(actionPrefix))) {
36+
return;
37+
}
38+
}
39+
fail("no task with prefix [" + actionPrefix + "] found");
40+
});
41+
}
42+
43+
public static void assertAllCancellableTasksAreCancelled(String actionPrefix) throws Exception {
44+
logger.info("--> checking that all tasks with prefix {} are marked as cancelled", actionPrefix);
45+
46+
assertBusy(() -> {
47+
boolean foundTask = false;
48+
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
49+
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
50+
if (cancellableTask.getAction().startsWith(actionPrefix)) {
51+
foundTask = true;
52+
assertTrue(
53+
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
54+
cancellableTask.isCancelled());
55+
}
56+
}
57+
}
58+
assertTrue("found no cancellable tasks", foundTask);
59+
});
60+
}
61+
62+
public static void assertAllTasksHaveFinished(String actionPrefix) throws Exception {
63+
logger.info("--> checking that all tasks with prefix {} have finished", actionPrefix);
64+
assertBusy(() -> {
65+
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
66+
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(actionPrefix)));
67+
});
68+
}
69+
}

0 commit comments

Comments
 (0)