Skip to content

Commit 184cbc5

Browse files
committed
CoordinatorTests sometimes needs three term bumps
Today we require the cluster to stabilise in a time period that allows time for the first election to encounter conflicts. However on very rare occasions there might be an election conflict in the second election too. This commit extends the stabilisation timeout to allow for this. Similar to elastic#64462 Closes elastic#78370
1 parent 6aa0db8 commit 184cbc5

File tree

5 files changed

+44
-11
lines changed

5 files changed

+44
-11
lines changed

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void testScheduling() {
5959
final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) {
6060
@Override
6161
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
62-
return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool);
62+
return new MockSinglePrioritizingExecutor("mock-executor", "", deterministicTaskQueue, threadPool);
6363
}
6464
};
6565

server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1628,7 +1628,7 @@ private final class TestClusterNode {
16281628
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
16291629
@Override
16301630
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1631-
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
1631+
return new MockSinglePrioritizingExecutor(node.getName(), node.getId(), deterministicTaskQueue, threadPool);
16321632
}
16331633

16341634
@Override

test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ protected static int defaultInt(Setting<Integer> setting) {
235235
+ DEFAULT_ELECTION_DELAY
236236
// perhaps there is an election collision requiring another publication (which times out) and a term bump
237237
+ defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
238+
// very rarely there is another election collision requiring another publication (which times out) and a term bump
239+
+ defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
238240
// then wait for the new leader to notice that the old leader is unresponsive
239241
+ (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
240242
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
@@ -1029,8 +1031,13 @@ protected void onSendRequest(
10291031
masterService = new AckedFakeThreadPoolMasterService(localNode.getId(), "test", threadPool,
10301032
runnable -> deterministicTaskQueue.scheduleNow(onNode(runnable)));
10311033
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
1032-
clusterApplierService = new DisruptableClusterApplierService(localNode.getId(), settings, clusterSettings,
1033-
deterministicTaskQueue, threadPool);
1034+
clusterApplierService = new DisruptableClusterApplierService(
1035+
localNode.getId(),
1036+
localNode.getEphemeralId(),
1037+
settings,
1038+
clusterSettings,
1039+
deterministicTaskQueue,
1040+
threadPool);
10341041
clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
10351042
clusterService.setNodeConnectionsService(
10361043
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService));
@@ -1442,15 +1449,23 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
14421449

14431450
static class DisruptableClusterApplierService extends ClusterApplierService {
14441451
private final String nodeName;
1452+
private final String nodeId;
14451453
private final DeterministicTaskQueue deterministicTaskQueue;
14461454
private final ThreadPool threadPool;
14471455
ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
14481456
private boolean applicationMayFail;
14491457

1450-
DisruptableClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings,
1451-
DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
1458+
DisruptableClusterApplierService(
1459+
String nodeName,
1460+
String nodeId,
1461+
Settings settings,
1462+
ClusterSettings clusterSettings,
1463+
DeterministicTaskQueue deterministicTaskQueue,
1464+
ThreadPool threadPool
1465+
) {
14521466
super(nodeName, settings, clusterSettings, threadPool);
14531467
this.nodeName = nodeName;
1468+
this.nodeId = nodeId;
14541469
this.deterministicTaskQueue = deterministicTaskQueue;
14551470
this.threadPool = threadPool;
14561471
addStateApplier(event -> {
@@ -1470,7 +1485,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {
14701485

14711486
@Override
14721487
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
1473-
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
1488+
return new MockSinglePrioritizingExecutor(nodeName, nodeId, deterministicTaskQueue, threadPool);
14741489
}
14751490

14761491
@Override

test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,30 @@
77
*/
88
package org.elasticsearch.cluster.coordination;
99

10+
import org.apache.logging.log4j.CloseableThreadContext;
1011
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
1112
import org.elasticsearch.common.util.concurrent.EsExecutors;
1213
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
1314
import org.elasticsearch.threadpool.ThreadPool;
1415

1516
import java.util.concurrent.TimeUnit;
1617

18+
import static org.elasticsearch.common.util.concurrent.DeterministicTaskQueue.NODE_ID_LOG_CONTEXT_KEY;
19+
1720
/**
1821
* Mock single threaded {@link PrioritizedEsThreadPoolExecutor} based on {@link DeterministicTaskQueue},
1922
* simulating the behaviour of an executor returned by {@link EsExecutors#newSinglePrioritizing}.
2023
*/
2124
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {
2225

23-
public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
26+
public MockSinglePrioritizingExecutor(
27+
String nodeName,
28+
String nodeId,
29+
DeterministicTaskQueue deterministicTaskQueue,
30+
ThreadPool threadPool
31+
) {
2432
super(
25-
name,
33+
nodeName,
2634
0,
2735
1,
2836
0L,
@@ -34,7 +42,12 @@ public void start() {
3442
@Override
3543
public void run() {
3644
try {
37-
r.run();
45+
try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put(
46+
NODE_ID_LOG_CONTEXT_KEY,
47+
'{' + nodeName + "}{" + nodeId + '}'
48+
)) {
49+
r.run();
50+
}
3851
} catch (KillWorkerError kwe) {
3952
// hacks everywhere
4053
}

test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
2020
public void testPrioritizedEsThreadPoolExecutor() {
2121
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
2222

23-
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
23+
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor(
24+
"test",
25+
"",
26+
taskQueue,
27+
taskQueue.getThreadPool()
28+
);
2429
final AtomicBoolean called1 = new AtomicBoolean();
2530
final AtomicBoolean called2 = new AtomicBoolean();
2631
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {

0 commit comments

Comments
 (0)