Skip to content

Commit 5688e0e

Browse files
authored
Do not release safe commit with CancellableThreads (#59182)
We are leaking a FileChannel in #39585 if we release a safe commit with CancellableThreads. Although it is a bug in Lucene where we do not close a FileChannel if we failed to create a NIOFSIndexInput, I think it's safer if we release a safe commit using the generic thread pool instead. Closes #39585 Relates #45409
1 parent 611fb03 commit 5688e0e

File tree

1 file changed

+28
-9
lines changed

1 file changed

+28
-9
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.action.support.replication.ReplicationResponse;
4141
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
4242
import org.elasticsearch.cluster.routing.ShardRouting;
43+
import org.elasticsearch.common.CheckedRunnable;
4344
import org.elasticsearch.common.StopWatch;
4445
import org.elasticsearch.common.bytes.BytesArray;
4546
import org.elasticsearch.common.bytes.BytesReference;
@@ -85,6 +86,7 @@
8586
import java.util.concurrent.CompletableFuture;
8687
import java.util.concurrent.ConcurrentLinkedDeque;
8788
import java.util.concurrent.CopyOnWriteArrayList;
89+
import java.util.concurrent.atomic.AtomicBoolean;
8890
import java.util.concurrent.atomic.AtomicInteger;
8991
import java.util.concurrent.atomic.AtomicLong;
9092
import java.util.function.Consumer;
@@ -223,7 +225,7 @@ && isTargetSameHistory()
223225
} else {
224226
final Engine.IndexCommitRef safeCommitRef;
225227
try {
226-
safeCommitRef = shard.acquireSafeIndexCommit();
228+
safeCommitRef = acquireSafeCommit(shard);
227229
resources.add(safeCommitRef);
228230
} catch (final Exception e) {
229231
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
@@ -395,17 +397,34 @@ public void onFailure(Exception e) {
395397
*/
396398
private Releasable acquireStore(Store store) {
397399
store.incRef();
398-
return Releasables.releaseOnce(() -> {
399-
final PlainActionFuture<Void> future = new PlainActionFuture<>();
400-
assert threadPool.generic().isShutdown() == false;
401-
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
402-
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
403-
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
404-
threadPool.generic().execute(ActionRunnable.run(future, store::decRef));
405-
FutureUtils.get(future);
400+
return Releasables.releaseOnce(() -> runWithGenericThreadPool(store::decRef));
401+
}
402+
403+
/**
404+
* Releasing a safe commit can access some commit files. It's better not to use {@link CancellableThreads} to interact
405+
* with the file systems due to interrupt (see {@link org.apache.lucene.store.NIOFSDirectory} javadocs for more detail).
406+
* This method acquires a safe commit and wraps it to make sure that it will be released using the generic thread pool.
407+
*/
408+
private Engine.IndexCommitRef acquireSafeCommit(IndexShard shard) {
409+
final Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit();
410+
final AtomicBoolean closed = new AtomicBoolean(false);
411+
return new Engine.IndexCommitRef(commitRef.getIndexCommit(), () -> {
412+
if (closed.compareAndSet(false, true)) {
413+
runWithGenericThreadPool(commitRef::close);
414+
}
406415
});
407416
}
408417

418+
private void runWithGenericThreadPool(CheckedRunnable<Exception> task) {
419+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
420+
assert threadPool.generic().isShutdown() == false;
421+
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
422+
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
423+
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
424+
threadPool.generic().execute(ActionRunnable.run(future, task));
425+
FutureUtils.get(future);
426+
}
427+
409428
static final class SendFileResult {
410429
final List<String> phase1FileNames;
411430
final List<Long> phase1FileSizes;

0 commit comments

Comments
 (0)