Skip to content

Handle scheduler exceptions #38183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions buildSrc/src/main/resources/forbidden/es-all-signatures.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,6 @@ java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.RejectedExecutionHandler)
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)

@defaultMessage use Scheduler.schedule(Runnable, delay, executor) instead (mocking tests typically rely on that signature).
org.elasticsearch.threadpool.Scheduler#schedule(org.elasticsearch.common.unit.TimeValue, java.lang.String, java.lang.Runnable)
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void onFailure(Exception e) {
logger.trace(
(Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
countSearchRetry.run();
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -320,7 +319,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception {
worker.rethrottle(1);
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
// While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
Expand Down Expand Up @@ -439,7 +438,7 @@ public void testScrollDelay() throws Exception {
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
capturedDelay.set(delay);
capturedCommand.set(command);
return null;
Expand Down Expand Up @@ -615,7 +614,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
*/
setupClient(new TestThreadPool(getTestName()) {
@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
/*
* This is called twice:
* 1. To schedule the throttling. When that happens we immediately cancel the task.
Expand All @@ -626,7 +625,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
if (delay.nanos() > 0) {
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
}
return super.schedule(delay, name, command);
return super.schedule(command, delay, name);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -104,7 +103,7 @@ public ExecutorService executor(String name) {
}

@Override
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
command.run();
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@

package org.elasticsearch.threadpool;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.junit.After;
import org.junit.Before;

Expand All @@ -38,6 +44,7 @@
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;

Expand Down Expand Up @@ -108,7 +115,12 @@ public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws In
try {
checkExecutionError(getExecuteRunner(prioritizedExecutor));
checkExecutionError(getSubmitRunner(prioritizedExecutor));
// bias towards timeout
checkExecutionError(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r));
// race whether timeout or success (but typically biased towards success)
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r));
// bias towards no timeout.
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r));
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -170,10 +182,7 @@ public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedE
final boolean expectExceptionOnSchedule =
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE
// scheduler just swallows the exception here
// TODO: bubble these exceptions up
&& ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT;
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
}
}
Expand Down Expand Up @@ -219,14 +228,19 @@ public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37708")
public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
try {
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
checkExecutionException(getSubmitRunner(prioritizedExecutor), false);

// bias towards timeout
checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true);
// race whether timeout or success (but typically biased towards success)
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true);
// bias towards no timeout.
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true);
} finally {
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
}
Expand All @@ -235,26 +249,39 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw
public void testExecutionExceptionOnScheduler() throws InterruptedException {
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
try {
// scheduler just swallows the exceptions
// TODO: bubble these exceptions up
checkExecutionException(getExecuteRunner(scheduler), false);
checkExecutionException(getSubmitRunner(scheduler), false);
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false);
checkExecutionException(getExecuteRunner(scheduler), true);
// while submit does return a Future, we choose to log exceptions anyway,
// since this is the semi-internal SafeScheduledThreadPoolExecutor that is being used,
// which also logs exceptions for schedule calls.
checkExecutionException(getSubmitRunner(scheduler), true);
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), true);
} finally {
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
}
}

/**
 * Wraps {@code r} in a runnable that sleeps for {@code ms} milliseconds before delegating to it.
 * If the sleep is interrupted, the interrupt flag is restored and {@code r} is run anyway.
 *
 * @param r  the task to run once the sleep is over
 * @param ms how long to sleep, in milliseconds, before running {@code r}
 * @return a runnable that sleeps and then runs {@code r} on whichever thread executes it
 */
private Runnable delayMillis(Runnable r, int ms) {
    final Runnable delayed = () -> {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException interrupted) {
            // keep the interruption visible to later code on this thread; the wrapped task still runs
            Thread.currentThread().interrupt();
        }
        r.run();
    };
    return delayed;
}

private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException {
logger.info("checking exception for {}", runner);
final Runnable runnable;
final boolean willThrow;
if (randomBoolean()) {
logger.info("checking direct exception for {}", runner);
runnable = () -> {
throw new IllegalStateException("future exception");
};
willThrow = expectException;
} else {
logger.info("checking abstract runnable exception for {}", runner);
runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
Expand All @@ -275,6 +302,7 @@ protected void doRun() {
o -> {
assertEquals(willThrow, o.isPresent());
if (willThrow) {
if (o.get() instanceof Error) throw (Error) o.get();
assertThat(o.get(), instanceOf(IllegalStateException.class));
assertThat(o.get(), hasToString(containsString("future exception")));
}
Expand Down Expand Up @@ -313,7 +341,7 @@ Consumer<Runnable> getScheduleRunner(String executor) {
return new Consumer<Runnable>() {
@Override
public void accept(Runnable runnable) {
threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable);
threadPool.schedule(runnable, randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor);
}

@Override
Expand All @@ -324,42 +352,77 @@ public String toString() {
}

private void runExecutionTest(
final Consumer<Runnable> runner,
final Runnable runnable,
final boolean expectThrowable,
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
final Consumer<Runnable> runner,
final Runnable runnable,
final boolean expectThrowable,
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1);

try {
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
assertTrue(expectThrowable);
throwableReference.set(e);
assertTrue("Only one message allowed", throwableReference.compareAndSet(null, e));
uncaughtExceptionHandlerLatch.countDown();
});

final CountDownLatch supplierLatch = new CountDownLatch(1);

try {
runner.accept(() -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
Runnable job = () -> {
try {
runnable.run();
} finally {
supplierLatch.countDown();
}
};

// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
final MockLogAppender appender = new MockLogAppender();
appender.addExpectation(
new MockLogAppender.LoggingExpectation() {
@Override
public void match(LogEvent event) {
if (event.getLevel() == Level.WARN) {
assertThat("no other warnings than those expected",
event.getMessage().getFormattedMessage(),
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
assertTrue(expectThrowable);
assertNotNull(event.getThrown());
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
uncaughtExceptionHandlerLatch.countDown();
}
}

@Override
public void assertMatched() {
}
});
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

supplierLatch.await();
appender.start();
Loggers.addAppender(schedulerLogger, appender);
try {
try {
runner.accept(job);
} catch (Throwable t) {
consumer.accept(Optional.of(t));
return;
}

supplierLatch.await();

if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
if (expectThrowable) {
uncaughtExceptionHandlerLatch.await();
}
} finally {
Loggers.removeAppender(schedulerLogger, appender);
appender.stop();
}

consumer.accept(Optional.ofNullable(throwableReference.get()));
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} finally {
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,15 @@ public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkRespons
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
return new Builder(consumer, listener,
(delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
buildScheduler(scheduledThreadPoolExecutor),
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
}

/**
 * Adapts a {@link ScheduledThreadPoolExecutor} to the {@link Scheduler} functional interface.
 * The returned scheduler submits each command to {@code scheduledThreadPoolExecutor} with the
 * requested delay converted to milliseconds, and hands back the resulting future wrapped via
 * {@link Scheduler#wrapAsScheduledCancellable}. The executor name passed to the scheduler is not used.
 *
 * @param scheduledThreadPoolExecutor the executor that every scheduled command is submitted to
 * @return a scheduler backed by {@code scheduledThreadPoolExecutor}
 */
private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
    return (command, delay, executor) -> {
        final long delayInMillis = delay.millis();
        return Scheduler.wrapAsScheduledCancellable(
            scheduledThreadPoolExecutor.schedule(command, delayInMillis, TimeUnit.MILLISECONDS));
    };
}

private final int bulkActions;
private final long bulkSize;

Expand Down Expand Up @@ -345,7 +350,9 @@ private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler
if (flushInterval == null) {
return new Scheduler.Cancellable() {
@Override
public void cancel() {}
public boolean cancel() {
return false;
}

@Override
public boolean isCancelled() {
Expand Down
14 changes: 8 additions & 6 deletions server/src/main/java/org/elasticsearch/action/bulk/Retry.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -121,7 +119,7 @@ static class RetryHandler implements ActionListener<BulkResponse> {
// needed to construct the next bulk request based on the response to the previous one
// volatile as we're called from a scheduled thread
private volatile BulkRequest currentBulkRequest;
private volatile ScheduledFuture<?> scheduledRequestFuture;
private volatile Scheduler.Cancellable retryCancellable;

RetryHandler(BackoffPolicy backoffPolicy, BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
ActionListener<BulkResponse> listener, Scheduler scheduler) {
Expand Down Expand Up @@ -155,7 +153,9 @@ public void onFailure(Exception e) {
try {
listener.onFailure(e);
} finally {
FutureUtils.cancel(scheduledRequestFuture);
if (retryCancellable != null) {
retryCancellable.cancel();
}
}
}

Expand All @@ -164,7 +164,7 @@ private void retry(BulkRequest bulkRequestForRetry) {
TimeValue next = backoff.next();
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME);
}

private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
Expand Down Expand Up @@ -198,7 +198,9 @@ private void finishHim() {
try {
listener.onResponse(getAccumulatedResponse());
} finally {
FutureUtils.cancel(scheduledRequestFuture);
if (retryCancellable != null) {
retryCancellable.cancel();
}
}
}

Expand Down
Loading