Skip to content

Commit 2854f5c

Browse files
Allow Parallel Snapshot Restore And Delete (#51608)
There is no reason not to allow deletes in parallel to restores if they're dealing with different snapshots. A delete will not remove any files related to the snapshot that is being restored if it is different from the deleted snapshot because those files will still be referenced by the restoring snapshot. Loading RepositoryData concurrently to modifying it is concurrency safe nowadays as well since the repo generation is tracked in the cluster state. Closes #41463
1 parent 73b5564 commit 2854f5c

File tree

5 files changed

+95
-65
lines changed

5 files changed

+95
-65
lines changed

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ public ClusterState execute(ClusterState currentState) {
226226
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
227227
// Check if the snapshot to restore is currently being deleted
228228
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
229-
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
229+
if (deletionsInProgress != null
230+
&& deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshot().equals(snapshot))) {
230231
throw new ConcurrentSnapshotExecutionException(snapshot,
231232
"cannot restore a snapshot while a snapshot deletion is in-progress [" +
232233
deletionsInProgress.getEntries().get(0).getSnapshot() + "]");

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -1279,9 +1279,12 @@ public ClusterState execute(ClusterState currentState) {
12791279
// don't allow snapshot deletions while a restore is taking place,
12801280
// otherwise we could end up deleting a snapshot that is being restored
12811281
// and the files the restore depends on would all be gone
1282-
if (restoreInProgress.isEmpty() == false) {
1283-
throw new ConcurrentSnapshotExecutionException(snapshot,
1284-
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]");
1282+
1283+
for (RestoreInProgress.Entry entry : restoreInProgress) {
1284+
if (entry.snapshot().equals(snapshot)) {
1285+
throw new ConcurrentSnapshotExecutionException(snapshot,
1286+
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]");
1287+
}
12851288
}
12861289
}
12871290
ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);

server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java

-54
Original file line numberDiff line numberDiff line change
@@ -152,58 +152,4 @@ public void testSnapshottingWithInProgressDeletionNotAllowed() throws Exception
152152
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
153153
assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots(repo).size());
154154
}
155-
156-
public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception {
157-
logger.info("--> creating repository");
158-
final String repo = "test-repo";
159-
assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(
160-
Settings.builder()
161-
.put("location", randomRepoPath())
162-
.put("random", randomAlphaOfLength(10))
163-
.put("wait_after_unblock", 200)).get());
164-
165-
logger.info("--> snapshot");
166-
final String index = "test-idx";
167-
assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
168-
for (int i = 0; i < 10; i++) {
169-
indexDoc(index, Integer.toString(i), "foo", "bar" + i);
170-
}
171-
refresh();
172-
final String snapshot1 = "test-snap1";
173-
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
174-
final String index2 = "test-idx2";
175-
assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
176-
for (int i = 0; i < 10; i++) {
177-
indexDoc(index2, Integer.toString(i), "foo", "bar" + i);
178-
}
179-
refresh();
180-
final String snapshot2 = "test-snap2";
181-
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
182-
client().admin().indices().prepareClose(index, index2).get();
183-
184-
String blockedNode = internalCluster().getMasterName();
185-
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
186-
logger.info("--> start deletion of snapshot");
187-
ActionFuture<AcknowledgedResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
188-
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
189-
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
190-
191-
logger.info("--> try restoring the other snapshot, should fail because the deletion is in progress");
192-
try {
193-
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
194-
fail("should not be able to restore a snapshot while another is being deleted");
195-
} catch (ConcurrentSnapshotExecutionException e) {
196-
assertThat(e.getMessage(), containsString("cannot restore a snapshot while a snapshot deletion is in-progress"));
197-
}
198-
199-
logger.info("--> unblocking blocked node [{}]", blockedNode);
200-
unblockNode(repo, blockedNode);
201-
202-
logger.info("--> wait until snapshot deletion is finished");
203-
assertAcked(future.actionGet());
204-
205-
logger.info("--> restoring snapshot, which should now work");
206-
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
207-
assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots(repo).size());
208-
}
209155
}

server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

-7
Original file line numberDiff line numberDiff line change
@@ -2712,13 +2712,6 @@ public void testDeleteSnapshotWhileRestoringFails() throws Exception {
27122712
assertEquals(repoName, e.getRepositoryName());
27132713
assertEquals(snapshotName, e.getSnapshotName());
27142714
assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore"));
2715-
2716-
logger.info("-- try deleting another snapshot while the restore is in progress (should throw an error)");
2717-
e = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
2718-
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName2).get());
2719-
assertEquals(repoName, e.getRepositoryName());
2720-
assertEquals(snapshotName2, e.getSnapshotName());
2721-
assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore"));
27222715
} finally {
27232716
// unblock even if the try block fails otherwise we will get bogus failures when we delete all indices in test teardown.
27242717
logger.info("--> unblocking all data nodes");

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+87
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@
207207
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
208208
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
209209
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
210+
import static org.hamcrest.Matchers.contains;
210211
import static org.hamcrest.Matchers.containsInAnyOrder;
211212
import static org.hamcrest.Matchers.either;
212213
import static org.hamcrest.Matchers.empty;
@@ -517,6 +518,92 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
517518
}
518519
}
519520

521+
public void testConcurrentSnapshotRestoreAndDeleteOther() {
522+
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
523+
524+
String repoName = "repo";
525+
String snapshotName = "snapshot";
526+
final String index = "test";
527+
final int shards = randomIntBetween(1, 10);
528+
529+
TestClusterNodes.TestClusterNode masterNode =
530+
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
531+
532+
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
533+
534+
final int documentsFirstSnapshot = randomIntBetween(0, 100);
535+
536+
continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> indexNDocuments(
537+
documentsFirstSnapshot, index, () -> client().admin().cluster()
538+
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createSnapshotResponseStepListener)));
539+
540+
final int documentsSecondSnapshot = randomIntBetween(0, 100);
541+
542+
final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
543+
544+
final String secondSnapshotName = "snapshot-2";
545+
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> indexNDocuments(
546+
documentsSecondSnapshot, index, () -> client().admin().cluster().prepareCreateSnapshot(repoName, secondSnapshotName)
547+
.setWaitForCompletion(true).execute(createOtherSnapshotResponseStepListener)));
548+
549+
final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
550+
final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
551+
552+
continueOrDie(createOtherSnapshotResponseStepListener,
553+
createSnapshotResponse -> {
554+
scheduleNow(
555+
() -> client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener));
556+
scheduleNow(() -> client().admin().cluster().restoreSnapshot(
557+
new RestoreSnapshotRequest(repoName, secondSnapshotName).waitForCompletion(true)
558+
.renamePattern("(.+)").renameReplacement("restored_$1"),
559+
restoreSnapshotResponseListener));
560+
});
561+
562+
final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
563+
continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
564+
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
565+
client().search(new SearchRequest("restored_" + index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)),
566+
searchResponseListener);
567+
});
568+
569+
deterministicTaskQueue.runAllRunnableTasks();
570+
571+
assertEquals(documentsFirstSnapshot + documentsSecondSnapshot,
572+
Objects.requireNonNull(searchResponseListener.result().getHits().getTotalHits()).value);
573+
assertThat(deleteSnapshotStepListener.result().isAcknowledged(), is(true));
574+
assertThat(restoreSnapshotResponseListener.result().getRestoreInfo().failedShards(), is(0));
575+
576+
final Repository repository = masterNode.repositoriesService.repository(repoName);
577+
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
578+
assertThat(snapshotIds, contains(createOtherSnapshotResponseStepListener.result().getSnapshotInfo().snapshotId()));
579+
580+
for (SnapshotId snapshotId : snapshotIds) {
581+
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
582+
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
583+
assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
584+
assertEquals(shards, snapshotInfo.successfulShards());
585+
assertEquals(0, snapshotInfo.failedShards());
586+
}
587+
}
588+
589+
private void indexNDocuments(int documents, String index, Runnable afterIndexing) {
590+
if (documents == 0) {
591+
afterIndexing.run();
592+
return;
593+
}
594+
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
595+
for (int i = 0; i < documents; ++i) {
596+
bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
597+
}
598+
final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
599+
client().bulk(bulkRequest, bulkResponseStepListener);
600+
continueOrDie(bulkResponseStepListener, bulkResponse -> {
601+
assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
602+
assertEquals(documents, bulkResponse.getItems().length);
603+
afterIndexing.run();
604+
});
605+
}
606+
520607
public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
521608
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
522609

0 commit comments

Comments
 (0)