Skip to content

Commit 5c684c3

Browse files
authored
Correct context for CancellableSOCache listener (#83021)
Today the `CancellableSingleObjectCache` completes its listeners in the thread context of the `get()` call that actually computes the value which will be the correct context only if no batching took place. With this commit we make sure to complete each listener in the context in which it was passed to the corresponding `get()` call.
1 parent f599846 commit 5c684c3

File tree

4 files changed

+44
-12
lines changed

4 files changed

+44
-12
lines changed

docs/changelog/83021.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 83021
2+
summary: Correct context for CancellableSOCache listener
3+
area: Stats
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.io.stream.StreamInput;
3030
import org.elasticsearch.common.io.stream.StreamOutput;
3131
import org.elasticsearch.common.util.CancellableSingleObjectCache;
32+
import org.elasticsearch.common.util.concurrent.ThreadContext;
3233
import org.elasticsearch.index.IndexService;
3334
import org.elasticsearch.index.engine.CommitStats;
3435
import org.elasticsearch.index.seqno.RetentionLeaseStats;
@@ -69,8 +70,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<
6970
private final NodeService nodeService;
7071
private final IndicesService indicesService;
7172

72-
private final MetadataStatsCache<MappingStats> mappingStatsCache = new MetadataStatsCache<>(MappingStats::of);
73-
private final MetadataStatsCache<AnalysisStats> analysisStatsCache = new MetadataStatsCache<>(AnalysisStats::of);
73+
private final MetadataStatsCache<MappingStats> mappingStatsCache;
74+
private final MetadataStatsCache<AnalysisStats> analysisStatsCache;
7475

7576
@Inject
7677
public TransportClusterStatsAction(
@@ -95,6 +96,8 @@ public TransportClusterStatsAction(
9596
);
9697
this.nodeService = nodeService;
9798
this.indicesService = indicesService;
99+
this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of);
100+
this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
98101
}
99102

100103
@Override
@@ -258,7 +261,8 @@ public void writeTo(StreamOutput out) throws IOException {
258261
private static class MetadataStatsCache<T> extends CancellableSingleObjectCache<Metadata, Long, T> {
259262
private final BiFunction<Metadata, Runnable, T> function;
260263

261-
MetadataStatsCache(BiFunction<Metadata, Runnable, T> function) {
264+
MetadataStatsCache(ThreadContext threadContext, BiFunction<Metadata, Runnable, T> function) {
265+
super(threadContext);
262266
this.function = function;
263267
}
264268

server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
package org.elasticsearch.common.util;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.ContextPreservingActionListener;
1213
import org.elasticsearch.action.support.ListenableActionFuture;
14+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1315
import org.elasticsearch.core.AbstractRefCounted;
1416
import org.elasticsearch.core.Nullable;
1517
import org.elasticsearch.tasks.TaskCancelledException;
@@ -41,8 +43,14 @@
4143
*/
4244
public abstract class CancellableSingleObjectCache<Input, Key, Value> {
4345

46+
private final ThreadContext threadContext;
47+
4448
private final AtomicReference<CachedItem> currentCachedItemRef = new AtomicReference<>();
4549

50+
protected CancellableSingleObjectCache(ThreadContext threadContext) {
51+
this.threadContext = threadContext;
52+
}
53+
4654
/**
4755
* Compute a new value for the cache.
4856
* <p>
@@ -220,7 +228,7 @@ boolean addListener(ActionListener<Value> listener, BooleanSupplier isCancelled)
220228
ActionListener.completeWith(listener, () -> future.actionGet(0L));
221229
} else {
222230
// Refresh is still pending; it's not cancelled because there are still references.
223-
future.addListener(listener);
231+
future.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
224232
final AtomicBoolean released = new AtomicBoolean();
225233
cancellationChecks.add(() -> {
226234
if (released.get() == false && isCancelled.getAsBoolean() && released.compareAndSet(false, true)) {

server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.StepListener;
1414
import org.elasticsearch.action.support.PlainActionFuture;
15+
import org.elasticsearch.common.settings.Settings;
1516
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
17+
import org.elasticsearch.common.util.concurrent.ThreadContext;
1618
import org.elasticsearch.tasks.TaskCancelledException;
1719
import org.elasticsearch.test.ESTestCase;
1820
import org.elasticsearch.threadpool.TestThreadPool;
@@ -193,7 +195,8 @@ public void testExceptionCompletesListenersButIsNotCached() {
193195
public void testConcurrentRefreshesAndCancellation() throws InterruptedException {
194196
final ThreadPool threadPool = new TestThreadPool("test");
195197
try {
196-
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<>() {
198+
final ThreadContext threadContext = threadPool.getThreadContext();
199+
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<>(threadContext) {
197200
@Override
198201
protected void refresh(
199202
String s,
@@ -219,6 +222,7 @@ protected String getKey(String s) {
219222
final CountDownLatch startLatch = new CountDownLatch(1);
220223
final CountDownLatch finishLatch = new CountDownLatch(count);
221224
final BlockingQueue<Runnable> queue = ConcurrentCollections.newBlockingQueue();
225+
final String contextHeader = "test-context-header";
222226

223227
for (int i = 0; i < count; i++) {
224228
final boolean cancel = randomBoolean();
@@ -233,11 +237,14 @@ protected String getKey(String s) {
233237
final StepListener<Integer> stepListener = new StepListener<>();
234238
final AtomicBoolean isComplete = new AtomicBoolean();
235239
final AtomicBoolean isCancelled = new AtomicBoolean();
236-
testCache.get(
237-
input,
238-
isCancelled::get,
239-
ActionListener.runBefore(stepListener, () -> assertTrue(isComplete.compareAndSet(false, true)))
240-
);
240+
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
241+
final String contextValue = randomAlphaOfLength(10);
242+
threadContext.putHeader(contextHeader, contextValue);
243+
testCache.get(input, isCancelled::get, ActionListener.runBefore(stepListener, () -> {
244+
assertTrue(isComplete.compareAndSet(false, true));
245+
assertThat(threadContext.getHeader(contextHeader), equalTo(contextValue));
246+
}));
247+
}
241248

242249
final Runnable next = queue.poll();
243250
if (next != null) {
@@ -277,7 +284,9 @@ protected String getKey(String s) {
277284
public void testConcurrentRefreshesWithFreshnessCheck() throws InterruptedException {
278285
final ThreadPool threadPool = new TestThreadPool("test");
279286
try {
280-
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<>() {
287+
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<>(
288+
threadPool.getThreadContext()
289+
) {
281290
@Override
282291
protected void refresh(
283292
String s,
@@ -380,7 +389,7 @@ public void run() {
380389
}
381390
};
382391

383-
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<>() {
392+
final CancellableSingleObjectCache<String, String, Integer> testCache = new CancellableSingleObjectCache<>(testThreadContext) {
384393
@Override
385394
protected void refresh(
386395
String s,
@@ -424,10 +433,16 @@ protected String getKey(String s) {
424433
expectThrows(TaskCancelledException.class, () -> cancelledFuture.actionGet(0L));
425434
}
426435

436+
private static final ThreadContext testThreadContext = new ThreadContext(Settings.EMPTY);
437+
427438
private static class TestCache extends CancellableSingleObjectCache<String, String, Integer> {
428439

429440
private final LinkedList<StepListener<Function<String, Integer>>> pendingRefreshes = new LinkedList<>();
430441

442+
private TestCache() {
443+
super(testThreadContext);
444+
}
445+
431446
@Override
432447
protected void refresh(
433448
String input,

0 commit comments

Comments
 (0)