Skip to content

Commit bf7259c

Browse files
Log warnings on exceptions in Scheduler
Scheduler.schedule(...) would previously assume that caller handles exception by calling get() on the returned ScheduledFuture. schedule() now returns a ScheduledCancellable that no longer gives access to the exception. Instead, any exception thrown out of a scheduled Runnable is logged as a warning. This is a continuation of elastic#28667, elastic#36317 and also fixes elastic#37708.
1 parent 827c4f6 commit bf7259c

File tree

59 files changed

+679
-262
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+679
-262
lines changed

libs/grok/src/main/java/org/elasticsearch/grok/ThreadWatchdog.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020

2121
import java.util.Map;
2222
import java.util.concurrent.ConcurrentHashMap;
23-
import java.util.concurrent.ScheduledFuture;
2423
import java.util.concurrent.atomic.AtomicBoolean;
2524
import java.util.concurrent.atomic.AtomicInteger;
26-
import java.util.function.BiFunction;
25+
import java.util.function.BiConsumer;
2726
import java.util.function.LongSupplier;
2827

2928
/**
@@ -68,7 +67,7 @@ public interface ThreadWatchdog {
6867
static ThreadWatchdog newInstance(long interval,
6968
long maxExecutionTime,
7069
LongSupplier relativeTimeSupplier,
71-
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
70+
BiConsumer<Long, Runnable> scheduler) {
7271
return new Default(interval, maxExecutionTime, relativeTimeSupplier, scheduler);
7372
}
7473

@@ -105,15 +104,15 @@ class Default implements ThreadWatchdog {
105104
private final long interval;
106105
private final long maxExecutionTime;
107106
private final LongSupplier relativeTimeSupplier;
108-
private final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
107+
private final BiConsumer<Long, Runnable> scheduler;
109108
private final AtomicInteger registered = new AtomicInteger(0);
110109
private final AtomicBoolean running = new AtomicBoolean(false);
111110
final ConcurrentHashMap<Thread, Long> registry = new ConcurrentHashMap<>();
112111

113112
private Default(long interval,
114113
long maxExecutionTime,
115114
LongSupplier relativeTimeSupplier,
116-
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
115+
BiConsumer<Long, Runnable> scheduler) {
117116
this.interval = interval;
118117
this.maxExecutionTime = maxExecutionTime;
119118
this.relativeTimeSupplier = relativeTimeSupplier;
@@ -124,7 +123,7 @@ public void register() {
124123
registered.getAndIncrement();
125124
Long previousValue = registry.put(Thread.currentThread(), relativeTimeSupplier.getAsLong());
126125
if (running.compareAndSet(false, true) == true) {
127-
scheduler.apply(interval, this::interruptLongRunningExecutions);
126+
scheduler.accept(interval, this::interruptLongRunningExecutions);
128127
}
129128
assert previousValue == null;
130129
}
@@ -149,7 +148,7 @@ private void interruptLongRunningExecutions() {
149148
}
150149
}
151150
if (registered.get() > 0) {
152-
scheduler.apply(interval, this::interruptLongRunningExecutions);
151+
scheduler.accept(interval, this::interruptLongRunningExecutions);
153152
} else {
154153
running.set(false);
155154
}

libs/grok/src/test/java/org/elasticsearch/grok/GrokTests.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import java.util.TreeMap;
30-
import java.util.concurrent.ScheduledFuture;
3130
import java.util.concurrent.atomic.AtomicBoolean;
32-
import java.util.function.BiFunction;
31+
import java.util.function.BiConsumer;
3332

3433
import static org.hamcrest.Matchers.equalTo;
3534
import static org.hamcrest.Matchers.is;
@@ -418,7 +417,7 @@ public void testExponentialExpressions() {
418417
"Zustand->ABGESCHLOSSEN Kassennummer->%{WORD:param9} Bonnummer->%{WORD:param10} Datum->%{DATESTAMP_OTHER:param11}";
419418
String logLine = "Bonsuche mit folgender Anfrage: Belegart->[EINGESCHRAENKTER_VERKAUF, VERKAUF, NACHERFASSUNG] " +
420419
"Zustand->ABGESCHLOSSEN Kassennummer->2 Bonnummer->6362 Datum->Mon Jan 08 00:00:00 UTC 2018";
421-
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> {
420+
BiConsumer<Long, Runnable> scheduler = (delay, command) -> {
422421
try {
423422
Thread.sleep(delay);
424423
} catch (InterruptedException e) {
@@ -430,7 +429,6 @@ public void testExponentialExpressions() {
430429
}
431430
});
432431
t.start();
433-
return null;
434432
};
435433
Grok grok = new Grok(basePatterns, grokPattern, ThreadWatchdog.newInstance(10, 200, System::currentTimeMillis, scheduler));
436434
Exception e = expectThrows(RuntimeException.class, () -> grok.captures(logLine));

libs/grok/src/test/java/org/elasticsearch/grok/ThreadWatchdogTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ public void testInterrupt() throws Exception {
5151
}
5252
});
5353
thread.start();
54-
return null;
5554
});
5655

5756
Map<?, ?> registry = ((ThreadWatchdog.Default) watchdog).registry;

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public List<Setting<?>> getSettings() {
111111
private static ThreadWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
112112
long intervalMillis = WATCHDOG_INTERVAL.get(parameters.env.settings()).getMillis();
113113
long maxExecutionTimeMillis = WATCHDOG_MAX_EXECUTION_TIME.get(parameters.env.settings()).getMillis();
114-
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis, parameters.relativeTimeSupplier, parameters.scheduler);
114+
return ThreadWatchdog.newInstance(intervalMillis, maxExecutionTimeMillis,
115+
parameters.relativeTimeSupplier, parameters.scheduler::apply);
115116
}
116117

117118
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public void onFailure(Exception e) {
215215
logger.trace(
216216
(Supplier<?>) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e);
217217
countSearchRetry.run();
218-
threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this);
218+
threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME);
219219
return;
220220
}
221221
}

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@
9090
import java.util.Set;
9191
import java.util.concurrent.CountDownLatch;
9292
import java.util.concurrent.ExecutionException;
93-
import java.util.concurrent.ScheduledFuture;
9493
import java.util.concurrent.TimeUnit;
9594
import java.util.concurrent.atomic.AtomicInteger;
9695
import java.util.concurrent.atomic.AtomicReference;
@@ -320,7 +319,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception {
320319
worker.rethrottle(1);
321320
setupClient(new TestThreadPool(getTestName()) {
322321
@Override
323-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
322+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
324323
// While we're here we can check that the sleep made it through
325324
assertThat(delay.nanos(), greaterThan(0L));
326325
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
@@ -439,7 +438,7 @@ public void testScrollDelay() throws Exception {
439438
AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
440439
setupClient(new TestThreadPool(getTestName()) {
441440
@Override
442-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
441+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
443442
capturedDelay.set(delay);
444443
capturedCommand.set(command);
445444
return null;
@@ -615,7 +614,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception {
615614
*/
616615
setupClient(new TestThreadPool(getTestName()) {
617616
@Override
618-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
617+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
619618
/*
620619
* This is called twice:
621620
* 1. To schedule the throttling. When that happens we immediately cancel the task.
@@ -626,7 +625,7 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
626625
if (delay.nanos() > 0) {
627626
generic().execute(() -> taskManager.cancel(testTask, reason, () -> {}));
628627
}
629-
return super.schedule(delay, name, command);
628+
return super.schedule(command, delay, name);
630629
}
631630
});
632631

modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import java.nio.charset.StandardCharsets;
7070
import java.util.concurrent.ExecutorService;
7171
import java.util.concurrent.Future;
72-
import java.util.concurrent.ScheduledFuture;
7372
import java.util.concurrent.atomic.AtomicBoolean;
7473
import java.util.function.Consumer;
7574

@@ -104,7 +103,7 @@ public ExecutorService executor(String name) {
104103
}
105104

106105
@Override
107-
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
106+
public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) {
108107
command.run();
109108
return null;
110109
}

0 commit comments

Comments
 (0)