Skip to content

Integrate LeaderChecker with Coordinator #34049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final UnicastConfiguredHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
@Nullable
private Releasable electionScheduler;
@Nullable
private Releasable prevotingRound;
@Nullable
private Releasable leaderCheckScheduler;
private AtomicLong maxTermSeen = new AtomicLong();

private Mode mode;
Expand All @@ -108,10 +111,27 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure());

masterService.setClusterStateSupplier(this::getStateForMasterService);
}

// Builds the callback handed to the LeaderChecker; it is invoked when the
// checker concludes that the current leader has failed, and drops this node
// back to CANDIDATE mode so a new election can take place.
// An anonymous class is used instead of a lambda so that toString() can be
// overridden — presumably the description appears in task/log output; confirm.
private Runnable getOnLeaderFailure() {
    return new Runnable() {
        @Override
        public void run() {
            // becomeCandidate must run under the coordinator mutex (callers
            // elsewhere in this file synchronize on it the same way).
            synchronized (mutex) {
                becomeCandidate("onLeaderFailure");
            }
        }

        @Override
        public String toString() {
            return "notification of leader failure";
        }
    };
}

private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) {
synchronized (mutex) {
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);
Expand Down Expand Up @@ -233,6 +253,12 @@ void becomeCandidate(String method) {
joinAccumulator = joinHelper.new CandidateJoinAccumulator();

peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);

if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
leaderCheckScheduler = null;
}
}

preVoteCollector.update(getPreVoteResponse(), null);
Expand All @@ -251,23 +277,35 @@ void becomeLeader(String method) {
peerFinder.deactivate(getLocalNode());
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());

assert leaderCheckScheduler == null : leaderCheckScheduler;
}

// Transitions this node to FOLLOWER of the given leader. Must be called with
// the coordinator mutex held (asserted below). The leader checker is only
// restarted when the mode or the leader actually changed.
void becomeFollower(String method, DiscoveryNode leaderNode) {
    assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
    logger.debug("{}: becoming FOLLOWER of [{}] (was {}, lastKnownLeader was [{}])", method, leaderNode, mode, lastKnownLeader);

    // Decide up front — before mode and lastKnownLeader are mutated below —
    // whether the leader checker must be (re)started: any transition from a
    // non-FOLLOWER mode, or a change of leader, requires a fresh checker.
    final boolean restartLeaderChecker = (mode == Mode.FOLLOWER && Optional.of(leaderNode).equals(lastKnownLeader)) == false;

    if (mode != Mode.FOLLOWER) {
        mode = Mode.FOLLOWER;
        joinAccumulator.close(mode);
        joinAccumulator = new JoinHelper.FollowerJoinAccumulator();
        // Clear the checker's node view so incoming leader checks are rejected
        // (a follower is not an elected master).
        leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
    }

    lastKnownLeader = Optional.of(leaderNode);
    peerFinder.deactivate(leaderNode);
    closePrevotingAndElectionScheduler();
    cancelActivePublication();
    preVoteCollector.update(getPreVoteResponse(), leaderNode);

    if (restartLeaderChecker) {
        // Close any checker that was watching a previous leader before
        // starting one for the new leader.
        if (leaderCheckScheduler != null) {
            leaderCheckScheduler.close();
        }
        leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode);
    }
}

private PreVoteResponse getPreVoteResponse() {
Expand Down Expand Up @@ -339,6 +377,7 @@ public void invariant() {
assert getStateForMasterService().nodes().getMasterNodeId() != null
|| getStateForMasterService().term() != getCurrentTerm() :
getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
Expand All @@ -347,12 +386,16 @@ assert getStateForMasterService().nodes().getMasterNodeId() != null
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler != null;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
assert peerFinderLeader.isPresent() == false : peerFinderLeader;
assert prevotingRound == null || electionScheduler != null;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler == null : leaderCheckScheduler;
}
}
}
Expand Down Expand Up @@ -577,6 +620,8 @@ public String toString() {
return "scheduled timeout for " + publication;
}
});

leaderChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes());
publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class LeaderChecker extends AbstractComponent {
private final TransportService transportService;
private final Runnable onLeaderFailure;

private volatile DiscoveryNodes lastPublishedDiscoveryNodes;
private volatile DiscoveryNodes discoveryNodes;

public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
super(settings);
Expand Down Expand Up @@ -111,19 +111,24 @@ public Releasable startLeaderChecker(final DiscoveryNode leader) {
* isLocalNodeElectedMaster() should reflect whether this node is a leader, and nodeExists()
* should indicate whether nodes are known publication targets or not.
*/
public void setLastPublishedDiscoveryNodes(DiscoveryNodes discoveryNodes) {
logger.trace("updating last-published nodes: {}", discoveryNodes);
lastPublishedDiscoveryNodes = discoveryNodes;
public void setCurrentNodes(DiscoveryNodes discoveryNodes) {
logger.trace("setCurrentNodes: {}", discoveryNodes);
this.discoveryNodes = discoveryNodes;
}

// Exposed for assertions only: reports whether the most recently supplied
// cluster membership marks the local node as the elected master.
boolean currentNodeIsMaster() {
    final DiscoveryNodes currentNodes = discoveryNodes;
    return currentNodes.isLocalNodeElectedMaster();
}

private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException {
final DiscoveryNodes lastPublishedDiscoveryNodes = this.lastPublishedDiscoveryNodes;
assert lastPublishedDiscoveryNodes != null;
final DiscoveryNodes discoveryNodes = this.discoveryNodes;
assert discoveryNodes != null;

if (lastPublishedDiscoveryNodes.isLocalNodeElectedMaster() == false) {
if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("non-master handling {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check"));
} else if (lastPublishedDiscoveryNodes.nodeExists(request.getSender()) == false) {
} else if (discoveryNodes.nodeExists(request.getSender()) == false) {
logger.debug("leader check from unknown node: {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node"));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,9 @@ public String toString() {
}

static class TestResponse extends ReplicationResponse {
    // Populate an empty ShardInfo eagerly — NOTE(review): presumably so that
    // tests which serialize or inspect the response do not hit a null
    // shard-info; confirm against ReplicationResponse's serialization contract.
    TestResponse() {
        setShardInfo(new ShardInfo());
    }
}

private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
Expand All @@ -59,7 +60,11 @@
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
Expand All @@ -68,7 +73,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.cluster.discovery:TRACE")
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
public class CoordinatorTests extends ESTestCase {

public void testCanUpdateClusterStateAfterStabilisation() {
Expand Down Expand Up @@ -101,6 +106,40 @@ public void testNodesJoinAfterStableCluster() {
assertEquals(currentTerm, newTerm);
}

public void testLeaderDisconnectionDetectedQuickly() {
    // Spin up a small cluster and let it elect a leader.
    final Cluster cluster = new Cluster(randomIntBetween(3, 5));
    cluster.stabilise();

    final ClusterNode leader = cluster.getAnyLeader();
    logger.info("--> disconnecting leader {}", leader);
    leader.disconnect();

    // TODO remove once follower checker is integrated
    synchronized (leader.coordinator.mutex) {
        leader.coordinator.becomeCandidate("simulated failure detection");
    }

    // After the default stabilisation period a different node must lead.
    cluster.stabilise();
    assertThat(cluster.getAnyLeader().getId(), not(equalTo(leader.getId())));
}

public void testUnresponsiveLeaderDetectedEventually() {
    // Spin up a small cluster and let it elect a leader.
    final Cluster cluster = new Cluster(randomIntBetween(3, 5));
    cluster.stabilise();

    final ClusterNode leader = cluster.getAnyLeader();
    logger.info("--> partitioning leader {}", leader);
    leader.partition();

    // TODO remove once follower checker is integrated
    synchronized (leader.coordinator.mutex) {
        leader.coordinator.becomeCandidate("simulated failure detection");
    }

    // A blackholed leader is only detected after checks time out, so allow
    // one full (interval + timeout) round per permitted retry on top of the
    // default stabilisation time.
    final long perCheckMillis =
        LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis();
    cluster.stabilise(Cluster.DEFAULT_STABILISATION_TIME
        + perCheckMillis * LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY));
    assertThat(cluster.getAnyLeader().getId(), not(equalTo(leader.getId())));
}

// Deterministic node-id naming scheme for test nodes: node0, node1, ...
private static String nodeIdFromIndex(int nodeIndex) {
    return new StringBuilder("node").append(nodeIndex).toString();
}
Expand All @@ -115,6 +154,9 @@ class Cluster {
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
private final VotingConfiguration initialConfiguration;

private final Set<String> disconnectedNodes = new HashSet<>();
private final Set<String> blackholedNodes = new HashSet<>();

Cluster(int initialNodeCount) {
logger.info("--> creating cluster of {} nodes", initialNodeCount);

Expand Down Expand Up @@ -142,8 +184,12 @@ void addNodes(int newNodesCount) {
}

void stabilise() {
stabilise(DEFAULT_STABILISATION_TIME);
}

void stabilise(long stabilisationTime) {
final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis();
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + DEFAULT_STABILISATION_TIME) {
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + stabilisationTime) {

while (deterministicTaskQueue.hasRunnableTasks()) {
try {
Expand Down Expand Up @@ -182,16 +228,21 @@ private void assertUniqueLeaderAndExpectedModes() {
}

final String nodeId = clusterNode.getId();
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that all nodes have actually voted for the leader in this term

assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " is at the same accepted version as the leader",
Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
assertThat(nodeId + " is at the same committed version as the leader",
clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(true)));

if (disconnectedNodes.contains(nodeId) || blackholedNodes.contains(nodeId)) {
assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE));
} else {
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that all nodes have actually voted for the leader in this term

assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " is at the same accepted version as the leader",
Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
assertThat(nodeId + " is at the same committed version as the leader",
clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(true)));
}
}

assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize),
Expand All @@ -204,6 +255,18 @@ ClusterNode getAnyLeader() {
return randomFrom(allLeaders);
}

// Resolves the simulated link state between two nodes. A blackhole on either
// endpoint takes precedence over a disconnection; otherwise the link is up.
private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
    if (blackholedNodes.contains(sender.getId()) || blackholedNodes.contains(destination.getId())) {
        return ConnectionStatus.BLACK_HOLE;
    }
    if (disconnectedNodes.contains(sender.getId()) || disconnectedNodes.contains(destination.getId())) {
        return ConnectionStatus.DISCONNECTED;
    }
    return ConnectionStatus.CONNECTED;
}

class ClusterNode extends AbstractComponent {
private final int nodeIndex;
private Coordinator coordinator;
Expand Down Expand Up @@ -241,7 +304,7 @@ protected DiscoveryNode getLocalNode() {

@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return ConnectionStatus.CONNECTED;
return Cluster.this.getConnectionStatus(sender, destination);
}

@Override
Expand All @@ -264,6 +327,17 @@ protected void handle(DiscoveryNode sender, DiscoveryNode destination, String ac
deterministicTaskQueue.scheduleNow(onNode(destination, doDelivery));
}
}

@Override
protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNode destination) {
    // Only handshakes are exempt from blackholing; everything else is dropped
    // by the default behaviour.
    if (action.equals(HANDSHAKE_ACTION_NAME) == false) {
        super.onBlackholedDuringSend(requestId, action, destination);
        return;
    }
    logger.trace("ignoring blackhole and delivering {}", getRequestDescription(requestId, action, destination));
    // handshakes always have a timeout, and are sent in a blocking fashion, so we must respond with an exception.
    sendFromTo(destination, getLocalNode(), action, getDisconnectException(requestId, action, destination));
}
};

masterService = new FakeThreadPoolMasterService("test",
Expand All @@ -290,7 +364,7 @@ String getId() {
return localNode.getId();
}

public DiscoveryNode getLocalNode() {
DiscoveryNode getLocalNode() {
return localNode;
}

Expand All @@ -316,6 +390,14 @@ public void onFailure(String source, Exception e) {
public String toString() {
return localNode.toString();
}

// Simulates a clean network failure of this node: transport attempts to or
// from it resolve to ConnectionStatus.DISCONNECTED (see Cluster.getConnectionStatus).
void disconnect() {
    disconnectedNodes.add(localNode.getId());
}

// Simulates a network partition (blackhole): messages to or from this node
// resolve to ConnectionStatus.BLACK_HOLE, i.e. are dropped without failing
// fast (see Cluster.getConnectionStatus).
void partition() {
    blackholedNodes.add(localNode.getId());
}
}

private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
Expand Down
Loading