Skip to content

Commit 5b4e68f

Browse files
committed
Cluster state from API should always have a master
Today the `TransportClusterStateAction` ignores the state passed by the `TransportMasterNodeAction` and obtains its state from the cluster applier. This might be inconsistent, showing a different node as the master or maybe even having no master. This change adjusts the action to use the passed-in state directly, and adds tests showing that the state returned is consistent with our expectations even if there is a concurrent master failover. Fixes elastic#38331 Relates elastic#38432
1 parent 39a3d63 commit 5b4e68f

File tree

4 files changed

+237
-44
lines changed

4 files changed

+237
-44
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateRequestBuilder.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.action.support.IndicesOptions;
2323
import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder;
2424
import org.elasticsearch.client.ElasticsearchClient;
25+
import org.elasticsearch.common.unit.TimeValue;
2526

2627
public class ClusterStateRequestBuilder extends MasterNodeReadOperationRequestBuilder<ClusterStateRequest,
2728
ClusterStateResponse, ClusterStateRequestBuilder> {
@@ -100,4 +101,21 @@ public ClusterStateRequestBuilder setIndicesOptions(IndicesOptions indicesOption
100101
request.indicesOptions(indicesOptions);
101102
return this;
102103
}
104+
105+
/**
106+
* Causes the request to wait for the metadata version to advance to at least the given version.
107+
* @param waitForMetaDataVersion The metadata version for which to wait
108+
*/
109+
public ClusterStateRequestBuilder setWaitForMetaDataVersion(long waitForMetaDataVersion) {
110+
request.waitForMetaDataVersion(waitForMetaDataVersion);
111+
return this;
112+
}
113+
114+
/**
115+
* If {@link ClusterStateRequest#waitForMetaDataVersion()} is set then this determines how long to wait
116+
*/
117+
public ClusterStateRequestBuilder setWaitForTimeOut(TimeValue waitForTimeout) {
118+
request.waitForTimeout(waitForTimeout);
119+
return this;
120+
}
103121
}

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
2828
import org.elasticsearch.cluster.ClusterState;
2929
import org.elasticsearch.cluster.ClusterStateObserver;
30+
import org.elasticsearch.cluster.NotMasterException;
3031
import org.elasticsearch.cluster.block.ClusterBlockException;
3132
import org.elasticsearch.cluster.metadata.IndexMetaData;
3233
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -79,50 +80,50 @@ protected ClusterStateResponse newResponse() {
7980
protected void masterOperation(final ClusterStateRequest request, final ClusterState state,
8081
final ActionListener<ClusterStateResponse> listener) throws IOException {
8182

82-
if (request.waitForMetaDataVersion() != null) {
83-
final Predicate<ClusterState> metadataVersionPredicate = clusterState -> {
84-
return clusterState.metaData().version() >= request.waitForMetaDataVersion();
85-
};
86-
final ClusterStateObserver observer =
87-
new ClusterStateObserver(clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext());
88-
final ClusterState clusterState = observer.setAndGetObservedState();
89-
if (metadataVersionPredicate.test(clusterState)) {
90-
buildResponse(request, clusterState, listener);
91-
} else {
92-
observer.waitForNextChange(new ClusterStateObserver.Listener() {
93-
@Override
94-
public void onNewClusterState(ClusterState state) {
95-
try {
96-
buildResponse(request, state, listener);
97-
} catch (Exception e) {
98-
listener.onFailure(e);
99-
}
100-
}
83+
final Predicate<ClusterState> acceptableClusterStatePredicate
84+
= request.waitForMetaDataVersion() == null ? clusterState -> true
85+
: clusterState -> clusterState.metaData().version() >= request.waitForMetaDataVersion();
86+
87+
final Predicate<ClusterState> acceptableClusterStateOrNotMasterPredicate = request.local()
88+
? acceptableClusterStatePredicate
89+
: acceptableClusterStatePredicate.or(clusterState -> clusterState.nodes().isLocalNodeElectedMaster() == false);
10190

102-
@Override
103-
public void onClusterServiceClose() {
104-
listener.onFailure(new NodeClosedException(clusterService.localNode()));
91+
if (acceptableClusterStatePredicate.test(state)) {
92+
ActionListener.completeWith(listener, () -> buildResponse(request, state));
93+
} else {
94+
assert acceptableClusterStateOrNotMasterPredicate.test(state) == false;
95+
new ClusterStateObserver(state, clusterService, request.waitForTimeout(), logger, threadPool.getThreadContext())
96+
.waitForNextChange(new ClusterStateObserver.Listener() {
97+
98+
@Override
99+
public void onNewClusterState(ClusterState newState) {
100+
if (acceptableClusterStatePredicate.test(newState)) {
101+
ActionListener.completeWith(listener, () -> buildResponse(request, newState));
102+
} else {
103+
listener.onFailure(new NotMasterException(
104+
"master stepped down waiting for metadata version " + request.waitForMetaDataVersion()));
105105
}
106+
}
106107

107-
@Override
108-
public void onTimeout(TimeValue timeout) {
109-
try {
110-
listener.onResponse(new ClusterStateResponse(clusterState.getClusterName(), null, true));
111-
} catch (Exception e) {
112-
listener.onFailure(e);
113-
}
108+
@Override
109+
public void onClusterServiceClose() {
110+
listener.onFailure(new NodeClosedException(clusterService.localNode()));
111+
}
112+
113+
@Override
114+
public void onTimeout(TimeValue timeout) {
115+
try {
116+
listener.onResponse(new ClusterStateResponse(state.getClusterName(), null, true));
117+
} catch (Exception e) {
118+
listener.onFailure(e);
114119
}
115-
}, metadataVersionPredicate);
116-
}
117-
} else {
118-
ClusterState currentState = clusterService.state();
119-
buildResponse(request, currentState, listener);
120+
}
121+
}, acceptableClusterStateOrNotMasterPredicate);
120122
}
121123
}
122124

123-
private void buildResponse(final ClusterStateRequest request,
124-
final ClusterState currentState,
125-
final ActionListener<ClusterStateResponse> listener) throws IOException {
125+
private ClusterStateResponse buildResponse(final ClusterStateRequest request,
126+
final ClusterState currentState) {
126127
logger.trace("Serving cluster state request using version {}", currentState.version());
127128
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
128129
builder.version(currentState.version());
@@ -184,8 +185,7 @@ private void buildResponse(final ClusterStateRequest request,
184185
}
185186
}
186187

187-
listener.onResponse(new ClusterStateResponse(currentState.getClusterName(), builder.build(), false));
188+
return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
188189
}
189190

190-
191191
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.action.admin.cluster.state;
20+
21+
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
23+
import org.elasticsearch.cluster.metadata.MetaData;
24+
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.cluster.node.DiscoveryNodes;
26+
import org.elasticsearch.common.settings.Settings;
27+
import org.elasticsearch.common.unit.TimeValue;
28+
import org.elasticsearch.discovery.MasterNotDiscoveredException;
29+
import org.elasticsearch.plugins.Plugin;
30+
import org.elasticsearch.test.ESIntegTestCase;
31+
import org.elasticsearch.test.junit.annotations.TestLogging;
32+
import org.elasticsearch.test.transport.MockTransportService;
33+
import org.elasticsearch.transport.TransportService;
34+
35+
import java.util.Collection;
36+
import java.util.Collections;
37+
import java.util.List;
38+
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.stream.Collectors;
40+
import java.util.stream.StreamSupport;
41+
42+
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
43+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
44+
import static org.hamcrest.Matchers.equalTo;
45+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
46+
import static org.hamcrest.Matchers.hasSize;
47+
import static org.hamcrest.Matchers.not;
48+
49+
@ESIntegTestCase.ClusterScope(numDataNodes = 0, scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0)
50+
@TestLogging("org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction:TRACE")
51+
public class TransportClusterStateActionDisruptionIT extends ESIntegTestCase {
52+
53+
@Override
54+
protected Collection<Class<? extends Plugin>> nodePlugins() {
55+
return Collections.singletonList(MockTransportService.TestPlugin.class);
56+
}
57+
58+
public void testNonLocalRequestAlwaysFindsMaster() throws Exception {
59+
runRepeatedlyWhileChangingMaster(() -> {
60+
final ClusterStateRequestBuilder clusterStateRequestBuilder = client().admin().cluster().prepareState()
61+
.clear().setNodes(true).setMasterNodeTimeout("100ms");
62+
final ClusterStateResponse clusterStateResponse;
63+
try {
64+
clusterStateResponse = clusterStateRequestBuilder.get();
65+
} catch (MasterNotDiscoveredException e) {
66+
return; // ok, we hit the disconnected node
67+
}
68+
assertNotNull("should always contain a master node", clusterStateResponse.getState().nodes().getMasterNodeId());
69+
});
70+
}
71+
72+
public void testLocalRequestAlwaysSucceeds() throws Exception {
73+
runRepeatedlyWhileChangingMaster(() -> {
74+
final String node = randomFrom(internalCluster().getNodeNames());
75+
final DiscoveryNodes discoveryNodes = client(node).admin().cluster().prepareState()
76+
.clear().setLocal(true).setNodes(true).setMasterNodeTimeout("100ms").get().getState().nodes();
77+
for (DiscoveryNode discoveryNode : discoveryNodes) {
78+
if (discoveryNode.getName().equals(node)) {
79+
return;
80+
}
81+
}
82+
fail("nodes did not contain [" + node + "]: " + discoveryNodes);
83+
});
84+
}
85+
86+
public void testNonLocalRequestAlwaysFindsMasterAndWaitsForMetadata() throws Exception {
87+
runRepeatedlyWhileChangingMaster(() -> {
88+
final long waitForMetaDataVersion = randomLongBetween(1, 20);
89+
final ClusterStateRequestBuilder clusterStateRequestBuilder = client().admin().cluster().prepareState()
90+
.clear().setNodes(true).setMetaData(true)
91+
.setMasterNodeTimeout(TimeValue.timeValueMillis(100)).setWaitForTimeOut(TimeValue.timeValueMillis(100))
92+
.setWaitForMetaDataVersion(waitForMetaDataVersion);
93+
final ClusterStateResponse clusterStateResponse;
94+
try {
95+
clusterStateResponse = clusterStateRequestBuilder.get();
96+
} catch (MasterNotDiscoveredException e) {
97+
return; // ok, we hit the disconnected node
98+
}
99+
if (clusterStateResponse.isWaitForTimedOut() == false) {
100+
final ClusterState state = clusterStateResponse.getState();
101+
assertNotNull("should always contain a master node", state.nodes().getMasterNodeId());
102+
assertThat("waited for metadata version", state.metaData().version(), greaterThanOrEqualTo(waitForMetaDataVersion));
103+
}
104+
});
105+
}
106+
107+
public void testLocalRequestWaitsForMetadata() throws Exception {
108+
runRepeatedlyWhileChangingMaster(() -> {
109+
final long waitForMetaDataVersion = randomLongBetween(1, 20);
110+
final String node = randomFrom(internalCluster().getNodeNames());
111+
final ClusterStateResponse clusterStateResponse = client(node).admin().cluster()
112+
.prepareState().clear().setLocal(true).setMetaData(true).setWaitForMetaDataVersion(waitForMetaDataVersion)
113+
.setMasterNodeTimeout(TimeValue.timeValueMillis(100)).setWaitForTimeOut(TimeValue.timeValueMillis(100))
114+
.get();
115+
if (clusterStateResponse.isWaitForTimedOut() == false) {
116+
final MetaData metaData = clusterStateResponse.getState().metaData();
117+
assertThat("waited for metadata version " + waitForMetaDataVersion + " with node " + node,
118+
metaData.version(), greaterThanOrEqualTo(waitForMetaDataVersion));
119+
}
120+
});
121+
}
122+
123+
public void runRepeatedlyWhileChangingMaster(Runnable runnable) throws Exception {
124+
internalCluster().startNodes(3);
125+
126+
assertBusy(() -> assertThat(client().admin().cluster().prepareState().clear().setMetaData(true)
127+
.get().getState().getLastCommittedConfiguration().getNodeIds().stream()
128+
.filter(n -> ClusterBootstrapService.isBootstrapPlaceholder(n) == false).collect(Collectors.toSet()), hasSize(3)));
129+
130+
final String masterName = internalCluster().getMasterName();
131+
132+
final AtomicBoolean shutdown = new AtomicBoolean();
133+
final Thread assertingThread = new Thread(() -> {
134+
while (shutdown.get() == false) {
135+
runnable.run();
136+
}
137+
}, "asserting thread");
138+
139+
final Thread updatingThread = new Thread(() -> {
140+
String value = "none";
141+
while (shutdown.get() == false) {
142+
value = "none".equals(value) ? "all" : "none";
143+
final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames()));
144+
assertAcked(client(nonMasterNode).admin().cluster().prepareUpdateSettings().setPersistentSettings(
145+
Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), value)));
146+
}
147+
}, "updating thread");
148+
149+
final List<MockTransportService> mockTransportServices
150+
= StreamSupport.stream(internalCluster().getInstances(TransportService.class).spliterator(), false)
151+
.map(ts -> (MockTransportService) ts).collect(Collectors.toList());
152+
153+
assertingThread.start();
154+
updatingThread.start();
155+
156+
final MockTransportService masterTransportService
157+
= (MockTransportService) internalCluster().getInstance(TransportService.class, masterName);
158+
159+
for (MockTransportService mockTransportService : mockTransportServices) {
160+
if (masterTransportService != mockTransportService) {
161+
masterTransportService.addFailToSendNoConnectRule(mockTransportService);
162+
mockTransportService.addFailToSendNoConnectRule(masterTransportService);
163+
}
164+
}
165+
166+
assertBusy(() -> {
167+
final String nonMasterNode = randomValueOtherThan(masterName, () -> randomFrom(internalCluster().getNodeNames()));
168+
final String claimedMasterName = internalCluster().getMasterName(nonMasterNode);
169+
assertThat(claimedMasterName, not(equalTo(masterName)));
170+
});
171+
172+
shutdown.set(true);
173+
assertingThread.join();
174+
updatingThread.join();
175+
internalCluster().close();
176+
}
177+
178+
}

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@
149149
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
150150
import static org.elasticsearch.discovery.DiscoveryModule.DISCOVERY_TYPE_SETTING;
151151
import static org.elasticsearch.discovery.DiscoveryModule.ZEN2_DISCOVERY_TYPE;
152-
import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
153152
import static org.elasticsearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE;
153+
import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
154154
import static org.elasticsearch.test.ESTestCase.assertBusy;
155155
import static org.elasticsearch.test.ESTestCase.awaitBusy;
156156
import static org.elasticsearch.test.ESTestCase.getTestTransportType;
@@ -161,7 +161,6 @@
161161
import static org.hamcrest.Matchers.not;
162162
import static org.hamcrest.Matchers.nullValue;
163163
import static org.junit.Assert.assertFalse;
164-
import static org.junit.Assert.assertNotNull;
165164
import static org.junit.Assert.assertThat;
166165
import static org.junit.Assert.assertTrue;
167166
import static org.junit.Assert.fail;
@@ -1884,9 +1883,7 @@ public String getMasterName() {
18841883
public String getMasterName(@Nullable String viaNode) {
18851884
try {
18861885
Client client = viaNode != null ? client(viaNode) : client();
1887-
final DiscoveryNode masterNode = client.admin().cluster().prepareState().get().getState().nodes().getMasterNode();
1888-
assertNotNull(masterNode);
1889-
return masterNode.getName();
1886+
return client.admin().cluster().prepareState().get().getState().nodes().getMasterNode().getName();
18901887
} catch (Exception e) {
18911888
logger.warn("Can't fetch cluster state", e);
18921889
throw new RuntimeException("Can't get master node " + e.getMessage(), e);

0 commit comments

Comments
 (0)