Skip to content

Commit 0ada3e2

Browse files
authored
Ensure sendBatch not called recursively (elastic#39988)
This PR introduces AsyncRecoveryTarget, which executes the remote calls of peer recovery asynchronously. It also adds a new assertion to ensure that the sendBatch method, which sends a batch of history operations in phase2, is never called recursively on the same thread. The same assertion will also be used in the sendFileChunks method.
1 parent b227cf3 commit 0ada3e2

File tree

8 files changed

+187
-40
lines changed

8 files changed

+187
-40
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,7 @@ private void sendBatch(
584584
final long maxSeqNoOfUpdatesOrDeletes,
585585
final RetentionLeases retentionLeases,
586586
final ActionListener<Long> listener) throws IOException {
587+
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
587588
final List<Translog.Operation> operations = nextBatch.get();
588589
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
589590
if (operations.isEmpty() == false || firstBatch) {

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,4 +786,18 @@ public static boolean assertNotScheduleThread(String reason) {
786786
"Expected current thread [" + Thread.currentThread() + "] to not be the scheduler thread. Reason: [" + reason + "]";
787787
return true;
788788
}
789+
790+
public static boolean assertCurrentMethodIsNotCalledRecursively() {
791+
final StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
792+
assert stackTraceElements.length >= 3 : stackTraceElements.length;
793+
assert stackTraceElements[0].getMethodName().equals("getStackTrace") : stackTraceElements[0];
794+
assert stackTraceElements[1].getMethodName().equals("assertCurrentMethodIsNotCalledRecursively") : stackTraceElements[1];
795+
final StackTraceElement testingMethod = stackTraceElements[2];
796+
for (int i = 3; i < stackTraceElements.length; i++) {
797+
assert stackTraceElements[i].getClassName().equals(testingMethod.getClassName()) == false
798+
|| stackTraceElements[i].getMethodName().equals(testingMethod.getMethodName()) == false :
799+
testingMethod.getClassName() + "#" + testingMethod.getMethodName() + " is called recursively";
800+
}
801+
return true;
802+
}
789803
}

server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,13 @@
3131
import org.elasticsearch.index.seqno.RetentionLeaseStats;
3232
import org.elasticsearch.index.seqno.RetentionLeases;
3333
import org.elasticsearch.index.seqno.SequenceNumbers;
34+
import org.elasticsearch.threadpool.TestThreadPool;
3435
import org.elasticsearch.threadpool.ThreadPool;
3536

3637
import java.io.IOException;
3738
import java.util.Collections;
3839
import java.util.HashMap;
3940
import java.util.Map;
40-
import java.util.concurrent.ExecutorService;
41-
import java.util.concurrent.ScheduledExecutorService;
4241
import java.util.concurrent.TimeUnit;
4342
import java.util.concurrent.atomic.AtomicLong;
4443

@@ -47,27 +46,19 @@
4746
import static org.hamcrest.Matchers.equalTo;
4847
import static org.hamcrest.Matchers.hasItem;
4948
import static org.hamcrest.Matchers.hasSize;
50-
import static org.mockito.Matchers.anyString;
51-
import static org.mockito.Mockito.doAnswer;
52-
import static org.mockito.Mockito.mock;
53-
import static org.mockito.Mockito.when;
5449

5550
public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
5651

5752
private final AtomicLong currentTimeMillis = new AtomicLong();
5853

5954
@Override
6055
protected ThreadPool setUpThreadPool() {
61-
final ThreadPool threadPool = mock(ThreadPool.class);
62-
doAnswer(invocationOnMock -> currentTimeMillis.get()).when(threadPool).absoluteTimeInMillis();
63-
when(threadPool.executor(anyString())).thenReturn(mock(ExecutorService.class));
64-
when(threadPool.scheduler()).thenReturn(mock(ScheduledExecutorService.class));
65-
return threadPool;
66-
}
67-
68-
@Override
69-
protected void tearDownThreadPool() {
70-
56+
return new TestThreadPool(getClass().getName(), threadPoolSettings()) {
57+
@Override
58+
public long absoluteTimeInMillis() {
59+
return currentTimeMillis.get();
60+
}
61+
};
7162
}
7263

7364
public void testAddOrRenewRetentionLease() throws IOException {

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2443,7 +2443,13 @@ public void indexTranslogOperations(
24432443
maxSeenAutoIdTimestamp,
24442444
maxSeqNoOfUpdatesOrDeletes,
24452445
retentionLeases,
2446-
ActionListener.runAfter(listener, () -> assertFalse(replica.isSyncNeeded())));
2446+
ActionListener.wrap(
2447+
r -> {
2448+
assertFalse(replica.isSyncNeeded());
2449+
listener.onResponse(r);
2450+
},
2451+
listener::onFailure
2452+
));
24472453
}
24482454
}, true, true);
24492455

@@ -2604,8 +2610,12 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException {
26042610
// we're only checking that listeners are called when the engine is open, before there is no point
26052611
@Override
26062612
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
2607-
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener);
2608-
assertListenerCalled.accept(replica);
2613+
super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps,
2614+
ActionListener.wrap(
2615+
r -> {
2616+
assertListenerCalled.accept(replica);
2617+
listener.onResponse(r);
2618+
}, listener::onFailure));
26092619
}
26102620

26112621
@Override
@@ -2622,15 +2632,21 @@ public void indexTranslogOperations(
26222632
maxAutoIdTimestamp,
26232633
maxSeqNoOfUpdatesOrDeletes,
26242634
retentionLeases,
2625-
ActionListener.map(listener, checkpoint -> {
2626-
assertListenerCalled.accept(replica);
2627-
return checkpoint;
2628-
}));
2635+
ActionListener.wrap(
2636+
r -> {
2637+
assertListenerCalled.accept(replica);
2638+
listener.onResponse(r);
2639+
}, listener::onFailure));
26292640
}
26302641

26312642
@Override
26322643
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
2633-
super.finalizeRecovery(globalCheckpoint, ActionListener.runAfter(listener, () -> assertListenerCalled.accept(replica)));
2644+
super.finalizeRecovery(globalCheckpoint,
2645+
ActionListener.wrap(
2646+
r -> {
2647+
assertListenerCalled.accept(replica);
2648+
listener.onResponse(r);
2649+
}, listener::onFailure));
26342650
}
26352651
}, false, true);
26362652

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -240,10 +240,10 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
240240
RetentionLeases retentionLeases, ActionListener<Long> listener) {
241241
shippedOps.addAll(operations);
242242
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
243-
maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
244-
}
243+
listener.onResponse(checkpointOnTarget.get()); }
245244
};
246-
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
245+
RecoverySourceHandler handler = new RecoverySourceHandler(
246+
shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
247247
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
248248
handler.phase2(startingSeqNo, endingSeqNo, newTranslogSnapshot(operations, Collections.emptyList()),
249249
randomNonNegativeLong(), randomNonNegativeLong(), RetentionLeases.EMPTY, future);
@@ -274,14 +274,15 @@ public void testSendSnapshotStopOnError() throws Exception {
274274
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
275275
long msu, RetentionLeases retentionLeases, ActionListener<Long> listener) {
276276
if (randomBoolean()) {
277-
maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
277+
listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED);
278278
} else {
279-
maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index")));
279+
listener.onFailure(new RuntimeException("test - failed to index"));
280280
wasFailed.set(true);
281281
}
282282
}
283283
};
284-
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
284+
RecoverySourceHandler handler = new RecoverySourceHandler(
285+
shard, new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()), request, fileChunkSizeInBytes, between(1, 10));
285286
PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
286287
final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
287288
final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
@@ -761,12 +762,4 @@ public void close() {
761762
}
762763
};
763764
}
764-
765-
private void maybeExecuteAsync(Runnable runnable) {
766-
if (randomBoolean()) {
767-
threadPool.generic().execute(runnable);
768-
} else {
769-
runnable.run();
770-
}
771-
}
772765
}

server/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@
2020
package org.elasticsearch.threadpool;
2121

2222
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.util.concurrent.EsExecutors;
24+
import org.elasticsearch.common.util.concurrent.FutureUtils;
2325
import org.elasticsearch.test.ESTestCase;
2426

27+
import java.util.concurrent.ExecutorService;
28+
2529
import static org.elasticsearch.threadpool.ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING;
30+
import static org.elasticsearch.threadpool.ThreadPool.assertCurrentMethodIsNotCalledRecursively;
2631
import static org.hamcrest.CoreMatchers.equalTo;
2732

2833
public class ThreadPoolTests extends ESTestCase {
@@ -67,4 +72,35 @@ public void testEstimatedTimeIntervalSettingAcceptsOnlyZeroAndPositiveTime() {
6772
Exception e = expectThrows(IllegalArgumentException.class, () -> ESTIMATED_TIME_INTERVAL_SETTING.get(settings));
6873
assertEquals("failed to parse value [-1] for setting [thread_pool.estimated_time_interval], must be >= [0ms]", e.getMessage());
6974
}
75+
76+
int factorial(int n) {
77+
assertCurrentMethodIsNotCalledRecursively();
78+
if (n <= 1) {
79+
return 1;
80+
} else {
81+
return n * factorial(n - 1);
82+
}
83+
}
84+
85+
int factorialForked(int n, ExecutorService executor) {
86+
assertCurrentMethodIsNotCalledRecursively();
87+
if (n <= 1) {
88+
return 1;
89+
}
90+
return n * FutureUtils.get(executor.submit(() -> factorialForked(n - 1, executor)));
91+
}
92+
93+
public void testAssertCurrentMethodIsNotCalledRecursively() {
94+
expectThrows(AssertionError.class, () -> factorial(between(2, 10)));
95+
assertThat(factorial(1), equalTo(1)); // is not called recursively
96+
assertThat(expectThrows(AssertionError.class, () -> factorial(between(2, 10))).getMessage(),
97+
equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorial is called recursively"));
98+
TestThreadPool threadPool = new TestThreadPool("test");
99+
assertThat(factorialForked(1, threadPool.generic()), equalTo(1));
100+
assertThat(factorialForked(10, threadPool.generic()), equalTo(3628800));
101+
assertThat(expectThrows(AssertionError.class,
102+
() -> factorialForked(between(2, 10), EsExecutors.newDirectExecutorService())).getMessage(),
103+
equalTo("org.elasticsearch.threadpool.ThreadPoolTests#factorialForked is called recursively"));
104+
terminate(threadPool);
105+
}
70106
}

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.index.translog.Translog;
6868
import org.elasticsearch.indices.breaker.CircuitBreakerService;
6969
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
70+
import org.elasticsearch.indices.recovery.AsyncRecoveryTarget;
7071
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
7172
import org.elasticsearch.indices.recovery.RecoveryFailedException;
7273
import org.elasticsearch.indices.recovery.RecoveryResponse;
@@ -629,8 +630,9 @@ protected final void recoverUnstartedReplica(final IndexShard replica,
629630

630631
final StartRecoveryRequest request = new StartRecoveryRequest(replica.shardId(), targetAllocationId,
631632
pNode, rNode, snapshot, replica.routingEntry().primary(), 0, startingSeqNo);
632-
final RecoverySourceHandler recovery = new RecoverySourceHandler(
633-
primary, recoveryTarget, request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
633+
final RecoverySourceHandler recovery = new RecoverySourceHandler(primary,
634+
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
635+
request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
634636
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
635637
currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable, Collections.emptySet());
636638

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.indices.recovery;
21+
22+
import org.apache.lucene.util.BytesRef;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.common.bytes.BytesArray;
25+
import org.elasticsearch.common.bytes.BytesReference;
26+
import org.elasticsearch.index.seqno.ReplicationTracker;
27+
import org.elasticsearch.index.seqno.RetentionLeases;
28+
import org.elasticsearch.index.store.Store;
29+
import org.elasticsearch.index.store.StoreFileMetaData;
30+
import org.elasticsearch.index.translog.Translog;
31+
32+
import java.io.IOException;
33+
import java.util.List;
34+
import java.util.concurrent.Executor;
35+
36+
/**
37+
* Wraps a {@link RecoveryTarget} to make all remote calls to be executed asynchronously using the provided {@code executor}.
38+
*/
39+
public class AsyncRecoveryTarget implements RecoveryTargetHandler {
40+
private final RecoveryTargetHandler target;
41+
private final Executor executor;
42+
43+
public AsyncRecoveryTarget(RecoveryTargetHandler target, Executor executor) {
44+
this.executor = executor;
45+
this.target = target;
46+
}
47+
48+
@Override
49+
public void ensureClusterStateVersion(long clusterStateVersion) {
50+
target.ensureClusterStateVersion(clusterStateVersion);
51+
}
52+
53+
@Override
54+
public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener) {
55+
executor.execute(() -> target.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps, listener));
56+
}
57+
58+
@Override
59+
public void finalizeRecovery(long globalCheckpoint, ActionListener<Void> listener) {
60+
executor.execute(() -> target.finalizeRecovery(globalCheckpoint, listener));
61+
}
62+
63+
@Override
64+
public void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext) {
65+
target.handoffPrimaryContext(primaryContext);
66+
}
67+
68+
@Override
69+
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
70+
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfDeletesOrUpdatesOnPrimary,
71+
RetentionLeases retentionLeases, ActionListener<Long> listener) {
72+
executor.execute(() -> target.indexTranslogOperations(
73+
operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary, retentionLeases, listener));
74+
}
75+
76+
@Override
77+
public void receiveFileInfo(List<String> phase1FileNames, List<Long> phase1FileSizes, List<String> phase1ExistingFileNames,
78+
List<Long> phase1ExistingFileSizes, int totalTranslogOps) {
79+
target.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames, phase1ExistingFileSizes, totalTranslogOps);
80+
}
81+
82+
@Override
83+
public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
84+
target.cleanFiles(totalTranslogOps, sourceMetaData);
85+
}
86+
87+
@Override
88+
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content,
89+
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener) {
90+
// TODO: remove this clone once we send file chunk async
91+
final BytesReference copy = new BytesArray(BytesRef.deepCopyOf(content.toBytesRef()));
92+
executor.execute(() -> target.writeFileChunk(fileMetaData, position, copy, lastChunk, totalTranslogOps, listener));
93+
}
94+
}

0 commit comments

Comments
 (0)