Skip to content

[Zen2] Elect freshest master in upgrade #37122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;

/**
* Represents the current state of the cluster.
* <p>
Expand Down Expand Up @@ -210,6 +212,12 @@ public long getVersion() {
return version();
}

public long getVersionOrMetaDataVersion() {
// When following a Zen1 master, the cluster state version is not guaranteed to increase, so instead it is preferable to use the
// metadata version to determine the freshest node. However when following a Zen2 master the cluster state version should be used.
return term() == ZEN1_BWC_TERM ? metaData().version() : version();
}

/**
* This stateUUID is automatically generated for for each version of cluster state. It is used to make sure that
* we are applying diffs to the right previous state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public long getLastAcceptedVersion() {
return getLastAcceptedState().version();
}

private long getLastAcceptedVersionOrMetaDataVersion() {
return getLastAcceptedState().getVersionOrMetaDataVersion();
}

public VotingConfiguration getLastCommittedConfiguration() {
return getLastAcceptedState().getLastCommittedConfiguration();
}
Expand Down Expand Up @@ -126,27 +130,29 @@ public boolean joinVotesHaveQuorumFor(VotingConfiguration votingConfiguration) {
/**
* Used to bootstrap a cluster by injecting the initial state and configuration.
*
* @param initialState The initial state to use. Must have term 0, version 1, and non-empty configurations.
* @param initialState The initial state to use. Must have term 0, version equal to the last-accepted version, and non-empty
* configurations.
* @throws CoordinationStateRejectedException if the arguments were incompatible with the current state of this object.
*/
public void setInitialState(ClusterState initialState) {
final long lastAcceptedVersion = getLastAcceptedVersion();
if (lastAcceptedVersion != 0) {
logger.debug("setInitialState: rejecting since last-accepted version {} > 0", lastAcceptedVersion);
throw new CoordinationStateRejectedException("initial state already set: last-accepted version now " + lastAcceptedVersion);

final VotingConfiguration lastAcceptedConfiguration = getLastAcceptedConfiguration();
if (lastAcceptedConfiguration.isEmpty() == false) {
logger.debug("setInitialState: rejecting since last-accepted configuration is nonempty: {}", lastAcceptedConfiguration);
throw new CoordinationStateRejectedException(
"initial state already set: last-accepted configuration now " + lastAcceptedConfiguration);
}

assert getLastAcceptedTerm() == 0 : getLastAcceptedTerm();
assert getLastAcceptedConfiguration().isEmpty() : getLastAcceptedConfiguration();
assert getLastCommittedConfiguration().isEmpty() : getLastCommittedConfiguration();
assert lastPublishedVersion == 0 : lastAcceptedVersion;
assert lastPublishedVersion == 0 : lastPublishedVersion;
assert lastPublishedConfiguration.isEmpty() : lastPublishedConfiguration;
assert electionWon == false;
assert joinVotes.isEmpty() : joinVotes;
assert publishVotes.isEmpty() : publishVotes;

assert initialState.term() == 0 : initialState;
assert initialState.version() == 1 : initialState;
assert initialState.term() == 0 : initialState + " should have term 0";
assert initialState.version() == getLastAcceptedVersion() : initialState + " should have version " + getLastAcceptedVersion();
assert initialState.getLastAcceptedConfiguration().isEmpty() == false;
assert initialState.getLastCommittedConfiguration().isEmpty() == false;

Expand Down Expand Up @@ -191,7 +197,8 @@ public Join handleStartJoin(StartJoinRequest startJoinRequest) {
joinVotes = new VoteCollection();
publishVotes = new VoteCollection();

return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(), getLastAcceptedVersion());
return new Join(localNode, startJoinRequest.getSourceNode(), getCurrentTerm(), getLastAcceptedTerm(),
getLastAcceptedVersionOrMetaDataVersion());
}

/**
Expand Down Expand Up @@ -224,20 +231,22 @@ public boolean handleJoin(Join join) {
" of join higher than current last accepted term " + lastAcceptedTerm);
}

if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersion()) {
logger.debug("handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}])",
getLastAcceptedVersion(), join.getLastAcceptedVersion());
if (join.getLastAcceptedTerm() == lastAcceptedTerm && join.getLastAcceptedVersion() > getLastAcceptedVersionOrMetaDataVersion()) {
logger.debug(
"handleJoin: ignored join as joiner has a better last accepted version (expected: <=[{}], actual: [{}]) in term {}",
getLastAcceptedVersionOrMetaDataVersion(), join.getLastAcceptedVersion(), lastAcceptedTerm);
throw new CoordinationStateRejectedException("incoming last accepted version " + join.getLastAcceptedVersion() +
" of join higher than current last accepted version " + getLastAcceptedVersion());
" of join higher than current last accepted version " + getLastAcceptedVersionOrMetaDataVersion()
+ " in term " + lastAcceptedTerm);
}

if (getLastAcceptedVersion() == 0) {
if (getLastAcceptedConfiguration().isEmpty()) {
// We do not check for an election won on setting the initial configuration, so it would be possible to end up in a state where
// we have enough join votes to have won the election immediately on setting the initial configuration. It'd be quite
// complicated to restore all the appropriate invariants when setting the initial configuration (it's not just electionWon)
// so instead we just reject join votes received prior to receiving the initial configuration.
logger.debug("handleJoin: ignored join because initial configuration not set");
throw new CoordinationStateRejectedException("initial configuration not set");
logger.debug("handleJoin: rejecting join since this node has not received its initial configuration yet");
throw new CoordinationStateRejectedException("rejecting join since this node has not received its initial configuration yet");
}

boolean added = joinVotes.addVote(join.getSourceNode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -168,7 +167,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService);
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService,
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::unsafelySetConfigurationForUpgrade);
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
transportService::getLocalNode);
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
Expand Down Expand Up @@ -497,7 +496,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {

private PreVoteResponse getPreVoteResponse() {
return new PreVoteResponse(getCurrentTerm(), coordinationState.get().getLastAcceptedTerm(),
coordinationState.get().getLastAcceptedVersion());
coordinationState.get().getLastAcceptedState().getVersionOrMetaDataVersion());
}

// package-visible for testing
Expand Down Expand Up @@ -705,7 +704,6 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
}

logger.info("setting initial configuration to {}", votingConfiguration);
final Builder builder = masterService.incrementVersion(currentState);
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(currentState.coordinationMetaData())
.lastAcceptedConfiguration(votingConfiguration)
.lastCommittedConfiguration(votingConfiguration)
Expand All @@ -715,57 +713,14 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura
// automatically generate a UID for the metadata if we need to
metaDataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool?
metaDataBuilder.coordinationMetaData(coordinationMetaData);
builder.metaData(metaDataBuilder);
coordinationState.get().setInitialState(builder.build());

coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build());
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
startElectionScheduler();
return true;
}
}

private void unsafelySetConfigurationForUpgrade(VotingConfiguration votingConfiguration) {
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this method once unsafe upgrades are no longer needed";
synchronized (mutex) {
if (mode != Mode.CANDIDATE) {
throw new IllegalStateException("Cannot overwrite configuration in mode " + mode);
}

if (isInitialConfigurationSet()) {
throw new IllegalStateException("Cannot overwrite configuration: configuration is already set to "
+ getLastAcceptedState().getLastAcceptedConfiguration());
}

if (lastKnownLeader.map(Coordinator::isZen1Node).orElse(false) == false) {
throw new IllegalStateException("Cannot upgrade from last-known leader: " + lastKnownLeader);
}

if (getCurrentTerm() != ZEN1_BWC_TERM) {
throw new IllegalStateException("Cannot upgrade, term is " + getCurrentTerm());
}

logger.info("automatically bootstrapping during rolling upgrade, using initial configuration {}", votingConfiguration);

final ClusterState currentState = getStateForMasterService();
final Builder builder = masterService.incrementVersion(currentState);
builder.metaData(MetaData.builder(currentState.metaData()).coordinationMetaData(
CoordinationMetaData.builder(currentState.metaData().coordinationMetaData())
.term(1)
.lastAcceptedConfiguration(votingConfiguration)
.lastCommittedConfiguration(votingConfiguration)
.build()));
final ClusterState newClusterState = builder.build();

coordinationState.get().handleStartJoin(new StartJoinRequest(getLocalNode(), newClusterState.term()));
coordinationState.get().handlePublishRequest(new PublishRequest(newClusterState));

followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);

peerFinder.deactivate(getLocalNode());
peerFinder.activate(newClusterState.nodes());
}
}

// Package-private for testing
ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private void handlePreVoteResponse(final PreVoteResponse response, final Discove

if (response.getLastAcceptedTerm() > clusterState.term()
|| (response.getLastAcceptedTerm() == clusterState.term()
&& response.getLastAcceptedVersion() > clusterState.version())) {
&& response.getLastAcceptedVersion() > clusterState.getVersionOrMetaDataVersion())) {
logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
return;
}
Expand Down
Loading