Skip to content

Commit 488cd29

Browse files
Tim-Brookspgomulka
authored andcommitted
Open node connections asynchronously (elastic#35144)
This is related to elastic#29023. Additionally at other points we have discussed a preference for removing the need to unnecessarily block threads for opening new node connections. This commit lays the groudwork for this by opening connections asynchronously at the transport level. We still block, however, this work will make it possible to eventually remove all blocking on new connections out of the TransportService and Transport.
1 parent 8072a49 commit 488cd29

File tree

37 files changed

+655
-451
lines changed

37 files changed

+655
-451
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.network.NetworkModule;
2425
import org.elasticsearch.common.network.NetworkService;
@@ -81,8 +82,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
8182
CircuitBreakerService circuitBreakerService,
8283
NamedWriteableRegistry namedWriteableRegistry,
8384
NetworkService networkService) {
84-
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, threadPool, networkService, bigArrays,
85-
namedWriteableRegistry, circuitBreakerService));
85+
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
86+
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
8687
}
8788

8889
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,15 @@
2121

2222
import io.netty.channel.Channel;
2323
import io.netty.channel.ChannelException;
24+
import io.netty.channel.ChannelFuture;
2425
import io.netty.channel.ChannelOption;
2526
import io.netty.channel.ChannelPromise;
27+
2628
import java.io.IOException;
29+
2730
import org.elasticsearch.ExceptionsHelper;
2831
import org.elasticsearch.action.ActionListener;
32+
import org.elasticsearch.common.Nullable;
2933
import org.elasticsearch.common.bytes.BytesReference;
3034
import org.elasticsearch.common.concurrent.CompletableContext;
3135
import org.elasticsearch.transport.TcpChannel;
@@ -37,11 +41,13 @@ public class Netty4TcpChannel implements TcpChannel {
3741

3842
private final Channel channel;
3943
private final String profile;
44+
private final CompletableContext<Void> connectContext;
4045
private final CompletableContext<Void> closeContext = new CompletableContext<>();
4146

42-
Netty4TcpChannel(Channel channel, String profile) {
47+
Netty4TcpChannel(Channel channel, String profile, @Nullable ChannelFuture connectFuture) {
4348
this.channel = channel;
4449
this.profile = profile;
50+
this.connectContext = new CompletableContext<>();
4551
this.channel.closeFuture().addListener(f -> {
4652
if (f.isSuccess()) {
4753
closeContext.complete(null);
@@ -55,6 +61,20 @@ public class Netty4TcpChannel implements TcpChannel {
5561
}
5662
}
5763
});
64+
65+
connectFuture.addListener(f -> {
66+
if (f.isSuccess()) {
67+
connectContext.complete(null);
68+
} else {
69+
Throwable cause = f.cause();
70+
if (cause instanceof Error) {
71+
ExceptionsHelper.maybeDieOnAnotherThread(cause);
72+
connectContext.completeExceptionally(new Exception(cause));
73+
} else {
74+
connectContext.completeExceptionally((Exception) cause);
75+
}
76+
}
77+
});
5878
}
5979

6080
@Override
@@ -72,6 +92,11 @@ public void addCloseListener(ActionListener<Void> listener) {
7292
closeContext.addListener(ActionListener.toBiConsumer(listener));
7393
}
7494

95+
@Override
96+
public void addConnectListener(ActionListener<Void> listener) {
97+
connectContext.addListener(ActionListener.toBiConsumer(listener));
98+
}
99+
75100
@Override
76101
public void setSoLinger(int value) throws IOException {
77102
if (channel.isOpen()) {

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import io.netty.util.concurrent.Future;
3939
import org.apache.logging.log4j.message.ParameterizedMessage;
4040
import org.elasticsearch.ExceptionsHelper;
41-
import org.elasticsearch.action.ActionListener;
41+
import org.elasticsearch.Version;
4242
import org.elasticsearch.cluster.node.DiscoveryNode;
4343
import org.elasticsearch.common.SuppressForbidden;
4444
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -101,9 +101,9 @@ public class Netty4Transport extends TcpTransport {
101101
private volatile Bootstrap clientBootstrap;
102102
private volatile NioEventLoopGroup eventLoopGroup;
103103

104-
public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
104+
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
105105
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
106-
super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
106+
super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
107107
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
108108
this.workerCount = WORKER_COUNT.get(settings);
109109

@@ -216,37 +216,23 @@ protected ChannelHandler getClientChannelInitializer(DiscoveryNode node) {
216216
static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel");
217217

218218
@Override
219-
protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> listener) throws IOException {
219+
protected Netty4TcpChannel initiateChannel(DiscoveryNode node) throws IOException {
220220
InetSocketAddress address = node.getAddress().address();
221221
Bootstrap bootstrapWithHandler = clientBootstrap.clone();
222222
bootstrapWithHandler.handler(getClientChannelInitializer(node));
223223
bootstrapWithHandler.remoteAddress(address);
224-
ChannelFuture channelFuture = bootstrapWithHandler.connect();
224+
ChannelFuture connectFuture = bootstrapWithHandler.connect();
225225

226-
Channel channel = channelFuture.channel();
226+
Channel channel = connectFuture.channel();
227227
if (channel == null) {
228-
ExceptionsHelper.maybeDieOnAnotherThread(channelFuture.cause());
229-
throw new IOException(channelFuture.cause());
228+
ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause());
229+
throw new IOException(connectFuture.cause());
230230
}
231231
addClosedExceptionLogger(channel);
232232

233-
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default");
233+
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default", connectFuture);
234234
channel.attr(CHANNEL_KEY).set(nettyChannel);
235235

236-
channelFuture.addListener(f -> {
237-
if (f.isSuccess()) {
238-
listener.onResponse(null);
239-
} else {
240-
Throwable cause = f.cause();
241-
if (cause instanceof Error) {
242-
ExceptionsHelper.maybeDieOnAnotherThread(cause);
243-
listener.onFailure(new Exception(cause));
244-
} else {
245-
listener.onFailure((Exception) cause);
246-
}
247-
}
248-
});
249-
250236
return nettyChannel;
251237
}
252238

@@ -309,7 +295,7 @@ protected ServerChannelInitializer(String name) {
309295
@Override
310296
protected void initChannel(Channel ch) throws Exception {
311297
addClosedExceptionLogger(ch);
312-
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name);
298+
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name, ch.newSucceededFuture());
313299
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
314300
ch.pipeline().addLast("logging", new ESLoggingHandler());
315301
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport.netty4;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.cluster.node.DiscoveryNode;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.io.stream.StreamInput;
@@ -59,15 +60,15 @@ public void testScheduledPing() throws Exception {
5960
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
6061

6162
NamedWriteableRegistry registry = new NamedWriteableRegistry(Collections.emptyList());
62-
final Netty4Transport nettyA = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
63-
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
63+
final Netty4Transport nettyA = new Netty4Transport(settings, Version.CURRENT, threadPool,
64+
new NetworkService(Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
6465
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
6566
null);
6667
serviceA.start();
6768
serviceA.acceptIncomingRequests();
6869

69-
final Netty4Transport nettyB = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
70-
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
70+
final Netty4Transport nettyB = new Netty4Transport(settings, Version.CURRENT, threadPool,
71+
new NetworkService(Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
7172
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
7273
null);
7374

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport.netty4;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.network.NetworkService;
2425
import org.elasticsearch.common.settings.Settings;
@@ -65,7 +66,7 @@ public void startThreadPool() {
6566
threadPool = new ThreadPool(settings);
6667
NetworkService networkService = new NetworkService(Collections.emptyList());
6768
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
68-
nettyTransport = new Netty4Transport(settings, threadPool, networkService, bigArrays,
69+
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays,
6970
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
7071
nettyTransport.start();
7172

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public ExceptionThrowingNetty4Transport(
108108
BigArrays bigArrays,
109109
NamedWriteableRegistry namedWriteableRegistry,
110110
CircuitBreakerService circuitBreakerService) {
111-
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
111+
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
112112
}
113113

114114
@Override

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport.netty4;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.common.component.Lifecycle;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.network.NetworkService;
@@ -118,7 +119,7 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc
118119

119120
private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
120121
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
121-
TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
122+
TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
122123
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
123124
transport.start();
124125

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.transport.netty4;
2121

2222
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.cluster.node.DiscoveryNode;
2425
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2526
import org.elasticsearch.common.network.NetworkService;
@@ -40,7 +41,6 @@
4041
import org.elasticsearch.transport.Transport;
4142
import org.elasticsearch.transport.TransportService;
4243

43-
import java.io.IOException;
4444
import java.net.InetAddress;
4545
import java.net.UnknownHostException;
4646
import java.util.Collections;
@@ -54,23 +54,17 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
5454
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
5555
ClusterSettings clusterSettings, boolean doHandshake) {
5656
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
57-
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
57+
Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
5858
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
5959

6060
@Override
61-
public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
62-
InterruptedException {
61+
public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
6362
if (doHandshake) {
64-
return super.executeHandshake(node, channel, timeout);
63+
super.executeHandshake(node, channel, timeout, listener);
6564
} else {
66-
return version.minimumCompatibilityVersion();
65+
listener.onResponse(version.minimumCompatibilityVersion());
6766
}
6867
}
69-
70-
@Override
71-
protected Version getCurrentVersion() {
72-
return version;
73-
}
7468
};
7569
MockTransportService mockTransportService =
7670
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());

plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.discovery.ec2;
2121

2222
import com.amazonaws.services.ec2.model.Tag;
23-
import org.elasticsearch.Version;
2423
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2524
import org.elasticsearch.common.network.NetworkService;
2625
import org.elasticsearch.common.settings.Settings;
@@ -74,8 +73,7 @@ public static void stopThreadPool() throws InterruptedException {
7473
public void createTransportService() {
7574
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
7675
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
77-
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),
78-
Version.CURRENT) {
76+
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) {
7977
@Override
8078
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
8179
// we just need to ensure we don't resolve DNS here

plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTcpChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public void addCloseListener(ActionListener<Void> listener) {
5858
addCloseListener(ActionListener.toBiConsumer(listener));
5959
}
6060

61+
@Override
62+
public void addConnectListener(ActionListener<Void> listener) {
63+
addConnectListener(ActionListener.toBiConsumer(listener));
64+
}
65+
6166
@Override
6267
public void close() {
6368
getContext().closeChannel();

plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.elasticsearch.transport.nio;
2121

2222
import org.elasticsearch.ElasticsearchException;
23-
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.Version;
2424
import org.elasticsearch.cluster.node.DiscoveryNode;
2525
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2626
import org.elasticsearch.common.network.NetworkService;
@@ -66,10 +66,10 @@ public class NioTransport extends TcpTransport {
6666
private volatile NioGroup nioGroup;
6767
private volatile TcpChannelFactory clientChannelFactory;
6868

69-
protected NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
70-
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
71-
CircuitBreakerService circuitBreakerService) {
72-
super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
69+
protected NioTransport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
70+
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
71+
CircuitBreakerService circuitBreakerService) {
72+
super("nio", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
7373
this.pageCacheRecycler = pageCacheRecycler;
7474
}
7575

@@ -80,10 +80,9 @@ protected NioTcpServerChannel bind(String name, InetSocketAddress address) throw
8080
}
8181

8282
@Override
83-
protected NioTcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> connectListener) throws IOException {
83+
protected NioTcpChannel initiateChannel(DiscoveryNode node) throws IOException {
8484
InetSocketAddress address = node.getAddress().address();
8585
NioTcpChannel channel = nioGroup.openChannel(address, clientChannelFactory);
86-
channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
8786
return channel;
8887
}
8988

plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/NioTransportPlugin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport.nio;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.network.NetworkService;
2425
import org.elasticsearch.common.settings.Setting;
@@ -61,8 +62,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
6162
NamedWriteableRegistry namedWriteableRegistry,
6263
NetworkService networkService) {
6364
return Collections.singletonMap(NIO_TRANSPORT_NAME,
64-
() -> new NioTransport(settings, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry,
65-
circuitBreakerService));
65+
() -> new NioTransport(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler,
66+
namedWriteableRegistry, circuitBreakerService));
6667
}
6768

6869
@Override

plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/NioTransportIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
104104
ExceptionThrowingNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
105105
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
106106
CircuitBreakerService circuitBreakerService) {
107-
super(settings, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService);
107+
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry,
108+
circuitBreakerService);
108109
}
109110

110111
@Override

0 commit comments

Comments
 (0)