Skip to content

Commit 02b483c

Browse files
authored
Logging improvements in CoordinatorTests (#33991)
Today, we know that CoordinatorTests sometimes fail to stabilise due to an election collision. This change improves the logging around election collisions, so that it is easier to tell whether one happened when analysing a test failure. It also wraps the call to masterService.submitStateUpdateTask() in a context that logs the node on which it runs. Finally, it introduces the InitialJoinAccumulator to replace the placeholder CandidateJoinAccumulator that was used at startup, which reduces the cases to consider in CandidateJoinAccumulator.close() and tightens up the assertions we can make there.
1 parent 2e774e1 commit 02b483c

File tree

6 files changed

+60
-15
lines changed

6 files changed

+60
-15
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,25 @@ public void setInitialState(ClusterState initialState) {
158158
*/
159159
public Join handleStartJoin(StartJoinRequest startJoinRequest) {
160160
if (startJoinRequest.getTerm() <= getCurrentTerm()) {
161-
logger.debug("handleStartJoin: ignored as term provided [{}] not greater than current term [{}]",
162-
startJoinRequest.getTerm(), getCurrentTerm());
161+
logger.debug("handleStartJoin: ignoring [{}] as term provided is not greater than current term [{}]",
162+
startJoinRequest, getCurrentTerm());
163163
throw new CoordinationStateRejectedException("incoming term " + startJoinRequest.getTerm() +
164164
" not greater than current term " + getCurrentTerm());
165165
}
166166

167-
logger.debug("handleStartJoin: updating term from [{}] to [{}]", getCurrentTerm(), startJoinRequest.getTerm());
167+
logger.debug("handleStartJoin: leaving term [{}] due to {}", getCurrentTerm(), startJoinRequest);
168+
169+
if (joinVotes.isEmpty() == false) {
170+
final String reason;
171+
if (electionWon == false) {
172+
reason = "failed election";
173+
} else if (startJoinRequest.getSourceNode().equals(localNode)) {
174+
reason = "bumping term";
175+
} else {
176+
reason = "standing down as leader";
177+
}
178+
logger.debug("handleStartJoin: discarding {}: {}", joinVotes, reason);
179+
}
168180

169181
persistedState.setCurrentTerm(startJoinRequest.getTerm());
170182
assert getCurrentTerm() == startJoinRequest.getTerm();
@@ -232,6 +244,7 @@ public boolean handleJoin(Join join) {
232244
join.getSourceNode(), electionWon, lastAcceptedTerm, getLastAcceptedVersion());
233245

234246
if (electionWon && prevElectionWon == false) {
247+
logger.debug("handleJoin: election won in term [{}] with {}", getCurrentTerm(), joinVotes);
235248
lastPublishedVersion = getLastAcceptedVersion();
236249
}
237250
return added;

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.ClusterChangedEvent;
2525
import org.elasticsearch.cluster.ClusterState;
2626
import org.elasticsearch.cluster.block.ClusterBlocks;
27+
import org.elasticsearch.cluster.coordination.JoinHelper.InitialJoinAccumulator;
2728
import org.elasticsearch.cluster.node.DiscoveryNode;
2829
import org.elasticsearch.cluster.node.DiscoveryNodes;
2930
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -99,7 +100,7 @@ public Coordinator(Settings settings, TransportService transportService, Allocat
99100
this.persistedStateSupplier = persistedStateSupplier;
100101
this.lastKnownLeader = Optional.empty();
101102
this.lastJoin = Optional.empty();
102-
this.joinAccumulator = joinHelper.new CandidateJoinAccumulator();
103+
this.joinAccumulator = new InitialJoinAccumulator();
103104
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
104105
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
105106
this.preVoteCollector = new PreVoteCollector(settings, transportService, this::startElection, this::updateMaxTermSeen);

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

+18-5
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ interface JoinAccumulator {
202202
void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback);
203203

204204
default void close(Mode newMode) {
205-
206205
}
207206
}
208207

@@ -220,6 +219,19 @@ public String toString() {
220219
}
221220
}
222221

222+
static class InitialJoinAccumulator implements JoinAccumulator {
223+
@Override
224+
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
225+
assert false : "unexpected join from " + sender + " during initialisation";
226+
joinCallback.onFailure(new CoordinationStateRejectedException("join target is not initialised yet"));
227+
}
228+
229+
@Override
230+
public String toString() {
231+
return "InitialJoinAccumulator";
232+
}
233+
}
234+
223235
static class FollowerJoinAccumulator implements JoinAccumulator {
224236
@Override
225237
public void handleJoinRequest(DiscoveryNode sender, JoinCallback joinCallback) {
@@ -265,13 +277,14 @@ public void close(Mode newMode) {
265277
});
266278
masterService.submitStateUpdateTasks(stateUpdateSource, pendingAsTasks, ClusterStateTaskConfig.build(Priority.URGENT),
267279
joinTaskExecutor);
268-
} else if (newMode == Mode.FOLLOWER) {
280+
} else {
281+
assert newMode == Mode.FOLLOWER : newMode;
269282
joinRequestAccumulator.values().forEach(joinCallback -> joinCallback.onFailure(
270283
new CoordinationStateRejectedException("became follower")));
271-
} else {
272-
assert newMode == Mode.CANDIDATE;
273-
assert joinRequestAccumulator.isEmpty() : joinRequestAccumulator.keySet();
274284
}
285+
286+
// CandidateJoinAccumulator is only closed when becoming leader or follower, otherwise it accumulates all joins received
287+
// regardless of term.
275288
}
276289

277290
@Override

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,8 @@ private ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState cu
170170
ClusterState tmpState = ClusterState.builder(currentState).nodes(nodesBuilder).blocks(ClusterBlocks.builder()
171171
.blocks(currentState.blocks())
172172
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
173-
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false,
174-
"removed dead nodes on election"));
173+
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
174+
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
175175
}
176176

177177
@Override

server/src/main/java/org/elasticsearch/transport/TransportService.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -1221,7 +1221,17 @@ public void sendResponse(Exception exception) throws IOException {
12211221
if (ThreadPool.Names.SAME.equals(executor)) {
12221222
processException(handler, rtx);
12231223
} else {
1224-
threadPool.executor(handler.executor()).execute(() -> processException(handler, rtx));
1224+
threadPool.executor(handler.executor()).execute(new Runnable() {
1225+
@Override
1226+
public void run() {
1227+
processException(handler, rtx);
1228+
}
1229+
1230+
@Override
1231+
public String toString() {
1232+
return "delivery of exception response to [" + action + "][" + requestId + "]: " + exception;
1233+
}
1234+
});
12251235
}
12261236
}
12271237
}

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public void testCanUpdateClusterStateAfterStabilisation() {
7676

7777
final ClusterNode leader = cluster.getAnyLeader();
7878
long finalValue = randomLong();
79+
80+
logger.info("--> submitting value [{}] to [{}]", finalValue, leader);
7981
leader.submitValue(finalValue);
8082
cluster.stabilise(); // TODO this should only need a short stabilisation
8183

@@ -96,6 +98,7 @@ class Cluster {
9698

9799
final List<ClusterNode> clusterNodes;
98100
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
101+
// TODO does ThreadPool need a node name any more?
99102
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build());
100103
private final VotingConfiguration initialConfiguration;
101104

@@ -155,7 +158,7 @@ private void assertUniqueLeaderAndExpectedModes() {
155158

156159
final String nodeId = clusterNode.getId();
157160
assertThat(nodeId + " has the same term as the leader", clusterNode.coordinator.getCurrentTerm(), is(leaderTerm));
158-
assertTrue("leader should have received a vote from " + nodeId,
161+
assertTrue("leader " + leader.getId() + " should have received a vote from " + nodeId,
159162
leader.coordinator.hasJoinVoteFrom(clusterNode.getLocalNode()));
160163

161164
assertThat(nodeId + " is a follower", clusterNode.coordinator.getMode(), is(FOLLOWER));
@@ -267,7 +270,7 @@ boolean isLeader() {
267270
}
268271

269272
void submitValue(final long value) {
270-
masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
273+
onNode(localNode, () -> masterService.submitStateUpdateTask("new value [" + value + "]", new ClusterStateUpdateTask() {
271274
@Override
272275
public ClusterState execute(ClusterState currentState) {
273276
return setValue(currentState, value);
@@ -277,7 +280,12 @@ public ClusterState execute(ClusterState currentState) {
277280
public void onFailure(String source, Exception e) {
278281
logger.debug(() -> new ParameterizedMessage("failed to publish: [{}]", source), e);
279282
}
280-
});
283+
})).run();
284+
}
285+
286+
@Override
287+
public String toString() {
288+
return localNode.toString();
281289
}
282290
}
283291

0 commit comments

Comments
 (0)