Skip to content

Commit 6189998

Browse files
authored
Open node connections asynchronously (#35343)
This is related to #29023. Additionally at other points we have discussed a preference for removing the need to unnecessarily block threads for opening new node connections. This commit lays the groudwork for this by opening connections asynchronously at the transport level. We still block, however, this work will make it possible to eventually remove all blocking on new connections out of the TransportService and Transport.
1 parent 01faa32 commit 6189998

File tree

32 files changed

+1278
-605
lines changed

32 files changed

+1278
-605
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.transport;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2324
import org.elasticsearch.common.network.NetworkModule;
2425
import org.elasticsearch.common.network.NetworkService;
@@ -83,8 +84,8 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
8384
CircuitBreakerService circuitBreakerService,
8485
NamedWriteableRegistry namedWriteableRegistry,
8586
NetworkService networkService) {
86-
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, threadPool, networkService, bigArrays,
87-
namedWriteableRegistry, circuitBreakerService));
87+
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
88+
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
8889
}
8990

9091
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,10 @@
2424
import io.netty.channel.ChannelDuplexHandler;
2525
import io.netty.channel.ChannelHandlerContext;
2626
import io.netty.util.Attribute;
27+
import org.elasticsearch.ElasticsearchException;
2728
import org.elasticsearch.ExceptionsHelper;
28-
import org.elasticsearch.common.bytes.BytesReference;
29-
import org.elasticsearch.transport.TcpHeader;
3029
import org.elasticsearch.transport.Transports;
3130

32-
import java.net.InetSocketAddress;
3331

3432
/**
3533
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
@@ -38,41 +36,37 @@
3836
final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
3937

4038
private final Netty4Transport transport;
41-
private final String profileName;
4239

43-
Netty4MessageChannelHandler(Netty4Transport transport, String profileName) {
40+
Netty4MessageChannelHandler(Netty4Transport transport) {
4441
this.transport = transport;
45-
this.profileName = profileName;
4642
}
4743

4844
@Override
4945
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
5046
Transports.assertTransportThread();
51-
if (!(msg instanceof ByteBuf)) {
52-
ctx.fireChannelRead(msg);
53-
return;
54-
}
47+
assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();
48+
5549
final ByteBuf buffer = (ByteBuf) msg;
56-
final int remainingMessageSize = buffer.getInt(buffer.readerIndex() - TcpHeader.MESSAGE_LENGTH_SIZE);
57-
final int expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
5850
try {
5951
Channel channel = ctx.channel();
60-
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
61-
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
62-
// buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size
63-
BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize);
64-
Attribute<NettyTcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
65-
transport.messageReceived(reference, channelAttribute.get(), profileName, remoteAddress, remainingMessageSize);
52+
Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
53+
transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer));
6654
} finally {
67-
// Set the expected position of the buffer, no matter what happened
68-
buffer.readerIndex(expectedReaderIndex);
55+
buffer.release();
6956
}
7057
}
7158

7259
@Override
7360
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
7461
ExceptionsHelper.maybeDieOnAnotherThread(cause);
75-
transport.exceptionCaught(ctx, cause);
62+
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
63+
final Throwable newCause = unwrapped != null ? unwrapped : cause;
64+
Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
65+
if (newCause instanceof Error) {
66+
transport.onException(tcpChannel, new Exception(newCause));
67+
} else {
68+
transport.onException(tcpChannel, (Exception) newCause);
69+
}
7670
}
7771

7872
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoder.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,30 @@
3030

3131
final class Netty4SizeHeaderFrameDecoder extends ByteToMessageDecoder {
3232

33+
private static final int HEADER_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
34+
3335
@Override
3436
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
3537
try {
36-
boolean continueProcessing = TcpTransport.validateMessageHeader(Netty4Utils.toBytesReference(in));
37-
final ByteBuf message = in.skipBytes(TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE);
38-
if (!continueProcessing) return;
39-
out.add(message);
38+
boolean continueDecode = true;
39+
while (continueDecode) {
40+
int messageLength = TcpTransport.readMessageLength(Netty4Utils.toBytesReference(in));
41+
if (messageLength == -1) {
42+
continueDecode = false;
43+
} else {
44+
int messageLengthWithHeader = messageLength + HEADER_SIZE;
45+
// If the message length is greater than the network bytes available, we have not read a complete frame.
46+
if (messageLengthWithHeader > in.readableBytes()) {
47+
continueDecode = false;
48+
} else {
49+
final ByteBuf message = in.retainedSlice(in.readerIndex() + HEADER_SIZE, messageLength);
50+
out.add(message);
51+
in.readerIndex(in.readerIndex() + messageLengthWithHeader);
52+
}
53+
}
54+
}
4055
} catch (IllegalArgumentException ex) {
4156
throw new TooLongFrameException(ex);
42-
} catch (IllegalStateException ex) {
43-
/* decode will be called until the ByteBuf is fully consumed; when it is fully
44-
* consumed, transport#validateMessageHeader will throw an IllegalStateException which
45-
* is okay, it means we have finished consuming the ByteBuf and we can get out
46-
*/
4757
}
4858
}
4959

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyTcpChannel.java renamed to modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,56 @@
2020
package org.elasticsearch.transport.netty4;
2121

2222
import io.netty.channel.Channel;
23+
import io.netty.channel.ChannelException;
24+
import io.netty.channel.ChannelFuture;
2325
import io.netty.channel.ChannelOption;
2426
import io.netty.channel.ChannelPromise;
2527
import org.elasticsearch.ExceptionsHelper;
2628
import org.elasticsearch.action.ActionListener;
29+
import org.elasticsearch.common.Nullable;
2730
import org.elasticsearch.common.bytes.BytesReference;
31+
import org.elasticsearch.common.concurrent.CompletableContext;
2832
import org.elasticsearch.transport.TcpChannel;
2933
import org.elasticsearch.transport.TransportException;
3034

35+
import java.io.IOException;
3136
import java.net.InetSocketAddress;
32-
import java.util.concurrent.CompletableFuture;
3337

34-
public class NettyTcpChannel implements TcpChannel {
38+
public class Netty4TcpChannel implements TcpChannel {
3539

3640
private final Channel channel;
37-
private final CompletableFuture<Void> closeContext = new CompletableFuture<>();
41+
private final String profile;
42+
private final CompletableContext<Void> connectContext;
43+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
3844

39-
NettyTcpChannel(Channel channel) {
45+
Netty4TcpChannel(Channel channel, String profile, @Nullable ChannelFuture connectFuture) {
4046
this.channel = channel;
47+
this.profile = profile;
48+
this.connectContext = new CompletableContext<>();
4149
this.channel.closeFuture().addListener(f -> {
4250
if (f.isSuccess()) {
4351
closeContext.complete(null);
4452
} else {
4553
Throwable cause = f.cause();
4654
if (cause instanceof Error) {
4755
ExceptionsHelper.maybeDieOnAnotherThread(cause);
48-
closeContext.completeExceptionally(cause);
56+
closeContext.completeExceptionally(new Exception(cause));
4957
} else {
50-
closeContext.completeExceptionally(cause);
58+
closeContext.completeExceptionally((Exception) cause);
59+
}
60+
}
61+
});
62+
63+
connectFuture.addListener(f -> {
64+
if (f.isSuccess()) {
65+
connectContext.complete(null);
66+
} else {
67+
Throwable cause = f.cause();
68+
if (cause instanceof Error) {
69+
ExceptionsHelper.maybeDieOnAnotherThread(cause);
70+
connectContext.completeExceptionally(new Exception(cause));
71+
} else {
72+
connectContext.completeExceptionally((Exception) cause);
5173
}
5274
}
5375
});
@@ -58,14 +80,30 @@ public void close() {
5880
channel.close();
5981
}
6082

83+
@Override
84+
public String getProfile() {
85+
return profile;
86+
}
87+
6188
@Override
6289
public void addCloseListener(ActionListener<Void> listener) {
63-
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
90+
closeContext.addListener(ActionListener.toBiConsumer(listener));
91+
}
92+
93+
@Override
94+
public void addConnectListener(ActionListener<Void> listener) {
95+
connectContext.addListener(ActionListener.toBiConsumer(listener));
6496
}
6597

6698
@Override
67-
public void setSoLinger(int value) {
68-
channel.config().setOption(ChannelOption.SO_LINGER, value);
99+
public void setSoLinger(int value) throws IOException {
100+
if (channel.isOpen()) {
101+
try {
102+
channel.config().setOption(ChannelOption.SO_LINGER, value);
103+
} catch (ChannelException e) {
104+
throw new IOException(e);
105+
}
106+
}
69107
}
70108

71109
@Override
@@ -78,6 +116,11 @@ public InetSocketAddress getLocalAddress() {
78116
return (InetSocketAddress) channel.localAddress();
79117
}
80118

119+
@Override
120+
public InetSocketAddress getRemoteAddress() {
121+
return (InetSocketAddress) channel.remoteAddress();
122+
}
123+
81124
@Override
82125
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
83126
ChannelPromise writePromise = channel.newPromise();
@@ -87,8 +130,11 @@ public void sendMessage(BytesReference reference, ActionListener<Void> listener)
87130
} else {
88131
final Throwable cause = f.cause();
89132
ExceptionsHelper.maybeDieOnAnotherThread(cause);
90-
assert cause instanceof Exception;
91-
listener.onFailure((Exception) cause);
133+
if (cause instanceof Error) {
134+
listener.onFailure(new Exception(cause));
135+
} else {
136+
listener.onFailure((Exception) cause);
137+
}
92138
}
93139
});
94140
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), writePromise);
@@ -104,7 +150,7 @@ public Channel getLowLevelChannel() {
104150

105151
@Override
106152
public String toString() {
107-
return "NettyTcpChannel{" +
153+
return "Netty4TcpChannel{" +
108154
"localAddress=" + getLocalAddress() +
109155
", remoteAddress=" + channel.remoteAddress() +
110156
'}';
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport.netty4;
21+
22+
import io.netty.channel.Channel;
23+
import org.elasticsearch.ExceptionsHelper;
24+
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.common.concurrent.CompletableContext;
26+
import org.elasticsearch.transport.TcpServerChannel;
27+
28+
import java.net.InetSocketAddress;
29+
30+
public class Netty4TcpServerChannel implements TcpServerChannel {
31+
32+
private final Channel channel;
33+
private final String profile;
34+
private final CompletableContext<Void> closeContext = new CompletableContext<>();
35+
36+
Netty4TcpServerChannel(Channel channel, String profile) {
37+
this.channel = channel;
38+
this.profile = profile;
39+
this.channel.closeFuture().addListener(f -> {
40+
if (f.isSuccess()) {
41+
closeContext.complete(null);
42+
} else {
43+
Throwable cause = f.cause();
44+
if (cause instanceof Error) {
45+
ExceptionsHelper.maybeDieOnAnotherThread(cause);
46+
closeContext.completeExceptionally(new Exception(cause));
47+
} else {
48+
closeContext.completeExceptionally((Exception) cause);
49+
}
50+
}
51+
});
52+
}
53+
54+
@Override
55+
public String getProfile() {
56+
return profile;
57+
}
58+
59+
@Override
60+
public InetSocketAddress getLocalAddress() {
61+
return (InetSocketAddress) channel.localAddress();
62+
}
63+
64+
@Override
65+
public void close() {
66+
channel.close();
67+
}
68+
69+
@Override
70+
public void addCloseListener(ActionListener<Void> listener) {
71+
closeContext.addListener(ActionListener.toBiConsumer(listener));
72+
}
73+
74+
@Override
75+
public boolean isOpen() {
76+
return channel.isOpen();
77+
}
78+
79+
@Override
80+
public String toString() {
81+
return "Netty4TcpChannel{" +
82+
"localAddress=" + getLocalAddress() +
83+
'}';
84+
}
85+
}

0 commit comments

Comments
 (0)