Fix Incorrect Concurrent SnapshotException on Master Failover #54877

Merged
@@ -185,7 +185,11 @@ public ClusterState execute(ClusterState currentState) {
                 "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
         }
         SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
-        if (snapshots != null && snapshots.entries().isEmpty() == false) {
+        // Fail if there are any concurrently running snapshots. The only exception to this is a snapshot in INIT state from a
+        // previous master, which we can simply ignore and remove from the cluster state because we would clean it up from the
+        // cluster state anyway in #applyClusterState.
+        if (snapshots != null && snapshots.entries().stream().anyMatch(entry ->
+            (entry.state() == State.INIT && initializingSnapshots.contains(entry.snapshot()) == false) == false)) {
             throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
         }
         // Store newSnapshot here to be processed in clusterStateProcessed
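The new condition is easiest to read with the double negation unfolded: throw unless every in-progress entry is a stale INIT-state snapshot left behind by a previous master (INIT state, but not in this master's initializingSnapshots set), since #applyClusterState removes such entries anyway. A minimal, self-contained sketch of the predicate — Entry, State and initializingSnapshots are stand-ins for SnapshotsInProgress.Entry, its State enum and the master's set of snapshots it is itself initializing:

```java
import java.util.List;
import java.util.Set;

// Self-contained model of the new concurrency check (stand-in types,
// not the real Elasticsearch classes).
class SnapshotConcurrencyCheckSketch {

    enum State { INIT, STARTED, SUCCESS }

    record Entry(String snapshot, State state) {}

    // Throw-condition from the diff: some entry is NOT a stale INIT-state
    // snapshot from a previous master. Stale INIT entries are harmless because
    // #applyClusterState removes them from the cluster state anyway.
    static boolean snapshotAlreadyRunning(List<Entry> entries, Set<String> initializingSnapshots) {
        return entries.stream().anyMatch(entry ->
            (entry.state() == State.INIT && initializingSnapshots.contains(entry.snapshot()) == false) == false);
    }

    public static void main(String[] args) {
        // A leftover INIT entry from the failed-over master does not block:
        System.out.println(snapshotAlreadyRunning(
            List.of(new Entry("old-snap", State.INIT)), Set.of()));          // false
        // A genuinely running snapshot still does:
        System.out.println(snapshotAlreadyRunning(
            List.of(new Entry("running-snap", State.STARTED)), Set.of()));   // true
    }
}
```

Written this way, it is also clear why a snapshot that the current master is itself initializing (and has therefore recorded in initializingSnapshots) still blocks a second create-snapshot request.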
@@ -34,7 +34,6 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
 import org.elasticsearch.snapshots.SnapshotException;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotMissingException;
@@ -149,17 +148,7 @@ public void clusterChanged(ClusterChangedEvent event) {
         ensureStableCluster(4, masterNode1);
         logger.info("--> done");

-        try {
-            future.get();
-        } catch (Exception ex) {
-            Throwable cause = ex.getCause();
-            if (cause.getCause() instanceof ConcurrentSnapshotExecutionException) {
-                logger.info("--> got exception from race in master operation retries");
-            } else {
-                logger.info("--> got exception from hanged master", ex);
-            }
-        }
-
+        future.get();
         assertAllSnapshotsCompleted();
     }
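The deleted try/catch is the fix seen from the test's side: the old code tolerated a ConcurrentSnapshotExecutionException raised by master operation retries during failover, while the new code expects the snapshot future to complete normally. A bare get() is a sufficient assertion because a failed future rethrows its failure; a plain-JDK analogy (the test itself uses an Elasticsearch ActionFuture, so this is an illustration, not the test's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Why awaiting the future is itself the assertion: any failure surfaces
// as an ExecutionException and fails the test.
public class FutureGetSketch {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("snapshot done");
        System.out.println(ok.get()); // completes normally, test passes

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("a snapshot is already running"));
        try {
            failed.get();
        } catch (ExecutionException e) {
            System.out.println("test would fail with: " + e.getCause());
        }
    }
}
```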
@@ -716,6 +716,8 @@ public void testSnapshotPrimaryRelocations() {
         continueOrDie(createRepoAndIndex(repoName, index, shards),
             createIndexResponse -> client().admin().cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener));

+        final StepListener<CreateSnapshotResponse> snapshotStartedListener = new StepListener<>();
+
         continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> {
             final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0);
             final TestClusterNodes.TestClusterNode currentPrimaryNode = testClusterNodes.nodeById(shardToRelocate.currentNodeId());
@@ -734,11 +736,7 @@ public void run() {
                         scheduleNow(() -> testClusterNodes.stopNode(masterNode));
                     }
                     testClusterNodes.randomDataNodeSafe().client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
-                        .execute(ActionListener.wrap(() -> {
-                            createdSnapshot.set(true);
-                            testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot(
-                                new DeleteSnapshotRequest(repoName, snapshotName), noopListener());
-                        }));
+                        .execute(snapshotStartedListener);
                     scheduleNow(
                         () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute(
                             new ClusterRerouteRequest().add(new AllocateEmptyPrimaryAllocationCommand(
@@ -751,6 +749,12 @@
             });
         });

+        continueOrDie(snapshotStartedListener, snapshotResponse -> {
+            createdSnapshot.set(true);
+            testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot(
+                new DeleteSnapshotRequest(repoName, snapshotName), noopListener());
+        });
+
         runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
             if (createdSnapshot.get() == false) {
                 return false;
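This refactor replaces an inline ActionListener.wrap callback with a StepListener that the create-snapshot call completes and a separately registered continueOrDie step consumes, so starting the snapshot and reacting to its response are decoupled. A runnable miniature of that pattern with stand-in types (the real classes are org.elasticsearch.action.StepListener and the test's continueOrDie helper, which also carry a failure path):

```java
import java.util.function.Consumer;

// Miniature of the StepListener chaining used in the test above
// (stand-in implementation, success path only).
class StepListenerSketch {
    static class StepListener<T> {
        private T result;
        private boolean done;
        private Consumer<T> next;

        // Called by the asynchronous action when it finishes.
        void onResponse(T response) {
            result = response;
            done = true;
            if (next != null) {
                next.accept(response);
            }
        }

        // Registers the follow-up step; runs immediately if already completed.
        void whenComplete(Consumer<T> consumer) {
            next = consumer;
            if (done) {
                consumer.accept(result);
            }
        }
    }

    public static void main(String[] args) {
        StepListener<String> snapshotStartedListener = new StepListener<>();
        // The follow-up (standing in for the deleteSnapshot call) is wired up
        // in a different place from the code that starts the snapshot:
        snapshotStartedListener.whenComplete(response ->
            System.out.println("snapshot started -> now delete it (" + response + ")"));
        // Elsewhere, the create-snapshot action completes the listener:
        snapshotStartedListener.onResponse("snap-response");
    }
}
```

A design note on the change itself: the old ActionListener.wrap(Runnable) form ran the cleanup regardless of how the snapshot request ended, whereas continueOrDie fails the test on any exception, so the spurious ConcurrentSnapshotExecutionException this PR fixes would now surface instead of being silently tolerated.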