|
24 | 24 | import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
25 | 25 | import org.elasticsearch.common.settings.Settings;
|
26 | 26 | import org.elasticsearch.common.unit.TimeValue;
|
| 27 | +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; |
27 | 28 | import org.elasticsearch.common.util.concurrent.ThreadContext;
|
28 | 29 | import org.elasticsearch.common.xcontent.XContentType;
|
29 | 30 | import org.elasticsearch.index.Index;
|
@@ -91,8 +92,17 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
|
91 | 92 | leaderClient = wrapClient(client, params);
|
92 | 93 | }
|
93 | 94 | Client followerClient = wrapClient(client, params);
|
94 |
| - BiConsumer<TimeValue, Runnable> scheduler = |
95 |
| - (delay, command) -> threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); |
| 95 | + BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> { |
| 96 | + try { |
| 97 | + threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); |
| 98 | + } catch (EsRejectedExecutionException e) { |
| 99 | + if (e.isExecutorShutdown()) { |
| 100 | + logger.debug("couldn't schedule command, executor is shutting down", e); |
| 101 | + } else { |
| 102 | + throw e; |
| 103 | + } |
| 104 | + } |
| 105 | + }; |
96 | 106 | return new ShardFollowNodeTask(
|
97 | 107 | id, type, action, getDescription(taskInProgress), parentTaskId, headers, params, scheduler, System::nanoTime) {
|
98 | 108 |
|
|
0 commit comments