
Commit f1f2df7

Tighten up threading in snapshot finalization (#124403)
Today snapshot finalization does nontrivial work on the calling thread (often the cluster applier thread) and also in theory may fork back to the cluster applier thread in `getRepositoryData`, yet it always forks at least one task (the `SnapshotInfo` write) to the `SNAPSHOT` pool anyway. With this change we fork to the `SNAPSHOT` pool straight away and then make sure to stay on this pool throughout.
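
The shape of the change, reduced to a sketch (class and method names here are hypothetical; only ThreadPool, AbstractRunnable, the SNAPSHOT pool and its assertion come from the actual change): fork to the SNAPSHOT pool exactly once at the entry point, then assert the pool on every later step instead of forking again.

    import org.elasticsearch.common.util.concurrent.AbstractRunnable;
    import org.elasticsearch.threadpool.ThreadPool;

    // Hypothetical illustration of the threading pattern, not the production code.
    class SnapshotPoolHoppingSketch {

        private final ThreadPool threadPool;

        SnapshotPoolHoppingSketch(ThreadPool threadPool) {
            this.threadPool = threadPool;
        }

        void start() {
            // The single fork: everything after this point stays on the SNAPSHOT pool.
            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
                @Override
                protected void doRun() {
                    assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
                    laterStep();
                }

                @Override
                public void onFailure(Exception e) {
                    // the real code fails the finalization and notifies waiting listeners here
                }
            });
        }

        private void laterStep() {
            // Subsequent steps no longer fork; they only assert they are still on the right pool.
            assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
        }
    }

Asserting rather than re-forking keeps the whole finalization on one pool, and any accidental hop back to the cluster applier thread now fails fast wherever assertions are enabled.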
1 parent 45f2eb2 commit f1f2df7

2 files changed: 58 additions & 23 deletions

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 9 additions & 17 deletions
@@ -78,7 +78,6 @@
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.ThrottledIterator;
 import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
@@ -1738,6 +1737,7 @@ int sizeInBytes() {

     @Override
     public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
+        assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
         final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
         final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
         final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
@@ -1767,14 +1767,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
         SubscribableListener

             // Get the current RepositoryData
-            .<RepositoryData>newForked(
-                listener -> getRepositoryData(
-                    // TODO we might already be on a SNAPSHOT thread, make it so that we're always on a SNAPSHOT thread here and then we
-                    // can avoid a little more forking below
-                    EsExecutors.DIRECT_EXECUTOR_SERVICE,
-                    listener
-                )
-            )
+            .<RepositoryData>newForked(listener -> getRepositoryData(executor, listener))

             // Identify and write the missing metadata
             .<MetadataWriteResult>andThen((l, existingRepositoryData) -> {
@@ -1842,13 +1835,12 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
                 }));
             }

-            // Write the SnapshotInfo blob
-            executor.execute(
-                ActionRunnable.run(
-                    allMetaListeners.acquire(),
-                    () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
-                )
-            );
+            // Write the SnapshotInfo blob to the repo (we're already on a SNAPSHOT thread so no need to fork this)
+            assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
+            ActionListener.completeWith(allMetaListeners.acquire(), () -> {
+                SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress);
+                return null;
+            });

             // TODO fail fast if any metadata write fails
             // TODO clean up successful metadata writes on failure (needs care, we must not clobber another node concurrently
@@ -1858,7 +1850,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new

             // Update the root blob
             .<RootBlobUpdateResult>andThen((l, metadataWriteResult) -> {
-                // unlikely, but in theory we could still be on the thread which called finalizeSnapshot - TODO must fork to SNAPSHOT here
+                assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
                 final var snapshotDetails = SnapshotDetails.fromSnapshotInfo(snapshotInfo);
                 final var existingRepositoryData = metadataWriteResult.existingRepositoryData();
                 writeIndexGen(
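
In the hunk above, the SnapshotInfo write switches from executor.execute(ActionRunnable.run(...)) to completing the acquired listener in place with ActionListener.completeWith, which is safe because the caller is already on a SNAPSHOT thread. A minimal sketch of that pattern (the listener and the work are stand-ins, not the repository code itself):

    import org.elasticsearch.action.ActionListener;

    class CompleteWithSketch {
        public static void main(String[] args) {
            // A throwaway listener standing in for allMetaListeners.acquire() in the real code.
            ActionListener<Void> listener = ActionListener.wrap(
                ignored -> System.out.println("SnapshotInfo write finished"),
                e -> System.err.println("SnapshotInfo write failed: " + e)
            );

            // completeWith runs the supplier on the calling thread and feeds its result,
            // or any exception it throws, straight into the listener - no forking involved.
            ActionListener.completeWith(listener, () -> {
                // stand-in for SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
                return null;
            });
        }
    }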

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 49 additions & 6 deletions
@@ -71,7 +71,9 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.Maps;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.core.Nullable;
@@ -1398,9 +1400,34 @@ private void leaveRepoLoop(String repository) {
     }

     private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
-        assert currentlyFinalizing.contains(snapshot.getRepository());
-        assert repositoryOperations.assertNotQueued(snapshot);
-        try {
+        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, metadata, repositoryData));
+    }
+
+    /**
+     * Implements the finalization process for a snapshot: does some preparatory calculations, builds a {@link SnapshotInfo} and a
+     * {@link FinalizeSnapshotContext}, calls {@link Repository#finalizeSnapshot} and handles the outcome by notifying waiting listeners
+     * and triggering the next snapshot-related activity (another finalization, a batch of deletes, etc.)
+     */
+    // This only really makes sense to run against a BlobStoreRepository, and the division of work between this class and
+    // BlobStoreRepository#finalizeSnapshot is kind of awkward and artificial; TODO consolidate all this stuff into one place and simplify
+    private class SnapshotFinalization extends AbstractRunnable {
+
+        private final Snapshot snapshot;
+        private final Metadata metadata;
+        private final RepositoryData repositoryData;
+
+        SnapshotFinalization(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
+            this.snapshot = snapshot;
+            this.metadata = metadata;
+            this.repositoryData = repositoryData;
+        }
+
+        @Override
+        protected void doRun() {
+            assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
+            assert currentlyFinalizing.contains(snapshot.getRepository());
+            assert repositoryOperations.assertNotQueued(snapshot);
+
         SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
         final String failure = entry.failure();
         logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
@@ -1428,7 +1455,9 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
         final ListenableFuture<Metadata> metadataListener = new ListenableFuture<>();
         final Repository repo = repositoriesService.repository(snapshot.getRepository());
         if (entry.isClone()) {
-            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
+            // This listener is kinda unnecessary since we now always complete it synchronously. It's only here to catch exceptions.
+            // TODO simplify this.
+            ActionListener.completeWith(metadataListener, () -> {
                 final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source());
                 final Metadata.Builder metaBuilder = Metadata.builder(existing);
                 final Set<Index> existingIndices = new HashSet<>();
@@ -1450,11 +1479,12 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
             );
             metaBuilder.dataStreams(dataStreamsToCopy, dataStreamAliasesToCopy);
             return metaBuilder.build();
-            }));
+            });
         } else {
             metadataListener.onResponse(metadata);
         }
         metadataListener.addListener(ActionListener.wrap(meta -> {
+            assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
             final Metadata metaForSnapshot = metadataForSnapshot(entry, meta);

             final Map<String, SnapshotInfo.IndexSnapshotDetails> indexSnapshotDetails = Maps.newMapWithExpectedSize(
@@ -1554,7 +1584,20 @@ public void onFailure(Exception e) {
                 shardGenerations
             )
         ));
-        } catch (Exception e) {
+        }
+
+        @Override
+        public void onRejection(Exception e) {
+            if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
+                logger.debug("failing finalization of {} due to shutdown", snapshot);
+                handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
+            } else {
+                onFailure(e);
+            }
+        }
+
+        @Override
+        public void onFailure(Exception e) {
             logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
             assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
             handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
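
Because finalizeSnapshotEntry now submits a task to the SNAPSHOT pool instead of running inline, the task can be rejected when the pool is shutting down; the new onRejection override treats a shutdown rejection as an expected, quietly handled failure rather than tripping the assert false in onFailure. A rough sketch of that shape (the class name and the abandon hook are hypothetical stand-ins for the real handleFinalizationFailure plumbing):

    import org.elasticsearch.common.util.concurrent.AbstractRunnable;
    import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

    // Sketch of the rejection-handling shape: a shutdown-related rejection is expected and
    // handled quietly, while anything else falls through to the normal failure path.
    class RejectionAwareTask extends AbstractRunnable {

        @Override
        protected void doRun() {
            // the actual finalization work would run here, on the SNAPSHOT pool
        }

        @Override
        public void onRejection(Exception e) {
            if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
                // the pool is shutting down: fail the operation without treating it as a bug
                abandon(e);
            } else {
                onFailure(e);
            }
        }

        @Override
        public void onFailure(Exception e) {
            // unexpected failure: the real code logs, trips an assertion, and fails finalization
            abandon(e);
        }

        // hypothetical hook standing in for handleFinalizationFailure(...)
        private void abandon(Exception e) {
            // notify waiting listeners, release the repository operation, etc.
        }
    }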
