Skip to content

Tighten up threading in snapshot finalization #124403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThrottledIterator;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
Expand Down Expand Up @@ -1738,6 +1737,7 @@ int sizeInBytes() {

@Override
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
Expand Down Expand Up @@ -1767,14 +1767,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
SubscribableListener

// Get the current RepositoryData
.<RepositoryData>newForked(
listener -> getRepositoryData(
// TODO we might already be on a SNAPSHOT thread, make it so that we're always on a SNAPSHOT thread here and then we
// can avoid a little more forking below
EsExecutors.DIRECT_EXECUTOR_SERVICE,
listener
)
)
.<RepositoryData>newForked(listener -> getRepositoryData(executor, listener))

// Identify and write the missing metadata
.<MetadataWriteResult>andThen((l, existingRepositoryData) -> {
Expand Down Expand Up @@ -1842,13 +1835,12 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new
}));
}

// Write the SnapshotInfo blob
executor.execute(
ActionRunnable.run(
allMetaListeners.acquire(),
() -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
)
);
// Write the SnapshotInfo blob to the repo (we're already on a SNAPSHOT thread so no need to fork this)
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
ActionListener.completeWith(allMetaListeners.acquire(), () -> {
SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress);
return null;
});

// TODO fail fast if any metadata write fails
// TODO clean up successful metadata writes on failure (needs care, we must not clobber another node concurrently
Expand All @@ -1858,7 +1850,7 @@ record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData new

// Update the root blob
.<RootBlobUpdateResult>andThen((l, metadataWriteResult) -> {
// unlikely, but in theory we could still be on the thread which called finalizeSnapshot - TODO must fork to SNAPSHOT here
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final var snapshotDetails = SnapshotDetails.fromSnapshotInfo(snapshotInfo);
final var existingRepositoryData = metadataWriteResult.existingRepositoryData();
writeIndexGen(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -1398,9 +1400,34 @@ private void leaveRepoLoop(String repository) {
}

private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
assert currentlyFinalizing.contains(snapshot.getRepository());
assert repositoryOperations.assertNotQueued(snapshot);
try {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new SnapshotFinalization(snapshot, metadata, repositoryData));
}

/**
* Implements the finalization process for a snapshot: does some preparatory calculations, builds a {@link SnapshotInfo} and a
* {@link FinalizeSnapshotContext}, calls {@link Repository#finalizeSnapshot} and handles the outcome by notifying waiting listeners
* and triggering the next snapshot-related activity (another finalization, a batch of deletes, etc.)
*/
// This only really makes sense to run against a BlobStoreRepository, and the division of work between this class and
// BlobStoreRepository#finalizeSnapshot is kind of awkward and artificial; TODO consolidate all this stuff into one place and simplify
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a little more concise, like:

// TODO: This only makes sense to run against a BlobStoreRepository. This logic should be consolidated into the BlobStoreRepository#finalizeSnapshot method, and hopefully simplified thereby.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it'll work just to move this into BlobStoreRepository#finalizeSnapshot so I didn't want to prescribe that as the solution.

private class SnapshotFinalization extends AbstractRunnable {

private final Snapshot snapshot;
private final Metadata metadata;
private final RepositoryData repositoryData;

SnapshotFinalization(Snapshot snapshot, Metadata metadata, RepositoryData repositoryData) {
this.snapshot = snapshot;
this.metadata = metadata;
this.repositoryData = repositoryData;
}

@Override
protected void doRun() {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
assert currentlyFinalizing.contains(snapshot.getRepository());
assert repositoryOperations.assertNotQueued(snapshot);

SnapshotsInProgress.Entry entry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
final String failure = entry.failure();
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
Expand Down Expand Up @@ -1428,7 +1455,9 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
final ListenableFuture<Metadata> metadataListener = new ListenableFuture<>();
final Repository repo = repositoriesService.repository(snapshot.getRepository());
if (entry.isClone()) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
// This listener is kinda unnecessary since we now always complete it synchronously. It's only here to catch exceptions.
// TODO simplify this.
ActionListener.completeWith(metadataListener, () -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW this is equivalent to threadPool.executor(EsExecutors.DIRECT_EXECUTOR_SERVICE).execute(ActionRunnable.supply(metadataListener, () -> {, i.e. the only change from before is the move from forking to ThreadPool.Names.SNAPSHOT to running this on the current thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[opt] if you know what this code does, it might be worth documenting what originally caused us to fork this work over to the SNAPSHOT pool — was it CPU- or I/O-intensive? But perhaps that's overkill.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will go away in a (hopefully fairly immediate) follow up. It used to fork because repo.getSnapshotIndexMetaData interacts with the repository.

final Metadata existing = repo.getSnapshotGlobalMetadata(entry.source());
final Metadata.Builder metaBuilder = Metadata.builder(existing);
final Set<Index> existingIndices = new HashSet<>();
Expand All @@ -1450,11 +1479,12 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
);
metaBuilder.dataStreams(dataStreamsToCopy, dataStreamAliasesToCopy);
return metaBuilder.build();
}));
});
} else {
metadataListener.onResponse(metadata);
}
metadataListener.addListener(ActionListener.wrap(meta -> {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
final Metadata metaForSnapshot = metadataForSnapshot(entry, meta);

final Map<String, SnapshotInfo.IndexSnapshotDetails> indexSnapshotDetails = Maps.newMapWithExpectedSize(
Expand Down Expand Up @@ -1554,7 +1584,20 @@ public void onFailure(Exception e) {
shardGenerations
)
));
} catch (Exception e) {
}

@Override
public void onRejection(Exception e) {
if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
logger.debug("failing finalization of {} due to shutdown", snapshot);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice extra handling.

To verify my understanding, and make a note of the behavior change: previously we'd log a warning about failing due to shutdown; now we're making that quiet under debug. That seems reasonable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we didn't fork here, we just ran all this on the calling thread, so rejection was not a thing that could happen.

handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
} else {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
Expand Down