Skip to content

[Zen2] Only elect master-eligible nodes #35996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {

private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback joinCallback) {
assert Thread.holdsLock(mutex) == false;
assert getLocalNode().isMasterNode() : getLocalNode() + " received a join but is not master-eligible";
logger.trace("handleJoinRequest: as {}, handling {}", mode, joinRequest);
transportService.connectToNode(joinRequest.getSourceNode());

Expand Down Expand Up @@ -392,6 +393,8 @@ void becomeCandidate(String method) {
void becomeLeader(String method) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert mode == Mode.CANDIDATE : "expected candidate but was " + mode;
assert getLocalNode().isMasterNode() : getLocalNode() + " became a leader but is not master-eligible";

logger.debug("{}: becoming LEADER (was {}, lastKnownLeader was [{}])", method, mode, lastKnownLeader);

mode = Mode.LEADER;
Expand All @@ -409,6 +412,8 @@ void becomeLeader(String method) {

void becomeFollower(String method, DiscoveryNode leaderNode) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert leaderNode.isMasterNode() : leaderNode + " became a leader but is not master-eligible";

logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader);

final boolean restartLeaderChecker = (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) == false;
Expand Down Expand Up @@ -642,9 +647,9 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
logger.info("setting initial configuration to {}", votingConfiguration);
final Builder builder = masterService.incrementVersion(currentState);
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(currentState.coordinationMetaData())
.lastAcceptedConfiguration(votingConfiguration)
.lastCommittedConfiguration(votingConfiguration)
.build();
.lastAcceptedConfiguration(votingConfiguration)
.lastCommittedConfiguration(votingConfiguration)
.build();

MetaData.Builder metaDataBuilder = MetaData.builder();
// automatically generate a UID for the metadata if we need to
Expand Down Expand Up @@ -927,6 +932,11 @@ protected void onFoundPeersUpdated() {

private void startElectionScheduler() {
assert electionScheduler == null : electionScheduler;

if (getLocalNode().isMasterNode() == false) {
return;
}

final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public String toString() {
}

public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
if (pendingOutgoingJoins.add(dedupKey)) {
Expand Down Expand Up @@ -210,6 +211,8 @@ public String executor() {
}

public void sendStartJoinRequest(final StartJoinRequest startJoinRequest, final DiscoveryNode destination) {
assert startJoinRequest.getSourceNode().isMasterNode()
: "sending start-join request for master-ineligible " + startJoinRequest.getSourceNode();
transportService.sendRequest(destination, START_JOIN_ACTION_NAME,
startJoinRequest, new TransportResponseHandler<Empty>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,22 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -58,6 +61,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -136,6 +140,15 @@ public void testCanUpdateClusterStateAfterStabilisation() {
}
}

public void testDoesNotElectNonMasterNode() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5), false);
cluster.runRandomly();
cluster.stabilise();

final ClusterNode leader = cluster.getAnyLeader();
assertTrue(leader.localNode.isMasterNode());
}

public void testNodesJoinAfterStableCluster() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
Expand Down Expand Up @@ -889,10 +902,6 @@ private static int defaultInt(Setting<Integer> setting) {
// then wait for the new leader to commit a state without the old leader
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY;

private static String nodeIdFromIndex(int nodeIndex) {
return "node" + nodeIndex;
}

class Cluster {

static final long EXTREME_DELAY_VARIABILITY = 10000L;
Expand All @@ -910,26 +919,29 @@ class Cluster {
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();

Cluster(int initialNodeCount) {
this(initialNodeCount, true);
}

Cluster(int initialNodeCount, boolean allNodesMasterEligible) {
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);

assertThat(initialNodeCount, greaterThan(0));

final Set<String> initialConfigurationNodeIds = new HashSet<>(initialNodeCount);
while (initialConfigurationNodeIds.isEmpty()) {
for (int i = 0; i < initialNodeCount; i++) {
if (randomBoolean()) {
initialConfigurationNodeIds.add(nodeIdFromIndex(i));
}
}
}
initialConfiguration = new VotingConfiguration(initialConfigurationNodeIds);
logger.info("--> creating cluster of {} nodes with initial configuration {}", initialNodeCount, initialConfiguration);

final Set<String> masterEligibleNodeIds = new HashSet<>(initialNodeCount);
clusterNodes = new ArrayList<>(initialNodeCount);
for (int i = 0; i < initialNodeCount; i++) {
final ClusterNode clusterNode = new ClusterNode(i);
final ClusterNode clusterNode = new ClusterNode(i, allNodesMasterEligible || i == 0 || randomBoolean());
clusterNodes.add(clusterNode);
if (clusterNode.getLocalNode().isMasterNode()) {
masterEligibleNodeIds.add(clusterNode.getId());
}
}

initialConfiguration = new VotingConfiguration(new HashSet<>(
randomSubsetOf(randomIntBetween(1, masterEligibleNodeIds.size()), masterEligibleNodeIds)));

logger.info("--> creating cluster of {} nodes (master-eligible nodes: {}) with initial configuration {}",
initialNodeCount, masterEligibleNodeIds, initialConfiguration);
}

void addNodesAndStabilise(int newNodesCount) {
Expand All @@ -950,7 +962,7 @@ void addNodes(int newNodesCount) {

final int nodeSizeAtStart = clusterNodes.size();
for (int i = 0; i < newNodesCount; i++) {
final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i);
final ClusterNode clusterNode = new ClusterNode(nodeSizeAtStart + i, true);
clusterNodes.add(clusterNode);
}
}
Expand Down Expand Up @@ -1090,11 +1102,11 @@ void stabilise(long stabilisationDurationMillis) {
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
assertFalse("stabilisation requires stable storage", disruptStorage);

if (clusterNodes.stream().allMatch(n -> n.coordinator.isInitialConfigurationSet() == false)) {
if (clusterNodes.stream().allMatch(ClusterNode::isNotUsefullyBootstrapped)) {
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
assertThat("setting initial configuration may fail with blackholed nodes", blackholedNodes, empty());
runFor(defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2, "discovery prior to setting initial configuration");
final ClusterNode bootstrapNode = getAnyNode();
final ClusterNode bootstrapNode = getAnyMasterEligibleNode();
bootstrapNode.applyInitialConfiguration();
} else {
logger.info("setting initial configuration not required");
Expand Down Expand Up @@ -1211,6 +1223,10 @@ private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode
return connectionStatus;
}

ClusterNode getAnyMasterEligibleNode() {
return randomFrom(clusterNodes.stream().filter(n -> n.getLocalNode().isMasterNode()).collect(Collectors.toList()));
}

ClusterNode getAnyNode() {
return getAnyNodeExcept();
}
Expand Down Expand Up @@ -1283,16 +1299,20 @@ class ClusterNode {
private DisruptableMockTransport mockTransport;
private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;

ClusterNode(int nodeIndex) {
ClusterNode(int nodeIndex, boolean masterEligible) {
this.nodeIndex = nodeIndex;
localNode = createDiscoveryNode();
localNode = createDiscoveryNode(masterEligible);
persistedState = new MockPersistedState(0L,
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
onNode(localNode, this::setUp).run();
}

private DiscoveryNode createDiscoveryNode() {
return CoordinationStateTests.createNode(nodeIdFromIndex(nodeIndex));
private DiscoveryNode createDiscoveryNode(boolean masterEligible) {
final TransportAddress address = buildNewFakeTransportAddress();
return new DiscoveryNode("", "node" + nodeIndex,
UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
masterEligible ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT);
}

private void setUp() {
Expand Down Expand Up @@ -1483,6 +1503,10 @@ void applyInitialConfiguration() {
}).run();
}

private boolean isNotUsefullyBootstrapped() {
return getLocalNode().isMasterNode() == false || coordinator.isInitialConfigurationSet() == false;
}

private class FakeClusterApplier implements ClusterApplier {

final ClusterName clusterName;
Expand Down