Skip to content

Commit 2662c1b

Browse files
Wait for all Rec. to Stop on Node Close (#46178) (#46237)
* Wait for all Rec. to Stop on Node Close * This issue is in the `RecoverySourceHandler#acquireStore`. If we submit the store release to the generic threadpool while it is getting shut down we never complete the future we wait on (in the generic pool as well) and fail to ever release the store potentially. * Fixed by waiting for all recoveries to end on node close so that we always have a healthy thread pool here * Closes #45956
1 parent bd7a04c commit 2662c1b

File tree

4 files changed

+57
-8
lines changed

4 files changed

+57
-8
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import org.elasticsearch.ExceptionsHelper;
2525
import org.elasticsearch.action.ActionListener;
2626
import org.elasticsearch.action.support.ChannelActionListener;
27+
import org.elasticsearch.action.support.PlainActionFuture;
2728
import org.elasticsearch.cluster.routing.ShardRouting;
2829
import org.elasticsearch.common.Nullable;
30+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
2931
import org.elasticsearch.common.inject.Inject;
3032
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.common.util.concurrent.FutureUtils;
3134
import org.elasticsearch.index.IndexService;
3235
import org.elasticsearch.index.shard.IndexEventListener;
3336
import org.elasticsearch.index.shard.IndexShard;
@@ -50,7 +53,7 @@
5053
* The source recovery accepts recovery requests from other peer shards and start the recovery process from this
5154
* source shard to the target shard.
5255
*/
53-
public class PeerRecoverySourceService implements IndexEventListener {
56+
public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener {
5457

5558
private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class);
5659

@@ -74,6 +77,19 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi
7477
new StartRecoveryTransportRequestHandler());
7578
}
7679

80+
@Override
81+
protected void doStart() {
82+
}
83+
84+
@Override
85+
protected void doStop() {
86+
ongoingRecoveries.awaitEmpty();
87+
}
88+
89+
@Override
90+
protected void doClose() {
91+
}
92+
7793
@Override
7894
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
7995
Settings indexSettings) {
@@ -118,9 +134,14 @@ final int numberOfOngoingRecoveries() {
118134
}
119135

120136
final class OngoingRecoveries {
137+
121138
private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
122139

140+
@Nullable
141+
private List<ActionListener<Void>> emptyListeners;
142+
123143
synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
144+
assert lifecycle.started();
124145
final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
125146
RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
126147
shard.recoveryStats().incCurrentAsSource();
@@ -138,6 +159,13 @@ synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
138159
if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
139160
ongoingRecoveries.remove(shard);
140161
}
162+
if (ongoingRecoveries.isEmpty()) {
163+
if (emptyListeners != null) {
164+
final List<ActionListener<Void>> onEmptyListeners = emptyListeners;
165+
emptyListeners = null;
166+
ActionListener.onResponse(onEmptyListeners, null);
167+
}
168+
}
141169
}
142170

143171
synchronized void cancel(IndexShard shard, String reason) {
@@ -157,6 +185,22 @@ synchronized void cancel(IndexShard shard, String reason) {
157185
}
158186
}
159187

188+
void awaitEmpty() {
189+
assert lifecycle.stoppedOrClosed();
190+
final PlainActionFuture<Void> future;
191+
synchronized (this) {
192+
if (ongoingRecoveries.isEmpty()) {
193+
return;
194+
}
195+
future = new PlainActionFuture<>();
196+
if (emptyListeners == null) {
197+
emptyListeners = new ArrayList<>();
198+
}
199+
emptyListeners.add(future);
200+
}
201+
FutureUtils.get(future);
202+
}
203+
160204
private final class ShardRecoveryContext {
161205
final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();
162206

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -405,13 +405,14 @@ private Releasable acquireStore(Store store) {
405405
store.incRef();
406406
return Releasables.releaseOnce(() -> {
407407
final PlainActionFuture<Void> future = new PlainActionFuture<>();
408-
threadPool.generic().execute(new ActionRunnable<Void>(future) {
409-
@Override
410-
protected void doRun() {
411-
store.decRef();
412-
listener.onResponse(null);
413-
}
414-
});
408+
assert threadPool.generic().isShutdown() == false;
409+
// TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
410+
// While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
411+
// below and thus make it impossible for the store release to execute which in turn would block the futures forever
412+
threadPool.generic().execute(ActionRunnable.wrap(future, l -> {
413+
store.decRef();
414+
l.onResponse(null);
415+
}));
415416
FutureUtils.get(future);
416417
});
417418
}

server/src/main/java/org/elasticsearch/node/Node.java

+3
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,7 @@ protected Node(
602602
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()
603603
.map(injector::getInstance).collect(Collectors.toList()));
604604
resourcesToClose.addAll(pluginLifecycleComponents);
605+
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
605606
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
606607
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}),
607608
() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
@@ -697,6 +698,7 @@ public Node start() throws NodeValidationException {
697698
assert localNodeFactory.getNode() != null;
698699
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
699700
: "transportService has a different local node than the factory provided";
701+
injector.getInstance(PeerRecoverySourceService.class).start();
700702
final MetaData onDiskMetadata;
701703
// we load the global state here (the persistent part of the cluster state stored on disk) to
702704
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
@@ -842,6 +844,7 @@ public synchronized void close() throws IOException {
842844
toClose.add(injector.getInstance(IndicesService.class));
843845
// close filter/fielddata caches after indices
844846
toClose.add(injector.getInstance(IndicesStore.class));
847+
toClose.add(injector.getInstance(PeerRecoverySourceService.class));
845848
toClose.add(() -> stopWatch.stop().start("cluster"));
846849
toClose.add(injector.getInstance(ClusterService.class));
847850
toClose.add(() -> stopWatch.stop().start("node_connections_service"));

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public void testDuplicateRecoveries() throws IOException {
4343
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
4444
getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
4545
SequenceNumbers.UNASSIGNED_SEQ_NO);
46+
peerRecoverySourceService.start();
4647
RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
4748
DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
4849
() -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));

0 commit comments

Comments (0)