Skip to content

Commit bdbfcb3

Browse files
Shorten time that snapshot finalization blocks repo (#89572)
For large snapshot clusters, the final cleanup step that removes previous shard metadata might have to delete tens of thousands of blobs. This can take many seconds, during which the repository is needlessly blocked from running subsequent finalizations or deletes. Similar to how the final cleanup step was made async and taken out of the finalization loop for deletes in #86514, this PR makes the final cleanup step for finalization async as well.
1 parent ffcf0ea commit bdbfcb3

File tree

9 files changed

+91
-42
lines changed

9 files changed

+91
-42
lines changed

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.unit.ByteSizeUnit;
2727
import org.elasticsearch.common.unit.ByteSizeValue;
2828
import org.elasticsearch.common.util.BigArrays;
29+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
2930
import org.elasticsearch.core.TimeValue;
3031
import org.elasticsearch.indices.recovery.RecoverySettings;
3132
import org.elasticsearch.monitor.jvm.JvmInfo;
@@ -278,18 +279,33 @@ private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
278279
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();
279280

280281
@Override
281-
public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
282+
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
283+
final FinalizeSnapshotContext wrappedFinalizeContext;
282284
if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
283-
finalizeSnapshotContext = new FinalizeSnapshotContext(
285+
final ListenableFuture<Void> metadataDone = new ListenableFuture<>();
286+
wrappedFinalizeContext = new FinalizeSnapshotContext(
284287
finalizeSnapshotContext.updatedShardGenerations(),
285288
finalizeSnapshotContext.repositoryStateId(),
286289
finalizeSnapshotContext.clusterMetadata(),
287290
finalizeSnapshotContext.snapshotInfo(),
288291
finalizeSnapshotContext.repositoryMetaVersion(),
289-
delayedListener(finalizeSnapshotContext)
292+
delayedListener(ActionListener.runAfter(finalizeSnapshotContext, () -> metadataDone.onResponse(null))),
293+
info -> metadataDone.addListener(new ActionListener<>() {
294+
@Override
295+
public void onResponse(Void unused) {
296+
finalizeSnapshotContext.onDone(info);
297+
}
298+
299+
@Override
300+
public void onFailure(Exception e) {
301+
assert false : e; // never fails
302+
}
303+
})
290304
);
305+
} else {
306+
wrappedFinalizeContext = finalizeSnapshotContext;
291307
}
292-
super.finalizeSnapshot(finalizeSnapshotContext);
308+
super.finalizeSnapshot(wrappedFinalizeContext);
293309
}
294310

295311
@Override

server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,17 @@
1414
import org.elasticsearch.cluster.ClusterState;
1515
import org.elasticsearch.cluster.SnapshotsInProgress;
1616
import org.elasticsearch.cluster.metadata.Metadata;
17-
import org.elasticsearch.core.Tuple;
1817
import org.elasticsearch.snapshots.SnapshotInfo;
1918
import org.elasticsearch.snapshots.SnapshotsService;
2019

2120
import java.util.Map;
2221
import java.util.Set;
22+
import java.util.function.Consumer;
2323

2424
/**
2525
* Context for finalizing a snapshot.
2626
*/
27-
public final class FinalizeSnapshotContext extends ActionListener.Delegating<
28-
Tuple<RepositoryData, SnapshotInfo>,
29-
Tuple<RepositoryData, SnapshotInfo>> {
27+
public final class FinalizeSnapshotContext extends ActionListener.Delegating<RepositoryData, RepositoryData> {
3028

3129
private final ShardGenerations updatedShardGenerations;
3230

@@ -44,29 +42,35 @@ public final class FinalizeSnapshotContext extends ActionListener.Delegating<
4442

4543
private final Version repositoryMetaVersion;
4644

45+
private final Consumer<SnapshotInfo> onDone;
46+
4747
/**
4848
* @param updatedShardGenerations updated shard generations
4949
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
5050
* @param clusterMetadata cluster metadata
5151
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
5252
* @param repositoryMetaVersion version of the updated repository metadata to write
53-
* @param listener listener to be invoked with the new {@link RepositoryData} and {@link SnapshotInfo} after completing
54-
* the snapshot
53+
* @param listener listener to be invoked with the new {@link RepositoryData} after the snapshot has been successfully
54+
* added to the repository
55+
* @param onDone consumer of the new {@link SnapshotInfo} for the snapshot that is invoked after the {@code listener}
56+
* once all cleanup operations after snapshot completion have executed
5557
*/
5658
public FinalizeSnapshotContext(
5759
ShardGenerations updatedShardGenerations,
5860
long repositoryStateId,
5961
Metadata clusterMetadata,
6062
SnapshotInfo snapshotInfo,
6163
Version repositoryMetaVersion,
62-
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener
64+
ActionListener<RepositoryData> listener,
65+
Consumer<SnapshotInfo> onDone
6366
) {
6467
super(listener);
6568
this.updatedShardGenerations = updatedShardGenerations;
6669
this.repositoryStateId = repositoryStateId;
6770
this.clusterMetadata = clusterMetadata;
6871
this.snapshotInfo = snapshotInfo;
6972
this.repositoryMetaVersion = repositoryMetaVersion;
73+
this.onDone = onDone;
7074
}
7175

7276
public long repositoryStateId() {
@@ -103,8 +107,12 @@ public ClusterState updatedClusterState(ClusterState state) {
103107
return updatedState;
104108
}
105109

110+
public void onDone(SnapshotInfo snapshotInfo) {
111+
onDone.accept(snapshotInfo);
112+
}
113+
106114
@Override
107-
public void onResponse(Tuple<RepositoryData, SnapshotInfo> repositoryData) {
115+
public void onResponse(RepositoryData repositoryData) {
108116
delegate.onResponse(repositoryData);
109117
}
110118
}

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,10 +1404,12 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
14041404
repositoryMetaVersion,
14051405
finalizeSnapshotContext::updatedClusterState,
14061406
ActionListener.wrap(newRepoData -> {
1407+
finalizeSnapshotContext.onResponse(newRepoData);
14071408
if (writeShardGens) {
1408-
cleanupOldShardGens(existingRepositoryData, newRepoData, finalizeSnapshotContext);
1409+
cleanupOldShardGens(existingRepositoryData, newRepoData, finalizeSnapshotContext, snapshotInfo);
1410+
} else {
1411+
finalizeSnapshotContext.onDone(snapshotInfo);
14091412
}
1410-
finalizeSnapshotContext.onResponse(Tuple.tuple(newRepoData, snapshotInfo));
14111413
}, onUpdateFailure)
14121414
);
14131415
}, onUpdateFailure), 2 + indices.size());
@@ -1463,7 +1465,8 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte
14631465
private void cleanupOldShardGens(
14641466
RepositoryData existingRepositoryData,
14651467
RepositoryData updatedRepositoryData,
1466-
FinalizeSnapshotContext finalizeSnapshotContext
1468+
FinalizeSnapshotContext finalizeSnapshotContext,
1469+
SnapshotInfo snapshotInfo
14671470
) {
14681471
final Set<String> toDelete = new HashSet<>();
14691472
final int prefixPathLen = basePath().buildAsString().length();
@@ -1484,10 +1487,18 @@ private void cleanupOldShardGens(
14841487
toDelete.add(containerPath + shardGeneration);
14851488
}
14861489
}
1487-
try {
1488-
deleteFromContainer(blobContainer(), toDelete.iterator());
1489-
} catch (Exception e) {
1490-
logger.warn("Failed to clean up old shard generation blobs", e);
1490+
if (toDelete.isEmpty() == false) {
1491+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
1492+
try {
1493+
deleteFromContainer(blobContainer(), toDelete.iterator());
1494+
} catch (Exception e) {
1495+
logger.warn("Failed to clean up old shard generation blobs", e);
1496+
} finally {
1497+
finalizeSnapshotContext.onDone(snapshotInfo);
1498+
}
1499+
});
1500+
} else {
1501+
finalizeSnapshotContext.onDone(snapshotInfo);
14911502
}
14921503
}
14931504

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
152152

153153
private final ThreadPool threadPool;
154154

155-
private final Map<Snapshot, List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>>> snapshotCompletionListeners =
156-
new ConcurrentHashMap<>();
155+
private final Map<Snapshot, List<ActionListener<SnapshotInfo>>> snapshotCompletionListeners = new ConcurrentHashMap<>();
157156

158157
/**
159158
* Listeners for snapshot deletion keyed by delete uuid as returned from {@link SnapshotDeletionsInProgress.Entry#uuid()}
@@ -233,7 +232,7 @@ public SnapshotsService(
233232
* @param listener snapshot completion listener
234233
*/
235234
public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
236-
createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, listener.map(Tuple::v2)), listener::onFailure));
235+
createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, listener), listener::onFailure));
237236
}
238237

239238
/**
@@ -1591,12 +1590,14 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
15911590
metaForSnapshot,
15921591
snapshotInfo,
15931592
entry.version(),
1594-
ActionListener.wrap(result -> {
1595-
final SnapshotInfo writtenSnapshotInfo = result.v2();
1596-
completeListenersIgnoringException(endAndGetListenersToResolve(writtenSnapshotInfo.snapshot()), result);
1597-
logger.info("snapshot [{}] completed with state [{}]", snapshot, writtenSnapshotInfo.state());
1598-
runNextQueuedOperation(result.v1(), repository, true);
1599-
}, e -> handleFinalizationFailure(e, snapshot, repositoryData))
1593+
ActionListener.wrap(
1594+
updatedRepositoryData -> runNextQueuedOperation(updatedRepositoryData, repository, true),
1595+
e -> handleFinalizationFailure(e, snapshot, repositoryData)
1596+
),
1597+
snInfo -> {
1598+
completeListenersIgnoringException(endAndGetListenersToResolve(snInfo.snapshot()), snInfo);
1599+
logger.info("snapshot [{}] completed with state [{}]", snapshot, snInfo.state());
1600+
}
16001601
)
16011602
);
16021603
}, e -> handleFinalizationFailure(e, snapshot, repositoryData));
@@ -1635,10 +1636,10 @@ private static List<SnapshotFeatureInfo> onlySuccessfulFeatureStates(SnapshotsIn
16351636
/**
16361637
* Remove a snapshot from {@link #endingSnapshots} set and return its completion listeners that must be resolved.
16371638
*/
1638-
private List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> endAndGetListenersToResolve(Snapshot snapshot) {
1639+
private List<ActionListener<SnapshotInfo>> endAndGetListenersToResolve(Snapshot snapshot) {
16391640
// get listeners before removing from the ending snapshots set to not trip assertion in #assertConsistentWithClusterState that
16401641
// makes sure we don't have listeners for snapshots that aren't tracked in any internal state of this class
1641-
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> listenersToComplete = snapshotCompletionListeners.remove(snapshot);
1642+
final List<ActionListener<SnapshotInfo>> listenersToComplete = snapshotCompletionListeners.remove(snapshot);
16421643
endingSnapshots.remove(snapshot);
16431644
return listenersToComplete;
16441645
}
@@ -2982,7 +2983,7 @@ static Map<String, DataStreamAlias> filterDataStreamAliases(
29822983
* @param snapshot Snapshot to listen for
29832984
* @param listener listener
29842985
*/
2985-
private void addListener(Snapshot snapshot, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
2986+
private void addListener(Snapshot snapshot, ActionListener<SnapshotInfo> listener) {
29862987
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>())
29872988
.add(ContextPreservingActionListener.wrapPreservingContext(listener, threadPool.getThreadContext()));
29882989
}

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.common.settings.Settings;
2424
import org.elasticsearch.common.util.MockBigArrays;
2525
import org.elasticsearch.core.IOUtils;
26-
import org.elasticsearch.core.Tuple;
2726
import org.elasticsearch.env.Environment;
2827
import org.elasticsearch.env.TestEnvironment;
2928
import org.elasticsearch.index.engine.InternalEngineFactory;
@@ -172,7 +171,7 @@ public void testSnapshotWithConflictingName() throws Exception {
172171
new SnapshotId(snapshot.getSnapshotId().getName(), "_uuid2")
173172
);
174173
final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build();
175-
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(
174+
PlainActionFuture.<RepositoryData, Exception>get(
176175
f -> repository.finalizeSnapshot(
177176
new FinalizeSnapshotContext(
178177
shardGenerations,
@@ -193,7 +192,8 @@ public void testSnapshotWithConflictingName() throws Exception {
193192
Collections.emptyMap()
194193
),
195194
Version.CURRENT,
196-
f
195+
f,
196+
info -> {}
197197
)
198198
)
199199
);

test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.elasticsearch.common.settings.Settings;
3737
import org.elasticsearch.common.unit.ByteSizeUnit;
3838
import org.elasticsearch.core.Nullable;
39-
import org.elasticsearch.core.Tuple;
4039
import org.elasticsearch.plugins.Plugin;
4140
import org.elasticsearch.repositories.FinalizeSnapshotContext;
4241
import org.elasticsearch.repositories.RepositoriesService;
@@ -528,15 +527,16 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map<St
528527
SnapshotState.FAILED,
529528
Collections.emptyMap()
530529
);
531-
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(
530+
PlainActionFuture.<RepositoryData, Exception>get(
532531
f -> repo.finalizeSnapshot(
533532
new FinalizeSnapshotContext(
534533
ShardGenerations.EMPTY,
535534
getRepositoryData(repoName).getGenId(),
536535
state.metadata(),
537536
snapshotInfo,
538537
SnapshotsService.OLD_SNAPSHOT_FORMAT,
539-
f
538+
f,
539+
info -> {}
540540
)
541541
)
542542
);

x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
106106
metadataToSnapshot(finalizeSnapshotContext.updatedShardGenerations().indices(), finalizeSnapshotContext.clusterMetadata()),
107107
finalizeSnapshotContext.snapshotInfo(),
108108
finalizeSnapshotContext.repositoryMetaVersion(),
109-
finalizeSnapshotContext
109+
finalizeSnapshotContext,
110+
finalizeSnapshotContext::onDone
110111
)
111112
);
112113
}

x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.lucene.util.Bits;
2323
import org.elasticsearch.ExceptionsHelper;
2424
import org.elasticsearch.Version;
25+
import org.elasticsearch.action.ActionListener;
2526
import org.elasticsearch.action.index.IndexRequest;
2627
import org.elasticsearch.action.support.PlainActionFuture;
2728
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -42,7 +43,6 @@
4243
import org.elasticsearch.common.util.MockBigArrays;
4344
import org.elasticsearch.common.xcontent.XContentHelper;
4445
import org.elasticsearch.core.IOUtils;
45-
import org.elasticsearch.core.Tuple;
4646
import org.elasticsearch.env.Environment;
4747
import org.elasticsearch.env.TestEnvironment;
4848
import org.elasticsearch.index.VersionType;
@@ -303,7 +303,7 @@ public void testRestoreMinmal() throws IOException {
303303
)
304304
);
305305
future.actionGet();
306-
final PlainActionFuture<Tuple<RepositoryData, SnapshotInfo>> finFuture = PlainActionFuture.newFuture();
306+
final PlainActionFuture<SnapshotInfo> finFuture = PlainActionFuture.newFuture();
307307
final ShardGenerations shardGenerations = ShardGenerations.builder()
308308
.put(indexId, 0, indexShardSnapshotStatus.generation())
309309
.build();
@@ -327,7 +327,18 @@ public void testRestoreMinmal() throws IOException {
327327
Collections.emptyMap()
328328
),
329329
Version.CURRENT,
330-
finFuture
330+
new ActionListener<>() {
331+
@Override
332+
public void onResponse(RepositoryData repositoryData) {
333+
// nothing to do here, the future is resolved in the onDone callback below
334+
}
335+
336+
@Override
337+
public void onFailure(Exception e) {
338+
finFuture.onFailure(e);
339+
}
340+
},
341+
finFuture::onResponse
331342
)
332343
);
333344
finFuture.actionGet();

x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,8 @@ public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
238238
finalizeSnapshotContext.clusterMetadata(),
239239
updatedSnapshotInfo,
240240
finalizeSnapshotContext.repositoryMetaVersion(),
241-
finalizeSnapshotContext
241+
finalizeSnapshotContext,
242+
finalizeSnapshotContext::onDone
242243
)
243244
);
244245
}

0 commit comments

Comments
 (0)