59
59
import org .elasticsearch .transport .TransportException ;
60
60
import org .elasticsearch .transport .TransportResponseHandler ;
61
61
import org .elasticsearch .transport .TransportService ;
62
+ import org .junit .After ;
62
63
import org .junit .Before ;
63
64
64
65
import java .io .IOException ;
@@ -99,6 +100,16 @@ public void resetTestStates() {
99
100
completedLatches .clear ();
100
101
}
101
102
103
+ @ After
104
+ public void ensureAllBansRemoved () throws Exception {
105
+ assertBusy (() -> {
106
+ for (String node : internalCluster ().getNodeNames ()) {
107
+ TaskManager taskManager = internalCluster ().getInstance (TransportService .class , node ).getTaskManager ();
108
+ assertThat ("node " + node , taskManager .getBannedTaskIds (), empty ());
109
+ }
110
+ }, 30 , TimeUnit .SECONDS );
111
+ }
112
+
102
113
static TestRequest generateTestRequest (Set <DiscoveryNode > nodes , int level , int maxLevel ) {
103
114
List <TestRequest > subRequests = new ArrayList <>();
104
115
int lower = level == 0 ? 1 : 0 ;
@@ -165,21 +176,12 @@ static void allowEntireRequest(TestRequest request) {
165
176
}
166
177
}
167
178
168
- void ensureAllBansRemoved () throws Exception {
169
- assertBusy (() -> {
170
- for (String node : internalCluster ().getNodeNames ()) {
171
- TaskManager taskManager = internalCluster ().getInstance (TransportService .class , node ).getTaskManager ();
172
- assertThat ("node " + node , taskManager .getBannedTaskIds (), empty ());
173
- }
174
- }, 30 , TimeUnit .SECONDS );
175
- }
176
-
177
179
public void testBanOnlyNodesWithOutstandingDescendantTasks () throws Exception {
178
180
if (randomBoolean ()) {
179
- internalCluster ().startNodes (randomIntBetween ( 1 , 3 ) );
181
+ internalCluster ().startNodes (1 );
180
182
}
181
183
Set <DiscoveryNode > nodes = StreamSupport .stream (clusterService ().state ().nodes ().spliterator (), false ).collect (Collectors .toSet ());
182
- final TestRequest rootRequest = generateTestRequest (nodes , 0 , between (1 , 4 ));
184
+ final TestRequest rootRequest = generateTestRequest (nodes , 0 , between (1 , 3 ));
183
185
ActionFuture <TestResponse > rootTaskFuture = client ().execute (TransportTestAction .ACTION , rootRequest );
184
186
Set <TestRequest > pendingRequests = allowPartialRequest (rootRequest );
185
187
TaskId rootTaskId = getRootTaskId (rootRequest );
@@ -192,28 +194,31 @@ public void testBanOnlyNodesWithOutstandingDescendantTasks() throws Exception {
192
194
client ().admin ().cluster ().prepareCancelTasks ().setTaskId (subTask .getTaskId ()).waitForCompletion (false ).get ();
193
195
}
194
196
}
195
- assertBusy (() -> {
196
- for (DiscoveryNode node : nodes ) {
197
- TaskManager taskManager = internalCluster ().getInstance (TransportService .class , node .getName ()).getTaskManager ();
198
- Set <TaskId > expectedBans = new HashSet <>();
199
- for (TestRequest req : pendingRequests ) {
200
- if (req .node .equals (node )) {
201
- List <Task > childTasks = taskManager .getTasks ().values ().stream ()
202
- .filter (t -> t .getParentTaskId () != null && t .getDescription ().equals (req .taskDescription ()))
203
- .collect (Collectors .toList ());
204
- assertThat (childTasks , hasSize (1 ));
205
- CancellableTask childTask = (CancellableTask ) childTasks .get (0 );
206
- assertTrue (childTask .isCancelled ());
207
- expectedBans .add (childTask .getParentTaskId ());
197
+ try {
198
+ assertBusy (() -> {
199
+ for (DiscoveryNode node : nodes ) {
200
+ TaskManager taskManager = internalCluster ().getInstance (TransportService .class , node .getName ()).getTaskManager ();
201
+ Set <TaskId > expectedBans = new HashSet <>();
202
+ for (TestRequest req : pendingRequests ) {
203
+ if (req .node .equals (node )) {
204
+ List <Task > childTasks = taskManager .getTasks ().values ().stream ()
205
+ .filter (t -> t .getParentTaskId () != null && t .getDescription ().equals (req .taskDescription ()))
206
+ .collect (Collectors .toList ());
207
+ assertThat (childTasks , hasSize (1 ));
208
+ CancellableTask childTask = (CancellableTask ) childTasks .get (0 );
209
+ assertTrue (childTask .isCancelled ());
210
+ expectedBans .add (childTask .getParentTaskId ());
211
+ }
208
212
}
213
+ assertThat (taskManager .getBannedTaskIds (), equalTo (expectedBans ));
209
214
}
210
- assertThat ( taskManager . getBannedTaskIds (), equalTo ( expectedBans ) );
211
- }
212
- }, 30 , TimeUnit . SECONDS );
213
- allowEntireRequest ( rootRequest );
214
- cancelFuture . actionGet ( );
215
- waitForRootTask ( rootTaskFuture );
216
- ensureAllBansRemoved ();
215
+ }, 30 , TimeUnit . SECONDS );
216
+ } finally {
217
+ allowEntireRequest ( rootRequest );
218
+ cancelFuture . actionGet ( );
219
+ waitForRootTask ( rootTaskFuture );
220
+ ensureAllBansRemoved ( );
221
+ }
217
222
}
218
223
219
224
public void testCancelTaskMultipleTimes () throws Exception {
@@ -310,7 +315,7 @@ public void testCancelOrphanedTasks() throws Exception {
310
315
311
316
public void testRemoveBanParentsOnDisconnect () throws Exception {
312
317
Set <DiscoveryNode > nodes = StreamSupport .stream (clusterService ().state ().nodes ().spliterator (), false ).collect (Collectors .toSet ());
313
- final TestRequest rootRequest = generateTestRequest (nodes , 0 , between (1 , 4 ));
318
+ final TestRequest rootRequest = generateTestRequest (nodes , 0 , between (1 , 3 ));
314
319
client ().execute (TransportTestAction .ACTION , rootRequest );
315
320
Set <TestRequest > pendingRequests = allowPartialRequest (rootRequest );
316
321
TaskId rootTaskId = getRootTaskId (rootRequest );
0 commit comments