30
30
import org .elasticsearch .tasks .TaskInfo ;
31
31
import org .elasticsearch .test .ESIntegTestCase ;
32
32
import org .elasticsearch .persistent .PersistentTasksCustomMetaData .PersistentTask ;
33
- import org .elasticsearch .persistent .PersistentTasksService .WaitForPersistentTaskStatusListener ;
33
+ import org .elasticsearch .persistent .PersistentTasksService .WaitForPersistentTaskListener ;
34
34
import org .elasticsearch .persistent .TestPersistentTasksPlugin .Status ;
35
35
import org .elasticsearch .persistent .TestPersistentTasksPlugin .TestPersistentTasksExecutor ;
36
36
import org .elasticsearch .persistent .TestPersistentTasksPlugin .TestParams ;
@@ -69,15 +69,15 @@ public void cleanup() throws Exception {
69
69
assertNoRunningTasks ();
70
70
}
71
71
72
- public static class WaitForPersistentTaskStatusFuture <Params extends PersistentTaskParams >
72
+ public static class WaitForPersistentTaskFuture <Params extends PersistentTaskParams >
73
73
extends PlainActionFuture <PersistentTask <Params >>
74
- implements WaitForPersistentTaskStatusListener <Params > {
74
+ implements WaitForPersistentTaskListener <Params > {
75
75
}
76
76
77
77
public void testPersistentActionFailure () throws Exception {
78
78
PersistentTasksService persistentTasksService = internalCluster ().getInstance (PersistentTasksService .class );
79
79
PlainActionFuture <PersistentTask <TestParams >> future = new PlainActionFuture <>();
80
- persistentTasksService .startPersistentTask (UUIDs .base64UUID (), TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
80
+ persistentTasksService .sendStartRequest (UUIDs .base64UUID (), TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
81
81
long allocationId = future .get ().getAllocationId ();
82
82
assertBusy (() -> {
83
83
// Wait for the task to start
@@ -108,7 +108,7 @@ public void testPersistentActionCompletion() throws Exception {
108
108
PersistentTasksService persistentTasksService = internalCluster ().getInstance (PersistentTasksService .class );
109
109
PlainActionFuture <PersistentTask <TestParams >> future = new PlainActionFuture <>();
110
110
String taskId = UUIDs .base64UUID ();
111
- persistentTasksService .startPersistentTask (taskId , TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
111
+ persistentTasksService .sendStartRequest (taskId , TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
112
112
long allocationId = future .get ().getAllocationId ();
113
113
assertBusy (() -> {
114
114
// Wait for the task to start
@@ -127,7 +127,7 @@ public void testPersistentActionCompletion() throws Exception {
127
127
logger .info ("Simulating errant completion notification" );
128
128
//try sending completion request with incorrect allocation id
129
129
PlainActionFuture <PersistentTask <?>> failedCompletionNotificationFuture = new PlainActionFuture <>();
130
- persistentTasksService .sendCompletionNotification (taskId , Long .MAX_VALUE , null , failedCompletionNotificationFuture );
130
+ persistentTasksService .sendCompletionRequest (taskId , Long .MAX_VALUE , null , failedCompletionNotificationFuture );
131
131
assertThrows (failedCompletionNotificationFuture , ResourceNotFoundException .class );
132
132
// Make sure that the task is still running
133
133
assertThat (client ().admin ().cluster ().prepareListTasks ().setActions (TestPersistentTasksExecutor .NAME + "[c]" )
@@ -142,7 +142,7 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
142
142
PlainActionFuture <PersistentTask <TestParams >> future = new PlainActionFuture <>();
143
143
TestParams testParams = new TestParams ("Blah" );
144
144
testParams .setExecutorNodeAttr ("test" );
145
- persistentTasksService .startPersistentTask (UUIDs .base64UUID (), TestPersistentTasksExecutor .NAME , testParams , future );
145
+ persistentTasksService .sendStartRequest (UUIDs .base64UUID (), TestPersistentTasksExecutor .NAME , testParams , future );
146
146
String taskId = future .get ().getId ();
147
147
148
148
Settings nodeSettings = Settings .builder ().put (nodeSettings (0 )).put ("node.attr.test_attr" , "test" ).build ();
@@ -169,14 +169,14 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
169
169
170
170
// Remove the persistent task
171
171
PlainActionFuture <PersistentTask <?>> removeFuture = new PlainActionFuture <>();
172
- persistentTasksService .cancelPersistentTask (taskId , removeFuture );
172
+ persistentTasksService .sendRemoveRequest (taskId , removeFuture );
173
173
assertEquals (removeFuture .get ().getId (), taskId );
174
174
}
175
175
176
176
public void testPersistentActionStatusUpdate () throws Exception {
177
177
PersistentTasksService persistentTasksService = internalCluster ().getInstance (PersistentTasksService .class );
178
178
PlainActionFuture <PersistentTask <TestParams >> future = new PlainActionFuture <>();
179
- persistentTasksService .startPersistentTask (UUIDs .base64UUID (), TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
179
+ persistentTasksService .sendStartRequest (UUIDs .base64UUID (), TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
180
180
String taskId = future .get ().getId ();
181
181
182
182
assertBusy (() -> {
@@ -200,16 +200,16 @@ public void testPersistentActionStatusUpdate() throws Exception {
200
200
.get ().getTasks ().size (), equalTo (1 ));
201
201
202
202
int finalI = i ;
203
- WaitForPersistentTaskStatusFuture <?> future1 = new WaitForPersistentTaskStatusFuture <>();
204
- persistentTasksService .waitForPersistentTaskStatus (taskId ,
203
+ WaitForPersistentTaskFuture <?> future1 = new WaitForPersistentTaskFuture <>();
204
+ persistentTasksService .waitForPersistentTaskCondition (taskId ,
205
205
task -> task != null && task .getStatus () != null && task .getStatus ().toString () != null &&
206
206
task .getStatus ().toString ().equals ("{\" phase\" :\" phase " + (finalI + 1 ) + "\" }" ),
207
207
TimeValue .timeValueSeconds (10 ), future1 );
208
208
assertThat (future1 .get ().getId (), equalTo (taskId ));
209
209
}
210
210
211
- WaitForPersistentTaskStatusFuture <?> future1 = new WaitForPersistentTaskStatusFuture <>();
212
- persistentTasksService .waitForPersistentTaskStatus (taskId ,
211
+ WaitForPersistentTaskFuture <?> future1 = new WaitForPersistentTaskFuture <>();
212
+ persistentTasksService .waitForPersistentTaskCondition (taskId ,
213
213
task -> false , TimeValue .timeValueMillis (10 ), future1 );
214
214
215
215
assertThrows (future1 , IllegalStateException .class , "timed out after 10ms" );
@@ -220,8 +220,8 @@ public void testPersistentActionStatusUpdate() throws Exception {
220
220
" and allocation id -2 doesn't exist" );
221
221
222
222
// Wait for the task to disappear
223
- WaitForPersistentTaskStatusFuture <?> future2 = new WaitForPersistentTaskStatusFuture <>();
224
- persistentTasksService .waitForPersistentTaskStatus (taskId , Objects ::isNull , TimeValue .timeValueSeconds (10 ), future2 );
223
+ WaitForPersistentTaskFuture <?> future2 = new WaitForPersistentTaskFuture <>();
224
+ persistentTasksService .waitForPersistentTaskCondition (taskId , Objects ::isNull , TimeValue .timeValueSeconds (10 ), future2 );
225
225
226
226
logger .info ("Completing the running task" );
227
227
// Complete the running task and make sure it finishes properly
@@ -235,11 +235,11 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception {
235
235
PersistentTasksService persistentTasksService = internalCluster ().getInstance (PersistentTasksService .class );
236
236
PlainActionFuture <PersistentTask <TestParams >> future = new PlainActionFuture <>();
237
237
String taskId = UUIDs .base64UUID ();
238
- persistentTasksService .startPersistentTask (taskId , TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
238
+ persistentTasksService .sendStartRequest (taskId , TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future );
239
239
future .get ();
240
240
241
241
PlainActionFuture <PersistentTask <TestParams >> future2 = new PlainActionFuture <>();
242
- persistentTasksService .startPersistentTask (taskId , TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future2 );
242
+ persistentTasksService .sendStartRequest (taskId , TestPersistentTasksExecutor .NAME , new TestParams ("Blah" ), future2 );
243
243
assertThrows (future2 , ResourceAlreadyExistsException .class );
244
244
245
245
assertBusy (() -> {
0 commit comments