Skip to content

Do not release safe commit with CancellableThreads #59182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -85,6 +86,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
Expand Down Expand Up @@ -223,7 +225,7 @@ && isTargetSameHistory()
} else {
final Engine.IndexCommitRef safeCommitRef;
try {
safeCommitRef = shard.acquireSafeIndexCommit();
safeCommitRef = acquireSafeCommit(shard);
resources.add(safeCommitRef);
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);
Expand Down Expand Up @@ -395,17 +397,34 @@ public void onFailure(Exception e) {
*/
private Releasable acquireStore(Store store) {
store.incRef();
return Releasables.releaseOnce(() -> {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
assert threadPool.generic().isShutdown() == false;
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
threadPool.generic().execute(ActionRunnable.run(future, store::decRef));
FutureUtils.get(future);
return Releasables.releaseOnce(() -> runWithGenericThreadPool(store::decRef));
}

/**
* Releasing a safe commit can access some commit files. It's better not to use {@link CancellableThreads} to interact
* with the file systems due to interrupt (see {@link org.apache.lucene.store.NIOFSDirectory} javadocs for more detail).
* This method acquires a safe commit and wraps it to make sure that it will be released using the generic thread pool.
*/
private Engine.IndexCommitRef acquireSafeCommit(IndexShard shard) {
final Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit();
final AtomicBoolean closed = new AtomicBoolean(false);
return new Engine.IndexCommitRef(commitRef.getIndexCommit(), () -> {
if (closed.compareAndSet(false, true)) {
runWithGenericThreadPool(commitRef::close);
}
});
}

private void runWithGenericThreadPool(CheckedRunnable<Exception> task) {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
assert threadPool.generic().isShutdown() == false;
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
threadPool.generic().execute(ActionRunnable.run(future, task));
FutureUtils.get(future);
}

static final class SendFileResult {
final List<String> phase1FileNames;
final List<Long> phase1FileSizes;
Expand Down