Skip to content

Commit 855801a

Browse files
committed
Propagate Errors in executors to uncaught exception handler (#36137)
This is a continuation of #28667 and has as goal to convert all executors to propagate errors to the uncaught exception handler. Notable missing ones were the direct executor and the scheduler. This commit also makes it the property of the executor, not the runnable, to ensure this property. A big part of this commit also consists of vastly improving the test coverage in this area.
1 parent 69d2c6c commit 855801a

File tree

19 files changed

+519
-115
lines changed

19 files changed

+519
-115
lines changed

buildSrc/src/main/resources/forbidden/es-all-signatures.txt

+13
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,16 @@ java.nio.channels.SocketChannel#connect(java.net.SocketAddress)
5050
java.lang.Boolean#getBoolean(java.lang.String)
5151

5252
org.apache.lucene.util.IOUtils @ use @org.elasticsearch.core.internal.io instead
53+
54+
@defaultMessage use executors from org.elasticsearch.common.util.concurrent.EsExecutors instead which will properly bubble up Errors
55+
java.util.concurrent.AbstractExecutorService#<init>()
56+
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue)
57+
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory)
58+
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.RejectedExecutionHandler)
59+
java.util.concurrent.ThreadPoolExecutor#<init>(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)
60+
61+
@defaultMessage extend org.elasticsearch.threadpool.Scheduler.SafeScheduledThreadPoolExecutor instead which will properly bubble up Errors
62+
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int)
63+
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory)
64+
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.RejectedExecutionHandler)
65+
java.util.concurrent.ScheduledThreadPoolExecutor#<init>(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)

qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java

+286-23
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,21 @@
1919

2020
package org.elasticsearch.threadpool;
2121

22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.unit.TimeValue;
24+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
25+
import org.elasticsearch.common.util.concurrent.EsExecutors;
26+
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
27+
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
2228
import org.elasticsearch.test.ESTestCase;
2329
import org.junit.After;
2430
import org.junit.Before;
2531

2632
import java.util.Optional;
2733
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.ScheduledThreadPoolExecutor;
36+
import java.util.concurrent.TimeUnit;
2837
import java.util.concurrent.atomic.AtomicReference;
2938
import java.util.function.Consumer;
3039

@@ -46,26 +55,275 @@ public void tearDownThreadPool() throws InterruptedException {
4655
terminate(threadPool);
4756
}
4857

49-
public void testExecutionException() throws InterruptedException {
50-
runExecutionExceptionTest(
51-
() -> {
58+
public void testExecutionErrorOnDefaultThreadPoolTypes() throws InterruptedException {
59+
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
60+
checkExecutionError(getExecuteRunner(threadPool.executor(executor)));
61+
checkExecutionError(getSubmitRunner(threadPool.executor(executor)));
62+
checkExecutionError(getScheduleRunner(executor));
63+
}
64+
}
65+
66+
public void testExecutionErrorOnDirectExecutorService() throws InterruptedException {
67+
final ExecutorService directExecutorService = EsExecutors.newDirectExecutorService();
68+
checkExecutionError(getExecuteRunner(directExecutorService));
69+
checkExecutionError(getSubmitRunner(directExecutorService));
70+
}
71+
72+
public void testExecutionErrorOnFixedESThreadPoolExecutor() throws InterruptedException {
73+
final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1,
74+
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
75+
try {
76+
checkExecutionError(getExecuteRunner(fixedExecutor));
77+
checkExecutionError(getSubmitRunner(fixedExecutor));
78+
} finally {
79+
ThreadPool.terminate(fixedExecutor, 10, TimeUnit.SECONDS);
80+
}
81+
}
82+
83+
public void testExecutionErrorOnScalingESThreadPoolExecutor() throws InterruptedException {
84+
final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling("test", 1, 1,
85+
10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
86+
try {
87+
checkExecutionError(getExecuteRunner(scalingExecutor));
88+
checkExecutionError(getSubmitRunner(scalingExecutor));
89+
} finally {
90+
ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS);
91+
}
92+
}
93+
94+
public void testExecutionErrorOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
95+
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
96+
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
97+
try {
98+
checkExecutionError(getExecuteRunner(autoQueueFixedExecutor));
99+
checkExecutionError(getSubmitRunner(autoQueueFixedExecutor));
100+
} finally {
101+
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
102+
}
103+
}
104+
105+
public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
106+
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
107+
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
108+
try {
109+
checkExecutionError(getExecuteRunner(prioritizedExecutor));
110+
checkExecutionError(getSubmitRunner(prioritizedExecutor));
111+
checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r));
112+
} finally {
113+
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
114+
}
115+
}
116+
117+
public void testExecutionErrorOnScheduler() throws InterruptedException {
118+
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
119+
try {
120+
checkExecutionError(getExecuteRunner(scheduler));
121+
checkExecutionError(getSubmitRunner(scheduler));
122+
checkExecutionError(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS));
123+
} finally {
124+
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
125+
}
126+
}
127+
128+
private void checkExecutionError(Consumer<Runnable> runner) throws InterruptedException {
129+
logger.info("checking error for {}", runner);
130+
final Runnable runnable;
131+
if (randomBoolean()) {
132+
runnable = () -> {
133+
throw new Error("future error");
134+
};
135+
} else {
136+
runnable = new AbstractRunnable() {
137+
@Override
138+
public void onFailure(Exception e) {
139+
140+
}
141+
142+
@Override
143+
protected void doRun() {
52144
throw new Error("future error");
53-
},
54-
true,
55-
o -> {
56-
assertTrue(o.isPresent());
57-
assertThat(o.get(), instanceOf(Error.class));
58-
assertThat(o.get(), hasToString(containsString("future error")));
59-
});
60-
runExecutionExceptionTest(
61-
() -> {
145+
}
146+
};
147+
}
148+
runExecutionTest(
149+
runner,
150+
runnable,
151+
true,
152+
o -> {
153+
assertTrue(o.isPresent());
154+
assertThat(o.get(), instanceOf(Error.class));
155+
assertThat(o.get(), hasToString(containsString("future error")));
156+
});
157+
}
158+
159+
public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedException {
160+
for (String executor : ThreadPool.THREAD_POOL_TYPES.keySet()) {
161+
final boolean expectExceptionOnExecute =
162+
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
163+
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
164+
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE;
165+
checkExecutionException(getExecuteRunner(threadPool.executor(executor)), expectExceptionOnExecute);
166+
167+
// here, it's ok for the exception not to bubble up. Accessing the future will yield the exception
168+
checkExecutionException(getSubmitRunner(threadPool.executor(executor)), false);
169+
170+
final boolean expectExceptionOnSchedule =
171+
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
172+
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
173+
ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE
174+
// scheduler just swallows the exception here
175+
// TODO: bubble these exceptions up
176+
&& ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT;
177+
checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule);
178+
}
179+
}
180+
181+
public void testExecutionExceptionOnDirectExecutorService() throws InterruptedException {
182+
final ExecutorService directExecutorService = EsExecutors.newDirectExecutorService();
183+
checkExecutionException(getExecuteRunner(directExecutorService), true);
184+
checkExecutionException(getSubmitRunner(directExecutorService), false);
185+
}
186+
187+
public void testExecutionExceptionOnFixedESThreadPoolExecutor() throws InterruptedException {
188+
final EsThreadPoolExecutor fixedExecutor = EsExecutors.newFixed("test", 1, 1,
189+
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
190+
try {
191+
checkExecutionException(getExecuteRunner(fixedExecutor), true);
192+
checkExecutionException(getSubmitRunner(fixedExecutor), false);
193+
} finally {
194+
ThreadPool.terminate(fixedExecutor, 10, TimeUnit.SECONDS);
195+
}
196+
}
197+
198+
public void testExecutionExceptionOnScalingESThreadPoolExecutor() throws InterruptedException {
199+
final EsThreadPoolExecutor scalingExecutor = EsExecutors.newScaling("test", 1, 1,
200+
10, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
201+
try {
202+
checkExecutionException(getExecuteRunner(scalingExecutor), true);
203+
checkExecutionException(getSubmitRunner(scalingExecutor), false);
204+
} finally {
205+
ThreadPool.terminate(scalingExecutor, 10, TimeUnit.SECONDS);
206+
}
207+
}
208+
209+
public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws InterruptedException {
210+
final EsThreadPoolExecutor autoQueueFixedExecutor = EsExecutors.newAutoQueueFixed("test", 1, 1,
211+
1, 1, 1, TimeValue.timeValueSeconds(10), EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext());
212+
try {
213+
// fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable
214+
// TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener
215+
checkExecutionException(getExecuteRunner(autoQueueFixedExecutor), false);
216+
checkExecutionException(getSubmitRunner(autoQueueFixedExecutor), false);
217+
} finally {
218+
ThreadPool.terminate(autoQueueFixedExecutor, 10, TimeUnit.SECONDS);
219+
}
220+
}
221+
222+
public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException {
223+
final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test",
224+
EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler());
225+
try {
226+
checkExecutionException(getExecuteRunner(prioritizedExecutor), true);
227+
checkExecutionException(getSubmitRunner(prioritizedExecutor), false);
228+
checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true);
229+
} finally {
230+
ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS);
231+
}
232+
}
233+
234+
public void testExecutionExceptionOnScheduler() throws InterruptedException {
235+
final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
236+
try {
237+
// scheduler just swallows the exceptions
238+
// TODO: bubble these exceptions up
239+
checkExecutionException(getExecuteRunner(scheduler), false);
240+
checkExecutionException(getSubmitRunner(scheduler), false);
241+
checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false);
242+
} finally {
243+
Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS);
244+
}
245+
}
246+
247+
private void checkExecutionException(Consumer<Runnable> runner, boolean expectException) throws InterruptedException {
248+
logger.info("checking exception for {}", runner);
249+
final Runnable runnable;
250+
final boolean willThrow;
251+
if (randomBoolean()) {
252+
runnable = () -> {
253+
throw new IllegalStateException("future exception");
254+
};
255+
willThrow = expectException;
256+
} else {
257+
runnable = new AbstractRunnable() {
258+
@Override
259+
public void onFailure(Exception e) {
260+
261+
}
262+
263+
@Override
264+
protected void doRun() {
62265
throw new IllegalStateException("future exception");
63-
},
64-
false,
65-
o -> assertFalse(o.isPresent()));
266+
}
267+
};
268+
willThrow = false;
269+
}
270+
runExecutionTest(
271+
runner,
272+
runnable,
273+
willThrow,
274+
o -> {
275+
assertEquals(willThrow, o.isPresent());
276+
if (willThrow) {
277+
assertThat(o.get(), instanceOf(IllegalStateException.class));
278+
assertThat(o.get(), hasToString(containsString("future exception")));
279+
}
280+
});
281+
}
282+
283+
Consumer<Runnable> getExecuteRunner(ExecutorService executor) {
284+
return new Consumer<Runnable>() {
285+
@Override
286+
public void accept(Runnable runnable) {
287+
executor.execute(runnable);
288+
}
289+
290+
@Override
291+
public String toString() {
292+
return "executor(" + executor + ").execute()";
293+
}
294+
};
295+
}
296+
297+
Consumer<Runnable> getSubmitRunner(ExecutorService executor) {
298+
return new Consumer<Runnable>() {
299+
@Override
300+
public void accept(Runnable runnable) {
301+
executor.submit(runnable);
302+
}
303+
304+
@Override
305+
public String toString() {
306+
return "executor(" + executor + ").submit()";
307+
}
308+
};
309+
}
310+
311+
Consumer<Runnable> getScheduleRunner(String executor) {
312+
return new Consumer<Runnable>() {
313+
@Override
314+
public void accept(Runnable runnable) {
315+
threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable);
316+
}
317+
318+
@Override
319+
public String toString() {
320+
return "schedule(" + executor + ")";
321+
}
322+
};
66323
}
67324

68-
private void runExecutionExceptionTest(
325+
private void runExecutionTest(
326+
final Consumer<Runnable> runner,
69327
final Runnable runnable,
70328
final boolean expectThrowable,
71329
final Consumer<Optional<Throwable>> consumer) throws InterruptedException {
@@ -82,13 +340,18 @@ private void runExecutionExceptionTest(
82340

83341
final CountDownLatch supplierLatch = new CountDownLatch(1);
84342

85-
threadPool.generic().submit(() -> {
86-
try {
87-
runnable.run();
88-
} finally {
89-
supplierLatch.countDown();
90-
}
91-
});
343+
try {
344+
runner.accept(() -> {
345+
try {
346+
runnable.run();
347+
} finally {
348+
supplierLatch.countDown();
349+
}
350+
});
351+
} catch (Throwable t) {
352+
consumer.accept(Optional.of(t));
353+
return;
354+
}
92355

93356
supplierLatch.await();
94357

server/src/main/java/org/elasticsearch/ExceptionsHelper.java

+7
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,13 @@ public static Optional<Error> maybeError(final Throwable cause, final Logger log
243243
return Optional.empty();
244244
}
245245

246+
/**
247+
* See {@link #maybeError(Throwable, Logger)}. Uses the class-local logger.
248+
*/
249+
public static Optional<Error> maybeError(final Throwable cause) {
250+
return maybeError(cause, logger);
251+
}
252+
246253
/**
247254
* If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be
248255
* caught and bubbles up to the uncaught exception handler. Note that the cause tree is examined for any {@link Error}. See

0 commit comments

Comments
 (0)