From 6d142d312207ea38cb115925378f739f09fb2924 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 3 Jun 2019 15:41:38 +0200 Subject: [PATCH] Make Connection Future Err. Handling more Resilient (#42781) * There were a number of possible (runtime-) exceptions that could be raised in the adjusted code and prevent resolving the listener * Relates #42350 --- .../elasticsearch/transport/TcpTransport.java | 54 +++++++++---------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index eef9f4f42637c..7be8872ab5fac 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -933,34 +933,20 @@ public void onResponse(Void v) { if (countDown.countDown()) { final TcpChannel handshakeChannel = channels.get(0); try { - executeHandshake(node, handshakeChannel, connectionProfile, new ActionListener() { - @Override - public void onResponse(Version version) { - NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version); - long relativeMillisTime = threadPool.relativeTimeInMillis(); - nodeChannels.channels.forEach(ch -> { - // Mark the channel init time - ch.getChannelStats().markAccessed(relativeMillisTime); - ch.addCloseListener(ActionListener.wrap(nodeChannels::close)); - }); - keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile); - listener.onResponse(nodeChannels); - } - - @Override - public void onFailure(Exception e) { - CloseableChannel.closeChannels(channels, false); - - if (e instanceof ConnectTransportException) { - listener.onFailure(e); - } else { - listener.onFailure(new ConnectTransportException(node, "general node connection failure", e)); - } - } - }); + executeHandshake(node, handshakeChannel, connectionProfile, ActionListener.wrap(version -> { + NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version); + long relativeMillisTime = threadPool.relativeTimeInMillis(); + nodeChannels.channels.forEach(ch -> { + // Mark the channel init time + ch.getChannelStats().markAccessed(relativeMillisTime); + ch.addCloseListener(ActionListener.wrap(nodeChannels::close)); + }); + keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile); + listener.onResponse(nodeChannels); + }, e -> closeAndFail(e instanceof ConnectTransportException ? + e : new ConnectTransportException(node, "general node connection failure", e)))); } catch (Exception ex) { - CloseableChannel.closeChannels(channels, false); - listener.onFailure(ex); + closeAndFail(ex); } } } @@ -968,15 +954,23 @@ public void onFailure(Exception e) { @Override public void onFailure(Exception ex) { if (countDown.fastForward()) { - CloseableChannel.closeChannels(channels, false); - listener.onFailure(new ConnectTransportException(node, "connect_exception", ex)); + closeAndFail(new ConnectTransportException(node, "connect_exception", ex)); } } public void onTimeout() { if (countDown.fastForward()) { + closeAndFail(new ConnectTransportException(node, "connect_timeout[" + connectionProfile.getConnectTimeout() + "]")); + } + } + + private void closeAndFail(Exception e) { + try { CloseableChannel.closeChannels(channels, false); - listener.onFailure(new ConnectTransportException(node, "connect_timeout[" + connectionProfile.getConnectTimeout() + "]")); + } catch (Exception ex) { + e.addSuppressed(ex); + } finally { + listener.onFailure(e); } } }