Skip to content

Commit cfdf666

Browse files
authored
[Zen2] Fix test failures in diff-based publishing (#35684)
`testIncompatibleDiffResendsFullState` sometimes makes a 2-node cluster and then partitions one of the nodes from the leader, which makes the leader stand down. Then when the partition is removed the cluster re-forms but does so by sending full cluster states, not diffs, causing the test to fail. Additionally `testDiffBasedPublishing` sometimes fails if a publication is delivered out-of-order, wiping out a fresher last-received cluster state with a less-fresh one. This is fixed here by passing the received cluster state to the coordinator before recording it as the last-received one, relying on the coordinator's freshness checks.
1 parent c816347 commit cfdf666

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,10 @@ public String toString() {
201201
}
202202
});
203203
} else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) {
204+
logger.trace("sending full cluster state version {} to {}", newState.version(), destination);
204205
PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener);
205206
} else {
207+
logger.trace("sending cluster state diff for version {} to {}", newState.version(), destination);
206208
PublicationTransportHandler.this.sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination,
207209
responseActionListener);
208210
}
@@ -381,7 +383,6 @@ public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVe
381383
private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
382384
final Compressor compressor = CompressorFactory.compressor(request.bytes());
383385
StreamInput in = request.bytes().streamInput();
384-
final ClusterState incomingState;
385386
try {
386387
if (compressor != null) {
387388
in = compressor.streamInput(in);
@@ -390,23 +391,27 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
390391
in.setVersion(request.version());
391392
// If true we received full cluster state - otherwise diffs
392393
if (in.readBoolean()) {
393-
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
394+
final ClusterState incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
394395
fullClusterStateReceivedCount.incrementAndGet();
395396
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
396397
request.bytes().length());
398+
final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
397399
lastSeenClusterState.set(incomingState);
400+
return response;
398401
} else {
399402
final ClusterState lastSeen = lastSeenClusterState.get();
400403
if (lastSeen == null) {
401404
logger.debug("received diff for but don't have any local cluster state - requesting full state");
402405
throw new IncompatibleClusterStateVersionException("have no local cluster state");
403406
} else {
404407
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode());
405-
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
408+
final ClusterState incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
406409
compatibleClusterStateDiffReceivedCount.incrementAndGet();
407410
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
408411
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
412+
final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState));
409413
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
414+
return response;
410415
}
411416
}
412417
} catch (IncompatibleClusterStateVersionException e) {
@@ -418,7 +423,5 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
418423
} finally {
419424
IOUtils.close(in);
420425
}
421-
422-
return handlePublishRequest.apply(new PublishRequest(incomingState));
423426
}
424427
}

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -784,19 +784,19 @@ public void testDiffBasedPublishing() {
784784
assertThat(value(cn.getLastAppliedClusterState()), is(finalValue));
785785
if (cn == leader) {
786786
// leader does not update publish stats as it's not using the serialized state
787-
assertEquals(prePublishStats.get(cn).getFullClusterStateReceivedCount(),
787+
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
788788
postPublishStats.get(cn).getFullClusterStateReceivedCount());
789-
assertEquals(prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(),
789+
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(),
790790
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
791-
assertEquals(prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
791+
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
792792
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
793793
} else {
794794
// followers receive a diff
795-
assertEquals(prePublishStats.get(cn).getFullClusterStateReceivedCount(),
795+
assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(),
796796
postPublishStats.get(cn).getFullClusterStateReceivedCount());
797-
assertEquals(prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
797+
assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1,
798798
postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount());
799-
assertEquals(prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
799+
assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(),
800800
postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount());
801801
}
802802
}
@@ -818,17 +818,21 @@ public void testJoiningNodeReceivesFullState() {
818818
}
819819

820820
public void testIncompatibleDiffResendsFullState() {
821-
final Cluster cluster = new Cluster(randomIntBetween(2, 5));
821+
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
822822
cluster.runRandomly();
823823
cluster.stabilise();
824824

825825
final ClusterNode leader = cluster.getAnyLeader();
826826
final ClusterNode follower = cluster.getAnyNodeExcept(leader);
827+
logger.info("--> blackholing {}", follower);
827828
follower.blackhole();
828829
final PublishClusterStateStats prePublishStats = follower.coordinator.stats().getPublishStats();
830+
logger.info("--> submitting first value to {}", leader);
829831
leader.submitValue(randomLong());
830832
cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY + defaultMillis(PUBLISH_TIMEOUT_SETTING), "publish first state");
833+
logger.info("--> healing {}", follower);
831834
follower.heal();
835+
logger.info("--> submitting second value to {}", leader);
832836
leader.submitValue(randomLong());
833837
cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY);
834838
final PublishClusterStateStats postPublishStats = follower.coordinator.stats().getPublishStats();

0 commit comments

Comments
 (0)