-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Propagate Errors in executors to uncaught exception handler #36137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Pinging @elastic/es-core-infra |
Oooooooooooooh! Nice. |
ping @jasontedor |
* elastic/master: (539 commits) SQL: documentation improvements and updates (elastic#36918) [DOCS] Merges list of discovery and cluster formation settings (elastic#36909) Only compress responses if request was compressed (elastic#36867) Remove duplicate paragraph (elastic#36942) Fix URI to cluster stats endpoint on specific nodes (elastic#36784) Fix typo in unitTest task (elastic#36930) RecoveryMonitor#lastSeenAccessTime should be volatile (elastic#36781) [CCR] Add `ccr.auto_follow_coordinator.wait_for_timeout` setting (elastic#36714) Scripting: Remove deprecated params.ctx (elastic#36848) Refactor the REST actions to clarify what endpoints are deprecated. (elastic#36869) Add JDK 12 to CI rotation (elastic#36915) Improve error message for 6.x style realm settings (elastic#36876) Send clear session as routable remote request (elastic#36805) [DOCS] Remove redundant ILM attributes (elastic#36808) SQL: Fix bug regarding histograms usage in scripting (elastic#36866) Update index mappings when ccr restore complete (elastic#36879) Docs: Bump version to alpha2 after release Enable IPv6 URIs in reindex from remote (elastic#36874) Watcher: Remove unused local variable in doExecute (elastic#36655) [DOCS] Synchs titles of X-Pack APIs ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great change. Thank you for taking it on. A few comments:
- I am concerned about a new executor being added in the future (like
AutodetectWorkerExecutorService
that would not do the right thing); can we detect this and enforce it? Similar to what we have done withLoggerUsageCheck
andFeatureAwareCheck
? - The wrapping and unwrapping feels a little brittle with all the instance of checking; how about something like this?
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
index 644b6d067fc..729ad4c52de 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
@@ -151,8 +151,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
@Override
protected Runnable unwrap(Runnable runnable) {
- if (runnable instanceof TieBreakingPrioritizedRunnable) {
- return super.unwrap(((TieBreakingPrioritizedRunnable) runnable).unwrap());
+ if (runnable instanceof WrappedRunnable) {
+ return ((WrappedRunnable) runnable).unwrap();
} else {
return super.unwrap(runnable);
}
@@ -256,9 +256,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
}
- Runnable unwrap() {
+ public Runnable unwrap() {
return runnable;
}
+
}
private static final class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java
index 7ef2e96e2c5..6773e2884f8 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedRunnable.java
@@ -23,7 +23,7 @@ import org.elasticsearch.common.Priority;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
-public abstract class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable> {
+public abstract class PrioritizedRunnable implements WrappedRunnable, Comparable<PrioritizedRunnable> {
private final Priority priority;
private final long creationDate;
@@ -82,5 +82,11 @@ public abstract class PrioritizedRunnable implements Runnable, Comparable<Priori
public void run() {
runnable.run();
}
+
+ @Override
+ public Runnable unwrap() {
+ return runnable;
+ }
+
}
}
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java
index 7fbc9ed91e8..c1d2f1f43f8 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java
@@ -85,8 +85,8 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
@Override
protected Runnable unwrap(Runnable runnable) {
final Runnable unwrapped = super.unwrap(runnable);
- if (unwrapped instanceof TimedRunnable) {
- return ((TimedRunnable) unwrapped).unwrap();
+ if (unwrapped instanceof WrappedRunnable) {
+ return ((WrappedRunnable) unwrapped).unwrap();
} else {
return unwrapped;
}
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
index 83ee7f63cb8..b95dd3c01af 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
@@ -347,11 +347,8 @@ public final class ThreadContext implements Closeable, Writeable {
* Unwraps a command that was previously wrapped by {@link #preserveContext(Runnable)}.
*/
public Runnable unwrap(Runnable command) {
- if (command instanceof ContextPreservingAbstractRunnable) {
- return ((ContextPreservingAbstractRunnable) command).unwrap();
- }
- if (command instanceof ContextPreservingRunnable) {
- return ((ContextPreservingRunnable) command).unwrap();
+ if (command instanceof WrappedRunnable) {
+ return ((WrappedRunnable) command).unwrap();
}
return command;
}
@@ -677,7 +674,7 @@ public final class ThreadContext implements Closeable, Writeable {
/**
* Wraps an AbstractRunnable to preserve the thread context.
*/
- private class ContextPreservingAbstractRunnable extends AbstractRunnable {
+ private class ContextPreservingAbstractRunnable extends AbstractRunnable implements WrappedRunnable {
private final AbstractRunnable in;
private final ThreadContext.StoredContext creatorsContext;
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
index ea264aa1608..3f3589a7226 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
@@ -23,7 +23,7 @@ package org.elasticsearch.common.util.concurrent;
* A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation
* through execution as well as only execution time.
*/
-class TimedRunnable extends AbstractRunnable {
+class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
private final Runnable original;
private final long creationTimeNanos;
private long startTimeNanos;
@@ -94,7 +94,8 @@ class TimedRunnable extends AbstractRunnable {
return Math.max(finishTimeNanos - startTimeNanos, 1);
}
- Runnable unwrap() {
+ public Runnable unwrap() {
return original;
}
+
}
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/WrappedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/WrappedRunnable.java
index 2bd031ee2b4..bdb4b6b36b6 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/WrappedRunnable.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/WrappedRunnable.java
@@ -1,4 +1,7 @@
package org.elasticsearch.common.util.concurrent;
-public class WrappedRunnable {
+public interface WrappedRunnable extends Runnable {
+
+ Runnable unwrap();
+
}
@elasticmachine run the Gradle build tests 1 |
pinging my reviewers here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thank you for the iteration to prevent new executors from having the same problem.
@elasticmachine please run the Gradle build tests 1 |
1 similar comment
@elasticmachine please run the Gradle build tests 1 |
@elasticmachine please run the Gradle build tests 1 |
1 similar comment
@elasticmachine please run the Gradle build tests 1 |
This is a continuation of #28667 and has as goal to convert all executors to propagate errors to the uncaught exception handler. Notable missing ones were the direct executor and the scheduler. This commit also makes it the property of the executor, not the runnable, to ensure this property. A big part of this commit also consists of vastly improving the test coverage in this area.
…-response-header-performance * elastic/master: Remove Redundant RestoreRequest Class (elastic#37535) Create specific exception for when snapshots are in progress (elastic#37550) Mute UnicastZenPingTests#testSimplePings [DOCS] Adds size limitation to the get datafeeds APIs (elastic#37578) Fix assertion at end of forceRefreshes (elastic#37559) Propagate Errors in executors to uncaught exception handler (elastic#36137) [DOCS] Adds limitation to the get jobs API (elastic#37549) Add set_priority action to ILM (elastic#37397) Make recovery source send operations non-blocking (elastic#37503) Allow field types to optimize phrase prefix queries (elastic#37436) Added fatal_exception field for ccr stats in monitoring mapping. (elastic#37563) Fix testRelocateWhileContinuouslyIndexingAndWaitingForRefresh (elastic#37560) Moved ccr integration to the package with other ccr integration tests. Mute TransportClientNodesServiceTests#testListenerFailures Decreased time out in test Fix erroneous docstrings for abstract bulk by scroll request (elastic#37517) SQL: Rename SQL type DATE to DATETIME (elastic#37395) Remove the AbstracLifecycleComponent constructor with Settings (elastic#37523)
Fixed review comments: removed todo, use FutureUtils.cancel and removed scheduler task decoration since this adds more complexity than it benefits. This is a continuation of elastic#28667, elastic#36137 and also fixes elastic#37708.
Scheduler.schedule(...) would previously assume that caller handles exception by calling get() on the returned ScheduledFuture. schedule() now returns a ScheduledCancellable that no longer gives access to the exception. Instead, any exception thrown out of a scheduled Runnable is logged as a warning. This is a continuation of #28667, #36137 and also fixes #37708.
Scheduler.schedule(...) would previously assume that caller handles exception by calling get() on the returned ScheduledFuture. schedule() now returns a ScheduledCancellable that no longer gives access to the exception. Instead, any exception thrown out of a scheduled Runnable is logged as a warning. In this backport to 6.x, source backwards compatibility is maintained and some of the changes has therefore not been carried out (notably the signature change on Processor.Parameters.scheduler). This is a continuation of elastic#28667, elastic#36137 and also fixes elastic#37708.
Scheduler.schedule(...) would previously assume that caller handles exception by calling get() on the returned ScheduledFuture. schedule() now returns a ScheduledCancellable that no longer gives access to the exception. Instead, any exception thrown out of a scheduled Runnable is logged as a warning. In this backport to 6.x, source backwards compatibility is maintained and some of the changes has therefore not been carried out (notably the signature change on Processor.Parameters.scheduler). This is a continuation of #28667, #36137 and also fixes #37708.
Executors of type fixed_auto_queue_size (i.e. search / search_throttled) wrap runnables into TimedRunnable, which is an AbstractRunnable. This is dangerous as it might silently swallow exceptions, and possibly miss calling a response listener. While this has not triggered any failures in the tests I have run so far, it might help uncover future problems. Follow-up to #36137
Executors of type fixed_auto_queue_size (i.e. search / search_throttled) wrap runnables into TimedRunnable, which is an AbstractRunnable. This is dangerous as it might silently swallow exceptions, and possibly miss calling a response listener. While this has not triggered any failures in the tests I have run so far, it might help uncover future problems. Follow-up to #36137
This PR is a continuation of #28667 and has as goal to convert all executors to propagate errors to the uncaught exception handler. Notable missing ones were the direct executor and the scheduler. This PR also makes it the property of the executor, not the runnable, to ensure this property. A big part of this PR consists of vastly improving the test coverage in this area, which also uncovers other woes marked as TODO:
fixed_auto_queue_size
(i.e. search / search_throttled) wraps runnables intoTimedRunnable
, which is anAbstractRunnable
. This is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener.SAME
executor./cc: @DaveCTurner