Skip to content

Commit b75003f

Browse files
authored
Isolate nio channel registered from channel active (#44388)
Registering a channel with a selector is a required operation for the channel to be handled properly. Currently, we mix the registeration with other setup operations (ip filtering, SSL initiation, etc). However, a fail to register is fatal. This PR modifies how registeration occurs to immediately close the channel if it fails. There are still two clear loopholes for how a user can interact with a channel even if registration fails. 1. through the exception handler. 2. through the channel accepted callback. These can perhaps be improved in the future. For now, this PR prevents writes from proceeding if the channel is not registered.
1 parent 966440d commit b75003f

File tree

18 files changed

+198
-107
lines changed

18 files changed

+198
-107
lines changed

libs/nio/src/main/java/org/elasticsearch/nio/BytesWriteHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public WriteOperation createWriteOperation(SocketChannelContext context, Object
3535
}
3636

3737
@Override
38-
public void channelRegistered() {}
38+
public void channelActive() {}
3939

4040
@Override
4141
public List<FlushOperation> writeToBytes(WriteOperation writeOperation) {

libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,19 @@ protected void register() throws IOException {
5050
doSelectorRegister();
5151
}
5252

53+
protected void channelActive() throws IOException {}
54+
5355
// Package private for testing
5456
void doSelectorRegister() throws IOException {
55-
setSelectionKey(rawChannel.register(getSelector().rawSelector(), 0));
57+
setSelectionKey(rawChannel.register(getSelector().rawSelector(), 0, this));
5658
}
5759

58-
SelectionKey getSelectionKey() {
60+
protected SelectionKey getSelectionKey() {
5961
return selectionKey;
6062
}
6163

62-
// Protected for tests
63-
protected void setSelectionKey(SelectionKey selectionKey) {
64+
// public for tests
65+
public void setSelectionKey(SelectionKey selectionKey) {
6466
this.selectionKey = selectionKey;
6567
}
6668

libs/nio/src/main/java/org/elasticsearch/nio/DelegatingHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ public DelegatingHandler(NioChannelHandler delegate) {
3232
}
3333

3434
@Override
35-
public void channelRegistered() {
36-
this.delegate.channelRegistered();
35+
public void channelActive() {
36+
this.delegate.channelActive();
3737
}
3838

3939
@Override

libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java

+26-12
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,28 @@ protected void acceptException(ServerChannelContext context, Exception exception
6363
*/
6464
protected void handleRegistration(ChannelContext<?> context) throws IOException {
6565
context.register();
66-
SelectionKey selectionKey = context.getSelectionKey();
67-
selectionKey.attach(context);
66+
assert context.getSelectionKey() != null : "SelectionKey should not be null after registration";
67+
assert context.getSelectionKey().attachment() != null : "Attachment should not be null after registration";
68+
}
69+
70+
/**
71+
* This method is called when an attempt to register a channel throws an exception.
72+
*
73+
* @param context that was registered
74+
* @param exception that occurred
75+
*/
76+
protected void registrationException(ChannelContext<?> context, Exception exception) {
77+
context.handleException(exception);
78+
}
79+
80+
/**
81+
* This method is called after a NioChannel is active with the selector. It should only be called once
82+
* per channel.
83+
*
84+
* @param context that was marked active
85+
*/
86+
protected void handleActive(ChannelContext<?> context) throws IOException {
87+
context.channelActive();
6888
if (context instanceof SocketChannelContext) {
6989
if (((SocketChannelContext) context).readyForFlush()) {
7090
SelectionKeyUtils.setConnectReadAndWriteInterested(context.getSelectionKey());
@@ -78,12 +98,12 @@ protected void handleRegistration(ChannelContext<?> context) throws IOException
7898
}
7999

80100
/**
81-
* This method is called when an attempt to register a channel throws an exception.
101+
* This method is called when setting a channel to active throws an exception.
82102
*
83-
* @param context that was registered
103+
* @param context that was marked active
84104
* @param exception that occurred
85105
*/
86-
protected void registrationException(ChannelContext<?> context, Exception exception) {
106+
protected void activeException(ChannelContext<?> context, Exception exception) {
87107
context.handleException(exception);
88108
}
89109

@@ -180,15 +200,9 @@ protected void postHandling(SocketChannelContext context) {
180200
closeException(context, e);
181201
}
182202
} else {
183-
boolean pendingWrites = context.readyForFlush();
184203
SelectionKey selectionKey = context.getSelectionKey();
185-
if (selectionKey == null) {
186-
if (pendingWrites) {
187-
writeException(context, new IllegalStateException("Tried to write to an not yet registered channel"));
188-
}
189-
return;
190-
}
191204
boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey);
205+
boolean pendingWrites = context.readyForFlush();
192206
if (currentlyWriteInterested == false && pendingWrites) {
193207
SelectionKeyUtils.setWriteInterested(selectionKey);
194208
} else if (currentlyWriteInterested && pendingWrites == false) {

libs/nio/src/main/java/org/elasticsearch/nio/NioChannelHandler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
public interface NioChannelHandler {
3030

3131
/**
32-
* This method is called when the channel is registered with its selector.
32+
* This method is called when the channel is active for use.
3333
*/
34-
void channelRegistered();
34+
void channelActive();
3535

3636
/**
3737
* This method is called when a message is queued with a channel. It can be called from any thread.

libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java

+34-18
Original file line numberDiff line numberDiff line change
@@ -340,21 +340,30 @@ public void scheduleForRegistration(NioChannel channel) {
340340
private void writeToChannel(WriteOperation writeOperation) {
341341
assertOnSelectorThread();
342342
SocketChannelContext context = writeOperation.getChannel();
343-
// If the channel does not currently have anything that is ready to flush, we should flush after
344-
// the write operation is queued.
345-
boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
346-
try {
347-
context.queueWriteOperation(writeOperation);
348-
} catch (Exception e) {
349-
shouldFlushAfterQueuing = false;
350-
executeFailedListener(writeOperation.getListener(), e);
351-
}
352343

353-
if (shouldFlushAfterQueuing) {
354-
if (context.selectorShouldClose() == false) {
355-
handleWrite(context);
344+
if (context.isOpen() == false) {
345+
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
346+
} else if (context.getSelectionKey() == null) {
347+
// This should very rarely happen. The only times a channel is exposed outside the event loop,
348+
// but might not registered is through the exception handler and channel accepted callbacks.
349+
executeFailedListener(writeOperation.getListener(), new IllegalStateException("Channel not registered"));
350+
} else {
351+
// If the channel does not currently have anything that is ready to flush, we should flush after
352+
// the write operation is queued.
353+
boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
354+
try {
355+
context.queueWriteOperation(writeOperation);
356+
} catch (Exception e) {
357+
shouldFlushAfterQueuing = false;
358+
executeFailedListener(writeOperation.getListener(), e);
359+
}
360+
361+
if (shouldFlushAfterQueuing) {
362+
if (context.selectorShouldClose() == false) {
363+
handleWrite(context);
364+
}
365+
eventHandler.postHandling(context);
356366
}
357-
eventHandler.postHandling(context);
358367
}
359368
}
360369

@@ -435,14 +444,25 @@ private void registerChannel(ChannelContext<?> newChannel) {
435444
try {
436445
if (newChannel.isOpen()) {
437446
eventHandler.handleRegistration(newChannel);
447+
channelActive(newChannel);
438448
if (newChannel instanceof SocketChannelContext) {
439449
attemptConnect((SocketChannelContext) newChannel, false);
440450
}
441451
} else {
442452
eventHandler.registrationException(newChannel, new ClosedChannelException());
453+
closeChannel(newChannel);
443454
}
444455
} catch (Exception e) {
445456
eventHandler.registrationException(newChannel, e);
457+
closeChannel(newChannel);
458+
}
459+
}
460+
461+
private void channelActive(ChannelContext<?> newChannel) {
462+
try {
463+
eventHandler.handleActive(newChannel);
464+
} catch (IOException e) {
465+
eventHandler.activeException(newChannel, e);
446466
}
447467
}
448468

@@ -464,11 +484,7 @@ private void closeChannel(final ChannelContext<?> channelContext) {
464484
private void handleQueuedWrites() {
465485
WriteOperation writeOperation;
466486
while ((writeOperation = queuedWrites.poll()) != null) {
467-
if (writeOperation.getChannel().isOpen()) {
468-
writeToChannel(writeOperation);
469-
} else {
470-
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
471-
}
487+
writeToChannel(writeOperation);
472488
}
473489
}
474490

libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,8 @@ protected FlushOperation getPendingFlush() {
156156
}
157157

158158
@Override
159-
protected void register() throws IOException {
160-
super.register();
161-
readWriteHandler.channelRegistered();
159+
protected void channelActive() throws IOException {
160+
readWriteHandler.channelActive();
162161
}
163162

164163
@Override

libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java

+17-20
Original file line numberDiff line numberDiff line change
@@ -81,32 +81,25 @@ public void setUpHandler() throws IOException {
8181
}
8282

8383
public void testRegisterCallsContext() throws IOException {
84-
NioSocketChannel channel = mock(NioSocketChannel.class);
85-
SocketChannelContext channelContext = mock(SocketChannelContext.class);
86-
when(channel.getContext()).thenReturn(channelContext);
87-
when(channelContext.getSelectionKey()).thenReturn(new TestSelectionKey(0));
84+
ChannelContext<?> channelContext = randomBoolean() ? mock(SocketChannelContext.class) : mock(ServerChannelContext.class);
85+
TestSelectionKey attachment = new TestSelectionKey(0);
86+
when(channelContext.getSelectionKey()).thenReturn(attachment);
87+
attachment.attach(channelContext);
8888
handler.handleRegistration(channelContext);
8989
verify(channelContext).register();
9090
}
9191

92-
public void testRegisterNonServerAddsOP_CONNECTAndOP_READInterest() throws IOException {
92+
public void testActiveNonServerAddsOP_CONNECTAndOP_READInterest() throws IOException {
9393
SocketChannelContext context = mock(SocketChannelContext.class);
9494
when(context.getSelectionKey()).thenReturn(new TestSelectionKey(0));
95-
handler.handleRegistration(context);
95+
handler.handleActive(context);
9696
assertEquals(SelectionKey.OP_READ | SelectionKey.OP_CONNECT, context.getSelectionKey().interestOps());
9797
}
9898

99-
public void testRegisterAddsAttachment() throws IOException {
100-
ChannelContext<?> context = randomBoolean() ? mock(SocketChannelContext.class) : mock(ServerChannelContext.class);
101-
when(context.getSelectionKey()).thenReturn(new TestSelectionKey(0));
102-
handler.handleRegistration(context);
103-
assertEquals(context, context.getSelectionKey().attachment());
104-
}
105-
106-
public void testHandleServerRegisterSetsOP_ACCEPTInterest() throws IOException {
107-
assertNull(serverContext.getSelectionKey());
108-
109-
handler.handleRegistration(serverContext);
99+
public void testHandleServerActiveSetsOP_ACCEPTInterest() throws IOException {
100+
ServerChannelContext serverContext = mock(ServerChannelContext.class);
101+
when(serverContext.getSelectionKey()).thenReturn(new TestSelectionKey(0));
102+
handler.handleActive(serverContext);
110103

111104
assertEquals(SelectionKey.OP_ACCEPT, serverContext.getSelectionKey().interestOps());
112105
}
@@ -141,11 +134,11 @@ public void testAcceptExceptionCallsExceptionHandler() throws IOException {
141134
verify(serverChannelContext).handleException(exception);
142135
}
143136

144-
public void testRegisterWithPendingWritesAddsOP_CONNECTAndOP_READAndOP_WRITEInterest() throws IOException {
137+
public void testActiveWithPendingWritesAddsOP_CONNECTAndOP_READAndOP_WRITEInterest() throws IOException {
145138
FlushReadyWrite flushReadyWrite = mock(FlushReadyWrite.class);
146139
when(readWriteHandler.writeToBytes(flushReadyWrite)).thenReturn(Collections.singletonList(flushReadyWrite));
147140
context.queueWriteOperation(flushReadyWrite);
148-
handler.handleRegistration(context);
141+
handler.handleActive(context);
149142
assertEquals(SelectionKey.OP_READ | SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE, context.getSelectionKey().interestOps());
150143
}
151144

@@ -266,7 +259,9 @@ private class DoNotRegisterSocketContext extends BytesChannelContext {
266259

267260
@Override
268261
public void register() {
269-
setSelectionKey(new TestSelectionKey(0));
262+
TestSelectionKey selectionKey = new TestSelectionKey(0);
263+
setSelectionKey(selectionKey);
264+
selectionKey.attach(this);
270265
}
271266
}
272267

@@ -280,7 +275,9 @@ private class DoNotRegisterServerContext extends ServerChannelContext {
280275

281276
@Override
282277
public void register() {
278+
TestSelectionKey selectionKey = new TestSelectionKey(0);
283279
setSelectionKey(new TestSelectionKey(0));
280+
selectionKey.attach(this);
284281
}
285282
}
286283
}

libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java

+28
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ public void testRegisteredChannel() throws IOException {
212212
selector.preSelect();
213213

214214
verify(eventHandler).handleRegistration(serverChannelContext);
215+
verify(eventHandler).handleActive(serverChannelContext);
215216
}
216217

217218
public void testClosedServerChannelWillNotBeRegistered() {
@@ -230,7 +231,20 @@ public void testRegisterServerChannelFailsDueToException() throws Exception {
230231

231232
selector.preSelect();
232233

234+
verify(eventHandler, times(0)).handleActive(serverChannelContext);
233235
verify(eventHandler).registrationException(serverChannelContext, closedChannelException);
236+
verify(eventHandler).handleClose(serverChannelContext);
237+
}
238+
239+
public void testChannelActiveException() throws Exception {
240+
executeOnNewThread(() -> selector.scheduleForRegistration(serverChannel));
241+
IOException ioException = new IOException();
242+
doThrow(ioException).when(eventHandler).handleActive(serverChannelContext);
243+
244+
selector.preSelect();
245+
246+
verify(eventHandler).handleActive(serverChannelContext);
247+
verify(eventHandler).activeException(serverChannelContext, ioException);
234248
}
235249

236250
public void testClosedSocketChannelWillNotBeRegistered() throws Exception {
@@ -241,6 +255,7 @@ public void testClosedSocketChannelWillNotBeRegistered() throws Exception {
241255

242256
verify(eventHandler).registrationException(same(channelContext), any(ClosedChannelException.class));
243257
verify(eventHandler, times(0)).handleConnect(channelContext);
258+
verify(eventHandler).handleClose(channelContext);
244259
}
245260

246261
public void testRegisterSocketChannelFailsDueToException() throws InterruptedException {
@@ -253,7 +268,9 @@ public void testRegisterSocketChannelFailsDueToException() throws InterruptedExc
253268
selector.preSelect();
254269

255270
verify(eventHandler).registrationException(channelContext, closedChannelException);
271+
verify(eventHandler, times(0)).handleActive(serverChannelContext);
256272
verify(eventHandler, times(0)).handleConnect(channelContext);
273+
verify(eventHandler).handleClose(channelContext);
257274
});
258275
}
259276

@@ -313,6 +330,17 @@ public void testQueueWriteChannelIsClosed() throws Exception {
313330
verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class));
314331
}
315332

333+
public void testQueueWriteChannelIsUnregistered() throws Exception {
334+
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
335+
336+
executeOnNewThread(() -> selector.queueWrite(writeOperation));
337+
when(channelContext.getSelectionKey()).thenReturn(null);
338+
selector.preSelect();
339+
340+
verify(channelContext, times(0)).queueWriteOperation(writeOperation);
341+
verify(listener).accept(isNull(Void.class), any(IllegalStateException.class));
342+
}
343+
316344
public void testQueueWriteSuccessful() throws Exception {
317345
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
318346
executeOnNewThread(() -> selector.queueWrite(writeOperation));

0 commit comments

Comments
 (0)