Skip to content

Commit 23b850d

Browse files
committed
Move nio channel initialization to event loop (elastic#43780)
Currently in the transport-nio work we connect and bind channels on the a thread before the channel is registered with a selector. Additionally, it is at this point that we set all the socket options. This commit moves these operations onto the event-loop after the channel has been registered with a selector. It attempts to set the socket options for a non-server channel at registration time. If that fails, it will attempt to set the options after the channel is connected. This should fix elastic#41071.
1 parent ea725dc commit 23b850d

File tree

21 files changed

+544
-296
lines changed

21 files changed

+544
-296
lines changed

libs/nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424

2525
public class BytesChannelContext extends SocketChannelContext {
2626

27-
public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
28-
NioChannelHandler handler, InboundChannelBuffer channelBuffer) {
29-
super(channel, selector, exceptionHandler, handler, channelBuffer);
27+
public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Config.Socket socketConfig,
28+
Consumer<Exception> exceptionHandler, NioChannelHandler handler, InboundChannelBuffer channelBuffer) {
29+
super(channel, selector, socketConfig, exceptionHandler, handler, channelBuffer);
3030
}
3131

3232
@Override

libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java

Lines changed: 75 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -19,58 +19,69 @@
1919

2020
package org.elasticsearch.nio;
2121

22-
import org.elasticsearch.common.CheckedRunnable;
23-
2422
import java.io.Closeable;
2523
import java.io.IOException;
2624
import java.io.UncheckedIOException;
2725
import java.net.InetSocketAddress;
28-
import java.net.SocketException;
2926
import java.nio.channels.ServerSocketChannel;
3027
import java.nio.channels.SocketChannel;
31-
import java.security.AccessController;
32-
import java.security.PrivilegedActionException;
33-
import java.security.PrivilegedExceptionAction;
28+
import java.nio.channels.spi.AbstractSelectableChannel;
3429
import java.util.function.Supplier;
3530

3631
public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> {
3732

33+
private final boolean tcpNoDelay;
34+
private final boolean tcpKeepAlive;
35+
private final boolean tcpReuseAddress;
36+
private final int tcpSendBufferSize;
37+
private final int tcpReceiveBufferSize;
3838
private final ChannelFactory.RawChannelFactory rawChannelFactory;
3939

40+
/**
41+
* This will create a {@link ChannelFactory}.
42+
*/
43+
protected ChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReuseAddress, int tcpSendBufferSize,
44+
int tcpReceiveBufferSize) {
45+
this(tcpNoDelay, tcpKeepAlive, tcpReuseAddress, tcpSendBufferSize, tcpReceiveBufferSize, new RawChannelFactory());
46+
}
47+
4048
/**
4149
* This will create a {@link ChannelFactory} using the raw channel factory passed to the constructor.
42-
*
43-
* @param rawChannelFactory a factory that will construct the raw socket channels
4450
*/
45-
protected ChannelFactory(RawChannelFactory rawChannelFactory) {
51+
protected ChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReuseAddress, int tcpSendBufferSize,
52+
int tcpReceiveBufferSize, RawChannelFactory rawChannelFactory) {
53+
this.tcpNoDelay = tcpNoDelay;
54+
this.tcpKeepAlive = tcpKeepAlive;
55+
this.tcpReuseAddress = tcpReuseAddress;
56+
this.tcpSendBufferSize = tcpSendBufferSize;
57+
this.tcpReceiveBufferSize = tcpReceiveBufferSize;
4658
this.rawChannelFactory = rawChannelFactory;
4759
}
4860

4961
public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException {
50-
SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
62+
SocketChannel rawChannel = rawChannelFactory.openNioChannel();
63+
setNonBlocking(rawChannel);
5164
NioSelector selector = supplier.get();
52-
Socket channel = internalCreateChannel(selector, rawChannel);
65+
Socket channel = internalCreateChannel(selector, rawChannel, createSocketConfig(remoteAddress, false));
5366
scheduleChannel(channel, selector);
5467
return channel;
5568
}
5669

57-
public Socket acceptNioChannel(ServerChannelContext serverContext, Supplier<NioSelector> supplier) throws IOException {
58-
SocketChannel rawChannel = rawChannelFactory.acceptNioChannel(serverContext);
59-
// Null is returned if there are no pending sockets to accept
60-
if (rawChannel == null) {
61-
return null;
62-
} else {
63-
NioSelector selector = supplier.get();
64-
Socket channel = internalCreateChannel(selector, rawChannel);
65-
scheduleChannel(channel, selector);
66-
return channel;
67-
}
70+
public Socket acceptNioChannel(SocketChannel rawChannel, Supplier<NioSelector> supplier) throws IOException {
71+
setNonBlocking(rawChannel);
72+
NioSelector selector = supplier.get();
73+
InetSocketAddress remoteAddress = getRemoteAddress(rawChannel);
74+
Socket channel = internalCreateChannel(selector, rawChannel, createSocketConfig(remoteAddress, true));
75+
scheduleChannel(channel, selector);
76+
return channel;
6877
}
6978

70-
public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException {
71-
ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
79+
public ServerSocket openNioServerSocketChannel(InetSocketAddress localAddress, Supplier<NioSelector> supplier) throws IOException {
80+
ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel();
81+
setNonBlocking(rawChannel);
7282
NioSelector selector = supplier.get();
73-
ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
83+
Config.ServerSocket config = new Config.ServerSocket(tcpReuseAddress, localAddress);
84+
ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel, config);
7485
scheduleServerChannel(serverChannel, selector);
7586
return serverChannel;
7687
}
@@ -80,27 +91,38 @@ public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Suppli
8091
* returned, the channel should be fully created and setup. Read and write contexts and the channel
8192
* exception handler should have been set.
8293
*
83-
* @param selector the channel will be registered with
84-
* @param channel the raw channel
94+
* @param selector the channel will be registered with
95+
* @param channel the raw channel
96+
* @param socketConfig the socket config
8597
* @return the channel
8698
* @throws IOException related to the creation of the channel
8799
*/
88-
public abstract Socket createChannel(NioSelector selector, SocketChannel channel) throws IOException;
100+
public abstract Socket createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) throws IOException;
89101

90102
/**
91103
* This method should return a new {@link NioServerSocketChannel} implementation. When this method has
92104
* returned, the channel should be fully created and setup.
93105
*
94106
* @param selector the channel will be registered with
95-
* @param channel the raw channel
107+
* @param channel the raw channel
108+
* @param socketConfig the socket config
96109
* @return the server channel
97110
* @throws IOException related to the creation of the channel
98111
*/
99-
public abstract ServerSocket createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException;
112+
public abstract ServerSocket createServerChannel(NioSelector selector, ServerSocketChannel channel, Config.ServerSocket socketConfig)
113+
throws IOException;
100114

101-
private Socket internalCreateChannel(NioSelector selector, SocketChannel rawChannel) throws IOException {
115+
protected InetSocketAddress getRemoteAddress(SocketChannel rawChannel) throws IOException {
116+
InetSocketAddress remoteAddress = (InetSocketAddress) rawChannel.socket().getRemoteSocketAddress();
117+
if (remoteAddress == null) {
118+
throw new IOException("Accepted socket does not have remote address");
119+
}
120+
return remoteAddress;
121+
}
122+
123+
private Socket internalCreateChannel(NioSelector selector, SocketChannel rawChannel, Config.Socket config) throws IOException {
102124
try {
103-
Socket channel = createChannel(selector, rawChannel);
125+
Socket channel = createChannel(selector, rawChannel, config);
104126
assert channel.getContext() != null : "channel context should have been set on channel";
105127
return channel;
106128
} catch (UncheckedIOException e) {
@@ -114,9 +136,10 @@ private Socket internalCreateChannel(NioSelector selector, SocketChannel rawChan
114136
}
115137
}
116138

117-
private ServerSocket internalCreateServerChannel(NioSelector selector, ServerSocketChannel rawChannel) throws IOException {
139+
private ServerSocket internalCreateServerChannel(NioSelector selector, ServerSocketChannel rawChannel, Config.ServerSocket config)
140+
throws IOException {
118141
try {
119-
return createServerChannel(selector, rawChannel);
142+
return createServerChannel(selector, rawChannel, config);
120143
} catch (Exception e) {
121144
closeRawChannel(rawChannel, e);
122145
throw e;
@@ -141,6 +164,15 @@ private void scheduleServerChannel(ServerSocket channel, NioSelector selector) {
141164
}
142165
}
143166

167+
private void setNonBlocking(AbstractSelectableChannel rawChannel) throws IOException {
168+
try {
169+
rawChannel.configureBlocking(false);
170+
} catch (IOException e) {
171+
closeRawChannel(rawChannel, e);
172+
throw e;
173+
}
174+
}
175+
144176
private static void closeRawChannel(Closeable c, Exception e) {
145177
try {
146178
c.close();
@@ -149,107 +181,19 @@ private static void closeRawChannel(Closeable c, Exception e) {
149181
}
150182
}
151183

152-
public static class RawChannelFactory {
153-
154-
private final boolean tcpNoDelay;
155-
private final boolean tcpKeepAlive;
156-
private final boolean tcpReusedAddress;
157-
private final int tcpSendBufferSize;
158-
private final int tcpReceiveBufferSize;
159-
160-
public RawChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReusedAddress, int tcpSendBufferSize,
161-
int tcpReceiveBufferSize) {
162-
this.tcpNoDelay = tcpNoDelay;
163-
this.tcpKeepAlive = tcpKeepAlive;
164-
this.tcpReusedAddress = tcpReusedAddress;
165-
this.tcpSendBufferSize = tcpSendBufferSize;
166-
this.tcpReceiveBufferSize = tcpReceiveBufferSize;
167-
}
168-
169-
SocketChannel openNioChannel(InetSocketAddress remoteAddress) throws IOException {
170-
SocketChannel socketChannel = SocketChannel.open();
171-
try {
172-
configureSocketChannel(socketChannel);
173-
connect(socketChannel, remoteAddress);
174-
} catch (IOException e) {
175-
closeRawChannel(socketChannel, e);
176-
throw e;
177-
}
178-
return socketChannel;
179-
}
180-
181-
SocketChannel acceptNioChannel(ServerChannelContext serverContext) throws IOException {
182-
ServerSocketChannel rawChannel = serverContext.getChannel().getRawChannel();
183-
assert rawChannel.isBlocking() == false;
184-
SocketChannel socketChannel = accept(rawChannel);
185-
assert rawChannel.isBlocking() == false;
186-
if (socketChannel == null) {
187-
return null;
188-
}
189-
try {
190-
configureSocketChannel(socketChannel);
191-
} catch (IOException e) {
192-
closeRawChannel(socketChannel, e);
193-
throw e;
194-
}
195-
return socketChannel;
196-
}
197-
198-
ServerSocketChannel openNioServerSocketChannel(InetSocketAddress address) throws IOException {
199-
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
200-
serverSocketChannel.configureBlocking(false);
201-
java.net.ServerSocket socket = serverSocketChannel.socket();
202-
try {
203-
socket.setReuseAddress(tcpReusedAddress);
204-
serverSocketChannel.bind(address);
205-
} catch (IOException e) {
206-
closeRawChannel(serverSocketChannel, e);
207-
throw e;
208-
}
209-
return serverSocketChannel;
210-
}
211-
212-
private static final boolean MAC_OS_X = System.getProperty("os.name").startsWith("Mac OS X");
213-
214-
private static void setSocketOption(CheckedRunnable<SocketException> runnable) throws SocketException {
215-
try {
216-
runnable.run();
217-
} catch (SocketException e) {
218-
if (MAC_OS_X == false) {
219-
// ignore on Mac, see https://github.com/elastic/elasticsearch/issues/41071
220-
throw e;
221-
}
222-
}
223-
}
184+
private Config.Socket createSocketConfig(InetSocketAddress remoteAddress, boolean isAccepted) {
185+
return new Config.Socket(tcpNoDelay, tcpKeepAlive, tcpReuseAddress, tcpSendBufferSize, tcpReceiveBufferSize, remoteAddress,
186+
isAccepted);
187+
}
224188

225-
private void configureSocketChannel(SocketChannel channel) throws IOException {
226-
channel.configureBlocking(false);
227-
java.net.Socket socket = channel.socket();
228-
setSocketOption(() -> socket.setTcpNoDelay(tcpNoDelay));
229-
setSocketOption(() -> socket.setKeepAlive(tcpKeepAlive));
230-
setSocketOption(() -> socket.setReuseAddress(tcpReusedAddress));
231-
if (tcpSendBufferSize > 0) {
232-
setSocketOption(() -> socket.setSendBufferSize(tcpSendBufferSize));
233-
}
234-
if (tcpReceiveBufferSize > 0) {
235-
setSocketOption(() -> socket.setSendBufferSize(tcpReceiveBufferSize));
236-
}
237-
}
189+
public static class RawChannelFactory {
238190

239-
public static SocketChannel accept(ServerSocketChannel serverSocketChannel) throws IOException {
240-
try {
241-
return AccessController.doPrivileged((PrivilegedExceptionAction<SocketChannel>) serverSocketChannel::accept);
242-
} catch (PrivilegedActionException e) {
243-
throw (IOException) e.getCause();
244-
}
191+
SocketChannel openNioChannel() throws IOException {
192+
return SocketChannel.open();
245193
}
246194

247-
private static void connect(SocketChannel socketChannel, InetSocketAddress remoteAddress) throws IOException {
248-
try {
249-
AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () -> socketChannel.connect(remoteAddress));
250-
} catch (PrivilegedActionException e) {
251-
throw (IOException) e.getCause();
252-
}
195+
ServerSocketChannel openNioServerSocketChannel() throws IOException {
196+
return ServerSocketChannel.open();
253197
}
254198
}
255199
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.nio;
21+
22+
import java.net.InetSocketAddress;
23+
24+
public abstract class Config {
25+
26+
private final boolean tcpReuseAddress;
27+
28+
public Config(boolean tcpReuseAddress) {
29+
this.tcpReuseAddress = tcpReuseAddress;
30+
}
31+
32+
public boolean tcpReuseAddress() {
33+
return tcpReuseAddress;
34+
}
35+
36+
public static class Socket extends Config {
37+
38+
private final boolean tcpNoDelay;
39+
private final boolean tcpKeepAlive;
40+
private final int tcpSendBufferSize;
41+
private final int tcpReceiveBufferSize;
42+
private final InetSocketAddress remoteAddress;
43+
private final boolean isAccepted;
44+
45+
public Socket(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReuseAddress, int tcpSendBufferSize, int tcpReceiveBufferSize,
46+
InetSocketAddress remoteAddress, boolean isAccepted) {
47+
super(tcpReuseAddress);
48+
this.tcpNoDelay = tcpNoDelay;
49+
this.tcpKeepAlive = tcpKeepAlive;
50+
this.tcpSendBufferSize = tcpSendBufferSize;
51+
this.tcpReceiveBufferSize = tcpReceiveBufferSize;
52+
this.remoteAddress = remoteAddress;
53+
this.isAccepted = isAccepted;
54+
}
55+
56+
public boolean tcpNoDelay() {
57+
return tcpNoDelay;
58+
}
59+
60+
public boolean tcpKeepAlive() {
61+
return tcpKeepAlive;
62+
}
63+
64+
public int tcpSendBufferSize() {
65+
return tcpSendBufferSize;
66+
}
67+
68+
public int tcpReceiveBufferSize() {
69+
return tcpReceiveBufferSize;
70+
}
71+
72+
public boolean isAccepted() {
73+
return isAccepted;
74+
}
75+
76+
public InetSocketAddress getRemoteAddress() {
77+
return remoteAddress;
78+
}
79+
}
80+
81+
public static class ServerSocket extends Config {
82+
83+
private InetSocketAddress localAddress;
84+
85+
public ServerSocket(boolean tcpReuseAddress, InetSocketAddress localAddress) {
86+
super(tcpReuseAddress);
87+
this.localAddress = localAddress;
88+
}
89+
90+
public InetSocketAddress getLocalAddress() {
91+
return localAddress;
92+
}
93+
}
94+
}

0 commit comments

Comments
 (0)