Skip to content

Commit 4dd6995

Browse files
committed
Make the persistent task status available to PersistentTasksExecutor.nodeOperation(...) method
1 parent 1c489ee commit 4dd6995

File tree

5 files changed

+55
-11
lines changed

5 files changed

+55
-11
lines changed

server/src/main/java/org/elasticsearch/persistent/NodePersistentTasksExecutor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.common.Nullable;
2222
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
23+
import org.elasticsearch.tasks.Task;
2324
import org.elasticsearch.threadpool.ThreadPool;
2425

2526
/**
@@ -35,6 +36,7 @@ public NodePersistentTasksExecutor(ThreadPool threadPool) {
3536
}
3637

3738
public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
39+
@Nullable Task.Status status,
3840
AllocatedPersistentTask task,
3941
PersistentTasksExecutor<Params> executor) {
4042
threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() {
@@ -47,7 +49,7 @@ public void onFailure(Exception e) {
4749
@Override
4850
protected void doRun() throws Exception {
4951
try {
50-
executor.nodeOperation(task, params);
52+
executor.nodeOperation(task, params, status);
5153
} catch (Exception ex) {
5254
task.markAsFailed(ex);
5355
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.Nullable;
2525
import org.elasticsearch.common.component.AbstractComponent;
2626
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.tasks.Task;
2728
import org.elasticsearch.tasks.TaskId;
2829
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
2930
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
@@ -115,10 +116,10 @@ protected String getDescription(PersistentTask<Params> taskInProgress) {
115116
/**
116117
* This operation will be executed on the executor node.
117118
* <p>
118-
* NOTE: The nodeOperation has to throws an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
119+
* NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
119120
* indicate that the persistent task has finished.
120121
*/
121-
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params);
122+
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status);
122123

123124
public String getExecutor() {
124125
return executor;

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId)
173173
task.getPersistentTaskId(), task.getAllocationId());
174174
try {
175175
runningTasks.put(taskInProgress.getAllocationId(), task);
176-
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor);
176+
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor);
177177
} catch (Exception e) {
178178
// Submit task failure
179179
task.markAsFailed(e);

server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,14 @@
4949
import java.util.concurrent.atomic.AtomicReference;
5050

5151
import static org.hamcrest.Matchers.empty;
52+
import static org.hamcrest.Matchers.sameInstance;
5253
import static org.hamcrest.core.IsEqual.equalTo;
5354
import static org.mockito.Matchers.any;
5455
import static org.mockito.Matchers.anyLong;
5556
import static org.mockito.Matchers.anyString;
5657
import static org.mockito.Matchers.eq;
5758
import static org.mockito.Mockito.mock;
59+
import static org.mockito.Mockito.verify;
5860
import static org.mockito.Mockito.when;
5961

6062
public class PersistentTasksNodeServiceTests extends ESTestCase {
@@ -167,6 +169,41 @@ public void testStartTask() throws Exception {
167169

168170
}
169171

172+
public void testParamsStatusAndNodeTaskAreDelegated() throws Exception {
173+
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
174+
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
175+
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
176+
when(action.getTaskName()).thenReturn(TestPersistentTasksExecutor.NAME);
177+
TaskId parentId = new TaskId("cluster", 1);
178+
AllocatedPersistentTask nodeTask = new TestPersistentTasksPlugin.TestTask(0, "persistent", "test", "", parentId);
179+
when(action.createTask(anyLong(), anyString(), anyString(), eq(parentId), any())).thenReturn(nodeTask);
180+
PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Collections.singletonList(action));
181+
182+
MockExecutor executor = new MockExecutor();
183+
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
184+
registry, new TaskManager(Settings.EMPTY), executor);
185+
186+
ClusterState state = createInitialClusterState(1, Settings.EMPTY);
187+
188+
Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase");
189+
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
190+
String taskId = UUIDs.base64UUID();
191+
TestParams taskParams = new TestParams("other_0");
192+
tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams,
193+
new Assignment("this_node", "test assignment on other node"));
194+
tasks.updateTaskStatus(taskId, status);
195+
MetaData.Builder metaData = MetaData.builder(state.metaData());
196+
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
197+
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
198+
199+
coordinator.clusterChanged(new ClusterChangedEvent("test", newClusterState, state));
200+
201+
assertThat(executor.size(), equalTo(1));
202+
assertThat(executor.get(0).params, sameInstance(taskParams));
203+
assertThat(executor.get(0).status, sameInstance(status));
204+
assertThat(executor.get(0).task, sameInstance(nodeTask));
205+
}
206+
170207
public void testTaskCancellation() {
171208
AtomicLong capturedTaskId = new AtomicLong();
172209
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
@@ -271,11 +308,13 @@ private ClusterState removeTask(ClusterState state, String taskId) {
271308
private class Execution {
272309
private final PersistentTaskParams params;
273310
private final AllocatedPersistentTask task;
311+
private final Task.Status status;
274312
private final PersistentTasksExecutor<?> holder;
275313

276-
Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTasksExecutor<?> holder) {
314+
Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor<?> holder) {
277315
this.params = params;
278316
this.task = task;
317+
this.status = status;
279318
this.holder = holder;
280319
}
281320
}
@@ -288,9 +327,11 @@ private class MockExecutor extends NodePersistentTasksExecutor {
288327
}
289328

290329
@Override
291-
public <Params extends PersistentTaskParams> void executeTask(Params params, AllocatedPersistentTask task,
330+
public <Params extends PersistentTaskParams> void executeTask(Params params,
331+
Task.Status status,
332+
AllocatedPersistentTask task,
292333
PersistentTasksExecutor<Params> executor) {
293-
executions.add(new Execution(params, task, executor));
334+
executions.add(new Execution(params, task, status, executor));
294335
}
295336

296337
public Execution get(int i) {

server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ public Assignment getAssignment(TestParams params, ClusterState clusterState) {
323323
}
324324

325325
@Override
326-
protected void nodeOperation(AllocatedPersistentTask task, TestParams params) {
326+
protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) {
327327
logger.info("started node operation for the task {}", task);
328328
try {
329329
TestTask testTask = (TestTask) task;
@@ -346,9 +346,9 @@ protected void nodeOperation(AllocatedPersistentTask task, TestParams params) {
346346
} else if ("update_status".equals(testTask.getOperation())) {
347347
testTask.setOperation(null);
348348
CountDownLatch latch = new CountDownLatch(1);
349-
Status status = new Status("phase " + phase.incrementAndGet());
350-
logger.info("updating the task status to {}", status);
351-
task.updatePersistentStatus(status, new ActionListener<PersistentTask<?>>() {
349+
Status newStatus = new Status("phase " + phase.incrementAndGet());
350+
logger.info("updating the task status to {}", newStatus);
351+
task.updatePersistentStatus(newStatus, new ActionListener<PersistentTask<?>>() {
352352
@Override
353353
public void onResponse(PersistentTask<?> persistentTask) {
354354
logger.info("updating was successful");

0 commit comments

Comments
 (0)