Skip to content

Commit a84744a

Browse files
Handle scheduler exceptions (#38183)
Scheduler.schedule(...) would previously assume that the caller handles exceptions by calling get() on the returned ScheduledFuture. schedule() now returns a ScheduledCancellable that no longer gives access to the exception. Instead, any exception thrown out of a scheduled Runnable is logged as a warning. In this backport to 6.x, source backwards compatibility is maintained and some of the changes have therefore not been carried out (notably the signature change on Processor.Parameters.scheduler). This is a continuation of #28667, #36137 and also fixes #37708.
1 parent 588a08e commit a84744a

File tree

53 files changed

+718
-241
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+718
-241
lines changed

buildSrc/src/main/resources/forbidden/es-all-signatures.txt

+3
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,6 @@ java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int)
6363
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory)
6464
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.RejectedExecutionHandler)
6565
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)
66+
67+
@defaultMessage use Scheduler.schedule(Runnable, delay, executor) instead (mocking tests typically rely on that signature).
68+
org.elasticsearch.threadpool.Scheduler#schedule(org.elasticsearch.common.unit.TimeValue, java.lang.String, java.lang.Runnable)

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public void onFailure(Exception e) {
215215
logger.trace(
216216
(Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
217217
countSearchRetry.run();
218-
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
218+
threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME);
219219
return;
220220
}
221221
}

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292
import java.util.Set;
9393
import java.util.concurrent.CountDownLatch;
9494
import java.util.concurrent.ExecutionException;
95-
import java.util.concurrent.ScheduledFuture;
9695
import java.util.concurrent.TimeUnit;
9796
import java.util.concurrent.atomic.AtomicInteger;
9897
import java.util.concurrent.atomic.AtomicReference;
@@ -322,7 +321,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception {
322321
worker.rethrottle(1);
323322
setupClient(new TestThreadPool(getTestName()) {
324323
@Override
325-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
324+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
326325
// While we're here we can check that the sleep made it through
327326
assertThat(delay.nanos(), greaterThan(0L));
328327
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
@@ -441,7 +440,7 @@ public void testScrollDelay() throws Exception {
441440
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
442441
setupClient(new TestThreadPool(getTestName()) {
443442
@Override
444-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
443+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
445444
capturedDelay.set(delay);
446445
capturedCommand.set(command);
447446
return null;
@@ -617,7 +616,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
617616
*/
618617
setupClient(new TestThreadPool(getTestName()) {
619618
@Override
620-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
619+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
621620
/*
622621
* This is called twice:
623622
* 1. To schedule the throttling. When that happens we immediately cancel the task.
@@ -628,7 +627,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
628627
if (delay.nanos() > 0) {
629628
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
630629
}
631-
return super.schedule(delay, name, command);
630+
return super.schedule(command, delay, name);
632631
}
633632
});
634633

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import java.nio.charset.StandardCharsets;
7070
import java.util.concurrent.ExecutorService;
7171
import java.util.concurrent.Future;
72-
import java.util.concurrent.ScheduledFuture;
7372
import java.util.concurrent.atomic.AtomicBoolean;
7473
import java.util.function.Consumer;
7574

@@ -104,7 +103,7 @@ public ExecutorService executor(String name) {
104103
}
105104

106105
@Override
107-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
106+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
108107
command.run();
109108
return null;
110109
}

qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java

+93-30
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,19 @@
1919

2020
package org.elasticsearch.threadpool;
2121

22+
import org.apache.logging.log4j.Level;
23+
import org.apache.logging.log4j.LogManager;
24+
import org.apache.logging.log4j.Logger;
25+
import org.apache.logging.log4j.core.LogEvent;
26+
import org.elasticsearch.common.logging.Loggers;
2227
import org.elasticsearch.common.settings.Settings;
2328
import org.elasticsearch.common.unit.TimeValue;
2429
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2530
import org.elasticsearch.common.util.concurrent.EsExecutors;
2631
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
2732
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
2833
import org.elasticsearch.test.ESTestCase;
34+
import org.elasticsearch.test.MockLogAppender;
2935
import org.junit.After;
3036
import org.junit.Before;
3137

@@ -38,6 +44,7 @@
3844
import java.util.function.Consumer;
3945

4046
import static org.hamcrest.Matchers.containsString;
47+
import static org.hamcrest.Matchers.equalTo;
4148
import static org.hamcrest.Matchers.hasToString;
4249
import static org.hamcrest.Matchers.instanceOf;
4350

@@ -108,7 +115,12 @@ public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws In
108115
try {
109116
checkExecutionError(getExecuteRunner(prioritizedExecutor));
110117
checkExecutionError(getSubmitRunner(prioritizedExecutor));
118+
// bias towards timeout
119+
checkExecutionError(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r));
120+
// race whether timeout or success (but typically biased towards success)
111121
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r));
122+
// bias towards no timeout.
123+
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r));
112124
} finally {
113125
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
114126
}
@@ -170,10 +182,7 @@ public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedE
170182
final boolean expectExceptionOnSchedule =
171183
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
172184
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
173-
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE
174-
// scheduler just swallows the exception here
175-
// TODO: bubble these exceptions up
176-
&& ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT;
185+
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
177186
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
178187
}
179188
}
@@ -219,14 +228,19 @@ public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws
219228
}
220229
}
221230

222-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37708")
223231
public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
224232
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
225233
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
226234
try {
227235
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
228236
checkExecutionException(getSubmitRunner(prioritizedExecutor), false);
237+
238+
// bias towards timeout
239+
checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true);
240+
// race whether timeout or success (but typically biased towards success)
229241
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true);
242+
// bias towards no timeout.
243+
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true);
230244
} finally {
231245
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
232246
}
@@ -235,26 +249,39 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw
235249
public void testExecutionExceptionOnScheduler() throws InterruptedException {
236250
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
237251
try {
238-
// scheduler just swallows the exceptions
239-
// TODO: bubble these exceptions up
240-
checkExecutionException(getExecuteRunner(scheduler), false);
241-
checkExecutionException(getSubmitRunner(scheduler), false);
242-
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false);
252+
checkExecutionException(getExecuteRunner(scheduler), true);
253+
// while submit does return a Future, we choose to log exceptions anyway,
254+
// since this is the semi-internal SafeScheduledThreadPoolExecutor that is being used,
255+
// which also logs exceptions for schedule calls.
256+
checkExecutionException(getSubmitRunner(scheduler), true);
257+
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), true);
243258
} finally {
244259
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
245260
}
246261
}
247262

263+
private Runnable delayMillis(Runnable r, int ms) {
264+
return () -> {
265+
try {
266+
Thread.sleep(ms);
267+
} catch (InterruptedException e) {
268+
Thread.currentThread().interrupt();
269+
}
270+
r.run();
271+
};
272+
}
273+
248274
private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException {
249-
logger.info("checking exception for {}", runner);
250275
final Runnable runnable;
251276
final boolean willThrow;
252277
if (randomBoolean()) {
278+
logger.info("checking direct exception for {}", runner);
253279
runnable = () -> {
254280
throw new IllegalStateException("future exception");
255281
};
256282
willThrow = expectException;
257283
} else {
284+
logger.info("checking abstract runnable exception for {}", runner);
258285
runnable = new AbstractRunnable() {
259286
@Override
260287
public void onFailure(Exception e) {
@@ -275,6 +302,7 @@ protected void doRun() {
275302
o -> {
276303
assertEquals(willThrow, o.isPresent());
277304
if (willThrow) {
305+
if (o.get() instanceof Error) throw (Error) o.get();
278306
assertThat(o.get(), instanceOf(IllegalStateException.class));
279307
assertThat(o.get(), hasToString(containsString("future exception")));
280308
}
@@ -313,7 +341,7 @@ Consumer<Runnable> getScheduleRunner(String executor) {
313341
return new Consumer<Runnable>() {
314342
@Override
315343
public void accept(Runnable runnable) {
316-
threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable);
344+
threadPool.schedule(runnable, randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor);
317345
}
318346

319347
@Override
@@ -324,42 +352,77 @@ public String toString() {
324352
}
325353

326354
private void runExecutionTest(
327-
final Consumer<Runnable> runner,
328-
final Runnable runnable,
329-
final boolean expectThrowable,
330-
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
355+
final Consumer<Runnable> runner,
356+
final Runnable runnable,
357+
final boolean expectThrowable,
358+
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
331359
final AtomicReference<Throwable> throwableReference = new AtomicReference<>();
332360
final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
333361
final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1);
334362

335363
try {
336364
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
337365
assertTrue(expectThrowable);
338-
throwableReference.set(e);
366+
assertTrue("Only one message allowed", throwableReference.compareAndSet(null, e));
339367
uncaughtExceptionHandlerLatch.countDown();
340368
});
341369

342370
final CountDownLatch supplierLatch = new CountDownLatch(1);
343371

344-
try {
345-
runner.accept(() -> {
346-
try {
347-
runnable.run();
348-
} finally {
349-
supplierLatch.countDown();
372+
Runnable job = () -> {
373+
try {
374+
runnable.run();
375+
} finally {
376+
supplierLatch.countDown();
377+
}
378+
};
379+
380+
// snoop on logging to also handle the cases where exceptions are simply logged in Scheduler.
381+
final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class);
382+
final MockLogAppender appender = new MockLogAppender();
383+
appender.addExpectation(
384+
new MockLogAppender.LoggingExpectation() {
385+
@Override
386+
public void match(LogEvent event) {
387+
if (event.getLevel() == Level.WARN) {
388+
assertThat("no other warnings than those expected",
389+
event.getMessage().getFormattedMessage(),
390+
equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]"));
391+
assertTrue(expectThrowable);
392+
assertNotNull(event.getThrown());
393+
assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown()));
394+
uncaughtExceptionHandlerLatch.countDown();
395+
}
396+
}
397+
398+
@Override
399+
public void assertMatched() {
350400
}
351401
});
352-
} catch (Throwable t) {
353-
consumer.accept(Optional.of(t));
354-
return;
355-
}
356402

357-
supplierLatch.await();
403+
appender.start();
404+
Loggers.addAppender(schedulerLogger, appender);
405+
try {
406+
try {
407+
runner.accept(job);
408+
} catch (Throwable t) {
409+
consumer.accept(Optional.of(t));
410+
return;
411+
}
412+
413+
supplierLatch.await();
358414

359-
if (expectThrowable) {
360-
uncaughtExceptionHandlerLatch.await();
415+
if (expectThrowable) {
416+
uncaughtExceptionHandlerLatch.await();
417+
}
418+
} finally {
419+
Loggers.removeAppender(schedulerLogger, appender);
420+
appender.stop();
361421
}
422+
362423
consumer.accept(Optional.ofNullable(throwableReference.get()));
424+
} catch (IllegalAccessException e) {
425+
throw new RuntimeException(e);
363426
} finally {
364427
Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
365428
}

server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,15 @@ public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkRespons
203203
Objects.requireNonNull(listener, "listener");
204204
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
205205
return new Builder(consumer, listener,
206-
(delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
206+
buildScheduler(scheduledThreadPoolExecutor),
207207
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
208208
}
209209

210+
private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) {
211+
return (command, delay, executor) ->
212+
Scheduler.wrapAsScheduledCancellable(scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
213+
}
214+
210215
private final int bulkActions;
211216
private final long bulkSize;
212217

@@ -345,7 +350,9 @@ private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler
345350
if (flushInterval == null) {
346351
return new Scheduler.Cancellable() {
347352
@Override
348-
public void cancel() {}
353+
public boolean cancel() {
354+
return false;
355+
}
349356

350357
@Override
351358
public boolean isCancelled() {

server/src/main/java/org/elasticsearch/action/bulk/Retry.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,13 @@
2424
import org.elasticsearch.action.support.PlainActionFuture;
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.unit.TimeValue;
27-
import org.elasticsearch.common.util.concurrent.FutureUtils;
2827
import org.elasticsearch.rest.RestStatus;
2928
import org.elasticsearch.threadpool.Scheduler;
3029
import org.elasticsearch.threadpool.ThreadPool;
3130

3231
import java.util.ArrayList;
3332
import java.util.Iterator;
3433
import java.util.List;
35-
import java.util.concurrent.ScheduledFuture;
3634
import java.util.function.BiConsumer;
3735
import java.util.function.Predicate;
3836

@@ -121,7 +119,7 @@ static class RetryHandler implements ActionListener<BulkResponse> {
121119
// needed to construct the next bulk request based on the response to the previous one
122120
// volatile as we're called from a scheduled thread
123121
private volatile BulkRequest currentBulkRequest;
124-
private volatile ScheduledFuture<?> scheduledRequestFuture;
122+
private volatile Scheduler.Cancellable retryCancellable;
125123

126124
RetryHandler(BackoffPolicy backoffPolicy, BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
127125
ActionListener<BulkResponse> listener, Scheduler scheduler) {
@@ -155,7 +153,9 @@ public void onFailure(Exception e) {
155153
try {
156154
listener.onFailure(e);
157155
} finally {
158-
FutureUtils.cancel(scheduledRequestFuture);
156+
if (retryCancellable != null) {
157+
retryCancellable.cancel();
158+
}
159159
}
160160
}
161161

@@ -164,7 +164,7 @@ private void retry(BulkRequest bulkRequestForRetry) {
164164
TimeValue next = backoff.next();
165165
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
166166
Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
167-
scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
167+
retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME);
168168
}
169169

170170
private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) {
@@ -198,7 +198,9 @@ private void finishHim() {
198198
try {
199199
listener.onResponse(getAccumulatedResponse());
200200
} finally {
201-
FutureUtils.cancel(scheduledRequestFuture);
201+
if (retryCancellable != null) {
202+
retryCancellable.cancel();
203+
}
202204
}
203205
}
204206

0 commit comments

Comments
 (0)