Skip to content

Commit 17a7849

Browse files
committed
Un-assign persistent tasks as nodes exit the cluster (#37656)
PersistentTasksClusterService decides if a task should be reassigned by checking there is a node in the cluster with the same Id. If a node is restarted PersistentTasksClusterService may not observe the change and decide the task still has a valid assignment because the node's ephemeral Id is not used in that decision. This change un-assigns tasks as the nodes in the cluster change.
1 parent e80c5f3 commit 17a7849

File tree

5 files changed

+138
-2
lines changed

5 files changed

+138
-2
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ private void deassociateDeadNodes(RoutingAllocation allocation) {
425425
for (ShardRouting shardRouting : node.copyShards()) {
426426
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
427427
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
428-
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
428+
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]",
429429
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
430430
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
431431
}

server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.transport.TransportAddress;
3939
import org.elasticsearch.common.unit.TimeValue;
4040
import org.elasticsearch.discovery.DiscoverySettings;
41+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
4142

4243
import java.util.ArrayList;
4344
import java.util.Collections;
@@ -509,6 +510,7 @@ private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState cu
509510
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
510511
.blocks(currentState.blocks())
511512
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
513+
tmpState = PersistentTasksCustomMetaData.deassociateDeadNodes(tmpState);
512514
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false,
513515
"removed dead nodes on election"));
514516
}

server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.elasticsearch.discovery.DiscoverySettings;
5858
import org.elasticsearch.discovery.DiscoveryStats;
5959
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
60+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
6061
import org.elasticsearch.threadpool.ThreadPool;
6162
import org.elasticsearch.transport.EmptyTransportResponseHandler;
6263
import org.elasticsearch.transport.TransportChannel;
@@ -631,7 +632,8 @@ public ClusterTasksResult<Task> execute(final ClusterState currentState, final L
631632
masterNodes, electMasterService.minimumMasterNodes()));
632633
return resultBuilder.build(currentState);
633634
} else {
634-
return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
635+
ClusterState ptasksDisassociatedState = PersistentTasksCustomMetaData.deassociateDeadNodes(remainingNodesClusterState);
636+
return resultBuilder.build(allocationService.deassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
635637
}
636638
}
637639

server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M
6262

6363
public static final String TYPE = "persistent_tasks";
6464
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
65+
static final Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss");
6566

6667
// TODO: Implement custom Diff for tasks
6768
private final Map<String, PersistentTask<?>> tasks;
@@ -119,6 +120,11 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map<String, Persiste
119120
new ParseField("allocation_id_on_last_status_update"));
120121
}
121122

123+
124+
public static PersistentTasksCustomMetaData getPersistentTasksCustomMetaData(ClusterState clusterState) {
125+
return clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
126+
}
127+
122128
/**
123129
* Private builder used in XContent parser to build task-specific portion (params and state)
124130
*/
@@ -209,6 +215,39 @@ public static <Params extends PersistentTaskParams> PersistentTask<Params> getTa
209215
return null;
210216
}
211217

218+
/**
219+
* Unassign any persistent tasks executing on nodes that are no longer in
220+
* the cluster. If the task's assigment has a non-null executor node and that
221+
* node is no longer in the cluster then the assignment is set to
222+
* {@link #LOST_NODE_ASSIGNMENT}
223+
*
224+
* @param clusterState The clusterstate
225+
* @return If no changes the argument {@code clusterState} is returned else
226+
* a copy with the modified tasks
227+
*/
228+
public static ClusterState deassociateDeadNodes(ClusterState clusterState) {
229+
PersistentTasksCustomMetaData tasks = getPersistentTasksCustomMetaData(clusterState);
230+
if (tasks == null) {
231+
return clusterState;
232+
}
233+
234+
PersistentTasksCustomMetaData.Builder taskBuilder = PersistentTasksCustomMetaData.builder(tasks);
235+
for (PersistentTask<?> task : tasks.tasks()) {
236+
if (task.getAssignment().getExecutorNode() != null &&
237+
clusterState.nodes().nodeExists(task.getAssignment().getExecutorNode()) == false) {
238+
taskBuilder.reassignTask(task.getId(), LOST_NODE_ASSIGNMENT);
239+
}
240+
}
241+
242+
if (taskBuilder.isChanged() == false) {
243+
return clusterState;
244+
}
245+
246+
MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData());
247+
metaDataBuilder.putCustom(TYPE, taskBuilder.build());
248+
return ClusterState.builder(clusterState).metaData(metaDataBuilder).build();
249+
}
250+
212251
public static class Assignment {
213252
@Nullable
214253
private final String executorNode;

server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@
2121
import org.elasticsearch.ResourceNotFoundException;
2222
import org.elasticsearch.Version;
2323
import org.elasticsearch.client.transport.TransportClient;
24+
import org.elasticsearch.cluster.ClusterName;
25+
import org.elasticsearch.cluster.ClusterState;
2426
import org.elasticsearch.cluster.Diff;
2527
import org.elasticsearch.cluster.NamedDiff;
2628
import org.elasticsearch.cluster.metadata.MetaData;
2729
import org.elasticsearch.cluster.metadata.MetaData.Custom;
30+
import org.elasticsearch.cluster.node.DiscoveryNode;
31+
import org.elasticsearch.cluster.node.DiscoveryNodes;
2832
import org.elasticsearch.common.ParseField;
2933
import org.elasticsearch.common.UUIDs;
3034
import org.elasticsearch.common.bytes.BytesReference;
@@ -33,9 +37,11 @@
3337
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3438
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
3539
import org.elasticsearch.common.io.stream.StreamInput;
40+
import org.elasticsearch.common.io.stream.StreamOutput;
3641
import org.elasticsearch.common.io.stream.Writeable;
3742
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3843
import org.elasticsearch.common.xcontent.ToXContent;
44+
import org.elasticsearch.common.xcontent.XContentBuilder;
3945
import org.elasticsearch.common.xcontent.XContentFactory;
4046
import org.elasticsearch.common.xcontent.XContentParser;
4147
import org.elasticsearch.common.xcontent.XContentType;
@@ -65,6 +71,8 @@
6571
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
6672
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
6773
import static org.hamcrest.Matchers.equalTo;
74+
import static org.hamcrest.Matchers.not;
75+
import static org.hamcrest.Matchers.sameInstance;
6876

6977
public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializationTestCase<Custom> {
7078

@@ -305,6 +313,91 @@ public void testFeatureSerialization() throws IOException {
305313
assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible")));
306314
}
307315

316+
public void testDisassociateDeadNodes_givenNoPersistentTasks() {
317+
ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests")).build();
318+
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
319+
assertThat(originalState, sameInstance(returnedState));
320+
}
321+
322+
public void testDisassociateDeadNodes_givenAssignedPersistentTask() {
323+
DiscoveryNodes nodes = DiscoveryNodes.builder()
324+
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
325+
.localNodeId("node1")
326+
.masterNodeId("node1")
327+
.build();
328+
329+
String taskName = "test/task";
330+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder()
331+
.addTask("task-id", taskName, emptyTaskParams(taskName),
332+
new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));
333+
334+
ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests"))
335+
.nodes(nodes)
336+
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
337+
.build();
338+
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
339+
assertThat(originalState, sameInstance(returnedState));
340+
341+
PersistentTasksCustomMetaData originalTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(originalState);
342+
PersistentTasksCustomMetaData returnedTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(returnedState);
343+
assertEquals(originalTasks, returnedTasks);
344+
}
345+
346+
public void testDisassociateDeadNodes() {
347+
DiscoveryNodes nodes = DiscoveryNodes.builder()
348+
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
349+
.localNodeId("node1")
350+
.masterNodeId("node1")
351+
.build();
352+
353+
String taskName = "test/task";
354+
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder()
355+
.addTask("assigned-task", taskName, emptyTaskParams(taskName),
356+
new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"))
357+
.addTask("task-on-deceased-node", taskName, emptyTaskParams(taskName),
358+
new PersistentTasksCustomMetaData.Assignment("left-the-cluster", "test assignment"));
359+
360+
ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests"))
361+
.nodes(nodes)
362+
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
363+
.build();
364+
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
365+
assertThat(originalState, not(sameInstance(returnedState)));
366+
367+
PersistentTasksCustomMetaData originalTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(originalState);
368+
PersistentTasksCustomMetaData returnedTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(returnedState);
369+
assertNotEquals(originalTasks, returnedTasks);
370+
371+
assertEquals(originalTasks.getTask("assigned-task"), returnedTasks.getTask("assigned-task"));
372+
assertNotEquals(originalTasks.getTask("task-on-deceased-node"), returnedTasks.getTask("task-on-deceased-node"));
373+
assertEquals(PersistentTasksCustomMetaData.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment());
374+
}
375+
376+
private PersistentTaskParams emptyTaskParams(String taskName) {
377+
return new PersistentTaskParams() {
378+
379+
@Override
380+
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
381+
return builder;
382+
}
383+
384+
@Override
385+
public void writeTo(StreamOutput out) {
386+
387+
}
388+
389+
@Override
390+
public String getWriteableName() {
391+
return taskName;
392+
}
393+
394+
@Override
395+
public Version getMinimalSupportedVersion() {
396+
return Version.CURRENT;
397+
}
398+
};
399+
}
400+
308401
private Assignment randomAssignment() {
309402
if (randomBoolean()) {
310403
if (randomBoolean()) {

0 commit comments

Comments
 (0)