Skip to content

Refactor SnapshotInfo dataflow in finalization #124336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,10 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
finalizeSnapshotContext.snapshotInfo(),
finalizeSnapshotContext.repositoryMetaVersion(),
wrapWithWeakConsistencyProtection(ActionListener.runAfter(finalizeSnapshotContext, () -> metadataDone.onResponse(null))),
info -> metadataDone.addListener(new ActionListener<>() {
() -> metadataDone.addListener(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
finalizeSnapshotContext.onDone(info);
finalizeSnapshotContext.onDone();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/**
* Context for finalizing a snapshot.
Expand All @@ -44,7 +43,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo

private final IndexVersion repositoryMetaVersion;

private final Consumer<SnapshotInfo> onDone;
private final Runnable onDone;

/**
* @param updatedShardGenerations updated shard generations
Expand All @@ -64,7 +63,7 @@ public FinalizeSnapshotContext(
SnapshotInfo snapshotInfo,
IndexVersion repositoryMetaVersion,
ActionListener<RepositoryData> listener,
Consumer<SnapshotInfo> onDone
Runnable onDone
) {
super(listener);
this.updatedShardGenerations = updatedShardGenerations;
Expand Down Expand Up @@ -113,8 +112,8 @@ public ClusterState updatedClusterState(ClusterState state) {
return updatedState;
}

public void onDone(SnapshotInfo snapshotInfo) {
onDone.accept(snapshotInfo);
public void onDone() {
onDone.run();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1894,7 +1894,6 @@ public String toString() {
rootBlobUpdateResult.oldRepositoryData(),
rootBlobUpdateResult.newRepositoryData(),
finalizeSnapshotContext,
snapshotInfo,
writeShardGens
);
})
Expand All @@ -1913,7 +1912,6 @@ private void cleanupOldMetadata(
RepositoryData existingRepositoryData,
RepositoryData updatedRepositoryData,
FinalizeSnapshotContext finalizeSnapshotContext,
SnapshotInfo snapshotInfo,
boolean writeShardGenerations
) {
final Set<String> toDelete = new HashSet<>();
Expand Down Expand Up @@ -1962,11 +1960,11 @@ public void onFailure(Exception e) {

@Override
public void onAfter() {
finalizeSnapshotContext.onDone(snapshotInfo);
finalizeSnapshotContext.onDone();
}
});
} else {
finalizeSnapshotContext.onDone(snapshotInfo);
finalizeSnapshotContext.onDone();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1529,11 +1529,11 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
shardGenerations
)
),
snInfo -> snapshotListeners.addListener(new ActionListener<>() {
() -> snapshotListeners.addListener(new ActionListener<>() {
@Override
public void onResponse(List<ActionListener<SnapshotInfo>> actionListeners) {
completeListenersIgnoringException(actionListeners, snInfo);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snInfo.state());
completeListenersIgnoringException(actionListeners, snapshotInfo);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void testSnapshotWithConflictingName() throws Exception {
),
IndexVersion.current(),
listener,
info -> {}
() -> {}
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map<St
snapshotInfo,
SnapshotsService.OLD_SNAPSHOT_FORMAT,
listener,
info -> {}
() -> {}
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.Bits;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
Expand Down Expand Up @@ -68,7 +68,6 @@
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.ShardSnapshotResult;
Expand All @@ -87,6 +86,7 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -356,7 +356,7 @@ public void testRestoreMinimal() throws IOException {
)
);
future.actionGet();
final PlainActionFuture<SnapshotInfo> finFuture = new PlainActionFuture<>();
final CountDownLatch finishedLatch = new CountDownLatch(2);
final ShardGenerations shardGenerations = ShardGenerations.builder()
.put(indexId, 0, indexShardSnapshotStatus.generation())
.build();
Expand All @@ -380,21 +380,11 @@ public void testRestoreMinimal() throws IOException {
Collections.emptyMap()
),
IndexVersion.current(),
new ActionListener<>() {
@Override
public void onResponse(RepositoryData repositoryData) {
// nothing will resolve in the onDone callback below
}

@Override
public void onFailure(Exception e) {
finFuture.onFailure(e);
}
},
finFuture::onResponse
ActionTestUtils.assertNoFailureListener(ignored -> finishedLatch.countDown()),
finishedLatch::countDown
)
);
finFuture.actionGet();
safeAwait(finishedLatch);
});
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,11 @@ public void finalizeSnapshot(FinalizeSnapshotContext fsc) {
fsc.snapshotInfo(),
fsc.repositoryMetaVersion(),
fsc,
snapshotInfo -> {
() -> {
// run the passed lambda before calling the usual callback
// this is where the cluster can be restarted before SLM is called back with the snapshotInfo
beforeResponseRunnable.run();
fsc.onDone(snapshotInfo);
fsc.onDone();
}
);
super.finalizeSnapshot(newFinalizeContext);
Expand Down