@@ -143,14 +143,14 @@ public void testTaskCounts() {
143
143
}
144
144
145
145
public void testMasterNodeOperationTasks () {
146
- registerTaskManageListeners (ClusterHealthAction .NAME );
146
+ registerTaskManagerListeners (ClusterHealthAction .NAME );
147
147
148
148
// First run the health on the master node - should produce only one task on the master node
149
149
internalCluster ().masterClient ().admin ().cluster ().prepareHealth ().get ();
150
150
assertEquals (1 , numberOfEvents (ClusterHealthAction .NAME , Tuple ::v1 )); // counting only registration events
151
151
assertEquals (1 , numberOfEvents (ClusterHealthAction .NAME , event -> event .v1 () == false )); // counting only unregistration events
152
152
153
- resetTaskManageListeners (ClusterHealthAction .NAME );
153
+ resetTaskManagerListeners (ClusterHealthAction .NAME );
154
154
155
155
// Now run the health on a non-master node - should produce one task on master and one task on another node
156
156
internalCluster ().nonMasterClient ().admin ().cluster ().prepareHealth ().get ();
@@ -167,8 +167,8 @@ public void testMasterNodeOperationTasks() {
167
167
}
168
168
169
169
public void testTransportReplicationAllShardsTasks () {
170
- registerTaskManageListeners (ValidateQueryAction .NAME ); // main task
171
- registerTaskManageListeners (ValidateQueryAction .NAME + "[s]" ); // shard
170
+ registerTaskManagerListeners (ValidateQueryAction .NAME ); // main task
171
+ registerTaskManagerListeners (ValidateQueryAction .NAME + "[s]" ); // shard
172
172
// level
173
173
// tasks
174
174
createIndex ("test" );
@@ -186,8 +186,8 @@ public void testTransportReplicationAllShardsTasks() {
186
186
}
187
187
188
188
public void testTransportBroadcastByNodeTasks () {
189
- registerTaskManageListeners (UpgradeAction .NAME ); // main task
190
- registerTaskManageListeners (UpgradeAction .NAME + "[n]" ); // node level tasks
189
+ registerTaskManagerListeners (UpgradeAction .NAME ); // main task
190
+ registerTaskManagerListeners (UpgradeAction .NAME + "[n]" ); // node level tasks
191
191
createIndex ("test" );
192
192
ensureGreen ("test" ); // Make sure all shards are allocated
193
193
client ().admin ().indices ().prepareUpgrade ("test" ).get ();
@@ -202,8 +202,8 @@ public void testTransportBroadcastByNodeTasks() {
202
202
}
203
203
204
204
public void testTransportReplicationSingleShardTasks () {
205
- registerTaskManageListeners (ValidateQueryAction .NAME ); // main task
206
- registerTaskManageListeners (ValidateQueryAction .NAME + "[s]" ); // shard level tasks
205
+ registerTaskManagerListeners (ValidateQueryAction .NAME ); // main task
206
+ registerTaskManagerListeners (ValidateQueryAction .NAME + "[s]" ); // shard level tasks
207
207
createIndex ("test" );
208
208
ensureGreen ("test" ); // Make sure all shards are allocated
209
209
client ().admin ().indices ().prepareValidateQuery ("test" ).get ();
@@ -218,9 +218,9 @@ public void testTransportReplicationSingleShardTasks() {
218
218
219
219
220
220
public void testTransportBroadcastReplicationTasks () {
221
- registerTaskManageListeners (RefreshAction .NAME ); // main task
222
- registerTaskManageListeners (RefreshAction .NAME + "[s]" ); // shard level tasks
223
- registerTaskManageListeners (RefreshAction .NAME + "[s][*]" ); // primary and replica shard tasks
221
+ registerTaskManagerListeners (RefreshAction .NAME ); // main task
222
+ registerTaskManagerListeners (RefreshAction .NAME + "[s]" ); // shard level tasks
223
+ registerTaskManagerListeners (RefreshAction .NAME + "[s][*]" ); // primary and replica shard tasks
224
224
createIndex ("test" );
225
225
ensureGreen ("test" ); // Make sure all shards are allocated
226
226
client ().admin ().indices ().prepareRefresh ("test" ).get ();
@@ -292,10 +292,10 @@ public void testTransportBroadcastReplicationTasks() {
292
292
}
293
293
294
294
public void testTransportBulkTasks () {
295
- registerTaskManageListeners (BulkAction .NAME ); // main task
296
- registerTaskManageListeners (BulkAction .NAME + "[s]" ); // shard task
297
- registerTaskManageListeners (BulkAction .NAME + "[s][p]" ); // shard task on primary
298
- registerTaskManageListeners (BulkAction .NAME + "[s][r]" ); // shard task on replica
295
+ registerTaskManagerListeners (BulkAction .NAME ); // main task
296
+ registerTaskManagerListeners (BulkAction .NAME + "[s]" ); // shard task
297
+ registerTaskManagerListeners (BulkAction .NAME + "[s][p]" ); // shard task on primary
298
+ registerTaskManagerListeners (BulkAction .NAME + "[s][r]" ); // shard task on replica
299
299
createIndex ("test" );
300
300
ensureGreen ("test" ); // Make sure all shards are allocated to catch replication tasks
301
301
// ensures the mapping is available on all nodes so we won't retry the request (in case replicas don't have the right mapping).
@@ -345,10 +345,9 @@ public void testTransportBulkTasks() {
345
345
assertParentTask (findEvents (BulkAction .NAME + "[s][r]" , Tuple ::v1 ), shardTask );
346
346
}
347
347
348
-
349
348
public void testSearchTaskDescriptions () {
350
- registerTaskManageListeners (SearchAction .NAME ); // main task
351
- registerTaskManageListeners (SearchAction .NAME + "[*]" ); // shard task
349
+ registerTaskManagerListeners (SearchAction .NAME ); // main task
350
+ registerTaskManagerListeners (SearchAction .NAME + "[*]" ); // shard task
352
351
createIndex ("test" );
353
352
ensureGreen ("test" ); // Make sure all shards are allocated to catch replication tasks
354
353
client ().prepareIndex ("test" , "doc" , "test_id" ).setSource ("{\" foo\" : \" bar\" }" , XContentType .JSON )
@@ -494,8 +493,9 @@ public void waitForTaskCompletion(Task task) {
494
493
public void testTasksCancellation () throws Exception {
495
494
// Start blocking test task
496
495
// Get real client (the plugin is not registered on transport nodes)
497
- ActionFuture <TestTaskPlugin .NodesResponse > future = new TestTaskPlugin .NodesRequestBuilder (client (),
498
- TestTaskPlugin .TestTaskAction .INSTANCE ).execute ();
496
+ TestTaskPlugin .NodesRequest request = new TestTaskPlugin .NodesRequest ("test" );
497
+ ActionFuture <TestTaskPlugin .NodesResponse > future = client ().execute (TestTaskPlugin .TestTaskAction .INSTANCE , request );
498
+
499
499
logger .info ("--> started test tasks" );
500
500
501
501
// Wait for the task to start on all nodes
@@ -516,8 +516,8 @@ public void testTasksCancellation() throws Exception {
516
516
517
517
public void testTasksUnblocking () throws Exception {
518
518
// Start blocking test task
519
- ActionFuture < TestTaskPlugin .NodesResponse > future =
520
- new TestTaskPlugin . NodesRequestBuilder ( client (), TestTaskPlugin .TestTaskAction .INSTANCE ). execute ( );
519
+ TestTaskPlugin .NodesRequest request = new TestTaskPlugin . NodesRequest ( "test" );
520
+ ActionFuture < TestTaskPlugin . NodesResponse > future = client (). execute ( TestTaskPlugin .TestTaskAction .INSTANCE , request );
521
521
// Wait for the task to start on all nodes
522
522
assertBusy (() -> assertEquals (internalCluster ().size (),
523
523
client ().admin ().cluster ().prepareListTasks ().setActions (TestTaskPlugin .TestTaskAction .NAME + "[n]" ).get ().getTasks ().size ()));
@@ -580,8 +580,9 @@ public void testGetTaskWaitForCompletionWithStoringResult() throws Exception {
580
580
private <T > void waitForCompletionTestCase (boolean storeResult , Function <TaskId , ActionFuture <T >> wait , Consumer <T > validator )
581
581
throws Exception {
582
582
// Start blocking test task
583
- ActionFuture <TestTaskPlugin .NodesResponse > future = new TestTaskPlugin .NodesRequestBuilder (client (),
584
- TestTaskPlugin .TestTaskAction .INSTANCE ).setShouldStoreResult (storeResult ).execute ();
583
+ TestTaskPlugin .NodesRequest request = new TestTaskPlugin .NodesRequest ("test" );
584
+ request .setShouldStoreResult (storeResult );
585
+ ActionFuture <TestTaskPlugin .NodesResponse > future = client ().execute (TestTaskPlugin .TestTaskAction .INSTANCE , request );
585
586
586
587
ActionFuture <T > waitResponseFuture ;
587
588
TaskId taskId ;
@@ -654,8 +655,8 @@ public void testGetTaskWaitForTimeout() throws Exception {
654
655
*/
655
656
private void waitForTimeoutTestCase (Function <TaskId , ? extends Iterable <? extends Throwable >> wait ) throws Exception {
656
657
// Start blocking test task
657
- ActionFuture < TestTaskPlugin .NodesResponse > future = new TestTaskPlugin .NodesRequestBuilder ( client (),
658
- TestTaskPlugin .TestTaskAction .INSTANCE ). execute ( );
658
+ TestTaskPlugin .NodesRequest request = new TestTaskPlugin .NodesRequest ( "test" );
659
+ ActionFuture < TestTaskPlugin . NodesResponse > future = client (). execute ( TestTaskPlugin .TestTaskAction .INSTANCE , request );
659
660
try {
660
661
TaskId taskId = waitForTestTaskStartOnAllNodes ();
661
662
@@ -722,12 +723,17 @@ public void testTasksWaitForAllTask() throws Exception {
722
723
assertThat (response .getTasks ().size (), greaterThanOrEqualTo (1 ));
723
724
}
724
725
725
- public void testTaskStoringSuccesfulResult () throws Exception {
726
- registerTaskManageListeners (TestTaskPlugin .TestTaskAction .NAME ); // we need this to get task id of the process
726
+ public void testTaskStoringSuccessfulResult () throws Exception {
727
+ registerTaskManagerListeners (TestTaskPlugin .TestTaskAction .NAME ); // we need this to get task id of the process
727
728
728
729
// Start non-blocking test task
729
- new TestTaskPlugin .NodesRequestBuilder (client (), TestTaskPlugin .TestTaskAction .INSTANCE )
730
- .setShouldStoreResult (true ).setShouldBlock (false ).get ();
730
+ TestTaskPlugin .NodesRequest request = new TestTaskPlugin .NodesRequest ("test" );
731
+ request .setShouldStoreResult (true );
732
+ request .setShouldBlock (false );
733
+ TaskId parentTaskId = new TaskId ("parent_node" , randomLong ());
734
+ request .setParentTask (parentTaskId );
735
+
736
+ client ().execute (TestTaskPlugin .TestTaskAction .INSTANCE , request ).get ();
731
737
732
738
List <TaskInfo > events = findEvents (TestTaskPlugin .TestTaskAction .NAME , Tuple ::v1 );
733
739
@@ -741,6 +747,7 @@ public void testTaskStoringSuccesfulResult() throws Exception {
741
747
assertNull (taskResult .getError ());
742
748
743
749
assertEquals (taskInfo .getTaskId (), taskResult .getTask ().getTaskId ());
750
+ assertEquals (taskInfo .getParentTaskId (), taskResult .getTask ().getParentTaskId ());
744
751
assertEquals (taskInfo .getType (), taskResult .getTask ().getType ());
745
752
assertEquals (taskInfo .getAction (), taskResult .getTask ().getAction ());
746
753
assertEquals (taskInfo .getDescription (), taskResult .getTask ().getDescription ());
@@ -770,14 +777,16 @@ public void testTaskStoringSuccesfulResult() throws Exception {
770
777
}
771
778
772
779
public void testTaskStoringFailureResult () throws Exception {
773
- registerTaskManageListeners (TestTaskPlugin .TestTaskAction .NAME ); // we need this to get task id of the process
780
+ registerTaskManagerListeners (TestTaskPlugin .TestTaskAction .NAME ); // we need this to get task id of the process
781
+
782
+ TestTaskPlugin .NodesRequest request = new TestTaskPlugin .NodesRequest ("test" );
783
+ request .setShouldFail (true );
784
+ request .setShouldStoreResult (true );
785
+ request .setShouldBlock (false );
774
786
775
787
// Start non-blocking test task that should fail
776
788
assertThrows (
777
- new TestTaskPlugin .NodesRequestBuilder (client (), TestTaskPlugin .TestTaskAction .INSTANCE )
778
- .setShouldFail (true )
779
- .setShouldStoreResult (true )
780
- .setShouldBlock (false ),
789
+ client ().execute (TestTaskPlugin .TestTaskAction .INSTANCE , request ),
781
790
IllegalStateException .class
782
791
);
783
792
@@ -858,7 +867,7 @@ public void tearDown() throws Exception {
858
867
/**
859
868
* Registers recording task event listeners with the given action mask on all nodes
860
869
*/
861
- private void registerTaskManageListeners (String actionMasks ) {
870
+ private void registerTaskManagerListeners (String actionMasks ) {
862
871
for (String nodeName : internalCluster ().getNodeNames ()) {
863
872
DiscoveryNode node = internalCluster ().getInstance (ClusterService .class , nodeName ).localNode ();
864
873
RecordingTaskManagerListener listener = new RecordingTaskManagerListener (node .getId (), actionMasks .split ("," ));
@@ -871,7 +880,7 @@ private void registerTaskManageListeners(String actionMasks) {
871
880
/**
872
881
* Resets all recording task event listeners with the given action mask on all nodes
873
882
*/
874
- private void resetTaskManageListeners (String actionMasks ) {
883
+ private void resetTaskManagerListeners (String actionMasks ) {
875
884
for (Map .Entry <Tuple <String , String >, RecordingTaskManagerListener > entry : listeners .entrySet ()) {
876
885
if (actionMasks == null || entry .getKey ().v2 ().equals (actionMasks )) {
877
886
entry .getValue ().reset ();
@@ -925,11 +934,12 @@ private void assertParentTask(TaskInfo task, TaskInfo parentTask) {
925
934
assertEquals (parentTask .getId (), task .getParentTaskId ().getId ());
926
935
}
927
936
928
- private ResourceNotFoundException expectNotFound (ThrowingRunnable r ) {
937
+ private void expectNotFound (ThrowingRunnable r ) {
929
938
Exception e = expectThrows (Exception .class , r );
930
939
ResourceNotFoundException notFound = (ResourceNotFoundException ) ExceptionsHelper .unwrap (e , ResourceNotFoundException .class );
931
- if (notFound == null ) throw new RuntimeException ("Expected ResourceNotFoundException" , e );
932
- return notFound ;
940
+ if (notFound == null ) {
941
+ throw new AssertionError ("Expected " + ResourceNotFoundException .class .getSimpleName (), e );
942
+ }
933
943
}
934
944
935
945
/**
0 commit comments