Skip to content

Commit b213bcb

Browse files
Ensure Node Shutdown Waits for Running Restores to Complete (#76070) (#76092)
We must wait for ongoing restores to complete before shutting down the repositories service. Otherwise we may leak file descriptors because tasks for releasing the store are submitted to the `SNAPSHOT` or some searchable snapshot pools that quietly accept but never reject/fail tasks after shutdown. same as #46178 where we had the same bug in recoveries closes #75686
1 parent cb22468 commit b213bcb

File tree

7 files changed

+81
-1
lines changed

7 files changed

+81
-1
lines changed

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

+5
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ public void cloneShardSnapshot(
187187
in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener);
188188
}
189189

190+
@Override
191+
public void awaitIdle() {
192+
in.awaitIdle();
193+
}
194+
190195
@Override
191196
public Lifecycle.State lifecycleState() {
192197
return in.lifecycleState();

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

+3
Original file line numberDiff line numberDiff line change
@@ -771,5 +771,8 @@ protected void doClose() throws IOException {
771771
repos.addAll(internalRepositories.values());
772772
repos.addAll(repositories.values());
773773
IOUtils.close(repos);
774+
for (Repository repo : repos) {
775+
repo.awaitIdle();
776+
}
774777
}
775778
}

server/src/main/java/org/elasticsearch/repositories/Repository.java

+9
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,15 @@ default Map<String, Object> adaptUserMetadata(Map<String, Object> userMetadata)
335335
return userMetadata;
336336
}
337337

338+
/**
339+
* Block until all in-flight operations for this repository have completed. Must only be called after this instance has been closed
340+
* by a call to stop {@link #close()}.
341+
* Waiting for ongoing operations should be implemented here instead of in {@link #stop()} or {@link #close()} hooks of this interface
342+
* as these are expected to be called on the cluster state applier thread (which must not block) if a repository is removed from the
343+
* cluster. This method is intended to be called on node shutdown instead as a means to ensure no repository operations are leaked.
344+
*/
345+
void awaitIdle();
346+
338347
static boolean assertSnapshotMetaThread() {
339348
final String threadName = Thread.currentThread().getName();
340349
assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || threadName.startsWith("TEST-")

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+53-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.action.StepListener;
3131
import org.elasticsearch.action.support.GroupedActionListener;
3232
import org.elasticsearch.action.support.ListenableActionFuture;
33+
import org.elasticsearch.action.support.PlainActionFuture;
3334
import org.elasticsearch.action.support.ThreadedActionListener;
3435
import org.elasticsearch.cluster.ClusterState;
3536
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -66,6 +67,7 @@
6667
import org.elasticsearch.common.util.BigArrays;
6768
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
6869
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
70+
import org.elasticsearch.common.util.concurrent.FutureUtils;
6971
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
7072
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
7173
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -427,6 +429,30 @@ protected void doClose() {
427429
}
428430
}
429431

432+
// listeners to invoke when a restore completes and there are no more restores running
433+
@Nullable
434+
private List<ActionListener<Void>> emptyListeners;
435+
436+
// Set of shard ids that this repository is currently restoring
437+
private final Set<ShardId> ongoingRestores = new HashSet<>();
438+
439+
@Override
440+
public void awaitIdle() {
441+
assert lifecycle.stoppedOrClosed();
442+
final PlainActionFuture<Void> future;
443+
synchronized (ongoingRestores) {
444+
if (ongoingRestores.isEmpty()) {
445+
return;
446+
}
447+
future = new PlainActionFuture<>();
448+
if (emptyListeners == null) {
449+
emptyListeners = new ArrayList<>();
450+
}
451+
emptyListeners.add(future);
452+
}
453+
FutureUtils.get(future);
454+
}
455+
430456
@Override
431457
public void executeConsistentStateUpdate(
432458
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
@@ -2897,7 +2923,30 @@ public void restoreShard(
28972923
);
28982924
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
28992925
final BlobContainer container = shardContainer(indexId, snapshotShardId);
2900-
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
2926+
synchronized (ongoingRestores) {
2927+
if (store.isClosing()) {
2928+
restoreListener.onFailure(new AlreadyClosedException("store is closing"));
2929+
return;
2930+
}
2931+
if (lifecycle.started() == false) {
2932+
restoreListener.onFailure(new AlreadyClosedException("repository [" + metadata.name() + "] closed"));
2933+
return;
2934+
}
2935+
final boolean added = ongoingRestores.add(shardId);
2936+
assert added : "add restore for [" + shardId + "] that already has an existing restore";
2937+
}
2938+
executor.execute(ActionRunnable.wrap(ActionListener.runAfter(restoreListener, () -> {
2939+
final List<ActionListener<Void>> onEmptyListeners;
2940+
synchronized (ongoingRestores) {
2941+
if (ongoingRestores.remove(shardId) && ongoingRestores.isEmpty() && emptyListeners != null) {
2942+
onEmptyListeners = emptyListeners;
2943+
emptyListeners = null;
2944+
} else {
2945+
return;
2946+
}
2947+
}
2948+
ActionListener.onResponse(onEmptyListeners, null);
2949+
}), l -> {
29012950
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
29022951
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
29032952
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@@ -3003,6 +3052,9 @@ void ensureNotClosing(final Store store) throws AlreadyClosedException {
30033052
if (store.isClosing()) {
30043053
throw new AlreadyClosedException("store is closing");
30053054
}
3055+
if (lifecycle.started() == false) {
3056+
throw new AlreadyClosedException("repository [" + metadata.name() + "] closed");
3057+
}
30063058
}
30073059

30083060
}.restore(snapshotFiles, store, l);

server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,9 @@ public void cloneShardSnapshot(
322322

323323
}
324324

325+
@Override
326+
public void awaitIdle() {}
327+
325328
@Override
326329
public Lifecycle.State lifecycleState() {
327330
return null;

test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

+4
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) {
152152
public void updateState(final ClusterState state) {
153153
}
154154

155+
@Override
156+
public void awaitIdle() {
157+
}
158+
155159
@Override
156160
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
157161
Consumer<Exception> onFailure) {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

+4
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,10 @@ public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryS
503503
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
504504
}
505505

506+
@Override
507+
public void awaitIdle() {
508+
}
509+
506510
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
507511
Client followerClient, Index followerIndex) {
508512
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();

0 commit comments

Comments
 (0)