Skip to content

Commit 8c9360b

Browse files
authored
Log warnings when cluster state publication failed to some nodes (#31233)
If the publishing of a cluster state to a node fails, we currently only log it as debug information and only on the master. This makes it hard to see the cause of (test) failures when logging is set to default levels. This PR adds a warn level log on the node receiving the cluster state when it fails to deserialise the cluster state and a warn level log on the master with a list of nodes for which publication failed.
1 parent 8b4d80a commit 8c9360b

File tree

3 files changed

+52
-20
lines changed

3 files changed

+52
-20
lines changed

server/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.unit.TimeValue;
2323
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2424

25+
import java.util.Collections;
2526
import java.util.Set;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.TimeUnit;
@@ -35,6 +36,7 @@ public class BlockingClusterStatePublishResponseHandler {
3536

3637
private final CountDownLatch latch;
3738
private final Set<DiscoveryNode> pendingNodes;
39+
private final Set<DiscoveryNode> failedNodes;
3840

3941
/**
4042
* Creates a new BlockingClusterStatePublishResponseHandler
@@ -44,6 +46,7 @@ public BlockingClusterStatePublishResponseHandler(Set<DiscoveryNode> publishingT
4446
this.pendingNodes = ConcurrentCollections.newConcurrentSet();
4547
this.pendingNodes.addAll(publishingToNodes);
4648
this.latch = new CountDownLatch(pendingNodes.size());
49+
this.failedNodes = ConcurrentCollections.newConcurrentSet();
4750
}
4851

4952
/**
@@ -64,6 +67,8 @@ public void onResponse(DiscoveryNode node) {
6467
public void onFailure(DiscoveryNode node, Exception e) {
6568
boolean found = pendingNodes.remove(node);
6669
assert found : "node [" + node + "] already responded or failed";
70+
boolean added = failedNodes.add(node);
71+
assert added : "duplicate failures for " + node;
6772
latch.countDown();
6873
}
6974

@@ -86,4 +91,11 @@ public DiscoveryNode[] pendingNodes() {
8691
// nulls if some nodes responded in the meanwhile
8792
return pendingNodes.toArray(new DiscoveryNode[0]);
8893
}
94+
95+
/**
96+
* returns a set of nodes for which publication has failed.
97+
*/
98+
public Set<DiscoveryNode> getFailedNodes() {
99+
return Collections.unmodifiableSet(failedNodes);
100+
}
89101
}

server/src/main/java/org/elasticsearch/discovery/zen/PublishClusterStateAction.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.discovery.zen;
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
23-
import org.elasticsearch.core.internal.io.IOUtils;
2423
import org.elasticsearch.ElasticsearchException;
2524
import org.elasticsearch.Version;
2625
import org.elasticsearch.action.ActionListener;
@@ -41,6 +40,7 @@
4140
import org.elasticsearch.common.io.stream.StreamOutput;
4241
import org.elasticsearch.common.settings.Settings;
4342
import org.elasticsearch.common.unit.TimeValue;
43+
import org.elasticsearch.core.internal.io.IOUtils;
4444
import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
4545
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
4646
import org.elasticsearch.discovery.Discovery;
@@ -207,6 +207,12 @@ private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final S
207207
clusterState.version(), publishTimeout, pendingNodes);
208208
}
209209
}
210+
// The failure is logged under debug when a sending failed. we now log a summary.
211+
Set<DiscoveryNode> failedNodes = publishResponseHandler.getFailedNodes();
212+
if (failedNodes.isEmpty() == false) {
213+
logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]",
214+
clusterChangedEvent.state().version(), failedNodes);
215+
}
210216
} catch (InterruptedException e) {
211217
// ignore & restore interrupt
212218
Thread.currentThread().interrupt();
@@ -367,14 +373,14 @@ public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVe
367373
protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
368374
Compressor compressor = CompressorFactory.compressor(request.bytes());
369375
StreamInput in = request.bytes().streamInput();
370-
try {
371-
if (compressor != null) {
372-
in = compressor.streamInput(in);
373-
}
374-
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
375-
in.setVersion(request.version());
376-
synchronized (lastSeenClusterStateMutex) {
377-
final ClusterState incomingState;
376+
final ClusterState incomingState;
377+
synchronized (lastSeenClusterStateMutex) {
378+
try {
379+
if (compressor != null) {
380+
in = compressor.streamInput(in);
381+
}
382+
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
383+
in.setVersion(request.version());
378384
// If true we received full cluster state - otherwise diffs
379385
if (in.readBoolean()) {
380386
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
@@ -391,14 +397,17 @@ protected void handleIncomingClusterStateRequest(BytesTransportRequest request,
391397
logger.debug("received diff for but don't have any local cluster state - requesting full state");
392398
throw new IncompatibleClusterStateVersionException("have no local cluster state");
393399
}
394-
incomingClusterStateListener.onIncomingClusterState(incomingState);
395-
lastSeenClusterState = incomingState;
400+
} catch (IncompatibleClusterStateVersionException e) {
401+
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
402+
throw e;
403+
} catch (Exception e) {
404+
logger.warn("unexpected error while deserializing an incoming cluster state", e);
405+
throw e;
406+
} finally {
407+
IOUtils.close(in);
396408
}
397-
} catch (IncompatibleClusterStateVersionException e) {
398-
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
399-
throw e;
400-
} finally {
401-
IOUtils.close(in);
409+
incomingClusterStateListener.onIncomingClusterState(incomingState);
410+
lastSeenClusterState = incomingState;
402411
}
403412
channel.sendResponse(TransportResponse.Empty.INSTANCE);
404413
}

server/src/test/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandlerTests.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,16 @@ public void testConcurrentAccess() throws InterruptedException {
8585
int firstRound = randomIntBetween(5, nodeCount - 1);
8686
Thread[] threads = new Thread[firstRound];
8787
CyclicBarrier barrier = new CyclicBarrier(firstRound);
88+
Set<DiscoveryNode> expectedFailures = new HashSet<>();
8889
Set<DiscoveryNode> completedNodes = new HashSet<>();
8990
for (int i = 0; i < threads.length; i++) {
90-
completedNodes.add(allNodes[i]);
91-
threads[i] = new Thread(new PublishResponder(randomBoolean(), allNodes[i], barrier, logger, handler));
91+
final DiscoveryNode node = allNodes[i];
92+
completedNodes.add(node);
93+
final boolean fail = randomBoolean();
94+
if (fail) {
95+
expectedFailures.add(node);
96+
}
97+
threads[i] = new Thread(new PublishResponder(fail, node, barrier, logger, handler));
9298
threads[i].start();
9399
}
94100
// wait on the threads to finish
@@ -105,7 +111,12 @@ public void testConcurrentAccess() throws InterruptedException {
105111
barrier = new CyclicBarrier(secondRound);
106112

107113
for (int i = 0; i < threads.length; i++) {
108-
threads[i] = new Thread(new PublishResponder(randomBoolean(), allNodes[firstRound + i], barrier, logger, handler));
114+
final DiscoveryNode node = allNodes[firstRound + i];
115+
final boolean fail = randomBoolean();
116+
if (fail) {
117+
expectedFailures.add(node);
118+
}
119+
threads[i] = new Thread(new PublishResponder(fail, node, barrier, logger, handler));
109120
threads[i].start();
110121
}
111122
// wait on the threads to finish
@@ -114,6 +125,6 @@ public void testConcurrentAccess() throws InterruptedException {
114125
}
115126
assertTrue("expected handler not to timeout as all nodes responded", handler.awaitAllNodes(new TimeValue(10)));
116127
assertThat(handler.pendingNodes(), arrayWithSize(0));
117-
128+
assertThat(handler.getFailedNodes(), equalTo(expectedFailures));
118129
}
119130
}

0 commit comments

Comments
 (0)