Skip to content

Commit 68ed72b

Browse files
Handle scheduler exceptions (#38014)
Scheduler.schedule(...) would previously assume that caller handles exception by calling get() on the returned ScheduledFuture. schedule() now returns a ScheduledCancellable that no longer gives access to the exception. Instead, any exception thrown out of a scheduled Runnable is logged as a warning. This is a continuation of #28667, #36137 and also fixes #37708.
1 parent 7f738e8 commit 68ed72b

File tree

59 files changed

+617
-260
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+617
-260
lines changed

libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020

2121
import java.util.Map;
2222
import java.util.concurrent.ConcurrentHashMap;
23-
import java.util.concurrent.ScheduledFuture;
2423
import java.util.concurrent.atomic.AtomicBoolean;
2524
import java.util.concurrent.atomic.AtomicInteger;
26-
import java.util.function.BiFunction;
25+
import java.util.function.BiConsumer;
2726
import java.util.function.LongSupplier;
2827

2928
/**
@@ -68,7 +67,7 @@ public interface ThreadWatchdog {
6867
static ThreadWatchdog newInstance(long interval,
6968
long maxExecutionTime,
7069
LongSupplier relativeTimeSupplier,
71-
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
70+
BiConsumer<Long, Runnable> scheduler) {
7271
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
7372
}
7473

@@ -105,15 +104,15 @@ class Default implements ThreadWatchdog {
105104
private final long interval;
106105
private final long maxExecutionTime;
107106
private final LongSupplier relativeTimeSupplier;
108-
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
107+
private final BiConsumer<Long, Runnable> scheduler;
109108
private final AtomicInteger registered = new AtomicInteger(0);
110109
private final AtomicBoolean running = new AtomicBoolean(false);
111110
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
112111

113112
private Default(long interval,
114113
long maxExecutionTime,
115114
LongSupplier relativeTimeSupplier,
116-
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
115+
BiConsumer<Long, Runnable> scheduler) {
117116
this.interval = interval;
118117
this.maxExecutionTime = maxExecutionTime;
119118
this.relativeTimeSupplier = relativeTimeSupplier;
@@ -124,7 +123,7 @@ public void register() {
124123
registered.getAndIncrement();
125124
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
126125
if (running.compareAndSet(false, true) == true) {
127-
scheduler.apply(interval, this::interruptLongRunningExecutions);
126+
scheduler.accept(interval, this::interruptLongRunningExecutions);
128127
}
129128
assert previousValue == null;
130129
}
@@ -149,7 +148,7 @@ private void interruptLongRunningExecutions() {
149148
}
150149
}
151150
if (registered.get() > 0) {
152-
scheduler.apply(interval, this::interruptLongRunningExecutions);
151+
scheduler.accept(interval, this::interruptLongRunningExecutions);
153152
} else {
154153
running.set(false);
155154
}

libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import java.util.TreeMap;
30-
import java.util.concurrent.ScheduledFuture;
3130
import java.util.concurrent.atomic.AtomicBoolean;
32-
import java.util.function.BiFunction;
31+
import java.util.function.BiConsumer;
3332

3433
import static org.hamcrest.Matchers.equalTo;
3534
import static org.hamcrest.Matchers.is;
@@ -418,7 +417,7 @@ public void testExponentialExpressions() {
418417
"Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}";
419418
String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " +
420419
"Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018";
421-
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> {
420+
BiConsumer<Long, Runnable> scheduler = (delay, command) -> {
422421
try {
423422
Thread.sleep(delay);
424423
} catch (InterruptedException e) {
@@ -430,7 +429,6 @@ public void testExponentialExpressions() {
430429
}
431430
});
432431
t.start();
433-
return null;
434432
};
435433
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
436434
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));

libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public void testInterrupt() throws Exception {
5151
}
5252
});
5353
thread.start();
54-
return null;
5554
});
5655

5756
Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public List<Setting<?>> getSettings() {
111111
private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
112112
long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis();
113113
long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis();
114-
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler);
114+
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis,
115+
parameters.relativeTimeSupplier, parameters.scheduler::apply);
115116
}
116117

117118
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public void onFailure(Exception e) {
215215
logger.trace(
216216
(Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
217217
countSearchRetry.run();
218-
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
218+
threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME);
219219
return;
220220
}
221221
}

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@
9393
import java.util.Set;
9494
import java.util.concurrent.CountDownLatch;
9595
import java.util.concurrent.ExecutionException;
96-
import java.util.concurrent.ScheduledFuture;
9796
import java.util.concurrent.TimeUnit;
9897
import java.util.concurrent.atomic.AtomicInteger;
9998
import java.util.concurrent.atomic.AtomicReference;
@@ -323,7 +322,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception {
323322
worker.rethrottle(1);
324323
setupClient(new TestThreadPool(getTestName()) {
325324
@Override
326-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
325+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
327326
// While we're here we can check that the sleep made it through
328327
assertThat(delay.nanos(), greaterThan(0L));
329328
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
@@ -442,7 +441,7 @@ public void testScrollDelay() throws Exception {
442441
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
443442
setupClient(new TestThreadPool(getTestName()) {
444443
@Override
445-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
444+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
446445
capturedDelay.set(delay);
447446
capturedCommand.set(command);
448447
return null;
@@ -618,7 +617,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
618617
*/
619618
setupClient(new TestThreadPool(getTestName()) {
620619
@Override
621-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
620+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
622621
/*
623622
* This is called twice:
624623
* 1. To schedule the throttling. When that happens we immediately cancel the task.
@@ -629,7 +628,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
629628
if (delay.nanos() > 0) {
630629
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
631630
}
632-
return super.schedule(delay, name, command);
631+
return super.schedule(command, delay, name);
633632
}
634633
});
635634

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import java.nio.charset.StandardCharsets;
7070
import java.util.concurrent.ExecutorService;
7171
import java.util.concurrent.Future;
72-
import java.util.concurrent.ScheduledFuture;
7372
import java.util.concurrent.atomic.AtomicBoolean;
7473
import java.util.function.Consumer;
7574

@@ -104,7 +103,7 @@ public ExecutorService executor(String name) {
104103
}
105104

106105
@Override
107-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
106+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
108107
command.run();
109108
return null;
110109
}

0 commit comments

Comments
 (0)