Skip to content

Commit 4937f5c

Browse files
author
Andrey Ershov
committed
Do not log unsuccessful join attempt each time (elastic#39756)
When performing the test with 57 master-eligible nodes and one node crash, we saw messy elections, when multiple nodes were attempting to become master. JoinHelper has logged 105 long log messages with lengthy stack traces during one such election. To address this, we decided to log these messages every time only on debug level. We will log last unsuccessful join attempt (along with a timestamp) if any with WARN level if the cluster is failing to form. (cherry picked from commit 17a148c)
1 parent cc25f60 commit 4937f5c

File tree

5 files changed

+91
-3
lines changed

5 files changed

+91
-3
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,16 @@ public class ClusterFormationFailureHelper {
5454
private final Supplier<ClusterFormationState> clusterFormationStateSupplier;
5555
private final ThreadPool threadPool;
5656
private final TimeValue clusterFormationWarningTimeout;
57+
private final Runnable logLastFailedJoinAttempt;
5758
@Nullable // if no warning is scheduled
5859
private volatile WarningScheduler warningScheduler;
5960

6061
public ClusterFormationFailureHelper(Settings settings, Supplier<ClusterFormationState> clusterFormationStateSupplier,
61-
ThreadPool threadPool) {
62+
ThreadPool threadPool, Runnable logLastFailedJoinAttempt) {
6263
this.clusterFormationStateSupplier = clusterFormationStateSupplier;
6364
this.threadPool = threadPool;
6465
this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
66+
this.logLastFailedJoinAttempt = logLastFailedJoinAttempt;
6567
}
6668

6769
public boolean isRunning() {
@@ -94,6 +96,7 @@ public void onFailure(Exception e) {
9496
@Override
9597
protected void doRun() {
9698
if (isActive()) {
99+
logLastFailedJoinAttempt.run();
97100
logger.warn(clusterFormationStateSupplier.get().getDescription());
98101
}
99102
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
177177
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
178178
transportService::getLocalNode);
179179
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
180-
transportService.getThreadPool());
180+
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
181181
}
182182

183183
private ClusterFormationState getClusterFormationState() {

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
*/
1919
package org.elasticsearch.cluster.coordination;
2020

21+
import org.apache.logging.log4j.Level;
2122
import org.apache.logging.log4j.LogManager;
2223
import org.apache.logging.log4j.Logger;
2324
import org.apache.logging.log4j.message.ParameterizedMessage;
2425
import org.elasticsearch.action.ActionListener;
2526
import org.elasticsearch.cluster.ClusterState;
2627
import org.elasticsearch.cluster.ClusterStateTaskConfig;
2728
import org.elasticsearch.cluster.ClusterStateTaskListener;
29+
import org.elasticsearch.cluster.NotMasterException;
2830
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
2931
import org.elasticsearch.cluster.metadata.MetaData;
3032
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -60,6 +62,7 @@
6062
import java.util.Map;
6163
import java.util.Optional;
6264
import java.util.Set;
65+
import java.util.concurrent.atomic.AtomicReference;
6366
import java.util.function.BiConsumer;
6467
import java.util.function.Function;
6568
import java.util.function.LongSupplier;
@@ -85,6 +88,8 @@ public class JoinHelper {
8588

8689
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
8790

91+
private AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
92+
8893
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
8994
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
9095
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
@@ -199,6 +204,54 @@ public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJo
199204
});
200205
}
201206

207+
// package-private for testing
208+
static class FailedJoinAttempt {
209+
private final DiscoveryNode destination;
210+
private final JoinRequest joinRequest;
211+
private final TransportException exception;
212+
private final long timestamp;
213+
214+
FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) {
215+
this.destination = destination;
216+
this.joinRequest = joinRequest;
217+
this.exception = exception;
218+
this.timestamp = System.nanoTime();
219+
}
220+
221+
void logNow() {
222+
logger.log(getLogLevel(exception),
223+
() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest),
224+
exception);
225+
}
226+
227+
static Level getLogLevel(TransportException e) {
228+
Throwable cause = e.unwrapCause();
229+
if (cause instanceof CoordinationStateRejectedException ||
230+
cause instanceof FailedToCommitClusterStateException ||
231+
cause instanceof NotMasterException) {
232+
return Level.DEBUG;
233+
}
234+
return Level.INFO;
235+
}
236+
237+
void logWarnWithTimestamp() {
238+
logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ms ago, failed to join {} with {}",
239+
TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)),
240+
destination,
241+
joinRequest),
242+
exception);
243+
}
244+
}
245+
246+
247+
void logLastFailedJoinAttempt() {
248+
FailedJoinAttempt attempt = lastFailedJoinAttempt.get();
249+
if (attempt != null) {
250+
attempt.logWarnWithTimestamp();
251+
lastFailedJoinAttempt.compareAndSet(attempt, null);
252+
}
253+
}
254+
202255
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin, Runnable onCompletion) {
203256
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
204257
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
@@ -227,13 +280,17 @@ public void handleResponse(Empty response) {
227280
pendingOutgoingJoins.remove(dedupKey);
228281
logger.debug("successfully joined {} with {}", destination, joinRequest);
229282
onCompletion.run();
283+
lastFailedJoinAttempt.set(null);
230284
}
231285

232286
@Override
233287
public void handleException(TransportException exp) {
234288
pendingOutgoingJoins.remove(dedupKey);
235289
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
236290
onCompletion.run();
291+
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
292+
attempt.logNow();
293+
lastFailedJoinAttempt.set(attempt);
237294
}
238295

239296
@Override

server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,14 @@ public void testScheduling() {
6565
= new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
6666

6767
final AtomicLong warningCount = new AtomicLong();
68+
final AtomicLong logLastFailedJoinAttemptWarningCount = new AtomicLong();
6869

6970
final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(),
7071
() -> {
7172
warningCount.incrementAndGet();
7273
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L);
7374
},
74-
deterministicTaskQueue.getThreadPool());
75+
deterministicTaskQueue.getThreadPool(), () -> logLastFailedJoinAttemptWarningCount.incrementAndGet());
7576

7677
deterministicTaskQueue.runAllTasks();
7778
assertThat("should not schedule anything yet", warningCount.get(), is(0L));
@@ -105,8 +106,10 @@ public void testScheduling() {
105106
deterministicTaskQueue.runAllTasksInTimeOrder();
106107

107108
assertThat(warningCount.get(), is(5L));
109+
assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));
108110

109111
warningCount.set(0);
112+
logLastFailedJoinAttemptWarningCount.set(0);
110113
clusterFormationFailureHelper.start();
111114
clusterFormationFailureHelper.stop();
112115
clusterFormationFailureHelper.start();
@@ -127,6 +130,7 @@ public void testScheduling() {
127130
deterministicTaskQueue.runAllTasksInTimeOrder();
128131

129132
assertThat(warningCount.get(), is(5L));
133+
assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));
130134
}
131135

132136
public void testDescriptionOnMasterIneligibleNodes() {

server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818
*/
1919
package org.elasticsearch.cluster.coordination;
2020

21+
import org.apache.logging.log4j.Level;
2122
import org.elasticsearch.Version;
23+
import org.elasticsearch.cluster.NotMasterException;
2224
import org.elasticsearch.cluster.node.DiscoveryNode;
2325
import org.elasticsearch.common.settings.Settings;
2426
import org.elasticsearch.test.ESTestCase;
2527
import org.elasticsearch.test.transport.CapturingTransport;
2628
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
29+
import org.elasticsearch.transport.RemoteTransportException;
30+
import org.elasticsearch.transport.TransportException;
2731
import org.elasticsearch.transport.TransportResponse;
2832
import org.elasticsearch.transport.TransportService;
2933

@@ -32,6 +36,7 @@
3236

3337
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
3438
import static org.hamcrest.Matchers.equalTo;
39+
import static org.hamcrest.core.Is.is;
3540

3641
public class JoinHelperTests extends ESTestCase {
3742

@@ -107,4 +112,23 @@ public void testJoinDeduplication() {
107112
capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy"));
108113
assertFalse(joinHelper.isJoinPending());
109114
}
115+
116+
public void testFailedJoinAttemptLogLevel() {
117+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(new TransportException("generic transport exception")), is(Level.INFO));
118+
119+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
120+
new RemoteTransportException("remote transport exception with generic cause", new Exception())), is(Level.INFO));
121+
122+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
123+
new RemoteTransportException("caused by CoordinationStateRejectedException",
124+
new CoordinationStateRejectedException("test"))), is(Level.DEBUG));
125+
126+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
127+
new RemoteTransportException("caused by FailedToCommitClusterStateException",
128+
new FailedToCommitClusterStateException("test"))), is(Level.DEBUG));
129+
130+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
131+
new RemoteTransportException("caused by NotMasterException",
132+
new NotMasterException("test"))), is(Level.DEBUG));
133+
}
110134
}

0 commit comments

Comments
 (0)