Skip to content

Commit e484e67

Browse files
Ensure Node Shutdown Waits for Running Restores to Complete (#76070) (#76095)
We must wait for ongoing restores to complete before shutting down the repositories service. Otherwise we may leak file descriptors, because tasks for releasing the store are submitted to the `SNAPSHOT` pool or to some searchable snapshot pools that quietly accept, but never reject or fail, tasks after shutdown. This is the same as #46178, where we had the same bug in recoveries. Closes #75686.
1 parent 5afcf92 commit e484e67

File tree

7 files changed

+81
-1
lines changed

7 files changed

+81
-1
lines changed

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ public void cloneShardSnapshot(
187187
in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener);
188188
}
189189

190+
@Override
191+
public void awaitIdle() {
192+
in.awaitIdle();
193+
}
194+
190195
@Override
191196
public Lifecycle.State lifecycleState() {
192197
return in.lifecycleState();

server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,5 +771,8 @@ protected void doClose() throws IOException {
771771
repos.addAll(internalRepositories.values());
772772
repos.addAll(repositories.values());
773773
IOUtils.close(repos);
774+
for (Repository repo : repos) {
775+
repo.awaitIdle();
776+
}
774777
}
775778
}

server/src/main/java/org/elasticsearch/repositories/Repository.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,15 @@ default Map<String, Object> adaptUserMetadata(Map<String, Object> userMetadata)
335335
return userMetadata;
336336
}
337337

338+
/**
339+
* Block until all in-flight operations for this repository have completed. Must only be called after this instance has been closed
340+
* by a call to {@link #close()}.
341+
* Waiting for ongoing operations should be implemented here instead of in {@link #stop()} or {@link #close()} hooks of this interface
342+
* as these are expected to be called on the cluster state applier thread (which must not block) if a repository is removed from the
343+
* cluster. This method is intended to be called on node shutdown instead as a means to ensure no repository operations are leaked.
344+
*/
345+
void awaitIdle();
346+
338347
static boolean assertSnapshotMetaThread() {
339348
final String threadName = Thread.currentThread().getName();
340349
assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || threadName.startsWith("TEST-")

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.action.StepListener;
3131
import org.elasticsearch.action.support.GroupedActionListener;
3232
import org.elasticsearch.action.support.ListenableActionFuture;
33+
import org.elasticsearch.action.support.PlainActionFuture;
3334
import org.elasticsearch.action.support.ThreadedActionListener;
3435
import org.elasticsearch.cluster.ClusterState;
3536
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -66,6 +67,7 @@
6667
import org.elasticsearch.common.util.BigArrays;
6768
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
6869
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
70+
import org.elasticsearch.common.util.concurrent.FutureUtils;
6971
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
7072
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
7173
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -428,6 +430,30 @@ protected void doClose() {
428430
}
429431
}
430432

433+
// listeners to invoke when a restore completes and there are no more restores running
434+
@Nullable
435+
private List<ActionListener<Void>> emptyListeners;
436+
437+
// Set of shard ids that this repository is currently restoring
438+
private final Set<ShardId> ongoingRestores = new HashSet<>();
439+
440+
@Override
441+
public void awaitIdle() {
442+
assert lifecycle.stoppedOrClosed();
443+
final PlainActionFuture<Void> future;
444+
synchronized (ongoingRestores) {
445+
if (ongoingRestores.isEmpty()) {
446+
return;
447+
}
448+
future = new PlainActionFuture<>();
449+
if (emptyListeners == null) {
450+
emptyListeners = new ArrayList<>();
451+
}
452+
emptyListeners.add(future);
453+
}
454+
FutureUtils.get(future);
455+
}
456+
431457
@Override
432458
public void executeConsistentStateUpdate(
433459
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
@@ -2907,7 +2933,30 @@ public void restoreShard(
29072933
);
29082934
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
29092935
final BlobContainer container = shardContainer(indexId, snapshotShardId);
2910-
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
2936+
synchronized (ongoingRestores) {
2937+
if (store.isClosing()) {
2938+
restoreListener.onFailure(new AlreadyClosedException("store is closing"));
2939+
return;
2940+
}
2941+
if (lifecycle.started() == false) {
2942+
restoreListener.onFailure(new AlreadyClosedException("repository [" + metadata.name() + "] closed"));
2943+
return;
2944+
}
2945+
final boolean added = ongoingRestores.add(shardId);
2946+
assert added : "add restore for [" + shardId + "] that already has an existing restore";
2947+
}
2948+
executor.execute(ActionRunnable.wrap(ActionListener.runAfter(restoreListener, () -> {
2949+
final List<ActionListener<Void>> onEmptyListeners;
2950+
synchronized (ongoingRestores) {
2951+
if (ongoingRestores.remove(shardId) && ongoingRestores.isEmpty() && emptyListeners != null) {
2952+
onEmptyListeners = emptyListeners;
2953+
emptyListeners = null;
2954+
} else {
2955+
return;
2956+
}
2957+
}
2958+
ActionListener.onResponse(onEmptyListeners, null);
2959+
}), l -> {
29112960
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
29122961
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
29132962
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@@ -3013,6 +3062,9 @@ void ensureNotClosing(final Store store) throws AlreadyClosedException {
30133062
if (store.isClosing()) {
30143063
throw new AlreadyClosedException("store is closing");
30153064
}
3065+
if (lifecycle.started() == false) {
3066+
throw new AlreadyClosedException("repository [" + metadata.name() + "] closed");
3067+
}
30163068
}
30173069

30183070
}.restore(snapshotFiles, store, l);

server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,9 @@ public void cloneShardSnapshot(
322322

323323
}
324324

325+
@Override
326+
public void awaitIdle() {}
327+
325328
@Override
326329
public Lifecycle.State lifecycleState() {
327330
return null;

test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) {
153153
public void updateState(final ClusterState state) {
154154
}
155155

156+
@Override
157+
public void awaitIdle() {
158+
}
159+
156160
@Override
157161
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
158162
Consumer<Exception> onFailure) {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,10 @@ public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryS
506506
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
507507
}
508508

509+
@Override
510+
public void awaitIdle() {
511+
}
512+
509513
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
510514
Client followerClient, Index followerIndex) {
511515
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();

0 commit comments

Comments
 (0)