Skip to content

Commit d88a869

Browse files
CoordinatorTests sometimes needs three term bumps (#79574) (#80007)
Today we require the cluster to stabilise within a time period that allows for the first election to encounter conflicts. However, on very rare occasions there might be an election conflict in the second election too. This commit extends the stabilisation timeout to allow for this. Similar to #64462. Closes #78370. Co-authored-by: Elastic Machine <[email protected]>
1 parent 39e6db5 commit d88a869

File tree

5 files changed

+31
-7
lines changed

5 files changed

+31
-7
lines changed

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void testScheduling() {
5959
final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) {
6060
@Override
6161
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
62-
return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool);
62+
return new MockSinglePrioritizingExecutor("mock-executor", "", deterministicTaskQueue, threadPool);
6363
}
6464
};
6565

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1628,7 +1628,7 @@ private final class TestClusterNode {
16281628
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
16291629
@Override
16301630
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1631-
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
1631+
return new MockSinglePrioritizingExecutor(node.getName(), node.getId(), deterministicTaskQueue, threadPool);
16321632
}
16331633

16341634
@Override

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,8 @@ protected static int defaultInt(Setting<Integer> setting) {
236236
+ DEFAULT_ELECTION_DELAY
237237
// perhaps there is an election collision requiring another publication (which times out) and a term bump
238238
+ defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
239+
// very rarely there is another election collision requiring another publication (which times out) and a term bump
240+
+ defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
239241
// then wait for the new leader to notice that the old leader is unresponsive
240242
+ (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) * defaultInt(
241243
FOLLOWER_CHECK_RETRY_COUNT_SETTING
@@ -1165,6 +1167,7 @@ protected void onSendRequest(
11651167
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
11661168
clusterApplierService = new DisruptableClusterApplierService(
11671169
localNode.getId(),
1170+
localNode.getEphemeralId(),
11681171
settings,
11691172
clusterSettings,
11701173
deterministicTaskQueue,
@@ -1612,20 +1615,23 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
16121615

16131616
static class DisruptableClusterApplierService extends ClusterApplierService {
16141617
private final String nodeName;
1618+
private final String nodeId;
16151619
private final DeterministicTaskQueue deterministicTaskQueue;
16161620
private final ThreadPool threadPool;
16171621
ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
16181622
private boolean applicationMayFail;
16191623

16201624
DisruptableClusterApplierService(
16211625
String nodeName,
1626+
String nodeId,
16221627
Settings settings,
16231628
ClusterSettings clusterSettings,
16241629
DeterministicTaskQueue deterministicTaskQueue,
16251630
ThreadPool threadPool
16261631
) {
16271632
super(nodeName, settings, clusterSettings, threadPool);
16281633
this.nodeName = nodeName;
1634+
this.nodeId = nodeId;
16291635
this.deterministicTaskQueue = deterministicTaskQueue;
16301636
this.threadPool = threadPool;
16311637
addStateApplier(event -> {
@@ -1648,7 +1654,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
16481654

16491655
@Override
16501656
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1651-
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
1657+
return new MockSinglePrioritizingExecutor(nodeName, nodeId, deterministicTaskQueue, threadPool);
16521658
}
16531659

16541660
@Override

test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,40 @@
77
*/
88
package org.elasticsearch.cluster.coordination;
99

10+
import org.apache.logging.log4j.CloseableThreadContext;
1011
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
1112
import org.elasticsearch.common.util.concurrent.EsExecutors;
1213
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
1314
import org.elasticsearch.threadpool.ThreadPool;
1415

1516
import java.util.concurrent.TimeUnit;
1617

18+
import static org.elasticsearch.common.util.concurrent.DeterministicTaskQueue.NODE_ID_LOG_CONTEXT_KEY;
19+
1720
/**
1821
* Mock single threaded {@link PrioritizedEsThreadPoolExecutor} based on {@link DeterministicTaskQueue},
1922
* simulating the behaviour of an executor returned by {@link EsExecutors#newSinglePrioritizing}.
2023
*/
2124
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
2225

23-
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
24-
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() {
26+
public MockSinglePrioritizingExecutor(
27+
String nodeName,
28+
String nodeId,
29+
DeterministicTaskQueue deterministicTaskQueue,
30+
ThreadPool threadPool
31+
) {
32+
super(nodeName, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() {
2533
@Override
2634
public void start() {
2735
deterministicTaskQueue.scheduleNow(new Runnable() {
2836
@Override
2937
public void run() {
30-
try {
38+
try (
39+
CloseableThreadContext.Instance ignored = CloseableThreadContext.put(
40+
NODE_ID_LOG_CONTEXT_KEY,
41+
'{' + nodeName + "}{" + nodeId + '}'
42+
)
43+
) {
3144
r.run();
3245
} catch (KillWorkerError kwe) {
3346
// hacks everywhere

test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
2020
public void testPrioritizedEsThreadPoolExecutor() {
2121
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
2222

23-
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
23+
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor(
24+
"test",
25+
"",
26+
taskQueue,
27+
taskQueue.getThreadPool()
28+
);
2429
final AtomicBoolean called1 = new AtomicBoolean();
2530
final AtomicBoolean called2 = new AtomicBoolean();
2631
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {

0 commit comments

Comments
 (0)