Skip to content

Commit c375d5a

Browse files
authored
Add nio transport to security plugin (#31942)
This is related to #27260. It adds the SecurityNioTransport to the security plugin. Additionally, it adds support for ip filtering. And it randomly uses the nio transport in security integration tests.
1 parent 334c255 commit c375d5a

File tree

13 files changed

+199
-56
lines changed

13 files changed

+199
-56
lines changed

libs/nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,19 @@
2121

2222
import java.io.IOException;
2323
import java.util.function.Consumer;
24+
import java.util.function.Predicate;
2425

2526
public class BytesChannelContext extends SocketChannelContext {
2627

2728
public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
2829
ReadWriteHandler handler, InboundChannelBuffer channelBuffer) {
29-
super(channel, selector, exceptionHandler, handler, channelBuffer);
30+
this(channel, selector, exceptionHandler, handler, channelBuffer, ALWAYS_ALLOW_CHANNEL);
31+
}
32+
33+
public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
34+
ReadWriteHandler handler, InboundChannelBuffer channelBuffer,
35+
Predicate<NioSocketChannel> allowChannelPredicate) {
36+
super(channel, selector, exceptionHandler, handler, channelBuffer, allowChannelPredicate);
3037
}
3138

3239
@Override
@@ -77,7 +84,7 @@ public void closeChannel() {
7784

7885
@Override
7986
public boolean selectorShouldClose() {
80-
return isPeerClosed() || hasIOException() || isClosing.get();
87+
return closeNow() || isClosing.get();
8188
}
8289

8390
/**

libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ public abstract class ChannelContext<S extends SelectableChannel & NetworkChanne
4747
}
4848

4949
protected void register() throws IOException {
50+
doSelectorRegister();
51+
}
52+
53+
// Package private for testing
54+
void doSelectorRegister() throws IOException {
5055
setSelectionKey(rawChannel.register(getSelector().rawSelector(), 0));
5156
}
5257

libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.function.BiConsumer;
3333
import java.util.function.Consumer;
34+
import java.util.function.Predicate;
3435

3536
/**
3637
* This context should implement the specific logic for a channel. When a channel receives a notification
@@ -43,24 +44,28 @@
4344
*/
4445
public abstract class SocketChannelContext extends ChannelContext<SocketChannel> {
4546

47+
public static final Predicate<NioSocketChannel> ALWAYS_ALLOW_CHANNEL = (c) -> true;
48+
4649
protected final NioSocketChannel channel;
4750
protected final InboundChannelBuffer channelBuffer;
4851
protected final AtomicBoolean isClosing = new AtomicBoolean(false);
4952
private final ReadWriteHandler readWriteHandler;
53+
private final Predicate<NioSocketChannel> allowChannelPredicate;
5054
private final NioSelector selector;
5155
private final CompletableContext<Void> connectContext = new CompletableContext<>();
5256
private final LinkedList<FlushOperation> pendingFlushes = new LinkedList<>();
53-
private boolean ioException;
54-
private boolean peerClosed;
57+
private boolean closeNow;
5558
private Exception connectException;
5659

5760
protected SocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
58-
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
61+
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer,
62+
Predicate<NioSocketChannel> allowChannelPredicate) {
5963
super(channel.getRawChannel(), exceptionHandler);
6064
this.selector = selector;
6165
this.channel = channel;
6266
this.readWriteHandler = readWriteHandler;
6367
this.channelBuffer = channelBuffer;
68+
this.allowChannelPredicate = allowChannelPredicate;
6469
}
6570

6671
@Override
@@ -161,6 +166,14 @@ protected FlushOperation getPendingFlush() {
161166
return pendingFlushes.peekFirst();
162167
}
163168

169+
@Override
170+
protected void register() throws IOException {
171+
super.register();
172+
if (allowChannelPredicate.test(channel) == false) {
173+
closeNow = true;
174+
}
175+
}
176+
164177
@Override
165178
public void closeFromSelector() throws IOException {
166179
getSelector().assertOnSelectorThread();
@@ -217,24 +230,20 @@ public boolean readyForFlush() {
217230
*/
218231
public abstract boolean selectorShouldClose();
219232

220-
protected boolean hasIOException() {
221-
return ioException;
222-
}
223-
224-
protected boolean isPeerClosed() {
225-
return peerClosed;
233+
protected boolean closeNow() {
234+
return closeNow;
226235
}
227236

228237
protected int readFromChannel(ByteBuffer buffer) throws IOException {
229238
try {
230239
int bytesRead = rawChannel.read(buffer);
231240
if (bytesRead < 0) {
232-
peerClosed = true;
241+
closeNow = true;
233242
bytesRead = 0;
234243
}
235244
return bytesRead;
236245
} catch (IOException e) {
237-
ioException = true;
246+
closeNow = true;
238247
throw e;
239248
}
240249
}
@@ -243,12 +252,12 @@ protected int readFromChannel(ByteBuffer[] buffers) throws IOException {
243252
try {
244253
int bytesRead = (int) rawChannel.read(buffers);
245254
if (bytesRead < 0) {
246-
peerClosed = true;
255+
closeNow = true;
247256
bytesRead = 0;
248257
}
249258
return bytesRead;
250259
} catch (IOException e) {
251-
ioException = true;
260+
closeNow = true;
252261
throw e;
253262
}
254263
}
@@ -257,7 +266,7 @@ protected int flushToChannel(ByteBuffer buffer) throws IOException {
257266
try {
258267
return rawChannel.write(buffer);
259268
} catch (IOException e) {
260-
ioException = true;
269+
closeNow = true;
261270
throw e;
262271
}
263272
}
@@ -266,7 +275,7 @@ protected int flushToChannel(ByteBuffer[] buffers) throws IOException {
266275
try {
267276
return (int) rawChannel.write(buffers);
268277
} catch (IOException e) {
269-
ioException = true;
278+
closeNow = true;
270279
throw e;
271280
}
272281
}

libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.atomic.AtomicReference;
3434
import java.util.function.BiConsumer;
3535
import java.util.function.Consumer;
36+
import java.util.function.Predicate;
3637
import java.util.function.Supplier;
3738

3839
import static org.mockito.Matchers.any;
@@ -77,23 +78,39 @@ public void testIOExceptionSetIfEncountered() throws IOException {
7778
when(rawChannel.write(any(ByteBuffer.class))).thenThrow(new IOException());
7879
when(rawChannel.read(any(ByteBuffer[].class), anyInt(), anyInt())).thenThrow(new IOException());
7980
when(rawChannel.read(any(ByteBuffer.class))).thenThrow(new IOException());
80-
assertFalse(context.hasIOException());
81+
assertFalse(context.closeNow());
8182
expectThrows(IOException.class, () -> {
8283
if (randomBoolean()) {
8384
context.read();
8485
} else {
8586
context.flushChannel();
8687
}
8788
});
88-
assertTrue(context.hasIOException());
89+
assertTrue(context.closeNow());
8990
}
9091

9192
public void testSignalWhenPeerClosed() throws IOException {
9293
when(rawChannel.read(any(ByteBuffer[].class), anyInt(), anyInt())).thenReturn(-1L);
9394
when(rawChannel.read(any(ByteBuffer.class))).thenReturn(-1);
94-
assertFalse(context.isPeerClosed());
95+
assertFalse(context.closeNow());
9596
context.read();
96-
assertTrue(context.isPeerClosed());
97+
assertTrue(context.closeNow());
98+
}
99+
100+
public void testValidateInRegisterCanSucceed() throws IOException {
101+
InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
102+
context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, (c) -> true);
103+
assertFalse(context.closeNow());
104+
context.register();
105+
assertFalse(context.closeNow());
106+
}
107+
108+
public void testValidateInRegisterCanFail() throws IOException {
109+
InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
110+
context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, (c) -> false);
111+
assertFalse(context.closeNow());
112+
context.register();
113+
assertTrue(context.closeNow());
97114
}
98115

99116
public void testConnectSucceeds() throws IOException {
@@ -277,7 +294,13 @@ private static class TestSocketChannelContext extends SocketChannelContext {
277294

278295
private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
279296
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
280-
super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
297+
this(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, ALWAYS_ALLOW_CHANNEL);
298+
}
299+
300+
private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
301+
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer,
302+
Predicate<NioSocketChannel> allowChannelPredicate) {
303+
super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, allowChannelPredicate);
281304
}
282305

283306
@Override
@@ -309,6 +332,11 @@ public boolean selectorShouldClose() {
309332
public void closeChannel() {
310333
isClosing.set(true);
311334
}
335+
336+
@Override
337+
void doSelectorRegister() {
338+
// We do not want to call the actual register with selector method as it will throw a NPE
339+
}
312340
}
313341

314342
private static byte[] createMessage(int length) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
public final class SecurityField {
1414

1515
public static final String NAME4 = XPackField.SECURITY + "4";
16+
public static final String NIO = XPackField.SECURITY + "-nio";
1617
public static final Setting<Optional<String>> USER_SETTING =
1718
new Setting<>(setting("user"), (String) null, Optional::ofNullable, Setting.Property.NodeScope);
1819

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecuritySettings.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ public static Settings addTransportSettings(final Settings settings) {
1919
final Settings.Builder builder = Settings.builder();
2020
if (NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) {
2121
final String transportType = NetworkModule.TRANSPORT_TYPE_SETTING.get(settings);
22-
if (SecurityField.NAME4.equals(transportType) == false) {
22+
if (SecurityField.NAME4.equals(transportType) == false && SecurityField.NIO.equals(transportType) == false) {
2323
throw new IllegalArgumentException("transport type setting [" + NetworkModule.TRANSPORT_TYPE_KEY
24-
+ "] must be [" + SecurityField.NAME4 + "] but is [" + transportType + "]");
24+
+ "] must be [" + SecurityField.NAME4 + "] or [" + SecurityField.NIO + "]" + " but is ["
25+
+ transportType + "]");
2526
}
2627
} else {
2728
// default to security4
@@ -39,7 +40,7 @@ public static Settings addUserSettings(final Settings settings) {
3940
final int i = userSetting.indexOf(":");
4041
if (i < 0 || i == userSetting.length() - 1) {
4142
throw new IllegalArgumentException("invalid [" + SecurityField.USER_SETTING.getKey()
42-
+ "] setting. must be in the form of \"<username>:<password>\"");
43+
+ "] setting. must be in the form of \"<username>:<password>\"");
4344
}
4445
String username = userSetting.substring(0, i);
4546
String password = userSetting.substring(i + 1);

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@
203203
import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport;
204204
import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport;
205205
import org.elasticsearch.xpack.core.template.TemplateUtils;
206+
import org.elasticsearch.xpack.security.transport.nio.SecurityNioTransport;
206207
import org.joda.time.DateTime;
207208
import org.joda.time.DateTimeZone;
208209

@@ -846,8 +847,14 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
846847
if (transportClientMode || enabled == false) { // don't register anything if we are not enabled, or in transport client mode
847848
return Collections.emptyMap();
848849
}
849-
return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, threadPool,
850-
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
850+
851+
Map<String, Supplier<Transport>> transports = new HashMap<>();
852+
transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, threadPool,
853+
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
854+
transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, threadPool,
855+
networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService()));
856+
857+
return Collections.unmodifiableMap(transports);
851858
}
852859

853860
@Override

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.io.IOException;
1818
import java.util.function.BiConsumer;
1919
import java.util.function.Consumer;
20+
import java.util.function.Predicate;
2021

2122
/**
2223
* Provides a TLS/SSL read/write layer over a channel. This context will use a {@link SSLDriver} to handshake
@@ -30,7 +31,13 @@ public final class SSLChannelContext extends SocketChannelContext {
3031

3132
SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler, SSLDriver sslDriver,
3233
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
33-
super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
34+
this(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer, ALWAYS_ALLOW_CHANNEL);
35+
}
36+
37+
SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler, SSLDriver sslDriver,
38+
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer,
39+
Predicate<NioSocketChannel> allowChannelPredicate) {
40+
super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, allowChannelPredicate);
3441
this.sslDriver = sslDriver;
3542
}
3643

@@ -52,7 +59,7 @@ public void queueWriteOperation(WriteOperation writeOperation) {
5259

5360
@Override
5461
public void flushChannel() throws IOException {
55-
if (hasIOException()) {
62+
if (closeNow()) {
5663
return;
5764
}
5865
// If there is currently data in the outbound write buffer, flush the buffer.
@@ -116,7 +123,7 @@ public boolean readyForFlush() {
116123
@Override
117124
public int read() throws IOException {
118125
int bytesRead = 0;
119-
if (hasIOException()) {
126+
if (closeNow()) {
120127
return bytesRead;
121128
}
122129
bytesRead = readFromChannel(sslDriver.getNetworkReadBuffer());
@@ -133,7 +140,7 @@ public int read() throws IOException {
133140

134141
@Override
135142
public boolean selectorShouldClose() {
136-
return isPeerClosed() || hasIOException() || sslDriver.isClosed();
143+
return closeNow() || sslDriver.isClosed();
137144
}
138145

139146
@Override

0 commit comments

Comments
 (0)