
Commit 12a4177

Wait for all Rec. to Stop on Node Close (#46178)
* Wait for all recoveries to stop on node close
* The issue is in `RecoverySourceHandler#acquireStore`: if we submit the store release to the generic thread pool while it is being shut down, we never complete the future we wait on (also on the generic pool) and potentially never release the store.
* Fixed by waiting for all recoveries to end on node close, so that we always have a healthy thread pool here
* Closes #45956
1 parent 7a0f261 commit 12a4177
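
A minimal, self-contained sketch of the failure mode described in the commit message, using plain java.util.concurrent types rather than Elasticsearch's thread pool (class and variable names below are illustrative only): once the pool stops accepting work, the task that would complete the future never runs, so anything blocking on that future hangs.

import java.util.concurrent.*;

// Illustrative sketch only; the Elasticsearch generic pool differs in detail,
// but the structural problem during shutdown is the same.
public class ShutdownRaceSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService generic = Executors.newFixedThreadPool(1);
        generic.shutdown(); // the node is closing, the pool no longer accepts new work

        CompletableFuture<Void> storeReleased = new CompletableFuture<>();
        try {
            // stands in for submitting the store release in RecoverySourceHandler#acquireStore
            generic.execute(() -> storeReleased.complete(null));
        } catch (RejectedExecutionException e) {
            System.out.println("release task was never scheduled: " + e.getMessage());
        }

        // The real code waits without a timeout; a bounded wait is used here to show the hang.
        try {
            storeReleased.get(1, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            System.out.println("future never completes -> the store is never released");
        }
    }
}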

File tree: 4 files changed, +57 −8 lines


server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
+45 −1

@@ -24,10 +24,13 @@
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ChannelActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
@@ -50,7 +53,7 @@
  * The source recovery accepts recovery requests from other peer shards and start the recovery process from this
  * source shard to the target shard.
  */
-public class PeerRecoverySourceService implements IndexEventListener {
+public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener {
 
     private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class);
 
@@ -74,6 +77,19 @@ public PeerRecoverySourceService(TransportService transportService, IndicesService
             new StartRecoveryTransportRequestHandler());
     }
 
+    @Override
+    protected void doStart() {
+    }
+
+    @Override
+    protected void doStop() {
+        ongoingRecoveries.awaitEmpty();
+    }
+
+    @Override
+    protected void doClose() {
+    }
+
     @Override
     public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
                                        Settings indexSettings) {
@@ -118,9 +134,14 @@ final int numberOfOngoingRecoveries() {
     }
 
     final class OngoingRecoveries {
+
         private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();
 
+        @Nullable
+        private List<ActionListener<Void>> emptyListeners;
+
         synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
+            assert lifecycle.started();
             final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
             RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
             shard.recoveryStats().incCurrentAsSource();
@@ -138,6 +159,13 @@ synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
             if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
                 ongoingRecoveries.remove(shard);
             }
+            if (ongoingRecoveries.isEmpty()) {
+                if (emptyListeners != null) {
+                    final List<ActionListener<Void>> onEmptyListeners = emptyListeners;
+                    emptyListeners = null;
+                    ActionListener.onResponse(onEmptyListeners, null);
+                }
+            }
         }
 
         synchronized void cancel(IndexShard shard, String reason) {
@@ -157,6 +185,22 @@ synchronized void cancel(IndexShard shard, String reason) {
             }
         }
 
+        void awaitEmpty() {
+            assert lifecycle.stoppedOrClosed();
+            final PlainActionFuture<Void> future;
+            synchronized (this) {
+                if (ongoingRecoveries.isEmpty()) {
+                    return;
+                }
+                future = new PlainActionFuture<>();
+                if (emptyListeners == null) {
+                    emptyListeners = new ArrayList<>();
+                }
+                emptyListeners.add(future);
+            }
+            FutureUtils.get(future);
+        }
+
         private final class ShardRecoveryContext {
             final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();
 
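
The awaitEmpty/emptyListeners additions above form a small "wait until no recoveries remain" barrier. Below is a condensed, standalone sketch of the same pattern, using plain CompletableFuture in place of PlainActionFuture and ActionListener (the class name OngoingOperations is made up for illustration): callers register a future while operations are still in flight, and the remove() that empties the map completes all registered futures.

import java.util.*;
import java.util.concurrent.CompletableFuture;

final class OngoingOperations {
    private final Set<Object> ongoing = new HashSet<>();
    private List<CompletableFuture<Void>> emptyListeners; // lazily created, like emptyListeners above

    synchronized void add(Object op) {
        ongoing.add(op);
    }

    synchronized void remove(Object op) {
        ongoing.remove(op);
        if (ongoing.isEmpty() && emptyListeners != null) {
            List<CompletableFuture<Void>> listeners = emptyListeners;
            emptyListeners = null;
            listeners.forEach(l -> l.complete(null)); // notify every waiter exactly once
        }
    }

    // Called from doStop(): blocks until the last ongoing operation has been removed.
    void awaitEmpty() {
        final CompletableFuture<Void> future;
        synchronized (this) {
            if (ongoing.isEmpty()) {
                return;
            }
            future = new CompletableFuture<>();
            if (emptyListeners == null) {
                emptyListeners = new ArrayList<>();
            }
            emptyListeners.add(future);
        }
        future.join(); // corresponds to FutureUtils.get(future)
    }
}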

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
+8 −7

@@ -402,13 +402,14 @@ private Releasable acquireStore(Store store) {
         store.incRef();
         return Releasables.releaseOnce(() -> {
             final PlainActionFuture<Void> future = new PlainActionFuture<>();
-            threadPool.generic().execute(new ActionRunnable<>(future) {
-                @Override
-                protected void doRun() {
-                    store.decRef();
-                    listener.onResponse(null);
-                }
-            });
+            assert threadPool.generic().isShutdown() == false;
+            // TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
+            //       While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
+            //       below and thus make it impossible for the store release to execute which in turn would block the futures forever
+            threadPool.generic().execute(ActionRunnable.wrap(future, l -> {
+                store.decRef();
+                l.onResponse(null);
+            }));
             FutureUtils.get(future);
         });
     }
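
The TODO in the hunk above points at a subtler risk that remains: a generic-pool worker re-submits the store release to its own pool and then blocks on the result, so if every worker does this at once, no thread is left to run the releases. A minimal sketch of that shape, shrunk to a single-thread pool and with a bounded wait so it terminates (all names are illustrative):

import java.util.concurrent.*;

public class SelfBlockingPoolSketch {
    public static void main(String[] args) throws Exception {
        // A single thread stands in for "all generic threads are already busy".
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CompletableFuture<Void> decRefDone = new CompletableFuture<>();

        Future<?> outer = pool.submit(() -> {
            // This worker re-submits the release to its own pool and then blocks on it,
            // which is the shape the TODO warns about.
            pool.execute(() -> decRefDone.complete(null));
            try {
                decRefDone.get(2, TimeUnit.SECONDS); // the real code waits without a timeout
                System.out.println("released");
            } catch (TimeoutException e) {
                System.out.println("deadlock: the release can never run while this worker waits");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        outer.get();      // prints the deadlock message once the bounded wait expires
        pool.shutdownNow();
    }
}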

server/src/main/java/org/elasticsearch/node/Node.java
+3 −0

@@ -593,6 +593,7 @@ protected Node(
             .filter(p -> p instanceof LifecycleComponent)
             .map(p -> (LifecycleComponent) p).collect(Collectors.toList());
         resourcesToClose.addAll(pluginLifecycleComponents);
+        resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
         this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
         client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}), transportService.getTaskManager(),
             () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
@@ -689,6 +690,7 @@ public Node start() throws NodeValidationException {
         assert localNodeFactory.getNode() != null;
         assert transportService.getLocalNode().equals(localNodeFactory.getNode())
             : "transportService has a different local node than the factory provided";
+        injector.getInstance(PeerRecoverySourceService.class).start();
         final MetaData onDiskMetadata;
         // we load the global state here (the persistent part of the cluster state stored on disk) to
         // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
@@ -834,6 +836,7 @@ public synchronized void close() throws IOException {
         toClose.add(injector.getInstance(IndicesService.class));
         // close filter/fielddata caches after indices
         toClose.add(injector.getInstance(IndicesStore.class));
+        toClose.add(injector.getInstance(PeerRecoverySourceService.class));
         toClose.add(() -> stopWatch.stop().start("cluster"));
         toClose.add(injector.getInstance(ClusterService.class));
         toClose.add(() -> stopWatch.stop().start("node_connections_service"));

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
+1 −0

@@ -43,6 +43,7 @@ public void testDuplicateRecoveries() throws IOException {
         StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
             getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
             SequenceNumbers.UNASSIGNED_SEQ_NO);
+        peerRecoverySourceService.start();
         RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
         DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
             () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));
