Skip to content

Commit f62618c

Browse files
Ensure Node Shutdown Waits for Running Restores to Complete (#76070)
We must wait for ongoing restores to complete before shutting down the repositories service. Otherwise we may leak file descriptors because tasks for releasing the store are submitted to the `SNAPSHOT` or some searchable snapshot pools that quietly accept but never reject/fail tasks after shutdown. same as #46178 where we had the same bug in recoveries closes #75686
1 parent 0ae9e77 commit f62618c

File tree

7 files changed

+81
-1
lines changed

7 files changed

+81
-1
lines changed

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

+5
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ public void cloneShardSnapshot(
181181
in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener);
182182
}
183183

184+
@Override
185+
public void awaitIdle() {
186+
in.awaitIdle();
187+
}
188+
184189
@Override
185190
public Lifecycle.State lifecycleState() {
186191
return in.lifecycleState();

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

+3
Original file line numberDiff line numberDiff line change
@@ -771,5 +771,8 @@ protected void doClose() throws IOException {
771771
repos.addAll(internalRepositories.values());
772772
repos.addAll(repositories.values());
773773
IOUtils.close(repos);
774+
for (Repository repo : repos) {
775+
repo.awaitIdle();
776+
}
774777
}
775778
}

server/src/main/java/org/elasticsearch/repositories/Repository.java

+9
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,15 @@ default Map<String, Object> adaptUserMetadata(Map<String, Object> userMetadata)
321321
return userMetadata;
322322
}
323323

324+
/**
325+
* Block until all in-flight operations for this repository have completed. Must only be called after this instance has been closed
326+
* by a call to stop {@link #close()}.
327+
* Waiting for ongoing operations should be implemented here instead of in {@link #stop()} or {@link #close()} hooks of this interface
328+
* as these are expected to be called on the cluster state applier thread (which must not block) if a repository is removed from the
329+
* cluster. This method is intended to be called on node shutdown instead as a means to ensure no repository operations are leaked.
330+
*/
331+
void awaitIdle();
332+
324333
static boolean assertSnapshotMetaThread() {
325334
final String threadName = Thread.currentThread().getName();
326335
assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || threadName.startsWith("TEST-")

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+53-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.action.StepListener;
3131
import org.elasticsearch.action.support.GroupedActionListener;
3232
import org.elasticsearch.action.support.ListenableActionFuture;
33+
import org.elasticsearch.action.support.PlainActionFuture;
3334
import org.elasticsearch.action.support.ThreadedActionListener;
3435
import org.elasticsearch.cluster.ClusterState;
3536
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -66,6 +67,7 @@
6667
import org.elasticsearch.common.util.BigArrays;
6768
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
6869
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
70+
import org.elasticsearch.common.util.concurrent.FutureUtils;
6971
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
7072
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
7173
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -423,6 +425,30 @@ protected void doClose() {
423425
}
424426
}
425427

428+
// listeners to invoke when a restore completes and there are no more restores running
429+
@Nullable
430+
private List<ActionListener<Void>> emptyListeners;
431+
432+
// Set of shard ids that this repository is currently restoring
433+
private final Set<ShardId> ongoingRestores = new HashSet<>();
434+
435+
@Override
436+
public void awaitIdle() {
437+
assert lifecycle.stoppedOrClosed();
438+
final PlainActionFuture<Void> future;
439+
synchronized (ongoingRestores) {
440+
if (ongoingRestores.isEmpty()) {
441+
return;
442+
}
443+
future = new PlainActionFuture<>();
444+
if (emptyListeners == null) {
445+
emptyListeners = new ArrayList<>();
446+
}
447+
emptyListeners.add(future);
448+
}
449+
FutureUtils.get(future);
450+
}
451+
426452
@Override
427453
public void executeConsistentStateUpdate(
428454
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
@@ -2885,7 +2911,30 @@ public void restoreShard(
28852911
);
28862912
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
28872913
final BlobContainer container = shardContainer(indexId, snapshotShardId);
2888-
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
2914+
synchronized (ongoingRestores) {
2915+
if (store.isClosing()) {
2916+
restoreListener.onFailure(new AlreadyClosedException("store is closing"));
2917+
return;
2918+
}
2919+
if (lifecycle.started() == false) {
2920+
restoreListener.onFailure(new AlreadyClosedException("repository [" + metadata.name() + "] closed"));
2921+
return;
2922+
}
2923+
final boolean added = ongoingRestores.add(shardId);
2924+
assert added : "add restore for [" + shardId + "] that already has an existing restore";
2925+
}
2926+
executor.execute(ActionRunnable.wrap(ActionListener.runAfter(restoreListener, () -> {
2927+
final List<ActionListener<Void>> onEmptyListeners;
2928+
synchronized (ongoingRestores) {
2929+
if (ongoingRestores.remove(shardId) && ongoingRestores.isEmpty() && emptyListeners != null) {
2930+
onEmptyListeners = emptyListeners;
2931+
emptyListeners = null;
2932+
} else {
2933+
return;
2934+
}
2935+
}
2936+
ActionListener.onResponse(onEmptyListeners, null);
2937+
}), l -> {
28892938
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
28902939
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
28912940
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@@ -2991,6 +3040,9 @@ void ensureNotClosing(final Store store) throws AlreadyClosedException {
29913040
if (store.isClosing()) {
29923041
throw new AlreadyClosedException("store is closing");
29933042
}
3043+
if (lifecycle.started() == false) {
3044+
throw new AlreadyClosedException("repository [" + metadata.name() + "] closed");
3045+
}
29943046
}
29953047

29963048
}.restore(snapshotFiles, store, l);

server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,9 @@ public void cloneShardSnapshot(
317317

318318
}
319319

320+
@Override
321+
public void awaitIdle() {}
322+
320323
@Override
321324
public Lifecycle.State lifecycleState() {
322325
return null;

test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

+4
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) {
148148
public void updateState(final ClusterState state) {
149149
}
150150

151+
@Override
152+
public void awaitIdle() {
153+
}
154+
151155
@Override
152156
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
153157
Consumer<Exception> onFailure) {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

+4
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,10 @@ public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryS
494494
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
495495
}
496496

497+
@Override
498+
public void awaitIdle() {
499+
}
500+
497501
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
498502
Client followerClient, Index followerIndex) {
499503
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();

0 commit comments

Comments
 (0)