Skip to content

Integrate LeaderChecker with Coordinator #34049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,13 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final UnicastConfiguredHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
@Nullable
private Releasable electionScheduler;
@Nullable
private Releasable prevotingRound;
@Nullable
private Releasable leaderCheckScheduler;
private AtomicLong maxTermSeen = new AtomicLong();

private Mode mode;
Expand All @@ -108,6 +111,19 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit);
this.leaderChecker = new LeaderChecker(settings, transportService, new Runnable() {
@Override
public void run() {
synchronized (mutex) {
becomeCandidate("onLeaderFailure");
}
}

@Override
public String toString() {
return "notification of leader failure";
}
});

masterService.setClusterStateSupplier(this::getStateForMasterService);
}
Expand Down Expand Up @@ -233,6 +249,12 @@ void becomeCandidate(String method) {
joinAccumulator = joinHelper.new CandidateJoinAccumulator();

peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);

if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
leaderCheckScheduler = null;
}
}

preVoteCollector.update(getPreVoteResponse(), null);
Expand All @@ -251,6 +273,11 @@ void becomeLeader(String method) {
peerFinder.deactivate(getLocalNode());
closePrevotingAndElectionScheduler();
preVoteCollector.update(getPreVoteResponse(), getLocalNode());

if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
leaderCheckScheduler = null;
}
}

void becomeFollower(String method, DiscoveryNode leaderNode) {
Expand All @@ -261,13 +288,19 @@ void becomeFollower(String method, DiscoveryNode leaderNode) {
mode = Mode.FOLLOWER;
joinAccumulator.close(mode);
joinAccumulator = new JoinHelper.FollowerJoinAccumulator();
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
}

lastKnownLeader = Optional.of(leaderNode);
peerFinder.deactivate(leaderNode);
closePrevotingAndElectionScheduler();
cancelActivePublication();
preVoteCollector.update(getPreVoteResponse(), leaderNode);

if (leaderCheckScheduler != null) {
leaderCheckScheduler.close();
}
leaderCheckScheduler = leaderChecker.startLeaderChecker(leaderNode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused. Does this mean we restart the leader checker on every incoming publication? We call becomeFollower on every incoming publication

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It did mean a new leader checker on each publication. I pushed be15266 not to do this. However I can't think of a good way to reliably assert that we don't do this: both ways have pretty much the right liveness properties, and my other two ideas are:

  • check we don't send another leader check immediately after each publication (not robust)
  • check for reference equality of the leaderCheckScheduler object before/after a second publication (don't fancy exposing this).

Copy link
Contributor

@ywelsch ywelsch Sep 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However I can't think of a good way to reliably assert that we don't do this

I don't have any good idea here. I'll keep thinking about this. Should not block this PR though.

}

private PreVoteResponse getPreVoteResponse() {
Expand Down Expand Up @@ -339,6 +372,7 @@ public void invariant() {
assert getStateForMasterService().nodes().getMasterNodeId() != null
|| getStateForMasterService().term() != getCurrentTerm() :
getStateForMasterService();
assert leaderCheckScheduler == null : leaderCheckScheduler;
} else if (mode == Mode.FOLLOWER) {
assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false";
assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false);
Expand All @@ -347,12 +381,16 @@ assert getStateForMasterService().nodes().getMasterNodeId() != null
assert electionScheduler == null : electionScheduler;
assert prevotingRound == null : prevotingRound;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler != null;
} else {
assert mode == Mode.CANDIDATE;
assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator;
assert peerFinderLeader.isPresent() == false : peerFinderLeader;
assert prevotingRound == null || electionScheduler != null;
assert getStateForMasterService().nodes().getMasterNodeId() == null : getStateForMasterService();
assert leaderChecker.currentNodeIsMaster() == false;
assert leaderCheckScheduler == null : leaderCheckScheduler;
}
}
}
Expand Down Expand Up @@ -577,6 +615,8 @@ public String toString() {
return "scheduled timeout for " + publication;
}
});

leaderChecker.setCurrentNodes(publishRequest.getAcceptedState().nodes());
publication.start(Collections.emptySet()); // TODO start failure detector and put faultyNodes here
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class LeaderChecker extends AbstractComponent {
private final TransportService transportService;
private final Runnable onLeaderFailure;

private volatile DiscoveryNodes lastPublishedDiscoveryNodes;
private volatile DiscoveryNodes discoveryNodes;

public LeaderChecker(final Settings settings, final TransportService transportService, final Runnable onLeaderFailure) {
super(settings);
Expand Down Expand Up @@ -111,19 +111,24 @@ public Releasable startLeaderChecker(final DiscoveryNode leader) {
* isLocalNodeElectedMaster() should reflect whether this node is a leader, and nodeExists()
* should indicate whether nodes are known publication targets or not.
*/
public void setLastPublishedDiscoveryNodes(DiscoveryNodes discoveryNodes) {
logger.trace("updating last-published nodes: {}", discoveryNodes);
lastPublishedDiscoveryNodes = discoveryNodes;
public void setCurrentNodes(DiscoveryNodes nodes) {
    // Record the membership against which incoming leader checks are validated.
    logger.trace("setCurrentNodes: {}", nodes);
    this.discoveryNodes = nodes;
}

// For assertions only: true iff the nodes most recently passed to setCurrentNodes()
// mark the local node as the elected master.
boolean currentNodeIsMaster() {
return discoveryNodes.isLocalNodeElectedMaster();
}

private void handleLeaderCheck(LeaderCheckRequest request, TransportChannel transportChannel, Task task) throws IOException {
final DiscoveryNodes lastPublishedDiscoveryNodes = this.lastPublishedDiscoveryNodes;
assert lastPublishedDiscoveryNodes != null;
final DiscoveryNodes discoveryNodes = this.discoveryNodes;
assert discoveryNodes != null;

if (lastPublishedDiscoveryNodes.isLocalNodeElectedMaster() == false) {
if (discoveryNodes.isLocalNodeElectedMaster() == false) {
logger.debug("non-master handling {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("non-leader rejecting leader check"));
} else if (lastPublishedDiscoveryNodes.nodeExists(request.getSender()) == false) {
} else if (discoveryNodes.nodeExists(request.getSender()) == false) {
logger.debug("leader check from unknown node: {}", request);
transportChannel.sendResponse(new CoordinationStateRejectedException("leader check from unknown node"));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,9 @@ public String toString() {
}

static class TestResponse extends ReplicationResponse {
TestResponse() {
// Populate a default ShardInfo up front — NOTE(review): presumably callers
// (e.g. response serialization) require a non-null ShardInfo; confirm.
setShardInfo(new ShardInfo());
}
}

private class TestAction extends TransportReplicationAction<Request, Request, TestResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
Expand All @@ -59,7 +60,11 @@
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.setValue;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
Expand All @@ -68,7 +73,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;

@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.cluster.discovery:TRACE")
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
public class CoordinatorTests extends ESTestCase {

public void testCanUpdateClusterStateAfterStabilisation() {
Expand Down Expand Up @@ -101,6 +106,40 @@ public void testNodesJoinAfterStableCluster() {
assertEquals(currentTerm, newTerm);
}

public void testLeaderDisconnectionDetectedQuickly() {
    // Bring up a small cluster and let it elect a stable leader.
    final Cluster cluster = new Cluster(randomIntBetween(3, 5));
    cluster.stabilise();

    final ClusterNode leader = cluster.getAnyLeader();
    logger.info("--> disconnecting leader {}", leader);
    leader.disconnect();

    // The follower checker is not wired in yet, so fail the old leader over by hand.
    // TODO remove once follower checker is integrated
    synchronized (leader.coordinator.mutex) {
        leader.coordinator.becomeCandidate("simulated failure detection");
    }

    // Once the cluster re-stabilises, a different node must hold the leadership.
    cluster.stabilise();
    assertThat(cluster.getAnyLeader().getId(), not(equalTo(leader.getId())));
}

public void testUnresponsiveLeaderDetectedEventually() {
    // Bring up a small cluster and let it elect a stable leader.
    final Cluster cluster = new Cluster(randomIntBetween(3, 5));
    cluster.stabilise();

    final ClusterNode leader = cluster.getAnyLeader();
    logger.info("--> partitioning leader {}", leader);
    leader.partition();

    // The follower checker is not wired in yet, so fail the old leader over by hand.
    // TODO remove once follower checker is integrated
    synchronized (leader.coordinator.mutex) {
        leader.coordinator.becomeCandidate("simulated failure detection");
    }

    // A blackholed leader is only noticed after the leader checks time out, so allow
    // retryCount * (checkInterval + checkTimeout) on top of the default stabilisation time.
    final long leaderCheckDetectionMillis =
        (LEADER_CHECK_INTERVAL_SETTING.get(Settings.EMPTY).millis() + LEADER_CHECK_TIMEOUT_SETTING.get(Settings.EMPTY).millis())
            * LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY);
    cluster.stabilise(Cluster.DEFAULT_STABILISATION_TIME + leaderCheckDetectionMillis);
    assertThat(cluster.getAnyLeader().getId(), not(equalTo(leader.getId())));
}

// Derives the deterministic node id ("node0", "node1", ...) for a given node index.
private static String nodeIdFromIndex(int nodeIndex) {
    return "node".concat(Integer.toString(nodeIndex));
}
Expand All @@ -115,6 +154,9 @@ class Cluster {
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
private final VotingConfiguration initialConfiguration;

private final Set<String> disconnectedNodes = new HashSet<>();
private final Set<String> partitionedNodes = new HashSet<>();

Cluster(int initialNodeCount) {
logger.info("--> creating cluster of {} nodes", initialNodeCount);

Expand Down Expand Up @@ -142,8 +184,12 @@ void addNodes(int newNodesCount) {
}

// Runs the cluster for the default stabilisation period; see stabilise(long) for details.
void stabilise() {
stabilise(DEFAULT_STABILISATION_TIME);
}

void stabilise(long stabilisationTime) {
final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis();
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + DEFAULT_STABILISATION_TIME) {
while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationStartTime + stabilisationTime) {

while (deterministicTaskQueue.hasRunnableTasks()) {
try {
Expand Down Expand Up @@ -182,16 +228,21 @@ private void assertUniqueLeaderAndExpectedModes() {
}

final String nodeId = clusterNode.getId();
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that all nodes have actually voted for the leader in this term

assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " is at the same accepted version as the leader",
Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
assertThat(nodeId + " is at the same committed version as the leader",
clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(true)));

if (disconnectedNodes.contains(nodeId) || partitionedNodes.contains(nodeId)) {
assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE));
} else {
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that all nodes have actually voted for the leader in this term

assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " is at the same accepted version as the leader",
Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
assertThat(nodeId + " is at the same committed version as the leader",
clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(true)));
}
}

assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize),
Expand All @@ -204,6 +255,18 @@ ClusterNode getAnyLeader() {
return randomFrom(allLeaders);
}

// Resolves the simulated link quality between two nodes: a partitioned endpoint
// silently drops traffic (BLACK_HOLE), a disconnected endpoint fails fast
// (DISCONNECTED), otherwise the link is healthy (CONNECTED).
private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
    final String senderId = sender.getId();
    final String destinationId = destination.getId();
    if (partitionedNodes.contains(senderId) || partitionedNodes.contains(destinationId)) {
        return ConnectionStatus.BLACK_HOLE;
    }
    if (disconnectedNodes.contains(senderId) || disconnectedNodes.contains(destinationId)) {
        return ConnectionStatus.DISCONNECTED;
    }
    return ConnectionStatus.CONNECTED;
}

class ClusterNode extends AbstractComponent {
private final int nodeIndex;
private Coordinator coordinator;
Expand Down Expand Up @@ -241,7 +304,7 @@ protected DiscoveryNode getLocalNode() {

@Override
protected ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode destination) {
return ConnectionStatus.CONNECTED;
return Cluster.this.getConnectionStatus(sender, destination);
}

@Override
Expand Down Expand Up @@ -290,7 +353,7 @@ String getId() {
return localNode.getId();
}

public DiscoveryNode getLocalNode() {
DiscoveryNode getLocalNode() {
return localNode;
}

Expand All @@ -316,6 +379,14 @@ public void onFailure(String source, Exception e) {
public String toString() {
return localNode.toString();
}

// Simulates losing this node's network connections: getConnectionStatus() reports
// DISCONNECTED for any link involving this node.
void disconnect() {
disconnectedNodes.add(localNode.getId());
}

// Simulates a network partition of this node: getConnectionStatus() reports
// BLACK_HOLE (traffic silently dropped) for any link involving this node.
void partition() {
partitionedNodes.add(localNode.getId());
}
}

private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testLeaderBehaviour() {
= DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()).build();

{
leaderChecker.setLastPublishedDiscoveryNodes(discoveryNodes);
leaderChecker.setCurrentNodes(discoveryNodes);

final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
Expand All @@ -307,7 +307,7 @@ public void testLeaderBehaviour() {
}

{
leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build());
leaderChecker.setCurrentNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).build());

final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
Expand All @@ -318,7 +318,7 @@ public void testLeaderBehaviour() {
}

{
leaderChecker.setLastPublishedDiscoveryNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build());
leaderChecker.setCurrentNodes(DiscoveryNodes.builder(discoveryNodes).add(otherNode).masterNodeId(null).build());

final CapturingTransportResponseHandler handler = new CapturingTransportResponseHandler();
transportService.sendRequest(localNode, LEADER_CHECK_ACTION_NAME, new LeaderCheckRequest(otherNode), handler);
Expand Down
Loading