Commit f1b4358

Treat lagging nodes as faulty (elastic#51)
Today, if a node detects it is lagging behind the master it falls back to CANDIDATE, but then receives a heartbeat from the master and goes back to FOLLOWER. If it is lagging because it missed a cluster state update for some reason, it will never recover, because there is no impetus to publish a further update. This change fixes this, crudely, by making lagging nodes consider themselves faulty, so that they are eventually kicked out of the cluster and later rejoin, which resolves the lag. Fixes elastic#43, but will eventually be superseded by elastic#53.
1 parent 44c55e8 commit f1b4358
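
As a rough illustration of the mechanism described above (using simplified, hypothetical names rather than the real Legislator, transport and ConsensusState types), a lagging follower records the version it was stuck at and rejects heartbeats until its committed state moves past it:

// Simplified sketch only; the real logic lives in Legislator below and is driven by the
// zen2 lag detector, heartbeats and cluster-state commits.
class LaggingFollowerSketch {

    // Set by the lag detector: the committed version we were stuck at when lag was detected.
    // Zero means "not currently considered lagging".
    private long laggingUntilCommittedVersionExceeds;

    // Our latest locally committed cluster-state version, or -1 if we have none.
    private long lastCommittedVersion = -1L;

    // Lag detector: the leader is still ahead of us after the publish timeout.
    void onLagDetected(long localVersionAtTimeout) {
        laggingUntilCommittedVersionExceeds = localVersionAtTimeout;
    }

    // Heartbeat handler: rejecting heartbeats makes the leader's fault detection treat us as
    // faulty, so it eventually removes us from the cluster; rejoining delivers the missing state.
    void handleHeartbeat() {
        if (laggingUntilCommittedVersionExceeds > 0
            && lastCommittedVersion <= laggingUntilCommittedVersionExceeds) {
            throw new IllegalStateException("heartbeat rejected: lagging at version ["
                + laggingUntilCommittedVersionExceeds + "]");
        }
        // otherwise carry on as a follower of the heartbeat's sender
    }

    // Commit handler: once a newer state is committed (e.g. after rejoining), the check above
    // no longer matches and heartbeats are accepted again.
    void onCommit(long committedVersion) {
        lastCommittedVersion = committedVersion;
    }
}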

File tree: 2 files changed, +129 -67 lines

server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java

Lines changed: 45 additions & 53 deletions
@@ -55,6 +55,7 @@
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportResponse;
+import org.elasticsearch.transport.TransportResponse.Empty;
 import org.elasticsearch.transport.TransportResponseHandler;
 
 import java.io.IOException;
@@ -136,6 +137,7 @@ public class Legislator extends AbstractComponent {
     private final Map<DiscoveryNode, MembershipAction.JoinCallback> joinRequestAccumulator = new HashMap<>();
 
     private Optional<Publication> currentPublication = Optional.empty();
+    private long laggingUntilCommittedVersionExceeds;
 
     public Legislator(Settings settings, ConsensusState.PersistedState persistedState,
                       Transport transport, MasterService masterService, DiscoveryNode localNode, LongSupplier currentTimeSupplier,
@@ -279,35 +281,16 @@ private void startSeekingJoins() {
     private Join joinLeaderInTerm(DiscoveryNode sourceNode, long term) {
         logger.debug("joinLeaderInTerm: from [{}] with term {}", sourceNode, term);
         Join join = consensusState.handleStartJoin(sourceNode, term);
+        lastJoin = Optional.of(join);
         if (mode != Mode.CANDIDATE) {
             becomeCandidate("joinLeaderInTerm");
         }
         return join;
     }
 
     public void handleStartJoin(DiscoveryNode sourceNode, StartJoinRequest startJoinRequest) {
-        Join join = joinLeaderInTerm(sourceNode, startJoinRequest.getTerm());
-
-        transport.sendJoin(sourceNode, join, new TransportResponseHandler<TransportResponse.Empty>() {
-            @Override
-            public void handleResponse(TransportResponse.Empty response) {
-                logger.debug("SendJoinResponseHandler: successfully joined {}", sourceNode);
-            }
-
-            @Override
-            public void handleException(TransportException exp) {
-                if (exp.getRootCause() instanceof ConsensusMessageRejectedException) {
-                    logger.debug("SendJoinResponseHandler: [{}] failed: {}", sourceNode, exp.getRootCause().getMessage());
-                } else {
-                    logger.debug(() -> new ParameterizedMessage("SendJoinResponseHandler: [{}] failed", sourceNode), exp);
-                }
-            }
-
-            @Override
-            public String executor() {
-                return ThreadPool.Names.SAME;
-            }
-        });
+        final Join join = joinLeaderInTerm(sourceNode, startJoinRequest.getTerm());
+        sendJoin(sourceNode, join);
     }
 
     private Optional<Join> ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) {
@@ -899,11 +882,7 @@ public LegislatorPublishResponse handlePublishRequest(DiscoveryNode sourceNode,
             DiscoveryNodes.builder(publishRequest.getAcceptedState().nodes()).localNodeId(getLocalNode().getId()).build()).build();
         publishRequest = new PublishRequest(clusterState);
 
-        final Optional<Join> optionalJoin = ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
-
-        if (optionalJoin.isPresent()) {
-            lastJoin = optionalJoin;
-        }
+        ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
 
         logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);
 
@@ -931,36 +910,46 @@ public HeartbeatResponse handleHeartbeatRequest(DiscoveryNode sourceNode, Heartb
         }
 
         ensureTermAtLeast(sourceNode, heartbeatRequest.getTerm()).ifPresent(join -> {
-            logger.debug("handleHeartbeatRequest: sending join [{}] for term [{}] to {}",
-                join, heartbeatRequest.getTerm(), sourceNode);
-
-            transport.sendJoin(sourceNode, join, new TransportResponseHandler<TransportResponse.Empty>() {
-                @Override
-                public void handleResponse(TransportResponse.Empty response) {
-                    logger.debug("SendJoinResponseHandler: successfully joined {}", sourceNode);
-                }
-
-                @Override
-                public void handleException(TransportException exp) {
-                    if (exp.getRootCause() instanceof ConsensusMessageRejectedException) {
-                        logger.debug("SendJoinResponseHandler: [{}] failed: {}", sourceNode, exp.getRootCause().getMessage());
-                    } else {
-                        logger.debug(() -> new ParameterizedMessage("SendJoinResponseHandler: [{}] failed", sourceNode), exp);
-                    }
-                }
-
-                @Override
-                public String executor() {
-                    return ThreadPool.Names.SAME;
-                }
-            });
+            logger.debug("handleHeartbeatRequest: sending join [{}] for term [{}] to {}", join, heartbeatRequest.getTerm(), sourceNode);
+            sendJoin(sourceNode, join);
         });
 
+        if (laggingUntilCommittedVersionExceeds > 0
+            && (lastCommittedState.isPresent() == false || lastCommittedState.get().version() <= laggingUntilCommittedVersionExceeds)) {
+            logger.debug("handleHeartbeatRequest: rejecting [{}] from [{}] due to lag at version [{}]",
+                heartbeatRequest, sourceNode, laggingUntilCommittedVersionExceeds);
+            throw new ConsensusMessageRejectedException("HeartbeatRequest rejected: lagging at version [{}]",
+                laggingUntilCommittedVersionExceeds);
+        }
+
         becomeFollower("handleHeartbeatRequest", sourceNode);
 
         return new HeartbeatResponse(consensusState.getLastAcceptedVersion(), consensusState.getCurrentTerm());
     }
 
+    private void sendJoin(DiscoveryNode sourceNode, Join join) {
+        transport.sendJoin(sourceNode, join, new TransportResponseHandler<Empty>() {
+            @Override
+            public void handleResponse(Empty response) {
+                logger.debug("SendJoinResponseHandler: successfully joined {}", sourceNode);
+            }
+
+            @Override
+            public void handleException(TransportException exp) {
+                if (exp.getRootCause() instanceof ConsensusMessageRejectedException) {
+                    logger.debug("SendJoinResponseHandler: [{}] failed: {}", sourceNode, exp.getRootCause().getMessage());
+                } else {
+                    logger.debug(() -> new ParameterizedMessage("SendJoinResponseHandler: [{}] failed", sourceNode), exp);
                }
+            }
+
+            @Override
+            public String executor() {
+                return ThreadPool.Names.SAME;
+            }
+        });
+    }
+
     public void handleApplyCommit(DiscoveryNode sourceNode, ApplyCommit applyCommit) {
         logger.trace("handleApplyCommit: applying {} from [{}]", applyCommit, sourceNode);
         consensusState.handleCommit(applyCommit);
@@ -1004,6 +993,9 @@ public OfferJoin handleSeekJoins(DiscoveryNode sender, SeekJoins seekJoins) {
                 consensusState.getCurrentTerm(), seekJoins, sender, newTerm);
             sendStartJoin(new StartJoinRequest(newTerm));
             throw new ConsensusMessageRejectedException("I'm still a leader");
+
+            // TODO what about a node that sent a join to a different node in our term? Is it now stuck until the next term?
+
        } else {
             // TODO: remove this once we have a discovery layer. If a node finds an active master node during discovery,
             // it will try to join that one, and not start seeking joins.
@@ -1464,15 +1456,15 @@ public void handleResponse(LeaderCheckResponse leaderCheckResponse) {
 
        final long leaderVersion = leaderCheckResponse.getVersion();
        long localVersion = getLastCommittedState().map(ClusterState::getVersion).orElse(-1L);
-       if (leaderVersion > localVersion) {
+       if (leaderVersion > localVersion && running) {
            logger.trace("LeaderCheck.handleResponse: heartbeat for version {} > local version {}, starting lag detector",
                leaderVersion, localVersion);
            futureExecutor.schedule(publishTimeout, "LeaderCheck#lagDetection", () -> {
                long localVersion2 = getLastCommittedState().map(ClusterState::getVersion).orElse(-1L);
-               if (leaderVersion > localVersion2) {
+               if (leaderVersion > localVersion2 && running) {
                    logger.debug("LeaderCheck.handleResponse: lag detected: local version {} < leader version {} after {}",
                        localVersion2, leaderVersion, publishTimeout);
-                   becomeCandidate("LeaderCheck.handleResponse");
+                   laggingUntilCommittedVersionExceeds = localVersion2;
                }
            });
        }
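
For orientation, the lag detector at the end of the hunk above can be read as the following standalone sketch. This is illustrative only: ScheduledExecutorService stands in for the Legislator's futureExecutor, publishTimeoutMillis for CONSENSUS_PUBLISH_TIMEOUT_SETTING, and the class and method names are hypothetical rather than part of the real API.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Illustrative sketch of the lag detector, not the Legislator API itself.
class LagDetectorSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final LongSupplier localCommittedVersion; // last locally committed version, -1 if none
    private final long publishTimeoutMillis;          // how long a publication is given to arrive
    private volatile long laggingUntilCommittedVersionExceeds;
    private volatile boolean running = true;

    LagDetectorSketch(LongSupplier localCommittedVersion, long publishTimeoutMillis) {
        this.localCommittedVersion = localCommittedVersion;
        this.publishTimeoutMillis = publishTimeoutMillis;
    }

    // Called with the version the leader reported in its leader-check response.
    void onLeaderCheckResponse(long leaderVersion) {
        if (leaderVersion > localCommittedVersion.getAsLong() && running) {
            // Give the in-flight publication a chance to arrive, then re-check.
            scheduler.schedule(() -> {
                long localVersion = localCommittedVersion.getAsLong();
                if (leaderVersion > localVersion && running) {
                    // Still behind: remember the lag so that heartbeats are rejected until we
                    // commit a state newer than this version.
                    laggingUntilCommittedVersionExceeds = localVersion;
                }
            }, publishTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    boolean isLagging() {
        return laggingUntilCommittedVersionExceeds > 0
            && localCommittedVersion.getAsLong() <= laggingUntilCommittedVersionExceeds;
    }
}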

server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java

Lines changed: 84 additions & 14 deletions
@@ -75,6 +75,7 @@
 import static org.elasticsearch.discovery.zen2.Legislator.CONSENSUS_HEARTBEAT_TIMEOUT_SETTING;
 import static org.elasticsearch.discovery.zen2.Legislator.CONSENSUS_LEADER_CHECK_RETRY_COUNT_SETTING;
 import static org.elasticsearch.discovery.zen2.Legislator.CONSENSUS_MIN_DELAY_SETTING;
+import static org.elasticsearch.discovery.zen2.Legislator.CONSENSUS_PUBLISH_TIMEOUT_SETTING;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.core.Is.is;
@@ -407,6 +408,57 @@ public void testFastRemovalWhenFollowerDropsConnections() {
         cluster.assertUniqueLeaderAndExpectedModes();
     }
 
+    @TestLogging("org.elasticsearch.discovery.zen2:TRACE")
+    public void testLagDetectionCausesRejoin() {
+        Cluster cluster = new Cluster(3);
+        cluster.runRandomly(true);
+        cluster.stabilise();
+        ClusterNode leader = cluster.getAnyLeader();
+
+        final VotingConfiguration allNodes = new VotingConfiguration(
+            cluster.clusterNodes.stream().map(cn -> cn.localNode.getId()).collect(Collectors.toSet()));
+
+        // TODO: have the following automatically done as part of a reconfiguration subsystem
+        if (leader.legislator.hasElectionQuorum(allNodes) == false) {
+            logger.info("--> leader does not have a join quorum for the new configuration, abdicating to self");
+            // abdicate to self to acquire all join votes
+            leader.legislator.abdicateTo(leader.localNode);
+
+            cluster.stabilise();
+            leader = cluster.getAnyLeader();
+        }
+
+        logger.info("--> start of reconfiguration to make all nodes into voting nodes");
+
+        leader.handleClientValue(ConsensusStateTests.nextStateWithConfig(leader.legislator.getLastAcceptedState(), allNodes));
+        cluster.deliverNextMessageUntilQuiescent();
+
+        logger.info("--> end of reconfiguration to make all nodes into voting nodes");
+
+        for (final ClusterNode clusterNode : cluster.clusterNodes) {
+            if (clusterNode != leader) {
+                logger.info("--> disconnecting {}", clusterNode.getLocalNode());
+                clusterNode.isConnected = false;
+                break;
+            }
+        }
+
+        leader.handleClientValue(ConsensusStateTests.nextStateWithValue(leader.legislator.getLastAcceptedState(), randomLong()));
+        cluster.deliverNextMessageUntilQuiescent();
+
+        for (final ClusterNode clusterNode : cluster.clusterNodes) {
+            logger.info("--> reconnecting {}", clusterNode.getLocalNode());
+            clusterNode.isConnected = true;
+        }
+
+        cluster.stabilise();
+
+        // Furthermore the first one to wake up causes an election to complete successfully, because we run to quiescence
+        // before waking any other nodes up. Therefore the cluster has a unique leader and all connected nodes are FOLLOWERs.
+        cluster.assertConsistentStates();
+        cluster.assertUniqueLeaderAndExpectedModes();
+    }
+
     class Cluster {
         private final List<ClusterNode> clusterNodes;
         private final List<InFlightMessage> inFlightMessages = new ArrayList<>();
@@ -416,20 +468,38 @@ class Cluster {
         private static final long RANDOM_MODE_DELAY_VARIABILITY = 10000L;
         private long masterServicesTaskId = 0L;
 
-        // How long to wait? The worst case is that a leader just committed a value to all the other nodes, and then
-        // dropped off the network, which would mean that all the other nodes must detect its failure. It takes
-        // CONSENSUS_LEADER_CHECK_RETRY_COUNT_SETTING consecutive leader checks to fail before a follower becomes a
-        // candidates, and with an unresponsive leader each leader check takes up to
-        // CONSENSUS_HEARTBEAT_DELAY_SETTING + CONSENSUS_HEARTBEAT_TIMEOUT_SETTING. After all the retries have
-        // failed, nodes wake up, become candidates, and wait for up to 2 * CONSENSUS_MIN_DELAY_SETTING before
-        // attempting an election. The first election is expected to succeed, however, because we run to quiescence
-        // before waking any other nodes up.
-        private final long DEFAULT_STABILISATION_TIME =
-            (CONSENSUS_HEARTBEAT_DELAY_SETTING.get(Settings.EMPTY).millis() +
-                CONSENSUS_HEARTBEAT_TIMEOUT_SETTING.get(Settings.EMPTY).millis()) *
-                CONSENSUS_LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY) +
-                2 * CONSENSUS_MIN_DELAY_SETTING.get(Settings.EMPTY).millis() +
-                RANDOM_MODE_DELAY_VARIABILITY + DEFAULT_DELAY_VARIABILITY;
+        // How long does it take for the cluster to stabilise?
+
+        // Each heartbeat takes at most this long:
+        private final long DEFAULT_MAX_HEARTBEAT_TIME
+            = CONSENSUS_HEARTBEAT_DELAY_SETTING.get(Settings.EMPTY).millis()
+            + CONSENSUS_HEARTBEAT_TIMEOUT_SETTING.get(Settings.EMPTY).millis()
+            + 2 * DEFAULT_DELAY_VARIABILITY;
+        // Multiple heartbeat failures are needed before the leader's failure is detected:
+        private final long DEFAULT_MAX_FAILURE_DETECTION_TIME
+            = DEFAULT_MAX_HEARTBEAT_TIME * CONSENSUS_LEADER_CHECK_RETRY_COUNT_SETTING.get(Settings.EMPTY);
+        // When stabilising, there are no election collisions, because we run to quiescence before waking any other nodes up.
+        // Therefore elections takes this long:
+        private final long DEFAULT_ELECTION_TIME
+            = 2 * (CONSENSUS_MIN_DELAY_SETTING.get(Settings.EMPTY).millis() + DEFAULT_DELAY_VARIABILITY);
+        // Lag detection takes this long to notice that a follower is lagging the leader:
+        private final long DEFAULT_LAG_DETECTION_TIME
+            = CONSENSUS_HEARTBEAT_DELAY_SETTING.get(Settings.EMPTY).millis() // before the heartbeat that hears about the lag
+            + CONSENSUS_PUBLISH_TIMEOUT_SETTING.get(Settings.EMPTY).millis() // waiting for the lag
+            + 2 * DEFAULT_DELAY_VARIABILITY;
+
+        // Worst cases for stabilisation:
+        //
+        // 1. Just before stabilisation there was a leader which committed a value and then dropped off the network. All nodes must first
+        //    detect its failure and then elect a new one.
+        //
+        // 2. Just before stabilisation the leader published a value which wasn't received by one of the followers. The follower's lag
+        //    detection must notice this omission, then the master must notice the node is rejecting heartbeats and remove it from the cluster,
+        //    then the follower must notice it is missing from the cluster and rejoin.
+        //
+        private final long DEFAULT_STABILISATION_TIME = RANDOM_MODE_DELAY_VARIABILITY
+            + Math.max(DEFAULT_MAX_FAILURE_DETECTION_TIME + DEFAULT_ELECTION_TIME, // case 1
+                DEFAULT_LAG_DETECTION_TIME + DEFAULT_MAX_FAILURE_DETECTION_TIME + DEFAULT_MAX_HEARTBEAT_TIME); // case 2
 
         Cluster(int nodeCount) {
             clusterNodes = new ArrayList<>(nodeCount);
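
Summarising the arithmetic of the new stabilisation bound, the constants above combine as follows. This is a hypothetical helper that merely restates the expressions in the diff; the parameters stand for the corresponding CONSENSUS_* settings and delay variabilities, whose default values are not shown here.

// Hypothetical helper restating DEFAULT_STABILISATION_TIME from the test above; not part of the commit.
final class StabilisationTimeSketch {

    static long stabilisationTime(long heartbeatDelayMillis, long heartbeatTimeoutMillis, long publishTimeoutMillis,
                                  long minDelayMillis, long leaderCheckRetryCount,
                                  long delayVariability, long randomModeDelayVariability) {
        // Each heartbeat takes at most this long:
        long maxHeartbeatTime = heartbeatDelayMillis + heartbeatTimeoutMillis + 2 * delayVariability;
        // Multiple heartbeat failures are needed before the leader's failure is detected:
        long maxFailureDetectionTime = maxHeartbeatTime * leaderCheckRetryCount;
        // An uncontested election takes this long:
        long electionTime = 2 * (minDelayMillis + delayVariability);
        // Noticing that a follower is lagging takes this long:
        long lagDetectionTime = heartbeatDelayMillis + publishTimeoutMillis + 2 * delayVariability;

        return randomModeDelayVariability
            + Math.max(maxFailureDetectionTime + electionTime,                   // case 1: dead leader
                lagDetectionTime + maxFailureDetectionTime + maxHeartbeatTime);  // case 2: lagging follower
    }

    private StabilisationTimeSketch() {}
}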
