From 6ccd8abac3547564f2cefdd136837dfc6146c323 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 8 Mar 2025 09:13:13 +0000 Subject: [PATCH 1/5] Tighten up threading in snapshot finalization Today snapshot finalization does nontrivial work on the calling thread (often the cluster applier thread) and also in theory may fork back to the cluster applier thread in `getRepositoryData`, yet it always forks at least one task (the `SnapshotInfo` write) to the `SNAPSHOT` pool anyway. With this change we fork to the `SNAPSHOT` pool straight away and then make sure to stay on this pool throughout. --- .../blobstore/BlobStoreRepository.java | 26 ++++-------- .../snapshots/SnapshotsService.java | 42 +++++++++++++++++-- 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index dc8254f751330..3ef4dec6b7a15 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -78,7 +78,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThrottledIterator; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; @@ -1738,6 +1737,7 @@ int sizeInBytes() { @Override public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); final long repositoryStateId = finalizeSnapshotContext.repositoryStateId(); final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations(); final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo(); @@ -1767,14 +1767,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new SubscribableListener // Get the current RepositoryData - .newForked( - listener -> getRepositoryData( - // TODO we might already be on a SNAPSHOT thread, make it so that we're always on a SNAPSHOT thread here and then we - // can avoid a little more forking below - EsExecutors.DIRECT_EXECUTOR_SERVICE, - listener - ) - ) + .newForked(listener -> getRepositoryData(executor, listener)) // Identify and write the missing metadata .andThen((l, existingRepositoryData) -> { @@ -1842,13 +1835,12 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new })); } - // Write the SnapshotInfo blob - executor.execute( - ActionRunnable.run( - allMetaListeners.acquire(), - () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress) - ) - ); + // Write the SnapshotInfo blob (we're already on a SNAPSHOT thread so no need to fork this) + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); + ActionListener.completeWith(allMetaListeners.acquire(), () -> { + SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress); + return null; + }); // TODO fail fast if any metadata write fails // TODO clean up successful metadata writes on failure (needs care, we must not clobber another node concurrently @@ -1858,7 +1850,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new // Update the root blob .andThen((l, metadataWriteResult) -> { - // unlikely, but in theory we could still be on the thread which called finalizeSnapshot - TODO must fork to SNAPSHOT here + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); final var snapshotDetails = SnapshotDetails.fromSnapshotInfo(snapshotInfo); final var existingRepositoryData = metadataWriteResult.existingRepositoryData(); writeIndexGen( diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index cce2eaa8d60ed..bda10d5790049 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -71,7 +71,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; @@ -1398,9 +1400,27 @@ private void leaveRepoLoop(String repository) { } private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) { - assert currentlyFinalizing.contains(snapshot.getRepository()); - assert repositoryOperations.assertNotQueued(snapshot); - try { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, metadata, repositoryData)); + } + + private class SnapshotFinalization extends AbstractRunnable { + + private final Snapshot snapshot; + private final Metadata metadata; + private final RepositoryData repositoryData; + + SnapshotFinalization(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) { + this.snapshot = snapshot; + this.metadata = metadata; + this.repositoryData = repositoryData; + } + + @Override + protected void doRun() { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); + assert currentlyFinalizing.contains(snapshot.getRepository()); + assert repositoryOperations.assertNotQueued(snapshot); + SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot); final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); @@ -1455,6 +1475,7 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit metadataListener.onResponse(metadata); } metadataListener.addListener(ActionListener.wrap(meta -> { + assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); final Metadata metaForSnapshot = metadataForSnapshot(entry, meta); final Map indexSnapshotDetails = Maps.newMapWithExpectedSize( @@ -1554,7 +1575,20 @@ public void onFailure(Exception e) { shardGenerations ) )); - } catch (Exception e) { + } + + @Override + public void onRejection(Exception e) { + if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) { + logger.debug("failing finalization of {} due to shutdown", snapshot); + handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY); + } else { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e); assert false : new AssertionError("unexpected failure finalizing " + snapshot, e); handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY); From 7b127bf3d45d1edb200a849d6b70894b1bfdd34a Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 8 Mar 2025 11:41:36 +0000 Subject: [PATCH 2/5] Comments --- .../org/elasticsearch/snapshots/SnapshotsService.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index bda10d5790049..7b67f522fafe7 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1403,6 +1403,13 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, metadata, repositoryData)); } + /** + * Implements the finalization process for a snapshot: does some preparatory calculations, builds a {@link SnapshotInfo} and a + * {@link FinalizeSnapshotContext}, calls {@link Repository#finalizeSnapshot} and handles the outcome by notifying waiting listeners + * and triggering the next snapshot-related activity. + */ + // This only really makes sense to run against a BlobStoreRepository, and the division of work between this class and + // BlobStoreRepository#finalizeSnapshot is kind of awkward and artificial; TODO consolidate all this stuff into one place and simplify private class SnapshotFinalization extends AbstractRunnable { private final Snapshot snapshot; @@ -1448,6 +1455,7 @@ protected void doRun() { final ListenableFuture metadataListener = new ListenableFuture<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); if (entry.isClone()) { + // TODO no need to fork here any more, we're already on a SNAPSHOT thread threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> { final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source()); final Metadata.Builder metaBuilder = Metadata.builder(existing); From fbbd199928ce261410e1c458e1b4d241bfba37e5 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 8 Mar 2025 20:50:01 +0000 Subject: [PATCH 3/5] Remove another obviously-pointless fork --- .../java/org/elasticsearch/snapshots/SnapshotsService.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7b67f522fafe7..0d5a542d1b31b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1455,8 +1455,7 @@ protected void doRun() { final ListenableFuture metadataListener = new ListenableFuture<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); if (entry.isClone()) { - // TODO no need to fork here any more, we're already on a SNAPSHOT thread - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> { + ActionListener.completeWith(metadataListener, () -> { final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source()); final Metadata.Builder metaBuilder = Metadata.builder(existing); final Set existingIndices = new HashSet<>(); @@ -1478,7 +1477,7 @@ protected void doRun() { ); metaBuilder.dataStreams(dataStreamsToCopy, dataStreamAliasesToCopy); return metaBuilder.build(); - })); + }); } else { metadataListener.onResponse(metadata); } From 594202b4e39619696f5e6e54b4b911061d83c52e Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 8 Mar 2025 20:59:42 +0000 Subject: [PATCH 4/5] Comment --- .../main/java/org/elasticsearch/snapshots/SnapshotsService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 0d5a542d1b31b..aa1d0769f5bcd 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1455,6 +1455,8 @@ protected void doRun() { final ListenableFuture metadataListener = new ListenableFuture<>(); final Repository repo = repositoriesService.repository(snapshot.getRepository()); if (entry.isClone()) { + // This listener is kinda unnecessary since we now always complete it synchronously. It's only here to catch exceptions. + // TODO simplify this. ActionListener.completeWith(metadataListener, () -> { final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source()); final Metadata.Builder metaBuilder = Metadata.builder(existing); From 0d70bd5d0379832be8bb70aba28c9f1ebccf95a8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 11 Mar 2025 06:02:34 +0000 Subject: [PATCH 5/5] Comment additions --- .../repositories/blobstore/BlobStoreRepository.java | 2 +- .../main/java/org/elasticsearch/snapshots/SnapshotsService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 3ef4dec6b7a15..d3fae8adb466d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1835,7 +1835,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new })); } - // Write the SnapshotInfo blob (we're already on a SNAPSHOT thread so no need to fork this) + // Write the SnapshotInfo blob to the repo (we're already on a SNAPSHOT thread so no need to fork this) assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT); ActionListener.completeWith(allMetaListeners.acquire(), () -> { SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index aa1d0769f5bcd..10ebd2e66c304 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1406,7 +1406,7 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit /** * Implements the finalization process for a snapshot: does some preparatory calculations, builds a {@link SnapshotInfo} and a * {@link FinalizeSnapshotContext}, calls {@link Repository#finalizeSnapshot} and handles the outcome by notifying waiting listeners - * and triggering the next snapshot-related activity. + * and triggering the next snapshot-related activity (another finalization, a batch of deletes, etc.) */ // This only really makes sense to run against a BlobStoreRepository, and the division of work between this class and // BlobStoreRepository#finalizeSnapshot is kind of awkward and artificial; TODO consolidate all this stuff into one place and simplify