Skip to content

Add Snapshot Resiliency Test for Master Failover during Delete #54866

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,19 @@ public void applyClusterState(ClusterChangedEvent event) {
} catch (Exception e) {
logger.warn("Failed to update snapshot state ", e);
}
assert assertConsistentWithClusterState(event.state());
}

private boolean assertConsistentWithClusterState(ClusterState state) {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) {
final Set<Snapshot> runningSnapshots =
snapshotsInProgress.entries().stream().map(SnapshotsInProgress.Entry::snapshot).collect(Collectors.toSet());
final Set<Snapshot> snapshotListenerKeys = snapshotCompletionListeners.keySet();
assert runningSnapshots.containsAll(snapshotListenerKeys) : "Saw completion listeners for unknown snapshots in "
+ snapshotListenerKeys + " but running snapshots are " + runningSnapshots;
}
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
Expand Down Expand Up @@ -407,6 +408,49 @@ public void testSnapshotWithNodeDisconnects() {
assertThat(snapshotIds, hasSize(1));
}

public void testSnapshotDeleteWithMasterFailover() {
final int dataNodes = randomIntBetween(2, 10);
final int masterNodes = randomFrom(3, 5);
setupTestCluster(masterNodes, dataNodes);

String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);

final boolean waitForSnapshot = randomBoolean();
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse ->
testClusterNodes.randomMasterNodeSafe().client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(waitForSnapshot).execute(createSnapshotResponseStepListener));

final AtomicBoolean snapshotDeleteResponded = new AtomicBoolean(false);
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
scheduleNow(this::disconnectOrRestartMasterNode);
testClusterNodes.randomDataNodeSafe().client.admin().cluster()
.prepareDeleteSnapshot(repoName, snapshotName).execute(ActionListener.wrap(() -> snapshotDeleteResponded.set(true)));
});

runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
if (snapshotDeleteResponded.get() == false) {
return false;
}
final SnapshotDeletionsInProgress snapshotDeletionsInProgress =
master.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE);
return snapshotDeletionsInProgress == null || snapshotDeletionsInProgress.getEntries().isEmpty();
}).orElse(false), TimeUnit.MINUTES.toMillis(1L));

clearDisruptionsAndAwaitSync();

final TestClusterNodes.TestClusterNode randomMaster = testClusterNodes.randomMasterNode()
.orElseThrow(() -> new AssertionError("expected to find at least one active master node"));
SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertThat(finalSnapshotsInProgress.entries(), empty());
final Repository repository = randomMaster.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
assertThat(snapshotIds, hasSize(0));
}

public void testConcurrentSnapshotCreateAndDelete() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

Expand Down Expand Up @@ -1149,6 +1193,8 @@ private final class TestClusterNode {

private final ClusterService clusterService;

private final NodeConnectionsService nodeConnectionsService;

private final RepositoriesService repositoriesService;

private final SnapshotsService snapshotsService;
Expand Down Expand Up @@ -1300,6 +1346,8 @@ public void onFailure(final Exception e) {
new BatchedRerouteService(clusterService, allocationService::reroute),
threadPool
);
nodeConnectionsService =
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService);
@SuppressWarnings("rawtypes")
Map<ActionType, TransportAction> actions = new HashMap<>();
actions.put(GlobalCheckpointSyncAction.TYPE,
Expand Down Expand Up @@ -1453,6 +1501,7 @@ public void stop() {
testClusterNodes.disconnectNode(this);
indicesService.close();
clusterService.close();
nodeConnectionsService.stop();
indicesClusterStateService.close();
if (coordinator != null) {
coordinator.close();
Expand All @@ -1477,10 +1526,9 @@ public void start(ClusterState initialState) {
new BatchedRerouteService(clusterService, allocationService::reroute), ElectionStrategy.DEFAULT_INSTANCE);
masterService.setClusterStatePublisher(coordinator);
coordinator.start();
masterService.start();
clusterService.getClusterApplierService().setNodeConnectionsService(
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
clusterService.getClusterApplierService().start();
clusterService.getClusterApplierService().setNodeConnectionsService(nodeConnectionsService);
nodeConnectionsService.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also call close in the stop method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea let's do it to prevent stray connection checks.

clusterService.start();
indicesService.start();
indicesClusterStateService.start();
coordinator.startInitialJoin();
Expand Down