Skip to content

Commit 0fb695e

Browse files
authored
Never release store using CancellableThreads (elastic#45409)
Today we can release a Store using CancellableThreads. If we are holding the last reference, then we will verify the node lock before deleting the store. Checking node lock performs some I/O on FileChannel. If the current thread is interrupted, then the channel will be closed and the node lock will also be invalid. Closes elastic#45237
1 parent 34d6913 commit 0fb695e

File tree

4 files changed

+44
-3
lines changed

4 files changed

+44
-3
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ public void messageReceived(final StartRecoveryRequest request, final TransportC
112112
}
113113
}
114114

115+
// exposed for testing
116+
final int numberOfOngoingRecoveries() {
117+
return ongoingRecoveries.ongoingRecoveries.size();
118+
}
119+
115120
final class OngoingRecoveries {
116121
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
117122

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import org.elasticsearch.ExceptionsHelper;
3434
import org.elasticsearch.Version;
3535
import org.elasticsearch.action.ActionListener;
36+
import org.elasticsearch.action.ActionRunnable;
3637
import org.elasticsearch.action.StepListener;
38+
import org.elasticsearch.action.support.PlainActionFuture;
3739
import org.elasticsearch.action.support.ThreadedActionListener;
3840
import org.elasticsearch.action.support.replication.ReplicationResponse;
3941
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -230,8 +232,7 @@ && isTargetSameHistory()
230232

231233
try {
232234
final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
233-
shard.store().incRef();
234-
final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
235+
final Releasable releaseStore = acquireStore(shard.store());
235236
resources.add(releaseStore);
236237
sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
237238
try {
@@ -393,6 +394,25 @@ public void onFailure(Exception e) {
393394
});
394395
}
395396

397+
/**
398+
* Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool.
399+
* We must never release the store using an interruptible thread as we can risk invalidating the node lock.
400+
*/
401+
private Releasable acquireStore(Store store) {
402+
store.incRef();
403+
return Releasables.releaseOnce(() -> {
404+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
405+
threadPool.generic().execute(new ActionRunnable<>(future) {
406+
@Override
407+
protected void doRun() {
408+
store.decRef();
409+
listener.onResponse(null);
410+
}
411+
});
412+
FutureUtils.get(future);
413+
});
414+
}
415+
396416
static final class SendFileResult {
397417
final List<String> phase1FileNames;
398418
final List<Long> phase1FileSizes;

server/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ public void testShardsAllocatedAfterDataNodesStart() {
9595
equalTo(false));
9696
}
9797

98-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45237")
9998
public void testAutoExpandReplicasAdjustedWhenDataNodeJoins() {
10099
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
101100
client().admin().indices().create(createIndexRequest("test")

server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.action.index.IndexRequestBuilder;
3737
import org.elasticsearch.action.index.IndexResponse;
3838
import org.elasticsearch.action.search.SearchResponse;
39+
import org.elasticsearch.action.support.ActiveShardCount;
3940
import org.elasticsearch.action.support.PlainActionFuture;
4041
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
4142
import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -1486,4 +1487,20 @@ public void testPeerRecoveryTrimsLocalTranslog() throws Exception {
14861487
}
14871488
ensureGreen(indexName);
14881489
}
1490+
1491+
public void testCancelRecoveryWithAutoExpandReplicas() throws Exception {
1492+
internalCluster().startMasterOnlyNode();
1493+
assertAcked(client().admin().indices().prepareCreate("test")
1494+
.setSettings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all"))
1495+
.setWaitForActiveShards(ActiveShardCount.NONE));
1496+
internalCluster().startNode();
1497+
internalCluster().startNode();
1498+
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
1499+
assertAcked(client().admin().indices().prepareDelete("test")); // cancel recoveries
1500+
assertBusy(() -> {
1501+
for (PeerRecoverySourceService recoveryService : internalCluster().getDataNodeInstances(PeerRecoverySourceService.class)) {
1502+
assertThat(recoveryService.numberOfOngoingRecoveries(), equalTo(0));
1503+
}
1504+
});
1505+
}
14891506
}

0 commit comments

Comments
 (0)