Skip to content

Commit 7d3ac4f

Browse files
committed
Revert "Apply cluster states in system context (#53785)"
This reverts commit 4178c57.
1 parent 4178c57 commit 7d3ac4f

File tree

11 files changed

+39
-102
lines changed

11 files changed

+39
-102
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

-2
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
185185
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
186186
ActionListener<PublishWithJoinResponse> originalListener) {
187187
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
188-
assert transportService.getThreadPool().getThreadContext().isSystemContext();
189188
final ActionListener<PublishWithJoinResponse> responseActionListener;
190189
if (destination.equals(nodes.getLocalNode())) {
191190
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
@@ -222,7 +221,6 @@ public void onFailure(Exception e) {
222221
@Override
223222
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
224223
ActionListener<TransportResponse.Empty> responseActionListener) {
225-
assert transportService.getThreadPool().getThreadContext().isSystemContext();
226224
final String actionName;
227225
final TransportRequest transportRequest;
228226
if (Coordinator.isZen1Node(destination)) {

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.elasticsearch.common.util.concurrent.EsExecutors;
4747
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4848
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
49-
import org.elasticsearch.common.util.concurrent.ThreadContext;
5049
import org.elasticsearch.common.util.iterable.Iterables;
5150
import org.elasticsearch.threadpool.Scheduler;
5251
import org.elasticsearch.threadpool.ThreadPool;
@@ -348,9 +347,7 @@ private void submitStateUpdateTask(final String source, final ClusterStateTaskCo
348347
if (!lifecycle.started()) {
349348
return;
350349
}
351-
final ThreadContext threadContext = threadPool.getThreadContext();
352-
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
353-
threadContext.markAsSystemContext();
350+
try {
354351
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
355352
if (config.timeout() != null) {
356353
threadPoolExecutor.execute(updateTask, config.timeout(),

server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,9 @@ private static ClusterState initialState(DiscoveryNode localNode, long term, lon
122122
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
123123
deterministicTaskQueue
124124
= new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
125-
final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool();
126125
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test",
127-
fakeThreadPool, deterministicTaskQueue::scheduleNow);
128-
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get());
126+
deterministicTaskQueue::scheduleNow);
127+
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get());
129128
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
130129
coordinator.handlePublishRequest(new PublishRequest(event.state()));
131130
publishListener.onResponse(null);

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1167,15 +1167,15 @@ private final class TestClusterNode {
11671167
TestClusterNode(DiscoveryNode node) throws IOException {
11681168
this.node = node;
11691169
final Environment environment = createEnvironment(node.getName());
1170-
threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable));
1171-
masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow);
1170+
masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
11721171
final Settings settings = environment.settings();
11731172
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
1173+
threadPool = deterministicTaskQueue.getThreadPool();
11741174
clusterService = new ClusterService(settings, clusterSettings, masterService,
11751175
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
11761176
@Override
11771177
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1178-
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
1178+
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
11791179
}
11801180

11811181
@Override
@@ -1215,7 +1215,7 @@ protected NamedWriteableRegistry writeableRegistry() {
12151215
}
12161216
};
12171217
transportService = mockTransport.createTransportService(
1218-
settings, threadPool,
1218+
settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)),
12191219
new TransportInterceptor() {
12201220
@Override
12211221
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

+15-13
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,6 @@ class ClusterNode {
911911
}
912912

913913
private void setUp() {
914-
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
915914
mockTransport = new DisruptableMockTransport(localNode, logger) {
916915
@Override
917916
protected void execute(Runnable runnable) {
@@ -929,20 +928,24 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
929928
.filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny();
930929
}
931930
};
931+
932932
final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ?
933933
nodeSettings : Settings.builder().put(nodeSettings)
934934
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
935935
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
936-
transportService = mockTransport.createTransportService(settings, threadPool,
937-
getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet());
938-
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool,
936+
transportService = mockTransport.createTransportService(
937+
settings, deterministicTaskQueue.getThreadPool(this::onNode),
938+
getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
939+
a -> localNode, null, emptySet());
940+
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
939941
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
940942
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
941943
clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
942-
deterministicTaskQueue, threadPool);
944+
deterministicTaskQueue, this::onNode);
943945
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
944946
clusterService.setNodeConnectionsService(
945-
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
947+
new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
948+
transportService));
946949
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
947950
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
948951
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
@@ -952,7 +955,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
952955
getElectionStrategy());
953956
masterService.setClusterStatePublisher(coordinator);
954957
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
955-
threadPool, null, coordinator);
958+
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);
956959

957960
logger.trace("starting up [{}]", localNode);
958961
transportService.start();
@@ -1289,9 +1292,8 @@ static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterServic
12891292

12901293
AckCollector nextAckCollector = new AckCollector();
12911294

1292-
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
1293-
Consumer<Runnable> onTaskAvailableToRun) {
1294-
super(nodeName, serviceName, threadPool, onTaskAvailableToRun);
1295+
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
1296+
super(nodeName, serviceName, onTaskAvailableToRun);
12951297
}
12961298

12971299
@Override
@@ -1321,8 +1323,8 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
13211323
private boolean applicationMayFail;
13221324

13231325
DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
1324-
DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
1325-
super(nodeName, settings, clusterSettings, threadPool);
1326+
DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
1327+
super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
13261328
this.nodeName = nodeName;
13271329
this.deterministicTaskQueue = deterministicTaskQueue;
13281330
addStateApplier(event -> {
@@ -1342,7 +1344,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
13421344

13431345
@Override
13441346
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1345-
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
1347+
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
13461348
}
13471349

13481350
@Override

test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.elasticsearch.common.util.concurrent.EsExecutors;
2222
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
23-
import org.elasticsearch.threadpool.ThreadPool;
2423

2524
import java.util.concurrent.TimeUnit;
2625

@@ -30,7 +29,7 @@
3029
*/
3130
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
3231

33-
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
32+
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) {
3433
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS,
3534
r -> new Thread() {
3635
@Override
@@ -52,7 +51,7 @@ public String toString() {
5251
});
5352
}
5453
},
55-
threadPool.getThreadContext(), threadPool.scheduler());
54+
deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler());
5655
}
5756

5857
@Override

test/framework/src/main/java/org/elasticsearch/cluster/service/ClusterApplierAssertionPlugin.java

-50
This file was deleted.

test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java

+13-9
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
import static org.apache.lucene.util.LuceneTestCase.random;
4343
import static org.elasticsearch.test.ESTestCase.randomInt;
44+
import static org.mockito.Mockito.mock;
45+
import static org.mockito.Mockito.when;
4446

4547
public class FakeThreadPoolMasterService extends MasterService {
4648
private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class);
@@ -52,14 +54,21 @@ public class FakeThreadPoolMasterService extends MasterService {
5254
private boolean taskInProgress = false;
5355
private boolean waitForPublish = false;
5456

55-
public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
56-
Consumer<Runnable> onTaskAvailableToRun) {
57+
public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
5758
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
58-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
59+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
60+
createMockThreadPool());
5961
this.name = serviceName;
6062
this.onTaskAvailableToRun = onTaskAvailableToRun;
6163
}
6264

65+
private static ThreadPool createMockThreadPool() {
66+
final ThreadContext context = new ThreadContext(Settings.EMPTY);
67+
final ThreadPool mockThreadPool = mock(ThreadPool.class);
68+
when(mockThreadPool.getThreadContext()).thenReturn(context);
69+
return mockThreadPool;
70+
}
71+
6372
@Override
6473
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
6574
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name),
@@ -101,11 +110,7 @@ public void run() {
101110
final Runnable task = pendingTasks.remove(taskIndex);
102111
taskInProgress = true;
103112
scheduledNextTask = false;
104-
final ThreadContext threadContext = threadPool.getThreadContext();
105-
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
106-
threadContext.markAsSystemContext();
107-
task.run();
108-
}
113+
task.run();
109114
if (waitForPublish == false) {
110115
taskInProgress = false;
111116
}
@@ -163,5 +168,4 @@ public void onFailure(Exception e) {
163168
protected AckListener wrapAckListener(AckListener ackListener) {
164169
return ackListener;
165170
}
166-
167171
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

-4
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import org.elasticsearch.cluster.routing.UnassignedInfo;
8080
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
8181
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
82-
import org.elasticsearch.cluster.service.ClusterApplierAssertionPlugin;
8382
import org.elasticsearch.cluster.service.ClusterService;
8483
import org.elasticsearch.common.Nullable;
8584
import org.elasticsearch.common.Priority;
@@ -1936,9 +1935,6 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
19361935
if (randomBoolean()) {
19371936
mocks.add(MockFieldFilterPlugin.class);
19381937
}
1939-
if (randomBoolean()) {
1940-
mocks.add(ClusterApplierAssertionPlugin.class);
1941-
}
19421938
}
19431939

19441940
if (addMockTransportService()) {

test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
2929

3030
public void testPrioritizedEsThreadPoolExecutor() {
3131
final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
32-
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
32+
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue);
3333
final AtomicBoolean called1 = new AtomicBoolean();
3434
final AtomicBoolean called2 = new AtomicBoolean();
3535
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {

0 commit comments

Comments
 (0)