Skip to content

Commit f8015f2

Browse files
committed
Drop node if asymmetrically partitioned from master (#39598)
When a node joins the cluster, we ensure that it can send requests to the master _at that time_. If it joins the cluster and _then_ loses the ability to send requests to the master, it should be removed from the cluster. Today this is not the case: the master still receives responses to its follower checks and acknowledgements of its cluster state publications, so it has no reason to remove the node. This commit changes the handling of follower checks so that they fail if they come from a master that the receiving node was following but now believes to have failed.
1 parent 2baf6a3 commit f8015f2

File tree

6 files changed

+191
-38
lines changed

6 files changed

+191
-38
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) {
229229
// where we would possibly have to remove the NO_MASTER_BLOCK from the applierState when turning a candidate back to follower.
230230
if (getLastAcceptedState().term() < getCurrentTerm()) {
231231
becomeFollower("onFollowerCheckRequest", followerCheckRequest.getSender());
232+
} else if (mode == Mode.FOLLOWER) {
233+
logger.trace("onFollowerCheckRequest: responding successfully to {}", followerCheckRequest);
234+
} else if (joinHelper.isJoinPending()) {
235+
logger.trace("onFollowerCheckRequest: rejoining master, responding successfully to {}", followerCheckRequest);
236+
} else {
237+
logger.trace("onFollowerCheckRequest: received check from faulty master, rejecting {}", followerCheckRequest);
238+
throw new CoordinationStateRejectedException(
239+
"onFollowerCheckRequest: received check from faulty master, rejecting " + followerCheckRequest);
232240
}
233241
}
234242
}
@@ -460,7 +468,7 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
460468

461469
// package private for tests
462470
void sendValidateJoinRequest(ClusterState stateForJoinValidation, JoinRequest joinRequest,
463-
JoinHelper.JoinCallback joinCallback) {
471+
JoinHelper.JoinCallback joinCallback) {
464472
// validate the join on the joining node, will throw a failure if it fails the validation
465473
joinHelper.sendValidateJoinRequest(joinRequest.getSourceNode(), stateForJoinValidation, new ActionListener<Empty>() {
466474
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ public String toString() {
189189
};
190190
}
191191

192+
boolean isJoinPending() {
193+
// cannot use pendingOutgoingJoins.isEmpty() because it's not properly synchronized.
194+
return pendingOutgoingJoins.iterator().hasNext();
195+
}
196+
192197
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
193198
sendJoinRequest(destination, optionalJoin, () -> {
194199
});

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,8 +919,9 @@ public void testStayCandidateAfterReceivingFollowerCheckFromKnownMaster() {
919919
nonLeader.coordinator.becomeCandidate("forced");
920920
}
921921
logger.debug("simulate follower check coming through from {} to {}", leader.getId(), nonLeader.getId());
922-
nonLeader.coordinator.onFollowerCheckRequest(new FollowersChecker.FollowerCheckRequest(leader.coordinator.getCurrentTerm(),
923-
leader.getLocalNode()));
922+
expectThrows(CoordinationStateRejectedException.class, () -> nonLeader.coordinator.onFollowerCheckRequest(
923+
new FollowersChecker.FollowerCheckRequest(leader.coordinator.getCurrentTerm(), leader.getLocalNode())));
924+
assertThat(nonLeader.coordinator.getMode(), equalTo(CANDIDATE));
924925
}).run();
925926
cluster.stabilise();
926927
}
@@ -1081,6 +1082,38 @@ public void testDiscoveryUsesNodesFromLastClusterState() {
10811082
cluster.stabilise();
10821083
}
10831084

1085+
public void testFollowerRemovedIfUnableToSendRequestsToMaster() {
1086+
final Cluster cluster = new Cluster(3);
1087+
cluster.runRandomly();
1088+
cluster.stabilise();
1089+
1090+
final ClusterNode leader = cluster.getAnyLeader();
1091+
final ClusterNode otherNode = cluster.getAnyNodeExcept(leader);
1092+
1093+
cluster.blackholeConnectionsFrom(otherNode, leader);
1094+
1095+
cluster.runFor(
1096+
(defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
1097+
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
1098+
+ (defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + DEFAULT_DELAY_VARIABILITY)
1099+
* defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
1100+
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY,
1101+
"awaiting removal of asymmetrically-partitioned node");
1102+
1103+
assertThat(leader.getLastAppliedClusterState().nodes().toString(),
1104+
leader.getLastAppliedClusterState().nodes().getSize(), equalTo(2));
1105+
1106+
cluster.clearBlackholedConnections();
1107+
1108+
cluster.stabilise(
1109+
// time for the disconnected node to find the master again
1110+
defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING) * 2
1111+
// time for joining
1112+
+ 4 * DEFAULT_DELAY_VARIABILITY
1113+
// Then a commit of the updated cluster state
1114+
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
1115+
}
1116+
10841117
private static long defaultMillis(Setting<TimeValue> setting) {
10851118
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
10861119
}
@@ -1143,6 +1176,7 @@ class Cluster {
11431176

11441177
private final Set<String> disconnectedNodes = new HashSet<>();
11451178
private final Set<String> blackholedNodes = new HashSet<>();
1179+
private final Set<Tuple<String,String>> blackholedConnections = new HashSet<>();
11461180
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
11471181
private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
11481182
private final History history = new History();
@@ -1510,6 +1544,8 @@ private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode
15101544
connectionStatus = ConnectionStatus.BLACK_HOLE;
15111545
} else if (disconnectedNodes.contains(sender.getId()) || disconnectedNodes.contains(destination.getId())) {
15121546
connectionStatus = ConnectionStatus.DISCONNECTED;
1547+
} else if (blackholedConnections.contains(Tuple.tuple(sender.getId(), destination.getId()))) {
1548+
connectionStatus = ConnectionStatus.BLACK_HOLE_REQUESTS_ONLY;
15131549
} else if (nodeExists(sender) && nodeExists(destination)) {
15141550
connectionStatus = ConnectionStatus.CONNECTED;
15151551
} else {
@@ -1560,6 +1596,14 @@ void setEmptySeedHostsList() {
15601596
seedHostsList = emptyList();
15611597
}
15621598

1599+
void blackholeConnectionsFrom(ClusterNode sender, ClusterNode destination) {
1600+
blackholedConnections.add(Tuple.tuple(sender.getId(), destination.getId()));
1601+
}
1602+
1603+
void clearBlackholedConnections() {
1604+
blackholedConnections.clear();
1605+
}
1606+
15631607
class MockPersistedState implements PersistedState {
15641608
private final PersistedState delegate;
15651609
private final NodeEnvironment nodeEnvironment;

server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public void testJoinDeduplication() {
5151
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
5252
DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT);
5353

54+
assertFalse(joinHelper.isJoinPending());
55+
5456
// check that sending a join to node1 works
5557
Optional<Join> optionalJoin1 = randomBoolean() ? Optional.empty() :
5658
Optional.of(new Join(localNode, node1, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
@@ -60,6 +62,8 @@ public void testJoinDeduplication() {
6062
CapturedRequest capturedRequest1 = capturedRequests1[0];
6163
assertEquals(node1, capturedRequest1.node);
6264

65+
assertTrue(joinHelper.isJoinPending());
66+
6367
// check that sending a join to node2 works
6468
Optional<Join> optionalJoin2 = randomBoolean() ? Optional.empty() :
6569
Optional.of(new Join(localNode, node2, randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()));
@@ -95,5 +99,12 @@ public void testJoinDeduplication() {
9599
assertThat(capturedRequests2a.length, equalTo(1));
96100
CapturedRequest capturedRequest2a = capturedRequests2a[0];
97101
assertEquals(node2, capturedRequest2a.node);
102+
103+
// complete all the joins and check that isJoinPending is updated
104+
assertTrue(joinHelper.isJoinPending());
105+
capturingTransport.handleRemoteError(capturedRequest2.requestId, new CoordinationStateRejectedException("dummy"));
106+
capturingTransport.handleRemoteError(capturedRequest1a.requestId, new CoordinationStateRejectedException("dummy"));
107+
capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy"));
108+
assertFalse(joinHelper.isJoinPending());
98109
}
99110
}

test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,10 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
116116
destinationTransport.execute(action, new Runnable() {
117117
@Override
118118
public void run() {
119-
switch (getConnectionStatus(destinationTransport.getLocalNode())) {
119+
final ConnectionStatus connectionStatus = getConnectionStatus(destinationTransport.getLocalNode());
120+
switch (connectionStatus) {
120121
case BLACK_HOLE:
122+
case BLACK_HOLE_REQUESTS_ONLY:
121123
onBlackholedDuringSend(requestId, action, destinationTransport);
122124
break;
123125

@@ -128,6 +130,9 @@ public void run() {
128130
case CONNECTED:
129131
onConnectedDuringSend(requestId, action, request, destinationTransport);
130132
break;
133+
134+
default:
135+
throw new AssertionError("unexpected status: " + connectionStatus);
131136
}
132137
}
133138

@@ -197,11 +202,20 @@ public void sendResponse(final TransportResponse response) {
197202
execute(action, new Runnable() {
198203
@Override
199204
public void run() {
200-
if (destinationTransport.getConnectionStatus(getLocalNode()) != ConnectionStatus.CONNECTED) {
201-
logger.trace("dropping response to {}: channel is not CONNECTED",
202-
requestDescription);
203-
} else {
204-
handleResponse(requestId, response);
205+
final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(getLocalNode());
206+
switch (connectionStatus) {
207+
case CONNECTED:
208+
case BLACK_HOLE_REQUESTS_ONLY:
209+
handleResponse(requestId, response);
210+
break;
211+
212+
case BLACK_HOLE:
213+
case DISCONNECTED:
214+
logger.trace("dropping response to {}: channel is {}", requestDescription, connectionStatus);
215+
break;
216+
217+
default:
218+
throw new AssertionError("unexpected status: " + connectionStatus);
205219
}
206220
}
207221

@@ -217,11 +231,20 @@ public void sendResponse(Exception exception) {
217231
execute(action, new Runnable() {
218232
@Override
219233
public void run() {
220-
if (destinationTransport.getConnectionStatus(getLocalNode()) != ConnectionStatus.CONNECTED) {
221-
logger.trace("dropping response to {}: channel is not CONNECTED",
222-
requestDescription);
223-
} else {
224-
handleRemoteError(requestId, exception);
234+
final ConnectionStatus connectionStatus = destinationTransport.getConnectionStatus(getLocalNode());
235+
switch (connectionStatus) {
236+
case CONNECTED:
237+
case BLACK_HOLE_REQUESTS_ONLY:
238+
handleRemoteError(requestId, exception);
239+
break;
240+
241+
case BLACK_HOLE:
242+
case DISCONNECTED:
243+
logger.trace("dropping exception response to {}: channel is {}", requestDescription, connectionStatus);
244+
break;
245+
246+
default:
247+
throw new AssertionError("unexpected status: " + connectionStatus);
225248
}
226249
}
227250

@@ -251,9 +274,29 @@ public String toString() {
251274
}
252275
}
253276

277+
/**
278+
* Response type from {@link DisruptableMockTransport#getConnectionStatus(DiscoveryNode)} indicating whether, and how, messages should
279+
* be disrupted on this transport.
280+
*/
254281
public enum ConnectionStatus {
282+
/**
283+
* No disruption: deliver messages normally.
284+
*/
255285
CONNECTED,
256-
DISCONNECTED, // network requests to or from this node throw a ConnectTransportException
257-
BLACK_HOLE // network traffic to or from the corresponding node is silently discarded
286+
287+
/**
288+
* Simulate disconnection: inbound and outbound messages throw a {@link ConnectTransportException}.
289+
*/
290+
DISCONNECTED,
291+
292+
/**
293+
* Simulate a blackhole partition: inbound and outbound messages are silently discarded.
294+
*/
295+
BLACK_HOLE,
296+
297+
/**
298+
* Simulate an asymmetric partition: outbound requests are silently discarded, but responses and inbound messages are delivered normally.
299+
*/
300+
BLACK_HOLE_REQUESTS_ONLY
258301
}
259302
}

0 commit comments

Comments
 (0)