Skip to content

Commit dc86b4c

Browse files
authored
Decouple ChannelFactory from Tcp classes (#27286)
* Decouple `ChannelFactory` from Tcp classes This is related to #27260. Currently `ChannelFactory` is tightly coupled to classes related to the elasticsearch Tcp binary protocol. This commit modifies the factory to be able to construct http or other protocol channels.
1 parent 798066a commit dc86b4c

File tree

3 files changed

+50
-18
lines changed

3 files changed

+50
-18
lines changed

test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.elasticsearch.transport.nio.channel.NioChannel;
4242
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
4343
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
44+
import org.elasticsearch.transport.nio.channel.TcpReadContext;
45+
import org.elasticsearch.transport.nio.channel.TcpWriteContext;
4446

4547
import java.io.IOException;
4648
import java.net.InetSocketAddress;
@@ -68,7 +70,7 @@ public class NioTransport extends TcpTransport<NioChannel> {
6870
public static final Setting<Integer> NIO_ACCEPTOR_COUNT =
6971
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
7072

71-
private final TcpReadHandler tcpReadHandler = new TcpReadHandler(this);
73+
private final Consumer<NioSocketChannel> contextSetter;
7274
private final ConcurrentMap<String, ChannelFactory> profileToChannelFactory = newConcurrentMap();
7375
private final OpenChannels openChannels = new OpenChannels(logger);
7476
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
@@ -79,6 +81,7 @@ public class NioTransport extends TcpTransport<NioChannel> {
7981
public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
8082
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
8183
super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
84+
contextSetter = (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(this)), new TcpWriteContext(c));
8285
}
8386

8487
@Override
@@ -206,7 +209,7 @@ protected void doStart() {
206209

207210
// loop through all profiles and start them up, special handling for default one
208211
for (ProfileSettings profileSettings : profileSettings) {
209-
profileToChannelFactory.putIfAbsent(profileSettings.profileName, new ChannelFactory(profileSettings, tcpReadHandler));
212+
profileToChannelFactory.putIfAbsent(profileSettings.profileName, new ChannelFactory(profileSettings, contextSetter));
210213
bindServer(profileSettings);
211214
}
212215
}
@@ -243,7 +246,7 @@ final void exceptionCaught(NioSocketChannel channel, Throwable cause) {
243246

244247
private NioClient createClient() {
245248
Supplier<SocketSelector> selectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
246-
ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), tcpReadHandler);
249+
ChannelFactory channelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), contextSetter);
247250
return new NioClient(logger, openChannels, selectorSupplier, defaultConnectionProfile.getConnectTimeout(), channelFactory);
248251
}
249252

test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.transport.TcpTransport;
2727
import org.elasticsearch.transport.nio.AcceptingSelector;
2828
import org.elasticsearch.transport.nio.SocketSelector;
29-
import org.elasticsearch.transport.nio.TcpReadHandler;
3029

3130
import java.io.Closeable;
3231
import java.io.IOException;
@@ -39,23 +38,36 @@
3938

4039
public class ChannelFactory {
4140

42-
private final TcpReadHandler handler;
41+
private final Consumer<NioSocketChannel> contextSetter;
4342
private final RawChannelFactory rawChannelFactory;
4443

45-
public ChannelFactory(TcpTransport.ProfileSettings profileSettings, TcpReadHandler handler) {
46-
this(new RawChannelFactory(profileSettings), handler);
44+
/**
45+
* This will create a {@link ChannelFactory} using the profile settings and context setter passed to this
46+
* constructor. The context setter must be a {@link Consumer} that calls
47+
* {@link NioSocketChannel#setContexts(ReadContext, WriteContext)} with the appropriate read and write
48+
* contexts. The read and write contexts handle the protocol specific encoding and decoding of messages.
49+
*
50+
* @param profileSettings the profile settings channels opened by this factory
51+
* @param contextSetter a consumer that takes a channel and sets the read and write contexts
52+
*/
53+
public ChannelFactory(TcpTransport.ProfileSettings profileSettings, Consumer<NioSocketChannel> contextSetter) {
54+
this(new RawChannelFactory(profileSettings.tcpNoDelay,
55+
profileSettings.tcpKeepAlive,
56+
profileSettings.reuseAddress,
57+
Math.toIntExact(profileSettings.sendBufferSize.getBytes()),
58+
Math.toIntExact(profileSettings.receiveBufferSize.getBytes())), contextSetter);
4759
}
4860

49-
ChannelFactory(RawChannelFactory rawChannelFactory, TcpReadHandler handler) {
50-
this.handler = handler;
61+
ChannelFactory(RawChannelFactory rawChannelFactory, Consumer<NioSocketChannel> contextSetter) {
62+
this.contextSetter = contextSetter;
5163
this.rawChannelFactory = rawChannelFactory;
5264
}
5365

5466
public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSelector selector,
5567
Consumer<NioChannel> closeListener) throws IOException {
5668
SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
5769
NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector);
58-
channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel));
70+
setContexts(channel);
5971
channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel)));
6072
scheduleChannel(channel, selector);
6173
return channel;
@@ -65,7 +77,7 @@ public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, S
6577
Consumer<NioChannel> closeListener) throws IOException {
6678
SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel);
6779
NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector);
68-
channel.setContexts(new TcpReadContext(channel, handler), new TcpWriteContext(channel));
80+
setContexts(channel);
6981
channel.getCloseFuture().addListener(ActionListener.wrap(closeListener::accept, (e) -> closeListener.accept(channel)));
7082
scheduleChannel(channel, selector);
7183
return channel;
@@ -97,6 +109,12 @@ private void scheduleServerChannel(NioServerSocketChannel channel, AcceptingSele
97109
}
98110
}
99111

112+
private void setContexts(NioSocketChannel channel) {
113+
contextSetter.accept(channel);
114+
assert channel.getReadContext() != null : "read context should have been set on channel";
115+
assert channel.getWriteContext() != null : "write context should have been set on channel";
116+
}
117+
100118
static class RawChannelFactory {
101119

102120
private final boolean tcpNoDelay;
@@ -105,12 +123,13 @@ static class RawChannelFactory {
105123
private final int tcpSendBufferSize;
106124
private final int tcpReceiveBufferSize;
107125

108-
RawChannelFactory(TcpTransport.ProfileSettings profileSettings) {
109-
tcpNoDelay = profileSettings.tcpNoDelay;
110-
tcpKeepAlive = profileSettings.tcpKeepAlive;
111-
tcpReusedAddress = profileSettings.reuseAddress;
112-
tcpSendBufferSize = Math.toIntExact(profileSettings.sendBufferSize.getBytes());
113-
tcpReceiveBufferSize = Math.toIntExact(profileSettings.receiveBufferSize.getBytes());
126+
RawChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReusedAddress, int tcpSendBufferSize,
127+
int tcpReceiveBufferSize) {
128+
this.tcpNoDelay = tcpNoDelay;
129+
this.tcpKeepAlive = tcpKeepAlive;
130+
this.tcpReusedAddress = tcpReusedAddress;
131+
this.tcpSendBufferSize = tcpSendBufferSize;
132+
this.tcpReceiveBufferSize = tcpReceiveBufferSize;
114133
}
115134

116135
SocketChannel openNioChannel(InetSocketAddress remoteAddress) throws IOException {

test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.elasticsearch.transport.nio.TcpReadHandler;
2727
import org.junit.After;
2828
import org.junit.Before;
29+
import org.mockito.invocation.InvocationOnMock;
30+
import org.mockito.stubbing.Answer;
2931

3032
import java.io.IOException;
3133
import java.net.InetAddress;
@@ -36,6 +38,7 @@
3638

3739
import static org.mockito.Matchers.any;
3840
import static org.mockito.Matchers.same;
41+
import static org.mockito.Mockito.doAnswer;
3942
import static org.mockito.Mockito.doThrow;
4043
import static org.mockito.Mockito.mock;
4144
import static org.mockito.Mockito.verify;
@@ -55,12 +58,19 @@ public class ChannelFactoryTests extends ESTestCase {
5558
@SuppressWarnings("unchecked")
5659
public void setupFactory() throws IOException {
5760
rawChannelFactory = mock(ChannelFactory.RawChannelFactory.class);
58-
channelFactory = new ChannelFactory(rawChannelFactory, mock(TcpReadHandler.class));
61+
Consumer contextSetter = mock(Consumer.class);
62+
channelFactory = new ChannelFactory(rawChannelFactory, contextSetter);
5963
listener = mock(Consumer.class);
6064
socketSelector = mock(SocketSelector.class);
6165
acceptingSelector = mock(AcceptingSelector.class);
6266
rawChannel = SocketChannel.open();
6367
rawServerChannel = ServerSocketChannel.open();
68+
69+
doAnswer(invocationOnMock -> {
70+
NioSocketChannel channel = (NioSocketChannel) invocationOnMock.getArguments()[0];
71+
channel.setContexts(mock(ReadContext.class), mock(WriteContext.class));
72+
return null;
73+
}).when(contextSetter).accept(any());
6474
}
6575

6676
@After

0 commit comments

Comments
 (0)