Skip to content

Commit fa99aba

Browse files
committed
Cancelling a peer recovery on the source can leak a primary permit (#30318)
The code in `RecoverySourceHandler` runs under a `CancellableThreads` instance so that long-running operations can be interrupted when the recovery is cancelled. Unfortunately, if the interruption happens at just the wrong moment while acquiring a permit from the primary, that permit can be leaked and never freed. Note that this is slightly better than it sounds: we only cancel recoveries on the source side if the primary shard itself is closed. Relates to #30316
1 parent 1f7aa0d commit fa99aba

File tree

4 files changed

+128
-52
lines changed

4 files changed

+128
-52
lines changed

server/src/main/java/org/elasticsearch/action/support/AdapterActionFuture.java

+3-40
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,19 @@
1919

2020
package org.elasticsearch.action.support;
2121

22-
import org.elasticsearch.ElasticsearchException;
23-
import org.elasticsearch.ElasticsearchTimeoutException;
2422
import org.elasticsearch.action.ActionFuture;
2523
import org.elasticsearch.action.ActionListener;
2624
import org.elasticsearch.common.unit.TimeValue;
2725
import org.elasticsearch.common.util.concurrent.BaseFuture;
28-
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
26+
import org.elasticsearch.common.util.concurrent.FutureUtils;
2927

30-
import java.util.concurrent.ExecutionException;
3128
import java.util.concurrent.TimeUnit;
32-
import java.util.concurrent.TimeoutException;
3329

3430
public abstract class AdapterActionFuture<T, L> extends BaseFuture<T> implements ActionFuture<T>, ActionListener<L> {
3531

3632
@Override
3733
public T actionGet() {
38-
try {
39-
return get();
40-
} catch (InterruptedException e) {
41-
Thread.currentThread().interrupt();
42-
throw new IllegalStateException("Future got interrupted", e);
43-
} catch (ExecutionException e) {
44-
throw rethrowExecutionException(e);
45-
}
34+
return FutureUtils.get(this);
4635
}
4736

4837
@Override
@@ -62,33 +51,7 @@ public T actionGet(TimeValue timeout) {
6251

6352
@Override
6453
public T actionGet(long timeout, TimeUnit unit) {
65-
try {
66-
return get(timeout, unit);
67-
} catch (TimeoutException e) {
68-
throw new ElasticsearchTimeoutException(e);
69-
} catch (InterruptedException e) {
70-
Thread.currentThread().interrupt();
71-
throw new IllegalStateException("Future got interrupted", e);
72-
} catch (ExecutionException e) {
73-
throw rethrowExecutionException(e);
74-
}
75-
}
76-
77-
static RuntimeException rethrowExecutionException(ExecutionException e) {
78-
if (e.getCause() instanceof ElasticsearchException) {
79-
ElasticsearchException esEx = (ElasticsearchException) e.getCause();
80-
Throwable root = esEx.unwrapCause();
81-
if (root instanceof ElasticsearchException) {
82-
return (ElasticsearchException) root;
83-
} else if (root instanceof RuntimeException) {
84-
return (RuntimeException) root;
85-
}
86-
return new UncategorizedExecutionException("Failed execution", root);
87-
} else if (e.getCause() instanceof RuntimeException) {
88-
return (RuntimeException) e.getCause();
89-
} else {
90-
return new UncategorizedExecutionException("Failed execution", e);
91-
}
54+
return FutureUtils.get(this, timeout, unit);
9255
}
9356

9457
@Override

server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java

+62
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@
1919

2020
package org.elasticsearch.common.util.concurrent;
2121

22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.ElasticsearchTimeoutException;
2224
import org.elasticsearch.common.SuppressForbidden;
2325

26+
import java.util.concurrent.ExecutionException;
2427
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
2530

2631
public class FutureUtils {
2732

@@ -33,4 +38,61 @@ public static boolean cancel(Future<?> toCancel) {
3338
return false;
3439
}
3540

41+
/**
42+
* Calls {@link Future#get()} without the checked exceptions.
43+
*
44+
* @param future to dereference
45+
* @param <T> the type returned
46+
* @return the value of the future
47+
*/
48+
public static <T> T get(Future<T> future) {
49+
try {
50+
return future.get();
51+
} catch (InterruptedException e) {
52+
Thread.currentThread().interrupt();
53+
throw new IllegalStateException("Future got interrupted", e);
54+
} catch (ExecutionException e) {
55+
throw rethrowExecutionException(e);
56+
}
57+
}
58+
59+
/**
60+
* Calls {@link Future#get(long, TimeUnit)} without the checked exceptions.
61+
*
62+
* @param future to dereference
63+
* @param timeout to wait
64+
* @param unit for timeout
65+
* @param <T> the type returned
66+
* @return the value of the future
67+
*/
68+
public static <T> T get(Future<T> future, long timeout, TimeUnit unit) {
69+
try {
70+
return future.get(timeout, unit);
71+
} catch (TimeoutException e) {
72+
throw new ElasticsearchTimeoutException(e);
73+
} catch (InterruptedException e) {
74+
Thread.currentThread().interrupt();
75+
throw new IllegalStateException("Future got interrupted", e);
76+
} catch (ExecutionException e) {
77+
throw FutureUtils.rethrowExecutionException(e);
78+
}
79+
}
80+
81+
static RuntimeException rethrowExecutionException(ExecutionException e) {
82+
if (e.getCause() instanceof ElasticsearchException) {
83+
ElasticsearchException esEx = (ElasticsearchException) e.getCause();
84+
Throwable root = esEx.unwrapCause();
85+
if (root instanceof ElasticsearchException) {
86+
return (ElasticsearchException) root;
87+
} else if (root instanceof RuntimeException) {
88+
return (RuntimeException) root;
89+
}
90+
return new UncategorizedExecutionException("Failed execution", root);
91+
} else if (e.getCause() instanceof RuntimeException) {
92+
return (RuntimeException) e.getCause();
93+
} else {
94+
return new UncategorizedExecutionException("Failed execution", e);
95+
}
96+
}
97+
3698
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

+38-12
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@
2929
import org.apache.lucene.store.IndexInput;
3030
import org.apache.lucene.store.RateLimiter;
3131
import org.apache.lucene.util.ArrayUtil;
32-
import org.elasticsearch.core.internal.io.IOUtils;
3332
import org.elasticsearch.ExceptionsHelper;
3433
import org.elasticsearch.Version;
35-
import org.elasticsearch.action.support.PlainActionFuture;
34+
import org.elasticsearch.action.ActionListener;
3635
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
3736
import org.elasticsearch.cluster.routing.ShardRouting;
3837
import org.elasticsearch.common.Nullable;
@@ -44,6 +43,8 @@
4443
import org.elasticsearch.common.settings.Settings;
4544
import org.elasticsearch.common.unit.ByteSizeValue;
4645
import org.elasticsearch.common.util.CancellableThreads;
46+
import org.elasticsearch.common.util.concurrent.FutureUtils;
47+
import org.elasticsearch.core.internal.io.IOUtils;
4748
import org.elasticsearch.core.internal.io.Streams;
4849
import org.elasticsearch.index.engine.Engine;
4950
import org.elasticsearch.index.engine.RecoveryEngineException;
@@ -67,6 +68,7 @@
6768
import java.util.Comparator;
6869
import java.util.List;
6970
import java.util.Locale;
71+
import java.util.concurrent.CompletableFuture;
7072
import java.util.concurrent.atomic.AtomicLong;
7173
import java.util.function.Function;
7274
import java.util.function.Supplier;
@@ -142,7 +144,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
142144
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
143145
}
144146
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
145-
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ");
147+
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
146148

147149
try (Closeable ignored = shard.acquireTranslogRetentionLock()) {
148150
final long startingSeqNo;
@@ -196,7 +198,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
196198
* all documents up to maxSeqNo in phase2.
197199
*/
198200
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
199-
shardId + " initiating tracking of " + request.targetAllocationId());
201+
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
200202

201203
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
202204
/*
@@ -227,17 +229,41 @@ private boolean isTargetSameHistory() {
227229
return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID());
228230
}
229231

230-
private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) {
232+
static void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason,
233+
IndexShard primary, CancellableThreads cancellableThreads, Logger logger) {
231234
cancellableThreads.execute(() -> {
232-
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
233-
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
234-
try (Releasable ignored = onAcquired.actionGet()) {
235+
CompletableFuture<Releasable> permit = new CompletableFuture<>();
236+
final ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
237+
@Override
238+
public void onResponse(Releasable releasable) {
239+
if (permit.complete(releasable) == false) {
240+
releasable.close();
241+
}
242+
}
243+
244+
@Override
245+
public void onFailure(Exception e) {
246+
permit.completeExceptionally(e);
247+
}
248+
};
249+
primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
250+
try (Releasable ignored = FutureUtils.get(permit)) {
235251
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
236252
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
237-
if (shard.isPrimaryMode() == false) {
238-
throw new IndexShardRelocatedException(shard.shardId());
253+
if (primary.isPrimaryMode() == false) {
254+
throw new IndexShardRelocatedException(primary.shardId());
239255
}
240256
runnable.run();
257+
} finally {
258+
// just in case we got an exception (likely interrupted) while waiting for the get
259+
permit.whenComplete((r, e) -> {
260+
if (r != null) {
261+
r.close();
262+
}
263+
if (e != null) {
264+
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
265+
}
266+
});
241267
}
242268
});
243269
}
@@ -489,11 +515,11 @@ public void finalizeRecovery(final long targetLocalCheckpoint) throws IOExceptio
489515
* the permit then the state of the shard will be relocated and this recovery will fail.
490516
*/
491517
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
492-
shardId + " marking " + request.targetAllocationId() + " as in sync");
518+
shardId + " marking " + request.targetAllocationId() + " as in sync", shard, cancellableThreads, logger);
493519
final long globalCheckpoint = shard.getGlobalCheckpoint();
494520
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
495521
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
496-
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint");
522+
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, cancellableThreads, logger);
497523

498524
if (request.isPrimaryRelocation()) {
499525
logger.trace("performing relocation hand-off");

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

+25
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.common.lucene.uid.Versions;
4747
import org.elasticsearch.common.settings.ClusterSettings;
4848
import org.elasticsearch.common.settings.Settings;
49+
import org.elasticsearch.common.util.CancellableThreads;
4950
import org.elasticsearch.common.xcontent.XContentType;
5051
import org.elasticsearch.core.internal.io.IOUtils;
5152
import org.elasticsearch.index.IndexSettings;
@@ -439,6 +440,30 @@ long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
439440
assertFalse(phase2Called.get());
440441
}
441442

443+
public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
444+
final CancellableThreads cancellableThreads = new CancellableThreads();
445+
final IndexShard shard = mock(IndexShard.class);
446+
final AtomicBoolean freed = new AtomicBoolean(true);
447+
when(shard.isPrimaryMode()).thenReturn(true);
448+
doAnswer(invocation -> {
449+
freed.set(false);
450+
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> freed.set(true));
451+
return null;
452+
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
453+
454+
Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test"));
455+
cancelingThread.start();
456+
try {
457+
RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads, logger);
458+
} catch (CancellableThreads.ExecutionCancelledException e) {
459+
// expected.
460+
}
461+
cancelingThread.join();
462+
// we have to use assert busy as we may be interrupted while acquiring the permit, if so we want to check
463+
// that the permit is released.
464+
assertBusy(() -> assertTrue(freed.get()));
465+
}
466+
442467
private Store newStore(Path path) throws IOException {
443468
return newStore(path, true);
444469
}

0 commit comments

Comments
 (0)