diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
index a4bb6a559254c..9baa47fbc2600 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -54,6 +54,7 @@
 import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.ParseField;
@@ -191,6 +192,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
         addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
         addAllocationDecider(deciders, new NodeVersionAllocationDecider(settings));
         addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider(settings));
+        addAllocationDecider(deciders, new RestoreInProgressAllocationDecider(settings));
         addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
         addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
         addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index 774e4b9301ca4..d79237b8a65c0 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -48,6 +48,8 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
 
@@ -135,13 +137,14 @@ protected ClusterState buildResultAndLogHealthChange(ClusterState oldState, Rout
         return newState;
     }
 
+    // Used for testing
     public ClusterState applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
-        return applyFailedShards(clusterState, Collections.singletonList(new FailedShard(failedShard, null, null)),
-            Collections.emptyList());
+        return applyFailedShards(clusterState, singletonList(new FailedShard(failedShard, null, null)), emptyList());
     }
 
+    // Used for testing
     public ClusterState applyFailedShards(ClusterState clusterState, List<FailedShard> failedShards) {
-        return applyFailedShards(clusterState, failedShards, Collections.emptyList());
+        return applyFailedShards(clusterState, failedShards, emptyList());
     }
 
     /**
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java
new file mode 100644
index 0000000000000..3fefd4e0abba4
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDecider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.decider;
+
+import org.elasticsearch.cluster.RestoreInProgress;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.snapshots.Snapshot;
+
+/**
+ * This {@link AllocationDecider} prevents shards that have failed to be
+ * restored from a snapshot from being allocated.
+ */
+public class RestoreInProgressAllocationDecider extends AllocationDecider {
+
+    public static final String NAME = "restore_in_progress";
+
+    /**
+     * Creates a new {@link RestoreInProgressAllocationDecider} instance from
+     * the given settings
+     *
+     * @param settings {@link Settings} to use
+     */
+    public RestoreInProgressAllocationDecider(Settings settings) {
+        super(settings);
+    }
+
+    @Override
+    public Decision canAllocate(final ShardRouting shardRouting, final RoutingNode node, final RoutingAllocation allocation) {
+        return canAllocate(shardRouting, allocation);
+    }
+
+    @Override
+    public Decision canAllocate(final ShardRouting shardRouting, final RoutingAllocation allocation) {
+        final RecoverySource recoverySource = shardRouting.recoverySource();
+        if (recoverySource == null || recoverySource.getType() != RecoverySource.Type.SNAPSHOT) {
+            return allocation.decision(Decision.YES, NAME, "ignored as shard is not being recovered from a snapshot");
+        }
+
+        final Snapshot snapshot = ((RecoverySource.SnapshotRecoverySource) recoverySource).snapshot();
+        final RestoreInProgress restoresInProgress = allocation.custom(RestoreInProgress.TYPE);
+
+        if (restoresInProgress != null) {
+            for (RestoreInProgress.Entry restoreInProgress : restoresInProgress.entries()) {
+                if (restoreInProgress.snapshot().equals(snapshot)) {
+                    RestoreInProgress.ShardRestoreStatus shardRestoreStatus = restoreInProgress.shards().get(shardRouting.shardId());
+                    if (shardRestoreStatus != null && shardRestoreStatus.state().completed() == false) {
+                        assert shardRestoreStatus.state() != RestoreInProgress.State.SUCCESS : "expected shard [" + shardRouting
+                            + "] to be in initializing state but got [" + shardRestoreStatus.state() + "]";
+                        return allocation.decision(Decision.YES, NAME, "shard is currently being restored");
+                    }
+                    break;
+                }
+            }
+        }
+        return allocation.decision(Decision.NO, NAME, "shard has failed to be restored from the snapshot [%s] because of [%s] - " +
+            "manually close or delete the index [%s] in order to retry to restore the snapshot again or use the reroute API to force the " +
+            "allocation of an empty primary shard", snapshot, shardRouting.unassignedInfo().getDetails(), shardRouting.getIndexName());
+    }
+
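+    /**
+     * A primary shard whose restore from snapshot has failed must not be
+     * force-allocated either, so this delegates to {@link #canAllocate}. An empty
+     * primary can still be allocated through the reroute API, which replaces the
+     * snapshot recovery source.
+     */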
+    @Override
+    public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        assert shardRouting.primary() : "must not call canForceAllocatePrimary on a non-primary shard " + shardRouting;
+        return canAllocate(shardRouting, node, allocation);
+    }
+}
diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index a7d68a8197bc4..63b461afbd747 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -64,7 +64,6 @@
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
@@ -534,7 +533,7 @@ public void shardStarted(ShardRouting initializingShard, ShardRouting startedSha
             RecoverySource recoverySource = initializingShard.recoverySource();
             if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
                 Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
-                changes(snapshot).startedShards.put(initializingShard.shardId(),
+                changes(snapshot).shards.put(initializingShard.shardId(),
                     new ShardRestoreStatus(initializingShard.currentNodeId(), RestoreInProgress.State.SUCCESS));
             }
         }
@@ -550,7 +549,7 @@ public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo)
             // to restore this shard on another node if the snapshot files are corrupt. In case where a node just left or crashed,
             // however, we only want to acknowledge the restore operation once it has been successfully restored on another node.
             if (unassignedInfo.getFailure() != null && Lucene.isCorruptionException(unassignedInfo.getFailure().getCause())) {
-                changes(snapshot).failedShards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
+                changes(snapshot).shards.put(failedShard.shardId(), new ShardRestoreStatus(failedShard.currentNodeId(),
                     RestoreInProgress.State.FAILURE, unassignedInfo.getFailure().getCause().getMessage()));
             }
         }
@@ -563,11 +562,24 @@ public void shardInitialized(ShardRouting unassignedShard, ShardRouting initiali
             if (unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT &&
                 initializedShard.recoverySource().getType() != RecoverySource.Type.SNAPSHOT) {
                 Snapshot snapshot = ((SnapshotRecoverySource) unassignedShard.recoverySource()).snapshot();
-                changes(snapshot).failedShards.put(unassignedShard.shardId(), new ShardRestoreStatus(null,
+                changes(snapshot).shards.put(unassignedShard.shardId(), new ShardRestoreStatus(null,
                     RestoreInProgress.State.FAILURE, "recovery source type changed from snapshot to " + initializedShard.recoverySource()));
             }
         }
 
+        @Override
+        public void unassignedInfoUpdated(ShardRouting unassignedShard, UnassignedInfo newUnassignedInfo) {
+            RecoverySource recoverySource = unassignedShard.recoverySource();
+            if (recoverySource.getType() == RecoverySource.Type.SNAPSHOT) {
+                if (newUnassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.DECIDERS_NO) {
+                    Snapshot snapshot = ((SnapshotRecoverySource) recoverySource).snapshot();
+                    String reason = "shard could not be allocated to any of the nodes";
+                    changes(snapshot).shards.put(unassignedShard.shardId(),
+                        new ShardRestoreStatus(unassignedShard.currentNodeId(), RestoreInProgress.State.FAILURE, reason));
+                }
+            }
+        }
+
         /**
          * Helper method that creates update entry for the given shard id if such an entry does not exist yet.
         */
@@ -576,25 +588,21 @@ private Updates changes(Snapshot snapshot) {
        }
 
        private static class Updates {
-            private Map<ShardId, ShardRestoreStatus> failedShards = new HashMap<>();
-            private Map<ShardId, ShardRestoreStatus> startedShards = new HashMap<>();
+            private Map<ShardId, ShardRestoreStatus> shards = new HashMap<>();
        }
 
-        public RestoreInProgress applyChanges(RestoreInProgress oldRestore) {
+        public RestoreInProgress applyChanges(final RestoreInProgress oldRestore) {
            if (shardChanges.isEmpty() == false) {
                final List<RestoreInProgress.Entry> entries = new ArrayList<>();
                for (RestoreInProgress.Entry entry : oldRestore.entries()) {
                    Snapshot snapshot = entry.snapshot();
                    Updates updates = shardChanges.get(snapshot);
-                    assert Sets.haveEmptyIntersection(updates.startedShards.keySet(), updates.failedShards.keySet());
-                    if (updates.startedShards.isEmpty() == false || updates.failedShards.isEmpty() == false) {
+                    if (updates.shards.isEmpty() == false) {
                        ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder(entry.shards());
-                        for (Map.Entry<ShardId, ShardRestoreStatus> startedShardEntry : updates.startedShards.entrySet()) {
-                            shardsBuilder.put(startedShardEntry.getKey(), startedShardEntry.getValue());
-                        }
-                        for (Map.Entry<ShardId, ShardRestoreStatus> failedShardEntry : updates.failedShards.entrySet()) {
-                            shardsBuilder.put(failedShardEntry.getKey(), failedShardEntry.getValue());
+                        for (Map.Entry<ShardId, ShardRestoreStatus> shard : updates.shards.entrySet()) {
+                            shardsBuilder.put(shard.getKey(), shard.getValue());
                        }
+
                        ImmutableOpenMap<ShardId, ShardRestoreStatus> shards = shardsBuilder.build();
                        RestoreInProgress.State newState = overallState(RestoreInProgress.State.STARTED, shards);
                        entries.add(new RestoreInProgress.Entry(entry.snapshot(), newState, entry.indices(), shards));
@@ -607,7 +615,6 @@ public RestoreInProgress applyChanges(RestoreInProgress oldRestore) {
            return oldRestore;
        }
    }
-
 }
 
    public static RestoreInProgress.Entry restoreInProgress(ClusterState state, Snapshot snapshot) {
diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
index 6fd3d66c8f81b..176616690f0aa 100644
--- a/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/ClusterModuleTests.java
@@ -35,6 +35,7 @@
 import org.elasticsearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ResizeAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
@@ -183,6 +184,7 @@ public void testAllocationDeciderOrder() {
             EnableAllocationDecider.class,
             NodeVersionAllocationDecider.class,
             SnapshotInProgressAllocationDecider.class,
+            RestoreInProgressAllocationDecider.class,
             FilterAllocationDecider.class,
             SameShardAllocationDecider.class,
             DiskThresholdDecider.class,
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java
index 5cafe410d56bd..8be4c858655d2 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ThrottlingAllocationTests.java
@@ -25,6 +25,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -38,6 +39,7 @@
 import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
@@ -46,7 +48,10 @@
 import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.test.gateway.TestGatewayAllocator;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@@ -309,6 +314,8 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
         DiscoveryNode node1 = newNode("node1");
         MetaData.Builder metaDataBuilder = new MetaData.Builder(metaData);
         RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
+        Snapshot snapshot = new Snapshot("repo", new SnapshotId("snap", "randomId"));
+        Set<String> snapshotIndices = new HashSet<>();
         for (ObjectCursor<IndexMetaData> cursor: metaData.indices().values()) {
             Index index = cursor.value.getIndex();
             IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(cursor.value);
@@ -329,14 +336,14 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
                     routingTableBuilder.addAsFromDangling(indexMetaData);
                     break;
                 case 3:
+                    snapshotIndices.add(index.getName());
                     routingTableBuilder.addAsNewRestore(indexMetaData,
-                        new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
-                            indexMetaData.getIndex().getName()), new IntHashSet());
+                        new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()), new IntHashSet());
                     break;
                 case 4:
+                    snapshotIndices.add(index.getName());
                     routingTableBuilder.addAsRestore(indexMetaData,
-                        new SnapshotRecoverySource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
-                            indexMetaData.getIndex().getName()));
+                        new SnapshotRecoverySource(snapshot, Version.CURRENT, indexMetaData.getIndex().getName()));
                     break;
                 case 5:
                     routingTableBuilder.addAsNew(indexMetaData);
@@ -345,10 +352,31 @@ private ClusterState createRecoveryStateAndInitalizeAllocations(MetaData metaDat
                     throw new IndexOutOfBoundsException();
             }
         }
+
+        final RoutingTable routingTable = routingTableBuilder.build();
+
+        final ImmutableOpenMap.Builder<String, ClusterState.Custom> restores = ImmutableOpenMap.builder();
+        if (snapshotIndices.isEmpty() == false) {
+            // Some indices are restored from a snapshot, so the RestoreInProgress must be set up accordingly
+            ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> restoreShards = ImmutableOpenMap.builder();
+            for (ShardRouting shard : routingTable.allShards()) {
+                if (shard.primary() && shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT) {
+                    ShardId shardId = shard.shardId();
+                    restoreShards.put(shardId, new RestoreInProgress.ShardRestoreStatus(node1.getId(), RestoreInProgress.State.INIT));
+                }
+            }
+
+            RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT,
+                new ArrayList<>(snapshotIndices), restoreShards.build());
+            restores.put(RestoreInProgress.TYPE, new RestoreInProgress(restore));
+        }
+
         return ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
             .nodes(DiscoveryNodes.builder().add(node1))
             .metaData(metaDataBuilder.build())
-            .routingTable(routingTableBuilder.build()).build();
+            .routingTable(routingTable)
+            .customs(restores.build())
+            .build();
     }
 
     private void addInSyncAllocationIds(Index index, IndexMetaData.Builder indexMetaData,
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java
new file mode 100644
index 0000000000000..49d69272af629
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing.allocation.decider;
+
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ESAllocationTestCase;
+import org.elasticsearch.cluster.RestoreInProgress;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.snapshots.Snapshot;
+import org.elasticsearch.snapshots.SnapshotId;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Test {@link RestoreInProgressAllocationDecider}
+ */
+public class RestoreInProgressAllocationDeciderTests extends ESAllocationTestCase {
+
+    public void testCanAllocatePrimary() {
+        ClusterState clusterState = createInitialClusterState();
+        ShardRouting shard;
+        if (randomBoolean()) {
+            shard = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard();
+            assertEquals(RecoverySource.Type.EMPTY_STORE, shard.recoverySource().getType());
+        } else {
+            shard = clusterState.getRoutingTable().shardRoutingTable("test", 0).replicaShards().get(0);
+            assertEquals(RecoverySource.Type.PEER, shard.recoverySource().getType());
+        }
+
+        final Decision decision = executeAllocation(clusterState, shard);
+        assertEquals(Decision.Type.YES, decision.type());
+        assertEquals("ignored as shard is not being recovered from a snapshot", decision.getExplanation());
+    }
+
+    public void testCannotAllocatePrimaryMissingInRestoreInProgress() {
+        ClusterState clusterState = createInitialClusterState();
+        RoutingTable routingTable = RoutingTable.builder(clusterState.getRoutingTable())
+            .addAsRestore(clusterState.getMetaData().index("test"), createSnapshotRecoverySource("_missing"))
+            .build();
+
+        clusterState = ClusterState.builder(clusterState)
+            .routingTable(routingTable)
+            .build();
+
+        ShardRouting primary = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard();
+        assertEquals(ShardRoutingState.UNASSIGNED, primary.state());
+        assertEquals(RecoverySource.Type.SNAPSHOT, primary.recoverySource().getType());
+
+        final Decision decision = executeAllocation(clusterState, primary);
+        assertEquals(Decision.Type.NO, decision.type());
+        assertEquals("shard has failed to be restored from the snapshot [_repository:_missing/_uuid] because of " +
+            "[restore_source[_repository/_missing]] - manually close or delete the index [test] in order to retry to restore " +
+            "the snapshot again or use the reroute API to force the allocation of an empty primary shard", decision.getExplanation());
+    }
+
+    public void testCanAllocatePrimaryExistingInRestoreInProgress() {
+        RecoverySource.SnapshotRecoverySource recoverySource = createSnapshotRecoverySource("_existing");
+
+        ClusterState clusterState = createInitialClusterState();
+        RoutingTable routingTable = RoutingTable.builder(clusterState.getRoutingTable())
+            .addAsRestore(clusterState.getMetaData().index("test"), recoverySource)
+            .build();
+
+        clusterState = ClusterState.builder(clusterState)
+            .routingTable(routingTable)
+            .build();
+
+        ShardRouting primary = clusterState.getRoutingTable().shardRoutingTable("test", 0).primaryShard();
+        assertEquals(ShardRoutingState.UNASSIGNED, primary.state());
+        assertEquals(RecoverySource.Type.SNAPSHOT, primary.recoverySource().getType());
+
+        routingTable = clusterState.routingTable();
+
+        final RestoreInProgress.State shardState;
+        if (randomBoolean()) {
+            shardState = randomFrom(RestoreInProgress.State.STARTED, RestoreInProgress.State.INIT);
+        } else {
+            shardState = RestoreInProgress.State.FAILURE;
+
+            UnassignedInfo currentInfo = primary.unassignedInfo();
+            UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"),
+                currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(),
+                currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), currentInfo.getLastAllocationStatus());
+            primary = primary.updateUnassigned(newInfo, primary.recoverySource());
+
+            IndexRoutingTable indexRoutingTable = routingTable.index("test");
+            IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex());
+            for (final ObjectCursor<IndexShardRoutingTable> shardEntry : indexRoutingTable.getShards().values()) {
+                final IndexShardRoutingTable shardRoutingTable = shardEntry.value;
+                for (ShardRouting shardRouting : shardRoutingTable.getShards()) {
+                    if (shardRouting.primary()) {
+                        newIndexRoutingTable.addShard(primary);
+                    } else {
+                        newIndexRoutingTable.addShard(shardRouting);
+                    }
+                }
+            }
+            routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build();
+        }
+
+        ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shards = ImmutableOpenMap.builder();
+        shards.put(primary.shardId(), new RestoreInProgress.ShardRestoreStatus(clusterState.getNodes().getLocalNodeId(), shardState));
+
+        Snapshot snapshot = recoverySource.snapshot();
+        RestoreInProgress.State restoreState = RestoreInProgress.State.STARTED;
+        RestoreInProgress.Entry restore = new RestoreInProgress.Entry(snapshot, restoreState, singletonList("test"), shards.build());
+
+        clusterState = ClusterState.builder(clusterState)
+            .putCustom(RestoreInProgress.TYPE, new RestoreInProgress(restore))
+            .routingTable(routingTable)
+            .build();
+
+        Decision decision = executeAllocation(clusterState, primary);
+        if (shardState == RestoreInProgress.State.FAILURE) {
+            assertEquals(Decision.Type.NO, decision.type());
+            assertEquals("shard has failed to be restored from the snapshot [_repository:_existing/_uuid] because of " +
+                "[restore_source[_repository/_existing], failure IOException[i/o failure]] - manually close or delete the index " +
+                "[test] in order to retry to restore the snapshot again or use the reroute API to force the allocation of " +
+                "an empty primary shard", decision.getExplanation());
+        } else {
+            assertEquals(Decision.Type.YES, decision.type());
+            assertEquals("shard is currently being restored", decision.getExplanation());
+        }
+    }
+
+    private ClusterState createInitialClusterState() {
+        MetaData metaData = MetaData.builder()
+            .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
+            .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsNew(metaData.index("test"))
+            .build();
+
+        DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
+            .add(newNode("master", Collections.singleton(DiscoveryNode.Role.MASTER)))
+            .localNodeId("master")
+            .masterNodeId("master")
+            .build();
+
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metaData(metaData)
+            .routingTable(routingTable)
+            .nodes(discoveryNodes)
+            .build();
+
+        assertEquals(2, clusterState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size());
+        return clusterState;
+    }
+
+    private Decision executeAllocation(final ClusterState clusterState, final ShardRouting shardRouting) {
+        final AllocationDecider decider = new RestoreInProgressAllocationDecider(Settings.EMPTY);
+        final RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.singleton(decider)),
+            clusterState.getRoutingNodes(), clusterState, null, 0L);
+        allocation.debugDecision(true);
+
+        final Decision decision;
+        if (randomBoolean()) {
+            decision = decider.canAllocate(shardRouting, allocation);
+        } else {
+            DiscoveryNode node = clusterState.getNodes().getMasterNode();
+            decision = decider.canAllocate(shardRouting, new RoutingNode(node.getId(), node), allocation);
+        }
+        return decision;
+    }
+
+    private RecoverySource.SnapshotRecoverySource createSnapshotRecoverySource(final String snapshotName) {
+        Snapshot snapshot = new Snapshot("_repository", new SnapshotId(snapshotName, "_uuid"));
+        return new RecoverySource.SnapshotRecoverySource(snapshot, Version.CURRENT, "test");
+    }
+}
diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index fa3920c1e8d1c..3f9c80f3ffad4 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -46,6 +46,7 @@
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
 import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
@@ -55,6 +56,10 @@
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
@@ -97,14 +102,15 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
-import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
 import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
@@ -117,9 +123,11 @@
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
@@ -824,6 +832,8 @@ public void testDataFileFailureDuringRestore() throws Exception {
         prepareCreate("test-idx").setSettings(Settings.builder().put("index.allocation.max_retries", Integer.MAX_VALUE)).get();
         ensureGreen();
 
+        final NumShards numShards = getNumShards("test-idx");
+
         logger.info("--> indexing some data");
         for (int i = 0; i < 100; i++) {
             index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
@@ -848,14 +858,31 @@ public void testDataFileFailureDuringRestore() throws Exception {
         logger.info("--> delete index");
         cluster().wipeIndices("test-idx");
         logger.info("--> restore index after deletion");
-        RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
-        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
-        SearchResponse countResponse = client.prepareSearch("test-idx").setSize(0).get();
-        assertThat(countResponse.getHits().getTotalHits(), equalTo(100L));
+        final RestoreSnapshotResponse restoreResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(true)
+            .get();
+        logger.info("--> total number of simulated failures during restore: [{}]", getFailureCount("test-repo"));
+        final RestoreInfo restoreInfo = restoreResponse.getRestoreInfo();
+        assertThat(restoreInfo.totalShards(), equalTo(numShards.numPrimaries));
+
+        if (restoreInfo.successfulShards() == restoreInfo.totalShards()) {
+            // All shards were restored, we must find the exact number of hits
+            assertHitCount(client.prepareSearch("test-idx").setSize(0).get(), 100L);
+        } else {
+            // One or more shards failed to be restored. This can happen when there is
+            // only 1 data node: a shard failed because of the random IO exceptions
+            // during restore and then we don't allow the shard to be assigned on the
+            // same node again during the same reroute operation. Then another reroute
+            // operation is scheduled, but the RestoreInProgressAllocationDecider will
+            // block the shard from being assigned again because it failed during restore.
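+            // As a result, the failed shards stay unassigned, which the assertions below verify.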
+            final ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().get();
+            assertEquals(1, clusterStateResponse.getState().getNodes().getDataNodes().size());
+            assertEquals(restoreInfo.failedShards(),
+                clusterStateResponse.getState().getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED).size());
+        }
     }
 
-    @TestLogging("org.elasticsearch.cluster.routing:TRACE,org.elasticsearch.snapshots:TRACE")
     public void testDataFileCorruptionDuringRestore() throws Exception {
         Path repositoryLocation = randomRepoPath();
         Client client = client();
@@ -907,6 +934,155 @@ public void testDataFileCorruptionDuringRestore() throws Exception {
         cluster().wipeIndices("test-idx");
     }
 
+    /**
+     * Test that restoring a snapshot whose files can't be downloaded at all does not
+     * get stuck or hang indefinitely.
+     */
+    public void testUnrestorableFilesDuringRestore() throws Exception {
+        final String indexName = "unrestorable-files";
+        final int maxRetries = randomIntBetween(1, 10);
+
+        Settings createIndexSettings = Settings.builder().put(SETTING_ALLOCATION_MAX_RETRY.getKey(), maxRetries).build();
+
+        Settings repositorySettings = Settings.builder()
+            .put("random", randomAlphaOfLength(10))
+            .put("max_failure_number", 10000000L)
+            // No Lucene corruptions, we want to test retries
+            .put("use_lucene_corruption", false)
+            // Restoring a file will never complete
+            .put("random_data_file_io_exception_rate", 1.0)
+            .build();
+
+        Consumer<UnassignedInfo> checkUnassignedInfo = unassignedInfo -> {
+            assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
+            assertThat(unassignedInfo.getNumFailedAllocations(), anyOf(equalTo(maxRetries), equalTo(1)));
+        };
+
+        unrestorableUseCase(indexName, createIndexSettings, repositorySettings, Settings.EMPTY, checkUnassignedInfo, () -> {});
+    }
+
+    /**
+     * Test that restoring an index with shard allocation filtering settings that prevent
+     * its allocation does not hang indefinitely.
+     */
+    public void testUnrestorableIndexDuringRestore() throws Exception {
+        final String indexName = "unrestorable-index";
+        Settings restoreIndexSettings = Settings.builder().put("index.routing.allocation.include._name", randomAlphaOfLength(5)).build();
+
+        Consumer<UnassignedInfo> checkUnassignedInfo = unassignedInfo -> {
+            assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.NEW_INDEX_RESTORED));
+        };
+
+        Runnable fixupAction = () -> {
+            // remove the shard allocation filtering settings and use the reroute API to retry the failed shards
+            assertAcked(client().admin().indices().prepareUpdateSettings(indexName)
+                .setSettings(Settings.builder()
+                    .putNull("index.routing.allocation.include._name")
+                    .build()));
+            assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
+        };
+
+        unrestorableUseCase(indexName, Settings.EMPTY, Settings.EMPTY, restoreIndexSettings, checkUnassignedInfo, fixupAction);
+    }
+
+    /** Execute the unrestorable test use case **/
+    private void unrestorableUseCase(final String indexName,
+                                     final Settings createIndexSettings,
+                                     final Settings repositorySettings,
+                                     final Settings restoreIndexSettings,
+                                     final Consumer<UnassignedInfo> checkUnassignedInfo,
+                                     final Runnable fixUpAction) throws Exception {
+        // create a test repository
+        final Path repositoryLocation = randomRepoPath();
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+            .setType("fs")
+            .setSettings(Settings.builder().put("location", repositoryLocation)));
+        // create a test index
+        assertAcked(prepareCreate(indexName, Settings.builder().put(createIndexSettings)));
+
+        // index some documents
+        final int nbDocs = scaledRandomIntBetween(10, 100);
+        for (int i = 0; i < nbDocs; i++) {
+            index(indexName, "doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        flushAndRefresh(indexName);
+        assertThat(client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));
+
+        // create a snapshot
+        final NumShards numShards = getNumShards(indexName);
+        CreateSnapshotResponse snapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(true)
+            .setIndices(indexName)
+            .get();
+
+        assertThat(snapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
+        assertThat(snapshotResponse.getSnapshotInfo().successfulShards(), equalTo(numShards.numPrimaries));
+        assertThat(snapshotResponse.getSnapshotInfo().failedShards(), equalTo(0));
+
+        // delete the test index
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        // update the test repository
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+            .setType("mock")
+            .setSettings(Settings.builder()
+                .put("location", repositoryLocation)
+                .put(repositorySettings)
+                .build()));
+
+        // attempt to restore the snapshot with the given settings
+        RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
+            .setIndices(indexName)
+            .setIndexSettings(restoreIndexSettings)
+            .setWaitForCompletion(true)
+            .get();
+
+        // check that all shards failed during restore
+        assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries));
+        assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(0));
+
+        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setCustoms(true).setRoutingTable(true).get();
+
+        // check that there is no restore in progress
+        RestoreInProgress restoreInProgress = clusterStateResponse.getState().custom(RestoreInProgress.TYPE);
+        assertNotNull("RestoreInProgress must not be null", restoreInProgress);
+        assertThat("RestoreInProgress must be empty", restoreInProgress.entries(), hasSize(0));
+
+        // check that the shards have been created but are not assigned
+        assertThat(clusterStateResponse.getState().getRoutingTable().allShards(indexName), hasSize(numShards.totalNumShards));
+
+        // check that every primary shard is unassigned
+        for (ShardRouting shard : clusterStateResponse.getState().getRoutingTable().allShards(indexName)) {
+            if (shard.primary()) {
+                assertThat(shard.state(), equalTo(ShardRoutingState.UNASSIGNED));
+                assertThat(shard.recoverySource().getType(), equalTo(RecoverySource.Type.SNAPSHOT));
+                assertThat(shard.unassignedInfo().getLastAllocationStatus(), equalTo(UnassignedInfo.AllocationStatus.DECIDERS_NO));
+                checkUnassignedInfo.accept(shard.unassignedInfo());
+            }
+        }
+
+        // update the test repository in order to make it work
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+            .setType("fs")
+            .setSettings(Settings.builder().put("location", repositoryLocation)));
+
+        // execute the action that should fix the situation
+        fixUpAction.run();
+
+        // delete the index and restore again
+        assertAcked(client().admin().indices().prepareDelete(indexName));
+
+        restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
+        assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries));
+        assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(numShards.numPrimaries));
+
+        // Wait for the shards to be assigned
+        ensureGreen(indexName);
+        refresh(indexName);
+
+        assertThat(client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));
+    }
+
     public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Exception {
         Path repositoryLocation = randomRepoPath();
         Client client = client();