Skip to content

Commit 87408b0

Browse files
authored
[Zen2] Only elect master-eligible nodes (#35996)
Today any node can win an election. However, the whole point of master-eligibility is that master-ineligible nodes should never be elected as the leader. Furthermore, master-ineligible nodes do not have any outgoing STATE channels and therefore cannot publish cluster states, which makes their leadership both ineffective and disruptive. This change ensures that the elected leader is master-eligible by preventing master-ineligible nodes from scheduling an election.
1 parent 0b45fb9 commit 87408b0

File tree

3 files changed

+64
-27
lines changed

3 files changed

+64
-27
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
344344

345345
private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
346346
assert Thread.holdsLock(mutex) == false;
347+
assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible";
347348
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest);
348349
transportService.connectToNode(joinRequest.getSourceNode());
349350

@@ -392,6 +393,8 @@ void becomeCandidate(String method) {
392393
void becomeLeader(String method) {
393394
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
394395
assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
396+
assert getLocalNode().isMasterNode() : getLocalNode() + " became a leader but is not master-eligible";
397+
395398
logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader);
396399

397400
mode = Mode.LEADER;
@@ -409,6 +412,8 @@ void becomeLeader(String method) {
409412

410413
void becomeFollower(String method, DiscoveryNode leaderNode) {
411414
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
415+
assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible";
416+
412417
logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader);
413418

414419
final boolean restartLeaderChecker = (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) == false;
@@ -642,9 +647,9 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
642647
logger.info("setting initial configuration to {}", votingConfiguration);
643648
final Builder builder = masterService.incrementVersion(currentState);
644649
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(currentState.coordinationMetaData())
645-
.lastAcceptedConfiguration(votingConfiguration)
646-
.lastCommittedConfiguration(votingConfiguration)
647-
.build();
650+
.lastAcceptedConfiguration(votingConfiguration)
651+
.lastCommittedConfiguration(votingConfiguration)
652+
.build();
648653

649654
MetaData.Builder metaDataBuilder = MetaData.builder();
650655
// automatically generate a UID for the metadata if we need to
@@ -927,6 +932,11 @@ protected void onFoundPeersUpdated() {
927932

928933
private void startElectionScheduler() {
929934
assert electionScheduler == null : electionScheduler;
935+
936+
if (getLocalNode().isMasterNode() == false) {
937+
return;
938+
}
939+
930940
final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
931941
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
932942
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ public String toString() {
166166
}
167167

168168
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
169+
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
169170
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
170171
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
171172
if (pendingOutgoingJoins.add(dedupKey)) {
@@ -210,6 +211,8 @@ public String executor() {
210211
}
211212

212213
public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
214+
assert startJoinRequest.getSourceNode().isMasterNode()
215+
: "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
213216
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
214217
startJoinRequest, new TransportResponseHandler<Empty>() {
215218
@Override

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,22 @@
2323
import org.apache.logging.log4j.Logger;
2424
import org.apache.logging.log4j.message.ParameterizedMessage;
2525
import org.elasticsearch.ElasticsearchException;
26+
import org.elasticsearch.Version;
2627
import org.elasticsearch.cluster.ClusterName;
2728
import org.elasticsearch.cluster.ClusterState;
28-
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
2929
import org.elasticsearch.cluster.ClusterStateUpdateTask;
3030
import org.elasticsearch.cluster.ESAllocationTestCase;
3131
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
32+
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
3233
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
3334
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
3435
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
3536
import org.elasticsearch.cluster.metadata.MetaData;
3637
import org.elasticsearch.cluster.node.DiscoveryNode;
38+
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
3739
import org.elasticsearch.cluster.service.ClusterApplier;
3840
import org.elasticsearch.common.Randomness;
41+
import org.elasticsearch.common.UUIDs;
3942
import org.elasticsearch.common.lease.Releasable;
4043
import org.elasticsearch.common.settings.ClusterSettings;
4144
import org.elasticsearch.common.settings.Setting;
@@ -58,6 +61,7 @@
5861
import java.util.ArrayList;
5962
import java.util.Arrays;
6063
import java.util.Collections;
64+
import java.util.EnumSet;
6165
import java.util.HashMap;
6266
import java.util.HashSet;
6367
import java.util.List;
@@ -136,6 +140,15 @@ public void testCanUpdateClusterStateAfterStabilisation() {
136140
}
137141
}
138142

143+
public void testDoesNotElectNonMasterNode() {
144+
final Cluster cluster = new Cluster(randomIntBetween(1, 5), false);
145+
cluster.runRandomly();
146+
cluster.stabilise();
147+
148+
final ClusterNode leader = cluster.getAnyLeader();
149+
assertTrue(leader.localNode.isMasterNode());
150+
}
151+
139152
public void testNodesJoinAfterStableCluster() {
140153
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
141154
cluster.runRandomly();
@@ -889,10 +902,6 @@ private static int defaultInt(Setting<Integer> setting) {
889902
// then wait for the new leader to commit a state without the old leader
890903
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY;
891904

892-
private static String nodeIdFromIndex(int nodeIndex) {
893-
return "node" + nodeIndex;
894-
}
895-
896905
class Cluster {
897906

898907
static final long EXTREME_DELAY_VARIABILITY = 10000L;
@@ -910,26 +919,29 @@ class Cluster {
910919
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
911920

912921
Cluster(int initialNodeCount) {
922+
this(initialNodeCount, true);
923+
}
924+
925+
Cluster(int initialNodeCount, boolean allNodesMasterEligible) {
913926
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
914927

915928
assertThat(initialNodeCount, greaterThan(0));
916929

917-
final Set<String> initialConfigurationNodeIds = new HashSet<>(initialNodeCount);
918-
while (initialConfigurationNodeIds.isEmpty()) {
919-
for (int i = 0; i < initialNodeCount; i++) {
920-
if (randomBoolean()) {
921-
initialConfigurationNodeIds.add(nodeIdFromIndex(i));
922-
}
923-
}
924-
}
925-
initialConfiguration = new VotingConfiguration(initialConfigurationNodeIds);
926-
logger.info("--> creating cluster of {} nodes with initial configuration {}", initialNodeCount, initialConfiguration);
927-
930+
final Set<String> masterEligibleNodeIds = new HashSet<>(initialNodeCount);
928931
clusterNodes = new ArrayList<>(initialNodeCount);
929932
for (int i = 0; i < initialNodeCount; i++) {
930-
final ClusterNode clusterNode = new ClusterNode(i);
933+
final ClusterNode clusterNode = new ClusterNode(i, allNodesMasterEligible || i == 0 || randomBoolean());
931934
clusterNodes.add(clusterNode);
935+
if (clusterNode.getLocalNode().isMasterNode()) {
936+
masterEligibleNodeIds.add(clusterNode.getId());
937+
}
932938
}
939+
940+
initialConfiguration = new VotingConfiguration(new HashSet<>(
941+
randomSubsetOf(randomIntBetween(1, masterEligibleNodeIds.size()), masterEligibleNodeIds)));
942+
943+
logger.info("--> creating cluster of {} nodes (master-eligible nodes: {}) with initial configuration {}",
944+
initialNodeCount, masterEligibleNodeIds, initialConfiguration);
933945
}
934946

935947
void addNodesAndStabilise(int newNodesCount) {
@@ -950,7 +962,7 @@ void addNodes(int newNodesCount) {
950962

951963
final int nodeSizeAtStart = clusterNodes.size();
952964
for (int i = 0; i < newNodesCount; i++) {
953-
final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i);
965+
final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i, true);
954966
clusterNodes.add(clusterNode);
955967
}
956968
}
@@ -1090,11 +1102,11 @@ void stabilise(long stabilisationDurationMillis) {
10901102
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
10911103
assertFalse("stabilisation requires stable storage", disruptStorage);
10921104

1093-
if (clusterNodes.stream().allMatch(n -> n.coordinator.isInitialConfigurationSet() == false)) {
1105+
if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) {
10941106
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
10951107
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
10961108
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
1097-
final ClusterNode bootstrapNode = getAnyNode();
1109+
final ClusterNode bootstrapNode = getAnyMasterEligibleNode();
10981110
bootstrapNode.applyInitialConfiguration();
10991111
} else {
11001112
logger.info("setting initial configuration not required");
@@ -1211,6 +1223,10 @@ private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode
12111223
return connectionStatus;
12121224
}
12131225

1226+
ClusterNode getAnyMasterEligibleNode() {
1227+
return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).collect(Collectors.toList()));
1228+
}
1229+
12141230
ClusterNode getAnyNode() {
12151231
return getAnyNodeExcept();
12161232
}
@@ -1283,16 +1299,20 @@ class ClusterNode {
12831299
private DisruptableMockTransport mockTransport;
12841300
private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
12851301

1286-
ClusterNode(int nodeIndex) {
1302+
ClusterNode(int nodeIndex, boolean masterEligible) {
12871303
this.nodeIndex = nodeIndex;
1288-
localNode = createDiscoveryNode();
1304+
localNode = createDiscoveryNode(masterEligible);
12891305
persistedState = new MockPersistedState(0L,
12901306
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
12911307
onNode(localNode, this::setUp).run();
12921308
}
12931309

1294-
private DiscoveryNode createDiscoveryNode() {
1295-
return CoordinationStateTests.createNode(nodeIdFromIndex(nodeIndex));
1310+
private DiscoveryNode createDiscoveryNode(boolean masterEligible) {
1311+
final TransportAddress address = buildNewFakeTransportAddress();
1312+
return new DiscoveryNode("", "node" + nodeIndex,
1313+
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
1314+
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
1315+
masterEligible ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT);
12961316
}
12971317

12981318
private void setUp() {
@@ -1483,6 +1503,10 @@ void applyInitialConfiguration() {
14831503
}).run();
14841504
}
14851505

1506+
private boolean isNotUsefullyBootstrapped() {
1507+
return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false;
1508+
}
1509+
14861510
private class FakeClusterApplier implements ClusterApplier {
14871511

14881512
final ClusterName clusterName;

0 commit comments

Comments
 (0)