Skip to content

Commit dbb1e49

Browse files
authored
Allow re-allocation of replica shards on nodes during shutdown replacement (#79171)
This commit allows replica shards that have existing data on disk to be re-allocated to the target of a "REPLACE" type node shutdown. Prior to this if the target node of a shutdown were to restart, the replicas would not be allowed to be allocated even if their data existed on disk. Relates to #70338 as a follow-up to #76247
1 parent fd78ac3 commit dbb1e49

File tree

5 files changed

+106
-6
lines changed

5 files changed

+106
-6
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecider.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,19 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
122122
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
123123
return Decision.YES;
124124
}
125+
126+
/**
127+
* Returns a {@link Decision} whether the given replica shard can be
128+
* allocated to the given node when there is a retention lease
129+
* already existing on the node (meaning it has been allocated there previously)
130+
*
131+
* This method does not actually check whether there is a retention lease,
132+
* that is the responsibility of the caller.
133+
*
134+
* It defaults to the same value as {@code canAllocate}.
135+
*/
136+
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node,
137+
RoutingAllocation allocation) {
138+
return canAllocate(shardRouting, node, allocation);
139+
}
125140
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDeciders.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,28 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
231231
return ret;
232232
}
233233

234+
@Override
235+
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
236+
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
237+
return Decision.NO;
238+
}
239+
Decision.Multi ret = new Decision.Multi();
240+
for (AllocationDecider allocationDecider : allocations) {
241+
Decision decision = allocationDecider.canAllocateReplicaWhenThereIsRetentionLease(shardRouting, node, allocation);
242+
// short track if a NO is returned.
243+
if (decision.type() == Decision.Type.NO) {
244+
if (allocation.debugDecision() == false) {
245+
return Decision.NO;
246+
} else {
247+
ret.add(decision);
248+
}
249+
} else {
250+
addDecision(ret, decision, allocation);
251+
}
252+
}
253+
return ret;
254+
}
255+
234256
private void addDecision(Decision.Multi ret, Decision decision, RoutingAllocation allocation) {
235257
// We never add ALWAYS decisions and only add YES decisions when requested by debug mode (since Multi default is YES).
236258
if (decision != Decision.ALWAYS

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeReplacementAllocationDecider.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,17 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
9797
}
9898
}
9999

100+
@Override
101+
public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
102+
if (isReplacementTargetName(allocation, node.node().getName())) {
103+
return Decision.single(Decision.Type.YES, NAME,
104+
"node [%s] is a node replacement target and can have a previously allocated replica re-allocated to it",
105+
node.nodeId());
106+
} else {
107+
return canAllocate(shardRouting, node, allocation);
108+
}
109+
}
110+
100111
/**
101112
* Returns true if there are any node replacements ongoing in the cluster
102113
*/

server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
188188
} else if (matchingNodes.getNodeWithHighestMatch() != null) {
189189
RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId());
190190
// we only check on THROTTLE since we checked before on NO
191-
Decision decision = allocation.deciders().canAllocate(unassignedShard, nodeWithHighestMatch, allocation);
191+
Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(unassignedShard,
192+
nodeWithHighestMatch, allocation);
192193
if (decision.type() == Decision.Type.THROTTLE) {
193194
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store",
194195
unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeWithHighestMatch.node());
@@ -245,7 +246,7 @@ public static Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocatedT
245246
}
246247
// if we can't allocate it on a node, ignore it, for example, this handles
247248
// cases for only allocating a replica after a primary
248-
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
249+
Decision decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
249250
if (decision.type() == Decision.Type.YES && madeDecision.type() != Decision.Type.YES) {
250251
if (explain) {
251252
madeDecision = decision;
@@ -317,17 +318,26 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
317318
continue;
318319
}
319320

320-
// check if we can allocate on that node...
321-
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
322-
// then we will try and assign it next time
323-
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
321+
// Check whether we have existing data for the replica
322+
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(discoNode);
323+
final Decision decision;
324+
if (retainingSeqNoForReplica == -1) {
325+
// There is no existing replica data on the node
326+
decision = allocation.deciders().canAllocate(shard, node, allocation);
327+
} else {
328+
// There is existing replica data on the node
329+
decision = allocation.deciders().canAllocateReplicaWhenThereIsRetentionLease(shard, node, allocation);
330+
}
331+
324332
MatchingNode matchingNode = null;
325333
if (explain) {
326334
matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata);
327335
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingNode.matchingBytes);
328336
nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
329337
}
330338

339+
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
340+
// then we will try and assign it next time
331341
if (decision.type() == Decision.Type.NO) {
332342
continue;
333343
}

x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,48 @@ public void testNodeReplacementOnlyToTarget() throws Exception {
439439
});
440440
}
441441

442+
public void testReallocationForReplicaDuringNodeReplace() throws Exception {
443+
final String nodeA = internalCluster().startNode();
444+
final String nodeAId = getNodeId(nodeA);
445+
createIndex("myindex", Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build());
446+
ensureYellow("myindex");
447+
448+
// Start a second node, so the replica will be on nodeB
449+
final String nodeB = internalCluster().startNode();
450+
ensureGreen("myindex");
451+
452+
final String nodeC = internalCluster().startNode();
453+
454+
// Register a replace for nodeA, with nodeC as the target
455+
PutShutdownNodeAction.Request shutdownRequest = new PutShutdownNodeAction.Request(
456+
nodeAId,
457+
SingleNodeShutdownMetadata.Type.REPLACE,
458+
"testing",
459+
null,
460+
nodeC
461+
);
462+
client().execute(PutShutdownNodeAction.INSTANCE, shutdownRequest).get();
463+
464+
// Wait for the node replace shutdown to be complete
465+
assertBusy(() -> {
466+
GetShutdownStatusAction.Response shutdownStatus = client().execute(
467+
GetShutdownStatusAction.INSTANCE,
468+
new GetShutdownStatusAction.Request(nodeAId)
469+
).get();
470+
assertThat(shutdownStatus.getShutdownStatuses().get(0).migrationStatus().getStatus(), equalTo(COMPLETE));
471+
});
472+
473+
// Remove nodeA from the cluster (it's been terminated)
474+
internalCluster().stopNode(nodeA);
475+
476+
// Restart nodeC, the replica on nodeB will be flipped to primary and
477+
// when nodeC comes back up, it should have the replica assigned to it
478+
internalCluster().restartNode(nodeC);
479+
480+
// All shards for the index should be allocated
481+
ensureGreen("myindex");
482+
}
483+
442484
private void indexRandomData() throws Exception {
443485
int numDocs = scaledRandomIntBetween(100, 1000);
444486
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];

0 commit comments

Comments
 (0)