Skip to content

Commit a127805

Browse files
authored
[Zen2] Simulate scheduling delays (#34181)
Today we schedule tasks (both immediate and future ones) exactly when requested. In fact it is more realistic to allow for a small amount of delay in the scheduling of tasks, and this helps to exercise more interleavings of actions and therefore to improve test coverage. This change adds to the DeterministicTaskQueue the ability to add a random delay to the scheduling of tasks. This change also provides more explicit timeouts for stabilisation in the CoordinatorTests. Using the randomised scheduling feature in the CoordinatorTests also found a situation in which we could become a leader, then a candidate, and then a leader again very quickly, causing a clash of the _BECOME_MASTER_ and _FINISH_ELECTION_ tasks. We change their behaviour to not consider these duplicates to be problematic.
1 parent 412face commit a127805

File tree

19 files changed

+332
-155
lines changed

19 files changed

+332
-155
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ default void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
5858
* This allows groupd task description but the submitting source.
5959
*/
6060
default String describeTasks(List<T> tasks) {
61-
return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() == 0)::iterator);
61+
return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
6262
}
6363

6464
/**

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,8 @@ public void invariant() {
417417
synchronized (mutex) {
418418
final Optional<DiscoveryNode> peerFinderLeader = peerFinder.getLeader();
419419
assert peerFinder.getCurrentTerm() == getCurrentTerm();
420-
assert followersChecker.getFastResponseState().term == getCurrentTerm();
421-
assert followersChecker.getFastResponseState().mode == getMode();
420+
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
421+
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
422422
if (mode == Mode.LEADER) {
423423
final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm();
424424

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,9 @@ public void close(Mode newMode) {
287287

288288
final String stateUpdateSource = "elected-as-master ([" + pendingAsTasks.size() + "] nodes joined)";
289289

290-
pendingAsTasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source, e) -> {
290+
pendingAsTasks.put(JoinTaskExecutor.newBecomeMasterTask(), (source, e) -> {
291291
});
292-
pendingAsTasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, (source, e) -> {
292+
pendingAsTasks.put(JoinTaskExecutor.newFinishElectionTask(), (source, e) -> {
293293
});
294294
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
295295
joinTaskExecutor);

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

+24-11
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,17 @@ public String reason() {
6363
public String toString() {
6464
return node != null ? node + " " + reason : reason;
6565
}
66+
67+
public boolean isBecomeMasterTask() {
68+
return reason.equals(BECOME_MASTER_TASK_REASON);
69+
}
70+
71+
public boolean isFinishElectionTask() {
72+
return reason.equals(FINISH_ELECTION_TASK_REASON);
73+
}
74+
75+
private static final String BECOME_MASTER_TASK_REASON = "_BECOME_MASTER_TASK_";
76+
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
6677
}
6778

6879
public JoinTaskExecutor(AllocationService allocationService, Logger logger) {
@@ -78,10 +89,11 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
7889
boolean nodesChanged = false;
7990
ClusterState.Builder newState;
8091

81-
if (joiningNodes.size() == 1 && joiningNodes.get(0).equals(FINISH_ELECTION_TASK)) {
92+
if (joiningNodes.size() == 1 && joiningNodes.get(0).isFinishElectionTask()) {
8293
return results.successes(joiningNodes).build(currentState);
83-
} else if (currentNodes.getMasterNode() == null && joiningNodes.contains(BECOME_MASTER_TASK)) {
84-
assert joiningNodes.contains(FINISH_ELECTION_TASK) : "becoming a master but election is not finished " + joiningNodes;
94+
} else if (currentNodes.getMasterNode() == null && joiningNodes.stream().anyMatch(Task::isBecomeMasterTask)) {
95+
assert joiningNodes.stream().anyMatch(Task::isFinishElectionTask)
96+
: "becoming a master but election is not finished " + joiningNodes;
8597
// use these joins to try and become the master.
8698
// Note that we don't have to do any validation of the amount of joining nodes - the commit
8799
// during the cluster state publishing guarantees that we have enough
@@ -104,7 +116,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
104116
final boolean enforceMajorVersion = currentState.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false;
105117
// processing any joins
106118
for (final Task joinTask : joiningNodes) {
107-
if (joinTask.equals(BECOME_MASTER_TASK) || joinTask.equals(FINISH_ELECTION_TASK)) {
119+
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
108120
// noop
109121
} else if (currentNodes.nodeExists(joinTask.node())) {
110122
logger.debug("received a join request for an existing node [{}]", joinTask.node());
@@ -146,7 +158,7 @@ private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState cu
146158
nodesBuilder.masterNodeId(currentState.nodes().getLocalNodeId());
147159

148160
for (final Task joinTask : joiningNodes) {
149-
if (joinTask.equals(BECOME_MASTER_TASK) || joinTask.equals(FINISH_ELECTION_TASK)) {
161+
if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
150162
// noop
151163
} else {
152164
final DiscoveryNode joiningNode = joinTask.node();
@@ -180,16 +192,17 @@ public boolean runOnlyOnMaster() {
180192
return false;
181193
}
182194

183-
/**
184-
* a task indicated that the current node should become master, if no current master is known
185-
*/
186-
public static final Task BECOME_MASTER_TASK = new Task(null, "_BECOME_MASTER_TASK_");
195+
public static Task newBecomeMasterTask() {
196+
return new Task(null, Task.BECOME_MASTER_TASK_REASON);
197+
}
187198

188199
/**
189200
* a task that is used to signal the election is stopped and we should process pending joins.
190-
* it may be use in combination with {@link JoinTaskExecutor#BECOME_MASTER_TASK}
201+
* it may be used in combination with {@link JoinTaskExecutor#newBecomeMasterTask()}
191202
*/
192-
public static final Task FINISH_ELECTION_TASK = new Task(null, "_FINISH_ELECTION_");
203+
public static Task newFinishElectionTask() {
204+
return new Task(null, Task.FINISH_ELECTION_TASK_REASON);
205+
}
193206

194207
/**
195208
* Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata

server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.elasticsearch.Version;
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.cluster.node.DiscoveryNode;
25+
import org.elasticsearch.common.Randomness;
26+
import org.elasticsearch.common.UUIDs;
2527
import org.elasticsearch.common.component.AbstractComponent;
2628
import org.elasticsearch.common.settings.Setting;
2729
import org.elasticsearch.common.settings.Settings;
@@ -69,7 +71,9 @@ protected void doRun() throws Exception {
6971

7072
// TODO if transportService is already connected to this address then skip the handshaking
7173

72-
final DiscoveryNode targetNode = new DiscoveryNode(transportAddress.toString(), transportAddress, emptyMap(),
74+
final DiscoveryNode targetNode = new DiscoveryNode("", transportAddress.toString(),
75+
UUIDs.randomBase64UUID(Randomness.get()), // generated deterministically for reproducible tests
76+
transportAddress.address().getHostString(), transportAddress.getAddress(), transportAddress, emptyMap(),
7377
emptySet(), Version.CURRENT.minimumCompatibilityVersion());
7478

7579
logger.trace("[{}] opening probe connection", this);

server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ public void handleResponse(PeersResponse response) {
411411
@Override
412412
public void handleException(TransportException exp) {
413413
peersRequestInFlight = false;
414-
logger.debug(new ParameterizedMessage("{} peers request failed", this), exp);
414+
logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
415415
}
416416

417417
@Override

server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -276,16 +276,16 @@ assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master b
276276
final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";
277277

278278
// noop listener, the election finished listener determines result
279-
tasks.put(JoinTaskExecutor.BECOME_MASTER_TASK, (source1, e) -> {});
280-
tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener);
279+
tasks.put(JoinTaskExecutor.newBecomeMasterTask(), (source1, e) -> {});
280+
tasks.put(JoinTaskExecutor.newFinishElectionTask(), electionFinishedListener);
281281
masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
282282
}
283283

284284
public synchronized void closeAndProcessPending(String reason) {
285285
innerClose();
286286
Map<JoinTaskExecutor.Task, ClusterStateTaskListener> tasks = getPendingAsTasks(reason);
287287
final String source = "zen-disco-election-stop [" + reason + "]";
288-
tasks.put(JoinTaskExecutor.FINISH_ELECTION_TASK, electionFinishedListener);
288+
tasks.put(JoinTaskExecutor.newFinishElectionTask(), electionFinishedListener);
289289
masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
290290
}
291291

0 commit comments

Comments
 (0)