Skip to content

Commit 17a148c

Browse files
authored
Do not log unsuccessful join attempt each time (elastic#39756)
When performing the test with 57 master-eligible nodes and one node crash, we saw messy elections, when multiple nodes were attempting to become master. JoinHelper has logged 105 long log messages with lengthy stack traces during one such election. To address this, we decided to log these messages every time only on debug level. We will log last unsuccessful join attempt (along with a timestamp) if any with WARN level if the cluster is failing to form.
1 parent 62f0895 commit 17a148c

File tree

5 files changed

+92
-5
lines changed

5 files changed

+92
-5
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,16 @@ public class ClusterFormationFailureHelper {
5454
private final Supplier<ClusterFormationState> clusterFormationStateSupplier;
5555
private final ThreadPool threadPool;
5656
private final TimeValue clusterFormationWarningTimeout;
57+
private final Runnable logLastFailedJoinAttempt;
5758
@Nullable // if no warning is scheduled
5859
private volatile WarningScheduler warningScheduler;
5960

6061
public ClusterFormationFailureHelper(Settings settings, Supplier<ClusterFormationState> clusterFormationStateSupplier,
61-
ThreadPool threadPool) {
62+
ThreadPool threadPool, Runnable logLastFailedJoinAttempt) {
6263
this.clusterFormationStateSupplier = clusterFormationStateSupplier;
6364
this.threadPool = threadPool;
6465
this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
66+
this.logLastFailedJoinAttempt = logLastFailedJoinAttempt;
6567
}
6668

6769
public boolean isRunning() {
@@ -94,6 +96,7 @@ public void onFailure(Exception e) {
9496
@Override
9597
protected void doRun() {
9698
if (isActive()) {
99+
logLastFailedJoinAttempt.run();
97100
logger.warn(clusterFormationStateSupplier.get().getDescription());
98101
}
99102
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
169169
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
170170
transportService::getLocalNode);
171171
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
172-
transportService.getThreadPool());
172+
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
173173
}
174174

175175
private ClusterFormationState getClusterFormationState() {

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
*/
1919
package org.elasticsearch.cluster.coordination;
2020

21+
import org.apache.logging.log4j.Level;
2122
import org.apache.logging.log4j.LogManager;
2223
import org.apache.logging.log4j.Logger;
2324
import org.apache.logging.log4j.message.ParameterizedMessage;
2425
import org.elasticsearch.action.ActionListener;
2526
import org.elasticsearch.cluster.ClusterState;
2627
import org.elasticsearch.cluster.ClusterStateTaskConfig;
2728
import org.elasticsearch.cluster.ClusterStateTaskListener;
29+
import org.elasticsearch.cluster.NotMasterException;
2830
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
2931
import org.elasticsearch.cluster.metadata.MetaData;
3032
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -58,6 +60,7 @@
5860
import java.util.Map;
5961
import java.util.Optional;
6062
import java.util.Set;
63+
import java.util.concurrent.atomic.AtomicReference;
6164
import java.util.function.BiConsumer;
6265
import java.util.function.Function;
6366
import java.util.function.LongSupplier;
@@ -83,6 +86,8 @@ public class JoinHelper {
8386

8487
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
8588

89+
private AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
90+
8691
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
8792
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
8893
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
@@ -172,7 +177,55 @@ boolean isJoinPending() {
172177
return pendingOutgoingJoins.isEmpty() == false;
173178
}
174179

175-
void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
180+
// package-private for testing
181+
static class FailedJoinAttempt {
182+
private final DiscoveryNode destination;
183+
private final JoinRequest joinRequest;
184+
private final TransportException exception;
185+
private final long timestamp;
186+
187+
FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) {
188+
this.destination = destination;
189+
this.joinRequest = joinRequest;
190+
this.exception = exception;
191+
this.timestamp = System.nanoTime();
192+
}
193+
194+
void logNow() {
195+
logger.log(getLogLevel(exception),
196+
() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest),
197+
exception);
198+
}
199+
200+
static Level getLogLevel(TransportException e) {
201+
Throwable cause = e.unwrapCause();
202+
if (cause instanceof CoordinationStateRejectedException ||
203+
cause instanceof FailedToCommitClusterStateException ||
204+
cause instanceof NotMasterException) {
205+
return Level.DEBUG;
206+
}
207+
return Level.INFO;
208+
}
209+
210+
void logWarnWithTimestamp() {
211+
logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ms ago, failed to join {} with {}",
212+
TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)),
213+
destination,
214+
joinRequest),
215+
exception);
216+
}
217+
}
218+
219+
220+
void logLastFailedJoinAttempt() {
221+
FailedJoinAttempt attempt = lastFailedJoinAttempt.get();
222+
if (attempt != null) {
223+
attempt.logWarnWithTimestamp();
224+
lastFailedJoinAttempt.compareAndSet(attempt, null);
225+
}
226+
}
227+
228+
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin) {
176229
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
177230
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
178231
final Tuple<DiscoveryNode, JoinRequest> dedupKey = Tuple.tuple(destination, joinRequest);
@@ -190,12 +243,15 @@ public Empty read(StreamInput in) {
190243
public void handleResponse(Empty response) {
191244
pendingOutgoingJoins.remove(dedupKey);
192245
logger.debug("successfully joined {} with {}", destination, joinRequest);
246+
lastFailedJoinAttempt.set(null);
193247
}
194248

195249
@Override
196250
public void handleException(TransportException exp) {
197251
pendingOutgoingJoins.remove(dedupKey);
198-
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
252+
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
253+
attempt.logNow();
254+
lastFailedJoinAttempt.set(attempt);
199255
}
200256

201257
@Override

server/src/test/java/org/elasticsearch/cluster/coordination/ClusterFormationFailureHelperTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,14 @@ public void testScheduling() {
6565
= new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
6666

6767
final AtomicLong warningCount = new AtomicLong();
68+
final AtomicLong logLastFailedJoinAttemptWarningCount = new AtomicLong();
6869

6970
final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(),
7071
() -> {
7172
warningCount.incrementAndGet();
7273
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L);
7374
},
74-
deterministicTaskQueue.getThreadPool());
75+
deterministicTaskQueue.getThreadPool(), () -> logLastFailedJoinAttemptWarningCount.incrementAndGet());
7576

7677
deterministicTaskQueue.runAllTasks();
7778
assertThat("should not schedule anything yet", warningCount.get(), is(0L));
@@ -105,8 +106,10 @@ public void testScheduling() {
105106
deterministicTaskQueue.runAllTasksInTimeOrder();
106107

107108
assertThat(warningCount.get(), is(5L));
109+
assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));
108110

109111
warningCount.set(0);
112+
logLastFailedJoinAttemptWarningCount.set(0);
110113
clusterFormationFailureHelper.start();
111114
clusterFormationFailureHelper.stop();
112115
clusterFormationFailureHelper.start();
@@ -127,6 +130,7 @@ public void testScheduling() {
127130
deterministicTaskQueue.runAllTasksInTimeOrder();
128131

129132
assertThat(warningCount.get(), is(5L));
133+
assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));
130134
}
131135

132136
public void testDescriptionOnMasterIneligibleNodes() {

server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818
*/
1919
package org.elasticsearch.cluster.coordination;
2020

21+
import org.apache.logging.log4j.Level;
2122
import org.elasticsearch.Version;
23+
import org.elasticsearch.cluster.NotMasterException;
2224
import org.elasticsearch.cluster.node.DiscoveryNode;
2325
import org.elasticsearch.common.settings.Settings;
2426
import org.elasticsearch.test.ESTestCase;
2527
import org.elasticsearch.test.transport.CapturingTransport;
2628
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
29+
import org.elasticsearch.transport.RemoteTransportException;
30+
import org.elasticsearch.transport.TransportException;
2731
import org.elasticsearch.transport.TransportResponse;
2832
import org.elasticsearch.transport.TransportService;
2933

@@ -32,6 +36,7 @@
3236

3337
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
3438
import static org.hamcrest.Matchers.equalTo;
39+
import static org.hamcrest.core.Is.is;
3540

3641
public class JoinHelperTests extends ESTestCase {
3742

@@ -107,4 +112,23 @@ public void testJoinDeduplication() {
107112
capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy"));
108113
assertFalse(joinHelper.isJoinPending());
109114
}
115+
116+
public void testFailedJoinAttemptLogLevel() {
117+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(new TransportException("generic transport exception")), is(Level.INFO));
118+
119+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
120+
new RemoteTransportException("remote transport exception with generic cause", new Exception())), is(Level.INFO));
121+
122+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
123+
new RemoteTransportException("caused by CoordinationStateRejectedException",
124+
new CoordinationStateRejectedException("test"))), is(Level.DEBUG));
125+
126+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
127+
new RemoteTransportException("caused by FailedToCommitClusterStateException",
128+
new FailedToCommitClusterStateException("test"))), is(Level.DEBUG));
129+
130+
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
131+
new RemoteTransportException("caused by NotMasterException",
132+
new NotMasterException("test"))), is(Level.DEBUG));
133+
}
110134
}

0 commit comments

Comments
 (0)