Skip to content

Commit 920b039

Browse files
committed
Decouple nio constructs from the tcp transport (#27484)
This is related to #27260. Currently, basic nio constructs (nio channels, the channel factories, selector event handlers, etc) implement logic that is specific to the tcp transport. For example, NioChannel implements the TcpChannel interface. These nio constructs at some point will also need to support other protocols (ex: http). This commit separates the TcpTransport logic from the nio building blocks.
1 parent 72cf294 commit 920b039

25 files changed

+374
-190
lines changed

test/framework/src/main/java/org/elasticsearch/transport/nio/AcceptorEventHandler.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.logging.log4j.message.ParameterizedMessage;
2424
import org.elasticsearch.transport.nio.channel.ChannelFactory;
25-
import org.elasticsearch.transport.nio.channel.NioChannel;
2625
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
2726
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
2827
import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
2928

3029
import java.io.IOException;
31-
import java.util.function.Consumer;
3230
import java.util.function.Supplier;
3331

3432
/**
@@ -37,15 +35,10 @@
3735
public class AcceptorEventHandler extends EventHandler {
3836

3937
private final Supplier<SocketSelector> selectorSupplier;
40-
private final Consumer<NioChannel> acceptedChannelCallback;
41-
private final OpenChannels openChannels;
4238

43-
public AcceptorEventHandler(Logger logger, OpenChannels openChannels, Supplier<SocketSelector> selectorSupplier,
44-
Consumer<NioChannel> acceptedChannelCallback) {
45-
super(logger, openChannels);
46-
this.openChannels = openChannels;
39+
public AcceptorEventHandler(Logger logger, Supplier<SocketSelector> selectorSupplier) {
40+
super(logger);
4741
this.selectorSupplier = selectorSupplier;
48-
this.acceptedChannelCallback = acceptedChannelCallback;
4942
}
5043

5144
/**
@@ -56,7 +49,6 @@ public AcceptorEventHandler(Logger logger, OpenChannels openChannels, Supplier<S
5649
*/
5750
void serverChannelRegistered(NioServerSocketChannel nioServerSocketChannel) {
5851
SelectionKeyUtils.setAcceptInterested(nioServerSocketChannel);
59-
openChannels.serverChannelOpened(nioServerSocketChannel);
6052
}
6153

6254
/**
@@ -79,8 +71,7 @@ void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOException {
7971
ChannelFactory channelFactory = nioServerChannel.getChannelFactory();
8072
SocketSelector selector = selectorSupplier.get();
8173
NioSocketChannel nioSocketChannel = channelFactory.acceptNioChannel(nioServerChannel, selector);
82-
openChannels.acceptedChannelOpened(nioSocketChannel);
83-
acceptedChannelCallback.accept(nioSocketChannel);
74+
nioServerChannel.getAcceptContext().accept(nioSocketChannel);
8475
}
8576

8677
/**

test/framework/src/main/java/org/elasticsearch/transport/nio/EventHandler.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@
2929
public abstract class EventHandler {
3030

3131
protected final Logger logger;
32-
private final OpenChannels openChannels;
3332

34-
public EventHandler(Logger logger, OpenChannels openChannels) {
33+
public EventHandler(Logger logger) {
3534
this.logger = logger;
36-
this.openChannels = openChannels;
3735
}
3836

3937
/**
@@ -71,7 +69,6 @@ void uncaughtException(Exception exception) {
7169
* @param channel that should be closed
7270
*/
7371
void handleClose(NioChannel channel) {
74-
openChannels.channelClosed(channel);
7572
try {
7673
channel.closeFromSelector();
7774
} catch (IOException e) {

test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@
3333
import org.elasticsearch.threadpool.ThreadPool;
3434
import org.elasticsearch.transport.TcpTransport;
3535
import org.elasticsearch.transport.Transports;
36-
import org.elasticsearch.transport.nio.channel.ChannelFactory;
3736
import org.elasticsearch.transport.nio.channel.NioChannel;
3837
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
3938
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
39+
import org.elasticsearch.transport.nio.channel.TcpChannelFactory;
40+
import org.elasticsearch.transport.nio.channel.TcpNioServerSocketChannel;
41+
import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel;
4042
import org.elasticsearch.transport.nio.channel.TcpReadContext;
4143
import org.elasticsearch.transport.nio.channel.TcpWriteContext;
4244

@@ -65,12 +67,12 @@ public class NioTransport extends TcpTransport {
6567
public static final Setting<Integer> NIO_ACCEPTOR_COUNT =
6668
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
6769

68-
protected final OpenChannels openChannels = new OpenChannels(logger);
69-
private final ConcurrentMap<String, ChannelFactory> profileToChannelFactory = newConcurrentMap();
70+
private final OpenChannels openChannels = new OpenChannels(logger);
71+
private final ConcurrentMap<String, TcpChannelFactory> profileToChannelFactory = newConcurrentMap();
7072
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
7173
private final ArrayList<SocketSelector> socketSelectors = new ArrayList<>();
7274
private RoundRobinSelectorSupplier clientSelectorSupplier;
73-
private ChannelFactory clientChannelFactory;
75+
private TcpChannelFactory clientChannelFactory;
7476
private int acceptorNumber;
7577

7678
public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
@@ -84,17 +86,21 @@ public long getNumOpenServerConnections() {
8486
}
8587

8688
@Override
87-
protected NioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
88-
ChannelFactory channelFactory = this.profileToChannelFactory.get(name);
89+
protected TcpNioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
90+
TcpChannelFactory channelFactory = this.profileToChannelFactory.get(name);
8991
AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings));
90-
return channelFactory.openNioServerSocketChannel(address, selector);
92+
TcpNioServerSocketChannel serverChannel = channelFactory.openNioServerSocketChannel(address, selector);
93+
openChannels.serverChannelOpened(serverChannel);
94+
serverChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(serverChannel)));
95+
return serverChannel;
9196
}
9297

9398
@Override
94-
protected NioChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
99+
protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
95100
throws IOException {
96-
NioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
101+
TcpNioSocketChannel channel = clientChannelFactory.openNioChannel(node.getAddress().address(), clientSelectorSupplier.get());
97102
openChannels.clientChannelOpened(channel);
103+
channel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
98104
channel.addConnectListener(connectListener);
99105
return channel;
100106
}
@@ -119,14 +125,14 @@ protected void doStart() {
119125

120126
Consumer<NioSocketChannel> clientContextSetter = getContextSetter("client-socket");
121127
clientSelectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
122-
clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), clientContextSetter);
128+
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
129+
clientChannelFactory = new TcpChannelFactory(clientProfileSettings, clientContextSetter, getServerContextSetter());
123130

124131
if (NetworkService.NETWORK_SERVER.get(settings)) {
125132
int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
126133
for (int i = 0; i < acceptorCount; ++i) {
127134
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
128-
AcceptorEventHandler eventHandler = new AcceptorEventHandler(logger, openChannels, selectorSupplier,
129-
this::serverAcceptedChannel);
135+
AcceptorEventHandler eventHandler = new AcceptorEventHandler(logger, selectorSupplier);
130136
AcceptingSelector acceptor = new AcceptingSelector(eventHandler);
131137
acceptors.add(acceptor);
132138
}
@@ -143,7 +149,8 @@ protected void doStart() {
143149
for (ProfileSettings profileSettings : profileSettings) {
144150
String profileName = profileSettings.profileName;
145151
Consumer<NioSocketChannel> contextSetter = getContextSetter(profileName);
146-
profileToChannelFactory.putIfAbsent(profileName, new ChannelFactory(profileSettings, contextSetter));
152+
TcpChannelFactory factory = new TcpChannelFactory(profileSettings, contextSetter, getServerContextSetter());
153+
profileToChannelFactory.putIfAbsent(profileName, factory);
147154
bindServer(profileSettings);
148155
}
149156
}
@@ -169,14 +176,27 @@ protected void stopInternal() {
169176
}
170177

171178
protected SocketEventHandler getSocketEventHandler() {
172-
return new SocketEventHandler(logger, this::exceptionCaught, openChannels);
179+
return new SocketEventHandler(logger);
173180
}
174181

175182
final void exceptionCaught(NioSocketChannel channel, Exception exception) {
176-
onException(channel, exception);
183+
onException((TcpNioSocketChannel) channel, exception);
177184
}
178185

179186
private Consumer<NioSocketChannel> getContextSetter(String profileName) {
180-
return (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c));
187+
return (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c),
188+
this::exceptionCaught);
189+
}
190+
191+
private void acceptChannel(NioSocketChannel channel) {
192+
TcpNioSocketChannel tcpChannel = (TcpNioSocketChannel) channel;
193+
openChannels.acceptedChannelOpened(tcpChannel);
194+
tcpChannel.addCloseListener(ActionListener.wrap(() -> openChannels.channelClosed(channel)));
195+
serverAcceptedChannel(tcpChannel);
196+
197+
}
198+
199+
private Consumer<NioServerSocketChannel> getServerContextSetter() {
200+
return (c) -> c.setAcceptContext(this::acceptChannel);
181201
}
182202
}

test/framework/src/main/java/org/elasticsearch/transport/nio/OpenChannels.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.transport.nio.channel.NioChannel;
2626
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
2727
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
28+
import org.elasticsearch.transport.nio.channel.TcpNioServerSocketChannel;
29+
import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel;
2830

2931
import java.util.ArrayList;
3032
import java.util.HashSet;
@@ -38,17 +40,17 @@
3840
public class OpenChannels implements Releasable {
3941

4042
// TODO: Maybe set concurrency levels?
41-
private final ConcurrentMap<NioSocketChannel, Long> openClientChannels = newConcurrentMap();
42-
private final ConcurrentMap<NioSocketChannel, Long> openAcceptedChannels = newConcurrentMap();
43-
private final ConcurrentMap<NioServerSocketChannel, Long> openServerChannels = newConcurrentMap();
43+
private final ConcurrentMap<TcpNioSocketChannel, Long> openClientChannels = newConcurrentMap();
44+
private final ConcurrentMap<TcpNioSocketChannel, Long> openAcceptedChannels = newConcurrentMap();
45+
private final ConcurrentMap<TcpNioServerSocketChannel, Long> openServerChannels = newConcurrentMap();
4446

4547
private final Logger logger;
4648

4749
public OpenChannels(Logger logger) {
4850
this.logger = logger;
4951
}
5052

51-
public void serverChannelOpened(NioServerSocketChannel channel) {
53+
public void serverChannelOpened(TcpNioServerSocketChannel channel) {
5254
boolean added = openServerChannels.putIfAbsent(channel, System.nanoTime()) == null;
5355
if (added && logger.isTraceEnabled()) {
5456
logger.trace("server channel opened: {}", channel);
@@ -59,7 +61,7 @@ public long serverChannelsCount() {
5961
return openServerChannels.size();
6062
}
6163

62-
public void acceptedChannelOpened(NioSocketChannel channel) {
64+
public void acceptedChannelOpened(TcpNioSocketChannel channel) {
6365
boolean added = openAcceptedChannels.putIfAbsent(channel, System.nanoTime()) == null;
6466
if (added && logger.isTraceEnabled()) {
6567
logger.trace("accepted channel opened: {}", channel);
@@ -70,14 +72,14 @@ public HashSet<NioSocketChannel> getAcceptedChannels() {
7072
return new HashSet<>(openAcceptedChannels.keySet());
7173
}
7274

73-
public void clientChannelOpened(NioSocketChannel channel) {
75+
public void clientChannelOpened(TcpNioSocketChannel channel) {
7476
boolean added = openClientChannels.putIfAbsent(channel, System.nanoTime()) == null;
7577
if (added && logger.isTraceEnabled()) {
7678
logger.trace("client channel opened: {}", channel);
7779
}
7880
}
7981

80-
public Map<NioSocketChannel, Long> getClientChannels() {
82+
public Map<TcpNioSocketChannel, Long> getClientChannels() {
8183
return openClientChannels;
8284
}
8385

@@ -105,7 +107,7 @@ public void closeServerChannels() {
105107

106108
@Override
107109
public void close() {
108-
Stream<NioChannel> channels = Stream.concat(openClientChannels.keySet().stream(), openAcceptedChannels.keySet().stream());
110+
Stream<TcpChannel> channels = Stream.concat(openClientChannels.keySet().stream(), openAcceptedChannels.keySet().stream());
109111
TcpChannel.closeChannels(channels.collect(Collectors.toList()), true);
110112

111113
openClientChannels.clear();

test/framework/src/main/java/org/elasticsearch/transport/nio/SocketEventHandler.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,16 @@
2727
import org.elasticsearch.transport.nio.channel.WriteContext;
2828

2929
import java.io.IOException;
30-
import java.util.function.BiConsumer;
3130

3231
/**
3332
* Event handler designed to handle events from non-server sockets
3433
*/
3534
public class SocketEventHandler extends EventHandler {
3635

37-
private final BiConsumer<NioSocketChannel, Exception> exceptionHandler;
3836
private final Logger logger;
3937

40-
public SocketEventHandler(Logger logger, BiConsumer<NioSocketChannel, Exception> exceptionHandler, OpenChannels openChannels) {
41-
super(logger, openChannels);
42-
this.exceptionHandler = exceptionHandler;
38+
public SocketEventHandler(Logger logger) {
39+
super(logger);
4340
this.logger = logger;
4441
}
4542

@@ -150,6 +147,6 @@ void genericChannelException(NioChannel channel, Exception exception) {
150147
}
151148

152149
private void exceptionCaught(NioSocketChannel channel, Exception e) {
153-
exceptionHandler.accept(channel, e);
150+
channel.getExceptionContext().accept(channel, e);
154151
}
155152
}

test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.common.bytes.BytesReference;
2323
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
24+
import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel;
2425

2526
import java.io.IOException;
2627

@@ -34,7 +35,7 @@ public TcpReadHandler(String profile, NioTransport transport) {
3435
this.transport = transport;
3536
}
3637

37-
public void handleMessage(BytesReference reference, NioSocketChannel channel, int messageBytesLength) {
38+
public void handleMessage(BytesReference reference, TcpNioSocketChannel channel, int messageBytesLength) {
3839
try {
3940
transport.messageReceived(reference, channel, profile, channel.getRemoteAddress(), messageBytesLength);
4041
} catch (IOException e) {

test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -137,25 +137,18 @@ public S getRawChannel() {
137137
return socketChannel;
138138
}
139139

140+
@Override
141+
public void addCloseListener(ActionListener<Void> listener) {
142+
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
143+
}
144+
140145
// Package visibility for testing
141146
void setSelectionKey(SelectionKey selectionKey) {
142147
this.selectionKey = selectionKey;
143148
}
144-
145149
// Package visibility for testing
150+
146151
void closeRawChannel() throws IOException {
147152
socketChannel.close();
148153
}
149-
150-
@Override
151-
public void addCloseListener(ActionListener<Void> listener) {
152-
closeContext.whenComplete(ActionListener.toBiConsumer(listener));
153-
}
154-
155-
@Override
156-
public void setSoLinger(int value) throws IOException {
157-
if (isOpen()) {
158-
socketChannel.setOption(StandardSocketOptions.SO_LINGER, value);
159-
}
160-
}
161154
}

0 commit comments

Comments
 (0)