Skip to content

Commit a4b4ba1

Browse files
authored
Refactor SnapshotInfo dataflow in finalization (#124336) (#124387)
There's no need to have a `SnapshotInfo` consumer to run at the end of finalization, we only pass it the value we already calculated earlier. This replaces it with a bare `Runnable` instead.
1 parent c8bedc5 commit a4b4ba1

File tree

8 files changed

+21
-34
lines changed

8 files changed

+21
-34
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,10 +346,10 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
346346
finalizeSnapshotContext.snapshotInfo(),
347347
finalizeSnapshotContext.repositoryMetaVersion(),
348348
wrapWithWeakConsistencyProtection(ActionListener.runAfter(finalizeSnapshotContext, () -> metadataDone.onResponse(null))),
349-
info -> metadataDone.addListener(new ActionListener<>() {
349+
() -> metadataDone.addListener(new ActionListener<>() {
350350
@Override
351351
public void onResponse(Void unused) {
352-
finalizeSnapshotContext.onDone(info);
352+
finalizeSnapshotContext.onDone();
353353
}
354354

355355
@Override

server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import java.util.Map;
2323
import java.util.Set;
24-
import java.util.function.Consumer;
2524

2625
/**
2726
* Context for finalizing a snapshot.
@@ -44,7 +43,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
4443

4544
private final IndexVersion repositoryMetaVersion;
4645

47-
private final Consumer<SnapshotInfo> onDone;
46+
private final Runnable onDone;
4847

4948
/**
5049
* @param updatedShardGenerations updated shard generations
@@ -64,7 +63,7 @@ public FinalizeSnapshotContext(
6463
SnapshotInfo snapshotInfo,
6564
IndexVersion repositoryMetaVersion,
6665
ActionListener<RepositoryData> listener,
67-
Consumer<SnapshotInfo> onDone
66+
Runnable onDone
6867
) {
6968
super(listener);
7069
this.updatedShardGenerations = updatedShardGenerations;
@@ -113,8 +112,8 @@ public ClusterState updatedClusterState(ClusterState state) {
113112
return updatedState;
114113
}
115114

116-
public void onDone(SnapshotInfo snapshotInfo) {
117-
onDone.accept(snapshotInfo);
115+
public void onDone() {
116+
onDone.run();
118117
}
119118

120119
@Override

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1889,7 +1889,6 @@ public String toString() {
18891889
rootBlobUpdateResult.oldRepositoryData(),
18901890
rootBlobUpdateResult.newRepositoryData(),
18911891
finalizeSnapshotContext,
1892-
snapshotInfo,
18931892
writeShardGens
18941893
);
18951894
})
@@ -1908,7 +1907,6 @@ private void cleanupOldMetadata(
19081907
RepositoryData existingRepositoryData,
19091908
RepositoryData updatedRepositoryData,
19101909
FinalizeSnapshotContext finalizeSnapshotContext,
1911-
SnapshotInfo snapshotInfo,
19121910
boolean writeShardGenerations
19131911
) {
19141912
final Set<String> toDelete = new HashSet<>();
@@ -1957,11 +1955,11 @@ public void onFailure(Exception e) {
19571955

19581956
@Override
19591957
public void onAfter() {
1960-
finalizeSnapshotContext.onDone(snapshotInfo);
1958+
finalizeSnapshotContext.onDone();
19611959
}
19621960
});
19631961
} else {
1964-
finalizeSnapshotContext.onDone(snapshotInfo);
1962+
finalizeSnapshotContext.onDone();
19651963
}
19661964
}
19671965

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1530,11 +1530,11 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
15301530
shardGenerations
15311531
)
15321532
),
1533-
snInfo -> snapshotListeners.addListener(new ActionListener<>() {
1533+
() -> snapshotListeners.addListener(new ActionListener<>() {
15341534
@Override
15351535
public void onResponse(List<ActionListener<SnapshotInfo>> actionListeners) {
1536-
completeListenersIgnoringException(actionListeners, snInfo);
1537-
logger.info("snapshot [{}] completed with state [{}]", snapshot, snInfo.state());
1536+
completeListenersIgnoringException(actionListeners, snapshotInfo);
1537+
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
15381538
}
15391539

15401540
@Override

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public void testSnapshotWithConflictingName() throws Exception {
193193
),
194194
IndexVersion.current(),
195195
listener,
196-
info -> {}
196+
() -> {}
197197
)
198198
)
199199
);

test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map<St
551551
snapshotInfo,
552552
SnapshotsService.OLD_SNAPSHOT_FORMAT,
553553
listener,
554-
info -> {}
554+
() -> {}
555555
)
556556
)
557557
);

x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import org.apache.lucene.search.TopDocs;
2222
import org.apache.lucene.util.Bits;
2323
import org.elasticsearch.ExceptionsHelper;
24-
import org.elasticsearch.action.ActionListener;
2524
import org.elasticsearch.action.index.IndexRequest;
25+
import org.elasticsearch.action.support.ActionTestUtils;
2626
import org.elasticsearch.action.support.PlainActionFuture;
2727
import org.elasticsearch.cluster.metadata.IndexMetadata;
2828
import org.elasticsearch.cluster.metadata.MappingMetadata;
@@ -67,7 +67,6 @@
6767
import org.elasticsearch.repositories.FinalizeSnapshotContext;
6868
import org.elasticsearch.repositories.IndexId;
6969
import org.elasticsearch.repositories.Repository;
70-
import org.elasticsearch.repositories.RepositoryData;
7170
import org.elasticsearch.repositories.ShardGeneration;
7271
import org.elasticsearch.repositories.ShardGenerations;
7372
import org.elasticsearch.repositories.ShardSnapshotResult;
@@ -86,6 +85,7 @@
8685
import java.nio.file.Path;
8786
import java.util.Collections;
8887
import java.util.concurrent.Callable;
88+
import java.util.concurrent.CountDownLatch;
8989
import java.util.concurrent.ExecutionException;
9090
import java.util.stream.Collectors;
9191

@@ -355,7 +355,7 @@ public void testRestoreMinimal() throws IOException {
355355
)
356356
);
357357
future.actionGet();
358-
final PlainActionFuture<SnapshotInfo> finFuture = new PlainActionFuture<>();
358+
final CountDownLatch finishedLatch = new CountDownLatch(2);
359359
final ShardGenerations shardGenerations = ShardGenerations.builder()
360360
.put(indexId, 0, indexShardSnapshotStatus.generation())
361361
.build();
@@ -379,21 +379,11 @@ public void testRestoreMinimal() throws IOException {
379379
Collections.emptyMap()
380380
),
381381
IndexVersion.current(),
382-
new ActionListener<>() {
383-
@Override
384-
public void onResponse(RepositoryData repositoryData) {
385-
// nothing will resolve in the onDone callback below
386-
}
387-
388-
@Override
389-
public void onFailure(Exception e) {
390-
finFuture.onFailure(e);
391-
}
392-
},
393-
finFuture::onResponse
382+
ActionTestUtils.assertNoFailureListener(ignored -> finishedLatch.countDown()),
383+
finishedLatch::countDown
394384
)
395385
);
396-
finFuture.actionGet();
386+
safeAwait(finishedLatch);
397387
});
398388
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
399389
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());

x-pack/plugin/slm/src/internalClusterTest/java/org/elasticsearch/xpack/slm/SLMStatDisruptionIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,11 @@ public void finalizeSnapshot(FinalizeSnapshotContext fsc) {
244244
fsc.snapshotInfo(),
245245
fsc.repositoryMetaVersion(),
246246
fsc,
247-
snapshotInfo -> {
247+
() -> {
248248
// run the passed lambda before calling the usual callback
249249
// this is where the cluster can be restarted before SLM is called back with the snapshotInfo
250250
beforeResponseRunnable.run();
251-
fsc.onDone(snapshotInfo);
251+
fsc.onDone();
252252
}
253253
);
254254
super.finalizeSnapshot(newFinalizeContext);

0 commit comments

Comments
 (0)