Skip to content

Commit fe2ce8a

Browse files
authored
Also abort ongoing file restores when snapshot restore is aborted (#62441)
Today, when a snapshot restore is aborted (for example, when the index is explicitly deleted) while the restoration of the files from the repository has already started, the file restores are not interrupted. This means that Elasticsearch will continue to read the files from the repository and write them to disk until all files are restored; the store will then be closed and the files deleted from disk at some point, but this can take a while. This also occupies slots in the SNAPSHOT thread pool. The Recovery API won't show any files actively being recovered; the only notable indicator would be the active threads in the SNAPSHOT thread pool. This commit adds a check before reading a file to restore and before writing bytes to disk, so that a closing store can be detected more quickly and the file recovery process aborted. This way the file restores simply stop, and for most repository implementations no more bytes are read (see #62370 for S3), freeing threads in the SNAPSHOT thread pool more quickly too.
1 parent 2b90a3c commit fe2ce8a

File tree

5 files changed

+188
-6
lines changed

5 files changed

+188
-6
lines changed
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.snapshots;
21+
22+
import org.elasticsearch.action.ActionFuture;
23+
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
24+
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
25+
import org.elasticsearch.action.support.IndicesOptions;
26+
import org.elasticsearch.cluster.routing.RecoverySource;
27+
import org.elasticsearch.cluster.service.ClusterService;
28+
import org.elasticsearch.indices.recovery.RecoveryState;
29+
import org.elasticsearch.test.ESIntegTestCase;
30+
import org.elasticsearch.threadpool.ThreadPool;
31+
import org.elasticsearch.threadpool.ThreadPoolStats;
32+
import org.hamcrest.Matcher;
33+
34+
import java.util.List;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.stream.StreamSupport;
37+
38+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
39+
import static org.hamcrest.Matchers.equalTo;
40+
import static org.hamcrest.Matchers.greaterThan;
41+
import static org.hamcrest.Matchers.hasSize;
42+
import static org.hamcrest.Matchers.is;
43+
44+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
45+
public class AbortedRestoreIT extends AbstractSnapshotIntegTestCase {
46+
47+
public void testAbortedRestoreAlsoAbortFileRestores() throws Exception {
48+
internalCluster().startMasterOnlyNode();
49+
final String dataNode = internalCluster().startDataOnlyNode();
50+
51+
final String indexName = "test-abort-restore";
52+
createIndex(indexName, indexSettingsNoReplicas(1).build());
53+
indexRandomDocs(indexName, scaledRandomIntBetween(10, 1_000));
54+
ensureGreen();
55+
forceMerge();
56+
57+
final String repositoryName = "repository";
58+
createRepository(repositoryName, "mock");
59+
60+
final String snapshotName = "snapshot";
61+
createFullSnapshot(repositoryName, snapshotName);
62+
assertAcked(client().admin().indices().prepareDelete(indexName));
63+
64+
logger.info("--> blocking all data nodes for repository [{}]", repositoryName);
65+
blockAllDataNodes(repositoryName);
66+
failReadsAllDataNodes(repositoryName);
67+
68+
logger.info("--> starting restore");
69+
final ActionFuture<RestoreSnapshotResponse> future = client().admin().cluster().prepareRestoreSnapshot(repositoryName, snapshotName)
70+
.setWaitForCompletion(true)
71+
.setIndices(indexName)
72+
.execute();
73+
74+
assertBusy(() -> {
75+
final RecoveryResponse recoveries = client().admin().indices().prepareRecoveries(indexName)
76+
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).setActiveOnly(true).get();
77+
assertThat(recoveries.hasRecoveries(), is(true));
78+
final List<RecoveryState> shardRecoveries = recoveries.shardRecoveryStates().get(indexName);
79+
assertThat(shardRecoveries, hasSize(1));
80+
assertThat(future.isDone(), is(false));
81+
82+
for (RecoveryState shardRecovery : shardRecoveries) {
83+
assertThat(shardRecovery.getRecoverySource().getType(), equalTo(RecoverySource.Type.SNAPSHOT));
84+
assertThat(shardRecovery.getStage(), equalTo(RecoveryState.Stage.INDEX));
85+
}
86+
});
87+
88+
final ThreadPool.Info snapshotThreadPoolInfo = threadPool(dataNode).info(ThreadPool.Names.SNAPSHOT);
89+
assertThat(snapshotThreadPoolInfo.getMax(), greaterThan(0));
90+
91+
logger.info("--> waiting for snapshot thread [max={}] pool to be full", snapshotThreadPoolInfo.getMax());
92+
waitForMaxActiveSnapshotThreads(dataNode, equalTo(snapshotThreadPoolInfo.getMax()));
93+
94+
logger.info("--> aborting restore by deleting the index");
95+
assertAcked(client().admin().indices().prepareDelete(indexName));
96+
97+
logger.info("--> unblocking repository [{}]", repositoryName);
98+
unblockAllDataNodes(repositoryName);
99+
100+
logger.info("--> restore should have failed");
101+
final RestoreSnapshotResponse restoreSnapshotResponse = future.get();
102+
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(1));
103+
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(0));
104+
105+
logger.info("--> waiting for snapshot thread pool to be empty");
106+
waitForMaxActiveSnapshotThreads(dataNode, equalTo(0));
107+
}
108+
109+
private static void waitForMaxActiveSnapshotThreads(final String node, final Matcher<Integer> matcher) throws Exception {
110+
assertBusy(() -> assertThat(threadPoolStats(node, ThreadPool.Names.SNAPSHOT).getActive(), matcher), 30L, TimeUnit.SECONDS);
111+
}
112+
113+
private static ThreadPool threadPool(final String node) {
114+
return internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().threadPool();
115+
}
116+
117+
private static ThreadPoolStats.Stats threadPoolStats(final String node, final String threadPoolName) {
118+
return StreamSupport.stream(threadPool(node).stats().spliterator(), false)
119+
.filter(threadPool -> threadPool.getName().equals(threadPoolName))
120+
.findFirst()
121+
.orElseThrow(() -> new AssertionError("Failed to find thread pool " + threadPoolName));
122+
}
123+
}

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -403,14 +403,22 @@ public final void decRef() {
403403

404404
@Override
405405
public void close() {
406-
407406
if (isClosed.compareAndSet(false, true)) {
408407
// only do this once!
409408
decRef();
410409
logger.debug("store reference count on close: {}", refCounter.refCount());
411410
}
412411
}
413412

413+
/**
414+
* @return true if the {@link Store#close()} method has been called. This indicates that the current
415+
* store is either closed or being closed waiting for all references to it to be released.
416+
* You might prefer to use {@link Store#ensureOpen()} instead.
417+
*/
418+
public boolean isClosing() {
419+
return isClosed.get();
420+
}
421+
414422
private void closeInternal() {
415423
// Leverage try-with-resources to close the shard lock for us
416424
try (Closeable c = shardLock) {

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.lucene.index.IndexCommit;
2727
import org.apache.lucene.index.IndexFormatTooNewException;
2828
import org.apache.lucene.index.IndexFormatTooOldException;
29+
import org.apache.lucene.store.AlreadyClosedException;
2930
import org.apache.lucene.store.IOContext;
3031
import org.apache.lucene.store.IndexInput;
3132
import org.apache.lucene.store.IndexOutput;
@@ -2087,6 +2088,7 @@ private void executeOneFileRestore(BlockingQueue<BlobStoreIndexShardSnapshot.Fil
20872088
}
20882089

20892090
private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
2091+
ensureNotClosing(store);
20902092
boolean success = false;
20912093
try (IndexOutput indexOutput =
20922094
store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
@@ -2098,12 +2100,14 @@ private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store st
20982100
try (InputStream stream = maybeRateLimitRestores(new SlicedInputStream(fileInfo.numberOfParts()) {
20992101
@Override
21002102
protected InputStream openSlice(int slice) throws IOException {
2103+
ensureNotClosing(store);
21012104
return container.readBlob(fileInfo.partName(slice));
21022105
}
21032106
})) {
21042107
final byte[] buffer = new byte[Math.toIntExact(Math.min(bufferSize, fileInfo.length()))];
21052108
int length;
21062109
while ((length = stream.read(buffer)) > 0) {
2110+
ensureNotClosing(store);
21072111
indexOutput.writeBytes(buffer, 0, length);
21082112
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), length);
21092113
}
@@ -2126,6 +2130,14 @@ protected InputStream openSlice(int slice) throws IOException {
21262130
}
21272131
}
21282132
}
2133+
2134+
void ensureNotClosing(final Store store) throws AlreadyClosedException {
2135+
assert store.refCount() > 0;
2136+
if (store.isClosing()) {
2137+
throw new AlreadyClosedException("store is closing");
2138+
}
2139+
}
2140+
21292141
}.restore(snapshotFiles, store, l);
21302142
}));
21312143
}

test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,13 @@ public static void unblockAllDataNodes(String repository) {
277277
}
278278
}
279279

280+
public static void failReadsAllDataNodes(String repository) {
281+
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
282+
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
283+
mockRepository.setFailReadsAfterUnblock(true);
284+
}
285+
}
286+
280287
public static void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) throws InterruptedException {
281288
final boolean blocked = waitUntil(() -> {
282289
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
@@ -307,11 +314,16 @@ protected void createRepository(String repoName, String type, Path location) {
307314
}
308315

309316
protected void createRepository(String repoName, String type) {
310-
Settings.Builder settings = Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean());
317+
createRepository(repoName, type, randomRepositorySettings());
318+
}
319+
320+
protected Settings.Builder randomRepositorySettings() {
321+
final Settings.Builder settings = Settings.builder();
322+
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
311323
if (rarely()) {
312-
settings = settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
324+
settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
313325
}
314-
createRepository(repoName, type, settings);
326+
return settings;
315327
}
316328

317329
protected static Settings.Builder indexSettingsNoReplicas(int shards) {

test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ public long getFailureCount() {
123123
*/
124124
private volatile boolean failOnIndexLatest = false;
125125

126+
/**
127+
 * Reading blobs will fail with an {@link AssertionError} after the repository has been blocked at least once.
128+
*/
129+
private volatile boolean failReadsAfterUnblock;
130+
private volatile boolean throwReadErrorAfterUnblock = false;
131+
126132
private volatile boolean blocked = false;
127133

128134
public MockRepository(RepositoryMetadata metadata, Environment environment,
@@ -206,6 +212,10 @@ public void setBlockOnDeleteIndexFile() {
206212
blockOnDeleteIndexN = true;
207213
}
208214

215+
public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) {
216+
this.failReadsAfterUnblock = failReadsAfterUnblock;
217+
}
218+
209219
public boolean blocked() {
210220
return blocked;
211221
}
@@ -228,6 +238,10 @@ private synchronized boolean blockExecution() {
228238
Thread.currentThread().interrupt();
229239
}
230240
logger.debug("[{}] Unblocking execution", metadata.name());
241+
if (wasBlocked && failReadsAfterUnblock) {
242+
logger.debug("[{}] Next read operations will fail", metadata.name());
243+
this.throwReadErrorAfterUnblock = true;
244+
}
231245
return wasBlocked;
232246
}
233247

@@ -255,7 +269,6 @@ public BlobContainer blobContainer(BlobPath path) {
255269
}
256270

257271
private class MockBlobContainer extends FilterBlobContainer {
258-
private MessageDigest digest;
259272

260273
private boolean shouldFail(String blobName, double probability) {
261274
if (probability > 0.0) {
@@ -270,7 +283,7 @@ private boolean shouldFail(String blobName, double probability) {
270283

271284
private int hashCode(String path) {
272285
try {
273-
digest = MessageDigest.getInstance("MD5");
286+
MessageDigest digest = MessageDigest.getInstance("MD5");
274287
byte[] bytes = digest.digest(path.getBytes("UTF-8"));
275288
int i = 0;
276289
return ((bytes[i++] & 0xFF) << 24) | ((bytes[i++] & 0xFF) << 16)
@@ -331,6 +344,12 @@ private void blockExecutionAndFail(final String blobName) throws IOException {
331344
throw new IOException("exception after block");
332345
}
333346

347+
private void maybeReadErrorAfterBlock(final String blobName) {
348+
if (throwReadErrorAfterUnblock) {
349+
throw new AssertionError("Read operation are not allowed anymore at this point [blob=" + blobName + "]");
350+
}
351+
}
352+
334353
MockBlobContainer(BlobContainer delegate) {
335354
super(delegate);
336355
}
@@ -342,10 +361,18 @@ protected BlobContainer wrapChild(BlobContainer child) {
342361

343362
@Override
344363
public InputStream readBlob(String name) throws IOException {
364+
maybeReadErrorAfterBlock(name);
345365
maybeIOExceptionOrBlock(name);
346366
return super.readBlob(name);
347367
}
348368

369+
@Override
370+
public InputStream readBlob(String name, long position, long length) throws IOException {
371+
maybeReadErrorAfterBlock(name);
372+
maybeIOExceptionOrBlock(name);
373+
return super.readBlob(name, position, length);
374+
}
375+
349376
@Override
350377
public DeleteResult delete() throws IOException {
351378
DeleteResult deleteResult = DeleteResult.ZERO;

0 commit comments

Comments
 (0)