[Zen2] Reconfigure cluster as its membership changes #34592

Merged: 26 commits, merged Oct 19, 2018

Commits (26; the diff below shows changes from all commits):
6e96ec8 Reconfigure cluster as its membership changes (DaveCTurner, Oct 18, 2018)
54eca68 Adding each node might take two publications due to reconfiguration (DaveCTurner, Oct 18, 2018)
96fba05 TODO (DaveCTurner, Oct 18, 2018)
8d50dc7 Assert specifics about the configuration (DaveCTurner, Oct 18, 2018)
81c8094 Use local method (DaveCTurner, Oct 19, 2018)
447f26f Rename to improveConfiguration, always calculates better configuration (DaveCTurner, Oct 19, 2018)
a6f50e3 Cluster#size() (DaveCTurner, Oct 19, 2018)
4b0986f Add assertions, better messages (DaveCTurner, Oct 19, 2018)
50abb94 Assert that last-committed and last-accepted states are equal (DaveCTurner, Oct 19, 2018)
4fd70fd Better log message (DaveCTurner, Oct 19, 2018)
37d95bb log when IO exceptions are simulated (DaveCTurner, Oct 19, 2018)
b53cf0c Handle failure when bumping term by standing down (DaveCTurner, Oct 19, 2018)
aea29ed TODO (DaveCTurner, Oct 19, 2018)
9cfc15e TODO (DaveCTurner, Oct 19, 2018)
dbdaf0b TODO (DaveCTurner, Oct 19, 2018)
5692417 Finer-grained tests (DaveCTurner, Oct 19, 2018)
17ff409 URGENT (DaveCTurner, Oct 19, 2018)
c9b976e Do not schedule reconfiguration if still becoming master (DaveCTurner, Oct 19, 2018)
1a8a3ae Comments (DaveCTurner, Oct 19, 2018)
a33840e Only expect correct state if the cluster applier is working (DaveCTurner, Oct 19, 2018)
1eb28a2 It's ok to leave the failure tolerance setting behind (DaveCTurner, Oct 19, 2018)
015785c The leader might be disconnected (DaveCTurner, Oct 19, 2018)
3c95c5f Lag detection doesn't fix the situation where the applier failed (DaveCTurner, Oct 19, 2018)
919436d Merge branch 'zen2' into 2018-10-18-auto-reconfigure (DaveCTurner, Oct 19, 2018)
d3975a9 Allow lag-fixing to take longer if any cluster-appliers are hanging (DaveCTurner, Oct 19, 2018)
56fac17 Ignore leftover fault-tolerance setting in ITs (DaveCTurner, Oct 19, 2018)
@@ -27,6 +27,7 @@
import org.elasticsearch.cluster.ClusterState.Builder;
import org.elasticsearch.cluster.ClusterState.VotingConfiguration;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.FollowersChecker.FollowerCheckRequest;
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
@@ -42,6 +43,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@@ -64,9 +66,13 @@
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_MASTER_NODES_FAILURE_TOLERANCE;
import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

@@ -104,16 +110,18 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
@Nullable
private Releasable leaderCheckScheduler;
private long maxTermSeen;
private final Reconfigurator reconfigurator;

private Mode mode;
private Optional<DiscoveryNode> lastKnownLeader;
private Optional<Join> lastJoin;
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();

public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService,
MasterService masterService, Supplier<CoordinationState.PersistedState> persistedStateSupplier,
UnicastHostsProvider unicastHostsProvider, ClusterApplier clusterApplier, Random random) {
public Coordinator(Settings settings, ClusterSettings clusterSettings, TransportService transportService,
AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, UnicastHostsProvider unicastHostsProvider,
ClusterApplier clusterApplier, Random random) {
super(settings);
this.transportService = transportService;
this.masterService = masterService;
@@ -136,6 +144,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
this.clusterApplier = clusterApplier;
masterService.setClusterStateSupplier(this::getStateForMasterService);
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
}

private Runnable getOnLeaderFailure() {
@@ -269,8 +278,13 @@ private void updateMaxTermSeen(final long term) {
logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump",
maxTermSeen, currentTerm);
} else {
ensureTermAtLeast(getLocalNode(), maxTermSeen);
startElection();
try {
ensureTermAtLeast(getLocalNode(), maxTermSeen);
startElection();
} catch (Exception e) {
logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e);
becomeCandidate("updateMaxTermSeen");
}
}
}
}
@@ -524,6 +538,12 @@ public void invariant() {
assert lastPublishedNodes.equals(followersChecker.getKnownFollowers()) :
lastPublishedNodes + " != " + followersChecker.getKnownFollowers();
}

assert becomingMaster || activePublication ||
coordinationState.get().getLastAcceptedConfiguration().equals(coordinationState.get().getLastCommittedConfiguration())
: coordinationState.get().getLastAcceptedConfiguration() + " != "
+ coordinationState.get().getLastCommittedConfiguration();

} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
@@ -582,13 +602,59 @@ public void setInitialConfiguration(final VotingConfiguration votingConfiguratio
MetaData.Builder metaDataBuilder = MetaData.builder();
// automatically generate a UID for the metadata if we need to
metaDataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool?
metaDataBuilder.persistentSettings(Settings.builder().put(CLUSTER_MASTER_NODES_FAILURE_TOLERANCE.getKey(),
(votingConfiguration.getNodeIds().size() - 1) / 2).build()); // TODO set this in bootstrapping tool?
builder.metaData(metaDataBuilder);
coordinationState.get().setInitialState(builder.build());
preVoteCollector.update(getPreVoteResponse(), null); // pick up the change to last-accepted version
startElectionScheduler();
}
}

// Package-private for testing
ClusterState improveConfiguration(ClusterState clusterState) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";

final Set<DiscoveryNode> liveNodes = StreamSupport.stream(clusterState.nodes().spliterator(), false)
.filter(this::hasJoinVoteFrom).collect(Collectors.toSet());
final ClusterState.VotingConfiguration newConfig = reconfigurator.reconfigure(
liveNodes, emptySet(), clusterState.getLastAcceptedConfiguration());
if (newConfig.equals(clusterState.getLastAcceptedConfiguration()) == false) {
assert coordinationState.get().joinVotesHaveQuorumFor(newConfig);
return ClusterState.builder(clusterState).lastAcceptedConfiguration(newConfig).build();
}

return clusterState;
}

private AtomicBoolean reconfigurationTaskScheduled = new AtomicBoolean();

private void scheduleReconfigurationIfNeeded() {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
assert mode == Mode.LEADER : mode;
assert currentPublication.isPresent() == false : "Expected no publication in progress";

final ClusterState state = getLastAcceptedState();
if (improveConfiguration(state) != state && reconfigurationTaskScheduled.compareAndSet(false, true)) {
logger.trace("scheduling reconfiguration");
masterService.submitStateUpdateTask("reconfigure", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
reconfigurationTaskScheduled.set(false);
synchronized (mutex) {
return improveConfiguration(currentState);
}
}

@Override
public void onFailure(String source, Exception e) {
reconfigurationTaskScheduled.set(false);
logger.debug("reconfiguration failed", e);
}
});
}
}

// for tests
boolean hasJoinVoteFrom(DiscoveryNode localNode) {
return coordinationState.get().containsJoinVoteFor(localNode);
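
Note (not part of the PR): the assertion coordinationState.get().joinVotesHaveQuorumFor(newConfig) in improveConfiguration() above expresses the safety condition for switching voting configurations: the nodes that have already cast join votes must form a majority of any configuration the leader adopts. A minimal stand-alone sketch of that majority check, using hypothetical names rather than the Elasticsearch implementation:

import java.util.Set;

final class QuorumCheck {
    // True iff the nodes holding join votes form a strict majority of the proposed voting configuration.
    static boolean haveQuorum(Set<String> joinVotes, Set<String> votingConfiguration) {
        final long votesInConfig = votingConfiguration.stream().filter(joinVotes::contains).count();
        return 2 * votesInConfig > votingConfiguration.size();
    }
}
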
@@ -599,19 +665,34 @@ private void handleJoin(Join join) {
ensureTermAtLeast(getLocalNode(), join.getTerm()).ifPresent(this::handleJoin);

if (coordinationState.get().electionWon()) {
// if we have already won the election then the actual join does not matter for election purposes,
// so swallow any exception
try {
coordinationState.get().handleJoin(join);
} catch (CoordinationStateRejectedException e) {
logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e);
// If we have already won the election then the actual join does not matter for election purposes, so swallow any exception
final boolean isNewJoin = handleJoinIgnoringExceptions(join);

// If we haven't completely finished becoming master then there's already a publication scheduled which will, in turn,
// schedule a reconfiguration if needed. It's benign to schedule a reconfiguration anyway, but it might fail if it wins the
// race against the election-winning publication and log a big error message, which we can prevent by checking this here:
final boolean establishedAsMaster = mode == Mode.LEADER && getLastAcceptedState().term() == getCurrentTerm();
if (isNewJoin && establishedAsMaster && publicationInProgress() == false) {
scheduleReconfigurationIfNeeded();
}
} else {
coordinationState.get().handleJoin(join); // this might fail and bubble up the exception
}
}
}

/**
* @return true iff the join was from a new node and was successfully added
*/
private boolean handleJoinIgnoringExceptions(Join join) {
try {
return coordinationState.get().handleJoin(join);
} catch (CoordinationStateRejectedException e) {
logger.debug(new ParameterizedMessage("failed to add {} - ignoring", join), e);
return false;
}
}

public ClusterState getLastAcceptedState() {
synchronized (mutex) {
return coordinationState.get().getLastAcceptedState();
@@ -904,6 +985,10 @@ public void onSuccess(String source) {
logger.debug("publication ended successfully: {}", CoordinatorPublication.this);
// trigger term bump if new term was found during publication
updateMaxTermSeen(getCurrentTerm());

if (mode == Mode.LEADER) {
scheduleReconfigurationIfNeeded();
}
}
ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
@@ -916,8 +1001,7 @@ public void onFailure(Exception e) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");

FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException(
"publication failed", e);
final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e);
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
publishListener.onFailure(exception);
}
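
Note (not part of the diff): taken together, the new improveConfiguration() and scheduleReconfigurationIfNeeded() methods make the leader re-evaluate the voting configuration after every successful publication, and after a join that arrives once it is established as master, queueing at most one URGENT "reconfigure" task at a time. A simplified stand-alone sketch of that control flow, with stand-in types and names rather than the real Elasticsearch APIs:

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;

final class ReconfigureOnMembershipChange {
    private final AtomicBoolean reconfigureQueued = new AtomicBoolean();    // mirrors reconfigurationTaskScheduled
    private final Function<Set<String>, Set<String>> reconfigurator;        // live nodes -> improved voting config
    private final Consumer<Runnable> masterTaskQueue;                       // stand-in for submitting a cluster-state update task

    ReconfigureOnMembershipChange(Function<Set<String>, Set<String>> reconfigurator,
                                  Consumer<Runnable> masterTaskQueue) {
        this.reconfigurator = reconfigurator;
        this.masterTaskQueue = masterTaskQueue;
    }

    // Called on the leader after a successful publication or a newly-accepted join vote.
    void onMembershipChange(Set<String> nodesWithJoinVotes, Set<String> currentVotingConfig) {
        final Set<String> improved = reconfigurator.apply(nodesWithJoinVotes);
        if (improved.equals(currentVotingConfig) == false && reconfigureQueued.compareAndSet(false, true)) {
            masterTaskQueue.accept(() -> {
                reconfigureQueued.set(false);
                // recompute against the latest state and publish it, as improveConfiguration() does under the mutex
            });
        }
    }
}
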
@@ -52,6 +52,7 @@ public class Reconfigurator extends AbstractComponent {
Setting.intSetting("cluster.master_nodes_failure_tolerance", 0, 0, Property.NodeScope, Property.Dynamic);
// the default is not supposed to be important since we expect to set this setting explicitly at bootstrapping time
// TODO contemplate setting the default to something larger than 0 (1? 1<<30?)
// TODO prevent this being set as a transient or a per-node setting?

private volatile int masterNodesFailureTolerance;
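
Note (not part of the diff): the cluster.master_nodes_failure_tolerance setting defined here is seeded by setInitialConfiguration() above with (n - 1) / 2 for n initial voting nodes, reflecting the usual majority-quorum arithmetic in which 2f + 1 voting nodes are needed to tolerate f failures. A small sketch of that arithmetic, with illustrative helper names that are not part of the PR:

final class FailureToleranceMath {
    // Tolerance implied at bootstrap time for a given number of initial voting nodes: 3 -> 1, 5 -> 2.
    static int bootstrapFailureTolerance(int initialVotingNodes) {
        return (initialVotingNodes - 1) / 2;
    }

    // Smallest majority-quorum voting configuration that can tolerate the given number of failures.
    static int minimumVotingNodes(int failureTolerance) {
        return 2 * failureTolerance + 1;
    }
}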
