Skip to content

Commit d099dd9

Browse files
committed
Remove tcp profile from low level nio channel
This is related to elastic#27260. Currently every nio channel has a profile field. Profile is a concept that only relates to the tcp transport. Http channels will not have profiles. This commit moves the profile from the nio channel to the read context. The context is the level that protocol specific features and logic should live.
1 parent db688e1 commit d099dd9

15 files changed

+79
-75
lines changed

test/framework/src/main/java/org/elasticsearch/transport/nio/NioTransport.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.transport.nio;
2121

2222
import org.elasticsearch.ElasticsearchException;
23-
import org.elasticsearch.ExceptionsHelper;
2423
import org.elasticsearch.action.ActionListener;
2524
import org.elasticsearch.cluster.node.DiscoveryNode;
2625
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -68,7 +67,6 @@ public class NioTransport extends TcpTransport {
6867
intSetting("transport.nio.acceptor_count", 1, 1, Setting.Property.NodeScope);
6968

7069
protected final OpenChannels openChannels = new OpenChannels(logger);
71-
private final Consumer<NioSocketChannel> contextSetter;
7270
private final ConcurrentMap<String, ChannelFactory> profileToChannelFactory = newConcurrentMap();
7371
private final ArrayList<AcceptingSelector> acceptors = new ArrayList<>();
7472
private final ArrayList<SocketSelector> socketSelectors = new ArrayList<>();
@@ -79,7 +77,6 @@ public class NioTransport extends TcpTransport {
7977
public NioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
8078
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
8179
super("nio", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
82-
contextSetter = (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(this)), new TcpWriteContext(c));
8380
}
8481

8582
@Override
@@ -91,7 +88,7 @@ public long getNumOpenServerConnections() {
9188
protected NioServerSocketChannel bind(String name, InetSocketAddress address) throws IOException {
9289
ChannelFactory channelFactory = this.profileToChannelFactory.get(name);
9390
AcceptingSelector selector = acceptors.get(++acceptorNumber % NioTransport.NIO_ACCEPTOR_COUNT.get(settings));
94-
return channelFactory.openNioServerSocketChannel(name, address, selector);
91+
return channelFactory.openNioServerSocketChannel(address, selector);
9592
}
9693

9794
@Override
@@ -132,8 +129,9 @@ protected void doStart() {
132129
}
133130
}
134131

132+
Consumer<NioSocketChannel> clientContextSetter = getContextSetter("client-socket");
135133
clientSelectorSupplier = new RoundRobinSelectorSupplier(socketSelectors);
136-
clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), contextSetter);
134+
clientChannelFactory = new ChannelFactory(new ProfileSettings(settings, "default"), clientContextSetter);
137135

138136
if (NetworkService.NETWORK_SERVER.get(settings)) {
139137
int acceptorCount = NioTransport.NIO_ACCEPTOR_COUNT.get(settings);
@@ -155,7 +153,9 @@ protected void doStart() {
155153

156154
// loop through all profiles and start them up, special handling for default one
157155
for (ProfileSettings profileSettings : profileSettings) {
158-
profileToChannelFactory.putIfAbsent(profileSettings.profileName, new ChannelFactory(profileSettings, contextSetter));
156+
String profileName = profileSettings.profileName;
157+
Consumer<NioSocketChannel> contextSetter = getContextSetter(profileName);
158+
profileToChannelFactory.putIfAbsent(profileName, new ChannelFactory(profileSettings, contextSetter));
159159
bindServer(profileSettings);
160160
}
161161
}
@@ -187,4 +187,8 @@ protected SocketEventHandler getSocketEventHandler() {
187187
final void exceptionCaught(NioSocketChannel channel, Exception exception) {
188188
onException(channel, exception);
189189
}
190+
191+
private Consumer<NioSocketChannel> getContextSetter(String profileName) {
192+
return (c) -> c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName,this)), new TcpWriteContext(c));
193+
}
190194
}

test/framework/src/main/java/org/elasticsearch/transport/nio/TcpReadHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,17 @@
2626

2727
public class TcpReadHandler {
2828

29+
private final String profile;
2930
private final NioTransport transport;
3031

31-
public TcpReadHandler(NioTransport transport) {
32+
public TcpReadHandler(String profile, NioTransport transport) {
33+
this.profile = profile;
3234
this.transport = transport;
3335
}
3436

3537
public void handleMessage(BytesReference reference, NioSocketChannel channel, int messageBytesLength) {
3638
try {
37-
transport.messageReceived(reference, channel, channel.getProfile(), channel.getRemoteAddress(), messageBytesLength);
39+
transport.messageReceived(reference, channel, profile, channel.getRemoteAddress(), messageBytesLength);
3840
} catch (IOException e) {
3941
handleException(channel, e);
4042
}

test/framework/src/main/java/org/elasticsearch/transport/nio/channel/AbstractNioChannel.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,11 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
5757
final AtomicBoolean isClosing = new AtomicBoolean(false);
5858

5959
private final InetSocketAddress localAddress;
60-
private final String profile;
6160
private final CompletableFuture<TcpChannel> closeContext = new CompletableFuture<>();
6261
private final ESSelector selector;
6362
private SelectionKey selectionKey;
6463

65-
AbstractNioChannel(String profile, S socketChannel, ESSelector selector) throws IOException {
66-
this.profile = profile;
64+
AbstractNioChannel(S socketChannel, ESSelector selector) throws IOException {
6765
this.socketChannel = socketChannel;
6866
this.localAddress = (InetSocketAddress) socketChannel.getLocalAddress();
6967
this.selector = selector;
@@ -79,11 +77,6 @@ public InetSocketAddress getLocalAddress() {
7977
return localAddress;
8078
}
8179

82-
@Override
83-
public String getProfile() {
84-
return profile;
85-
}
86-
8780
/**
8881
* Schedules a channel to be closed by the selector event loop with which it is registered.
8982
* <p>

test/framework/src/main/java/org/elasticsearch/transport/nio/channel/ChannelFactory.java

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.transport.nio.channel;
2121

2222

23-
import org.apache.lucene.util.IOUtils;
2423
import org.elasticsearch.mocksocket.PrivilegedSocketAccess;
2524
import org.elasticsearch.transport.TcpTransport;
2625
import org.elasticsearch.transport.nio.AcceptingSelector;
@@ -64,33 +63,51 @@ public ChannelFactory(TcpTransport.ProfileSettings profileSettings, Consumer<Nio
6463

6564
public NioSocketChannel openNioChannel(InetSocketAddress remoteAddress, SocketSelector selector) throws IOException {
6665
SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
67-
NioSocketChannel channel = new NioSocketChannel(NioChannel.CLIENT, rawChannel, selector);
68-
setContexts(channel);
66+
NioSocketChannel channel = createChannel(selector, rawChannel);
6967
scheduleChannel(channel, selector);
7068
return channel;
7169
}
7270

7371
public NioSocketChannel acceptNioChannel(NioServerSocketChannel serverChannel, SocketSelector selector) throws IOException {
7472
SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverChannel);
75-
NioSocketChannel channel = new NioSocketChannel(serverChannel.getProfile(), rawChannel, selector);
76-
setContexts(channel);
73+
NioSocketChannel channel = createChannel(selector, rawChannel);
7774
scheduleChannel(channel, selector);
7875
return channel;
7976
}
8077

81-
public NioServerSocketChannel openNioServerSocketChannel(String profileName, InetSocketAddress address, AcceptingSelector selector)
78+
public NioServerSocketChannel openNioServerSocketChannel(InetSocketAddress address, AcceptingSelector selector)
8279
throws IOException {
8380
ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
84-
NioServerSocketChannel serverChannel = new NioServerSocketChannel(profileName, rawChannel, this, selector);
81+
NioServerSocketChannel serverChannel = createServerChannel(selector, rawChannel);
8582
scheduleServerChannel(serverChannel, selector);
8683
return serverChannel;
8784
}
8885

86+
private NioSocketChannel createChannel(SocketSelector selector, SocketChannel rawChannel) throws IOException {
87+
try {
88+
NioSocketChannel channel = new NioSocketChannel(rawChannel, selector);
89+
setContexts(channel);
90+
return channel;
91+
} catch (Exception e) {
92+
closeRawChannel(rawChannel, e);
93+
throw e;
94+
}
95+
}
96+
97+
private NioServerSocketChannel createServerChannel(AcceptingSelector selector, ServerSocketChannel rawChannel) throws IOException {
98+
try {
99+
return new NioServerSocketChannel(rawChannel, this, selector);
100+
} catch (Exception e) {
101+
closeRawChannel(rawChannel, e);
102+
throw e;
103+
}
104+
}
105+
89106
private void scheduleChannel(NioSocketChannel channel, SocketSelector selector) {
90107
try {
91108
selector.scheduleForRegistration(channel);
92109
} catch (IllegalStateException e) {
93-
IOUtils.closeWhileHandlingException(channel.getRawChannel());
110+
closeRawChannel(channel.getRawChannel(), e);
94111
throw e;
95112
}
96113
}
@@ -99,7 +116,7 @@ private void scheduleServerChannel(NioServerSocketChannel channel, AcceptingSele
99116
try {
100117
selector.scheduleForRegistration(channel);
101118
} catch (IllegalStateException e) {
102-
IOUtils.closeWhileHandlingException(channel.getRawChannel());
119+
closeRawChannel(channel.getRawChannel(), e);
103120
throw e;
104121
}
105122
}
@@ -110,6 +127,14 @@ private void setContexts(NioSocketChannel channel) {
110127
assert channel.getWriteContext() != null : "write context should have been set on channel";
111128
}
112129

130+
private static void closeRawChannel(Closeable c, Exception e) {
131+
try {
132+
c.close();
133+
} catch (IOException closeException) {
134+
e.addSuppressed(closeException);
135+
}
136+
}
137+
113138
static class RawChannelFactory {
114139

115140
private final boolean tcpNoDelay;
@@ -142,7 +167,12 @@ SocketChannel openNioChannel(InetSocketAddress remoteAddress) throws IOException
142167
SocketChannel acceptNioChannel(NioServerSocketChannel serverChannel) throws IOException {
143168
ServerSocketChannel serverSocketChannel = serverChannel.getRawChannel();
144169
SocketChannel socketChannel = PrivilegedSocketAccess.accept(serverSocketChannel);
145-
configureSocketChannel(socketChannel);
170+
try {
171+
configureSocketChannel(socketChannel);
172+
} catch (IOException e) {
173+
closeRawChannel(socketChannel, e);
174+
throw e;
175+
}
146176
return socketChannel;
147177
}
148178

@@ -160,14 +190,6 @@ ServerSocketChannel openNioServerSocketChannel(InetSocketAddress address) throws
160190
return serverSocketChannel;
161191
}
162192

163-
private void closeRawChannel(Closeable c, IOException e) {
164-
try {
165-
c.close();
166-
} catch (IOException closeException) {
167-
e.addSuppressed(closeException);
168-
}
169-
}
170-
171193
private void configureSocketChannel(SocketChannel channel) throws IOException {
172194
channel.configureBlocking(false);
173195
Socket socket = channel.socket();

test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioChannel.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,8 @@
3030

3131
public interface NioChannel extends TcpChannel {
3232

33-
String CLIENT = "client-socket";
34-
3533
InetSocketAddress getLocalAddress();
3634

37-
String getProfile();
38-
3935
void close();
4036

4137
void closeFromSelector() throws IOException;

test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioServerSocketChannel.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@
2626

2727
import java.io.IOException;
2828
import java.nio.channels.ServerSocketChannel;
29-
import java.util.concurrent.Future;
3029

3130
public class NioServerSocketChannel extends AbstractNioChannel<ServerSocketChannel> {
3231

3332
private final ChannelFactory channelFactory;
3433

35-
public NioServerSocketChannel(String profile, ServerSocketChannel socketChannel, ChannelFactory channelFactory,
36-
AcceptingSelector selector) throws IOException {
37-
super(profile, socketChannel, selector);
34+
public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory channelFactory, AcceptingSelector selector)
35+
throws IOException {
36+
super(socketChannel, selector);
3837
this.channelFactory = channelFactory;
3938
}
4039

@@ -50,8 +49,7 @@ public void sendMessage(BytesReference reference, ActionListener<TcpChannel> lis
5049
@Override
5150
public String toString() {
5251
return "NioServerSocketChannel{" +
53-
"profile=" + getProfile() +
54-
", localAddress=" + getLocalAddress() +
52+
"localAddress=" + getLocalAddress() +
5553
'}';
5654
}
5755
}

test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
4242
private ReadContext readContext;
4343
private Exception connectException;
4444

45-
public NioSocketChannel(String profile, SocketChannel socketChannel, SocketSelector selector) throws IOException {
46-
super(profile, socketChannel, selector);
45+
public NioSocketChannel(SocketChannel socketChannel, SocketSelector selector) throws IOException {
46+
super(socketChannel, selector);
4747
this.remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
4848
this.socketSelector = selector;
4949
}
@@ -181,8 +181,7 @@ public void addConnectListener(ActionListener<NioChannel> listener) {
181181
@Override
182182
public String toString() {
183183
return "NioSocketChannel{" +
184-
"profile=" + getProfile() +
185-
", localAddress=" + getLocalAddress() +
184+
"localAddress=" + getLocalAddress() +
186185
", remoteAddress=" + remoteAddress +
187186
'}';
188187
}

test/framework/src/test/java/org/elasticsearch/transport/nio/AcceptorEventHandlerTests.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,11 @@
2323
import org.elasticsearch.test.ESTestCase;
2424
import org.elasticsearch.transport.nio.channel.ChannelFactory;
2525
import org.elasticsearch.transport.nio.channel.DoNotRegisterServerChannel;
26-
import org.elasticsearch.transport.nio.channel.NioChannel;
2726
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
2827
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
2928
import org.elasticsearch.transport.nio.channel.ReadContext;
3029
import org.elasticsearch.transport.nio.channel.WriteContext;
3130
import org.junit.Before;
32-
import org.mockito.ArgumentCaptor;
3331

3432
import java.io.IOException;
3533
import java.nio.channels.SelectionKey;
@@ -67,7 +65,7 @@ public void setUpHandler() throws IOException {
6765
handler = new AcceptorEventHandler(logger, openChannels, new RoundRobinSelectorSupplier(selectors), acceptedChannelCallback);
6866

6967
AcceptingSelector selector = mock(AcceptingSelector.class);
70-
channel = new DoNotRegisterServerChannel("", mock(ServerSocketChannel.class), channelFactory, selector);
68+
channel = new DoNotRegisterServerChannel(mock(ServerSocketChannel.class), channelFactory, selector);
7169
channel.register();
7270
}
7371

@@ -88,7 +86,7 @@ public void testHandleRegisterSetsOP_ACCEPTInterest() {
8886
}
8987

9088
public void testHandleAcceptCallsChannelFactory() throws IOException {
91-
NioSocketChannel childChannel = new NioSocketChannel("", mock(SocketChannel.class), socketSelector);
89+
NioSocketChannel childChannel = new NioSocketChannel(mock(SocketChannel.class), socketSelector);
9290
when(channelFactory.acceptNioChannel(same(channel), same(socketSelector))).thenReturn(childChannel);
9391

9492
handler.acceptChannel(channel);
@@ -100,7 +98,7 @@ public void testHandleAcceptCallsChannelFactory() throws IOException {
10098
@SuppressWarnings("unchecked")
10199
public void testHandleAcceptAddsToOpenChannelsAndIsRemovedOnClose() throws IOException {
102100
SocketChannel rawChannel = SocketChannel.open();
103-
NioSocketChannel childChannel = new NioSocketChannel("", rawChannel, socketSelector);
101+
NioSocketChannel childChannel = new NioSocketChannel(rawChannel, socketSelector);
104102
childChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class));
105103
when(channelFactory.acceptNioChannel(same(channel), same(socketSelector))).thenReturn(childChannel);
106104

test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void setUpHandler() throws IOException {
5757
SocketSelector socketSelector = mock(SocketSelector.class);
5858
handler = new SocketEventHandler(logger, exceptionHandler, mock(OpenChannels.class));
5959
rawChannel = mock(SocketChannel.class);
60-
channel = new DoNotRegisterChannel("", rawChannel, socketSelector);
60+
channel = new DoNotRegisterChannel(rawChannel, socketSelector);
6161
readContext = mock(ReadContext.class);
6262
when(rawChannel.finishConnect()).thenReturn(true);
6363

test/framework/src/test/java/org/elasticsearch/transport/nio/channel/ChannelFactoryTests.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,12 @@ public void ensureClosed() throws IOException {
7676
public void testAcceptChannel() throws IOException {
7777
NioServerSocketChannel serverChannel = mock(NioServerSocketChannel.class);
7878
when(rawChannelFactory.acceptNioChannel(serverChannel)).thenReturn(rawChannel);
79-
when(serverChannel.getProfile()).thenReturn("parent-profile");
8079

8180
NioSocketChannel channel = channelFactory.acceptNioChannel(serverChannel, socketSelector);
8281

8382
verify(socketSelector).scheduleForRegistration(channel);
8483

8584
assertEquals(socketSelector, channel.getSelector());
86-
assertEquals("parent-profile", channel.getProfile());
8785
assertEquals(rawChannel, channel.getRawChannel());
8886
}
8987

@@ -106,7 +104,6 @@ public void testOpenChannel() throws IOException {
106104
verify(socketSelector).scheduleForRegistration(channel);
107105

108106
assertEquals(socketSelector, channel.getSelector());
109-
assertEquals("client-socket", channel.getProfile());
110107
assertEquals(rawChannel, channel.getRawChannel());
111108
}
112109

@@ -124,13 +121,11 @@ public void testOpenServerChannel() throws IOException {
124121
InetSocketAddress address = mock(InetSocketAddress.class);
125122
when(rawChannelFactory.openNioServerSocketChannel(same(address))).thenReturn(rawServerChannel);
126123

127-
String profile = "profile";
128-
NioServerSocketChannel channel = channelFactory.openNioServerSocketChannel(profile, address, acceptingSelector);
124+
NioServerSocketChannel channel = channelFactory.openNioServerSocketChannel(address, acceptingSelector);
129125

130126
verify(acceptingSelector).scheduleForRegistration(channel);
131127

132128
assertEquals(acceptingSelector, channel.getSelector());
133-
assertEquals(profile, channel.getProfile());
134129
assertEquals(rawServerChannel, channel.getRawChannel());
135130
}
136131

@@ -139,7 +134,7 @@ public void testOpenedServerChannelRejected() throws IOException {
139134
when(rawChannelFactory.openNioServerSocketChannel(same(address))).thenReturn(rawServerChannel);
140135
doThrow(new IllegalStateException()).when(acceptingSelector).scheduleForRegistration(any());
141136

142-
expectThrows(IllegalStateException.class, () -> channelFactory.openNioServerSocketChannel("", address, acceptingSelector));
137+
expectThrows(IllegalStateException.class, () -> channelFactory.openNioServerSocketChannel(address, acceptingSelector));
143138

144139
assertFalse(rawServerChannel.isOpen());
145140
}

0 commit comments

Comments
 (0)