Skip to content

Commit f8d2cf2

Browse files
committed
Close incoming connections if not ready yet
Today we bind to our transport address(es) very early in the startup of a node so that we know the addresses to which we're bound, even though we are not yet ready to handle any requests. If we receive a request in this state then we throw an `IllegalStateException` which results in a logged warning and the connection being closed. In practice, this happens straight away since the first request on the connection, the handshake, is sent as soon as it's open. With this commit we instead quietly close the connection straight away, even before any requests are received, avoiding the noisy logging. Relates elastic#44939 Relates elastic#16746 Closes elastic#61356
1 parent 61e6734 commit f8d2cf2

File tree

11 files changed

+68
-86
lines changed

11 files changed

+68
-86
lines changed

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public void startThreadPool() {
5858
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler,
5959
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService(), new SharedGroupFactory(settings));
6060
nettyTransport.start();
61+
nettyTransport.acceptIncomingRequests();
6162

6263
TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses();
6364
TransportAddress transportAddress = randomFrom(boundAddresses);

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,6 @@ public void testDefaultKeepAliveSettings() throws IOException {
8484
JavaVersion.current().compareTo(JavaVersion.parse("11")) >= 0);
8585
try (MockTransportService serviceC = buildService("TS_C", Version.CURRENT, Settings.EMPTY);
8686
MockTransportService serviceD = buildService("TS_D", Version.CURRENT, Settings.EMPTY)) {
87-
serviceC.start();
88-
serviceC.acceptIncomingRequests();
89-
serviceD.start();
90-
serviceD.acceptIncomingRequests();
9187

9288
try (Transport.Connection connection = openConnection(serviceC, serviceD.getLocalDiscoNode(), TestProfiles.LIGHT_PROFILE)) {
9389
assertThat(connection, instanceOf(StubbableTransport.WrappedConnection.class));

plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,27 +83,22 @@ public void testDefaultKeepAliveSettings() throws IOException {
8383
(IOUtils.LINUX || IOUtils.MAC_OS_X) &&
8484
JavaVersion.current().compareTo(JavaVersion.parse("11")) >= 0);
8585
try (MockTransportService serviceC = buildService("TS_C", Version.CURRENT, Settings.EMPTY);
86-
MockTransportService serviceD = buildService("TS_D", Version.CURRENT, Settings.EMPTY)) {
87-
serviceC.start();
88-
serviceC.acceptIncomingRequests();
89-
serviceD.start();
90-
serviceD.acceptIncomingRequests();
86+
MockTransportService serviceD = buildService("TS_D", Version.CURRENT, Settings.EMPTY);
87+
Transport.Connection connection = openConnection(serviceC, serviceD.getLocalDiscoNode(), TestProfiles.LIGHT_PROFILE)) {
9188

92-
try (Transport.Connection connection = openConnection(serviceC, serviceD.getLocalDiscoNode(), TestProfiles.LIGHT_PROFILE)) {
93-
assertThat(connection, instanceOf(StubbableTransport.WrappedConnection.class));
94-
Transport.Connection conn = ((StubbableTransport.WrappedConnection) connection).getConnection();
95-
assertThat(conn, instanceOf(TcpTransport.NodeChannels.class));
96-
TcpTransport.NodeChannels nodeChannels = (TcpTransport.NodeChannels) conn;
97-
for (TcpChannel channel : nodeChannels.getChannels()) {
98-
assertFalse(channel.isServerChannel());
99-
checkDefaultKeepAliveOptions(channel);
100-
}
89+
assertThat(connection, instanceOf(StubbableTransport.WrappedConnection.class));
90+
Transport.Connection conn = ((StubbableTransport.WrappedConnection) connection).getConnection();
91+
assertThat(conn, instanceOf(TcpTransport.NodeChannels.class));
92+
TcpTransport.NodeChannels nodeChannels = (TcpTransport.NodeChannels) conn;
93+
for (TcpChannel channel : nodeChannels.getChannels()) {
94+
assertFalse(channel.isServerChannel());
95+
checkDefaultKeepAliveOptions(channel);
96+
}
10197

102-
assertThat(serviceD.getOriginalTransport(), instanceOf(TcpTransport.class));
103-
for (TcpChannel channel : getAcceptedChannels((TcpTransport) serviceD.getOriginalTransport())) {
104-
assertTrue(channel.isServerChannel());
105-
checkDefaultKeepAliveOptions(channel);
106-
}
98+
assertThat(serviceD.getOriginalTransport(), instanceOf(TcpTransport.class));
99+
for (TcpChannel channel : getAcceptedChannels((TcpTransport) serviceD.getOriginalTransport())) {
100+
assertTrue(channel.isServerChannel());
101+
checkDefaultKeepAliveOptions(channel);
107102
}
108103
}
109104
}

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
108108
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
109109
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
110110
private final Set<TcpChannel> acceptedChannels = ConcurrentCollections.newConcurrentSet();
111+
private final AtomicBoolean handleIncomingRequests = new AtomicBoolean();
111112

112113
// this lock is here to make sure we close this transport and disconnect all the client nodes
113114
// connections while no connect operations is going on
@@ -633,13 +634,25 @@ protected void onServerException(TcpServerChannel channel, Exception e) {
633634
}
634635
}
635636

637+
@Override
638+
public void acceptIncomingRequests() {
639+
final boolean startedWithThisCall = handleIncomingRequests.compareAndSet(false, true);
640+
assert startedWithThisCall : "transport was already accepting incoming requests";
641+
logger.debug("now accepting incoming requests");
642+
}
643+
636644
protected void serverAcceptedChannel(TcpChannel channel) {
637-
boolean addedOnThisCall = acceptedChannels.add(channel);
638-
assert addedOnThisCall : "Channel should only be added to accepted channel set once";
639-
// Mark the channel init time
640-
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
641-
channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
642-
logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", channel));
645+
if (handleIncomingRequests.get()) {
646+
boolean addedOnThisCall = acceptedChannels.add(channel);
647+
assert addedOnThisCall : "Channel should only be added to accepted channel set once";
648+
// Mark the channel init time
649+
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
650+
channel.addCloseListener(ActionListener.wrap(() -> acceptedChannels.remove(channel)));
651+
logger.trace("Tcp transport channel accepted: {}", channel);
652+
} else {
653+
logger.debug("Tcp transport channel accepted before we are ready, closing: {}", channel);
654+
channel.close();
655+
}
643656
}
644657

645658
/**

server/src/main/java/org/elasticsearch/transport/Transport.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ default boolean isSecure() {
8080

8181
RequestHandlers getRequestHandlers();
8282

83+
/**
84+
* Called when we are ready to start accepting incoming requests; prior to calling this, incoming connections are accepted and then
85+
* immediately closed.
86+
*/
87+
void acceptIncomingRequests();
88+
8389
/**
8490
* A unidirectional connection to a {@link DiscoveryNode}
8591
*/

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,17 @@ protected void doClose() throws IOException {
279279
}
280280

281281
/**
282-
* start accepting incoming requests.
283-
* when the transport layer starts up it will block any incoming requests until
284-
* this method is called
282+
* Start accepting incoming requests.
283+
*
284+
* The transport service starts before it's ready to accept incoming requests because we need to know the address(es) to which we are
285+
* bound, which means we have to actually bind to them and start accepting incoming connections. However until this method is called we
286+
* close any incoming connections as soon as they are accepted.
285287
*/
286288
public final void acceptIncomingRequests() {
287-
handleIncomingRequests.set(true);
289+
final boolean startedWithThisCall = handleIncomingRequests.compareAndSet(false, true);
290+
assert startedWithThisCall : "transport service was already accepting incoming requests";
291+
logger.debug("now accepting incoming requests");
292+
transport.acceptIncomingRequests();
288293
}
289294

290295
@Override
@@ -928,6 +933,8 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
928933
@Override
929934
public void onRequestReceived(long requestId, String action) {
930935
if (handleIncomingRequests.get() == false) {
936+
// shouldn't happen: the transport should have closed the connection before receiving this request
937+
assert false : "transport not ready yet to handle incoming requests";
931938
throw new IllegalStateException("transport not ready yet to handle incoming requests");
932939
}
933940
if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) {

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,10 @@ public TransportAddress[] addressesFromString(String address) {
519519
return new TransportAddress[0];
520520
}
521521

522+
@Override
523+
public void acceptIncomingRequests() {
524+
}
525+
522526
@Override
523527
public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
524528
if (profile == null && randomConnectionExceptions && randomBoolean()) {

test/framework/src/main/java/org/elasticsearch/test/transport/FakeTransport.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ public void sendRequest(long requestId, String action, TransportRequest request,
7777
});
7878
}
7979

80+
@Override
81+
public void acceptIncomingRequests() {
82+
}
83+
8084
@Override
8185
public TransportStats getStats() {
8286
return null;

test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ Transport getDelegate() {
114114
return delegate;
115115
}
116116

117+
@Override
118+
public void acceptIncomingRequests() {
119+
delegate.acceptIncomingRequests();
120+
}
121+
117122
@Override
118123
public void setMessageListener(TransportMessageListener listener) {
119124
delegate.setMessageListener(listener);

test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

Lines changed: 4 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -545,9 +545,6 @@ public void onRequestSent(DiscoveryNode node, long requestId, String action, Tra
545545

546546
public void testVoidMessageCompressed() throws Exception {
547547
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
548-
serviceC.start();
549-
serviceC.acceptIncomingRequests();
550-
551548
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, TransportRequest.Empty::new,
552549
(request, channel, task) -> {
553550
try {
@@ -590,9 +587,6 @@ public void handleException(TransportException exp) {
590587

591588
public void testHelloWorldCompressed() throws Exception {
592589
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY)) {
593-
serviceC.start();
594-
serviceC.acceptIncomingRequests();
595-
596590
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new,
597591
(request, channel, task) -> {
598592
assertThat("moshe", equalTo(request.message));
@@ -1548,9 +1542,8 @@ public void handleException(TransportException exp) {
15481542
assertTrue(nodeB.getAddress().getAddress().equals(addressB.get().getAddress()));
15491543
}
15501544

1551-
public void testBlockingIncomingRequests() throws Exception {
1552-
try (TransportService service = buildService("TS_TEST", version0, null,
1553-
Settings.EMPTY, false, false)) {
1545+
public void testRejectEarlyIncomingConnections() throws Exception {
1546+
try (TransportService service = buildService("TS_TEST", version0, null, Settings.EMPTY, false, false)) {
15541547
AtomicBoolean requestProcessed = new AtomicBoolean(false);
15551548
service.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new,
15561549
(request, channel, task) -> {
@@ -1560,31 +1553,8 @@ public void testBlockingIncomingRequests() throws Exception {
15601553

15611554
DiscoveryNode node = service.getLocalNode();
15621555
serviceA.close();
1563-
serviceA = buildService("TS_A", version0, null,
1564-
Settings.EMPTY, true, false);
1565-
try (Transport.Connection connection = openConnection(serviceA, node, null)) {
1566-
CountDownLatch latch = new CountDownLatch(1);
1567-
serviceA.sendRequest(connection, "internal:action", new TestRequest(), TransportRequestOptions.EMPTY,
1568-
new TransportResponseHandler<TestResponse>() {
1569-
@Override
1570-
public TestResponse read(StreamInput in) throws IOException {
1571-
return new TestResponse(in);
1572-
}
1573-
1574-
@Override
1575-
public void handleResponse(TestResponse response) {
1576-
latch.countDown();
1577-
}
1578-
1579-
@Override
1580-
public void handleException(TransportException exp) {
1581-
latch.countDown();
1582-
}
1583-
});
1584-
1585-
latch.await();
1586-
assertFalse(requestProcessed.get());
1587-
}
1556+
serviceA = buildService("TS_A", version0, null, Settings.EMPTY, true, false);
1557+
expectThrows(ConnectTransportException.class, () -> openConnection(serviceA, node, null));
15881558

15891559
service.acceptIncomingRequests();
15901560
try (Transport.Connection connection = openConnection(serviceA, node, null)) {
@@ -1674,7 +1644,6 @@ public String toString() {
16741644
public void testSendRandomRequests() throws InterruptedException {
16751645
TransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
16761646
DiscoveryNode nodeC = serviceC.getLocalNode();
1677-
serviceC.acceptIncomingRequests();
16781647

16791648
final CountDownLatch latch = new CountDownLatch(4);
16801649
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@@ -1906,7 +1875,6 @@ public void testHandshakeWithIncompatVersion() {
19061875
Version version = Version.fromString("2.0.0");
19071876
try (MockTransportService service = buildService("TS_C", version, Settings.EMPTY)) {
19081877
service.start();
1909-
service.acceptIncomingRequests();
19101878
TransportAddress address = service.boundAddress().publishAddress();
19111879
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", address, emptyMap(), emptySet(), version0);
19121880
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
@@ -1925,7 +1893,6 @@ public void testHandshakeUpdatesVersion() throws IOException {
19251893
Version version = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
19261894
try (MockTransportService service = buildService("TS_C", version, Settings.EMPTY)) {
19271895
service.start();
1928-
service.acceptIncomingRequests();
19291896
TransportAddress address = service.boundAddress().publishAddress();
19301897
DiscoveryNode node = new DiscoveryNode("TS_TPC", "TS_TPC", address, emptyMap(), emptySet(), Version.fromString("2.0.0"));
19311898
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
@@ -2102,8 +2069,6 @@ public void testHandlerIsInvokedOnConnectionClose() throws IOException, Interrup
21022069
(request, channel, task) -> {
21032070
// do nothing
21042071
});
2105-
serviceC.start();
2106-
serviceC.acceptIncomingRequests();
21072072
CountDownLatch latch = new CountDownLatch(1);
21082073
TransportResponseHandler<TransportResponse> transportResponseHandler = new TransportResponseHandler<TransportResponse>() {
21092074
@Override
@@ -2179,8 +2144,6 @@ protected void doRun() throws Exception {
21792144
}
21802145
});
21812146
});
2182-
serviceC.start();
2183-
serviceC.acceptIncomingRequests();
21842147
CountDownLatch responseLatch = new CountDownLatch(1);
21852148
TransportResponseHandler<TransportResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
21862149
@Override
@@ -2237,8 +2200,6 @@ protected void doRun() throws Exception {
22372200
}
22382201
});
22392202
});
2240-
serviceC.start();
2241-
serviceC.acceptIncomingRequests();
22422203
CountDownLatch responseLatch = new CountDownLatch(1);
22432204
TransportResponseHandler<TransportResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
22442205
@Override
@@ -2340,8 +2301,6 @@ protected void doRun() throws Exception {
23402301
}
23412302
});
23422303
});
2343-
serviceC.start();
2344-
serviceC.acceptIncomingRequests();
23452304
CountDownLatch responseLatch = new CountDownLatch(1);
23462305
AtomicReference<TransportException> receivedException = new AtomicReference<>(null);
23472306
TransportResponseHandler<TransportResponse.Empty> transportResponseHandler = new TransportResponseHandler.Empty() {
@@ -2424,8 +2383,6 @@ public void testTransportProfilesWithPortAndHost() {
24242383
.putList("transport.profiles.some_other_profile.publish_host", "_local:ipv4_")
24252384
.build())) {
24262385

2427-
serviceC.start();
2428-
serviceC.acceptIncomingRequests();
24292386
Map<String, BoundTransportAddress> profileBoundAddresses = serviceC.transport.profileBoundAddresses();
24302387
assertTrue(profileBoundAddresses.containsKey("some_profile"));
24312388
assertTrue(profileBoundAddresses.containsKey("some_other_profile"));
@@ -2675,8 +2632,6 @@ public <T extends TransportResponse> void sendRequest(
26752632
}
26762633
};
26772634
try (MockTransportService serviceC = buildService("TS_C", CURRENT_VERSION, null, Settings.EMPTY, true, true, interceptor)) {
2678-
serviceC.start();
2679-
serviceC.acceptIncomingRequests();
26802635
final CountDownLatch latch = new CountDownLatch(1);
26812636
serviceC.connectToNode(
26822637
serviceA.getLocalDiscoNode(),

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,6 @@ public boolean matches(SNIServerName sniServerName) {
238238
.put("xpack.security.transport.ssl.verification_mode", "none")
239239
.build();
240240
try (MockTransportService serviceC = buildService("TS_C", version0, settings)) {
241-
serviceC.acceptIncomingRequests();
242-
243241
HashMap<String, String> attributes = new HashMap<>();
244242
attributes.put("server_name", sniIp);
245243
DiscoveryNode node = new DiscoveryNode("server_node_id", new TransportAddress(serverAddress), attributes,
@@ -285,8 +283,6 @@ public void testInvalidSNIServerName() throws Exception {
285283
.put("xpack.security.transport.ssl.verification_mode", "none")
286284
.build();
287285
try (MockTransportService serviceC = buildService("TS_C", version0, settings)) {
288-
serviceC.acceptIncomingRequests();
289-
290286
HashMap<String, String> attributes = new HashMap<>();
291287
attributes.put("server_name", sniIp);
292288
DiscoveryNode node = new DiscoveryNode("server_node_id", new TransportAddress(serverAddress), attributes,

0 commit comments

Comments
 (0)