
Never release store using CancellableThreads #45409


Merged: 7 commits, Aug 21, 2019
@@ -112,6 +112,11 @@ public void messageReceived(final StartRecoveryRequest request, final TransportC
         }
     }
 
+    // exposed for testing
+    final int numberOfOngoingRecoveries() {
+        return ongoingRecoveries.ongoingRecoveries.size();
+    }
+
     final class OngoingRecoveries {
         private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
 

@@ -33,7 +33,9 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.StepListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -230,8 +232,7 @@ && isTargetSameHistory()
 
         try {
             final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo);
-            shard.store().incRef();
-            final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef);
+            final Releasable releaseStore = acquireStore(shard.store());
             resources.add(releaseStore);
             sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {
                 try {
@@ -393,6 +394,25 @@ public void onFailure(Exception e) {
         });
     }
 
+    /**
+     * Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool.
+     * We must never release the store using an interruptible thread as we can risk invalidating the node lock.
+     */
+    private Releasable acquireStore(Store store) {
+        store.incRef();
+        return Releasables.releaseOnce(() -> {
+            final PlainActionFuture<Void> future = new PlainActionFuture<>();
+            threadPool.generic().execute(new ActionRunnable<>(future) {
+                @Override
+                protected void doRun() {
+                    store.decRef();
+                    listener.onResponse(null);
+                }
+            });
+            FutureUtils.get(future);
+        });
+    }
+
     static final class SendFileResult {
         final List<String> phase1FileNames;
         final List<Long> phase1FileSizes;
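
The change above centers on acquireStore(): the store reference is taken on the calling thread, but the matching decRef is handed off to the generic thread pool and awaited, so a thread managed by CancellableThreads (which may be interrupted on cancellation) never touches the store files directly. As a rough illustration of the pattern, here is a self-contained sketch using plain JDK types in place of Store, Releasable, and ThreadPool (all names below are hypothetical stand-ins, not Elasticsearch API):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    public final class DeferredRelease {

        // Stand-ins for the store's ref count and threadPool.generic().
        static final AtomicInteger storeRefs = new AtomicInteger(1);
        static final ExecutorService generic = Executors.newSingleThreadExecutor();

        // Takes a reference now; the matching release always runs on `generic`,
        // never on the (possibly interrupted) calling thread.
        static AutoCloseable acquireStore() {
            storeRefs.incrementAndGet();
            AtomicBoolean released = new AtomicBoolean(); // releaseOnce semantics
            return () -> {
                if (released.compareAndSet(false, true)) {
                    // Hand the decRef to a thread that cancellation will never
                    // interrupt, then block until it has completed. join() keeps
                    // waiting even if the calling thread is interrupted.
                    CompletableFuture.runAsync(storeRefs::decrementAndGet, generic).join();
                }
            };
        }

        public static void main(String[] args) throws Exception {
            try (AutoCloseable release = acquireStore()) {
                // ... phase 1 file transfer would happen here ...
            }
            System.out.println("refs after release: " + storeRefs.get()); // 1
            generic.shutdown();
        }
    }

The blocking wait mirrors FutureUtils.get(future) in the PR: the caller still observes completion of the release, but the release itself runs on a thread that is safe to perform file I/O.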

@@ -95,7 +95,6 @@ public void testShardsAllocatedAfterDataNodesStart() {
             equalTo(false));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45237")
     public void testAutoExpandReplicasAdjustedWhenDataNodeJoins() {
         internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
         client().admin().indices().create(createIndexRequest("test")

@@ -36,6 +36,7 @@
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.action.support.replication.ReplicationResponse;
@@ -1486,4 +1487,20 @@ public void testPeerRecoveryTrimsLocalTranslog() throws Exception {
         }
         ensureGreen(indexName);
     }
+
+    public void testCancelRecoveryWithAutoExpandReplicas() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        assertAcked(client().admin().indices().prepareCreate("test")
+            .setSettings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all"))
+            .setWaitForActiveShards(ActiveShardCount.NONE));
+        internalCluster().startNode();
+        internalCluster().startNode();
+        client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        assertAcked(client().admin().indices().prepareDelete("test")); // cancel recoveries
+        assertBusy(() -> {
+            for (PeerRecoverySourceService recoveryService : internalCluster().getDataNodeInstances(PeerRecoverySourceService.class)) {
+                assertThat(recoveryService.numberOfOngoingRecoveries(), equalTo(0));
+            }
+        });
+    }
 }

Review comment (Contributor), on this test:
Can we check that the recoveries are no longer running afterwards?
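
For background on the invariant stated in the Javadoc of acquireStore(): the node lock is held through a java.nio FileLock, and the JDK closes an interruptible channel whenever a thread is interrupted during an I/O operation on it (or enters one with a pending interrupt), which invalidates every lock held via that channel. The standalone demo below (an illustration of the JDK mechanism, not part of the PR) shows this:

    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class InterruptClosesChannel {
        public static void main(String[] args) throws Exception {
            Path lockFile = Files.createTempFile("node", ".lock");
            FileChannel channel = FileChannel.open(lockFile,
                    StandardOpenOption.READ, StandardOpenOption.WRITE);
            FileLock nodeLock = channel.tryLock();
            System.out.println("lock valid: " + nodeLock.isValid()); // true

            Thread t = new Thread(() -> {
                Thread.currentThread().interrupt(); // simulate a cancelled/interrupted recovery thread
                try {
                    channel.read(ByteBuffer.allocate(1), 0L); // any channel I/O now fails ...
                } catch (Exception e) {
                    System.out.println("read failed: " + e); // ... with ClosedByInterruptException
                }
            });
            t.start();
            t.join();

            // The interrupt did not just fail one read: it closed the channel,
            // which invalidated the lock held on the file.
            System.out.println("channel open: " + channel.isOpen()); // false
            System.out.println("lock valid: " + nodeLock.isValid()); // false
        }
    }

This is why the release is pushed onto the generic thread pool: those workers are never interrupted by CancellableThreads, so closing store files there cannot trip this channel-closing behavior.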