Skip to content

Commit c150ac5

Browse files
committed
Fail restore when the shard allocations max retries count is reached (#27493)
This commit changes the RestoreService so that it now fails the snapshot restore if one of the shards to restore has failed to be allocated. It also adds a new RestoreInProgressAllocationDecider that prevents such shards from being allocated again. This way, when a restore is impossible or has failed too many times, the user is forced to take a manual action (such as deleting the index that contains the failed shards) before trying to restore it again. This behaviour has been implemented because, once the allocation of a shard has been retried too many times, the MaxRetryDecider is engaged to prevent any future allocation of the failed shard. If this happened while restoring a snapshot, the restore hung and never completed, because it kept waiting for the shards to be assigned (which would never happen). It also blocked future attempts to restore the snapshot. With this commit, the restore no longer hangs and is instead marked as failed, leaving the failed shards around for investigation. This is the second part of the #26865 issue. Closes #26865
1 parent 04c47bc commit c150ac5

File tree

8 files changed

+541
-28
lines changed

8 files changed

+541
-28
lines changed

core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
5555
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
5656
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
57+
import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
5758
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
5859
import org.elasticsearch.cluster.service.ClusterService;
5960
import org.elasticsearch.common.ParseField;
@@ -191,6 +192,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
191192
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
192193
addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
193194
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider(settings));
195+
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider(settings));
194196
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
195197
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
196198
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));

core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import java.util.function.Function;
4949
import java.util.stream.Collectors;
5050

51+
import static java.util.Collections.emptyList;
52+
import static java.util.Collections.singletonList;
5153
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
5254

5355

@@ -135,13 +137,14 @@ protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, Rout
135137
return newState;
136138
}
137139

140+
// Used for testing
138141
public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
139-
return applyFailedShards(clusterState, Collections.singletonList(new FailedShard(failedShard, null, null)),
140-
Collections.emptyList());
142+
return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList());
141143
}
142144

145+
// Used for testing
143146
public ClusterState applyFailedShards(ClusterState clusterState, List<FailedShard> failedShards) {
144-
return applyFailedShards(clusterState, failedShards, Collections.emptyList());
147+
return applyFailedShards(clusterState, failedShards, emptyList());
145148
}
146149

147150
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.routing.allocation.decider;
21+
22+
import org.elasticsearch.cluster.RestoreInProgress;
23+
import org.elasticsearch.cluster.routing.RecoverySource;
24+
import org.elasticsearch.cluster.routing.RoutingNode;
25+
import org.elasticsearch.cluster.routing.ShardRouting;
26+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
27+
import org.elasticsearch.common.settings.Settings;
28+
import org.elasticsearch.snapshots.Snapshot;
29+
30+
/**
31+
* This {@link AllocationDecider} prevents shards that have failed to be
32+
* restored from a snapshot to be allocated.
33+
*/
34+
public class RestoreInProgressAllocationDecider extends AllocationDecider {
35+
36+
public static final String NAME = "restore_in_progress";
37+
38+
/**
39+
* Creates a new {@link RestoreInProgressAllocationDecider} instance from
40+
* given settings
41+
*
42+
* @param settings {@link Settings} to use
43+
*/
44+
public RestoreInProgressAllocationDecider(Settings settings) {
45+
super(settings);
46+
}
47+
48+
@Override
49+
public Decision canAllocate(final ShardRouting shardRouting, final RoutingNode node, final RoutingAllocation allocation) {
50+
return canAllocate(shardRouting, allocation);
51+
}
52+
53+
@Override
54+
public Decision canAllocate(final ShardRouting shardRouting, final RoutingAllocation allocation) {
55+
final RecoverySource recoverySource = shardRouting.recoverySource();
56+
if (recoverySource == null || recoverySource.getType() != RecoverySource.Type.SNAPSHOT) {
57+
return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot");
58+
}
59+
60+
final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot();
61+
final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE);
62+
63+
if (restoresInProgress != null) {
64+
for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) {
65+
if (restoreInProgress.snapshot().equals(snapshot)) {
66+
RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
67+
if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
68+
assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
69+
+ "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
70+
return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
71+
}
72+
break;
73+
}
74+
}
75+
}
76+
return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " +
77+
"manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " +
78+
"allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
79+
}
80+
81+
@Override
82+
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
83+
assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting;
84+
return canAllocate(shardRouting, node, allocation);
85+
}
86+
}

core/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import org.elasticsearch.common.settings.ClusterSettings;
6565
import org.elasticsearch.common.settings.Settings;
6666
import org.elasticsearch.common.unit.TimeValue;
67-
import org.elasticsearch.common.util.set.Sets;
6867
import org.elasticsearch.index.Index;
6968
import org.elasticsearch.index.shard.IndexShard;
7069
import org.elasticsearch.index.shard.ShardId;
@@ -535,7 +534,7 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha
535534
RecoverySource recoverySource = initializingShard.recoverySource();
536535
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
537536
Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
538-
changes(snapshot).startedShards.put(initializingShard.shardId(),
537+
changes(snapshot).shards.put(initializingShard.shardId(),
539538
new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS));
540539
}
541540
}
@@ -551,7 +550,7 @@ public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo)
551550
// to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed,
552551
// however, we only want to acknowledge the restore operation once it has been successfully restored on another node.
553552
if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) {
554-
changes(snapshot).failedShards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
553+
changes(snapshot).shards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
555554
RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage()));
556555
}
557556
}
@@ -564,11 +563,24 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali
564563
if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT &&
565564
initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
566565
Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot();
567-
changes(snapshot).failedShards.put(unassignedShard.shardId(), new ShardRestoreStatus(null,
566+
changes(snapshot).shards.put(unassignedShard.shardId(), new ShardRestoreStatus(null,
568567
RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource()));
569568
}
570569
}
571570

571+
@Override
572+
public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
573+
RecoverySource recoverySource = unassignedShard.recoverySource();
574+
if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
575+
if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) {
576+
Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
577+
String reason = "shard could not be allocated to any of the nodes";
578+
changes(snapshot).shards.put(unassignedShard.shardId(),
579+
new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason));
580+
}
581+
}
582+
}
583+
572584
/**
573585
* Helper method that creates update entry for the given shard id if such an entry does not exist yet.
574586
*/
@@ -577,25 +589,21 @@ private Updates changes(Snapshot snapshot) {
577589
}
578590

579591
private static class Updates {
580-
private Map<ShardId, ShardRestoreStatus> failedShards = new HashMap<>();
581-
private Map<ShardId, ShardRestoreStatus> startedShards = new HashMap<>();
592+
private Map<ShardId, ShardRestoreStatus> shards = new HashMap<>();
582593
}
583594

584-
public RestoreInProgress applyChanges(RestoreInProgress oldRestore) {
595+
public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
585596
if (shardChanges.isEmpty() == false) {
586597
final List<RestoreInProgress.Entry> entries = new ArrayList<>();
587598
for (RestoreInProgress.Entry entry : oldRestore.entries()) {
588599
Snapshot snapshot = entry.snapshot();
589600
Updates updates = shardChanges.get(snapshot);
590-
assert Sets.haveEmptyIntersection(updates.startedShards.keySet(), updates.failedShards.keySet());
591-
if (updates.startedShards.isEmpty() == false || updates.failedShards.isEmpty() == false) {
601+
if (updates.shards.isEmpty() == false) {
592602
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(entry.shards());
593-
for (Map.Entry<ShardId, ShardRestoreStatus> startedShardEntry : updates.startedShards.entrySet()) {
594-
shardsBuilder.put(startedShardEntry.getKey(), startedShardEntry.getValue());
595-
}
596-
for (Map.Entry<ShardId, ShardRestoreStatus> failedShardEntry : updates.failedShards.entrySet()) {
597-
shardsBuilder.put(failedShardEntry.getKey(), failedShardEntry.getValue());
603+
for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.shards.entrySet()) {
604+
shardsBuilder.put(shard.getKey(), shard.getValue());
598605
}
606+
599607
ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
600608
RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards);
601609
entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards));
@@ -608,7 +616,6 @@ public RestoreInProgress applyChanges(RestoreInProgress oldRestore) {
608616
return oldRestore;
609617
}
610618
}
611-
612619
}
613620

614621
public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) {

core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
3636
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
3737
import org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
38+
import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
3839
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
3940
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
4041
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
@@ -183,6 +184,7 @@ public void testAllocationDeciderOrder() {
183184
EnableAllocationDecider.class,
184185
NodeVersionAllocationDecider.class,
185186
SnapshotInProgressAllocationDecider.class,
187+
RestoreInProgressAllocationDecider.class,
186188
FilterAllocationDecider.class,
187189
SameShardAllocationDecider.class,
188190
DiskThresholdDecider.class,

core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.Version;
2626
import org.elasticsearch.cluster.ClusterState;
2727
import org.elasticsearch.cluster.ESAllocationTestCase;
28+
import org.elasticsearch.cluster.RestoreInProgress;
2829
import org.elasticsearch.cluster.metadata.IndexMetaData;
2930
import org.elasticsearch.cluster.metadata.MetaData;
3031
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -38,6 +39,7 @@
3839
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
3940
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
4041
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
42+
import org.elasticsearch.common.collect.ImmutableOpenMap;
4143
import org.elasticsearch.common.logging.Loggers;
4244
import org.elasticsearch.common.settings.Settings;
4345
import org.elasticsearch.index.Index;
@@ -46,7 +48,10 @@
4648
import org.elasticsearch.snapshots.SnapshotId;
4749
import org.elasticsearch.test.gateway.TestGatewayAllocator;
4850

51+
import java.util.ArrayList;
4952
import java.util.Collections;
53+
import java.util.HashSet;
54+
import java.util.Set;
5055

5156
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
5257
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@@ -309,6 +314,8 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
309314
DiscoveryNode node1 = newNode("node1");
310315
MetaData.Builder metaDataBuilder = new MetaData.Builder(metaData);
311316
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
317+
Snapshot snapshot = new Snapshot("repo", new SnapshotId("snap", "randomId"));
318+
Set<String> snapshotIndices = new HashSet<>();
312319
for (ObjectCursor<IndexMetaData> cursor: metaData.indices().values()) {
313320
Index index = cursor.value.getIndex();
314321
IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(cursor.value);
@@ -329,14 +336,14 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
329336
routingTableBuilder.addAsFromDangling(indexMetaData);
330337
break;
331338
case 3:
339+
snapshotIndices.add(index.getName());
332340
routingTableBuilder.addAsNewRestore(indexMetaData,
333-
new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
334-
indexMetaData.getIndex().getName()), new IntHashSet());
341+
new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet());
335342
break;
336343
case 4:
344+
snapshotIndices.add(index.getName());
337345
routingTableBuilder.addAsRestore(indexMetaData,
338-
new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
339-
indexMetaData.getIndex().getName()));
346+
new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()));
340347
break;
341348
case 5:
342349
routingTableBuilder.addAsNew(indexMetaData);
@@ -345,10 +352,31 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
345352
throw new IndexOutOfBoundsException();
346353
}
347354
}
355+
356+
final RoutingTable routingTable = routingTableBuilder.build();
357+
358+
final ImmutableOpenMap.Builder<String, ClusterState.Custom> restores = ImmutableOpenMap.builder();
359+
if (snapshotIndices.isEmpty() == false) {
360+
// Some indices are restored from snapshot, the RestoreInProgress must be set accordingly
361+
ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> restoreShards = ImmutableOpenMap.builder();
362+
for (ShardRouting shard : routingTable.allShards()) {
363+
if (shard.primary() && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
364+
ShardId shardId = shard.shardId();
365+
restoreShards.put(shardId, new RestoreInProgress.ShardRestoreStatus(node1.getId(), RestoreInProgress.State.INIT));
366+
}
367+
}
368+
369+
RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT,
370+
new ArrayList<>(snapshotIndices), restoreShards.build());
371+
restores.put(RestoreInProgress.TYPE, new RestoreInProgress(restore));
372+
}
373+
348374
return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
349375
.nodes(DiscoveryNodes.builder().add(node1))
350376
.metaData(metaDataBuilder.build())
351-
.routingTable(routingTableBuilder.build()).build();
377+
.routingTable(routingTable)
378+
.customs(restores.build())
379+
.build();
352380
}
353381

354382
private void addInSyncAllocationIds(Index index, IndexMetaData.Builder indexMetaData,

0 commit comments

Comments
 (0)