Skip to content

Commit 4a66397

Browse files
authored
Use consistent threadpools in CoordinatorTests (#53868)
Today in the `CoordinatorTests` each node uses multiple threadpools. This is mostly fine as they are almost completely stateless, except for the `ThreadContext`: by using multiple threadpools we cannot make assertions that the thread context is/isn't preserved as we expect. This commit consolidates the threadpool instances in use so that each node uses just one.
1 parent a8c501b commit 4a66397

File tree

8 files changed

+44
-39
lines changed

8 files changed

+44
-39
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
161161
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
162162
ActionListener<PublishWithJoinResponse> originalListener) {
163163
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
164+
assert transportService.getThreadPool().getThreadContext().isSystemContext();
164165
final ActionListener<PublishWithJoinResponse> responseActionListener;
165166
if (destination.equals(nodes.getLocalNode())) {
166167
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
@@ -197,6 +198,7 @@ public void onFailure(Exception e) {
197198
@Override
198199
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
199200
ActionListener<TransportResponse.Empty> responseActionListener) {
201+
assert transportService.getThreadPool().getThreadContext().isSystemContext();
200202
transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, stateRequestOptions,
201203
new TransportResponseHandler<TransportResponse.Empty>() {
202204

server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,10 @@ private static ClusterState initialState(DiscoveryNode localNode, long term, lon
122122
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
123123
deterministicTaskQueue
124124
= new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
125+
final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool();
125126
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test",
126-
deterministicTaskQueue::scheduleNow);
127-
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get());
127+
fakeThreadPool, deterministicTaskQueue::scheduleNow);
128+
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get());
128129
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
129130
coordinator.handlePublishRequest(new PublishRequest(event.state()));
130131
publishListener.onResponse(null);

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,15 +1163,15 @@ private final class TestClusterNode {
11631163
TestClusterNode(DiscoveryNode node) throws IOException {
11641164
this.node = node;
11651165
final Environment environment = createEnvironment(node.getName());
1166-
masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
1166+
threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable));
1167+
masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow);
11671168
final Settings settings = environment.settings();
11681169
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
1169-
threadPool = deterministicTaskQueue.getThreadPool();
11701170
clusterService = new ClusterService(settings, clusterSettings, masterService,
11711171
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
11721172
@Override
11731173
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1174-
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
1174+
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
11751175
}
11761176

11771177
@Override
@@ -1211,7 +1211,7 @@ protected NamedWriteableRegistry writeableRegistry() {
12111211
}
12121212
};
12131213
transportService = mockTransport.createTransportService(
1214-
settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)),
1214+
settings, threadPool,
12151215
new TransportInterceptor() {
12161216
@Override
12171217
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ class ClusterNode {
911911
}
912912

913913
private void setUp() {
914+
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
914915
mockTransport = new DisruptableMockTransport(localNode, logger) {
915916
@Override
916917
protected void execute(Runnable runnable) {
@@ -928,24 +929,20 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
928929
.filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny();
929930
}
930931
};
931-
932932
final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ?
933933
nodeSettings : Settings.builder().put(nodeSettings)
934934
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
935935
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
936-
transportService = mockTransport.createTransportService(
937-
settings, deterministicTaskQueue.getThreadPool(this::onNode),
938-
getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
939-
a -> localNode, null, emptySet());
940-
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
936+
transportService = mockTransport.createTransportService(settings, threadPool,
937+
getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet());
938+
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool,
941939
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
942940
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
943941
clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
944-
deterministicTaskQueue, this::onNode);
942+
deterministicTaskQueue, threadPool);
945943
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
946944
clusterService.setNodeConnectionsService(
947-
new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
948-
transportService));
945+
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
949946
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
950947
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
951948
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
@@ -954,8 +951,8 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
954951
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {},
955952
getElectionStrategy());
956953
masterService.setClusterStatePublisher(coordinator);
957-
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
958-
deterministicTaskQueue.getThreadPool(this::onNode), coordinator, null);
954+
final GatewayService gatewayService
955+
= new GatewayService(settings, allocationService, clusterService, threadPool, coordinator, null);
959956

960957
logger.trace("starting up [{}]", localNode);
961958
transportService.start();
@@ -1292,8 +1289,9 @@ static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterServic
12921289

12931290
AckCollector nextAckCollector = new AckCollector();
12941291

1295-
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
1296-
super(nodeName, serviceName, onTaskAvailableToRun);
1292+
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
1293+
Consumer<Runnable> onTaskAvailableToRun) {
1294+
super(nodeName, serviceName, threadPool, onTaskAvailableToRun);
12971295
}
12981296

12991297
@Override
@@ -1323,8 +1321,8 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
13231321
private boolean applicationMayFail;
13241322

13251323
DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
1326-
DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
1327-
super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
1324+
DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
1325+
super(nodeName, settings, clusterSettings, threadPool);
13281326
this.nodeName = nodeName;
13291327
this.deterministicTaskQueue = deterministicTaskQueue;
13301328
addStateApplier(event -> {
@@ -1344,7 +1342,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
13441342

13451343
@Override
13461344
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1347-
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
1345+
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
13481346
}
13491347

13501348
@Override

test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.common.util.concurrent.EsExecutors;
2222
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
23+
import org.elasticsearch.threadpool.ThreadPool;
2324

2425
import java.util.concurrent.TimeUnit;
2526

@@ -29,7 +30,7 @@
2930
*/
3031
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
3132

32-
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) {
33+
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
3334
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS,
3435
r -> new Thread() {
3536
@Override
@@ -51,7 +52,7 @@ public String toString() {
5152
});
5253
}
5354
},
54-
deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler());
55+
threadPool.getThreadContext(), threadPool.scheduler());
5556
}
5657

5758
@Override

test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141

4242
import static org.apache.lucene.util.LuceneTestCase.random;
4343
import static org.elasticsearch.test.ESTestCase.randomInt;
44-
import static org.mockito.Mockito.mock;
45-
import static org.mockito.Mockito.when;
4644

4745
public class FakeThreadPoolMasterService extends MasterService {
4846
private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class);
@@ -54,21 +52,14 @@ public class FakeThreadPoolMasterService extends MasterService {
5452
private boolean taskInProgress = false;
5553
private boolean waitForPublish = false;
5654

57-
public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
55+
public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
56+
Consumer<Runnable> onTaskAvailableToRun) {
5857
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
59-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
60-
createMockThreadPool());
58+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
6159
this.name = serviceName;
6260
this.onTaskAvailableToRun = onTaskAvailableToRun;
6361
}
6462

65-
private static ThreadPool createMockThreadPool() {
66-
final ThreadContext context = new ThreadContext(Settings.EMPTY);
67-
final ThreadPool mockThreadPool = mock(ThreadPool.class);
68-
when(mockThreadPool.getThreadContext()).thenReturn(context);
69-
return mockThreadPool;
70-
}
71-
7263
@Override
7364
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
7465
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name),
@@ -110,7 +101,11 @@ public void run() {
110101
final Runnable task = pendingTasks.remove(taskIndex);
111102
taskInProgress = true;
112103
scheduledNextTask = false;
113-
task.run();
104+
final ThreadContext threadContext = threadPool.getThreadContext();
105+
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
106+
threadContext.markAsSystemContext();
107+
task.run();
108+
}
114109
if (waitForPublish == false) {
115110
taskInProgress = false;
116111
}

test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
2929

3030
public void testPrioritizedEsThreadPoolExecutor() {
3131
final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
32-
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue);
32+
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
3333
final AtomicBoolean called1 = new AtomicBoolean();
3434
final AtomicBoolean called2 = new AtomicBoolean();
3535
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {

test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
import org.elasticsearch.cluster.metadata.MetaData;
2828
import org.elasticsearch.cluster.node.DiscoveryNode;
2929
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
30+
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.util.concurrent.ThreadContext;
3032
import org.elasticsearch.test.ESTestCase;
33+
import org.elasticsearch.threadpool.ThreadPool;
3134

3235
import java.util.ArrayList;
3336
import java.util.Collections;
@@ -37,6 +40,8 @@
3740
import java.util.concurrent.atomic.AtomicReference;
3841

3942
import static org.hamcrest.Matchers.equalTo;
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.when;
4045

4146
public class FakeThreadPoolMasterServiceTests extends ESTestCase {
4247

@@ -48,7 +53,10 @@ public void testFakeMasterService() {
4853
lastClusterStateRef.set(ClusterStateCreationUtils.state(discoveryNode, discoveryNode));
4954
long firstClusterStateVersion = lastClusterStateRef.get().version();
5055
AtomicReference<ActionListener<Void>> publishingCallback = new AtomicReference<>();
51-
FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", runnableTasks::add);
56+
final ThreadContext context = new ThreadContext(Settings.EMPTY);
57+
final ThreadPool mockThreadPool = mock(ThreadPool.class);
58+
when(mockThreadPool.getThreadContext()).thenReturn(context);
59+
FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add);
5260
masterService.setClusterStateSupplier(lastClusterStateRef::get);
5361
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
5462
lastClusterStateRef.set(event.state());

0 commit comments

Comments
 (0)