60
60
import org .elasticsearch .transport .TransportResponseHandler ;
61
61
import org .elasticsearch .transport .TransportService ;
62
62
import org .junit .After ;
63
- import org .junit .Before ;
64
63
65
64
import java .io .IOException ;
66
65
import java .util .ArrayList ;
83
82
import static org .hamcrest .Matchers .hasSize ;
84
83
import static org .hamcrest .Matchers .instanceOf ;
85
84
85
+ @ ESIntegTestCase .ClusterScope (scope = ESIntegTestCase .Scope .TEST )
86
86
public class CancellableTasksIT extends ESIntegTestCase {
87
87
88
88
static int idGenerator = 0 ;
@@ -91,15 +91,6 @@ public class CancellableTasksIT extends ESIntegTestCase {
91
91
static final Map <TestRequest , CountDownLatch > beforeExecuteLatches = ConcurrentCollections .newConcurrentMap ();
92
92
static final Map <TestRequest , CountDownLatch > completedLatches = ConcurrentCollections .newConcurrentMap ();
93
93
94
- @ Before
95
- public void resetTestStates () {
96
- idGenerator = 0 ;
97
- beforeSendLatches .clear ();
98
- arrivedLatches .clear ();
99
- beforeExecuteLatches .clear ();
100
- completedLatches .clear ();
101
- }
102
-
103
94
@ After
104
95
public void ensureAllBansRemoved () throws Exception {
105
96
assertBusy (() -> {
@@ -150,7 +141,7 @@ static Set<TestRequest> allowPartialRequest(TestRequest request) throws Exceptio
150
141
beforeSendLatches .get (req ).countDown ();
151
142
}
152
143
for (TestRequest req : sentRequests ) {
153
- arrivedLatches .get (req ).await ();
144
+ assertTrue ( arrivedLatches .get (req ).await (60 , TimeUnit . SECONDS ) );
154
145
}
155
146
Set <TestRequest > completedRequests = new HashSet <>();
156
147
for (TestRequest req : randomSubsetOf (sentRequests )) {
@@ -163,7 +154,7 @@ static Set<TestRequest> allowPartialRequest(TestRequest request) throws Exceptio
163
154
beforeExecuteLatches .get (req ).countDown ();
164
155
}
165
156
for (TestRequest req : completedRequests ) {
166
- completedLatches .get (req ).await ();
157
+ assertTrue ( completedLatches .get (req ).await (60 , TimeUnit . SECONDS ) );
167
158
}
168
159
return Sets .difference (sentRequests , completedRequests );
169
160
}
@@ -500,7 +491,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
500
491
GroupedActionListener <TestResponse > groupedListener =
501
492
new GroupedActionListener <>(listener .map (r -> new TestResponse ()), subRequests .size () + 1 );
502
493
transportService .getThreadPool ().generic ().execute (ActionRunnable .supply (groupedListener , () -> {
503
- beforeExecuteLatches .get (request ).await ();
494
+ assertTrue ( beforeExecuteLatches .get (request ).await (60 , TimeUnit . SECONDS ) );
504
495
if (((CancellableTask ) task ).isCancelled ()) {
505
496
throw new TaskCancelledException ("Task was cancelled while executing" );
506
497
}
@@ -524,7 +515,7 @@ public void onFailure(Exception e) {
524
515
525
516
@ Override
526
517
protected void doRun () throws Exception {
527
- beforeSendLatches .get (subRequest ).await ();
518
+ assertTrue ( beforeSendLatches .get (subRequest ).await (60 , TimeUnit . SECONDS ) );
528
519
if (client .getLocalNodeId ().equals (subRequest .node .getId ()) && randomBoolean ()) {
529
520
try {
530
521
client .executeLocally (TransportTestAction .ACTION , subRequest , latchedListener );
0 commit comments