diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index 563cef305276a..b36c13e3fb36c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -201,8 +201,10 @@ public String toString() { } }); } else if (sendFullVersion || !previousState.nodes().nodeExists(destination)) { + logger.trace("sending full cluster state version {} to {}", newState.version(), destination); PublicationTransportHandler.this.sendFullClusterState(newState, serializedStates, destination, responseActionListener); } else { + logger.trace("sending cluster state diff for version {} to {}", newState.version(), destination); PublicationTransportHandler.this.sendClusterStateDiff(newState, serializedDiffs, serializedStates, destination, responseActionListener); } @@ -381,7 +383,6 @@ public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVe private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException { final Compressor compressor = CompressorFactory.compressor(request.bytes()); StreamInput in = request.bytes().streamInput(); - final ClusterState incomingState; try { if (compressor != null) { in = compressor.streamInput(in); @@ -390,11 +391,13 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque in.setVersion(request.version()); // If true we received full cluster state - otherwise diffs if (in.readBoolean()) { - incomingState = ClusterState.readFrom(in, transportService.getLocalNode()); + final ClusterState incomingState = ClusterState.readFrom(in, transportService.getLocalNode()); fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); + final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState)); lastSeenClusterState.set(incomingState); + return response; } else { final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { @@ -402,11 +405,13 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque throw new IncompatibleClusterStateVersionException("have no local cluster state"); } else { Diff diff = ClusterState.readDiffFrom(in, lastSeen.nodes().getLocalNode()); - incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException + final ClusterState incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException compatibleClusterStateDiffReceivedCount.incrementAndGet(); logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]", incomingState.version(), incomingState.stateUUID(), request.bytes().length()); + final PublishWithJoinResponse response = handlePublishRequest.apply(new PublishRequest(incomingState)); lastSeenClusterState.compareAndSet(lastSeen, incomingState); + return response; } } } catch (IncompatibleClusterStateVersionException e) { @@ -418,7 +423,5 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } finally { IOUtils.close(in); } - - return handlePublishRequest.apply(new PublishRequest(incomingState)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 20e64d21abb2e..5509c2b32a7fd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -784,19 +784,19 @@ public void testDiffBasedPublishing() { assertThat(value(cn.getLastAppliedClusterState()), is(finalValue)); if (cn == leader) { // leader does not update publish stats as it's not using the serialized state - assertEquals(prePublishStats.get(cn).getFullClusterStateReceivedCount(), + assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(), postPublishStats.get(cn).getFullClusterStateReceivedCount()); - assertEquals(prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(), + assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount(), postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount()); - assertEquals(prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(), + assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(), postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount()); } else { // followers receive a diff - assertEquals(prePublishStats.get(cn).getFullClusterStateReceivedCount(), + assertEquals(cn.toString(), prePublishStats.get(cn).getFullClusterStateReceivedCount(), postPublishStats.get(cn).getFullClusterStateReceivedCount()); - assertEquals(prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1, + assertEquals(cn.toString(), prePublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount() + 1, postPublishStats.get(cn).getCompatibleClusterStateDiffReceivedCount()); - assertEquals(prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(), + assertEquals(cn.toString(), prePublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount(), postPublishStats.get(cn).getIncompatibleClusterStateDiffReceivedCount()); } } @@ -818,17 +818,21 @@ public void testJoiningNodeReceivesFullState() { } public void testIncompatibleDiffResendsFullState() { - final Cluster cluster = new Cluster(randomIntBetween(2, 5)); + final Cluster cluster = new Cluster(randomIntBetween(3, 5)); cluster.runRandomly(); cluster.stabilise(); final ClusterNode leader = cluster.getAnyLeader(); final ClusterNode follower = cluster.getAnyNodeExcept(leader); + logger.info("--> blackholing {}", follower); follower.blackhole(); final PublishClusterStateStats prePublishStats = follower.coordinator.stats().getPublishStats(); + logger.info("--> submitting first value to {}", leader); leader.submitValue(randomLong()); cluster.runFor(DEFAULT_CLUSTER_STATE_UPDATE_DELAY + defaultMillis(PUBLISH_TIMEOUT_SETTING), "publish first state"); + logger.info("--> healing {}", follower); follower.heal(); + logger.info("--> submitting second value to {}", leader); leader.submitValue(randomLong()); cluster.stabilise(DEFAULT_CLUSTER_STATE_UPDATE_DELAY); final PublishClusterStateStats postPublishStats = follower.coordinator.stats().getPublishStats();