Skip to content

Add log warnings for long running event handling #39729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ protected void writeException(SocketChannelContext context, Exception exception)
context.handleException(exception);
}

/**
* This method is called when a task or listener attached to a channel is available to run.
*
* @param task to handle
*/
protected void handleTask(Runnable task) {
task.run();
}

/**
* This method is called when a task or listener attached to a channel operation throws an exception.
*
Expand All @@ -165,7 +174,11 @@ protected void taskException(Exception exception) {
*/
protected void postHandling(SocketChannelContext context) {
if (context.selectorShouldClose()) {
handleClose(context);
try {
handleClose(context);
} catch (IOException e) {
closeException(context, e);
}
} else {
SelectionKey selectionKey = context.getSelectionKey();
boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey);
Expand Down Expand Up @@ -203,34 +216,30 @@ protected void uncaughtException(Exception exception) {
*
* @param context that should be closed
*/
protected void handleClose(ChannelContext<?> context) {
try {
context.closeFromSelector();
} catch (IOException e) {
closeException(context, e);
}
protected void handleClose(ChannelContext<?> context) throws IOException {
context.closeFromSelector();
assert context.isOpen() == false : "Should always be done as we are on the selector thread";
}

/**
* This method is called when an attempt to close a channel throws an exception.
*
* @param channel that was being closed
* @param context that was being closed
* @param exception that occurred
*/
protected void closeException(ChannelContext<?> channel, Exception exception) {
channel.handleException(exception);
protected void closeException(ChannelContext<?> context, Exception exception) {
context.handleException(exception);
}

/**
* This method is called when handling an event from a channel fails due to an unexpected exception.
* An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw
* {@link java.nio.channels.CancelledKeyException}.
*
* @param channel that caused the exception
* @param context that caused the exception
* @param exception that was thrown
*/
protected void genericChannelException(ChannelContext<?> channel, Exception exception) {
channel.handleException(exception);
protected void genericChannelException(ChannelContext<?> context, Exception exception) {
context.handleException(exception);
}
}
32 changes: 16 additions & 16 deletions libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,15 @@ void preSelect() {
private void handleScheduledTasks(long nanoTime) {
Runnable task;
while ((task = taskScheduler.pollTask(nanoTime)) != null) {
try {
task.run();
} catch (Exception e) {
eventHandler.taskException(e);
}
handleTask(task);
}
}

private void handleTask(Runnable task) {
try {
eventHandler.handleTask(task);
} catch (Exception e) {
eventHandler.taskException(e);
}
}

Expand Down Expand Up @@ -353,11 +357,7 @@ public void writeToChannel(WriteOperation writeOperation) {
*/
public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
assertOnSelectorThread();
try {
listener.accept(value, null);
} catch (Exception e) {
eventHandler.taskException(e);
}
handleTask(() -> listener.accept(value, null));
}

/**
Expand All @@ -369,11 +369,7 @@ public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
*/
public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
assertOnSelectorThread();
try {
listener.accept(null, exception);
} catch (Exception e) {
eventHandler.taskException(e);
}
handleTask(() -> listener.accept(null, exception));
}

private void cleanupPendingWrites() {
Expand Down Expand Up @@ -437,7 +433,11 @@ private void setUpNewChannels() {
private void closePendingChannels() {
ChannelContext<?> channelContext;
while ((channelContext = channelsToClose.poll()) != null) {
eventHandler.handleClose(channelContext);
try {
eventHandler.handleClose(channelContext);
} catch (Exception e) {
eventHandler.closeException(channelContext, e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static org.mockito.Matchers.same;
Expand Down Expand Up @@ -243,10 +244,16 @@ public void testPostHandlingWillRemoveWriteIfNecessary() throws IOException {
assertEquals(SelectionKey.OP_READ, key.interestOps());
}

public void testListenerExceptionCallsGenericExceptionHandler() throws IOException {
RuntimeException listenerException = new RuntimeException();
handler.taskException(listenerException);
verify(genericExceptionHandler).accept(listenerException);
public void testHandleTaskWillRunTask() throws Exception {
AtomicBoolean isRun = new AtomicBoolean(false);
handler.handleTask(() -> isRun.set(true));
assertTrue(isRun.get());
}

public void testTaskExceptionWillCallExceptionHandler() throws Exception {
RuntimeException exception = new RuntimeException();
handler.taskException(exception);
verify(genericExceptionHandler).accept(exception);
}

private class DoNotRegisterSocketContext extends BytesChannelContext {
Expand Down
64 changes: 43 additions & 21 deletions libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -86,6 +87,10 @@ public void setUp() throws Exception {
when(serverChannelContext.isOpen()).thenReturn(true);
when(serverChannelContext.getSelector()).thenReturn(selector);
when(serverChannelContext.getSelectionKey()).thenReturn(selectionKey);
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArguments()[0]).run();
return null;
}).when(eventHandler).handleTask(any());
}

@SuppressWarnings({"unchecked", "rawtypes"})
Expand All @@ -102,6 +107,23 @@ public void testQueueChannelForClosed() throws IOException {
verify(eventHandler).handleClose(context);
}

@SuppressWarnings({"unchecked", "rawtypes"})
public void testCloseException() throws IOException {
IOException ioException = new IOException();
NioChannel channel = mock(NioChannel.class);
ChannelContext context = mock(ChannelContext.class);
when(channel.getContext()).thenReturn(context);
when(context.getSelector()).thenReturn(selector);

selector.queueChannelClose(channel);

doThrow(ioException).when(eventHandler).handleClose(context);

selector.singleLoop();

verify(eventHandler).closeException(context, ioException);
}

public void testNioDelayedTasksAreExecuted() throws IOException {
AtomicBoolean isRun = new AtomicBoolean(false);
long nanoTime = System.nanoTime() - 1;
Expand All @@ -113,9 +135,27 @@ public void testNioDelayedTasksAreExecuted() throws IOException {
assertTrue(isRun.get());
}

public void testTaskExceptionsAreHandled() {
RuntimeException taskException = new RuntimeException();
long nanoTime = System.nanoTime() - 1;
Runnable task = () -> {
throw taskException;
};
selector.getTaskScheduler().scheduleAtRelativeTime(task, nanoTime);

doAnswer((a) -> {
task.run();
return null;
}).when(eventHandler).handleTask(same(task));

selector.singleLoop();
verify(eventHandler).taskException(taskException);
}

public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException {
long delay = new TimeValue(15, TimeUnit.MINUTES).nanos();
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {
}, System.nanoTime() + delay);

selector.singleLoop();
verify(rawSelector).select(300);
Expand All @@ -127,7 +167,8 @@ public void testSelectorTimeoutWillBeReducedIfTaskSooner() throws Exception {
assertBusy(() -> {
ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos();
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {
}, System.nanoTime() + delay);
selector.singleLoop();
verify(rawSelector).select(captor.capture());
assertTrue(captor.getValue() > 0);
Expand Down Expand Up @@ -455,23 +496,4 @@ public void testCleanup() throws Exception {
verify(eventHandler).handleClose(channelContext);
verify(eventHandler).handleClose(unregisteredContext);
}

public void testExecuteListenerWillHandleException() throws Exception {
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).accept(null, null);

selector.executeListener(listener, null);

verify(eventHandler).taskException(exception);
}

public void testExecuteFailedListenerWillHandleException() throws Exception {
IOException ioException = new IOException();
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).accept(null, ioException);

selector.executeFailedListener(listener, ioException);

verify(eventHandler).taskException(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected void doStart() {
boolean success = false;
try {
nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
(s) -> new TestingSocketEventHandler(this::onNonChannelException, s));
(s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime));

ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client");
Expand Down
Loading