diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index 86f2ba332790f..f6c2c616d4008 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -19,11 +19,7 @@ package org.elasticsearch.threadpool; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LogEvent; -import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -31,20 +27,21 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.MockLogAppender; import org.junit.After; import org.junit.Before; import java.util.Optional; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -260,6 +257,50 @@ public void testExecutionExceptionOnScheduler() throws InterruptedException { } } + @SuppressForbidden(reason = "this tests that the deprecated method still works") + public void testDeprecatedSchedule() throws ExecutionException, InterruptedException { + verifyDeprecatedSchedule(((threadPool, runnable) + -> threadPool.schedule(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable))); + } + + public void testThreadPoolScheduleDeprecated() throws ExecutionException, InterruptedException { + verifyDeprecatedSchedule(((threadPool, runnable) + -> threadPool.scheduleDeprecated(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable))); + } + + private void verifyDeprecatedSchedule(BiFunction> scheduleFunction) throws InterruptedException, ExecutionException { + Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler(); + CountDownLatch missingExceptions = new CountDownLatch(1); + Thread.setDefaultUncaughtExceptionHandler((t, e) -> { + missingExceptions.countDown(); + }); + try { + ThreadPool threadPool = new TestThreadPool("test"); + CountDownLatch missingExecutions = new CountDownLatch(1); + try { + scheduleFunction.apply(threadPool, missingExecutions::countDown) + .get(); + assertEquals(0, missingExecutions.getCount()); + + ExecutionException exception = expectThrows(ExecutionException.class, + "schedule(...).get() must throw exception from runnable", + () -> scheduleFunction.apply(threadPool, + () -> { + throw new IllegalArgumentException("FAIL"); + } + ).get()); + + assertEquals(IllegalArgumentException.class, exception.getCause().getClass()); + assertTrue(missingExceptions.await(30, TimeUnit.SECONDS)); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } finally { + Thread.setDefaultUncaughtExceptionHandler(originalHandler); + } + } + private Runnable delayMillis(Runnable r, int ms) { return () -> { try { @@ -369,60 +410,26 @@ private void runExecutionTest( final CountDownLatch supplierLatch = new CountDownLatch(1); - Runnable job = () -> { - try { - runnable.run(); - } finally { - supplierLatch.countDown(); - } - }; - - // snoop on logging to also handle the cases where exceptions are simply logged in Scheduler. - final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class); - final MockLogAppender appender = new MockLogAppender(); - appender.addExpectation( - new MockLogAppender.LoggingExpectation() { - @Override - public void match(LogEvent event) { - if (event.getLevel() == Level.WARN) { - assertThat("no other warnings than those expected", - event.getMessage().getFormattedMessage(), - equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]")); - assertTrue(expectThrowable); - assertNotNull(event.getThrown()); - assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown())); - uncaughtExceptionHandlerLatch.countDown(); - } - } - - @Override - public void assertMatched() { + try { + runner.accept(() -> { + try { + runnable.run(); + } finally { + supplierLatch.countDown(); } }); + } catch (Throwable t) { + consumer.accept(Optional.of(t)); + return; + } - appender.start(); - Loggers.addAppender(schedulerLogger, appender); - try { - try { - runner.accept(job); - } catch (Throwable t) { - consumer.accept(Optional.of(t)); - return; - } - - supplierLatch.await(); + supplierLatch.await(); - if (expectThrowable) { - uncaughtExceptionHandlerLatch.await(); - } - } finally { - Loggers.removeAppender(schedulerLogger, appender); - appender.stop(); + if (expectThrowable) { + uncaughtExceptionHandlerLatch.await(); } consumer.accept(Optional.ofNullable(throwableReference.get())); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); } finally { Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java index eb9e36eeec0a8..d45a77ddb22ac 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java +++ b/server/src/main/java/org/elasticsearch/index/shard/GlobalCheckpointListeners.java @@ -135,7 +135,7 @@ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpo * before we could be cancelled by the notification. In this case, our listener here would * not be in the map and we should not fire the timeout logic. */ - removed = listeners.remove(listener).v2() != null; + removed = listeners.remove(listener) != null; } if (removed) { final TimeoutException e = new TimeoutException(timeout.getStringRep()); diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index 34a6efe82038d..450354e44dbbb 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -21,7 +21,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -47,8 +47,8 @@ public interface Scheduler { /** * Create a scheduler that can be used client side. Server side, please use ThreadPool.schedule instead. * - * Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started - * using execute, submit and schedule. + * Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will + * be logged as a warning. This includes jobs started using execute, submit and schedule. * @param settings the settings to use * @return executor */ @@ -272,7 +272,8 @@ public void onAfter() { } /** - * This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks + * This subclass ensures to properly bubble up Throwable instances of both type Error and Exception thrown in submitted/scheduled + * tasks to the uncaught exception handler */ class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class); @@ -294,12 +295,10 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) { @Override protected void afterExecute(Runnable r, Throwable t) { - Throwable exception = EsExecutors.rethrowErrors(r); - if (exception != null) { - logger.warn(() -> - new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()), - exception); - } + if (t != null) return; + // Scheduler only allows Runnable's so we expect no checked exceptions here. If anyone uses submit directly on `this`, we + // accept the wrapped exception in the output. + ExceptionsHelper.reThrowIfNotNull(EsExecutors.rethrowErrors(r)); } } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index d22f0166ab921..30db1ac52726f 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.Counter; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; @@ -497,7 +498,16 @@ class ThreadedRunnable implements Runnable { @Override public void run() { - executor.execute(runnable); + try { + executor.execute(runnable); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + logger.debug(new ParameterizedMessage("could not schedule execution of [{}] on [{}] as executor is shut down", + runnable, executor), e); + } else { + throw e; + } + } } @Override diff --git a/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java index ef7371693c3e1..186f9e86b1e76 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.threadpool; -import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -29,12 +28,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -157,37 +153,4 @@ public void testScheduledOnScheduler() throws InterruptedException { Scheduler.terminate(executor, 10, TimeUnit.SECONDS); } } - - @SuppressForbidden(reason = "this tests that the deprecated method still works") - public void testDeprecatedSchedule() throws ExecutionException, InterruptedException { - verifyDeprecatedSchedule(((threadPool, runnable) - -> threadPool.schedule(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable))); - } - - public void testThreadPoolScheduleDeprecated() throws ExecutionException, InterruptedException { - verifyDeprecatedSchedule(((threadPool, runnable) - -> threadPool.scheduleDeprecated(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable))); - } - - private void verifyDeprecatedSchedule(BiFunction> scheduleFunction) throws InterruptedException, ExecutionException { - ThreadPool threadPool = new TestThreadPool("test"); - CountDownLatch missingExecutions = new CountDownLatch(1); - try { - scheduleFunction.apply(threadPool, missingExecutions::countDown) - .get(); - assertEquals(0, missingExecutions.getCount()); - - ExecutionException exception = expectThrows(ExecutionException.class, - "schedule(...).get() must throw exception from runnable", - () -> scheduleFunction.apply(threadPool, - () -> { throw new IllegalArgumentException("FAIL"); } - ).get()); - - assertEquals(IllegalArgumentException.class, exception.getCause().getClass()); - } finally { - ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); - } - } - }