Skip to content

Commit 52e5ceb

Browse files
Restore from Individual Shard Snapshot Files in Parallel (elastic#48110) (elastic#48686)
Make restoring shard snapshots run in parallel on the `SNAPSHOT` thread-pool.
1 parent: dbc05cd; commit: 52e5ceb

File tree

15 files changed: 381 additions (+381) and 286 deletions (-286).

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 20 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@
4242
import org.elasticsearch.ExceptionsHelper;
4343
import org.elasticsearch.Version;
4444
import org.elasticsearch.action.ActionListener;
45+
import org.elasticsearch.action.ActionRunnable;
4546
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
4647
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
4748
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
@@ -1850,12 +1851,16 @@ public boolean recoverFromStore() {
18501851
return storeRecovery.recoverFromStore(this);
18511852
}
18521853

1853-
public boolean restoreFromRepository(Repository repository) {
1854-
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
1855-
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " +
1856-
recoveryState.getRecoverySource();
1857-
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
1858-
return storeRecovery.recoverFromRepository(this, repository);
1854+
public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
1855+
try {
1856+
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
1857+
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: " +
1858+
recoveryState.getRecoverySource();
1859+
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
1860+
storeRecovery.recoverFromRepository(this, repository, listener);
1861+
} catch (Exception e) {
1862+
listener.onFailure(e);
1863+
}
18591864
}
18601865

18611866
/**
@@ -2540,17 +2545,15 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
25402545
case SNAPSHOT:
25412546
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
25422547
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
2543-
threadPool.generic().execute(() -> {
2544-
try {
2545-
final Repository repository = repositoriesService.repository(recoverySource.snapshot().getRepository());
2546-
if (restoreFromRepository(repository)) {
2547-
recoveryListener.onRecoveryDone(recoveryState);
2548-
}
2549-
} catch (Exception e) {
2550-
recoveryListener.onRecoveryFailure(recoveryState,
2551-
new RecoveryFailedException(recoveryState, null, e), true);
2552-
}
2553-
});
2548+
threadPool.generic().execute(
2549+
ActionRunnable.<Boolean>wrap(ActionListener.wrap(r -> {
2550+
if (r) {
2551+
recoveryListener.onRecoveryDone(recoveryState);
2552+
}
2553+
},
2554+
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)),
2555+
restoreListener -> restoreFromRepository(
2556+
repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener)));
25542557
break;
25552558
case LOCAL_SHARDS:
25562559
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 99 additions & 76 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,9 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
128128
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener);
129129
}
130130
@Override
131-
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
132-
in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState);
131+
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
132+
ActionListener<Void> listener) {
133+
in.restoreShard(store, snapshotId, indexId, snapshotShardId, recoveryState, listener);
133134
}
134135

135136
@Override

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,10 @@ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshot
224224
* @param indexId id of the index in the repository from which the restore is occurring
225225
* @param snapshotShardId shard id (in the snapshot)
226226
* @param recoveryState recovery state
227+
* @param listener listener to invoke once done
227228
*/
228-
void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState);
229-
229+
void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,
230+
ActionListener<Void> listener);
230231
/**
231232
* Retrieve shard snapshot status for the stored snapshot
232233
*

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,11 +1214,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
12141214
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
12151215
// Start as many workers as fit into the snapshot pool at once at the most
12161216
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
1217-
final ActionListener<Void> filesListener = ActionListener.delegateResponse(
1218-
new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> {
1219-
filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception
1220-
l.onFailure(e);
1221-
});
1217+
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
12221218
for (int i = 0; i < workers; ++i) {
12231219
executor.execute(ActionRunnable.run(filesListener, () -> {
12241220
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
@@ -1242,19 +1238,42 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
12421238

12431239
@Override
12441240
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
1245-
RecoveryState recoveryState) {
1246-
ShardId shardId = store.shardId();
1247-
try {
1248-
final BlobContainer container = shardContainer(indexId, snapshotShardId);
1249-
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
1250-
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
1241+
RecoveryState recoveryState, ActionListener<Void> listener) {
1242+
final ShardId shardId = store.shardId();
1243+
final ActionListener<Void> restoreListener = ActionListener.delegateResponse(listener,
1244+
(l, e) -> l.onFailure(new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e)));
1245+
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
1246+
final BlobContainer container = shardContainer(indexId, snapshotShardId);
1247+
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
1248+
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
1249+
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
12511250
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
12521251
@Override
1253-
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException {
1254-
// restore the files from the snapshot to the Lucene store
1255-
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
1256-
logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
1257-
restoreFile(fileToRecover, store);
1252+
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
1253+
ActionListener<Void> listener) {
1254+
if (filesToRecover.isEmpty()) {
1255+
listener.onResponse(null);
1256+
} else {
1257+
// Start as many workers as fit into the snapshot pool at once at the most
1258+
final int workers =
1259+
Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), snapshotFiles.indexFiles().size());
1260+
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files = new LinkedBlockingQueue<>(filesToRecover);
1261+
final ActionListener<Void> allFilesListener =
1262+
fileQueueListener(files, workers, ActionListener.map(listener, v -> null));
1263+
// restore the files from the snapshot to the Lucene store
1264+
for (int i = 0; i < workers; ++i) {
1265+
executor.execute(ActionRunnable.run(allFilesListener, () -> {
1266+
store.incRef();
1267+
try {
1268+
BlobStoreIndexShardSnapshot.FileInfo fileToRecover;
1269+
while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) {
1270+
restoreFile(fileToRecover, store);
1271+
}
1272+
} finally {
1273+
store.decRef();
1274+
}
1275+
}));
1276+
}
12581277
}
12591278
}
12601279

@@ -1294,10 +1313,16 @@ protected InputStream openSlice(long slice) throws IOException {
12941313
}
12951314
}
12961315
}
1297-
}.restore(snapshotFiles, store);
1298-
} catch (Exception e) {
1299-
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
1300-
}
1316+
}.restore(snapshotFiles, store, l);
1317+
}));
1318+
}
1319+
1320+
private static ActionListener<Void> fileQueueListener(BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files, int workers,
1321+
ActionListener<Collection<Void>> listener) {
1322+
return ActionListener.delegateResponse(new GroupedActionListener<>(listener, workers), (l, e) -> {
1323+
files.clear(); // Stop uploading the remaining files if we run into any exception
1324+
l.onFailure(e);
1325+
});
13011326
}
13021327

13031328
private static InputStream maybeRateLimit(InputStream stream, @Nullable RateLimiter rateLimiter, CounterMetric metric) {

server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.logging.log4j.LogManager;
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.common.lucene.Lucene;
2526
import org.elasticsearch.common.util.iterable.Iterables;
2627
import org.elasticsearch.index.shard.ShardId;
@@ -74,7 +75,7 @@ protected FileRestoreContext(String repositoryName, ShardId shardId, SnapshotId
7475
/**
7576
* Performs restore operation
7677
*/
77-
public void restore(SnapshotFiles snapshotFiles, Store store) {
78+
public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener<Void> listener) {
7879
store.incRef();
7980
try {
8081
logger.debug("[{}] [{}] restoring to [{}] ...", snapshotId, repositoryName, shardId);
@@ -150,36 +151,49 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
150151
}
151152
}
152153

153-
restoreFiles(filesToRecover, store);
154+
restoreFiles(filesToRecover, store, ActionListener.wrap(
155+
v -> {
156+
store.incRef();
157+
try {
158+
afterRestore(snapshotFiles, store, restoredSegmentsFile);
159+
listener.onResponse(null);
160+
} finally {
161+
store.decRef();
162+
}
163+
}, listener::onFailure));
154164
} catch (IOException ex) {
155165
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
156166
}
167+
} catch (Exception e) {
168+
listener.onFailure(e);
169+
} finally {
170+
store.decRef();
171+
}
172+
}
157173

158-
// read the snapshot data persisted
159-
try {
160-
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
161-
} catch (IOException e) {
162-
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
163-
}
174+
private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetaData restoredSegmentsFile) {
175+
// read the snapshot data persisted
176+
try {
177+
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
178+
} catch (IOException e) {
179+
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
180+
}
164181

165-
/// now, go over and clean files that are in the store, but were not in the snapshot
166-
try {
167-
for (String storeFile : store.directory().listAll()) {
168-
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
169-
continue; //skip write.lock, checksum files and files that exist in the snapshot
170-
}
171-
try {
172-
store.deleteQuiet("restore", storeFile);
173-
store.directory().deleteFile(storeFile);
174-
} catch (IOException e) {
175-
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
176-
}
182+
/// now, go over and clean files that are in the store, but were not in the snapshot
183+
try {
184+
for (String storeFile : store.directory().listAll()) {
185+
if (Store.isAutogenerated(storeFile) || snapshotFiles.containPhysicalIndexFile(storeFile)) {
186+
continue; //skip write.lock, checksum files and files that exist in the snapshot
187+
}
188+
try {
189+
store.deleteQuiet("restore", storeFile);
190+
store.directory().deleteFile(storeFile);
191+
} catch (IOException e) {
192+
logger.warn("[{}] [{}] failed to delete file [{}] during snapshot cleanup", shardId, snapshotId, storeFile);
177193
}
178-
} catch (IOException e) {
179-
logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
180194
}
181-
} finally {
182-
store.decRef();
195+
} catch (IOException e) {
196+
logger.warn("[{}] [{}] failed to list directory - some of files might not be deleted", shardId, snapshotId);
183197
}
184198
}
185199

@@ -189,7 +203,8 @@ public void restore(SnapshotFiles snapshotFiles, Store store) {
189203
* @param filesToRecover List of files to restore
190204
* @param store Store to restore into
191205
*/
192-
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store) throws IOException;
206+
protected abstract void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,
207+
ActionListener<Void> listener);
193208

194209
@SuppressWarnings("unchecked")
195210
private static Iterable<StoreFileMetaData> concat(Store.RecoveryDiff diff) {

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
* {@link RoutingTable.Builder#addAsRestore(IndexMetaData, SnapshotRecoverySource)} method.
108108
* <p>
109109
* Individual shards are getting restored as part of normal recovery process in
110-
* {@link IndexShard#restoreFromRepository(Repository)} )}
110+
* {@link IndexShard#restoreFromRepository}
111111
* method, which detects that shard should be restored from snapshot rather than recovered from gateway by looking
112112
* at the {@link ShardRouting#recoverySource()} property.
113113
* <p>

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,23 +2348,24 @@ public void testRestoreShard() throws IOException {
23482348

23492349
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
23502350
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
2351-
assertTrue(target.restoreFromRepository(new RestoreOnlyRepository("test") {
2351+
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
2352+
target.restoreFromRepository(new RestoreOnlyRepository("test") {
23522353
@Override
23532354
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
2354-
RecoveryState recoveryState) {
2355-
try {
2355+
RecoveryState recoveryState, ActionListener<Void> listener) {
2356+
ActionListener.completeWith(listener, () -> {
23562357
cleanLuceneIndex(targetStore.directory());
23572358
for (String file : sourceStore.directory().listAll()) {
23582359
if (file.equals("write.lock") || file.startsWith("extra")) {
23592360
continue;
23602361
}
23612362
targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT);
23622363
}
2363-
} catch (Exception ex) {
2364-
throw new RuntimeException(ex);
2365-
}
2364+
return null;
2365+
});
23662366
}
2367-
}));
2367+
}, future);
2368+
assertTrue(future.actionGet());
23682369
assertThat(target.getLocalCheckpoint(), equalTo(2L));
23692370
assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(2L));
23702371
assertThat(target.seqNoStats().getGlobalCheckpoint(), equalTo(0L));

server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
210210

211211
@Override
212212
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId,
213-
RecoveryState recoveryState) {
213+
RecoveryState recoveryState, ActionListener<Void> listener) {
214214

215215
}
216216

0 commit comments

Comments
 (0)