Skip to content

Commit 4178c57

Browse files
committed
Apply cluster states in system context (#53785)
Today cluster states are sometimes (rarely) applied in the default context rather than system context, which means that any appliers which capture their contexts cannot do things like remote transport actions when security is enabled. There are at least two ways that we end up applying the cluster state in the default context: 1. locally applying a cluster state that indicates that the master has failed 2. the elected master times out while waiting for a response from another node This commit ensures that cluster states are always applied in the system context. Mitigates #53751
1 parent 00203c3 commit 4178c57

File tree

11 files changed

+102
-39
lines changed

11 files changed

+102
-39
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

+2
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public PublicationContext newPublicationContext(ClusterChangedEvent clusterChang
185185
public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest,
186186
ActionListener<PublishWithJoinResponse> originalListener) {
187187
assert publishRequest.getAcceptedState() == clusterChangedEvent.state() : "state got switched on us";
188+
assert transportService.getThreadPool().getThreadContext().isSystemContext();
188189
final ActionListener<PublishWithJoinResponse> responseActionListener;
189190
if (destination.equals(nodes.getLocalNode())) {
190191
// if publishing to self, use original request instead (see currentPublishRequestToSelf for explanation)
@@ -221,6 +222,7 @@ public void onFailure(Exception e) {
221222
@Override
222223
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
223224
ActionListener<TransportResponse.Empty> responseActionListener) {
225+
assert transportService.getThreadPool().getThreadContext().isSystemContext();
224226
final String actionName;
225227
final TransportRequest transportRequest;
226228
if (Coordinator.isZen1Node(destination)) {

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.common.util.concurrent.EsExecutors;
4747
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4848
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
49+
import org.elasticsearch.common.util.concurrent.ThreadContext;
4950
import org.elasticsearch.common.util.iterable.Iterables;
5051
import org.elasticsearch.threadpool.Scheduler;
5152
import org.elasticsearch.threadpool.ThreadPool;
@@ -347,7 +348,9 @@ private void submitStateUpdateTask(final String source, final ClusterStateTaskCo
347348
if (!lifecycle.started()) {
348349
return;
349350
}
350-
try {
351+
final ThreadContext threadContext = threadPool.getThreadContext();
352+
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
353+
threadContext.markAsSystemContext();
351354
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
352355
if (config.timeout() != null) {
353356
threadPoolExecutor.execute(updateTask, config.timeout(),

server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,10 @@ private static ClusterState initialState(DiscoveryNode localNode, long term, lon
122122
private void setupFakeMasterServiceAndCoordinator(long term, ClusterState initialState) {
123123
deterministicTaskQueue
124124
= new DeterministicTaskQueue(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(), random());
125+
final ThreadPool fakeThreadPool = deterministicTaskQueue.getThreadPool();
125126
FakeThreadPoolMasterService fakeMasterService = new FakeThreadPoolMasterService("test_node","test",
126-
deterministicTaskQueue::scheduleNow);
127-
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, deterministicTaskQueue.getThreadPool(), Randomness.get());
127+
fakeThreadPool, deterministicTaskQueue::scheduleNow);
128+
setupMasterServiceAndCoordinator(term, initialState, fakeMasterService, fakeThreadPool, Randomness.get());
128129
fakeMasterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
129130
coordinator.handlePublishRequest(new PublishRequest(event.state()));
130131
publishListener.onResponse(null);

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1167,15 +1167,15 @@ private final class TestClusterNode {
11671167
TestClusterNode(DiscoveryNode node) throws IOException {
11681168
this.node = node;
11691169
final Environment environment = createEnvironment(node.getName());
1170-
masterService = new FakeThreadPoolMasterService(node.getName(), "test", deterministicTaskQueue::scheduleNow);
1170+
threadPool = deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable));
1171+
masterService = new FakeThreadPoolMasterService(node.getName(), "test", threadPool, deterministicTaskQueue::scheduleNow);
11711172
final Settings settings = environment.settings();
11721173
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
1173-
threadPool = deterministicTaskQueue.getThreadPool();
11741174
clusterService = new ClusterService(settings, clusterSettings, masterService,
11751175
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
11761176
@Override
11771177
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1178-
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue);
1178+
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
11791179
}
11801180

11811181
@Override
@@ -1215,7 +1215,7 @@ protected NamedWriteableRegistry writeableRegistry() {
12151215
}
12161216
};
12171217
transportService = mockTransport.createTransportService(
1218-
settings, deterministicTaskQueue.getThreadPool(runnable -> CoordinatorTests.onNodeLog(node, runnable)),
1218+
settings, threadPool,
12191219
new TransportInterceptor() {
12201220
@Override
12211221
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

+13-15
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ class ClusterNode {
911911
}
912912

913913
private void setUp() {
914+
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool(this::onNode);
914915
mockTransport = new DisruptableMockTransport(localNode, logger) {
915916
@Override
916917
protected void execute(Runnable runnable) {
@@ -928,24 +929,20 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
928929
.filter(transport -> transport.getLocalNode().getAddress().equals(address)).findAny();
929930
}
930931
};
931-
932932
final Settings settings = nodeSettings.hasValue(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey()) ?
933933
nodeSettings : Settings.builder().put(nodeSettings)
934934
.putList(ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.getKey(),
935935
ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY)).build(); // suppress auto-bootstrap
936-
transportService = mockTransport.createTransportService(
937-
settings, deterministicTaskQueue.getThreadPool(this::onNode),
938-
getTransportInterceptor(localNode, deterministicTaskQueue.getThreadPool(this::onNode)),
939-
a -> localNode, null, emptySet());
940-
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test",
936+
transportService = mockTransport.createTransportService(settings, threadPool,
937+
getTransportInterceptor(localNode, threadPool), a -> localNode, null, emptySet());
938+
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool,
941939
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
942940
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
943941
clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
944-
deterministicTaskQueue, this::onNode);
942+
deterministicTaskQueue, threadPool);
945943
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
946944
clusterService.setNodeConnectionsService(
947-
new NodeConnectionsService(clusterService.getSettings(), deterministicTaskQueue.getThreadPool(this::onNode),
948-
transportService));
945+
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
949946
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
950947
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
951948
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
@@ -955,7 +952,7 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
955952
getElectionStrategy());
956953
masterService.setClusterStatePublisher(coordinator);
957954
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
958-
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);
955+
threadPool, null, coordinator);
959956

960957
logger.trace("starting up [{}]", localNode);
961958
transportService.start();
@@ -1292,8 +1289,9 @@ static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterServic
12921289

12931290
AckCollector nextAckCollector = new AckCollector();
12941291

1295-
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
1296-
super(nodeName, serviceName, onTaskAvailableToRun);
1292+
AckedFakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
1293+
Consumer<Runnable> onTaskAvailableToRun) {
1294+
super(nodeName, serviceName, threadPool, onTaskAvailableToRun);
12971295
}
12981296

12991297
@Override
@@ -1323,8 +1321,8 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
13231321
private boolean applicationMayFail;
13241322

13251323
DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
1326-
DeterministicTaskQueue deterministicTaskQueue, Function<Runnable, Runnable> runnableWrapper) {
1327-
super(nodeName, settings, clusterSettings, deterministicTaskQueue.getThreadPool(runnableWrapper));
1324+
DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
1325+
super(nodeName, settings, clusterSettings, threadPool);
13281326
this.nodeName = nodeName;
13291327
this.deterministicTaskQueue = deterministicTaskQueue;
13301328
addStateApplier(event -> {
@@ -1344,7 +1342,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
13441342

13451343
@Override
13461344
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1347-
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue);
1345+
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
13481346
}
13491347

13501348
@Override

test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.elasticsearch.common.util.concurrent.EsExecutors;
2222
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
23+
import org.elasticsearch.threadpool.ThreadPool;
2324

2425
import java.util.concurrent.TimeUnit;
2526

@@ -29,7 +30,7 @@
2930
*/
3031
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
3132

32-
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue) {
33+
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
3334
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS,
3435
r -> new Thread() {
3536
@Override
@@ -51,7 +52,7 @@ public String toString() {
5152
});
5253
}
5354
},
54-
deterministicTaskQueue.getThreadPool().getThreadContext(), deterministicTaskQueue.getThreadPool().scheduler());
55+
threadPool.getThreadContext(), threadPool.scheduler());
5556
}
5657

5758
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.service;
20+
21+
import org.elasticsearch.client.Client;
22+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
24+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
25+
import org.elasticsearch.env.Environment;
26+
import org.elasticsearch.env.NodeEnvironment;
27+
import org.elasticsearch.plugins.Plugin;
28+
import org.elasticsearch.script.ScriptService;
29+
import org.elasticsearch.threadpool.ThreadPool;
30+
import org.elasticsearch.watcher.ResourceWatcherService;
31+
32+
import java.util.Collection;
33+
import java.util.Collections;
34+
35+
public class ClusterApplierAssertionPlugin extends Plugin {
36+
@Override
37+
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
38+
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
39+
NamedXContentRegistry xContentRegistry, Environment environment,
40+
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
41+
IndexNameExpressionResolver indexNameExpressionResolver) {
42+
clusterService.addStateApplier(event -> {
43+
assert threadPool.getThreadContext().isSystemContext();
44+
});
45+
clusterService.addListener(event -> {
46+
assert threadPool.getThreadContext().isSystemContext();
47+
});
48+
return Collections.emptyList();
49+
}
50+
}

test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java

+9-13
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141

4242
import static org.apache.lucene.util.LuceneTestCase.random;
4343
import static org.elasticsearch.test.ESTestCase.randomInt;
44-
import static org.mockito.Mockito.mock;
45-
import static org.mockito.Mockito.when;
4644

4745
public class FakeThreadPoolMasterService extends MasterService {
4846
private static final Logger logger = LogManager.getLogger(FakeThreadPoolMasterService.class);
@@ -54,21 +52,14 @@ public class FakeThreadPoolMasterService extends MasterService {
5452
private boolean taskInProgress = false;
5553
private boolean waitForPublish = false;
5654

57-
public FakeThreadPoolMasterService(String nodeName, String serviceName, Consumer<Runnable> onTaskAvailableToRun) {
55+
public FakeThreadPoolMasterService(String nodeName, String serviceName, ThreadPool threadPool,
56+
Consumer<Runnable> onTaskAvailableToRun) {
5857
super(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(),
59-
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
60-
createMockThreadPool());
58+
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool);
6159
this.name = serviceName;
6260
this.onTaskAvailableToRun = onTaskAvailableToRun;
6361
}
6462

65-
private static ThreadPool createMockThreadPool() {
66-
final ThreadContext context = new ThreadContext(Settings.EMPTY);
67-
final ThreadPool mockThreadPool = mock(ThreadPool.class);
68-
when(mockThreadPool.getThreadContext()).thenReturn(context);
69-
return mockThreadPool;
70-
}
71-
7263
@Override
7364
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
7465
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 1, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(name),
@@ -110,7 +101,11 @@ public void run() {
110101
final Runnable task = pendingTasks.remove(taskIndex);
111102
taskInProgress = true;
112103
scheduledNextTask = false;
113-
task.run();
104+
final ThreadContext threadContext = threadPool.getThreadContext();
105+
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
106+
threadContext.markAsSystemContext();
107+
task.run();
108+
}
114109
if (waitForPublish == false) {
115110
taskInProgress = false;
116111
}
@@ -168,4 +163,5 @@ public void onFailure(Exception e) {
168163
protected AckListener wrapAckListener(AckListener ackListener) {
169164
return ackListener;
170165
}
166+
171167
}

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

+4
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.elasticsearch.cluster.routing.UnassignedInfo;
8080
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
8181
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
82+
import org.elasticsearch.cluster.service.ClusterApplierAssertionPlugin;
8283
import org.elasticsearch.cluster.service.ClusterService;
8384
import org.elasticsearch.common.Nullable;
8485
import org.elasticsearch.common.Priority;
@@ -1935,6 +1936,9 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
19351936
if (randomBoolean()) {
19361937
mocks.add(MockFieldFilterPlugin.class);
19371938
}
1939+
if (randomBoolean()) {
1940+
mocks.add(ClusterApplierAssertionPlugin.class);
1941+
}
19381942
}
19391943

19401944
if (addMockTransportService()) {

test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
2929

3030
public void testPrioritizedEsThreadPoolExecutor() {
3131
final DeterministicTaskQueue taskQueue = DeterministicTaskQueueTests.newTaskQueue();
32-
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue);
32+
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
3333
final AtomicBoolean called1 = new AtomicBoolean();
3434
final AtomicBoolean called2 = new AtomicBoolean();
3535
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {

0 commit comments

Comments
 (0)