Skip to content

Commit 8f25ac6

Browse files
committed
Open node connections asynchronously (elastic#35144)
This is related to elastic#29023. Additionally at other points we have discussed a preference for removing the need to unnecessarily block threads for opening new node connections. This commit lays the groudwork for this by opening connections asynchronously at the transport level. We still block, however, this work will make it possible to eventually remove all blocking on new connections out of the TransportService and Transport.
1 parent 723f57a commit 8f25ac6

File tree

26 files changed

+745
-442
lines changed

26 files changed

+745
-442
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.network.NetworkModule;
2425
import org.elasticsearch.common.network.NetworkService;
@@ -83,8 +84,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
8384
CircuitBreakerService circuitBreakerService,
8485
NamedWriteableRegistry namedWriteableRegistry,
8586
NetworkService networkService) {
86-
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, threadPool, networkService, bigArrays,
87-
namedWriteableRegistry, circuitBreakerService));
87+
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
88+
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
8889
}
8990

9091
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

+13-25
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.apache.logging.log4j.message.ParameterizedMessage;
3939
import org.elasticsearch.ElasticsearchException;
4040
import org.elasticsearch.ExceptionsHelper;
41-
import org.elasticsearch.action.ActionListener;
41+
import org.elasticsearch.Version;
4242
import org.elasticsearch.cluster.node.DiscoveryNode;
4343
import org.elasticsearch.common.SuppressForbidden;
4444
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -101,9 +101,9 @@ public class Netty4Transport extends TcpTransport {
101101
private volatile Bootstrap clientBootstrap;
102102
private volatile NioEventLoopGroup eventLoopGroup;
103103

104-
public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
104+
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
105105
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
106-
super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
106+
super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
107107
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
108108
this.workerCount = WORKER_COUNT.get(settings);
109109

@@ -221,44 +221,31 @@ protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
221221
}
222222

223223
@Override
224-
protected NettyTcpChannel initiateChannel(DiscoveryNode node, ActionListener<Void> listener) throws IOException {
224+
protected NettyTcpChannel initiateChannel(DiscoveryNode node) throws IOException {
225225
InetSocketAddress address = node.getAddress().address();
226226
Bootstrap bootstrapWithHandler = clientBootstrap.clone();
227227
bootstrapWithHandler.handler(getClientChannelInitializer(node));
228228
bootstrapWithHandler.remoteAddress(address);
229-
ChannelFuture channelFuture = bootstrapWithHandler.connect();
229+
ChannelFuture connectFuture = bootstrapWithHandler.connect();
230230

231-
Channel channel = channelFuture.channel();
231+
Channel channel = connectFuture.channel();
232232
if (channel == null) {
233-
ExceptionsHelper.maybeDieOnAnotherThread(channelFuture.cause());
234-
throw new IOException(channelFuture.cause());
233+
ExceptionsHelper.maybeDieOnAnotherThread(connectFuture.cause());
234+
throw new IOException(connectFuture.cause());
235235
}
236236
addClosedExceptionLogger(channel);
237237

238-
NettyTcpChannel nettyChannel = new NettyTcpChannel(channel);
238+
NettyTcpChannel nettyChannel = new NettyTcpChannel(channel, "default", connectFuture);
239239
channel.attr(CHANNEL_KEY).set(nettyChannel);
240240

241-
channelFuture.addListener(f -> {
242-
if (f.isSuccess()) {
243-
listener.onResponse(null);
244-
} else {
245-
Throwable cause = f.cause();
246-
if (cause instanceof Error) {
247-
ExceptionsHelper.maybeDieOnAnotherThread(cause);
248-
listener.onFailure(new Exception(cause));
249-
} else {
250-
listener.onFailure((Exception) cause);
251-
}
252-
}
253-
});
254-
255241
return nettyChannel;
256242
}
257243

258244
@Override
259245
protected NettyTcpChannel bind(String name, InetSocketAddress address) {
260246
Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();
261-
NettyTcpChannel esChannel = new NettyTcpChannel(channel);
247+
// TODO: Switch to same server channels
248+
NettyTcpChannel esChannel = new NettyTcpChannel(channel, "server", channel.newSucceededFuture());
262249
channel.attr(CHANNEL_KEY).set(esChannel);
263250
return esChannel;
264251
}
@@ -314,7 +301,8 @@ protected ServerChannelInitializer(String name) {
314301
@Override
315302
protected void initChannel(Channel ch) throws Exception {
316303
addClosedExceptionLogger(ch);
317-
NettyTcpChannel nettyTcpChannel = new NettyTcpChannel(ch);
304+
NettyTcpChannel nettyTcpChannel = new NettyTcpChannel(ch, name, ch.newSucceededFuture());
305+
318306
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
319307
serverAcceptedChannel(nettyTcpChannel);
320308
ch.pipeline().addLast("logging", new ESLoggingHandler());

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java

+36-3
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,33 @@
2020
package org.elasticsearch.transport.netty4;
2121

2222
import io.netty.channel.Channel;
23+
import io.netty.channel.ChannelException;
24+
import io.netty.channel.ChannelFuture;
2325
import io.netty.channel.ChannelOption;
2426
import io.netty.channel.ChannelPromise;
2527
import org.elasticsearch.ExceptionsHelper;
2628
import org.elasticsearch.action.ActionListener;
29+
import org.elasticsearch.common.Nullable;
2730
import org.elasticsearch.common.bytes.BytesReference;
31+
import org.elasticsearch.common.concurrent.CompletableContext;
2832
import org.elasticsearch.transport.TcpChannel;
2933
import org.elasticsearch.transport.TransportException;
3034

35+
import java.io.IOException;
3136
import java.net.InetSocketAddress;
3237
import java.util.concurrent.CompletableFuture;
3338

3439
public class NettyTcpChannel implements TcpChannel {
3540

3641
private final Channel channel;
3742
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
43+
private final CompletableContext<Void> connectContext;
44+
private final String profile;
3845

39-
NettyTcpChannel(Channel channel) {
46+
NettyTcpChannel(Channel channel, String profile, @Nullable ChannelFuture connectFuture) {
4047
this.channel = channel;
48+
this.profile = profile;
49+
this.connectContext = new CompletableContext<>();
4150
this.channel.closeFuture().addListener(f -> {
4251
if (f.isSuccess()) {
4352
closeContext.complete(null);
@@ -51,6 +60,20 @@ public class NettyTcpChannel implements TcpChannel {
5160
}
5261
}
5362
});
63+
64+
connectFuture.addListener(f -> {
65+
if (f.isSuccess()) {
66+
connectContext.complete(null);
67+
} else {
68+
Throwable cause = f.cause();
69+
if (cause instanceof Error) {
70+
ExceptionsHelper.maybeDieOnAnotherThread(cause);
71+
connectContext.completeExceptionally(new Exception(cause));
72+
} else {
73+
connectContext.completeExceptionally((Exception) cause);
74+
}
75+
}
76+
});
5477
}
5578

5679
@Override
@@ -63,9 +86,19 @@ public void addCloseListener(ActionListener<Void> listener) {
6386
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
6487
}
6588

89+
public void addConnectListener(ActionListener<Void> listener) {
90+
connectContext.addListener(ActionListener.toBiConsumer(listener));
91+
}
92+
6693
@Override
67-
public void setSoLinger(int value) {
68-
channel.config().setOption(ChannelOption.SO_LINGER, value);
94+
public void setSoLinger(int value) throws IOException {
95+
if (channel.isOpen()) {
96+
try {
97+
channel.config().setOption(ChannelOption.SO_LINGER, value);
98+
} catch (ChannelException e) {
99+
throw new IOException(e);
100+
}
101+
}
69102
}
70103

71104
@Override

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport.netty4;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.cluster.node.DiscoveryNode;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.io.stream.StreamInput;
@@ -60,15 +61,15 @@ public void testScheduledPing() throws Exception {
6061
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
6162

6263
NamedWriteableRegistry registry = new NamedWriteableRegistry(Collections.emptyList());
63-
final Netty4Transport nettyA = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
64-
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
64+
final Netty4Transport nettyA = new Netty4Transport(settings, Version.CURRENT, threadPool,
65+
new NetworkService(Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
6566
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
6667
null);
6768
serviceA.start();
6869
serviceA.acceptIncomingRequests();
6970

70-
final Netty4Transport nettyB = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
71-
BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
71+
final Netty4Transport nettyB = new Netty4Transport(settings, Version.CURRENT, threadPool,
72+
new NetworkService(Collections.emptyList()), BigArrays.NON_RECYCLING_INSTANCE, registry, circuitBreakerService);
7273
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
7374
null);
7475

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport.netty4;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.network.NetworkService;
2425
import org.elasticsearch.common.settings.Settings;
@@ -65,7 +66,7 @@ public void startThreadPool() {
6566
threadPool = new ThreadPool(settings);
6667
NetworkService networkService = new NetworkService(Collections.emptyList());
6768
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
68-
nettyTransport = new Netty4Transport(settings, threadPool, networkService, bigArrays,
69+
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays,
6970
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
7071
nettyTransport.start();
7172

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public ExceptionThrowingNetty4Transport(
108108
BigArrays bigArrays,
109109
NamedWriteableRegistry namedWriteableRegistry,
110110
CircuitBreakerService circuitBreakerService) {
111-
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
111+
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
112112
}
113113

114114
@Override

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.transport.netty4;
2020

21+
import org.elasticsearch.Version;
2122
import org.elasticsearch.common.component.Lifecycle;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.network.NetworkService;
@@ -118,7 +119,7 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc
118119

119120
private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
120121
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
121-
TcpTransport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
122+
TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
122123
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
123124
transport.start();
124125

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

+5-11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.transport.netty4;
2121

2222
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.cluster.node.DiscoveryNode;
2425
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2526
import org.elasticsearch.common.network.NetworkService;
@@ -40,7 +41,6 @@
4041
import org.elasticsearch.transport.Transport;
4142
import org.elasticsearch.transport.TransportService;
4243

43-
import java.io.IOException;
4444
import java.net.InetAddress;
4545
import java.net.UnknownHostException;
4646
import java.util.Collections;
@@ -54,23 +54,17 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
5454
public static MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
5555
ClusterSettings clusterSettings, boolean doHandshake) {
5656
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
57-
Transport transport = new Netty4Transport(settings, threadPool, new NetworkService(Collections.emptyList()),
57+
Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
5858
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
5959

6060
@Override
61-
public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
62-
InterruptedException {
61+
public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener<Version> listener) {
6362
if (doHandshake) {
64-
return super.executeHandshake(node, channel, timeout);
63+
super.executeHandshake(node, channel, timeout, listener);
6564
} else {
66-
return version.minimumCompatibilityVersion();
65+
listener.onResponse(version.minimumCompatibilityVersion());
6766
}
6867
}
69-
70-
@Override
71-
protected Version getCurrentVersion() {
72-
return version;
73-
}
7468
};
7569
MockTransportService mockTransportService =
7670
MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet());

plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.discovery.ec2;
2121

2222
import com.amazonaws.services.ec2.model.Tag;
23-
import org.elasticsearch.Version;
2423
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2524
import org.elasticsearch.common.network.NetworkService;
2625
import org.elasticsearch.common.settings.Settings;
@@ -74,8 +73,7 @@ public static void stopThreadPool() throws InterruptedException {
7473
public void createTransportService() {
7574
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
7675
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
77-
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()),
78-
Version.CURRENT) {
76+
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList())) {
7977
@Override
8078
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
8179
// we just need to ensure we don't resolve DNS here

server/src/main/java/org/elasticsearch/transport/TcpChannel.java

+2-41
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public interface TcpChannel extends Releasable {
6262
*/
6363
void addCloseListener(ActionListener<Void> listener);
6464

65-
6665
/**
6766
* This sets the low level socket option {@link java.net.StandardSocketOptions} SO_LINGER on a channel.
6867
*
@@ -71,7 +70,6 @@ public interface TcpChannel extends Releasable {
7170
*/
7271
void setSoLinger(int value) throws IOException;
7372

74-
7573
/**
7674
* Indicates whether a channel is currently open
7775
*
@@ -95,6 +93,8 @@ public interface TcpChannel extends Releasable {
9593
*/
9694
void sendMessage(BytesReference reference, ActionListener<Void> listener);
9795

96+
void addConnectListener(ActionListener<Void> listener);
97+
9898
/**
9999
* Closes the channel.
100100
*
@@ -128,45 +128,6 @@ static <C extends TcpChannel> void closeChannels(List<C> channels, boolean block
128128
}
129129
}
130130

131-
/**
132-
* Awaits for all of the pending connections to complete. Will throw an exception if at least one of the
133-
* connections fails.
134-
*
135-
* @param discoveryNode the node for the pending connections
136-
* @param connectionFutures representing the pending connections
137-
* @param connectTimeout to wait for a connection
138-
* @throws ConnectTransportException if one of the connections fails
139-
*/
140-
static void awaitConnected(DiscoveryNode discoveryNode, List<ActionFuture<Void>> connectionFutures, TimeValue connectTimeout)
141-
throws ConnectTransportException {
142-
Exception connectionException = null;
143-
boolean allConnected = true;
144-
145-
for (ActionFuture<Void> connectionFuture : connectionFutures) {
146-
try {
147-
connectionFuture.get(connectTimeout.getMillis(), TimeUnit.MILLISECONDS);
148-
} catch (TimeoutException e) {
149-
allConnected = false;
150-
break;
151-
} catch (InterruptedException e) {
152-
Thread.currentThread().interrupt();
153-
throw new IllegalStateException(e);
154-
} catch (ExecutionException e) {
155-
allConnected = false;
156-
connectionException = (Exception) e.getCause();
157-
break;
158-
}
159-
}
160-
161-
if (allConnected == false) {
162-
if (connectionException == null) {
163-
throw new ConnectTransportException(discoveryNode, "connect_timeout[" + connectTimeout + "]");
164-
} else {
165-
throw new ConnectTransportException(discoveryNode, "connect_exception", connectionException);
166-
}
167-
}
168-
}
169-
170131
static void blockOnFutures(List<ActionFuture<Void>> futures) {
171132
for (ActionFuture<Void> future : futures) {
172133
try {

0 commit comments

Comments
 (0)