Skip to content

Commit 9797257

Browse files
committed
Drain master task queue when stabilising (#42504)
Today the default stabilisation time is calculated on the assumption that the elected master has no pending tasks to process when it is elected, but this is not a safe assumption to make. This can result in a cluster reaching the end of its stabilisation time without having stabilised. Furthermore in #36943 we increased the probability that each step in `runRandomly()` enqueues another task, vastly increasing the chance that we hit such a situation. This change extends the stabilisation process to allow time for all pending tasks, plus a task that might currently be in flight. Fixes #41967, in which the master entered the stabilisation phase with over 800 tasks to process.
1 parent 7078dc5 commit 9797257

File tree

3 files changed

+11
-2
lines changed

3 files changed

+11
-2
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1264,7 +1264,7 @@ public void run() {
12641264

12651265
@Override
12661266
public String toString() {
1267-
return "scheduled timeout for " + this;
1267+
return "scheduled timeout for " + CoordinatorPublication.this;
12681268
}
12691269
}, publishTimeout, Names.GENERIC);
12701270
}

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1065,7 +1065,6 @@ public void testCannotJoinClusterWithDifferentUUID() throws IllegalAccessExcepti
10651065
cluster1.stabilise();
10661066
}
10671067

1068-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41967")
10691068
public void testDiscoveryUsesNodesFromLastClusterState() {
10701069
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
10711070
cluster.runRandomly();
@@ -1517,6 +1516,10 @@ void stabilise(long stabilisationDurationMillis) {
15171516

15181517
final ClusterNode leader = getAnyLeader();
15191518
final long leaderTerm = leader.coordinator.getCurrentTerm();
1519+
1520+
final int pendingTaskCount = leader.masterService.getFakeMasterServicePendingTaskCount();
1521+
runFor((pendingTaskCount + 1) * DEFAULT_CLUSTER_STATE_UPDATE_DELAY, "draining task queue");
1522+
15201523
final Matcher<Long> isEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion());
15211524
final String leaderId = leader.getId();
15221525

@@ -1529,6 +1532,8 @@ void stabilise(long stabilisationDurationMillis) {
15291532
assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress());
15301533

15311534
if (clusterNode == leader) {
1535+
assertThat(nodeId + " is still the leader", clusterNode.coordinator.getMode(), is(LEADER));
1536+
assertThat(nodeId + " did not change term", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
15321537
continue;
15331538
}
15341539

server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java

+4
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ public void execute(Runnable command) {
8484
};
8585
}
8686

87+
public int getFakeMasterServicePendingTaskCount() {
88+
return pendingTasks.size();
89+
}
90+
8791
private void scheduleNextTaskIfNecessary() {
8892
if (taskInProgress == false && pendingTasks.isEmpty() == false && scheduledNextTask == false) {
8993
scheduledNextTask = true;

0 commit comments

Comments
 (0)