Skip to content

Commit 9bc9d01

Browse files
Do not Block Snapshot Thread Pool Fully During Restore or Snapshot (#57360) (#57511)
Allow for a fairer distribution of snapshot and restore operations to enable parallel snapshots and improve behaviour for parallel snapshot + restore. Closes #55803
1 parent b4a2cd8 commit 9bc9d01

File tree

1 file changed

+41
-22
lines changed

1 file changed

+41
-22
lines changed

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1835,23 +1835,30 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
18351835
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
18361836
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
18371837
for (int i = 0; i < workers; ++i) {
1838-
executor.execute(ActionRunnable.run(filesListener, () -> {
1839-
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
1840-
if (snapshotFileInfo != null) {
1841-
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
1842-
do {
1843-
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
1844-
snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
1845-
} while (snapshotFileInfo != null);
1846-
}
1847-
}
1848-
}));
1838+
executeOneFileSnapshot(store, snapshotId, indexId, snapshotStatus, filesToSnapshot, executor, filesListener);
18491839
}
18501840
} catch (Exception e) {
18511841
listener.onFailure(e);
18521842
}
18531843
}
18541844

1845+
private void executeOneFileSnapshot(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus,
1846+
BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot, Executor executor,
1847+
ActionListener<Void> listener) throws InterruptedException {
1848+
final ShardId shardId = store.shardId();
1849+
final BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
1850+
if (snapshotFileInfo == null) {
1851+
listener.onResponse(null);
1852+
} else {
1853+
executor.execute(ActionRunnable.wrap(listener, l -> {
1854+
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
1855+
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
1856+
executeOneFileSnapshot(store, snapshotId, indexId, snapshotStatus, filesToSnapshot, executor, l);
1857+
}
1858+
}));
1859+
}
1860+
}
1861+
18551862
private static Releasable incrementStoreRef(Store store, IndexShardSnapshotStatus snapshotStatus, ShardId shardId) {
18561863
if (store.tryIncRef() == false) {
18571864
if (snapshotStatus.isAborted()) {
@@ -1901,21 +1908,33 @@ protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRe
19011908
fileQueueListener(files, workers, ActionListener.map(listener, v -> null));
19021909
// restore the files from the snapshot to the Lucene store
19031910
for (int i = 0; i < workers; ++i) {
1904-
executor.execute(ActionRunnable.run(allFilesListener, () -> {
1905-
store.incRef();
1906-
try {
1907-
BlobStoreIndexShardSnapshot.FileInfo fileToRecover;
1908-
while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) {
1909-
restoreFile(fileToRecover, store);
1910-
}
1911-
} finally {
1912-
store.decRef();
1913-
}
1914-
}));
1911+
try {
1912+
executeOneFileRestore(files, allFilesListener);
1913+
} catch (Exception e) {
1914+
allFilesListener.onFailure(e);
1915+
}
19151916
}
19161917
}
19171918
}
19181919

1920+
private void executeOneFileRestore(BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files,
1921+
ActionListener<Void> allFilesListener) throws InterruptedException {
1922+
final BlobStoreIndexShardSnapshot.FileInfo fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS);
1923+
if (fileToRecover == null) {
1924+
allFilesListener.onResponse(null);
1925+
} else {
1926+
executor.execute(ActionRunnable.wrap(allFilesListener, filesListener -> {
1927+
store.incRef();
1928+
try {
1929+
restoreFile(fileToRecover, store);
1930+
} finally {
1931+
store.decRef();
1932+
}
1933+
executeOneFileRestore(files, filesListener);
1934+
}));
1935+
}
1936+
}
1937+
19191938
private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
19201939
boolean success = false;
19211940
try (IndexOutput indexOutput =

0 commit comments

Comments
 (0)