[Zen2] Add safety phase to CoordinatorTests #34241

Merged: 34 commits from branch 2018-10-02-run-randomly into zen2 on Oct 4, 2018.

Commits (34). The diff shown below reflects the state after the first 16 commits.

aa41f6e  Introduce runRandomly (DaveCTurner, Oct 2, 2018)
24b4a54  Ignore publications from self if no longer leading (DaveCTurner, Oct 2, 2018)
29cab38  Must become follower after successfully processing publish request (DaveCTurner, Oct 2, 2018)
289d39f  Fix log messages (DaveCTurner, Oct 2, 2018)
a556784  Must guard peer removal (DaveCTurner, Oct 2, 2018)
91b0314  Timeout RequestPeersRequests (DaveCTurner, Oct 2, 2018)
19bd1ae  Reset port counter more frequently (DaveCTurner, Oct 2, 2018)
4d717b9  Remove failing assertion for now (DaveCTurner, Oct 2, 2018)
2b9883e  Add comment describing why we reject a publication here (DaveCTurner, Oct 2, 2018)
c4ffe22  Re-qualify mode (DaveCTurner, Oct 2, 2018)
c66c4b0  Revert (DaveCTurner, Oct 2, 2018)
a4d5c74  No need for this guard without the affected assertion (DaveCTurner, Oct 2, 2018)
f777097  Add test that PeersRequest has a timeout (DaveCTurner, Oct 2, 2018)
3a7fc1d  Line length (DaveCTurner, Oct 2, 2018)
d1b2535  Merge branch 'zen2' into 2018-10-02-run-randomly (DaveCTurner, Oct 3, 2018)
76c1d05  Rework stabilisation assertions (DaveCTurner, Oct 3, 2018)
0b180f0  Term mismatch is rejected by CoordinatorState (DaveCTurner, Oct 3, 2018)
c14cf7a  Register setting (DaveCTurner, Oct 3, 2018)
ff33d68  Reduce default for discovery.request_peers_timeout to 3s (DaveCTurner, Oct 3, 2018)
e5bb9b3  Execute directly, no need for mutex (DaveCTurner, Oct 3, 2018)
af227e7  Only runRandomly if no disruptions in place (DaveCTurner, Oct 3, 2018)
f8d5458  Only log changes to connected state (DaveCTurner, Oct 3, 2018)
78757f9  Revamp disruption-management logic (DaveCTurner, Oct 3, 2018)
cf7047a  Fix description of publication (DaveCTurner, Oct 3, 2018)
a470eca  Fix order of requestId/action in request description (DaveCTurner, Oct 3, 2018)
dc2bddc  Remove bogus assertion (DaveCTurner, Oct 3, 2018)
257eb85  No need for this method any more (DaveCTurner, Oct 3, 2018)
1d9a917  More state in assertion message (DaveCTurner, Oct 3, 2018)
1a58b48  Stand down as master in rare condition (DaveCTurner, Oct 3, 2018)
b7a5328  Revert "Stand down as master in rare condition" (DaveCTurner, Oct 3, 2018)
34eb71b  Move assertion (DaveCTurner, Oct 3, 2018)
98a24f4  Timeout join requests (DaveCTurner, Oct 3, 2018)
a9f3e00  Use assertThat for better error messages (DaveCTurner, Oct 3, 2018)
29ad624  Merge branch 'zen2' into 2018-10-02-run-randomly (DaveCTurner, Oct 3, 2018)

Files changed

@@ -184,6 +184,14 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
synchronized (mutex) {
final DiscoveryNode sourceNode = publishRequest.getAcceptedState().nodes().getMasterNode();
logger.trace("handlePublishRequest: handling [{}] from [{}]", publishRequest, sourceNode);

if (sourceNode.equals(getLocalNode())
&& (mode != Mode.LEADER || getCurrentTerm() != publishRequest.getAcceptedState().term())) {
// Rare case in which we stood down as leader between starting this publication and receiving it ourselves. The publication
// is already failed so there is no point in proceeding.
throw new CoordinationStateRejectedException("no longer leading this publication's term: " + publishRequest);
}

ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);

@@ -600,7 +608,7 @@ protected void onCompletion(boolean committed) {
currentPublication = Optional.empty();
updateMaxTermSeen(getCurrentTerm()); // triggers term bump if new term was found during publication

localNodeAckEvent.addListener(new ActionListener<Void>() {
localNodeAckEvent.addListener(wrapWithMutex(new ActionListener<Void>() {
@Override
public void onResponse(Void ignore) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
@@ -628,7 +636,7 @@ public void onFailure(Exception e) {
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
publishListener.onFailure(exception);
}
}, transportService.getThreadPool().generic());
}), transportService.getThreadPool().generic());
}

@Override
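
The wrapped-listener change above relies on a wrapWithMutex helper that this diff does not show. A minimal sketch of what such a wrapper could look like, assuming it does nothing more than take the coordinator's mutex around both callbacks so that the Thread.holdsLock(mutex) assertions hold (the name comes from the diff; the body and generics here are assumptions):

    // Sketch: run both listener callbacks while holding the coordinator mutex.
    private <T> ActionListener<T> wrapWithMutex(ActionListener<T> listener) {
        return new ActionListener<T>() {
            @Override
            public void onResponse(T response) {
                synchronized (mutex) {
                    listener.onResponse(response);
                }
            }

            @Override
            public void onFailure(Exception e) {
                synchronized (mutex) {
                    listener.onFailure(e);
                }
            }
        };
    }
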
@@ -82,7 +82,7 @@ protected synchronized void done() {

private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
try {
executorService.submit(new Runnable() {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
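
The switch from submit to execute above is more than cosmetic: ExecutorService.submit wraps the task in a FutureTask, so an exception thrown by the Runnable is captured in the returned Future and silently lost if nobody calls get(), whereas execute lets it propagate to the thread's uncaught-exception handling. A standalone illustration of the general Java behaviour, not part of the PR:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SubmitVsExecute {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();

            // submit: the exception is captured in the discarded Future and never surfaces.
            executor.submit((Runnable) () -> {
                throw new RuntimeException("swallowed");
            });

            // execute: the exception reaches the thread's uncaught-exception handler,
            // which by default prints the stack trace to stderr.
            executor.execute(() -> {
                throw new RuntimeException("reported");
            });

            executor.shutdown();
        }
    }
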
20 changes: 15 additions & 5 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
@@ -36,6 +36,7 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

@@ -59,7 +60,12 @@ public abstract class PeerFinder extends AbstractComponent {
Setting.timeSetting("discovery.find_peers_interval",
TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final TimeValue findPeersDelay;
public static final Setting<TimeValue> DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING =
Setting.timeSetting("discovery.request_peers_timeout",
TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;

private final Object mutex = new Object();
private final TransportService transportService;
@@ -75,7 +81,8 @@ public abstract class PeerFinder extends AbstractComponent {
public PeerFinder(Settings settings, TransportService transportService, TransportAddressConnector transportAddressConnector,
ConfiguredHostsResolver configuredHostsResolver) {
super(settings);
findPeersDelay = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
this.transportAddressConnector = transportAddressConnector;
this.configuredHostsResolver = configuredHostsResolver;
@@ -241,7 +248,7 @@ private boolean handleWakeUp() {
}
});

transportService.getThreadPool().schedule(findPeersDelay, Names.GENERIC, new AbstractRunnable() {
transportService.getThreadPool().schedule(findPeersInterval, Names.GENERIC, new AbstractRunnable() {
@Override
public boolean isForceExecution() {
return true;
@@ -360,9 +367,11 @@ public void onFailure(Exception e) {
});
}

private void removePeer() {
void removePeer() {
final Peer removed = peersByAddress.remove(transportAddress);
assert removed == Peer.this;
// assert removed == Peer.this : removed + " != " + Peer.this;
// ^ This assertion sometimes trips if we are deactivated and reactivated while a request is in flight.
// TODO be more careful about avoiding multiple active Peer objects for each address
}

private void requestPeers() {
@@ -380,6 +389,7 @@ private void requestPeers() {

transportService.sendRequest(discoveryNode, REQUEST_PEERS_ACTION_NAME,
new PeersRequest(getLocalNode(), knownNodes),
TransportRequestOptions.builder().withTimeout(requestPeersTimeout).build(),
new TransportResponseHandler<PeersResponse>() {

@Override
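
With a timeout on the request options, a PeersRequest that never gets an answer now fails instead of pending forever: once discovery.request_peers_timeout elapses, the transport layer completes the handler with a ReceiveTimeoutTransportException. (The 10s default visible in this revision was later lowered; commit ff33d68 reduces it to 3s.) A sketch of the shape such a handler's failure path could take; the real handler lies outside the lines shown here:

    // Sketch: distinguishing a request timeout from other transport failures.
    @Override
    public void handleException(TransportException exp) {
        if (exp instanceof ReceiveTimeoutTransportException) {
            // No PeersResponse within discovery.request_peers_timeout: treat the
            // peer as unresponsive rather than waiting on it indefinitely.
            logger.debug("PeersRequest timed out", exp);
        } else {
            logger.debug("PeersRequest failed", exp);
        }
        removePeer();
    }
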
@@ -47,13 +47,16 @@
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matcher;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
@@ -65,6 +68,7 @@
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.value;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.FOLLOWER;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.LEADER;
import static org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.DEFAULT_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_BACK_OFF_TIME_SETTING;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING;
@@ -86,8 +90,14 @@
@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE")
public class CoordinatorTests extends ESTestCase {

@Before
public void resetPortCounterBeforeEachTest() {
resetPortCounter();
}

public void testCanUpdateClusterStateAfterStabilisation() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
cluster.stabilise();

final ClusterNode leader = cluster.getAnyLeader();
@@ -106,6 +116,7 @@ public void testCanUpdateClusterStateAfterStabilisation() {

public void testNodesJoinAfterStableCluster() {
final Cluster cluster = new Cluster(randomIntBetween(1, 5));
cluster.runRandomly();
cluster.stabilise();

final long currentTerm = cluster.getAnyLeader().coordinator.getCurrentTerm();
@@ -125,6 +136,7 @@ public void testNodesJoinAfterStableCluster() {

public void testLeaderDisconnectionDetectedQuickly() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();

final ClusterNode originalLeader = cluster.getAnyLeader();
@@ -153,13 +165,14 @@ public void testLeaderDisconnectionDetectedQuickly() {
+ DEFAULT_DELAY_VARIABILITY
// then wait for the removal to be committed
+ DEFAULT_CLUSTER_STATE_UPDATE_DELAY
));
));

assertThat(cluster.getAnyLeader().getId(), not(equalTo(originalLeader.getId())));
}

public void testUnresponsiveLeaderDetectedEventually() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();

final ClusterNode originalLeader = cluster.getAnyLeader();
@@ -189,6 +202,7 @@ public void testUnresponsiveLeaderDetectedEventually() {

public void testFollowerDisconnectionDetectedQuickly() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();

final ClusterNode leader = cluster.getAnyLeader();
@@ -220,6 +234,7 @@ public void testFollowerDisconnectionDetectedQuickly() {

public void testUnresponsiveFollowerDetectedEventually() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();

final ClusterNode leader = cluster.getAnyLeader();
@@ -289,6 +304,7 @@ private static String nodeIdFromIndex(int nodeIndex) {

class Cluster {

static final long EXTREME_DELAY_VARIABILITY = 10000L;
static final long DEFAULT_DELAY_VARIABILITY = 100L;

final List<ClusterNode> clusterNodes;
@@ -299,6 +315,7 @@ class Cluster {

private final Set<String> disconnectedNodes = new HashSet<>();
private final Set<String> blackholedNodes = new HashSet<>();
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();

Cluster(int initialNodeCount) {
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
@@ -328,6 +345,100 @@ void addNodes(int newNodesCount) {
}
}

void runRandomly() {
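            // Runs in two phases: first at least randomSteps perturbation steps under
            // EXTREME_DELAY_VARIABILITY, then a drain of the remaining deferred tasks under
            // DEFAULT_DELAY_VARIABILITY, asserting cluster invariants after every step.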

final int randomSteps = scaledRandomIntBetween(10, 10000);
logger.info("--> start of safety phase of at least [{}] steps", randomSteps);

deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY);
int step = 0;
long finishTime = -1;

while (finishTime == -1 || deterministicTaskQueue.getCurrentTimeMillis() <= finishTime) {
step++;
if (randomSteps <= step && finishTime == -1) {
finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime();
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}]", step, finishTime);
}

try {
if (rarely()) {
final ClusterNode clusterNode = getAnyNodePreferringLeaders();
final int newValue = randomInt();
logger.debug("----> [runRandomly {}] proposing new value [{}] to [{}]", step, newValue, clusterNode.getId());
clusterNode.submitValue(newValue);
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
logger.debug("----> [runRandomly {}] forcing {} to become candidate", step, clusterNode.getId());
synchronized (clusterNode.coordinator.mutex) {
clusterNode.coordinator.becomeCandidate("runRandomly");
}
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
final String id = clusterNode.getId();
disconnectedNodes.remove(id);
blackholedNodes.remove(id);

switch (randomInt(2)) {
case 0:
logger.debug("----> [runRandomly {}] connecting {}", step, id);
break;
case 1:
logger.debug("----> [runRandomly {}] disconnecting {}", step, id);
disconnectedNodes.add(id);
break;
case 2:
logger.debug("----> [runRandomly {}] blackholing {}", step, id);
blackholedNodes.add(id);
break;
}
} else {
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
deterministicTaskQueue.advanceTime();
} else if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runRandomTask();
}
}

// TODO other random steps:
// - reboot a node
// - abdicate leadership
// - bootstrap

} catch (CoordinationStateRejectedException ignored) {
// This is ok: it just means a message couldn't currently be handled.
}

assertConsistentStates();
}

disconnectedNodes.clear();
blackholedNodes.clear();
}

private void assertConsistentStates() {
for (final ClusterNode clusterNode : clusterNodes) {
clusterNode.coordinator.invariant();
}
updateCommittedStates();
}

private void updateCommittedStates() {
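            // Safety property: two states committed with the same version must agree on
            // their value, whichever nodes they were observed on.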
for (final ClusterNode clusterNode : clusterNodes) {
Optional<ClusterState> committedState = clusterNode.coordinator.getLastCommittedState();
if (committedState.isPresent()) {
ClusterState storedState = committedStatesByVersion.get(committedState.get().getVersion());
if (storedState == null) {
committedStatesByVersion.put(committedState.get().getVersion(), committedState.get());
} else {
assertEquals("expected " + committedState.get() + " but got " + storedState,
value(committedState.get()), value(storedState));
}
}
}
}

void stabilise() {
stabilise(DEFAULT_STABILISATION_TIME);
}
@@ -389,19 +500,20 @@ private void assertUniqueLeaderAndExpectedModes() {

final String nodeId = clusterNode.getId();

if (disconnectedNodes.contains(nodeId) || blackholedNodes.contains(nodeId)) {
assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE));
} else {
if (isConnectedPair(leader, clusterNode)) {
assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
// TODO assert that all nodes have actually voted for the leader in this term
// TODO assert that this node has actually voted for the leader in this term
// TODO assert that this node's accepted and committed states are the same as the leader's

assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
assertThat(nodeId + " is at the same accepted version as the leader",
Optional.of(clusterNode.coordinator.getLastAcceptedState().getVersion()), isPresentAndEqualToLeaderVersion);
assertThat(nodeId + " is at the same committed version as the leader",
clusterNode.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion);
assertThat(clusterNode.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
assertThat(nodeId + " is in the leader's committed state",
leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(true)));
} else {
assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE));
assertThat(nodeId + " is not in the leader's committed state",
leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)),
equalTo(Optional.of(false)));
}
}

@@ -428,6 +540,10 @@ private ConnectionStatus getConnectionStatus(DiscoveryNode sender, DiscoveryNode
return connectionStatus;
}

ClusterNode getAnyNode() {
return getAnyNodeExcept();
}

ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) {
Set<String> forbiddenIds = Arrays.stream(clusterNodes).map(ClusterNode::getId).collect(Collectors.toSet());
List<ClusterNode> acceptableNodes
@@ -436,6 +552,16 @@ ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) {
return randomFrom(acceptableNodes);
}

ClusterNode getAnyNodePreferringLeaders() {
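            // Sample a few nodes at random and return the first leader found, falling
            // back to a uniformly random node; this biases operations towards leaders
            // without starving the rest.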
for (int i = 0; i < 3; i++) {
ClusterNode clusterNode = getAnyNode();
if (clusterNode.coordinator.getMode() == LEADER) {
return clusterNode;
}
}
return getAnyNode();
}

class ClusterNode extends AbstractComponent {
private final int nodeIndex;
private Coordinator coordinator;
@@ -538,7 +664,7 @@ DiscoveryNode getLocalNode() {
}

boolean isLeader() {
return coordinator.getMode() == Coordinator.Mode.LEADER;
return coordinator.getMode() == LEADER;
}

void submitValue(final long value) {