Skip to content

Commit 834a5f4

Browse files
authored
[ML] Fix submit after shutdown in process worker service (#83646)
Backport of #83645
1 parent c39a08f commit 834a5f4

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectWorkerExecutorService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public synchronized void execute(Runnable command) {
7575
EsRejectedExecutionException rejected = new EsRejectedExecutionException("autodetect worker service has shutdown", true);
7676
if (command instanceof AbstractRunnable) {
7777
((AbstractRunnable) command).onRejection(rejected);
78+
return;
7879
} else {
7980
throw rejected;
8081
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectWorkerExecutorServiceTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.atomic.AtomicInteger;
2222

2323
import static org.hamcrest.Matchers.containsString;
24+
import static org.hamcrest.Matchers.isA;
2425

2526
public class AutodetectWorkerExecutorServiceTests extends ESTestCase {
2627

@@ -39,6 +40,33 @@ public void testAutodetectWorkerExecutorService_SubmitAfterShutdown() {
3940
expectThrows(EsRejectedExecutionException.class, () -> executor.execute(() -> {}));
4041
}
4142

43+
public void testAutodetectWorkerExecutorService_SubmitAfterShutdownWithAbstractRunnable() {
44+
AutodetectWorkerExecutorService executor = new AutodetectWorkerExecutorService(new ThreadContext(Settings.EMPTY));
45+
46+
threadPool.generic().execute(() -> executor.start());
47+
executor.shutdown();
48+
AtomicBoolean rejected = new AtomicBoolean(false);
49+
executor.execute(new AbstractRunnable() {
50+
@Override
51+
public void onRejection(Exception e) {
52+
assertThat(e, isA(EsRejectedExecutionException.class));
53+
rejected.set(true);
54+
}
55+
56+
@Override
57+
public void onFailure(Exception e) {
58+
fail("onFailure should not be called after the worker is shutdown");
59+
}
60+
61+
@Override
62+
protected void doRun() throws Exception {
63+
fail("doRun should not be called after the worker is shutdown");
64+
}
65+
});
66+
67+
assertTrue(rejected.get());
68+
}
69+
4270
public void testAutodetectWorkerExecutorService_TasksNotExecutedCallHandlerOnShutdown() throws Exception {
4371
AutodetectWorkerExecutorService executor = new AutodetectWorkerExecutorService(new ThreadContext(Settings.EMPTY));
4472

0 commit comments

Comments
 (0)