Skip to content

Commit 1c2ae91

Browse files
authored
Add PersistentTasksClusterService::unassignPersistentTask method (#37576)
* Add PersistentTasksClusterService::unassignPersistentTask method * adding cancellation test * Adding integration test for unallocating tasks from a node * Addressing review comments * adressing minor PR comments
1 parent e3672aa commit 1c2ae91

File tree

3 files changed

+184
-19
lines changed

3 files changed

+184
-19
lines changed

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,45 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
247247
});
248248
}
249249

250+
/**
251+
* This unassigns a task from any node, i.e. it is assigned to a {@code null} node with the provided reason.
252+
*
253+
* Since the assignment executor node is null, the {@link PersistentTasksClusterService} will attempt to reassign it to a valid
254+
* node quickly.
255+
*
256+
* @param taskId the id of a persistent task
257+
* @param taskAllocationId the expected allocation id of the persistent task
258+
* @param reason the reason for unassigning the task from any node
259+
* @param listener the listener that will be called when task is unassigned
260+
*/
261+
public void unassignPersistentTask(final String taskId,
262+
final long taskAllocationId,
263+
final String reason,
264+
final ActionListener<PersistentTask<?>> listener) {
265+
clusterService.submitStateUpdateTask("unassign persistent task from any node", new ClusterStateUpdateTask() {
266+
@Override
267+
public ClusterState execute(ClusterState currentState) throws Exception {
268+
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
269+
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
270+
logger.trace("Unassigning task {} with allocation id {}", taskId, taskAllocationId);
271+
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason)));
272+
} else {
273+
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
274+
}
275+
}
276+
277+
@Override
278+
public void onFailure(String source, Exception e) {
279+
listener.onFailure(e);
280+
}
281+
282+
@Override
283+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
284+
listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId));
285+
}
286+
});
287+
}
288+
250289
/**
251290
* Creates a new {@link Assignment} for the given persistent task.
252291
*
@@ -263,7 +302,7 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(final
263302

264303
AssignmentDecision decision = decider.canAssign();
265304
if (decision.getType() == AssignmentDecision.Type.NO) {
266-
return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
305+
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
267306
}
268307

269308
return persistentTasksExecutor.getAssignment(taskParams, currentState);
@@ -404,6 +443,10 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus
404443
}
405444
}
406445

446+
private static Assignment unassignedAssignment(String reason) {
447+
return new Assignment(null, reason);
448+
}
449+
407450
/**
408451
* Class to periodically try to reassign unassigned persistent tasks.
409452
*/

server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
package org.elasticsearch.persistent;
2121

2222
import com.carrotsearch.hppc.cursors.ObjectCursor;
23+
import org.elasticsearch.ResourceNotFoundException;
2324
import org.elasticsearch.Version;
25+
import org.elasticsearch.action.ActionListener;
2426
import org.elasticsearch.cluster.ClusterChangedEvent;
2527
import org.elasticsearch.cluster.ClusterName;
2628
import org.elasticsearch.cluster.ClusterState;
@@ -63,10 +65,13 @@
6365
import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged;
6466
import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND;
6567
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
68+
import static org.elasticsearch.test.ClusterServiceUtils.setState;
6669
import static org.hamcrest.Matchers.equalTo;
70+
import static org.hamcrest.Matchers.instanceOf;
6771
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6872
import static org.hamcrest.Matchers.notNullValue;
6973
import static org.hamcrest.Matchers.nullValue;
74+
import static org.hamcrest.core.Is.is;
7075
import static org.mockito.Matchers.any;
7176
import static org.mockito.Matchers.anyString;
7277
import static org.mockito.Mockito.doAnswer;
@@ -464,6 +469,56 @@ public void testPeriodicRecheck() throws Exception {
464469
});
465470
}
466471

472+
public void testUnassignTask() {
473+
ClusterState clusterState = initialState();
474+
ClusterState.Builder builder = ClusterState.builder(clusterState);
475+
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(
476+
clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
477+
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
478+
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
479+
.localNodeId("_node_1")
480+
.masterNodeId("_node_1")
481+
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT));
482+
483+
String unassignedId = addTask(tasks, "unassign", "_node_2");
484+
485+
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
486+
clusterState = builder.metaData(metaData).nodes(nodes).build();
487+
setState(clusterService, clusterState);
488+
PersistentTasksClusterService service = createService((params, currentState) ->
489+
new Assignment("_node_2", "test"));
490+
service.unassignPersistentTask(unassignedId, tasks.getLastAllocationId(), "unassignment test", ActionListener.wrap(
491+
task -> {
492+
assertThat(task.getAssignment().getExecutorNode(), is(nullValue()));
493+
assertThat(task.getId(), equalTo(unassignedId));
494+
assertThat(task.getAssignment().getExplanation(), equalTo("unassignment test"));
495+
},
496+
e -> fail()
497+
));
498+
}
499+
500+
public void testUnassignNonExistentTask() {
501+
ClusterState clusterState = initialState();
502+
ClusterState.Builder builder = ClusterState.builder(clusterState);
503+
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(
504+
clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE));
505+
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
506+
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
507+
.localNodeId("_node_1")
508+
.masterNodeId("_node_1")
509+
.add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT));
510+
511+
MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
512+
clusterState = builder.metaData(metaData).nodes(nodes).build();
513+
setState(clusterService, clusterState);
514+
PersistentTasksClusterService service = createService((params, currentState) ->
515+
new Assignment("_node_2", "test"));
516+
service.unassignPersistentTask("missing-task", tasks.getLastAllocationId(), "unassignment test", ActionListener.wrap(
517+
task -> fail(),
518+
e -> assertThat(e, instanceOf(ResourceNotFoundException.class))
519+
));
520+
}
521+
467522
private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) {
468523
AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure);
469524
AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
@@ -728,9 +783,11 @@ private ClusterState.Builder addRandomTask(ClusterState.Builder clusterStateBuil
728783
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param), assignment).build()));
729784
}
730785

731-
private void addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) {
732-
tasks.addTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams(param),
786+
private String addTask(PersistentTasksCustomMetaData.Builder tasks, String param, String node) {
787+
String id = UUIDs.base64UUID();
788+
tasks.addTask(id, TestPersistentTasksExecutor.NAME, new TestParams(param),
733789
new Assignment(node, "explanation: " + param));
790+
return id;
734791
}
735792

736793
private DiscoveryNode newNode(String nodeId) {

server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

Lines changed: 81 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@
4646
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
4747
import static org.hamcrest.Matchers.empty;
4848
import static org.hamcrest.Matchers.equalTo;
49+
import static org.hamcrest.Matchers.hasSize;
4950
import static org.hamcrest.Matchers.notNullValue;
5051
import static org.hamcrest.Matchers.nullValue;
52+
import static org.hamcrest.core.Is.is;
5153

5254
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
5355
public class PersistentTasksExecutorIT extends ESIntegTestCase {
@@ -155,11 +157,8 @@ public void testPersistentActionWithNoAvailableNode() throws Exception {
155157
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
156158
String newNode = internalCluster().startNode(nodeSettings);
157159
String newNodeId = internalCluster().clusterService(newNode).localNode().getId();
158-
assertBusy(() -> {
159-
// Wait for the task to start
160-
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
161-
.size(), equalTo(1));
162-
});
160+
waitForTaskToStart();
161+
163162
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
164163
.get().getTasks().get(0);
165164

@@ -199,11 +198,7 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception
199198

200199
TestPersistentTasksExecutor.setNonClusterStateCondition(true);
201200

202-
assertBusy(() -> {
203-
// Wait for the task to start
204-
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
205-
.size(), equalTo(1));
206-
});
201+
waitForTaskToStart();
207202
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
208203
.get().getTasks().get(0);
209204

@@ -221,12 +216,7 @@ public void testPersistentActionStatusUpdate() throws Exception {
221216
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
222217
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future);
223218
String taskId = future.get().getId();
224-
225-
assertBusy(() -> {
226-
// Wait for the task to start
227-
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
228-
.size(), equalTo(1));
229-
});
219+
waitForTaskToStart();
230220
TaskInfo firstRunningTask = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
231221
.get().getTasks().get(0);
232222

@@ -307,6 +297,62 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception {
307297
});
308298
}
309299

300+
public void testUnassignRunningPersistentTask() throws Exception {
301+
PersistentTasksClusterService persistentTasksClusterService =
302+
internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName());
303+
// Speed up rechecks to a rate that is quicker than what settings would allow
304+
persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1));
305+
PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class);
306+
PlainActionFuture<PersistentTask<TestParams>> future = new PlainActionFuture<>();
307+
TestParams testParams = new TestParams("Blah");
308+
testParams.setExecutorNodeAttr("test");
309+
persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future);
310+
PersistentTask<TestParams> task = future.get();
311+
String taskId = task.getId();
312+
313+
Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build();
314+
internalCluster().startNode(nodeSettings);
315+
316+
waitForTaskToStart();
317+
318+
PlainActionFuture<PersistentTask<?>> unassignmentFuture = new PlainActionFuture<>();
319+
320+
// Disallow re-assignment after it is unallocated to verify master and node state
321+
TestPersistentTasksExecutor.setNonClusterStateCondition(false);
322+
323+
persistentTasksClusterService.unassignPersistentTask(taskId,
324+
task.getAllocationId() + 1,
325+
"unassignment test",
326+
unassignmentFuture);
327+
PersistentTask<?> unassignedTask = unassignmentFuture.get();
328+
assertThat(unassignedTask.getId(), equalTo(taskId));
329+
assertThat(unassignedTask.getAssignment().getExplanation(), equalTo("unassignment test"));
330+
assertThat(unassignedTask.getAssignment().getExecutorNode(), is(nullValue()));
331+
332+
assertBusy(() -> {
333+
// Verify that the task is NOT running on the node
334+
List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get()
335+
.getTasks();
336+
assertThat(tasks.size(), equalTo(0));
337+
338+
// Verify that the task is STILL in internal cluster state
339+
assertClusterStateHasTask(taskId);
340+
});
341+
342+
// Allow it to be reassigned again to the same node
343+
TestPersistentTasksExecutor.setNonClusterStateCondition(true);
344+
345+
// Verify it starts again
346+
waitForTaskToStart();
347+
348+
assertClusterStateHasTask(taskId);
349+
350+
// Complete or cancel the running task
351+
TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
352+
.get().getTasks().get(0);
353+
stopOrCancelTask(taskInfo.getTaskId());
354+
}
355+
310356
private void stopOrCancelTask(TaskId taskId) {
311357
if (randomBoolean()) {
312358
logger.info("Completing the running task");
@@ -322,6 +368,25 @@ private void stopOrCancelTask(TaskId taskId) {
322368
}
323369
}
324370

371+
private static void waitForTaskToStart() throws Exception {
372+
assertBusy(() -> {
373+
// Wait for the task to start
374+
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks()
375+
.size(), equalTo(1));
376+
});
377+
}
378+
379+
private static void assertClusterStateHasTask(String taskId) {
380+
Collection<PersistentTask<?>> clusterTasks = ((PersistentTasksCustomMetaData) internalCluster()
381+
.clusterService()
382+
.state()
383+
.getMetaData()
384+
.custom(PersistentTasksCustomMetaData.TYPE))
385+
.tasks();
386+
assertThat(clusterTasks, hasSize(1));
387+
assertThat(clusterTasks.iterator().next().getId(), equalTo(taskId));
388+
}
389+
325390
private void assertNoRunningTasks() throws Exception {
326391
assertBusy(() -> {
327392
// Wait for the task to finish

0 commit comments

Comments
 (0)