Skip to content

Commit 52e8a64

Browse files
committed
Cluster state from API should always have a master (#42454)
Today the `TransportClusterStateAction` ignores the state passed to it by the `TransportMasterNodeAction` and instead obtains its state from the cluster applier. That state might be inconsistent with the one the master-node action checked: it may show a different node as the master, or even have no master at all.

This change adjusts the action to use the passed-in state directly, and adds tests showing that the returned state is consistent with our expectations even if there is a concurrent master failover.

Fixes #38331
Relates #38432
1 parent: ed40c91 · commit: 52e8a64

File tree

5 files changed: 240 additions (+240) and 44 deletions (-44)

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.action.support.IndicesOptions;
2323
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
2424
import org.elasticsearch.client.ElasticsearchClient;
25+
import org.elasticsearch.common.unit.TimeValue;
2526

2627
public class ClusterStateRequestBuilder extends MasterNodeReadOperationRequestBuilder<ClusterStateRequest,
2728
ClusterStateResponse, ClusterStateRequestBuilder> {
@@ -100,4 +101,21 @@ public ClusterStateRequestBuilder setIndicesOptions(IndicesOptions indicesOption
100101
request.indicesOptions(indicesOptions);
101102
return this;
102103
}
104+
105+
/**
106+
* Causes the request to wait for the metadata version to advance to at least the given version.
107+
* @param waitForMetaDataVersion The metadata version for which to wait
108+
*/
109+
public ClusterStateRequestBuilder setWaitForMetaDataVersion(long waitForMetaDataVersion) {
110+
request.waitForMetaDataVersion(waitForMetaDataVersion);
111+
return this;
112+
}
113+
114+
/**
115+
* If {@link ClusterStateRequest#waitForMetaDataVersion()} is set then this determines how long to wait
116+
*/
117+
public ClusterStateRequestBuilder setWaitForTimeOut(TimeValue waitForTimeout) {
118+
request.waitForTimeout(waitForTimeout);
119+
return this;
120+
}
103121
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
2828
import org.elasticsearch.cluster.ClusterState;
2929
import org.elasticsearch.cluster.ClusterStateObserver;
30+
import org.elasticsearch.cluster.NotMasterException;
3031
import org.elasticsearch.cluster.block.ClusterBlockException;
3132
import org.elasticsearch.cluster.metadata.IndexMetaData;
3233
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -86,50 +87,50 @@ protected ClusterStateResponse newResponse() {
8687
protected void masterOperation(final ClusterStateRequest request, final ClusterState state,
8788
final ActionListener<ClusterStateResponse> listener) throws IOException {
8889

89-
if (request.waitForMetaDataVersion() != null) {
90-
final Predicate<ClusterState> metadataVersionPredicate = clusterState -> {
91-
return clusterState.metaData().version() >= request.waitForMetaDataVersion();
92-
};
93-
final ClusterStateObserver observer =
94-
new ClusterStateObserver(clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext());
95-
final ClusterState clusterState = observer.setAndGetObservedState();
96-
if (metadataVersionPredicate.test(clusterState)) {
97-
buildResponse(request, clusterState, listener);
98-
} else {
99-
observer.waitForNextChange(new ClusterStateObserver.Listener() {
100-
@Override
101-
public void onNewClusterState(ClusterState state) {
102-
try {
103-
buildResponse(request, state, listener);
104-
} catch (Exception e) {
105-
listener.onFailure(e);
106-
}
107-
}
90+
final Predicate<ClusterState> acceptableClusterStatePredicate
91+
= request.waitForMetaDataVersion() == null ? clusterState -> true
92+
: clusterState -> clusterState.metaData().version() >= request.waitForMetaDataVersion();
93+
94+
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
95+
? acceptableClusterStatePredicate
96+
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false);
10897

109-
@Override
110-
public void onClusterServiceClose() {
111-
listener.onFailure(new NodeClosedException(clusterService.localNode()));
98+
if (acceptableClusterStatePredicate.test(state)) {
99+
ActionListener.completeWith(listener, () -> buildResponse(request, state));
100+
} else {
101+
assert acceptableClusterStateOrNotMasterPredicate.test(state) == false;
102+
new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext())
103+
.waitForNextChange(new ClusterStateObserver.Listener() {
104+
105+
@Override
106+
public void onNewClusterState(ClusterState newState) {
107+
if (acceptableClusterStatePredicate.test(newState)) {
108+
ActionListener.completeWith(listener, () -> buildResponse(request, newState));
109+
} else {
110+
listener.onFailure(new NotMasterException(
111+
"master stepped down waiting for metadata version " + request.waitForMetaDataVersion()));
112112
}
113+
}
113114

114-
@Override
115-
public void onTimeout(TimeValue timeout) {
116-
try {
117-
listener.onResponse(new ClusterStateResponse(clusterState.getClusterName(), null, true));
118-
} catch (Exception e) {
119-
listener.onFailure(e);
120-
}
115+
@Override
116+
public void onClusterServiceClose() {
117+
listener.onFailure(new NodeClosedException(clusterService.localNode()));
118+
}
119+
120+
@Override
121+
public void onTimeout(TimeValue timeout) {
122+
try {
123+
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
124+
} catch (Exception e) {
125+
listener.onFailure(e);
121126
}
122-
}, metadataVersionPredicate);
123-
}
124-
} else {
125-
ClusterState currentState = clusterService.state();
126-
buildResponse(request, currentState, listener);
127+
}
128+
}, acceptableClusterStateOrNotMasterPredicate);
127129
}
128130
}
129131

130-
private void buildResponse(final ClusterStateRequest request,
131-
final ClusterState currentState,
132-
final ActionListener<ClusterStateResponse> listener) throws IOException {
132+
private ClusterStateResponse buildResponse(final ClusterStateRequest request,
133+
final ClusterState currentState) {
133134
logger.trace("Serving cluster state request using version {}", currentState.version());
134135
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
135136
builder.version(currentState.version());
@@ -192,8 +193,7 @@ private void buildResponse(final ClusterStateRequest request,
192193
}
193194
}
194195

195-
listener.onResponse(new ClusterStateResponse(currentState.getClusterName(), builder.build(), false));
196+
return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
196197
}
197198

198-
199199
}
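server/src/test/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateActionDisruptionIT.java

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change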
@@ -0,0 +1,182 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.admin.cluster.state;
20+
21+
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
23+
import org.elasticsearch.cluster.metadata.MetaData;
24+
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.cluster.node.DiscoveryNodes;
26+
import org.elasticsearch.cluster.service.ClusterService;
27+
import org.elasticsearch.common.settings.Settings;
28+
import org.elasticsearch.common.unit.TimeValue;
29+
import org.elasticsearch.discovery.MasterNotDiscoveredException;
30+
import org.elasticsearch.plugins.Plugin;
31+
import org.elasticsearch.test.ESIntegTestCase;
32+
import org.elasticsearch.test.transport.MockTransportService;
33+
import org.elasticsearch.transport.TransportService;
34+
35+
import java.util.Collection;
36+
import java.util.Collections;
37+
import java.util.List;
38+
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.stream.Collectors;
40+
import java.util.stream.StreamSupport;
41+
42+
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
43+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
44+
import static org.hamcrest.Matchers.equalTo;
45+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
46+
import static org.hamcrest.Matchers.hasSize;
47+
import static org.hamcrest.Matchers.not;
48+
49+
@ESIntegTestCase.ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0)
50+
public class TransportClusterStateActionDisruptionIT extends ESIntegTestCase {
51+
52+
@Override
53+
protected Collection<Class<? extends Plugin>> nodePlugins() {
54+
return Collections.singletonList(MockTransportService.TestPlugin.class);
55+
}
56+
57+
public void testNonLocalRequestAlwaysFindsMaster() throws Exception {
58+
runRepeatedlyWhileChangingMaster(() -> {
59+
final ClusterStateRequestBuilder clusterStateRequestBuilder = client().admin().cluster().prepareState()
60+
.clear().setNodes(true).setMasterNodeTimeout("100ms");
61+
final ClusterStateResponse clusterStateResponse;
62+
try {
63+
clusterStateResponse = clusterStateRequestBuilder.get();
64+
} catch (MasterNotDiscoveredException e) {
65+
return; // ok, we hit the disconnected node
66+
}
67+
assertNotNull("should always contain a master node", clusterStateResponse.getState().nodes().getMasterNodeId());
68+
});
69+
}
70+
71+
public void testLocalRequestAlwaysSucceeds() throws Exception {
72+
runRepeatedlyWhileChangingMaster(() -> {
73+
final String node = randomFrom(internalCluster().getNodeNames());
74+
final DiscoveryNodes discoveryNodes = client(node).admin().cluster().prepareState()
75+
.clear().setLocal(true).setNodes(true).setMasterNodeTimeout("100ms").get().getState().nodes();
76+
for (DiscoveryNode discoveryNode : discoveryNodes) {
77+
if (discoveryNode.getName().equals(node)) {
78+
return;
79+
}
80+
}
81+
fail("nodes did not contain [" + node + "]: " + discoveryNodes);
82+
});
83+
}
84+
85+
public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exception {
86+
runRepeatedlyWhileChangingMaster(() -> {
87+
final String node = randomFrom(internalCluster().getNodeNames());
88+
final long metadataVersion
89+
= internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().state().metaData().version();
90+
final long waitForMetaDataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
91+
final ClusterStateRequestBuilder clusterStateRequestBuilder = client(node).admin().cluster().prepareState()
92+
.clear().setNodes(true).setMetaData(true)
93+
.setMasterNodeTimeout(TimeValue.timeValueMillis(100)).setWaitForTimeOut(TimeValue.timeValueMillis(100))
94+
.setWaitForMetaDataVersion(waitForMetaDataVersion);
95+
final ClusterStateResponse clusterStateResponse;
96+
try {
97+
clusterStateResponse = clusterStateRequestBuilder.get();
98+
} catch (MasterNotDiscoveredException e) {
99+
return; // ok, we hit the disconnected node
100+
}
101+
if (clusterStateResponse.isWaitForTimedOut() == false) {
102+
final ClusterState state = clusterStateResponse.getState();
103+
assertNotNull("should always contain a master node", state.nodes().getMasterNodeId());
104+
assertThat("waited for metadata version", state.metaData().version(), greaterThanOrEqualTo(waitForMetaDataVersion));
105+
}
106+
});
107+
}
108+
109+
public void testLocalRequestWaitsForMetadata() throws Exception {
110+
runRepeatedlyWhileChangingMaster(() -> {
111+
final String node = randomFrom(internalCluster().getNodeNames());
112+
final long metadataVersion
113+
= internalCluster().getInstance(ClusterService.class, node).getClusterApplierService().state().metaData().version();
114+
final long waitForMetaDataVersion = randomLongBetween(Math.max(1, metadataVersion - 3), metadataVersion + 5);
115+
final ClusterStateResponse clusterStateResponse = client(node).admin().cluster()
116+
.prepareState().clear().setLocal(true).setMetaData(true).setWaitForMetaDataVersion(waitForMetaDataVersion)
117+
.setMasterNodeTimeout(TimeValue.timeValueMillis(100)).setWaitForTimeOut(TimeValue.timeValueMillis(100))
118+
.get();
119+
if (clusterStateResponse.isWaitForTimedOut() == false) {
120+
final MetaData metaData = clusterStateResponse.getState().metaData();
121+
assertThat("waited for metadata version " + waitForMetaDataVersion + " with node " + node,
122+
metaData.version(), greaterThanOrEqualTo(waitForMetaDataVersion));
123+
}
124+
});
125+
}
126+
127+
public void runRepeatedlyWhileChangingMaster(Runnable runnable) throws Exception {
128+
internalCluster().startNodes(3);
129+
130+
assertBusy(() -> assertThat(client().admin().cluster().prepareState().clear().setMetaData(true)
131+
.get().getState().getLastCommittedConfiguration().getNodeIds().stream()
132+
.filter(n -> ClusterBootstrapService.isBootstrapPlaceholder(n) == false).collect(Collectors.toSet()), hasSize(3)));
133+
134+
final String masterName = internalCluster().getMasterName();
135+
136+
final AtomicBoolean shutdown = new AtomicBoolean();
137+
final Thread assertingThread = new Thread(() -> {
138+
while (shutdown.get() == false) {
139+
runnable.run();
140+
}
141+
}, "asserting thread");
142+
143+
final Thread updatingThread = new Thread(() -> {
144+
String value = "none";
145+
while (shutdown.get() == false) {
146+
value = "none".equals(value) ? "all" : "none";
147+
final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames()));
148+
assertAcked(client(nonMasterNode).admin().cluster().prepareUpdateSettings().setPersistentSettings(
149+
Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), value)));
150+
}
151+
}, "updating thread");
152+
153+
final List<MockTransportService> mockTransportServices
154+
= StreamSupport.stream(internalCluster().getInstances(TransportService.class).spliterator(), false)
155+
.map(ts -> (MockTransportService) ts).collect(Collectors.toList());
156+
157+
assertingThread.start();
158+
updatingThread.start();
159+
160+
final MockTransportService masterTransportService
161+
= (MockTransportService) internalCluster().getInstance(TransportService.class, masterName);
162+
163+
for (MockTransportService mockTransportService : mockTransportServices) {
164+
if (masterTransportService != mockTransportService) {
165+
masterTransportService.addFailToSendNoConnectRule(mockTransportService);
166+
mockTransportService.addFailToSendNoConnectRule(masterTransportService);
167+
}
168+
}
169+
170+
assertBusy(() -> {
171+
final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames()));
172+
final String claimedMasterName = internalCluster().getMasterName(nonMasterNode);
173+
assertThat(claimedMasterName, not(equalTo(masterName)));
174+
});
175+
176+
shutdown.set(true);
177+
assertingThread.join();
178+
updatingThread.join();
179+
internalCluster().close();
180+
}
181+
182+
}

server/src/test/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ public void testSimpleOnlyMasterNodeElection() throws IOException {
8686
.execute().actionGet().getState().nodes().getMasterNode().getName(), equalTo(nextMasterEligibleNodeName));
8787
}
8888

89-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38331")
9089
public void testElectOnlyBetweenMasterNodes() throws Exception {
9190
internalCluster().setBootstrapMasterNodeIndex(0);
9291
logger.info("--> start data node / non master node");

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@
167167
import static org.hamcrest.Matchers.not;
168168
import static org.hamcrest.Matchers.nullValue;
169169
import static org.junit.Assert.assertFalse;
170-
import static org.junit.Assert.assertNotNull;
171170
import static org.junit.Assert.assertThat;
172171
import static org.junit.Assert.assertTrue;
173172
import static org.junit.Assert.fail;
@@ -1957,9 +1956,7 @@ public String getMasterName() {
19571956
public String getMasterName(@Nullable String viaNode) {
19581957
try {
19591958
Client client = viaNode != null ? client(viaNode) : client();
1960-
final DiscoveryNode masterNode = client.admin().cluster().prepareState().get().getState().nodes().getMasterNode();
1961-
assertNotNull(masterNode);
1962-
return masterNode.getName();
1959+
return client.admin().cluster().prepareState().get().getState().nodes().getMasterNode().getName();
19631960
} catch (Exception e) {
19641961
logger.warn("Can't fetch cluster state", e);
19651962
throw new RuntimeException("Can't get master node " + e.getMessage(), e);

0 commit comments

Comments
 (0)