Skip to content

Commit 02b6ea3

Browse files
committed
Rename methods in PersistentTasksService (#30837)
This commit renames methods in the PersistentTasksService, to make obvious that the methods send requests in order to change the state of persistent tasks. Relates to #29608.
1 parent e2749bd commit 02b6ea3

17 files changed

+173
-165
lines changed

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,16 @@ public long getAllocationId() {
103103
}
104104

105105
/**
106-
* Waits for this persistent task to have the desired state.
106+
* Waits for a given persistent task to comply with a given predicate, then call back the listener accordingly.
107+
*
108+
* @param predicate the persistent task predicate to evaluate
109+
* @param timeout a timeout for waiting
110+
* @param listener the callback listener
107111
*/
108-
public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
109-
@Nullable TimeValue timeout,
110-
PersistentTasksService.WaitForPersistentTaskStatusListener<?> listener) {
111-
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
112+
public void waitForPersistentTask(final Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
113+
final @Nullable TimeValue timeout,
114+
final PersistentTasksService.WaitForPersistentTaskListener<?> listener) {
115+
persistentTasksService.waitForPersistentTaskCondition(persistentTaskId, predicate, timeout, listener);
112116
}
113117

114118
final boolean isCompleted() {
@@ -143,7 +147,7 @@ private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
143147
this.failure = failure;
144148
if (prevState == State.STARTED) {
145149
logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId());
146-
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new
150+
persistentTasksService.sendCompletionRequest(getPersistentTaskId(), getAllocationId(), failure, new
147151
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
148152
@Override
149153
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ private void cancelTask(Long allocationId) {
196196
AllocatedPersistentTask task = runningTasks.remove(allocationId);
197197
if (task.markAsCancelled()) {
198198
// Cancel the local task using the task manager
199-
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
199+
String reason = "task has been removed, cancelling locally";
200+
persistentTasksService.sendCancelRequest(task.getId(), reason, new ActionListener<CancelTasksResponse>() {
200201
@Override
201202
public void onResponse(CancelTasksResponse cancelTasksResponse) {
202203
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(),

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

+108-104
Large diffs are not rendered by default.

server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ public void testFullClusterRestart() throws Exception {
6565
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
6666
futures.add(future);
6767
taskIds[i] = UUIDs.base64UUID();
68-
service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"),
69-
future);
68+
service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), future);
7069
}
7170

7271
for (int i = 0; i < numberOfTasks; i++) {

server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

+17-17
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.elasticsearch.tasks.TaskInfo;
3131
import org.elasticsearch.test.ESIntegTestCase;
3232
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
33-
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener;
33+
import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener;
3434
import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status;
3535
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor;
3636
import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams;
@@ -69,15 +69,15 @@ public void cleanup() throws Exception {
6969
assertNoRunningTasks();
7070
}
7171

72-
public static class WaitForPersistentTaskStatusFuture<Params extends PersistentTaskParams>
72+
public static class WaitForPersistentTaskFuture<Params extends PersistentTaskParams>
7373
extends PlainActionFuture<PersistentTask<Params>>
74-
implements WaitForPersistentTaskStatusListener<Params> {
74+
implements WaitForPersistentTaskListener<Params> {
7575
}
7676

7777
public void testPersistentActionFailure() throws Exception {
7878
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
7979
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
80-
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
80+
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
8181
long allocationId = future.get().getAllocationId();
8282
assertBusy(() -> {
8383
// Wait for the task to start
@@ -108,7 +108,7 @@ public void testPersistentActionCompletion() throws Exception {
108108
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
109109
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
110110
String taskId = UUIDs.base64UUID();
111-
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
111+
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
112112
long allocationId = future.get().getAllocationId();
113113
assertBusy(() -> {
114114
// Wait for the task to start
@@ -127,7 +127,7 @@ public void testPersistentActionCompletion() throws Exception {
127127
logger.info("Simulating errant completion notification");
128128
//try sending completion request with incorrect allocation id
129129
PlainActionFuture<PersistentTask<?>> failedCompletionNotificationFuture = new PlainActionFuture<>();
130-
persistentTasksService.sendCompletionNotification(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture);
130+
persistentTasksService.sendCompletionRequest(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture);
131131
assertThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class);
132132
// Make sure that the task is still running
133133
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
@@ -142,7 +142,7 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
142142
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
143143
TestParams testParams = new TestParams("Blah");
144144
testParams.setExecutorNodeAttr("test");
145-
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
145+
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
146146
String taskId = future.get().getId();
147147

148148
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
@@ -169,14 +169,14 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
169169

170170
// Remove the persistent task
171171
PlainActionFuture<PersistentTask<?>> removeFuture = new PlainActionFuture<>();
172-
persistentTasksService.cancelPersistentTask(taskId, removeFuture);
172+
persistentTasksService.sendRemoveRequest(taskId, removeFuture);
173173
assertEquals(removeFuture.get().getId(), taskId);
174174
}
175175

176176
public void testPersistentActionStatusUpdate() throws Exception {
177177
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
178178
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
179-
persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
179+
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
180180
String taskId = future.get().getId();
181181

182182
assertBusy(() -> {
@@ -200,16 +200,16 @@ public void testPersistentActionStatusUpdate() throws Exception {
200200
.get().getTasks().size(), equalTo(1));
201201

202202
int finalI = i;
203-
WaitForPersistentTaskStatusFuture<?> future1 = new WaitForPersistentTaskStatusFuture<>();
204-
persistentTasksService.waitForPersistentTaskStatus(taskId,
203+
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
204+
persistentTasksService.waitForPersistentTaskCondition(taskId,
205205
task -> task != null && task.getStatus() != null && task.getStatus().toString() != null &&
206206
task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"),
207207
TimeValue.timeValueSeconds(10), future1);
208208
assertThat(future1.get().getId(), equalTo(taskId));
209209
}
210210

211-
WaitForPersistentTaskStatusFuture<?> future1 = new WaitForPersistentTaskStatusFuture<>();
212-
persistentTasksService.waitForPersistentTaskStatus(taskId,
211+
WaitForPersistentTaskFuture<?> future1 = new WaitForPersistentTaskFuture<>();
212+
persistentTasksService.waitForPersistentTaskCondition(taskId,
213213
task -> false, TimeValue.timeValueMillis(10), future1);
214214

215215
assertThrows(future1, IllegalStateException.class, "timed out after 10ms");
@@ -220,8 +220,8 @@ public void testPersistentActionStatusUpdate() throws Exception {
220220
" and allocation id -2 doesn't exist");
221221

222222
// Wait for the task to disappear
223-
WaitForPersistentTaskStatusFuture<?> future2 = new WaitForPersistentTaskStatusFuture<>();
224-
persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
223+
WaitForPersistentTaskFuture<?> future2 = new WaitForPersistentTaskFuture<>();
224+
persistentTasksService.waitForPersistentTaskCondition(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2);
225225

226226
logger.info("Completing the running task");
227227
// Complete the running task and make sure it finishes properly
@@ -235,11 +235,11 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception {
235235
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
236236
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
237237
String taskId = UUIDs.base64UUID();
238-
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
238+
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
239239
future.get();
240240

241241
PlainActionFuture<PersistentTask<TestParams>> future2 = new PlainActionFuture<>();
242-
persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2);
242+
persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2);
243243
assertThrows(future2, ResourceAlreadyExistsException.class);
244244

245245
assertBusy(() -> {

server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -235,14 +235,14 @@ public void testTaskCancellation() {
235235
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
236236
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
237237
@Override
238-
public void sendTaskManagerCancellation(long taskId, ActionListener<CancelTasksResponse> listener) {
238+
void sendCancelRequest(final long taskId, final String reason, final ActionListener<CancelTasksResponse> listener) {
239239
capturedTaskId.set(taskId);
240240
capturedListener.set(listener);
241241
}
242242

243243
@Override
244-
public void sendCompletionNotification(String taskId, long allocationId, Exception failure,
245-
ActionListener<PersistentTask<?>> listener) {
244+
public void sendCompletionRequest(final String taskId, final long taskAllocationId,
245+
final Exception taskFailure, final ActionListener<PersistentTask<?>> listener) {
246246
fail("Shouldn't be called during Cluster State cancellation");
247247
}
248248
};

server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void testEnableAssignmentAfterRestart() throws Exception {
7171
final CountDownLatch latch = new CountDownLatch(numberOfTasks);
7272
for (int i = 0; i < numberOfTasks; i++) {
7373
PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class);
74-
service.startPersistentTask("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(),
74+
service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(),
7575
new ActionListener<PersistentTask<PersistentTaskParams>>() {
7676
@Override
7777
public void onResponse(PersistentTask<PersistentTaskParams> task) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ private void forceCloseJob(ClusterState currentState, CloseJobAction.Request req
315315
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlMetadata.getJobTask(jobId, tasks);
316316
if (jobTask != null) {
317317
auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING);
318-
persistentTasksService.cancelPersistentTask(jobTask.getId(),
318+
persistentTasksService.sendRemoveRequest(jobTask.getId(),
319319
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
320320
@Override
321321
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
@@ -400,7 +400,7 @@ public boolean hasJobsToWaitFor() {
400400
// so wait for that to happen here.
401401
void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitForCloseRequest, CloseJobAction.Response response,
402402
ActionListener<CloseJobAction.Response> listener) {
403-
persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> {
403+
persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
404404
for (String persistentTaskId : waitForCloseRequest.persistentTaskIds) {
405405
if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
406406
return false;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ private void removeDatafeedTask(DeleteDatafeedAction.Request request, ClusterSta
9090
if (datafeedTask == null) {
9191
listener.onResponse(true);
9292
} else {
93-
persistentTasksService.cancelPersistentTask(datafeedTask.getId(),
93+
persistentTasksService.sendRemoveRequest(datafeedTask.getId(),
9494
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
9595
@Override
9696
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ private void removePersistentTask(String jobId, ClusterState currentState,
182182
if (jobTask == null) {
183183
listener.onResponse(null);
184184
} else {
185-
persistentTasksService.cancelPersistentTask(jobTask.getId(),
185+
persistentTasksService.sendRemoveRequest(jobTask.getId(),
186186
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
187187
@Override
188188
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ public void onFailure(Exception e) {
449449

450450
// Step 4. Start job task
451451
ActionListener<PutJobAction.Response> jobUpateListener = ActionListener.wrap(
452-
response -> persistentTasksService.startPersistentTask(MlMetadata.jobTaskId(jobParams.getJobId()),
452+
response -> persistentTasksService.sendStartRequest(MlMetadata.jobTaskId(jobParams.getJobId()),
453453
OpenJobAction.TASK_NAME, jobParams, finalListener),
454454
listener::onFailure
455455
);
@@ -523,8 +523,8 @@ public void onFailure(Exception e) {
523523

524524
private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener<OpenJobAction.Response> listener) {
525525
JobPredicate predicate = new JobPredicate();
526-
persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.getTimeout(),
527-
new PersistentTasksService.WaitForPersistentTaskStatusListener<OpenJobAction.JobParams>() {
526+
persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, jobParams.getTimeout(),
527+
new PersistentTasksService.WaitForPersistentTaskListener<OpenJobAction.JobParams>() {
528528
@Override
529529
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask) {
530530
if (predicate.exception != null) {
@@ -555,7 +555,7 @@ public void onTimeout(TimeValue timeout) {
555555

556556
private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask<OpenJobAction.JobParams> persistentTask, Exception exception,
557557
ActionListener<OpenJobAction.Response> listener) {
558-
persistentTasksService.cancelPersistentTask(persistentTask.getId(),
558+
persistentTasksService.sendRemoveRequest(persistentTask.getId(),
559559
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
560560
@Override
561561
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {

0 commit comments

Comments
 (0)