Skip to content

Commit c1dc523

Browse files
authored
Apply cluster states in system context (#53785)
Today cluster states are sometimes (rarely) applied in the default context rather than system context, which means that any appliers which capture their contexts cannot do things like remote transport actions when security is enabled. There are at least two ways that we end up applying the cluster state in the default context: 1. locally applying a cluster state that indicates that the master has failed 2. the elected master times out while waiting for a response from another node This commit ensures that cluster states are always applied in the system context. Mitigates #53751
1 parent d47d74a commit c1dc523

File tree

11 files changed

+103
-40
lines changed

11 files changed

+103
-40
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
161161
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
162162
ActionListener<PublishWithJoinResponse> originalListener) {
163163
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
164+
assert transportService.getThreadPool().getThreadContext().isSystemContext();
164165
final ActionListener<PublishWithJoinResponse> responseActionListener;
165166
if (destination.equals(nodes.getLocalNode())) {
166167
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
@@ -197,6 +198,7 @@ public void onFailure(Exception e) {
197198
@Override
198199
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
199200
ActionListener<TransportResponse.Empty> responseActionListener) {
201+
assert transportService.getThreadPool().getThreadContext().isSystemContext();
200202
transportService.sendRequest(destination, COMMIT_STATE_ACTION_NAME, applyCommitRequest, stateRequestOptions,
201203
new TransportResponseHandler<TransportResponse.Empty>() {
202204

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.common.util.concurrent.EsExecutors;
4747
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4848
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
49+
import org.elasticsearch.common.util.concurrent.ThreadContext;
4950
import org.elasticsearch.common.util.iterable.Iterables;
5051
import org.elasticsearch.threadpool.Scheduler;
5152
import org.elasticsearch.threadpool.ThreadPool;
@@ -347,7 +348,9 @@ private void submitStateUpdateTask(final String source, final ClusterStateTaskCo
347348
if (!lifecycle.started()) {
348349
return;
349350
}
350-
try {
351+
final ThreadContext threadContext = threadPool.getThreadContext();
352+
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
353+
threadContext.markAsSystemContext();
351354
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
352355
if (config.timeout() != null) {
353356
threadPoolExecutor.execute(updateTask, config.timeout(),

server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,10 @@ private static ClusterState initialState(DiscoveryNode localNode, long term, lon
122122
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
123123
deterministicTaskQueue
124124
= new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
125+
final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool();
125126
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test",
126-
deterministicTaskQueue::scheduleNow);
127-
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get());
127+
fakeThreadPool, deterministicTaskQueue::scheduleNow);
128+
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get());
128129
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
129130
coordinator.handlePublishRequest(new PublishRequest(event.state()));
130131
publishListener.onResponse(null);

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1163,15 +1163,15 @@ private final class TestClusterNode {
11631163
TestClusterNode(DiscoveryNode node) throws IOException {
11641164
this.node = node;
11651165
final Environment environment = createEnvironment(node.getName());
1166-
masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
1166+
threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable));
1167+
masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow);
11671168
final Settings settings = environment.settings();
11681169
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
1169-
threadPool = deterministicTaskQueue.getThreadPool();
11701170
clusterService = new ClusterService(settings, clusterSettings, masterService,
11711171
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
11721172
@Override
11731173
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1174-
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
1174+
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
11751175
}
11761176

11771177
@Override
@@ -1211,7 +1211,7 @@ protected NamedWriteableRegistry writeableRegistry() {
12111211
}
12121212
};
12131213
transportService = mockTransport.createTransportService(
1214-
settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)),
1214+
settings, threadPool,
12151215
new TransportInterceptor() {
12161216
@Override
12171217
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

+14-16
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ class ClusterNode {
911911
}
912912

913913
private void setUp() {
914+
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
914915
mockTransport = new DisruptableMockTransport(localNode, logger) {
915916
@Override
916917
protected void execute(Runnable runnable) {
@@ -928,24 +929,20 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
928929
.filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny();
929930
}
930931
};
931-
932932
final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ?
933933
nodeSettings : Settings.builder().put(nodeSettings)
934934
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
935935
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
936-
transportService = mockTransport.createTransportService(
937-
settings, deterministicTaskQueue.getThreadPool(this::onNode),
938-
getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
939-
a -> localNode, null, emptySet());
940-
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
936+
transportService = mockTransport.createTransportService(settings, threadPool,
937+
getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet());
938+
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool,
941939
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
942940
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
943941
clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
944-
deterministicTaskQueue, this::onNode);
942+
deterministicTaskQueue, threadPool);
945943
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
946944
clusterService.setNodeConnectionsService(
947-
new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
948-
transportService));
945+
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
949946
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
950947
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
951948
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
@@ -954,8 +951,8 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
954951
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {},
955952
getElectionStrategy());
956953
masterService.setClusterStatePublisher(coordinator);
957-
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
958-
deterministicTaskQueue.getThreadPool(this::onNode), coordinator, null);
954+
final GatewayService gatewayService
955+
= new GatewayService(settings, allocationService, clusterService, threadPool, coordinator, null);
959956

960957
logger.trace("starting up [{}]", localNode);
961958
transportService.start();
@@ -1292,8 +1289,9 @@ static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterServic
12921289

12931290
AckCollector nextAckCollector = new AckCollector();
12941291

1295-
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
1296-
super(nodeName, serviceName, onTaskAvailableToRun);
1292+
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
1293+
Consumer<Runnable> onTaskAvailableToRun) {
1294+
super(nodeName, serviceName, threadPool, onTaskAvailableToRun);
12971295
}
12981296

12991297
@Override
@@ -1323,8 +1321,8 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
13231321
private boolean applicationMayFail;
13241322

13251323
DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
1326-
DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
1327-
super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
1324+
DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
1325+
super(nodeName, settings, clusterSettings, threadPool);
13281326
this.nodeName = nodeName;
13291327
this.deterministicTaskQueue = deterministicTaskQueue;
13301328
addStateApplier(event -> {
@@ -1344,7 +1342,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
13441342

13451343
@Override
13461344
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1347-
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
1345+
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
13481346
}
13491347

13501348
@Override

test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.common.util.concurrent.EsExecutors;
2222
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
23+
import org.elasticsearch.threadpool.ThreadPool;
2324

2425
import java.util.concurrent.TimeUnit;
2526

@@ -29,7 +30,7 @@
2930
*/
3031
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
3132

32-
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) {
33+
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
3334
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS,
3435
r -> new Thread() {
3536
@Override
@@ -51,7 +52,7 @@ public String toString() {
5152
});
5253
}
5354
},
54-
deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler());
55+
threadPool.getThreadContext(), threadPool.scheduler());
5556
}
5657

5758
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.service;
20+
21+
import org.elasticsearch.client.Client;
22+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
24+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
25+
import org.elasticsearch.env.Environment;
26+
import org.elasticsearch.env.NodeEnvironment;
27+
import org.elasticsearch.plugins.Plugin;
28+
import org.elasticsearch.script.ScriptService;
29+
import org.elasticsearch.threadpool.ThreadPool;
30+
import org.elasticsearch.watcher.ResourceWatcherService;
31+
32+
import java.util.Collection;
33+
import java.util.Collections;
34+
35+
public class ClusterApplierAssertionPlugin extends Plugin {
36+
@Override
37+
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
38+
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
39+
NamedXContentRegistry xContentRegistry, Environment environment,
40+
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
41+
IndexNameExpressionResolver indexNameExpressionResolver) {
42+
clusterService.addStateApplier(event -> {
43+
assert threadPool.getThreadContext().isSystemContext();
44+
});
45+
clusterService.addListener(event -> {
46+
assert threadPool.getThreadContext().isSystemContext();
47+
});
48+
return Collections.emptyList();
49+
}
50+
}

test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java

+9-13
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141

4242
import static org.apache.lucene.util.LuceneTestCase.random;
4343
import static org.elasticsearch.test.ESTestCase.randomInt;
44-
import static org.mockito.Mockito.mock;
45-
import static org.mockito.Mockito.when;
4644

4745
public class FakeThreadPoolMasterService extends MasterService {
4846
private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class);
@@ -54,21 +52,14 @@ public class FakeThreadPoolMasterService extends MasterService {
5452
private boolean taskInProgress = false;
5553
private boolean waitForPublish = false;
5654

57-
public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
55+
public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
56+
Consumer<Runnable> onTaskAvailableToRun) {
5857
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
59-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
60-
createMockThreadPool());
58+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
6159
this.name = serviceName;
6260
this.onTaskAvailableToRun = onTaskAvailableToRun;
6361
}
6462

65-
private static ThreadPool createMockThreadPool() {
66-
final ThreadContext context = new ThreadContext(Settings.EMPTY);
67-
final ThreadPool mockThreadPool = mock(ThreadPool.class);
68-
when(mockThreadPool.getThreadContext()).thenReturn(context);
69-
return mockThreadPool;
70-
}
71-
7263
@Override
7364
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
7465
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name),
@@ -110,7 +101,11 @@ public void run() {
110101
final Runnable task = pendingTasks.remove(taskIndex);
111102
taskInProgress = true;
112103
scheduledNextTask = false;
113-
task.run();
104+
final ThreadContext threadContext = threadPool.getThreadContext();
105+
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
106+
threadContext.markAsSystemContext();
107+
task.run();
108+
}
114109
if (waitForPublish == false) {
115110
taskInProgress = false;
116111
}
@@ -168,4 +163,5 @@ public void onFailure(Exception e) {
168163
protected AckListener wrapAckListener(AckListener ackListener) {
169164
return ackListener;
170165
}
166+
171167
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

+4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.cluster.routing.UnassignedInfo;
7373
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
7474
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
75+
import org.elasticsearch.cluster.service.ClusterApplierAssertionPlugin;
7576
import org.elasticsearch.cluster.service.ClusterService;
7677
import org.elasticsearch.common.Nullable;
7778
import org.elasticsearch.common.Priority;
@@ -1784,6 +1785,9 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
17841785
if (randomBoolean()) {
17851786
mocks.add(MockFieldFilterPlugin.class);
17861787
}
1788+
if (randomBoolean()) {
1789+
mocks.add(ClusterApplierAssertionPlugin.class);
1790+
}
17871791
}
17881792

17891793
if (addMockTransportService()) {

test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
2929

3030
public void testPrioritizedEsThreadPoolExecutor() {
3131
final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
32-
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue);
32+
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
3333
final AtomicBoolean called1 = new AtomicBoolean();
3434
final AtomicBoolean called2 = new AtomicBoolean();
3535
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {

0 commit comments

Comments
 (0)