Skip to content

Propagate Errors in executors to uncaught exception handler #36137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 17, 2019
13 changes: 13 additions & 0 deletions buildSrc/src/main/resources/forbidden/es-all-signatures.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,16 @@ java.nio.channels.SocketChannel#connect(java.net.SocketAddress)
java.lang.Boolean#getBoolean(java.lang.String)

org.apache.lucene.util.IOUtils @ use @org.elasticsearch.core.internal.io instead

@defaultMessage use executors from org.elasticsearch.common.util.concurrent.EsExecutors instead which will properly bubble up Errors
java.util.concurrent.AbstractExecutorService#<init>()
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue)
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory)
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.RejectedExecutionHandler)
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)

@defaultMessage extend org.elasticsearch.threadpool.Scheduler.SafeScheduledThreadPoolExecutor instead which will properly bubble up Errors
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.RejectedExecutionHandler)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@

package org.elasticsearch.threadpool;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand All @@ -42,30 +51,279 @@ public void setUpThreadPool() {
}

@After
public void tearDownThreadPool() throws InterruptedException {
public void tearDownThreadPool() {
terminate(threadPool);
}

public void testExecutionException() throws InterruptedException {
runExecutionExceptionTest(
() -> {
public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedException {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
checkExecutionError(getScheduleRunner(executor));
}
}

public void testExecutionErrorOnDirectExecutorService() throws InterruptedException {
final ExecutorService directExecutorService = EsExecutors.newDirectExecutorService();
checkExecutionError(getExecuteRunner(directExecutorService));
checkExecutionError(getSubmitRunner(directExecutorService));
}

public void testExecutionErrorOnFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1,
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionError(getExecuteRunner(fixedExecutor));
checkExecutionError(getSubmitRunner(fixedExecutor));
} finally {
ThreadPool.terminate(fixedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionErrorOnScalingESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling("test", 1, 1,
10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionError(getExecuteRunner(scalingExecutor));
checkExecutionError(getSubmitRunner(scalingExecutor));
} finally {
ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionError(getExecuteRunner(autoQueueFixedExecutor));
checkExecutionError(getSubmitRunner(autoQueueFixedExecutor));
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
try {
checkExecutionError(getExecuteRunner(prioritizedExecutor));
checkExecutionError(getSubmitRunner(prioritizedExecutor));
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r));
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionErrorOnScheduler() throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
try {
checkExecutionError(getExecuteRunner(scheduler));
checkExecutionError(getSubmitRunner(scheduler));
checkExecutionError(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS));
} finally {
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
}
}

private void checkExecutionError(Consumer<Runnable> runner) throws InterruptedException {
logger.info("checking error for {}", runner);
final Runnable runnable;
if (randomBoolean()) {
runnable = () -> {
throw new Error("future error");
};
} else {
runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {

}

@Override
protected void doRun() {
throw new Error("future error");
},
true,
o -> {
assertTrue(o.isPresent());
assertThat(o.get(), instanceOf(Error.class));
assertThat(o.get(), hasToString(containsString("future error")));
});
runExecutionExceptionTest(
() -> {
}
};
}
runExecutionTest(
runner,
runnable,
true,
o -> {
assertTrue(o.isPresent());
assertThat(o.get(), instanceOf(Error.class));
assertThat(o.get(), hasToString(containsString("future error")));
});
}

public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
final boolean expectExceptionOnExecute =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), expectExceptionOnExecute);

// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false);

final boolean expectExceptionOnSchedule =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE
// scheduler just swallows the exception here
// TODO: bubble these exceptions up
&& ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT;
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
}
}

public void testExecutionExceptionOnDirectExecutorService() throws InterruptedException {
final ExecutorService directExecutorService = EsExecutors.newDirectExecutorService();
checkExecutionException(getExecuteRunner(directExecutorService), true);
checkExecutionException(getSubmitRunner(directExecutorService), false);
}

public void testExecutionExceptionOnFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1,
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionException(getExecuteRunner(fixedExecutor), true);
checkExecutionException(getSubmitRunner(fixedExecutor), false);
} finally {
ThreadPool.terminate(fixedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling("test", 1, 1,
10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
checkExecutionException(getExecuteRunner(scalingExecutor), true);
checkExecutionException(getSubmitRunner(scalingExecutor), false);
} finally {
ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
try {
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), false);
checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false);
} finally {
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
try {
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
checkExecutionException(getSubmitRunner(prioritizedExecutor), false);
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true);
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
}

public void testExecutionExceptionOnScheduler() throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
try {
// scheduler just swallows the exceptions
// TODO: bubble these exceptions up
checkExecutionException(getExecuteRunner(scheduler), false);
checkExecutionException(getSubmitRunner(scheduler), false);
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false);
} finally {
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
}
}

private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException {
logger.info("checking exception for {}", runner);
final Runnable runnable;
final boolean willThrow;
if (randomBoolean()) {
runnable = () -> {
throw new IllegalStateException("future exception");
};
willThrow = expectException;
} else {
runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {

}

@Override
protected void doRun() {
throw new IllegalStateException("future exception");
},
false,
o -> assertFalse(o.isPresent()));
}
};
willThrow = false;
}
runExecutionTest(
runner,
runnable,
willThrow,
o -> {
assertEquals(willThrow, o.isPresent());
if (willThrow) {
assertThat(o.get(), instanceOf(IllegalStateException.class));
assertThat(o.get(), hasToString(containsString("future exception")));
}
});
}

Consumer<Runnable> getExecuteRunner(ExecutorService executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
executor.execute(runnable);
}

@Override
public String toString() {
return "executor(" + executor + ").execute()";
}
};
}

Consumer<Runnable> getSubmitRunner(ExecutorService executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
executor.submit(runnable);
}

@Override
public String toString() {
return "executor(" + executor + ").submit()";
}
};
}

Consumer<Runnable> getScheduleRunner(String executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable);
}

@Override
public String toString() {
return "schedule(" + executor + ")";
}
};
}

private void runExecutionExceptionTest(
private void runExecutionTest(
final Consumer<Runnable> runner,
final Runnable runnable,
final boolean expectThrowable,
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
Expand All @@ -82,13 +340,18 @@ private void runExecutionExceptionTest(

final CountDownLatch supplierLatch = new CountDownLatch(1);

threadPool.generic().submit(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
});
try {
runner.accept(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
});
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

supplierLatch.await();

Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,13 @@ public static Optional<Error> maybeError(final Throwable cause, final Logger log
return Optional.empty();
}

/**
* See {@link #maybeError(Throwable, Logger)}. Uses the class-local logger.
*/
public static Optional<Error> maybeError(final Throwable cause) {
return maybeError(cause, logger);
}

/**
* If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be
* caught and bubbles up to the uncaught exception handler. Note that the cause tree is examined for any {@link Error}. See
Expand Down
Loading