Skip to content

Commit 00db9c1

Browse files
Make Connection Future Err. Handling more Resilient (#42781) (#42804)
* There were a number of possible (runtime-) exceptions that could be raised in the adjusted code and prevent resolving the listener * Relates #42350
1 parent 34fd9ce commit 00db9c1

File tree

1 file changed

+24
-30
lines changed

1 file changed

+24
-30
lines changed

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

+24-30
Original file line numberDiff line numberDiff line change
@@ -933,50 +933,44 @@ public void onResponse(Void v) {
933933
if (countDown.countDown()) {
934934
final TcpChannel handshakeChannel = channels.get(0);
935935
try {
936-
executeHandshake(node, handshakeChannel, connectionProfile, new ActionListener<Version>() {
937-
@Override
938-
public void onResponse(Version version) {
939-
NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
940-
long relativeMillisTime = threadPool.relativeTimeInMillis();
941-
nodeChannels.channels.forEach(ch -> {
942-
// Mark the channel init time
943-
ch.getChannelStats().markAccessed(relativeMillisTime);
944-
ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
945-
});
946-
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
947-
listener.onResponse(nodeChannels);
948-
}
949-
950-
@Override
951-
public void onFailure(Exception e) {
952-
CloseableChannel.closeChannels(channels, false);
953-
954-
if (e instanceof ConnectTransportException) {
955-
listener.onFailure(e);
956-
} else {
957-
listener.onFailure(new ConnectTransportException(node, "general node connection failure", e));
958-
}
959-
}
960-
});
936+
executeHandshake(node, handshakeChannel, connectionProfile, ActionListener.wrap(version -> {
937+
NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
938+
long relativeMillisTime = threadPool.relativeTimeInMillis();
939+
nodeChannels.channels.forEach(ch -> {
940+
// Mark the channel init time
941+
ch.getChannelStats().markAccessed(relativeMillisTime);
942+
ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
943+
});
944+
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
945+
listener.onResponse(nodeChannels);
946+
}, e -> closeAndFail(e instanceof ConnectTransportException ?
947+
e : new ConnectTransportException(node, "general node connection failure", e))));
961948
} catch (Exception ex) {
962-
CloseableChannel.closeChannels(channels, false);
963-
listener.onFailure(ex);
949+
closeAndFail(ex);
964950
}
965951
}
966952
}
967953

968954
@Override
969955
public void onFailure(Exception ex) {
970956
if (countDown.fastForward()) {
971-
CloseableChannel.closeChannels(channels, false);
972-
listener.onFailure(new ConnectTransportException(node, "connect_exception", ex));
957+
closeAndFail(new ConnectTransportException(node, "connect_exception", ex));
973958
}
974959
}
975960

976961
public void onTimeout() {
977962
if (countDown.fastForward()) {
963+
closeAndFail(new ConnectTransportException(node, "connect_timeout[" + connectionProfile.getConnectTimeout() + "]"));
964+
}
965+
}
966+
967+
private void closeAndFail(Exception e) {
968+
try {
978969
CloseableChannel.closeChannels(channels, false);
979-
listener.onFailure(new ConnectTransportException(node, "connect_timeout[" + connectionProfile.getConnectTimeout() + "]"));
970+
} catch (Exception ex) {
971+
e.addSuppressed(ex);
972+
} finally {
973+
listener.onFailure(e);
980974
}
981975
}
982976
}

0 commit comments

Comments
 (0)