58
58
import org .elasticsearch .transport .TransportException ;
59
59
import org .elasticsearch .transport .TransportResponseHandler ;
60
60
import org .elasticsearch .transport .TransportService ;
61
- import org .junit .Before ;
61
+ import org .junit .After ;
62
62
63
63
import java .io .IOException ;
64
64
import java .util .ArrayList ;
81
81
import static org .hamcrest .Matchers .hasSize ;
82
82
import static org .hamcrest .Matchers .instanceOf ;
83
83
84
+ @ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
84
85
public class CancellableTasksIT extends ESIntegTestCase {
85
86
86
87
static int idGenerator = 0 ;
@@ -89,13 +90,14 @@ public class CancellableTasksIT extends ESIntegTestCase {
89
90
static final Map <TestRequest , CountDownLatch > beforeExecuteLatches = ConcurrentCollections .newConcurrentMap ();
90
91
static final Map <TestRequest , CountDownLatch > completedLatches = ConcurrentCollections .newConcurrentMap ();
91
92
92
- @ Before
93
- public void resetTestStates () {
94
- idGenerator = 0 ;
95
- beforeSendLatches .clear ();
96
- arrivedLatches .clear ();
97
- beforeExecuteLatches .clear ();
98
- completedLatches .clear ();
93
+ @ After
94
+ public void ensureAllBansRemoved () throws Exception {
95
+ assertBusy (() -> {
96
+ for (String node : internalCluster ().getNodeNames ()) {
97
+ TaskManager taskManager = internalCluster ().getInstance (TransportService .class , node ).getTaskManager ();
98
+ assertThat ("node " + node , taskManager .getBannedTaskIds (), empty ());
99
+ }
100
+ }, 30 , TimeUnit .SECONDS );
99
101
}
100
102
101
103
static TestRequest generateTestRequest (Set <DiscoveryNode > nodes , int level , int maxLevel ) {
@@ -138,7 +140,7 @@ static Set<TestRequest> allowPartialRequest(TestRequest request) throws Exceptio
138
140
beforeSendLatches .get (req ).countDown ();
139
141
}
140
142
for (TestRequest req : sentRequests ) {
141
- arrivedLatches .get (req ).await ();
143
+ assertTrue ( arrivedLatches .get (req ).await (60 , TimeUnit . SECONDS ) );
142
144
}
143
145
Set <TestRequest > completedRequests = new HashSet <>();
144
146
for (TestRequest req : randomSubsetOf (sentRequests )) {
@@ -151,7 +153,7 @@ static Set<TestRequest> allowPartialRequest(TestRequest request) throws Exceptio
151
153
beforeExecuteLatches .get (req ).countDown ();
152
154
}
153
155
for (TestRequest req : completedRequests ) {
154
- completedLatches .get (req ).await ();
156
+ assertTrue ( completedLatches .get (req ).await (60 , TimeUnit . SECONDS ) );
155
157
}
156
158
return Sets .difference (sentRequests , completedRequests );
157
159
}
@@ -164,21 +166,12 @@ static void allowEntireRequest(TestRequest request) {
164
166
}
165
167
}
166
168
167
- void ensureAllBansRemoved () throws Exception {
168
- assertBusy (() -> {
169
- for (String node : internalCluster ().getNodeNames ()) {
170
- TaskManager taskManager = internalCluster ().getInstance (TransportService .class , node ).getTaskManager ();
171
- assertThat ("node " + node , taskManager .getBannedTaskIds (), empty ());
172
- }
173
- }, 30 , TimeUnit .SECONDS );
174
- }
175
-
176
169
public void testBanOnlyNodesWithOutstandingDescendantTasks () throws Exception {
177
170
if (randomBoolean ()) {
178
- internalCluster ().startNodes (randomIntBetween ( 1 , 3 ) );
171
+ internalCluster ().startNodes (1 );
179
172
}
180
173
Set <DiscoveryNode > nodes = StreamSupport .stream (clusterService ().state ().nodes ().spliterator (), false ).collect (Collectors .toSet ());
181
- final TestRequest rootRequest = generateTestRequest (nodes , 0 , between (1 , 4 ));
174
+ final TestRequest rootRequest = generateTestRequest (nodes , 0 , between (1 , 3 ));
182
175
ActionFuture <TestResponse > rootTaskFuture = client ().execute (TransportTestAction .ACTION , rootRequest );
183
176
Set <TestRequest > pendingRequests = allowPartialRequest (rootRequest );
184
177
TaskId rootTaskId = getRootTaskId (rootRequest );
@@ -191,28 +184,31 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception {
191
184
client ().admin ().cluster ().prepareCancelTasks ().setTaskId (subTask .getTaskId ()).waitForCompletion (false ).get ();
192
185
}
193
186
}
194
- assertBusy (() -> {
195
- for (DiscoveryNode node : nodes ) {
196
- TaskManager taskManager = internalCluster ().getInstance (TransportService .class , node .getName ()).getTaskManager ();
197
- Set <TaskId > expectedBans = new HashSet <>();
198
- for (TestRequest req : pendingRequests ) {
199
- if (req .node .equals (node )) {
200
- List <Task > childTasks = taskManager .getTasks ().values ().stream ()
201
- .filter (t -> t .getParentTaskId () != null && t .getDescription ().equals (req .taskDescription ()))
202
- .collect (Collectors .toList ());
203
- assertThat (childTasks , hasSize (1 ));
204
- CancellableTask childTask = (CancellableTask ) childTasks .get (0 );
205
- assertTrue (childTask .isCancelled ());
206
- expectedBans .add (childTask .getParentTaskId ());
187
+ try {
188
+ assertBusy (() -> {
189
+ for (DiscoveryNode node : nodes ) {
190
+ TaskManager taskManager = internalCluster ().getInstance (TransportService .class , node .getName ()).getTaskManager ();
191
+ Set <TaskId > expectedBans = new HashSet <>();
192
+ for (TestRequest req : pendingRequests ) {
193
+ if (req .node .equals (node )) {
194
+ List <Task > childTasks = taskManager .getTasks ().values ().stream ()
195
+ .filter (t -> t .getParentTaskId () != null && t .getDescription ().equals (req .taskDescription ()))
196
+ .collect (Collectors .toList ());
197
+ assertThat (childTasks , hasSize (1 ));
198
+ CancellableTask childTask = (CancellableTask ) childTasks .get (0 );
199
+ assertTrue (childTask .isCancelled ());
200
+ expectedBans .add (childTask .getParentTaskId ());
201
+ }
207
202
}
203
+ assertThat (taskManager .getBannedTaskIds (), equalTo (expectedBans ));
208
204
}
209
- assertThat ( taskManager . getBannedTaskIds (), equalTo ( expectedBans ) );
210
- }
211
- }, 30 , TimeUnit . SECONDS );
212
- allowEntireRequest ( rootRequest );
213
- cancelFuture . actionGet ( );
214
- waitForRootTask ( rootTaskFuture );
215
- ensureAllBansRemoved ();
205
+ }, 30 , TimeUnit . SECONDS );
206
+ } finally {
207
+ allowEntireRequest ( rootRequest );
208
+ cancelFuture . actionGet ( );
209
+ waitForRootTask ( rootTaskFuture );
210
+ ensureAllBansRemoved ( );
211
+ }
216
212
}
217
213
218
214
public void testCancelTaskMultipleTimes () throws Exception {
@@ -326,6 +322,7 @@ static void waitForRootTask(ActionFuture<TestResponse> rootTask) {
326
322
rootTask .actionGet ();
327
323
} catch (Exception e ) {
328
324
final Throwable cause = ExceptionsHelper .unwrap (e , TaskCancelledException .class );
325
+ assertNotNull (cause );
329
326
assertThat (cause .getMessage (), anyOf (
330
327
equalTo ("The parent task was cancelled, shouldn't start any child tasks" ),
331
328
containsString ("Task cancelled before it started:" ),
@@ -441,7 +438,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
441
438
GroupedActionListener <TestResponse > groupedListener =
442
439
new GroupedActionListener <>(listener .map (r -> new TestResponse ()), subRequests .size () + 1 );
443
440
transportService .getThreadPool ().generic ().execute (ActionRunnable .supply (groupedListener , () -> {
444
- beforeExecuteLatches .get (request ).await ();
441
+ assertTrue ( beforeExecuteLatches .get (request ).await (60 , TimeUnit . SECONDS ) );
445
442
if (((CancellableTask ) task ).isCancelled ()) {
446
443
throw new TaskCancelledException ("Task was cancelled while executing" );
447
444
}
@@ -465,7 +462,7 @@ public void onFailure(Exception e) {
465
462
466
463
@ Override
467
464
protected void doRun () throws Exception {
468
- beforeSendLatches .get (subRequest ).await ();
465
+ assertTrue ( beforeSendLatches .get (subRequest ).await (60 , TimeUnit . SECONDS ) );
469
466
if (client .getLocalNodeId ().equals (subRequest .node .getId ()) && randomBoolean ()) {
470
467
try {
471
468
client .executeLocally (TransportTestAction .ACTION , subRequest , latchedListener );
0 commit comments