Skip to content

Increase PeerFinder verbosity on persistent failure #73128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/reference/modules/discovery/discovery-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ handshake. Defaults to `30s`.
Sets how long a node will wait after asking its peers again before considering
the request to have failed. Defaults to `3s`.

`discovery.find_peers_warning_timeout`::
(<<static-cluster-setting,Static>>)
Sets how long a node will attempt to discover its peers before it starts to log
verbose messages describing why the connection attempts are failing. Defaults
to `5m`.

`discovery.seed_resolver.max_concurrent_resolvers`::
(<<static-cluster-setting,Static>>)
Specifies how many concurrent DNS lookups to perform when resolving the
Expand Down
47 changes: 38 additions & 9 deletions server/src/main/java/org/elasticsearch/discovery/PeerFinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,17 @@ public abstract class PeerFinder {
Setting.timeSetting("discovery.request_peers_timeout",
TimeValue.timeValueMillis(3000), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

// We do not log connection failures immediately: some failures are expected, especially if the hosts list isn't perfectly up to date
// or contains some unnecessary junk. However, if the node cannot find a master for an extended period of time then it is helpful to
// users to describe in more detail why we cannot connect to the remote nodes. This setting defines how long we wait without discovering
// the master before we start to emit more verbose logs.
public static final Setting<TimeValue> VERBOSITY_INCREASE_TIMEOUT_SETTING =
Setting.timeSetting("discovery.find_peers_warning_timeout",
TimeValue.timeValueMinutes(5), TimeValue.timeValueMillis(1), Setting.Property.NodeScope);

private final TimeValue findPeersInterval;
private final TimeValue requestPeersTimeout;
private final TimeValue verbosityIncreaseTimeout;

private final Object mutex = new Object();
private final TransportService transportService;
Expand All @@ -66,6 +75,7 @@ public abstract class PeerFinder {

private volatile long currentTerm;
private boolean active;
private long activatedAtMillis;
private DiscoveryNodes lastAcceptedNodes;
private final Map<TransportAddress, Peer> peersByAddress = new LinkedHashMap<>();
private Optional<DiscoveryNode> leader = Optional.empty();
Expand All @@ -75,6 +85,7 @@ public PeerFinder(Settings settings, TransportService transportService, Transpor
ConfiguredHostsResolver configuredHostsResolver) {
findPeersInterval = DISCOVERY_FIND_PEERS_INTERVAL_SETTING.get(settings);
requestPeersTimeout = DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING.get(settings);
verbosityIncreaseTimeout = VERBOSITY_INCREASE_TIMEOUT_SETTING.get(settings);
this.transportService = transportService;
this.transportAddressConnector = transportAddressConnector;
this.configuredHostsResolver = configuredHostsResolver;
Expand All @@ -90,6 +101,7 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) {
synchronized (mutex) {
assert assertInactiveWithNoKnownPeers();
active = true;
activatedAtMillis = transportService.getThreadPool().relativeTimeInMillis();
this.lastAcceptedNodes = lastAcceptedNodes;
leader = Optional.empty();
handleWakeUp(); // return value discarded: there are no known peers, so none can be disconnected
Expand Down Expand Up @@ -193,7 +205,7 @@ public interface TransportAddressConnector {

public interface ConfiguredHostsResolver {
/**
* Attempt to resolve the configured unicast hosts list to a list of transport addresses.
* Attempt to resolve the configured hosts list to a list of transport addresses.
*
* @param consumer Consumer for the resolved list. May not be called if an error occurs or if another resolution attempt is in
* progress.
Expand Down Expand Up @@ -293,7 +305,7 @@ protected void startProbe(TransportAddress transportAddress) {

private class Peer {
private final TransportAddress transportAddress;
private SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
private final SetOnce<DiscoveryNode> discoveryNode = new SetOnce<>();
private volatile boolean peersRequestInFlight;

Peer(TransportAddress transportAddress) {
Expand Down Expand Up @@ -334,6 +346,9 @@ void establishConnection() {
assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode();
assert active;

final boolean verboseFailureLogging
= transportService.getThreadPool().relativeTimeInMillis() - activatedAtMillis > verbosityIncreaseTimeout.millis();

logger.trace("{} attempting connection", this);
transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener<DiscoveryNode>() {
@Override
Expand All @@ -356,7 +371,25 @@ public void onResponse(DiscoveryNode remoteNode) {

@Override
public void onFailure(Exception e) {
logger.debug(() -> new ParameterizedMessage("{} connection failed", Peer.this), e);
if (verboseFailureLogging) {
if (logger.isDebugEnabled()) {
// log message at level WARN, but since DEBUG logging is enabled we include the full stack trace
logger.warn(new ParameterizedMessage("{} connection failed", Peer.this), e);
} else {
final StringBuilder messageBuilder = new StringBuilder();
Throwable cause = e;
while (cause != null && messageBuilder.length() <= 1024) {
messageBuilder.append(": ").append(cause.getMessage());
cause = cause.getCause();
}
final String message = messageBuilder.length() < 1024
? messageBuilder.toString()
: (messageBuilder.substring(0, 1023) + "...");
logger.warn("{} connection failed{}", Peer.this, message);
}
} else {
logger.debug(new ParameterizedMessage("{} connection failed", Peer.this), e);
}
synchronized (mutex) {
peersByAddress.remove(transportAddress);
}
Expand Down Expand Up @@ -413,7 +446,7 @@ public void handleResponse(PeersResponse response) {
@Override
public void handleException(TransportException exp) {
peersRequestInFlight = false;
logger.debug(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
logger.warn(new ParameterizedMessage("{} peers request failed", Peer.this), exp);
}

@Override
Expand All @@ -429,11 +462,7 @@ public String executor() {

@Override
public String toString() {
return "Peer{" +
"transportAddress=" + transportAddress +
", discoveryNode=" + discoveryNode.get() +
", peersRequestInFlight=" + peersRequestInFlight +
'}';
return "address [" + transportAddress + "], node [" + discoveryNode.get() + "], requesting [" + peersRequestInFlight + "]";
}
}
}
105 changes: 102 additions & 3 deletions server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.elasticsearch.discovery;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand All @@ -17,10 +20,13 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.PeerFinder.TransportAddressConnector;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
import org.elasticsearch.test.transport.StubbableConnectionManager;
Expand Down Expand Up @@ -55,6 +61,7 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.elasticsearch.discovery.PeerFinder.REQUEST_PEERS_ACTION_NAME;
import static org.elasticsearch.discovery.PeerFinder.VERBOSITY_INCREASE_TIMEOUT_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
Expand All @@ -74,13 +81,13 @@ public class PeerFinderTests extends ESTestCase {
private List<TransportAddress> providedAddresses;
private long addressResolveDelay; // -1 means address resolution fails

private Set<DiscoveryNode> disconnectedNodes = new HashSet<>();
private Set<DiscoveryNode> connectedNodes = new HashSet<>();
private final Set<DiscoveryNode> disconnectedNodes = new HashSet<>();
private final Set<DiscoveryNode> connectedNodes = new HashSet<>();
private DiscoveryNodes lastAcceptedNodes;
private TransportService transportService;
private Iterable<DiscoveryNode> foundPeersFromNotification;

private static long CONNECTION_TIMEOUT_MILLIS = 30000;
private static final long CONNECTION_TIMEOUT_MILLIS = 30000;

class MockTransportAddressConnector implements TransportAddressConnector {
final Map<TransportAddress, DiscoveryNode> reachableNodes = new HashMap<>();
Expand Down Expand Up @@ -734,6 +741,98 @@ public void testTimesOutAndRetriesConnectionsToBlackholedNodes() {
assertFoundPeers(nodeToFind, otherNode);
}

/**
 * Activates the peer finder against a single address registered as unreachable, then advances the deterministic task queue
 * until the default value of {@code VERBOSITY_INCREASE_TIMEOUT_SETTING} has been exceeded. Expects at least one WARN-level
 * "connection failed" message that carries no throwable.
 */
@TestLogging(reason = "testing logging at WARN level", value = "org.elasticsearch.discovery:WARN")
public void testLogsWarningsIfActiveForLongEnough() throws IllegalAccessException {
    final String peerFinderLoggerName = "org.elasticsearch.discovery.PeerFinder";

    final DiscoveryNode unreachableNode = newDiscoveryNode("node-from-hosts-list");
    final TransportAddress unreachableAddress = unreachableNode.getAddress();
    providedAddresses.add(unreachableAddress);
    transportAddressConnector.unreachableAddresses.add(unreachableAddress);

    peerFinder.activate(lastAcceptedNodes);

    // The deadline is measured from the queue's current time, read just after activation.
    final long defaultTimeoutMillis = VERBOSITY_INCREASE_TIMEOUT_SETTING.get(Settings.EMPTY).millis();
    final long verboseLoggingDeadlineMillis = deterministicTaskQueue.getCurrentTimeMillis() + defaultTimeoutMillis;

    final MockLogAppender mockAppender = new MockLogAppender();
    try {
        mockAppender.start();
        Loggers.addAppender(LogManager.getLogger(peerFinderLoggerName), mockAppender);

        final String expectedMessagePattern = "address [" + unreachableAddress + "]* connection failed: cannot connect to*";
        mockAppender.addExpectation(
            new MockLogAppender.SeenEventExpectation("connection failed", peerFinderLoggerName, Level.WARN, expectedMessagePattern) {
                @Override
                public boolean innerMatch(LogEvent event) {
                    // no stack trace at this log level
                    return event.getThrown() == null;
                }
            });

        // Keep stepping the simulated clock until it has moved past the deadline, running everything that becomes runnable.
        for (long nowMillis = deterministicTaskQueue.getCurrentTimeMillis();
             nowMillis <= verboseLoggingDeadlineMillis;
             nowMillis = deterministicTaskQueue.getCurrentTimeMillis()) {
            deterministicTaskQueue.advanceTime();
            runAllRunnableTasks();
        }

        mockAppender.assertAllExpectationsMatched();
    } finally {
        Loggers.removeAppender(LogManager.getLogger(peerFinderLoggerName), mockAppender);
        mockAppender.stop();
    }
}

/**
 * With DEBUG logging enabled for the discovery package, checks that "connection failed" messages carry the underlying
 * {@link IOException}: first at DEBUG level after a single time step, and then at WARN level once the task queue has been
 * advanced past the default value of {@code VERBOSITY_INCREASE_TIMEOUT_SETTING}.
 */
@TestLogging(reason = "testing logging at DEBUG level", value = "org.elasticsearch.discovery:DEBUG")
public void testLogsStackTraceInConnectionFailedMessages() throws IllegalAccessException {
    // Expectation shared by both phases: the event must carry an IOException whose message starts with "cannot connect to".
    class ConnectionFailureWithThrowable extends MockLogAppender.SeenEventExpectation {
        ConnectionFailureWithThrowable(String loggerName, Level level, String messagePattern) {
            super("connection failed", loggerName, level, messagePattern);
        }

        @Override
        public boolean innerMatch(LogEvent event) {
            final Throwable thrown = event.getThrown();
            return thrown instanceof IOException && thrown.getMessage().startsWith("cannot connect to");
        }
    }

    final String peerFinderLoggerName = "org.elasticsearch.discovery.PeerFinder";

    final DiscoveryNode unreachableNode = newDiscoveryNode("node-from-hosts-list");
    final TransportAddress unreachableAddress = unreachableNode.getAddress();
    providedAddresses.add(unreachableAddress);
    transportAddressConnector.unreachableAddresses.add(unreachableAddress);

    peerFinder.activate(lastAcceptedNodes);

    // The deadline is measured from the queue's current time, read just after activation.
    final long defaultTimeoutMillis = VERBOSITY_INCREASE_TIMEOUT_SETTING.get(Settings.EMPTY).millis();
    final long verboseLoggingDeadlineMillis = deterministicTaskQueue.getCurrentTimeMillis() + defaultTimeoutMillis;

    final String expectedMessagePattern = "address [" + unreachableAddress + "]* connection failed*";

    final MockLogAppender mockAppender = new MockLogAppender();
    try {
        mockAppender.start();
        Loggers.addAppender(LogManager.getLogger(peerFinderLoggerName), mockAppender);

        // Phase 1: after one time step the failure is expected at DEBUG level, with the exception attached.
        mockAppender.addExpectation(new ConnectionFailureWithThrowable(peerFinderLoggerName, Level.DEBUG, expectedMessagePattern));
        deterministicTaskQueue.advanceTime();
        runAllRunnableTasks();
        mockAppender.assertAllExpectationsMatched();

        // Phase 2: once the clock has moved past the deadline the failure is expected at WARN level, still with the exception.
        mockAppender.addExpectation(new ConnectionFailureWithThrowable(peerFinderLoggerName, Level.WARN, expectedMessagePattern));
        for (long nowMillis = deterministicTaskQueue.getCurrentTimeMillis();
             nowMillis <= verboseLoggingDeadlineMillis;
             nowMillis = deterministicTaskQueue.getCurrentTimeMillis()) {
            deterministicTaskQueue.advanceTime();
            runAllRunnableTasks();
        }
        mockAppender.assertAllExpectationsMatched();
    } finally {
        Loggers.removeAppender(LogManager.getLogger(peerFinderLoggerName), mockAppender);
        mockAppender.stop();
    }
}

public void testReconnectsToDisconnectedNodes() {
final DiscoveryNode otherNode = newDiscoveryNode("original-node");
providedAddresses.add(otherNode.getAddress());
Expand Down