diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index b1dae43648b6a..056bca7861823 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -59,7 +59,7 @@ public void testScheduling() { final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { - return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool); + return new MockSinglePrioritizingExecutor("mock-executor", "", deterministicTaskQueue, threadPool); } }; diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index d7ca3fbc93b4c..b65e145e4610f 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1628,7 +1628,7 @@ private final class TestClusterNode { new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { - return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool); + return new MockSinglePrioritizingExecutor(node.getName(), node.getId(), deterministicTaskQueue, threadPool); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 03b85c28933f1..6212b8ae924b2 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -236,6 +236,8 @@ protected static int defaultInt(Setting setting) { + DEFAULT_ELECTION_DELAY // perhaps there is an election collision requiring another publication (which times out) and a term bump + defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY + // very rarely there is another election collision requiring another publication (which times out) and a term bump + + defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY // then wait for the new leader to notice that the old leader is unresponsive + (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) * defaultInt( FOLLOWER_CHECK_RETRY_COUNT_SETTING @@ -1165,6 +1167,7 @@ protected void onSendRequest( final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterApplierService = new DisruptableClusterApplierService( localNode.getId(), + localNode.getEphemeralId(), settings, clusterSettings, deterministicTaskQueue, @@ -1612,6 +1615,7 @@ public void onNodeAck(DiscoveryNode node, Exception e) { static class DisruptableClusterApplierService extends ClusterApplierService { private final String nodeName; + private final String nodeId; private final DeterministicTaskQueue deterministicTaskQueue; private final ThreadPool threadPool; ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; @@ -1619,6 +1623,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService { DisruptableClusterApplierService( String nodeName, + String nodeId, Settings settings, ClusterSettings clusterSettings, DeterministicTaskQueue deterministicTaskQueue, @@ -1626,6 +1631,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService { ) { super(nodeName, settings, clusterSettings, threadPool); this.nodeName = nodeName; + this.nodeId = nodeId; this.deterministicTaskQueue = deterministicTaskQueue; this.threadPool = threadPool; addStateApplier(event -> { @@ -1648,7 +1654,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService { @Override protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { - return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool); + return new MockSinglePrioritizingExecutor(nodeName, nodeId, deterministicTaskQueue, threadPool); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java index e9234f812e487..7697de7508507 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutor.java @@ -7,6 +7,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.CloseableThreadContext; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; @@ -14,20 +15,32 @@ import java.util.concurrent.TimeUnit; +import static org.elasticsearch.common.util.concurrent.DeterministicTaskQueue.NODE_ID_LOG_CONTEXT_KEY; + /** * Mock single threaded {@link PrioritizedEsThreadPoolExecutor} based on {@link DeterministicTaskQueue}, * simulating the behaviour of an executor returned by {@link EsExecutors#newSinglePrioritizing}. */ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor { - public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) { - super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() { + public MockSinglePrioritizingExecutor( + String nodeName, + String nodeId, + DeterministicTaskQueue deterministicTaskQueue, + ThreadPool threadPool + ) { + super(nodeName, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() { @Override public void start() { deterministicTaskQueue.scheduleNow(new Runnable() { @Override public void run() { - try { + try ( + CloseableThreadContext.Instance ignored = CloseableThreadContext.put( + NODE_ID_LOG_CONTEXT_KEY, + '{' + nodeName + "}{" + nodeId + '}' + ) + ) { r.run(); } catch (KillWorkerError kwe) { // hacks everywhere diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java index 2594643604e93..7d351d820ff6e 100644 --- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/MockSinglePrioritizingExecutorTests.java @@ -20,7 +20,12 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase { public void testPrioritizedEsThreadPoolExecutor() { final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); - final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool()); + final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor( + "test", + "", + taskQueue, + taskQueue.getThreadPool() + ); final AtomicBoolean called1 = new AtomicBoolean(); final AtomicBoolean called2 = new AtomicBoolean(); executor.execute(new PrioritizedRunnable(Priority.NORMAL) {