Resolve some coordination-layer TODOs #54511

Merged

@@ -157,7 +157,6 @@ String getDescription() {
if (INITIAL_MASTER_NODES_SETTING.get(Settings.EMPTY).equals(INITIAL_MASTER_NODES_SETTING.get(settings))) {
bootstrappingDescription = "[" + INITIAL_MASTER_NODES_SETTING.getKey() + "] is empty on this node";
} else {
-// TODO update this when we can bootstrap on only a quorum of the initial nodes
Contributor Author:
Done, see call to ClusterBootstrapService::isBootstrapPlaceholder in describeQuorum.

bootstrappingDescription = String.format(Locale.ROOT,
"this node must discover master-eligible nodes %s to bootstrap a cluster",
INITIAL_MASTER_NODES_SETTING.get(settings));
@@ -116,9 +116,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
private final Supplier<CoordinationState.PersistedState> persistedStateSupplier;
private final NoMasterBlockService noMasterBlockService;
-// TODO: the following field is package-private as some tests require access to it
-// These tests can be rewritten to use public methods once Coordinator is more feature-complete
-final Object mutex = new Object();
+final Object mutex = new Object(); // package-private to allow tests to call methods that assert that the mutex is held
private final SetOnce<CoordinationState> coordinationState = new SetOnce<>(); // initialized on start-up (see doStart)
private volatile ClusterState applierState; // the state that should be exposed to the cluster state applier
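
The new comment on `mutex` refers to methods that assert the mutex is held. A minimal sketch of that pattern follows, assuming a hypothetical `MutexHolderSketch` class rather than the real `Coordinator`. It throws instead of using `assert` so that the check also fires when assertions are disabled.

```java
// Hypothetical illustration; class and method names are not taken from Elasticsearch.
final class MutexHolderSketch {
    // Package-private so that tests in the same package can synchronize on it.
    final Object mutex = new Object();
    private long term;

    // Must only be called while holding the mutex; fails fast otherwise.
    void bumpTerm() {
        if (Thread.holdsLock(mutex) == false) {
            throw new IllegalStateException("mutex not held");
        }
        term++;
    }

    long getTerm() {
        synchronized (mutex) {
            return term;
        }
    }

    public static void main(String[] args) {
        MutexHolderSketch sketch = new MutexHolderSketch();
        synchronized (sketch.mutex) {
            sketch.bumpTerm(); // allowed: the caller holds the mutex
        }
        System.out.println(sketch.getTerm());
    }
}
```

A test in the same package can take the mutex itself before calling such a method, which is why the field cannot be `private`.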

@@ -855,7 +853,7 @@ public boolean setInitialConfiguration(final VotingConfiguration votingConfigura

Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata());
// automatically generate a UID for the metadata if we need to
-metadataBuilder.generateClusterUuidIfNeeded(); // TODO generate UUID in bootstrapping tool?
+metadataBuilder.generateClusterUuidIfNeeded();
metadataBuilder.coordinationMetadata(coordinationMetadata);

coordinationState.get().setInitialState(ClusterState.builder(currentState).metadata(metadataBuilder).build());
@@ -1159,7 +1157,7 @@ private void startElectionScheduler() {
return;
}

-final TimeValue gracePeriod = TimeValue.ZERO; // TODO variable grace period
+final TimeValue gracePeriod = TimeValue.ZERO;
electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
@Override
public void run() {
@@ -1189,7 +1187,6 @@ public String toString() {
}

public Iterable<DiscoveryNode> getFoundPeers() {
-// TODO everyone takes this and adds the local node. Maybe just add the local node here?
return peerFinder.getFoundPeers();
}

@@ -163,7 +163,6 @@ private void handleFollowerCheck(FollowerCheckRequest request, TransportChannel
FastResponseState responder = this.fastResponseState;

if (responder.mode == Mode.FOLLOWER && responder.term == request.term) {
-// TODO trigger a term bump if we voted for a different leader in this term
Contributor Author:
This happens by sending a publish response with no associated join.

logger.trace("responding to {} on fast path", request);
transportChannel.sendResponse(Empty.INSTANCE);
return;
@@ -198,15 +197,6 @@ public String toString() {
});
}

-// TODO in the PoC a faulty node was considered non-faulty again if it sent us a PeersRequest:
-// - node disconnects, detected faulty, removal is enqueued
-// - node reconnects, pings us, finds we are master, requests to join, all before removal is applied
-// - join is processed before removal, but we do not publish to known-faulty nodes so the joining node does not receive this publication
-// - it doesn't start its leader checker since it receives nothing to cause it to become a follower
-// Apparently this meant that it remained a candidate for too long, leading to a test failure. At the time this logic was added, we did
-// not have gossip-based discovery which would (I think) have retried this joining process a short time later. It's therefore possible
-// that this is no longer required, so it's omitted here until we can be sure if it's necessary or not.

/**
* @return nodes in the current cluster state which have failed their follower checks.
*/
@@ -21,7 +21,6 @@
import org.elasticsearch.cluster.ClusterState;

public class InMemoryPersistedState implements CoordinationState.PersistedState {
-// TODO add support and tests for behaviour with persistence-layer failures
Contributor Author:
Done, see AbstractCoordinatorTestCase.MockPersistedState.


private long currentTerm;
private ClusterState acceptedState;
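
The review comment above points to `AbstractCoordinatorTestCase.MockPersistedState` for persistence-layer failures; that class is not shown in this diff. As a rough idea of what a failure-injecting persisted state can look like, here is a sketch with a hypothetical `FlakyPersistedStateSketch` class. The `disruptStorage` flag name is borrowed from the `Cluster` field visible later in this PR, but everything else is an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration; this is not the MockPersistedState from AbstractCoordinatorTestCase.
final class FlakyPersistedStateSketch {
    private long currentTerm;
    private boolean disruptStorage;

    void setDisruptStorage(boolean disruptStorage) {
        this.disruptStorage = disruptStorage;
    }

    long getCurrentTerm() {
        return currentTerm;
    }

    // Simulates a write that fails before anything is stored while storage is disrupted.
    void setCurrentTerm(long currentTerm) {
        if (disruptStorage) {
            throw new UncheckedIOException(new IOException("simulated write failure"));
        }
        this.currentTerm = currentTerm;
    }

    public static void main(String[] args) {
        FlakyPersistedStateSketch state = new FlakyPersistedStateSketch();
        state.setCurrentTerm(1);
        state.setDisruptStorage(true);
        try {
            state.setCurrentTerm(2);
        } catch (UncheckedIOException e) {
            System.out.println("write rejected, term is still " + state.getCurrentTerm());
        }
    }
}
```

A caller can then treat the `UncheckedIOException` as "this message could not be handled right now", which matches the `catch` clause in `runRandomly` further down in this PR.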
@@ -62,7 +62,6 @@ public class PreVoteCollector {
this.updateMaxTermSeen = updateMaxTermSeen;
this.electionStrategy = electionStrategy;

-// TODO does this need to be on the generic threadpool or can it use SAME?
Contributor Author:
No, SAME is bad because updateMaxTermSeen.accept may block on Coordinator#mutex.

transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
PreVoteRequest::new,
(request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)));
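
The concern in the review comment is that a handler running on the thread that delivered the request (which is what `SAME` amounts to) would stall that thread whenever the mutex is busy. The sketch below illustrates the alternative with plain `java.util.concurrent` types: the handler is handed to a separate pool, so the dispatching thread stays free while the handler waits for the mutex. `HandlerThreadSketch` and its members are hypothetical stand-ins, not Elasticsearch code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration built on java.util.concurrent, not on the Elasticsearch ThreadPool.
final class HandlerThreadSketch {
    private final Object mutex = new Object();
    private long maxTermSeen;

    // Stand-in for a handler whose callback needs the coordinator mutex.
    long handle(long term) {
        synchronized (mutex) {
            maxTermSeen = Math.max(maxTermSeen, term);
            return maxTermSeen;
        }
    }

    // Returns true if the dispatching thread stayed free while another thread held the mutex,
    // and the handler completed with the expected value once the mutex was released.
    static boolean dispatchWhileMutexHeld() {
        HandlerThreadSketch sketch = new HandlerThreadSketch();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // First worker: takes the mutex and keeps it until told to let go.
            pool.execute(() -> {
                synchronized (sketch.mutex) {
                    held.countDown();
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            held.await();
            // Second worker: runs the handler, which blocks on the mutex. The caller does not.
            Callable<Long> task = () -> sketch.handle(7L);
            Future<Long> result = pool.submit(task);
            boolean pendingWhileHeld = result.isDone() == false;
            release.countDown();
            return pendingWhileHeld && result.get(10, TimeUnit.SECONDS) == 7L;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatchWhileMutexHeld());
    }
}
```

Had the caller invoked `handle` directly while the first worker held the mutex, the caller itself would have blocked until the release.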
@@ -244,7 +244,6 @@ void sendPublishRequest() {
assert state == PublicationTargetState.NOT_STARTED : state + " -> " + PublicationTargetState.SENT_PUBLISH_REQUEST;
state = PublicationTargetState.SENT_PUBLISH_REQUEST;
Publication.this.sendPublishRequest(discoveryNode, publishRequest, new PublishResponseHandler());
-// TODO Can this ^ fail with an exception? Target should be failed if so.
Contributor Author:
Don't think so, it notifies the listener on a failure.

assert publicationCompletedIffAllTargetsInactiveOrCancelled();
}
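
The review comment relies on the send path reporting failures through the listener rather than by throwing. The general shape of that contract can be sketched as follows; `ListenerFailureSketch`, `Transport` and `ResponseListener` are simplified, hypothetical types and say nothing about how the real transport layer is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; these interfaces are simplified stand-ins, not Elasticsearch types.
final class ListenerFailureSketch {
    interface ResponseListener {
        void onResponse(String response);

        void onFailure(Exception e);
    }

    interface Transport {
        String send(String request) throws Exception;
    }

    // Never throws to the caller: any failure is routed to the listener instead.
    static void sendRequest(Transport transport, String request, ResponseListener listener) {
        final String response;
        try {
            response = transport.send(request);
        } catch (Exception e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(response);
    }

    // Records outcomes so that callers can inspect how each request ended.
    static final class RecordingListener implements ResponseListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onResponse(String response) {
            events.add("response:" + response);
        }

        @Override
        public void onFailure(Exception e) {
            events.add("failure:" + e.getMessage());
        }
    }
}
```

With this contract the caller can update its own state immediately after the send, as `sendPublishRequest` does, because a failed send arrives later as a listener callback rather than as an exception at the call site.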

@@ -448,7 +448,6 @@ public void testUnresponsiveLeaderDetectedEventually() {
logger.info("--> blackholing leader {}", originalLeader);
originalLeader.blackhole();

-// This stabilisation time bound is undesirably long. TODO try and reduce it.
cluster.stabilise(Math.max(
// first wait for all the followers to notice the leader has gone
(defaultMillis(LEADER_CHECK_INTERVAL_SETTING) + defaultMillis(LEADER_CHECK_TIMEOUT_SETTING))
@@ -685,7 +684,7 @@ public void testAckListenerReceivesNacksIfLeaderStandsDown() {
// cluster has two nodes in mode LEADER, in different terms ofc, and the one in the lower term won’t be able to publish anything
leader.heal();
AckCollector ackCollector = leader.submitValue(randomLong());
-cluster.stabilise(); // TODO: check if can find a better bound here
+cluster.stabilise();
assertTrue("expected nack from " + leader, ackCollector.hasAckedUnsuccessfully(leader));
assertTrue("expected nack from " + follower0, ackCollector.hasAckedUnsuccessfully(follower0));
assertTrue("expected nack from " + follower1, ackCollector.hasAckedUnsuccessfully(follower1));
@@ -738,14 +737,13 @@ public void testSettingInitialConfigurationTriggersElection() {
cluster.stabilise(
// the first election should succeed, because only one node knows of the initial configuration and therefore can win a
// pre-voting round and proceed to an election, so there cannot be any collisions
-defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING) // TODO this wait is unnecessary, we could trigger the election immediately
+defaultMillis(ELECTION_INITIAL_TIMEOUT_SETTING)
 // Allow two round-trip for pre-voting and voting
 + 4 * DEFAULT_DELAY_VARIABILITY
 // Then a commit of the new leader's first cluster state
 + DEFAULT_CLUSTER_STATE_UPDATE_DELAY
 // Then allow time for all the other nodes to join, each of which might cause a reconfiguration
 + (cluster.size() - 1) * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY
-// TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great.
);
}
}
@@ -112,7 +112,7 @@ public String toString() {
assert electionOccurred == false;
electionOccurred = true;
}, l -> {
-}, ElectionStrategy.DEFAULT_INSTANCE); // TODO need tests that check that the max term seen is updated
+}, ElectionStrategy.DEFAULT_INSTANCE);
preVoteCollector.update(getLocalPreVoteResponse(), null);
}

@@ -244,7 +244,6 @@ class Cluster implements Releasable {

final List<ClusterNode> clusterNodes;
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
-// TODO does ThreadPool need a node name any more?
Contributor Author:
Not really relevant here.

Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random());
private boolean disruptStorage;

@@ -289,8 +288,13 @@ class Cluster implements Releasable {
initialNodeCount, masterEligibleNodeIds, initialConfiguration);
}

-List<ClusterNode> addNodesAndStabilise(int newNodesCount) {
-final List<ClusterNode> addedNodes = addNodes(newNodesCount);
+void addNodesAndStabilise(int newNodesCount) {
+
+// The stabilisation time bound is O(#new nodes) which isn't ideal; it's possible that the real bound is O(1) since node-join
+// events are batched together, but in practice we have not seen problems in this area so have not invested the time needed to
+// investigate this more closely.
+
+addNodes(newNodesCount);
stabilise(
// The first pinging discovers the master
defaultMillis(DISCOVERY_FIND_PEERS_INTERVAL_SETTING)
@@ -299,8 +303,6 @@ List<ClusterNode> addNodesAndStabilise(int newNodesCount) {
// Commit a new cluster state with the new node(s). Might be split into multiple commits, and each might need a
// followup reconfiguration
 + newNodesCount * 2 * DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
-// TODO Investigate whether 4 publications is sufficient due to batching? A bound linear in the number of nodes isn't great.
-return addedNodes;
}

List<ClusterNode> addNodes(int newNodesCount) {
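
The comment added to `addNodesAndStabilise` describes the bound as O(#new nodes). The sketch below makes that growth concrete using only the terms visible in this diff: one discovery interval plus two cluster state updates per new node. The terms collapsed between the two hunks are left out, and the millisecond values are made up for illustration; the real constants are not shown here.

```java
// Hypothetical values; the real constants live in the test framework and are not shown in this diff.
final class StabilisationBoundSketch {
    static final long FIND_PEERS_INTERVAL_MILLIS = 1_000L;      // assumed
    static final long CLUSTER_STATE_UPDATE_DELAY_MILLIS = 500L; // assumed

    // Only the visible terms: one discovery round, then a commit plus a possible
    // follow-up reconfiguration for each new node.
    static long visibleBoundMillis(int newNodesCount) {
        return FIND_PEERS_INTERVAL_MILLIS + (long) newNodesCount * 2 * CLUSTER_STATE_UPDATE_DELAY_MILLIS;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 4; n++) {
            System.out.println(n + " new node(s): " + visibleBoundMillis(n) + "ms");
        }
    }
}
```

Each extra node adds a fixed `2 * CLUSTER_STATE_UPDATE_DELAY_MILLIS` to the bound. If joins were always batched into a constant number of publications, that per-node term would disappear, which is the O(1) possibility the comment mentions.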
@@ -331,7 +333,6 @@ void runRandomly() {
*/
void runRandomly(boolean allowReboots, boolean coolDown, long delayVariability) {

-// TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it
assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty());
assertThat("may reconnect blackholed nodes, probably unexpected", blackholedNodes, empty());

@@ -449,11 +450,6 @@ public String toString() {
deterministicTaskQueue.runRandomTask();
}
}

-// TODO other random steps:
Contributor Author:
Done, or at least covered elsewhere.

-// - reboot a node
-// - abdicate leadership

} catch (CoordinationStateRejectedException | UncheckedIOException ignored) {
// This is ok: it just means a message couldn't currently be handled.
}
@@ -1254,7 +1250,6 @@ static class AckCollector implements ClusterStatePublisher.AckListener {

@Override
public void onCommit(TimeValue commitTime) {
-// TODO we only currently care about per-node acks
}

@Override