Skip to content

Commit 4c06b8f

Browse files
authored
Check for closed connection while opening
While opening a connection to a node, a channel can subsequently close. If this happens, a future callback whose purpose is to close all other channels and disconnect from the node will fire. However, this future will not be ready to close all the channels because the connection will not be exposed to the future callback yet. Since this callback is run once, we will never try to disconnect from this node again and we will be left with a closed channel. This commit adds a check that all channels are open before exposing the channel and throws a general connection exception. In this case, the usual connection retry logic will take over. Relates #26932
1 parent d6fc4af commit 4c06b8f

File tree

10 files changed

+116
-21
lines changed

10 files changed

+116
-21
lines changed

core/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,9 @@ public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile c
599599
nodeChannels = new NodeChannels(nodeChannels, version); // clone the channels - we now have the correct version
600600
transportService.onConnectionOpened(nodeChannels);
601601
connectionRef.set(nodeChannels);
602+
if (Arrays.stream(nodeChannels.channels).allMatch(this::isOpen) == false) {
603+
throw new ConnectTransportException(node, "a channel closed while connecting");
604+
}
602605
success = true;
603606
return nodeChannels;
604607
} catch (ConnectTransportException e) {
@@ -1034,7 +1037,18 @@ protected void innerOnFailure(Exception e) {
10341037
*/
10351038
protected abstract void sendMessage(Channel channel, BytesReference reference, ActionListener<Channel> listener);
10361039

1037-
protected abstract NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile connectionProfile,
1040+
/**
1041+
* Connect to the node with channels as defined by the specified connection profile. Implementations must invoke the specified channel
1042+
* close callback when a channel is closed.
1043+
*
1044+
* @param node the node to connect to
1045+
* @param connectionProfile the connection profile
1046+
* @param onChannelClose callback to invoke when a channel is closed
1047+
* @return the channels
1048+
* @throws IOException if an I/O exception occurs while opening channels
1049+
*/
1050+
protected abstract NodeChannels connectToChannels(DiscoveryNode node,
1051+
ConnectionProfile connectionProfile,
10381052
Consumer<Channel> onChannelClose) throws IOException;
10391053

10401054
/**

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.Objects;
6464
import java.util.concurrent.CopyOnWriteArrayList;
6565
import java.util.concurrent.CountDownLatch;
66+
import java.util.concurrent.ExecutorService;
6667
import java.util.concurrent.ScheduledFuture;
6768
import java.util.function.Function;
6869
import java.util.function.Predicate;
@@ -187,6 +188,15 @@ protected TaskManager createTaskManager() {
187188
return new TaskManager(settings);
188189
}
189190

191+
/**
192+
* The executor service for this transport service.
193+
*
194+
* @return the executor service
195+
*/
196+
protected ExecutorService getExecutorService() {
197+
return threadPool.generic();
198+
}
199+
190200
void setTracerLogInclude(List<String> tracerLogInclude) {
191201
this.tracerLogInclude = tracerLogInclude.toArray(Strings.EMPTY_ARRAY);
192202
}
@@ -232,7 +242,7 @@ protected void doStop() {
232242
if (holderToNotify != null) {
233243
// callback that an exception happened, but on a different thread since we don't
234244
// want handlers to worry about stack overflows
235-
threadPool.generic().execute(new AbstractRunnable() {
245+
getExecutorService().execute(new AbstractRunnable() {
236246
@Override
237247
public void onRejection(Exception e) {
238248
// if we get rejected during node shutdown we don't wanna bubble it up
@@ -879,20 +889,20 @@ void onNodeConnected(final DiscoveryNode node) {
879889
// connectToNode(); connection is completed successfully
880890
// addConnectionListener(); this listener shouldn't be called
881891
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
882-
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node)));
892+
getExecutorService().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node)));
883893
}
884894

885895
void onConnectionOpened(Transport.Connection connection) {
886896
// capture listeners before spawning the background callback so the following pattern won't trigger a call
887897
// connectToNode(); connection is completed successfully
888898
// addConnectionListener(); this listener shouldn't be called
889899
final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
890-
threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection)));
900+
getExecutorService().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(connection)));
891901
}
892902

893903
public void onNodeDisconnected(final DiscoveryNode node) {
894904
try {
895-
threadPool.generic().execute( () -> {
905+
getExecutorService().execute( () -> {
896906
for (final TransportConnectionListener connectionListener : connectionListeners) {
897907
connectionListener.onNodeDisconnected(node);
898908
}
@@ -911,7 +921,7 @@ void onConnectionClosed(Transport.Connection connection) {
911921
if (holderToNotify != null) {
912922
// callback that an exception happened, but on a different thread since we don't
913923
// want handlers to worry about stack overflows
914-
threadPool.generic().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(
924+
getExecutorService().execute(() -> holderToNotify.handler().handleException(new NodeDisconnectedException(
915925
connection.getNode(), holderToNotify.action())));
916926
}
917927
}

core/src/test/java/org/elasticsearch/transport/TCPTransportTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,8 @@ protected void sendMessage(Object o, BytesReference reference, ActionListener li
224224
}
225225

226226
@Override
227-
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile,
228-
Consumer onChannelClose) throws IOException {
227+
protected NodeChannels connectToChannels(
228+
DiscoveryNode node, ConnectionProfile profile, Consumer onChannelClose) throws IOException {
229229
return new NodeChannels(node, new Object[profile.getNumConnections()], profile);
230230
}
231231

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase {
5353

5454
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
55-
ClusterSettings clusterSettings, boolean doHandshake) {
55+
ClusterSettings clusterSettings, boolean doHandshake) {
5656
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
5757
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
5858
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
@@ -86,6 +86,13 @@ protected MockTransportService build(Settings settings, Version version, Cluster
8686
return transportService;
8787
}
8888

89+
@Override
90+
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
91+
final Netty4Transport t = (Netty4Transport) transport;
92+
@SuppressWarnings("unchecked") final TcpTransport<Channel>.NodeChannels channels = (TcpTransport<Channel>.NodeChannels) connection;
93+
t.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true, false);
94+
}
95+
8996
public void testConnectException() throws UnknownHostException {
9097
try {
9198
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
@@ -108,7 +115,8 @@ public void testBindUnavailableAddress() {
108115
.build();
109116
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
110117
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
111-
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true);
118+
MockTransportService transportService =
119+
nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true);
112120
try {
113121
transportService.start();
114122
} finally {

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.Set;
7373
import java.util.concurrent.ConcurrentMap;
7474
import java.util.concurrent.CopyOnWriteArrayList;
75+
import java.util.concurrent.ExecutorService;
7576
import java.util.concurrent.LinkedBlockingDeque;
7677
import java.util.concurrent.atomic.AtomicBoolean;
7778
import java.util.function.Function;
@@ -167,6 +168,17 @@ protected TaskManager createTaskManager() {
167168
}
168169
}
169170

171+
private volatile String executorName;
172+
173+
public void setExecutorName(final String executorName) {
174+
this.executorName = executorName;
175+
}
176+
177+
@Override
178+
protected ExecutorService getExecutorService() {
179+
return executorName == null ? super.getExecutorService() : getThreadPool().executor(executorName);
180+
}
181+
170182
/**
171183
* Clears all the registered rules.
172184
*/

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@
8383

8484
import static java.util.Collections.emptyMap;
8585
import static java.util.Collections.emptySet;
86+
import static org.hamcrest.Matchers.containsString;
8687
import static org.hamcrest.Matchers.empty;
8788
import static org.hamcrest.Matchers.equalTo;
89+
import static org.hamcrest.Matchers.hasToString;
8890
import static org.hamcrest.Matchers.instanceOf;
8991
import static org.hamcrest.Matchers.notNullValue;
9092
import static org.hamcrest.Matchers.startsWith;
@@ -147,14 +149,14 @@ public void onNodeDisconnected(DiscoveryNode node) {
147149
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings,
148150
Settings settings, boolean acceptRequests, boolean doHandshake) {
149151
MockTransportService service = build(
150-
Settings.builder()
151-
.put(settings)
152-
.put(Node.NODE_NAME_SETTING.getKey(), name)
153-
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
154-
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
155-
.build(),
156-
version,
157-
clusterSettings, doHandshake);
152+
Settings.builder()
153+
.put(settings)
154+
.put(Node.NODE_NAME_SETTING.getKey(), name)
155+
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
156+
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
157+
.build(),
158+
version,
159+
clusterSettings, doHandshake);
158160
if (acceptRequests) {
159161
service.acceptIncomingRequests();
160162
}
@@ -2612,4 +2614,33 @@ public void testProfilesIncludesDefault() {
26122614
assertEquals(new HashSet<>(Arrays.asList("default", "test")), profileSettings.stream().map(s -> s.profileName).collect(Collectors
26132615
.toSet()));
26142616
}
2617+
2618+
public void testChannelCloseWhileConnecting() throws IOException {
2619+
try (MockTransportService service = build(Settings.builder().put("name", "close").build(), version0, null, true)) {
2620+
service.setExecutorName(ThreadPool.Names.SAME); // make sure stuff is executed in a blocking fashion
2621+
service.addConnectionListener(new TransportConnectionListener() {
2622+
@Override
2623+
public void onConnectionOpened(final Transport.Connection connection) {
2624+
try {
2625+
closeConnectionChannel(service.getOriginalTransport(), connection);
2626+
} catch (final IOException e) {
2627+
throw new AssertionError(e);
2628+
}
2629+
}
2630+
});
2631+
final ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
2632+
builder.addConnections(1,
2633+
TransportRequestOptions.Type.BULK,
2634+
TransportRequestOptions.Type.PING,
2635+
TransportRequestOptions.Type.RECOVERY,
2636+
TransportRequestOptions.Type.REG,
2637+
TransportRequestOptions.Type.STATE);
2638+
final ConnectTransportException e =
2639+
expectThrows(ConnectTransportException.class, () -> service.openConnection(nodeA, builder.build()));
2640+
assertThat(e, hasToString(containsString(("a channel closed while connecting"))));
2641+
}
2642+
}
2643+
2644+
protected abstract void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException;
2645+
26152646
}

test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
176176
}
177177

178178
@Override
179-
protected NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile profile,
179+
protected NodeChannels connectToChannels(DiscoveryNode node,
180+
ConnectionProfile profile,
180181
Consumer<MockChannel> onChannelClose) throws IOException {
181182
final MockChannel[] mockChannels = new MockChannel[1];
182183
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here

test/framework/src/main/java/org/elasticsearch/transport/nio/NioClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ public NioClient(Logger logger, OpenChannels openChannels, Supplier<SocketSelect
5656
this.channelFactory = channelFactory;
5757
}
5858

59-
public boolean connectToChannels(DiscoveryNode node, NioSocketChannel[] channels, TimeValue connectTimeout,
59+
public boolean connectToChannels(DiscoveryNode node,
60+
NioSocketChannel[] channels,
61+
TimeValue connectTimeout,
6062
Consumer<NioChannel> closeListener) throws IOException {
6163
boolean allowedToConnect = semaphore.tryAcquire();
6264
if (allowedToConnect == false) {

test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Collections;
3434

3535
public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
36+
3637
@Override
3738
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
3839
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
@@ -53,4 +54,13 @@ protected Version executeHandshake(DiscoveryNode node, MockChannel mockChannel,
5354
mockTransportService.start();
5455
return mockTransportService;
5556
}
57+
58+
@Override
59+
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
60+
final MockTcpTransport t = (MockTcpTransport) transport;
61+
@SuppressWarnings("unchecked") final TcpTransport<MockTcpTransport.MockChannel>.NodeChannels channels =
62+
(TcpTransport<MockTcpTransport.MockChannel>.NodeChannels) connection;
63+
t.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true, false);
64+
}
65+
5666
}

test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
5454

5555
public static MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
56-
ClusterSettings clusterSettings, boolean doHandshake) {
56+
ClusterSettings clusterSettings, boolean doHandshake) {
5757
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
5858
NetworkService networkService = new NetworkService(Collections.emptyList());
5959
Transport transport = new NioTransport(settings, threadPool,
@@ -96,6 +96,13 @@ protected MockTransportService build(Settings settings, Version version, Cluster
9696
return transportService;
9797
}
9898

99+
@Override
100+
protected void closeConnectionChannel(Transport transport, Transport.Connection connection) throws IOException {
101+
final NioTransport t = (NioTransport) transport;
102+
@SuppressWarnings("unchecked") TcpTransport<NioChannel>.NodeChannels channels = (TcpTransport<NioChannel>.NodeChannels) connection;
103+
t.closeChannels(channels.getChannels().subList(0, randomIntBetween(1, channels.getChannels().size())), true, false);
104+
}
105+
99106
public void testConnectException() throws UnknownHostException {
100107
try {
101108
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),

0 commit comments

Comments
 (0)