Skip to content

Commit afce6b9

Browse files
authored
Tidy ContinuousComputation rejection handling (#91442)
Today we implicitly assume the executor used by the `ContinuousComputation` has an unbounded queue. This commit makes that explicit. Relates #91386
1 parent c7bfdf8 commit afce6b9

File tree

3 files changed

+10
-7
lines changed

3 files changed

+10
-7
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputation.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
14+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
15+
import org.elasticsearch.threadpool.ThreadPool;
1416

1517
import java.util.Objects;
1618
import java.util.concurrent.ExecutorService;
@@ -30,10 +32,10 @@ public abstract class ContinuousComputation<T> {
3032
private final Processor processor = new Processor();
3133

3234
/**
33-
* @param executorService the background executor service to use to run the computations. No more than one task is executed at once.
35+
* @param threadPool Each computation runs on a {@code GENERIC} thread from this thread pool. At most one task executes at once.
3436
*/
35-
public ContinuousComputation(ExecutorService executorService) {
36-
this.executorService = executorService;
37+
public ContinuousComputation(ThreadPool threadPool) {
38+
this.executorService = threadPool.generic();
3739
}
3840

3941
/**
@@ -77,7 +79,8 @@ public void onFailure(Exception e) {
7779

7880
@Override
7981
public void onRejection(Exception e) {
80-
// shutting down, just give up
82+
// The executor has an unbounded queue so we must be shutting down to get here.
83+
assert e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown() : e;
8184
logger.debug("rejected", e);
8285
}
8386

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public DesiredBalanceShardsAllocator(
9797
this.clusterService = clusterService;
9898
this.reconciler = reconciler;
9999
this.desiredBalanceComputer = desiredBalanceComputer;
100-
this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) {
100+
this.desiredBalanceComputation = new ContinuousComputation<>(threadPool) {
101101

102102
@Override
103103
protected void processInput(DesiredBalanceInput desiredBalanceInput) {

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ContinuousComputationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public static void terminateThreadPool() {
4646
public void testConcurrency() throws Exception {
4747

4848
final var result = new AtomicReference<Integer>();
49-
final var computation = new ContinuousComputation<Integer>(threadPool.generic()) {
49+
final var computation = new ContinuousComputation<Integer>(threadPool) {
5050

5151
public final Semaphore executePermit = new Semaphore(1);
5252

@@ -104,7 +104,7 @@ public void testSkipsObsoleteValues() throws Exception {
104104
final var finalInput = new Object();
105105

106106
final var result = new AtomicReference<Object>();
107-
final var computation = new ContinuousComputation<Object>(threadPool.generic()) {
107+
final var computation = new ContinuousComputation<Object>(threadPool) {
108108
@Override
109109
protected void processInput(Object input) {
110110
assertNotEquals(input, skippedInput);

0 commit comments

Comments
 (0)