From 54632b6b9b209a6e5b0a7ecac673de67dd8cac54 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 2 Oct 2018 16:57:51 +0200 Subject: [PATCH 01/20] Hook in Cluster State Applier --- .../cluster/coordination/Coordinator.java | 129 ++++-- .../PublicationTransportHandler.java | 31 +- .../util/concurrent/ListenableFuture.java | 2 +- .../coordination/CoordinatorTests.java | 428 ++++++++++++++++-- .../cluster/coordination/NodeJoinTests.java | 15 +- .../cluster/FakeThreadPoolMasterService.java | 10 +- 6 files changed, 550 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 72e626399878e..006714c1edff5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -30,6 +31,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; @@ -39,6 +42,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoverySettings; @@ -60,6 +64,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES; +import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; + public class Coordinator extends AbstractLifecycleComponent implements Discovery { // the timeout for the publication of each value @@ -77,6 +84,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery final Object mutex = new Object(); final SetOnce coordinationState = new SetOnce<>(); // initialized on start-up (see doStart) private volatile Optional lastCommittedState = Optional.empty(); + private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier private final PeerFinder peerFinder; private final PreVoteCollector preVoteCollector; @@ -86,6 +94,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final PublicationTransportHandler publicationHandler; private final LeaderChecker leaderChecker; private final FollowersChecker followersChecker; + private final ClusterApplier clusterApplier; @Nullable private Releasable electionScheduler; @Nullable @@ -102,7 +111,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, - UnicastHostsProvider unicastHostsProvider, Random random) { + UnicastHostsProvider unicastHostsProvider, ClusterApplier clusterApplier, Random random) { super(settings); this.transportService = transportService; this.masterService = masterService; @@ -118,10 +127,12 @@ public Coordinator(Settings settings, TransportService transportService, Allocat configuredHostsResolver = new UnicastConfiguredHostsResolver(settings, transportService, unicastHostsProvider); this.peerFinder = new CoordinatorPeerFinder(settings, transportService, new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); - this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit); + this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit, + logger); this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); + this.clusterApplier = clusterApplier; masterService.setClusterStateSupplier(this::getStateForMasterService); } @@ -167,13 +178,31 @@ private void onFollowerCheckRequest(FollowerCheckRequest followerCheckRequest) { } } - private void handleApplyCommit(ApplyCommitRequest applyCommitRequest) { + private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionListener applyListener) { synchronized (mutex) { logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest); coordinationState.get().handleCommit(applyCommitRequest); lastCommittedState = Optional.of(coordinationState.get().getLastAcceptedState()); - // TODO: send to applier + applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(lastCommittedState.get()) : lastCommittedState.get(); + if (applyCommitRequest.getSourceNode().equals(getLocalNode())) { + // master node applies the committed state at the end of the publication process, not here. + applyListener.onResponse(null); + } else { + clusterApplier.onNewClusterState(applyCommitRequest.toString(), () -> applierState, + new ClusterApplyListener() { + + @Override + public void onFailure(String source, Exception e) { + applyListener.onFailure(e); + } + + @Override + public void onSuccess(String source) { + applyListener.onResponse(null); + } + }); + } } } @@ -299,6 +328,12 @@ void becomeCandidate(String method) { followersChecker.clearCurrentNodes(); followersChecker.updateFastResponseState(getCurrentTerm(), mode); + + if (applierState.nodes().getMasterNodeId() != null) { + applierState = clusterStateWithNoMasterBlock(applierState); + clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> { + }); + } } preVoteCollector.update(getPreVoteResponse(), null); @@ -385,10 +420,20 @@ boolean publicationInProgress() { @Override protected void doStart() { - CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); - coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState)); - peerFinder.setCurrentTerm(getCurrentTerm()); - configuredHostsResolver.start(); + synchronized (mutex) { + CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); + coordinationState.set(new CoordinationState(settings, getLocalNode(), persistedState)); + peerFinder.setCurrentTerm(getCurrentTerm()); + configuredHostsResolver.start(); + ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)) + .blocks(ClusterBlocks.builder() + .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK) + .addGlobalBlock(NO_MASTER_BLOCK_WRITES)) + .nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId())) + .build(); + applierState = initialState; + clusterApplier.setInitialState(initialState); + } } @Override @@ -419,6 +464,13 @@ public void invariant() { assert peerFinder.getCurrentTerm() == getCurrentTerm(); assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); + if (lastCommittedState.isPresent()) { + assert applierState != null; + assert lastCommittedState.get().term() == applierState.term(); + assert lastCommittedState.get().version() == applierState.version(); + } + assert mode != Mode.CANDIDATE || applierState.nodes().getMasterNodeId() == null; + assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); @@ -524,7 +576,7 @@ private ClusterState clusterStateWithNoMasterBlock(ClusterState clusterState) { "NO_MASTER_BLOCK should only be added by Coordinator"; // TODO: allow dynamically configuring NO_MASTER_BLOCK_ALL final ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks()).addGlobalBlock( - DiscoverySettings.NO_MASTER_BLOCK_WRITES).build(); + NO_MASTER_BLOCK_WRITES).build(); final DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build(); return ClusterState.builder(clusterState).blocks(clusterBlocks).nodes(discoveryNodes).build(); } else { @@ -593,42 +645,68 @@ public void onNodeAck(DiscoveryNode node, Exception e) { final Publication publication = new Publication(settings, publishRequest, wrappedAckListener, transportService.getThreadPool()::relativeTimeInMillis) { + final Publication thisPublication = this; + + private void failPublicationAndPossiblyBecomeCandidate(String reason) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + + assert currentPublication.get() == this; + currentPublication = Optional.empty(); + + // check if node has not already switched modes (by bumping term) + if (mode == Mode.LEADER && publishRequest.getAcceptedState().term() == getCurrentTerm()) { + becomeCandidate(reason); + } + } + @Override protected void onCompletion(boolean committed) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert currentPublication.get() == this; - currentPublication = Optional.empty(); - updateMaxTermSeen(getCurrentTerm()); // triggers term bump if new term was found during publication localNodeAckEvent.addListener(new ActionListener() { @Override public void onResponse(Void ignore) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - assert coordinationState.get().getLastAcceptedTerm() == publishRequest.getAcceptedState().term() - && coordinationState.get().getLastAcceptedVersion() == publishRequest.getAcceptedState().version() - : "onPossibleCompletion: term or version mismatch when publishing [" + this - + "]: current version is now [" + coordinationState.get().getLastAcceptedVersion() - + "] in term [" + coordinationState.get().getLastAcceptedTerm() + "]"; assert committed; - // TODO: send to applier - ackListener.onNodeAck(getLocalNode(), null); - publishListener.onResponse(null); + clusterApplier.onNewClusterState(thisPublication.toString(), () -> applierState, + new ClusterApplyListener() { + @Override + public void onFailure(String source, Exception e) { + synchronized (mutex) { + failPublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); + } + ackListener.onNodeAck(getLocalNode(), e); + publishListener.onFailure(e); + } + + @Override + public void onSuccess(String source) { + synchronized (mutex) { + assert currentPublication.get() == thisPublication; + currentPublication = Optional.empty(); + // trigger term bump if new term was found during publication + updateMaxTermSeen(getCurrentTerm()); + } + + ackListener.onNodeAck(getLocalNode(), null); + publishListener.onResponse(null); + } + }); } @Override public void onFailure(Exception e) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - if (publishRequest.getAcceptedState().term() == coordinationState.get().getCurrentTerm() && - publishRequest.getAcceptedState().version() == coordinationState.get().getLastPublishedVersion()) { - becomeCandidate("Publication.onCompletion(false)"); - } + failPublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); + FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException( "publication failed", e); ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. publishListener.onFailure(exception); } - }, transportService.getThreadPool().generic()); + }, EsExecutors.newDirectExecutorService()); } @Override @@ -667,8 +745,6 @@ protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest app } }; - assert currentPublication.isPresent() == false - : "[" + currentPublication.get() + "] in progress, cannot start [" + publication + ']'; currentPublication = Optional.of(publication); transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() { @@ -717,7 +793,6 @@ private void cancelActivePublication() { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; if (currentPublication.isPresent()) { currentPublication.get().onTimeout(); - assert currentPublication.isPresent() == false; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index 2e6e5bb7c2caf..804f3b14de206 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.coordination; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; @@ -29,7 +30,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import java.util.function.Function; public class PublicationTransportHandler { @@ -41,7 +42,8 @@ public class PublicationTransportHandler { public PublicationTransportHandler(TransportService transportService, Function handlePublishRequest, - Consumer handleApplyCommit) { + BiConsumer> handleApplyCommit, + Logger logger) { this.transportService = transportService; transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, @@ -50,10 +52,27 @@ public PublicationTransportHandler(TransportService transportService, transportService.registerRequestHandler(COMMIT_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, ApplyCommitRequest::new, - (request, channel, task) -> { - handleApplyCommit.accept(request); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - }); + (request, channel, task) -> handleApplyCommit.accept(request, new ActionListener() { + + @Override + public void onResponse(Void aVoid) { + try { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } catch (IOException e) { + logger.debug("failed to send response on commit", e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (IOException ie) { + e.addSuppressed(ie); + logger.debug("failed to send response on commit", e); + } + } + })); } public void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java index 5fb8e9517b26e..971fa1f40acde 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java @@ -82,7 +82,7 @@ protected synchronized void done() { private void notifyListener(ActionListener listener, ExecutorService executorService) { try { - executorService.submit(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { try { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 4b72b029dfc7d..af9e797f2ec04 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -20,21 +20,25 @@ import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState.VotingConfiguration; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; import org.elasticsearch.cluster.coordination.CoordinationStateTests.InMemoryPersistedState; import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -52,11 +56,16 @@ import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static java.util.Collections.emptySet; @@ -80,10 +89,12 @@ import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; -@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") +//@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { public void testCanUpdateClusterStateAfterStabilisation() { @@ -164,7 +175,7 @@ public void testUnresponsiveLeaderDetectedEventually() { final ClusterNode originalLeader = cluster.getAnyLeader(); logger.info("--> partitioning leader {}", originalLeader); - originalLeader.partition(); + originalLeader.blackhole(); cluster.stabilise(Math.max( // first wait for all the followers to notice the leader has gone @@ -225,7 +236,7 @@ public void testUnresponsiveFollowerDetectedEventually() { final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower = cluster.getAnyNodeExcept(leader); logger.info("--> partitioning follower {}", follower); - follower.partition(); + follower.blackhole(); cluster.stabilise(Math.max( // wait for the leader to notice that the follower is unresponsive @@ -241,6 +252,131 @@ public void testUnresponsiveFollowerDetectedEventually() { assertThat(cluster.getAnyLeader().getId(), equalTo(leader.getId())); } + public void testAckListenerReceivesAcksFromAllNodes() { + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + AckCollector ackCollector = leader.submitValue(randomLong()); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + + for (final ClusterNode clusterNode : cluster.clusterNodes) { + assertTrue("expected ack from " + clusterNode, ackCollector.hasAckedSuccessfully(clusterNode)); + } + assertThat("leader should be last to ack", ackCollector.getSuccessfulAckIndex(leader), equalTo(cluster.clusterNodes.size() - 1)); + } + + public void testAckListenerReceivesNackFromFollower() { + final Cluster cluster = new Cluster(3); + cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); + final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + + follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); + AckCollector ackCollector = leader.submitValue(randomLong()); + cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertTrue("expected ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); + assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); + assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); + assertThat("leader should be last to ack", ackCollector.getSuccessfulAckIndex(leader), equalTo(1)); + } + + public void testAckListenerReceivesNackFromLeader() { + final Cluster cluster = new Cluster(3); + cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); + final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + final long startingTerm = leader.coordinator.getCurrentTerm(); + + leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); + AckCollector ackCollector = leader.submitValue(randomLong()); + cluster.runUntil(cluster.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertTrue(leader.coordinator.getMode() != Coordinator.Mode.LEADER || leader.coordinator.getCurrentTerm() > startingTerm); + leader.setClusterStateApplyResponse(ClusterStateApplyResponse.SUCCEED); + cluster.stabilise(); + assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); + assertTrue("expected ack from " + follower0, ackCollector.hasAckedSuccessfully(follower0)); + assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); + assertTrue(leader.coordinator.getMode() != Coordinator.Mode.LEADER || leader.coordinator.getCurrentTerm() > startingTerm); + } + + public void testAckListenerReceivesNoAckFromHangingFollower() { + final Cluster cluster = new Cluster(3); + cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); + final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + + follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG); + AckCollector ackCollector = leader.submitValue(randomLong()); + cluster.runUntil(cluster.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); + assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); + cluster.stabilise(); + assertTrue("expected eventual ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); + assertFalse("expected no ack from " + follower0, ackCollector.hasAcked(follower0)); + } + + public void testAckListenerReceivesNacksIfPublicationTimesOut() { + final Cluster cluster = new Cluster(3); + cluster.stabilise(); + final ClusterNode leader = cluster.getAnyLeader(); + final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); + final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); + + follower0.blackhole(); + follower1.blackhole(); + AckCollector ackCollector = leader.submitValue(randomLong()); + cluster.runUntil(cluster.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + assertFalse("expected no immediate ack from " + leader, ackCollector.hasAcked(leader)); + assertFalse("expected no immediate ack from " + follower0, ackCollector.hasAcked(follower0)); + assertFalse("expected no immediate ack from " + follower1, ackCollector.hasAcked(follower1)); + follower0.heal(); + follower1.heal(); + cluster.stabilise(); + assertTrue("expected eventual nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); + assertTrue("expected eventual nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1)); + assertTrue("expected eventual nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); + } + + public void testAckListenerReceivesNacksIfLeaderStandsDown() { + // TODO: needs support for handling disconnects +// final Cluster cluster = new Cluster(3); +// cluster.stabilise(); +// final ClusterNode leader = cluster.getAnyLeader(); +// final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); +// final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); +// +// leader.partition(); +// follower0.coordinator.handleDisconnectedNode(leader.localNode); +// follower1.coordinator.handleDisconnectedNode(leader.localNode); +// cluster.runUntil(cluster.getCurrentTimeMillis() + cluster.DEFAULT_ELECTION_TIME); +// AckCollector ackCollector = leader.submitRandomValue(); +// cluster.runUntil(cluster.currentTimeMillis + Cluster.DEFAULT_DELAY_VARIABILITY); +// leader.connectionStatus = ConnectionStatus.CONNECTED; +// cluster.stabilise(cluster.DEFAULT_STABILISATION_TIME, 0L); +// assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader)); +// assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); +// assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1)); + } + + public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { + // TODO: needs auto-term bumping for cluster to form again +// final Cluster cluster = new Cluster(3); +// cluster.stabilise(); +// final ClusterNode leader = cluster.getAnyLeader(); +// final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); +// final ClusterNode follower1 = cluster.getAnyNodeExcept(leader, follower0); +// +// follower0.coordinator.joinLeaderInTerm(new StartJoinRequest(follower0.localNode, follower0.coordinator.getCurrentTerm() + 1)); +// AckCollector ackCollector = leader.submitValue(randomLong()); +// cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, 0L); +// assertTrue("expected ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); +// assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); +// assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); + } + private static long defaultMillis(Setting setting) { return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY; } @@ -299,6 +435,7 @@ class Cluster { private final Set disconnectedNodes = new HashSet<>(); private final Set blackholedNodes = new HashSet<>(); + Map committedStatesByVersion = new HashMap<>(); Cluster(int initialNodeCount) { deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); @@ -329,15 +466,33 @@ void addNodes(int newNodesCount) { } void stabilise() { - stabilise(DEFAULT_STABILISATION_TIME); + stabilise(DEFAULT_STABILISATION_TIME, DEFAULT_DELAY_VARIABILITY); } void stabilise(long stabiliationDurationMillis) { + stabilise(stabiliationDurationMillis, DEFAULT_DELAY_VARIABILITY); + } + + void stabilise(long stabiliationDurationMillis, long delayVariability) { final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis(); final long stabilisationEndTime = stabilisationStartTime + stabiliationDurationMillis; logger.info("--> stabilising until [{}ms]", stabilisationEndTime); - while (deterministicTaskQueue.getCurrentTimeMillis() < stabilisationEndTime) { + deterministicTaskQueue.setExecutionDelayVariabilityMillis(delayVariability); + + runUntil(stabilisationEndTime); + + deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); + + for (ClusterNode clusterNode : clusterNodes) { + assert clusterNode.coordinator.publicationInProgress() == false; + } + + assertUniqueLeaderAndExpectedModes(); + } + + void runUntil(long endTime) { + while (deterministicTaskQueue.getCurrentTimeMillis() < endTime) { while (deterministicTaskQueue.hasRunnableTasks()) { try { @@ -348,6 +503,7 @@ void stabilise(long stabiliationDurationMillis) { for (final ClusterNode clusterNode : clusterNodes) { clusterNode.coordinator.invariant(); } + updateCommittedStates(); } if (deterministicTaskQueue.hasDeferredTasks() == false) { @@ -358,12 +514,6 @@ void stabilise(long stabiliationDurationMillis) { deterministicTaskQueue.advanceTime(); } - - for (ClusterNode clusterNode : clusterNodes) { - assert clusterNode.coordinator.publicationInProgress() == false; - } - - assertUniqueLeaderAndExpectedModes(); } private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) { @@ -436,14 +586,20 @@ ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { return randomFrom(acceptableNodes); } + public long getCurrentTimeMillis() { + return deterministicTaskQueue.getCurrentTimeMillis(); + } + class ClusterNode extends AbstractComponent { private final int nodeIndex; private Coordinator coordinator; private DiscoveryNode localNode; private final PersistedState persistedState; - private MasterService masterService; + private FakeClusterApplier clusterApplier; + private AckedFakeThreadPoolMasterService masterService; private TransportService transportService; private DisruptableMockTransport mockTransport; + private ClusterStateApplyResponse clusterStateApplyResponse = ClusterStateApplyResponse.SUCCEED; ClusterNode(int nodeIndex) { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); @@ -509,13 +665,15 @@ protected void onBlackholedDuringSend(long requestId, String action, DiscoveryNo } }; - masterService = new FakeThreadPoolMasterService("test", + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterApplier = new FakeClusterApplier(settings, clusterSettings); + masterService = new AckedFakeThreadPoolMasterService("test", runnable -> deterministicTaskQueue.scheduleNow(onNode(localNode, runnable))); transportService = mockTransport.createTransportService( settings, deterministicTaskQueue.getThreadPool(runnable -> onNode(localNode, runnable)), NOOP_TRANSPORT_INTERCEPTOR, a -> localNode, null, emptySet()); coordinator = new Coordinator(settings, transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), - masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, Randomness.get()); + masterService, this::getPersistedState, Cluster.this::provideUnicastHosts, clusterApplier, Randomness.get()); masterService.setClusterStatePublisher(coordinator); transportService.start(); @@ -541,18 +699,44 @@ boolean isLeader() { return coordinator.getMode() == Coordinator.Mode.LEADER; } - void submitValue(final long value) { - onNode(localNode, () -> masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return setValue(currentState, value); - } + void setClusterStateApplyResponse(ClusterStateApplyResponse clusterStateApplyResponse) { + this.clusterStateApplyResponse = clusterStateApplyResponse; + } - @Override - public void onFailure(String source, Exception e) { - logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e); - } - })).run(); + AckCollector submitValue(final long value) { + return submitUpdateTask("new value [" + value + "]", cs -> setValue(cs, value)); + } + + AckCollector submitUpdateTask(String source, UnaryOperator clusterStateUpdate) { + final AckCollector ackCollector = new AckCollector(); + onNode(localNode, () -> { + logger.trace("[{}] submitUpdateTask: enqueueing [{}]", localNode.getId(), source); + final long submittedTerm = coordinator.getCurrentTerm(); + masterService.submitStateUpdateTask(source, + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + assertThat(currentState.term(), greaterThanOrEqualTo(submittedTerm)); + masterService.nextAckCollector = ackCollector; + return clusterStateUpdate.apply(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + updateCommittedStates(); + ClusterState state = committedStatesByVersion.get(newState.version()); + assertNotNull("State not committed : " + newState.toString(), state); + assertEquals(value(state), value(newState)); + logger.trace("successfully published: [{}]", newState); + } + }); + }).run(); + return ackCollector; } @Override @@ -564,9 +748,111 @@ void disconnect() { disconnectedNodes.add(localNode.getId()); } - void partition() { + void blackhole() { blackholedNodes.add(localNode.getId()); } + + void heal() { + disconnectedNodes.remove(localNode.getId()); + blackholedNodes.remove(localNode.getId()); + } + + ClusterState getLastAppliedClusterState() { + return clusterApplier.lastAppliedClusterState; + } + + private class FakeClusterApplier implements ClusterApplier { + + final ClusterName clusterName; + private final ClusterSettings clusterSettings; + ClusterState lastAppliedClusterState; + + private FakeClusterApplier(Settings settings, ClusterSettings clusterSettings) { + clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); + this.clusterSettings = clusterSettings; + } + + @Override + public void setInitialState(ClusterState initialState) { + assert lastAppliedClusterState == null; + assert initialState != null; + lastAppliedClusterState = initialState; + } + + @Override + public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { + switch (clusterStateApplyResponse) { + case SUCCEED: + deterministicTaskQueue.scheduleNow(onNode(localNode, new Runnable() { + @Override + public void run() { + final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState; + final ClusterState newClusterState = clusterStateSupplier.get(); + assert oldClusterState.version() <= newClusterState.version() : "updating cluster state from version " + + oldClusterState.version() + " to stale version " + newClusterState.version(); + clusterApplier.lastAppliedClusterState = newClusterState; + final Settings incomingSettings = newClusterState.metaData().settings(); + clusterSettings.applySettings(incomingSettings); // TODO validation might throw exceptions here. + listener.onSuccess(source); + } + + @Override + public String toString() { + return "apply cluster state from [" + source + "]"; + } + })); + break; + case FAIL: + deterministicTaskQueue.scheduleNow(onNode(localNode, new Runnable() { + @Override + public void run() { + listener.onFailure(source, new ElasticsearchException("cluster state application failed")); + } + + @Override + public String toString() { + return "fail to apply cluster state from [" + source + "]"; + } + })); + break; + case HANG: + if (randomBoolean()) { + deterministicTaskQueue.scheduleNow(onNode(localNode, new Runnable() { + @Override + public void run() { + final ClusterState oldClusterState = clusterApplier.lastAppliedClusterState; + final ClusterState newClusterState = clusterStateSupplier.get(); + assert oldClusterState.version() <= newClusterState.version() : + "updating cluster state from version " + + oldClusterState.version() + " to stale version " + newClusterState.version(); + clusterApplier.lastAppliedClusterState = newClusterState; + } + + @Override + public String toString() { + return "apply cluster state from [" + source + "] without ack"; + } + })); + } + break; + } + } + } + } + + private void updateCommittedStates() { + for (final ClusterNode clusterNode : clusterNodes) { + Optional committedState = clusterNode.coordinator.getLastCommittedState(); + if (committedState.isPresent()) { + ClusterState storedState = committedStatesByVersion.get(committedState.get().getVersion()); + if (storedState == null) { + committedStatesByVersion.put(committedState.get().getVersion(), committedState.get()); + } else { + assertEquals("expected " + committedState.get() + " but got " + storedState, + value(committedState.get()), value(storedState)); + } + } + } } private List provideUnicastHosts(HostsResolver ignored) { @@ -590,4 +876,92 @@ public String toString() { } }; } + + static class AckCollector implements AckListener { + + private final Set ackedNodes = new HashSet<>(); + private final List successfulNodes = new ArrayList<>(); + private final List unsuccessfulNodes = new ArrayList<>(); + + @Override + public void onCommit(TimeValue commitTime) { + // TODO we only currently care about per-node acks + } + + @Override + public void onNodeAck(DiscoveryNode node, Exception e) { + assertTrue("duplicate ack from " + node, ackedNodes.add(node)); + if (e == null) { + successfulNodes.add(node); + } else { + unsuccessfulNodes.add(node); + } + } + + boolean hasAckedSuccessfully(ClusterNode clusterNode) { + return successfulNodes.contains(clusterNode.localNode); + } + + boolean hasAckedUnsuccessfully(ClusterNode clusterNode) { + return unsuccessfulNodes.contains(clusterNode.localNode); + } + + boolean hasAcked(ClusterNode clusterNode) { + return ackedNodes.contains(clusterNode.localNode); + } + + int getSuccessfulAckIndex(ClusterNode clusterNode) { + assert successfulNodes.contains(clusterNode.localNode) : "get index of " + clusterNode; + return successfulNodes.indexOf(clusterNode.localNode); + } + } + + static class AckedFakeThreadPoolMasterService extends FakeThreadPoolMasterService { + + AckCollector nextAckCollector = new AckCollector(); + + AckedFakeThreadPoolMasterService(String serviceName, Consumer onTaskAvailableToRun) { + super(serviceName, onTaskAvailableToRun); + } + + @Override + protected AckListener wrapAckListener(AckListener ackListener) { + final AckCollector ackCollector = nextAckCollector; + nextAckCollector = new AckCollector(); + return new AckListener() { + @Override + public void onCommit(TimeValue commitTime) { + ackCollector.onCommit(commitTime); + ackListener.onCommit(commitTime); + } + + @Override + public void onNodeAck(DiscoveryNode node, Exception e) { + ackCollector.onNodeAck(node, e); + ackListener.onNodeAck(node, e); + } + }; + } + } + + /** + * How to behave with a new cluster state + */ + enum ClusterStateApplyResponse { + /** + * Apply the state (default) + */ + SUCCEED, + + /** + * Reject the state with an exception. + */ + FAIL, + + /** + * Never respond either way. + */ + HANG, + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index d2cbfe5c47f5e..59eca7c0fee02 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterServiceTests; import org.elasticsearch.common.Randomness; @@ -63,6 +64,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -162,7 +164,18 @@ protected void onSendRequest(long requestId, String action, TransportRequest req transportService, ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, - () -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList(), random); + () -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList(), + new ClusterApplier() { + @Override + public void setInitialState(ClusterState initialState) { + + } + + @Override + public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { + listener.onSuccess(source); + } + }, random); transportService.start(); transportService.acceptIncomingRequests(); transportRequestHandler = capturingTransport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME); diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java index 628b0917d906f..70ba29393a96d 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/FakeThreadPoolMasterService.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -28,7 +29,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.discovery.Discovery; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -120,7 +120,7 @@ public ClusterState.Builder incrementVersion(ClusterState clusterState) { protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeNS) { assert waitForPublish == false; waitForPublish = true; - final Discovery.AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()); + final AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()); clusterStatePublisher.publish(clusterChangedEvent, new ActionListener() { private boolean listenerCalled = false; @@ -152,6 +152,10 @@ public void onFailure(Exception e) { scheduleNextTaskIfNecessary(); } } - }, ackListener); + }, wrapAckListener(ackListener)); + } + + protected AckListener wrapAckListener(AckListener ackListener) { + return ackListener; } } From 4813b3e1fab240b33f3783b9464a2269714b5b56 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 Oct 2018 12:07:29 +0200 Subject: [PATCH 02/20] unused import --- .../elasticsearch/cluster/coordination/CoordinatorTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index af9e797f2ec04..6fc1fc2993dcb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -92,9 +92,8 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.sameInstance; -//@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") +@TestLogging("org.elasticsearch.cluster.coordination:TRACE,org.elasticsearch.discovery:TRACE") public class CoordinatorTests extends ESTestCase { public void testCanUpdateClusterStateAfterStabilisation() { From f8d105705e4a7e96ae064531296f51ff115930f2 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 Oct 2018 13:03:19 +0200 Subject: [PATCH 03/20] fix assertion --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 006714c1edff5..d6cdcf571a53d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -485,7 +485,8 @@ public void invariant() { final Set knownFollowers = followersChecker.getKnownFollowers(); final Set lastPublishedNodes = new HashSet<>(); - if (becomingMaster == false || publicationInProgress()) { + if (becomingMaster == false || + (publicationInProgress() && getCurrentTerm() == currentPublication.get().publishedState().term())) { final ClusterState lastPublishedState = currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState()); lastPublishedState.nodes().forEach(lastPublishedNodes::add); From 15a4da84fd41c84c7cbafa74616aa29e86c2f789 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 Oct 2018 22:12:11 +0200 Subject: [PATCH 04/20] add comment --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index d6cdcf571a53d..04e627b4e3763 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -428,7 +428,7 @@ protected void doStart() { ClusterState initialState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)) .blocks(ClusterBlocks.builder() .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK) - .addGlobalBlock(NO_MASTER_BLOCK_WRITES)) + .addGlobalBlock(NO_MASTER_BLOCK_WRITES)) // TODO: allow dynamically configuring NO_MASTER_BLOCK_ALL .nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId())) .build(); applierState = initialState; From 42451ba54620d36e9e616f3d53d8bc6807bf6404 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 Oct 2018 22:20:56 +0200 Subject: [PATCH 05/20] move assertion --- .../org/elasticsearch/cluster/coordination/Coordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 04e627b4e3763..6431f0a8d62a6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -469,7 +469,6 @@ public void invariant() { assert lastCommittedState.get().term() == applierState.term(); assert lastCommittedState.get().version() == applierState.version(); } - assert mode != Mode.CANDIDATE || applierState.nodes().getMasterNodeId() == null; assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); @@ -482,6 +481,7 @@ public void invariant() { assert prevotingRound == null : prevotingRound; assert becomingMaster || getStateForMasterService().nodes().getMasterNodeId() != null : getStateForMasterService(); assert leaderCheckScheduler == null : leaderCheckScheduler; + assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); final Set knownFollowers = followersChecker.getKnownFollowers(); final Set lastPublishedNodes = new HashSet<>(); @@ -513,6 +513,7 @@ public void invariant() { assert leaderChecker.currentNodeIsMaster() == false; assert leaderCheckScheduler == null : leaderCheckScheduler; assert followersChecker.getKnownFollowers().isEmpty(); + assert applierState.nodes().getMasterNodeId() == null; } } } From 8c846dcbd2d52e9b97450da2b2b54e1b6891ed54 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 Oct 2018 22:46:10 +0200 Subject: [PATCH 06/20] move anonymous to inner class --- .../cluster/coordination/Coordinator.java | 267 +++++++++--------- 1 file changed, 139 insertions(+), 128 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 6431f0a8d62a6..1139783872aac 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -619,134 +619,8 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) getLocalNode() + " should be in published " + clusterState; final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState); - - final ListenableFuture localNodeAckEvent = new ListenableFuture<>(); - final AckListener wrappedAckListener = new AckListener() { - @Override - public void onCommit(TimeValue commitTime) { - ackListener.onCommit(commitTime); - } - - @Override - public void onNodeAck(DiscoveryNode node, Exception e) { - // acking and cluster state application for local node is handled specially - if (node.equals(getLocalNode())) { - synchronized (mutex) { - if (e == null) { - localNodeAckEvent.onResponse(null); - } else { - localNodeAckEvent.onFailure(e); - } - } - } else { - ackListener.onNodeAck(node, e); - } - } - }; - - final Publication publication = new Publication(settings, publishRequest, wrappedAckListener, - transportService.getThreadPool()::relativeTimeInMillis) { - - final Publication thisPublication = this; - - private void failPublicationAndPossiblyBecomeCandidate(String reason) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - - assert currentPublication.get() == this; - currentPublication = Optional.empty(); - - // check if node has not already switched modes (by bumping term) - if (mode == Mode.LEADER && publishRequest.getAcceptedState().term() == getCurrentTerm()) { - becomeCandidate(reason); - } - } - - @Override - protected void onCompletion(boolean committed) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - assert currentPublication.get() == this; - - localNodeAckEvent.addListener(new ActionListener() { - @Override - public void onResponse(Void ignore) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - assert committed; - - clusterApplier.onNewClusterState(thisPublication.toString(), () -> applierState, - new ClusterApplyListener() { - @Override - public void onFailure(String source, Exception e) { - synchronized (mutex) { - failPublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); - } - ackListener.onNodeAck(getLocalNode(), e); - publishListener.onFailure(e); - } - - @Override - public void onSuccess(String source) { - synchronized (mutex) { - assert currentPublication.get() == thisPublication; - currentPublication = Optional.empty(); - // trigger term bump if new term was found during publication - updateMaxTermSeen(getCurrentTerm()); - } - - ackListener.onNodeAck(getLocalNode(), null); - publishListener.onResponse(null); - } - }); - } - - @Override - public void onFailure(Exception e) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - failPublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); - - FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException( - "publication failed", e); - ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. - publishListener.onFailure(exception); - } - }, EsExecutors.newDirectExecutorService()); - } - - @Override - protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - return coordinationState.get().isPublishQuorum(votes); - } - - @Override - protected Optional handlePublishResponse(DiscoveryNode sourceNode, - PublishResponse publishResponse) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - assert getCurrentTerm() >= publishResponse.getTerm(); - return coordinationState.get().handlePublishResponse(sourceNode, publishResponse); - } - - @Override - protected void onJoin(Join join) { - assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - if (join.getTerm() == getCurrentTerm()) { - handleJoin(join); - } - // TODO: what to do on missing join? - } - - @Override - protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, - ActionListener responseActionListener) { - publicationHandler.sendPublishRequest(destination, publishRequest, wrapWithMutex(responseActionListener)); - } - - @Override - protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, - ActionListener responseActionListener) { - publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener)); - } - }; - + final Publication publication = new CoordinatorPublication(publishRequest, new ListenableFuture<>(), ackListener, + publishListener); currentPublication = Optional.of(publication); transportService.getThreadPool().schedule(publishTimeout, Names.GENERIC, new Runnable() { @@ -856,4 +730,141 @@ public String toString() { } } } + + class CoordinatorPublication extends Publication { + + private final PublishRequest publishRequest; + private final ListenableFuture localNodeAckEvent; + private final AckListener ackListener; + private final ActionListener publishListener; + + CoordinatorPublication(PublishRequest publishRequest, ListenableFuture localNodeAckEvent, AckListener ackListener, + ActionListener publishListener) { + super(Coordinator.this.settings, publishRequest, + new AckListener() { + @Override + public void onCommit(TimeValue commitTime) { + ackListener.onCommit(commitTime); + } + + @Override + public void onNodeAck(DiscoveryNode node, Exception e) { + // acking and cluster state application for local node is handled specially + if (node.equals(getLocalNode())) { + synchronized (mutex) { + if (e == null) { + localNodeAckEvent.onResponse(null); + } else { + localNodeAckEvent.onFailure(e); + } + } + } else { + ackListener.onNodeAck(node, e); + } + } + }, + transportService.getThreadPool()::relativeTimeInMillis); + this.publishRequest = publishRequest; + this.localNodeAckEvent = localNodeAckEvent; + this.ackListener = ackListener; + this.publishListener = publishListener; + } + + private void failPublicationAndPossiblyBecomeCandidate(String reason) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + + assert currentPublication.get() == this; + currentPublication = Optional.empty(); + + // check if node has not already switched modes (by bumping term) + if (mode == Mode.LEADER && publishRequest.getAcceptedState().term() == getCurrentTerm()) { + becomeCandidate(reason); + } + } + + @Override + protected void onCompletion(boolean committed) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + assert currentPublication.get() == this; + + localNodeAckEvent.addListener(new ActionListener() { + @Override + public void onResponse(Void ignore) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + assert committed; + + clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState, + new ClusterApplyListener() { + @Override + public void onFailure(String source, Exception e) { + synchronized (mutex) { + failPublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); + } + ackListener.onNodeAck(getLocalNode(), e); + publishListener.onFailure(e); + } + + @Override + public void onSuccess(String source) { + synchronized (mutex) { + assert currentPublication.get() == CoordinatorPublication.this; + currentPublication = Optional.empty(); + // trigger term bump if new term was found during publication + updateMaxTermSeen(getCurrentTerm()); + } + + ackListener.onNodeAck(getLocalNode(), null); + publishListener.onResponse(null); + } + }); + } + + @Override + public void onFailure(Exception e) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + failPublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); + + FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException( + "publication failed", e); + ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. + publishListener.onFailure(exception); + } + }, EsExecutors.newDirectExecutorService()); + } + + @Override + protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + return coordinationState.get().isPublishQuorum(votes); + } + + @Override + protected Optional handlePublishResponse(DiscoveryNode sourceNode, + PublishResponse publishResponse) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + assert getCurrentTerm() >= publishResponse.getTerm(); + return coordinationState.get().handlePublishResponse(sourceNode, publishResponse); + } + + @Override + protected void onJoin(Join join) { + assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + if (join.getTerm() == getCurrentTerm()) { + handleJoin(join); + } + // TODO: what to do on missing join? + } + + @Override + protected void sendPublishRequest(DiscoveryNode destination, PublishRequest publishRequest, + ActionListener responseActionListener) { + publicationHandler.sendPublishRequest(destination, publishRequest, wrapWithMutex(responseActionListener)); + } + + @Override + protected void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommit, + ActionListener responseActionListener) { + publicationHandler.sendApplyCommit(destination, applyCommit, wrapWithMutex(responseActionListener)); + } + } } From 8fecbd609b40a15a38c35c8b22782c344ee5efd9 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 Oct 2018 22:50:20 +0200 Subject: [PATCH 07/20] remove extra assert --- .../elasticsearch/cluster/coordination/Coordinator.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 1139783872aac..62119cda16f97 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -770,7 +770,7 @@ public void onNodeAck(DiscoveryNode node, Exception e) { this.publishListener = publishListener; } - private void failPublicationAndPossiblyBecomeCandidate(String reason) { + private void removePublicationAndPossiblyBecomeCandidate(String reason) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; assert currentPublication.get() == this; @@ -785,7 +785,6 @@ private void failPublicationAndPossiblyBecomeCandidate(String reason) { @Override protected void onCompletion(boolean committed) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - assert currentPublication.get() == this; localNodeAckEvent.addListener(new ActionListener() { @Override @@ -798,7 +797,7 @@ public void onResponse(Void ignore) { @Override public void onFailure(String source, Exception e) { synchronized (mutex) { - failPublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); + removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); } ackListener.onNodeAck(getLocalNode(), e); publishListener.onFailure(e); @@ -812,7 +811,6 @@ public void onSuccess(String source) { // trigger term bump if new term was found during publication updateMaxTermSeen(getCurrentTerm()); } - ackListener.onNodeAck(getLocalNode(), null); publishListener.onResponse(null); } @@ -822,7 +820,7 @@ public void onSuccess(String source) { @Override public void onFailure(Exception e) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - failPublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); + removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException( "publication failed", e); From b77e5abad81aa93e08e63f2d71ac233b3907e5ca Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 Oct 2018 23:38:36 +0200 Subject: [PATCH 08/20] clarify assertions around currentPublication --- .../cluster/coordination/Coordinator.java | 41 +++++++++++++------ .../coordination/CoordinatorTests.java | 9 ++-- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 62119cda16f97..abbbfef8ce7b7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -107,7 +107,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Optional lastKnownLeader; private Optional lastJoin; private JoinHelper.JoinAccumulator joinAccumulator; - private Optional currentPublication = Optional.empty(); + private Optional currentPublication = Optional.empty(); public Coordinator(Settings settings, TransportService transportService, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, @@ -412,9 +412,9 @@ DiscoveryNode getLocalNode() { } // package-visible for testing - boolean publicationInProgress() { + boolean activePublicationInProgress() { synchronized (mutex) { - return currentPublication.isPresent(); + return currentPublication.map(CoordinatorPublication::isActive).orElse(false); } } @@ -483,16 +483,24 @@ public void invariant() { assert leaderCheckScheduler == null : leaderCheckScheduler; assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); - final Set knownFollowers = followersChecker.getKnownFollowers(); - final Set lastPublishedNodes = new HashSet<>(); - if (becomingMaster == false || - (publicationInProgress() && getCurrentTerm() == currentPublication.get().publishedState().term())) { - final ClusterState lastPublishedState - = currentPublication.map(Publication::publishedState).orElse(coordinationState.get().getLastAcceptedState()); + if (becomingMaster && activePublicationInProgress() == false) { + // cluster state update task to become master is submitted to MasterService, but publication has not started yet + assert followersChecker.getKnownFollowers().isEmpty() : followersChecker.getKnownFollowers(); + } else { + final ClusterState lastPublishedState; + if (activePublicationInProgress()) { + // active publication in progress: followersChecker is up-to-date with nodes that we're actively publishing to + lastPublishedState = currentPublication.get().publishedState(); + } else { + // no active publication: followersChecker is up-to-date with the nodes of the latest publication + lastPublishedState = coordinationState.get().getLastAcceptedState(); + } + final Set lastPublishedNodes = new HashSet<>(); lastPublishedState.nodes().forEach(lastPublishedNodes::add); - assert lastPublishedNodes.remove(getLocalNode()); + assert lastPublishedNodes.remove(getLocalNode()); // followersChecker excludes local node + assert lastPublishedNodes.equals(followersChecker.getKnownFollowers()) : + lastPublishedNodes + " != " + followersChecker.getKnownFollowers(); } - assert lastPublishedNodes.equals(knownFollowers) : lastPublishedNodes + " != " + knownFollowers; } else if (mode == Mode.FOLLOWER) { assert coordinationState.get().electionWon() == false : getLocalNode() + " is FOLLOWER so electionWon() should be false"; assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); @@ -504,6 +512,7 @@ public void invariant() { assert leaderChecker.currentNodeIsMaster() == false; assert leaderCheckScheduler != null; assert followersChecker.getKnownFollowers().isEmpty(); + assert activePublicationInProgress() == false; } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -514,6 +523,7 @@ public void invariant() { assert leaderCheckScheduler == null : leaderCheckScheduler; assert followersChecker.getKnownFollowers().isEmpty(); assert applierState.nodes().getMasterNodeId() == null; + assert activePublicationInProgress() == false; } } } @@ -619,7 +629,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) getLocalNode() + " should be in published " + clusterState; final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState); - final Publication publication = new CoordinatorPublication(publishRequest, new ListenableFuture<>(), ackListener, + final CoordinatorPublication publication = new CoordinatorPublication(publishRequest, new ListenableFuture<>(), ackListener, publishListener); currentPublication = Optional.of(publication); @@ -737,6 +747,7 @@ class CoordinatorPublication extends Publication { private final ListenableFuture localNodeAckEvent; private final AckListener ackListener; private final ActionListener publishListener; + private boolean completed; CoordinatorPublication(PublishRequest publishRequest, ListenableFuture localNodeAckEvent, AckListener ackListener, ActionListener publishListener) { @@ -786,6 +797,8 @@ private void removePublicationAndPossiblyBecomeCandidate(String reason) { protected void onCompletion(boolean committed) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; + completed = true; + localNodeAckEvent.addListener(new ActionListener() { @Override public void onResponse(Void ignore) { @@ -830,6 +843,10 @@ public void onFailure(Exception e) { }, EsExecutors.newDirectExecutorService()); } + boolean isActive() { + return completed == false; + } + @Override protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 6fc1fc2993dcb..0117a65a1e640 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -483,10 +483,6 @@ void stabilise(long stabiliationDurationMillis, long delayVariability) { deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - for (ClusterNode clusterNode : clusterNodes) { - assert clusterNode.coordinator.publicationInProgress() == false; - } - assertUniqueLeaderAndExpectedModes(); } @@ -532,12 +528,13 @@ private void assertUniqueLeaderAndExpectedModes() { equalTo(Optional.of(true))); for (final ClusterNode clusterNode : clusterNodes) { + final String nodeId = clusterNode.getId(); + assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.activePublicationInProgress()); + if (clusterNode == leader) { continue; } - final String nodeId = clusterNode.getId(); - if (disconnectedNodes.contains(nodeId) || blackholedNodes.contains(nodeId)) { assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); } else { From a7c286cc435895ed9bf72045ba6f734c6b5ab1a6 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 3 Oct 2018 23:42:08 +0200 Subject: [PATCH 09/20] remove superfluous parameter to stabilize --- .../cluster/coordination/CoordinatorTests.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 0117a65a1e640..06241608fcd0c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -465,24 +465,18 @@ void addNodes(int newNodesCount) { } void stabilise() { - stabilise(DEFAULT_STABILISATION_TIME, DEFAULT_DELAY_VARIABILITY); + stabilise(DEFAULT_STABILISATION_TIME); } void stabilise(long stabiliationDurationMillis) { - stabilise(stabiliationDurationMillis, DEFAULT_DELAY_VARIABILITY); - } - - void stabilise(long stabiliationDurationMillis, long delayVariability) { final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis(); final long stabilisationEndTime = stabilisationStartTime + stabiliationDurationMillis; logger.info("--> stabilising until [{}ms]", stabilisationEndTime); - deterministicTaskQueue.setExecutionDelayVariabilityMillis(delayVariability); + deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); runUntil(stabilisationEndTime); - deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - assertUniqueLeaderAndExpectedModes(); } From 00938c282d39912222d6f4ec97ed92149b566c21 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 09:12:41 +0200 Subject: [PATCH 10/20] add safety phase to acking tests --- .../cluster/coordination/CoordinatorTests.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index b3cd208f4549c..b714f9a953020 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -266,6 +266,7 @@ public void testUnresponsiveFollowerDetectedEventually() { public void testAckListenerReceivesAcksFromAllNodes() { final Cluster cluster = new Cluster(randomIntBetween(3, 5)); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); AckCollector ackCollector = leader.submitValue(randomLong()); @@ -279,6 +280,7 @@ public void testAckListenerReceivesAcksFromAllNodes() { public void testAckListenerReceivesNackFromFollower() { final Cluster cluster = new Cluster(3); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -295,6 +297,7 @@ public void testAckListenerReceivesNackFromFollower() { public void testAckListenerReceivesNackFromLeader() { final Cluster cluster = new Cluster(3); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -315,6 +318,7 @@ public void testAckListenerReceivesNackFromLeader() { public void testAckListenerReceivesNoAckFromHangingFollower() { final Cluster cluster = new Cluster(3); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -332,6 +336,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { public void testAckListenerReceivesNacksIfPublicationTimesOut() { final Cluster cluster = new Cluster(3); + cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -355,6 +360,7 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() { public void testAckListenerReceivesNacksIfLeaderStandsDown() { // TODO: needs support for handling disconnects // final Cluster cluster = new Cluster(3); +// cluster.runRandomly(); // cluster.stabilise(); // final ClusterNode leader = cluster.getAnyLeader(); // final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); @@ -376,6 +382,7 @@ public void testAckListenerReceivesNacksIfLeaderStandsDown() { public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { // TODO: needs auto-term bumping for cluster to form again // final Cluster cluster = new Cluster(3); +// cluster.runRandomly(); // cluster.stabilise(); // final ClusterNode leader = cluster.getAnyLeader(); // final ClusterNode follower0 = cluster.getAnyNodeExcept(leader); From cd22d4ad16a7f3dd7837db4a1164a6ec3a725b04 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 11:42:30 +0200 Subject: [PATCH 11/20] Separate logger for PublicationTransportHandler --- .../cluster/coordination/Coordinator.java | 4 ++-- .../coordination/PublicationTransportHandler.java | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index a92fe407e27ca..e3d6d55fb5ac4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -127,8 +127,8 @@ public Coordinator(Settings settings, TransportService transportService, Allocat configuredHostsResolver = new UnicastConfiguredHostsResolver(settings, transportService, unicastHostsProvider); this.peerFinder = new CoordinatorPeerFinder(settings, transportService, new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver); - this.publicationHandler = new PublicationTransportHandler(transportService, this::handlePublishRequest, this::handleApplyCommit, - logger); + this.publicationHandler = new PublicationTransportHandler(settings, transportService, this::handlePublishRequest, + this::handleApplyCommit); this.leaderChecker = new LeaderChecker(settings, transportService, getOnLeaderFailure()); this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::onFollowerFailure); this.nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index 804f3b14de206..91024ce4d63ce 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -18,10 +18,11 @@ */ package org.elasticsearch.cluster.coordination; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; @@ -33,17 +34,17 @@ import java.util.function.BiConsumer; import java.util.function.Function; -public class PublicationTransportHandler { +public class PublicationTransportHandler extends AbstractComponent { public static final String PUBLISH_STATE_ACTION_NAME = "internal:cluster/coordination/publish_state"; public static final String COMMIT_STATE_ACTION_NAME = "internal:cluster/coordination/commit_state"; private final TransportService transportService; - public PublicationTransportHandler(TransportService transportService, + public PublicationTransportHandler(Settings settings, TransportService transportService, Function handlePublishRequest, - BiConsumer> handleApplyCommit, - Logger logger) { + BiConsumer> handleApplyCommit) { + super(settings); this.transportService = transportService; transportService.registerRequestHandler(PUBLISH_STATE_ACTION_NAME, ThreadPool.Names.GENERIC, false, false, From c59ef80c582088e3de25d187e14df3f611bc767a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 11:45:21 +0200 Subject: [PATCH 12/20] deduplicate completed field --- .../elasticsearch/cluster/coordination/Coordinator.java | 9 +-------- .../elasticsearch/cluster/coordination/Publication.java | 4 ++++ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index e3d6d55fb5ac4..8c43a60e80580 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -421,7 +421,7 @@ DiscoveryNode getLocalNode() { // package-visible for testing boolean activePublicationInProgress() { synchronized (mutex) { - return currentPublication.map(CoordinatorPublication::isActive).orElse(false); + return currentPublication.map(p -> p.isCompleted() == false).orElse(false); } } @@ -754,7 +754,6 @@ class CoordinatorPublication extends Publication { private final ListenableFuture localNodeAckEvent; private final AckListener ackListener; private final ActionListener publishListener; - private boolean completed; CoordinatorPublication(PublishRequest publishRequest, ListenableFuture localNodeAckEvent, AckListener ackListener, ActionListener publishListener) { @@ -804,8 +803,6 @@ private void removePublicationAndPossiblyBecomeCandidate(String reason) { protected void onCompletion(boolean committed) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - completed = true; - localNodeAckEvent.addListener(new ActionListener() { @Override public void onResponse(Void ignore) { @@ -850,10 +847,6 @@ public void onFailure(Exception e) { }, EsExecutors.newDirectExecutorService()); } - boolean isActive() { - return completed == false; - } - @Override protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index 41a0a7c78dfed..e58fe9bf860be 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -91,6 +91,10 @@ public void onFaultyNode(DiscoveryNode faultyNode) { onPossibleCompletion(); } + public boolean isCompleted() { + return isCompleted; + } + private void onPossibleCompletion() { if (isCompleted) { return; From a6739a92c06ba1e01e1fbd98ccd9d8f94b5ec6ea Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 11:49:04 +0200 Subject: [PATCH 13/20] NoOpClusterApplier --- .../cluster/coordination/NodeJoinTests.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 59eca7c0fee02..1cc93126dab0c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -165,17 +165,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, () -> new CoordinationStateTests.InMemoryPersistedState(term, initialState), r -> emptyList(), - new ClusterApplier() { - @Override - public void setInitialState(ClusterState initialState) { - - } - - @Override - public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { - listener.onSuccess(source); - } - }, random); + new NoOpClusterApplier(), + random); transportService.start(); transportService.acceptIncomingRequests(); transportRequestHandler = capturingTransport.getRequestHandler(JoinHelper.JOIN_ACTION_NAME); @@ -528,4 +519,16 @@ private boolean isLocalNodeElectedMaster() { private boolean clusterStateHasNode(DiscoveryNode node) { return node.equals(MasterServiceTests.discoveryState(masterService).nodes().get(node.getId())); } + + private static class NoOpClusterApplier implements ClusterApplier { + @Override + public void setInitialState(ClusterState initialState) { + + } + + @Override + public void onNewClusterState(String source, Supplier clusterStateSupplier, ClusterApplyListener listener) { + listener.onSuccess(source); + } + } } From ca833e02ec75870fea4bfef556960bff31839c86 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 11:49:13 +0200 Subject: [PATCH 14/20] connect -> heal --- .../cluster/coordination/CoordinatorTests.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index b714f9a953020..5b05daae36a19 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -523,8 +523,8 @@ void runRandomly() { switch (randomInt(2)) { case 0: - if (clusterNode.connect()) { - logger.debug("----> [runRandomly {}] connecting {}", step, clusterNode.getId()); + if (clusterNode.heal()) { + logger.debug("----> [runRandomly {}] healing {}", step, clusterNode.getId()); } break; case 1: @@ -869,7 +869,7 @@ public String toString() { return localNode.toString(); } - boolean connect() { + boolean heal() { boolean unBlackholed = blackholedNodes.remove(localNode.getId()); boolean unDisconnected = disconnectedNodes.remove(localNode.getId()); assert unBlackholed == false || unDisconnected == false; @@ -890,12 +890,6 @@ boolean blackhole() { return blackholed; } - void heal() { - boolean unDisconnected = disconnectedNodes.remove(localNode.getId()); - boolean unBlackholed = blackholedNodes.remove(localNode.getId()); - assert unDisconnected || unBlackholed; - } - ClusterState getLastAppliedClusterState() { return clusterApplier.lastAppliedClusterState; } From 627d68b790c00958daceec973595482f9bc6f0fa Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 14:09:10 +0200 Subject: [PATCH 15/20] revisit currentPublication check --- .../cluster/coordination/Coordinator.java | 10 +++++----- .../cluster/coordination/Publication.java | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 8c43a60e80580..ea82fc5660e3f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -421,7 +421,7 @@ DiscoveryNode getLocalNode() { // package-visible for testing boolean activePublicationInProgress() { synchronized (mutex) { - return currentPublication.map(p -> p.isCompleted() == false).orElse(false); + return currentPublication.isPresent(); } } @@ -490,12 +490,12 @@ public void invariant() { assert leaderCheckScheduler == null : leaderCheckScheduler; assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); - if (becomingMaster && activePublicationInProgress() == false) { + if (becomingMaster && currentPublication.isPresent() == false) { // cluster state update task to become master is submitted to MasterService, but publication has not started yet assert followersChecker.getKnownFollowers().isEmpty() : followersChecker.getKnownFollowers(); } else { final ClusterState lastPublishedState; - if (activePublicationInProgress()) { + if (currentPublication.isPresent()) { // active publication in progress: followersChecker is up-to-date with nodes that we're actively publishing to lastPublishedState = currentPublication.get().publishedState(); } else { @@ -519,7 +519,7 @@ public void invariant() { assert leaderChecker.currentNodeIsMaster() == false; assert leaderCheckScheduler != null; assert followersChecker.getKnownFollowers().isEmpty(); - assert activePublicationInProgress() == false; + assert currentPublication.map(Publication::isCommitted).orElse(true); } else { assert mode == Mode.CANDIDATE; assert joinAccumulator instanceof JoinHelper.CandidateJoinAccumulator; @@ -530,7 +530,7 @@ public void invariant() { assert leaderCheckScheduler == null : leaderCheckScheduler; assert followersChecker.getKnownFollowers().isEmpty(); assert applierState.nodes().getMasterNodeId() == null; - assert activePublicationInProgress() == false; + assert currentPublication.map(Publication::isCommitted).orElse(true); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java index e58fe9bf860be..20104755996b4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java @@ -91,8 +91,8 @@ public void onFaultyNode(DiscoveryNode faultyNode) { onPossibleCompletion(); } - public boolean isCompleted() { - return isCompleted; + public boolean isCommitted() { + return applyCommitRequest.isPresent(); } private void onPossibleCompletion() { From ff009b6bfe59fc47126b7704e9ff88aa4d461979 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 14:20:57 +0200 Subject: [PATCH 16/20] remove lastCommittedState --- .../cluster/coordination/Coordinator.java | 15 +++---- .../coordination/CoordinatorTests.java | 43 ++++++++----------- 2 files changed, 23 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index ea82fc5660e3f..e082558266b5a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -83,7 +83,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery // These tests can be rewritten to use public methods once Coordinator is more feature-complete final Object mutex = new Object(); final SetOnce coordinationState = new SetOnce<>(); // initialized on start-up (see doStart) - private volatile Optional lastCommittedState = Optional.empty(); private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier private final PeerFinder peerFinder; @@ -183,8 +182,8 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest); coordinationState.get().handleCommit(applyCommitRequest); - lastCommittedState = Optional.of(coordinationState.get().getLastAcceptedState()); - applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(lastCommittedState.get()) : lastCommittedState.get(); + final ClusterState committedState = coordinationState.get().getLastAcceptedState(); + applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(committedState) : committedState; if (applyCommitRequest.getSourceNode().equals(getLocalNode())) { // master node applies the committed state at the end of the publication process, not here. applyListener.onResponse(null); @@ -471,11 +470,6 @@ public void invariant() { assert peerFinder.getCurrentTerm() == getCurrentTerm(); assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState(); assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState(); - if (lastCommittedState.isPresent()) { - assert applierState != null; - assert lastCommittedState.get().term() == applierState.term(); - assert lastCommittedState.get().version() == applierState.version(); - } assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlock(NO_MASTER_BLOCK_WRITES.id()); if (mode == Mode.LEADER) { final boolean becomingMaster = getStateForMasterService().term() != getCurrentTerm(); @@ -564,8 +558,9 @@ public ClusterState getLastAcceptedState() { } } - public Optional getLastCommittedState() { - return lastCommittedState; + @Nullable + public ClusterState getApplierState() { + return applierState; } private List getDiscoveredNodes() { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 5b05daae36a19..2f932297c99e9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -33,7 +33,6 @@ import org.elasticsearch.cluster.coordination.CoordinatorTests.Cluster.ClusterNode; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode.Role; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; @@ -117,8 +116,8 @@ public void testCanUpdateClusterStateAfterStabilisation() { for (final ClusterNode clusterNode : cluster.clusterNodes) { final String nodeId = clusterNode.getId(); - final ClusterState committedState = clusterNode.coordinator.getLastCommittedState().get(); - assertThat(nodeId + " has the committed value", value(committedState), is(finalValue)); + final ClusterState applierState = clusterNode.coordinator.getApplierState(); + assertThat(nodeId + " has the committed value", value(applierState), is(finalValue)); } } @@ -571,15 +570,13 @@ private void assertConsistentStates() { private void updateCommittedStates() { for (final ClusterNode clusterNode : clusterNodes) { - Optional committedState = clusterNode.coordinator.getLastCommittedState(); - if (committedState.isPresent()) { - ClusterState storedState = committedStatesByVersion.get(committedState.get().getVersion()); - if (storedState == null) { - committedStatesByVersion.put(committedState.get().getVersion(), committedState.get()); - } else { - assertEquals("expected " + committedState.get() + " but got " + storedState, - value(committedState.get()), value(storedState)); - } + ClusterState applierState = clusterNode.coordinator.getApplierState(); + ClusterState storedState = committedStatesByVersion.get(applierState.getVersion()); + if (storedState == null) { + committedStatesByVersion.put(applierState.getVersion(), applierState); + } else { + assertEquals("expected " + applierState + " but got " + storedState, + value(applierState), value(storedState)); } } } @@ -634,12 +631,11 @@ private boolean isConnectedPair(ClusterNode n1, ClusterNode n2) { private void assertUniqueLeaderAndExpectedModes() { final ClusterNode leader = getAnyLeader(); final long leaderTerm = leader.coordinator.getCurrentTerm(); - Matcher> isPresentAndEqualToLeaderVersion - = equalTo(Optional.of(leader.coordinator.getLastAcceptedState().getVersion())); + Matcher isPresentAndEqualToLeaderVersion + = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); - assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getVersion), isPresentAndEqualToLeaderVersion); - assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(leader.getId())), - equalTo(Optional.of(true))); + assertThat(leader.coordinator.getApplierState().getVersion(), isPresentAndEqualToLeaderVersion); + assertTrue(leader.coordinator.getApplierState().getNodes().nodeExists(leader.getId())); for (final ClusterNode clusterNode : clusterNodes) { final String nodeId = clusterNode.getId(); @@ -655,20 +651,17 @@ private void assertUniqueLeaderAndExpectedModes() { // TODO assert that this node has actually voted for the leader in this term // TODO assert that this node's accepted and committed states are the same as the leader's - assertThat(nodeId + " is in the leader's committed state", - leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), - equalTo(Optional.of(true))); + assertTrue(nodeId + " is in the leader's applier state", + leader.coordinator.getApplierState().getNodes().nodeExists(nodeId)); } else { assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); - assertThat(nodeId + " is not in the leader's committed state", - leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(dn -> dn.nodeExists(nodeId)), - equalTo(Optional.of(false))); + assertFalse(nodeId + " is not in the leader's applier state", + leader.coordinator.getApplierState().getNodes().nodeExists(nodeId)); } } int connectedNodeCount = Math.toIntExact(clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).count()); - assertThat(leader.coordinator.getLastCommittedState().map(ClusterState::getNodes).map(DiscoveryNodes::getSize), - equalTo(Optional.of(connectedNodeCount))); + assertThat(leader.coordinator.getApplierState().getNodes().getSize(), equalTo(connectedNodeCount)); } ClusterNode getAnyLeader() { From cc10b84625ed8b351cb938b1681f211f336bae08 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 14:25:50 +0200 Subject: [PATCH 17/20] add term-bump hack --- .../cluster/coordination/Coordinator.java | 6 ++++-- .../coordination/CoordinatorTests.java | 21 +++++++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index e082558266b5a..fdf2214b2171e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -260,7 +260,8 @@ private void updateMaxTermSeen(final long term) { // do check for this after the publication completes) } - private void startElection() { + // TODO: make private again after removing term-bump workaround + void startElection() { synchronized (mutex) { // The preVoteCollector is only active while we are candidate, but it does not call this method with synchronisation, so we have // to check our mode again here. @@ -272,7 +273,8 @@ private void startElection() { } } - private Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { + // TODO: make private again after removing term-bump workaround + Optional ensureTermAtLeast(DiscoveryNode sourceNode, long targetTerm) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; if (getCurrentTerm() < targetTerm) { return Optional.of(joinLeaderInTerm(new StartJoinRequest(sourceNode, targetTerm))); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 2f932297c99e9..0618c877f7ee9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -379,7 +379,7 @@ public void testAckListenerReceivesNacksIfLeaderStandsDown() { } public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { - // TODO: needs auto-term bumping for cluster to form again + // TODO: needs proper term bumping // final Cluster cluster = new Cluster(3); // cluster.runRandomly(); // cluster.stabilise(); @@ -389,7 +389,7 @@ public void testAckListenerReceivesNacksFromFollowerInHigherTerm() { // // follower0.coordinator.joinLeaderInTerm(new StartJoinRequest(follower0.localNode, follower0.coordinator.getCurrentTerm() + 1)); // AckCollector ackCollector = leader.submitValue(randomLong()); -// cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY, 0L); +// cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); // assertTrue("expected ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); // assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0)); // assertTrue("expected ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); @@ -594,6 +594,23 @@ void stabilise(long stabiliationDurationMillis) { runUntil(stabilisationEndTime); + // TODO remove when term-bumping is enabled + final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L); + final long maxLeaderTerm = clusterNodes.stream().filter(n -> n.coordinator.getMode() == Coordinator.Mode.LEADER) + .map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L); + + if (maxLeaderTerm < maxTerm) { + logger.info("--> forcing a term bump, maxTerm={}, maxLeaderTerm={}", maxTerm, maxLeaderTerm); + final ClusterNode leader = getAnyLeader(); + synchronized (leader.coordinator.mutex) { + leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1); + } + leader.coordinator.startElection(); + final long termBumpEndTime = stabilisationEndTime + DEFAULT_ELECTION_DELAY; + logger.info("--> re-stabilising after term bump until [{}ms]", termBumpEndTime); + runUntil(termBumpEndTime); + } + assertUniqueLeaderAndExpectedModes(); } From e2a4f41db18e55b987167d6592491c6d6a844c62 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 14:38:38 +0200 Subject: [PATCH 18/20] Adapt DEFAULT_CLUSTER_STATE_UPDATE_DELAY --- .../cluster/coordination/CoordinatorTests.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 0618c877f7ee9..c942ce4aec41d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -403,13 +403,15 @@ private static int defaultInt(Setting setting) { return setting.get(Settings.EMPTY); } - // Updating the cluster state involves up to 5 delays: + // Updating the cluster state involves up to 7 delays: // 1. submit the task to the master service // 2. send PublishRequest // 3. receive PublishResponse // 4. send ApplyCommitRequest - // 5. receive ApplyCommitResponse and apply committed state - private static final long DEFAULT_CLUSTER_STATE_UPDATE_DELAY = 5 * DEFAULT_DELAY_VARIABILITY; + // 5. apply committed cluster state + // 6. receive ApplyCommitResponse + // 7. apply committed state on master (last one to apply cluster state) + private static final long DEFAULT_CLUSTER_STATE_UPDATE_DELAY = 7 * DEFAULT_DELAY_VARIABILITY; private static final int ELECTION_RETRIES = 10; From 84a69c1bba3440e0376e459982f72d1132ea68ff Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 15:07:33 +0200 Subject: [PATCH 19/20] refine activePublication check --- .../cluster/coordination/Coordinator.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index fdf2214b2171e..5e72b839968b7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -486,12 +486,13 @@ public void invariant() { assert leaderCheckScheduler == null : leaderCheckScheduler; assert applierState.nodes().getMasterNodeId() == null || getLocalNode().equals(applierState.nodes().getMasterNode()); - if (becomingMaster && currentPublication.isPresent() == false) { + final boolean activePublication = currentPublication.map(CoordinatorPublication::isActiveForCurrentLeader).orElse(false); + if (becomingMaster && activePublication == false) { // cluster state update task to become master is submitted to MasterService, but publication has not started yet assert followersChecker.getKnownFollowers().isEmpty() : followersChecker.getKnownFollowers(); } else { final ClusterState lastPublishedState; - if (currentPublication.isPresent()) { + if (activePublication) { // active publication in progress: followersChecker is up-to-date with nodes that we're actively publishing to lastPublishedState = currentPublication.get().publishedState(); } else { @@ -791,11 +792,16 @@ private void removePublicationAndPossiblyBecomeCandidate(String reason) { currentPublication = Optional.empty(); // check if node has not already switched modes (by bumping term) - if (mode == Mode.LEADER && publishRequest.getAcceptedState().term() == getCurrentTerm()) { + if (isActiveForCurrentLeader()) { becomeCandidate(reason); } } + boolean isActiveForCurrentLeader() { + // checks if this publication can still influence the mode of the current publication + return mode == Mode.LEADER && publishRequest.getAcceptedState().term() == getCurrentTerm(); + } + @Override protected void onCompletion(boolean committed) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; From a77328815472a3a2c82300bc08f69147c0a64a80 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 4 Oct 2018 17:36:56 +0200 Subject: [PATCH 20/20] misc review feedback --- .../cluster/coordination/Coordinator.java | 2 +- .../coordination/CoordinatorTests.java | 48 ++++++++----------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 5e72b839968b7..3ef43cead664a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -420,7 +420,7 @@ DiscoveryNode getLocalNode() { } // package-visible for testing - boolean activePublicationInProgress() { + boolean publicationInProgress() { synchronized (mutex) { return currentPublication.isPresent(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index c942ce4aec41d..286ee3f5c0d37 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -116,8 +116,8 @@ public void testCanUpdateClusterStateAfterStabilisation() { for (final ClusterNode clusterNode : cluster.clusterNodes) { final String nodeId = clusterNode.getId(); - final ClusterState applierState = clusterNode.coordinator.getApplierState(); - assertThat(nodeId + " has the committed value", value(applierState), is(finalValue)); + final ClusterState appliedState = clusterNode.getLastAppliedClusterState(); + assertThat(nodeId + " has the applied value", value(appliedState), is(finalValue)); } } @@ -305,7 +305,7 @@ public void testAckListenerReceivesNackFromLeader() { leader.setClusterStateApplyResponse(ClusterStateApplyResponse.FAIL); AckCollector ackCollector = leader.submitValue(randomLong()); - cluster.runUntil(cluster.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); assertTrue(leader.coordinator.getMode() != Coordinator.Mode.LEADER || leader.coordinator.getCurrentTerm() > startingTerm); leader.setClusterStateApplyResponse(ClusterStateApplyResponse.SUCCEED); cluster.stabilise(); @@ -325,7 +325,7 @@ public void testAckListenerReceivesNoAckFromHangingFollower() { follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.HANG); AckCollector ackCollector = leader.submitValue(randomLong()); - cluster.runUntil(cluster.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); assertTrue("expected immediate ack from " + follower1, ackCollector.hasAckedSuccessfully(follower1)); assertFalse("expected no ack from " + leader, ackCollector.hasAckedSuccessfully(leader)); cluster.stabilise(); @@ -344,7 +344,7 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() { follower0.blackhole(); follower1.blackhole(); AckCollector ackCollector = leader.submitValue(randomLong()); - cluster.runUntil(cluster.getCurrentTimeMillis() + DEFAULT_CLUSTER_STATE_UPDATE_DELAY); + cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); assertFalse("expected no immediate ack from " + leader, ackCollector.hasAcked(leader)); assertFalse("expected no immediate ack from " + follower0, ackCollector.hasAcked(follower0)); assertFalse("expected no immediate ack from " + follower1, ackCollector.hasAcked(follower1)); @@ -588,13 +588,9 @@ void stabilise() { } void stabilise(long stabiliationDurationMillis) { - final long stabilisationStartTime = deterministicTaskQueue.getCurrentTimeMillis(); - final long stabilisationEndTime = stabilisationStartTime + stabiliationDurationMillis; - logger.info("--> stabilising until [{}ms]", stabilisationEndTime); - + logger.info("--> stabilising until [{}ms]", deterministicTaskQueue.getCurrentTimeMillis() + stabiliationDurationMillis); deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - - runUntil(stabilisationEndTime); + runFor(stabiliationDurationMillis); // TODO remove when term-bumping is enabled final long maxTerm = clusterNodes.stream().map(n -> n.coordinator.getCurrentTerm()).max(Long::compare).orElse(0L); @@ -608,15 +604,17 @@ void stabilise(long stabiliationDurationMillis) { leader.coordinator.ensureTermAtLeast(leader.localNode, maxTerm + 1); } leader.coordinator.startElection(); - final long termBumpEndTime = stabilisationEndTime + DEFAULT_ELECTION_DELAY; - logger.info("--> re-stabilising after term bump until [{}ms]", termBumpEndTime); - runUntil(termBumpEndTime); + logger.info("--> re-stabilising after term bump until [{}ms]", + deterministicTaskQueue.getCurrentTimeMillis() + DEFAULT_ELECTION_DELAY); + runFor(DEFAULT_ELECTION_DELAY); } assertUniqueLeaderAndExpectedModes(); } - void runUntil(long endTime) { + void runFor(long runDurationMillis) { + final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + runDurationMillis; + while (deterministicTaskQueue.getCurrentTimeMillis() < endTime) { while (deterministicTaskQueue.hasRunnableTasks()) { @@ -653,12 +651,12 @@ private void assertUniqueLeaderAndExpectedModes() { Matcher isPresentAndEqualToLeaderVersion = equalTo(leader.coordinator.getLastAcceptedState().getVersion()); - assertThat(leader.coordinator.getApplierState().getVersion(), isPresentAndEqualToLeaderVersion); - assertTrue(leader.coordinator.getApplierState().getNodes().nodeExists(leader.getId())); + assertTrue(leader.getLastAppliedClusterState().getNodes().nodeExists(leader.getId())); + assertThat(leader.getLastAppliedClusterState().getVersion(), isPresentAndEqualToLeaderVersion); for (final ClusterNode clusterNode : clusterNodes) { final String nodeId = clusterNode.getId(); - assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.activePublicationInProgress()); + assertFalse(nodeId + " should not have an active publication", clusterNode.coordinator.publicationInProgress()); if (clusterNode == leader) { continue; @@ -670,17 +668,17 @@ private void assertUniqueLeaderAndExpectedModes() { // TODO assert that this node has actually voted for the leader in this term // TODO assert that this node's accepted and committed states are the same as the leader's - assertTrue(nodeId + " is in the leader's applier state", - leader.coordinator.getApplierState().getNodes().nodeExists(nodeId)); + assertTrue(nodeId + " is in the leader's applied state", + leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); } else { assertThat(nodeId + " is a candidate", clusterNode.coordinator.getMode(), is(CANDIDATE)); - assertFalse(nodeId + " is not in the leader's applier state", - leader.coordinator.getApplierState().getNodes().nodeExists(nodeId)); + assertFalse(nodeId + " is not in the leader's applied state", + leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); } } int connectedNodeCount = Math.toIntExact(clusterNodes.stream().filter(n -> isConnectedPair(leader, n)).count()); - assertThat(leader.coordinator.getApplierState().getNodes().getSize(), equalTo(connectedNodeCount)); + assertThat(leader.getLastAppliedClusterState().getNodes().getSize(), equalTo(connectedNodeCount)); } ClusterNode getAnyLeader() { @@ -713,10 +711,6 @@ ClusterNode getAnyNodeExcept(ClusterNode... clusterNodes) { return randomFrom(acceptableNodes); } - public long getCurrentTimeMillis() { - return deterministicTaskQueue.getCurrentTimeMillis(); - } - ClusterNode getAnyNodePreferringLeaders() { for (int i = 0; i < 3; i++) { ClusterNode clusterNode = getAnyNode();