Skip to content

CoordinatorTests sometimes needs three term bumps #79574

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testScheduling() {
final ClusterApplierService clusterApplierService = new ClusterApplierService("test", settings, clusterSettings, threadPool) {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor("mock-executor", deterministicTaskQueue, threadPool);
return new MockSinglePrioritizingExecutor("mock-executor", "", deterministicTaskQueue, threadPool);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,7 @@ private final class TestClusterNode {
new ClusterApplierService(node.getName(), settings, clusterSettings, threadPool) {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
return new MockSinglePrioritizingExecutor(node.getName(), node.getId(), deterministicTaskQueue, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ protected static int defaultInt(Setting<Integer> setting) {
+ DEFAULT_ELECTION_DELAY
// perhaps there is an election collision requiring another publication (which times out) and a term bump
+ defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
// very rarely there is another election collision requiring another publication (which times out) and a term bump
+ defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
// then wait for the new leader to notice that the old leader is unresponsive
+ (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING)) * defaultInt(
FOLLOWER_CHECK_RETRY_COUNT_SETTING
Expand Down Expand Up @@ -1165,6 +1167,7 @@ protected void onSendRequest(
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterApplierService = new DisruptableClusterApplierService(
localNode.getId(),
localNode.getEphemeralId(),
settings,
clusterSettings,
deterministicTaskQueue,
Expand Down Expand Up @@ -1612,20 +1615,23 @@ public void onNodeAck(DiscoveryNode node, Exception e) {

static class DisruptableClusterApplierService extends ClusterApplierService {
private final String nodeName;
private final String nodeId;
private final DeterministicTaskQueue deterministicTaskQueue;
private final ThreadPool threadPool;
ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED;
private boolean applicationMayFail;

DisruptableClusterApplierService(
String nodeName,
String nodeId,
Settings settings,
ClusterSettings clusterSettings,
DeterministicTaskQueue deterministicTaskQueue,
ThreadPool threadPool
) {
super(nodeName, settings, clusterSettings, threadPool);
this.nodeName = nodeName;
this.nodeId = nodeId;
this.deterministicTaskQueue = deterministicTaskQueue;
this.threadPool = threadPool;
addStateApplier(event -> {
Expand All @@ -1648,7 +1654,7 @@ static class DisruptableClusterApplierService extends ClusterApplierService {

@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor(nodeName, deterministicTaskQueue, threadPool);
return new MockSinglePrioritizingExecutor(nodeName, nodeId, deterministicTaskQueue, threadPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,40 @@
*/
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.CloseableThreadContext;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.util.concurrent.DeterministicTaskQueue.NODE_ID_LOG_CONTEXT_KEY;

/**
* Mock single threaded {@link PrioritizedEsThreadPoolExecutor} based on {@link DeterministicTaskQueue},
* simulating the behaviour of an executor returned by {@link EsExecutors#newSinglePrioritizing}.
*/
public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecutor {

public MockSinglePrioritizingExecutor(String name, DeterministicTaskQueue deterministicTaskQueue, ThreadPool threadPool) {
super(name, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() {
public MockSinglePrioritizingExecutor(
String nodeName,
String nodeId,
DeterministicTaskQueue deterministicTaskQueue,
ThreadPool threadPool
) {
super(nodeName, 0, 1, 0L, TimeUnit.MILLISECONDS, r -> new Thread() {
@Override
public void start() {
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
try {
try (
CloseableThreadContext.Instance ignored = CloseableThreadContext.put(
NODE_ID_LOG_CONTEXT_KEY,
'{' + nodeName + "}{" + nodeId + '}'
)
) {
r.run();
} catch (KillWorkerError kwe) {
// hacks everywhere
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ public class MockSinglePrioritizingExecutorTests extends ESTestCase {
public void testPrioritizedEsThreadPoolExecutor() {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();

final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor("test", taskQueue, taskQueue.getThreadPool());
final PrioritizedEsThreadPoolExecutor executor = new MockSinglePrioritizingExecutor(
"test",
"",
taskQueue,
taskQueue.getThreadPool()
);
final AtomicBoolean called1 = new AtomicBoolean();
final AtomicBoolean called2 = new AtomicBoolean();
executor.execute(new PrioritizedRunnable(Priority.NORMAL) {
Expand Down