diff --git a/buildSrc/src/main/resources/forbidden/es-all-signatures.txt b/buildSrc/src/main/resources/forbidden/es-all-signatures.txt index 2ea46376ae3bf..f8fb59fd8253f 100644 --- a/buildSrc/src/main/resources/forbidden/es-all-signatures.txt +++ b/buildSrc/src/main/resources/forbidden/es-all-signatures.txt @@ -63,3 +63,6 @@ java.util.concurrent.ScheduledThreadPoolExecutor#(int) java.util.concurrent.ScheduledThreadPoolExecutor#(int, java.util.concurrent.ThreadFactory) java.util.concurrent.ScheduledThreadPoolExecutor#(int, java.util.concurrent.RejectedExecutionHandler) java.util.concurrent.ScheduledThreadPoolExecutor#(int, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler) + +@defaultMessage use Scheduler.schedule(Runnable, delay, executor) instead (mocking tests typically rely on that signature). +org.elasticsearch.threadpool.Scheduler#schedule(org.elasticsearch.common.unit.TimeValue, java.lang.String, java.lang.Runnable) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java index 9264cdde30c75..d729a4d9c3aa7 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -215,7 +215,7 @@ public void onFailure(Exception e) { logger.trace( (Supplier) () -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e); countSearchRetry.run(); - threadPool.schedule(delay, ThreadPool.Names.SAME, RetryHelper.this); + threadPool.schedule(RetryHelper.this, delay, ThreadPool.Names.SAME); return; } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 3d2199e516773..8e19f012a6e4a 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -90,7 +90,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -320,7 +319,7 @@ public void testThreadPoolRejectionsAbortRequest() throws Exception { worker.rethrottle(1); setupClient(new TestThreadPool(getTestName()) { @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { // While we're here we can check that the sleep made it through assertThat(delay.nanos(), greaterThan(0L)); assertThat(delay.seconds(), lessThanOrEqualTo(10L)); @@ -439,7 +438,7 @@ public void testScrollDelay() throws Exception { AtomicReference capturedCommand = new AtomicReference<>(); setupClient(new TestThreadPool(getTestName()) { @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { capturedDelay.set(delay); capturedCommand.set(command); return null; @@ -615,7 +614,7 @@ public void testCancelWhileDelayedAfterScrollResponse() throws Exception { */ setupClient(new TestThreadPool(getTestName()) { @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { /* * This is called twice: * 1. To schedule the throttling. When that happens we immediately cancel the task. @@ -626,7 +625,7 @@ public ScheduledFuture schedule(TimeValue delay, String name, Runnable comman if (delay.nanos() > 0) { generic().execute(() -> taskManager.cancel(testTask, reason, () -> {})); } - return super.schedule(delay, name, command); + return super.schedule(command, delay, name); } }); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index f67a5b627fb4c..7b19ced23603c 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -69,7 +69,6 @@ import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -104,7 +103,7 @@ public ExecutorService executor(String name) { } @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { command.run(); return null; } diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index 5a0a99ceb2ae6..86f2ba332790f 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -19,6 +19,11 @@ package org.elasticsearch.threadpool; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -26,6 +31,7 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; import org.junit.After; import org.junit.Before; @@ -38,6 +44,7 @@ import java.util.function.Consumer; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -108,7 +115,12 @@ public void testExecutionErrorOnSinglePrioritizingThreadPoolExecutor() throws In try { checkExecutionError(getExecuteRunner(prioritizedExecutor)); checkExecutionError(getSubmitRunner(prioritizedExecutor)); + // bias towards timeout + checkExecutionError(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r)); + // race whether timeout or success (but typically biased towards success) checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r)); + // bias towards no timeout. + checkExecutionError(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r)); } finally { ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); } @@ -170,10 +182,7 @@ public void testExecutionExceptionOnDefaultThreadPoolTypes() throws InterruptedE final boolean expectExceptionOnSchedule = // fixed_auto_queue_size wraps stuff into TimedRunnable, which is an AbstractRunnable // TODO: this is dangerous as it will silently swallow exceptions, and possibly miss calling a response listener - ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE - // scheduler just swallows the exception here - // TODO: bubble these exceptions up - && ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.DIRECT; + ThreadPool.THREAD_POOL_TYPES.get(executor) != ThreadPool.ThreadPoolType.FIXED_AUTO_QUEUE_SIZE; checkExecutionException(getScheduleRunner(executor), expectExceptionOnSchedule); } } @@ -219,14 +228,19 @@ public void testExecutionExceptionOnAutoQueueFixedESThreadPoolExecutor() throws } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37708") public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throws InterruptedException { final PrioritizedEsThreadPoolExecutor prioritizedExecutor = EsExecutors.newSinglePrioritizing("test", EsExecutors.daemonThreadFactory("test"), threadPool.getThreadContext(), threadPool.scheduler()); try { checkExecutionException(getExecuteRunner(prioritizedExecutor), true); checkExecutionException(getSubmitRunner(prioritizedExecutor), false); + + // bias towards timeout + checkExecutionException(r -> prioritizedExecutor.execute(delayMillis(r, 10), TimeValue.ZERO, r), true); + // race whether timeout or success (but typically biased towards success) checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.ZERO, r), true); + // bias towards no timeout. + checkExecutionException(r -> prioritizedExecutor.execute(r, TimeValue.timeValueMillis(10), r), true); } finally { ThreadPool.terminate(prioritizedExecutor, 10, TimeUnit.SECONDS); } @@ -235,26 +249,39 @@ public void testExecutionExceptionOnSinglePrioritizingThreadPoolExecutor() throw public void testExecutionExceptionOnScheduler() throws InterruptedException { final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); try { - // scheduler just swallows the exceptions - // TODO: bubble these exceptions up - checkExecutionException(getExecuteRunner(scheduler), false); - checkExecutionException(getSubmitRunner(scheduler), false); - checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), false); + checkExecutionException(getExecuteRunner(scheduler), true); + // while submit does return a Future, we choose to log exceptions anyway, + // since this is the semi-internal SafeScheduledThreadPoolExecutor that is being used, + // which also logs exceptions for schedule calls. + checkExecutionException(getSubmitRunner(scheduler), true); + checkExecutionException(r -> scheduler.schedule(r, randomFrom(0, 1), TimeUnit.MILLISECONDS), true); } finally { Scheduler.terminate(scheduler, 10, TimeUnit.SECONDS); } } + private Runnable delayMillis(Runnable r, int ms) { + return () -> { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + r.run(); + }; + } + private void checkExecutionException(Consumer runner, boolean expectException) throws InterruptedException { - logger.info("checking exception for {}", runner); final Runnable runnable; final boolean willThrow; if (randomBoolean()) { + logger.info("checking direct exception for {}", runner); runnable = () -> { throw new IllegalStateException("future exception"); }; willThrow = expectException; } else { + logger.info("checking abstract runnable exception for {}", runner); runnable = new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -275,6 +302,7 @@ protected void doRun() { o -> { assertEquals(willThrow, o.isPresent()); if (willThrow) { + if (o.get() instanceof Error) throw (Error) o.get(); assertThat(o.get(), instanceOf(IllegalStateException.class)); assertThat(o.get(), hasToString(containsString("future exception"))); } @@ -313,7 +341,7 @@ Consumer getScheduleRunner(String executor) { return new Consumer() { @Override public void accept(Runnable runnable) { - threadPool.schedule(randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor, runnable); + threadPool.schedule(runnable, randomFrom(TimeValue.ZERO, TimeValue.timeValueMillis(1)), executor); } @Override @@ -324,10 +352,10 @@ public String toString() { } private void runExecutionTest( - final Consumer runner, - final Runnable runnable, - final boolean expectThrowable, - final Consumer> consumer) throws InterruptedException { + final Consumer runner, + final Runnable runnable, + final boolean expectThrowable, + final Consumer> consumer) throws InterruptedException { final AtomicReference throwableReference = new AtomicReference<>(); final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler(); final CountDownLatch uncaughtExceptionHandlerLatch = new CountDownLatch(1); @@ -335,31 +363,66 @@ private void runExecutionTest( try { Thread.setDefaultUncaughtExceptionHandler((t, e) -> { assertTrue(expectThrowable); - throwableReference.set(e); + assertTrue("Only one message allowed", throwableReference.compareAndSet(null, e)); uncaughtExceptionHandlerLatch.countDown(); }); final CountDownLatch supplierLatch = new CountDownLatch(1); - try { - runner.accept(() -> { - try { - runnable.run(); - } finally { - supplierLatch.countDown(); + Runnable job = () -> { + try { + runnable.run(); + } finally { + supplierLatch.countDown(); + } + }; + + // snoop on logging to also handle the cases where exceptions are simply logged in Scheduler. + final Logger schedulerLogger = LogManager.getLogger(Scheduler.SafeScheduledThreadPoolExecutor.class); + final MockLogAppender appender = new MockLogAppender(); + appender.addExpectation( + new MockLogAppender.LoggingExpectation() { + @Override + public void match(LogEvent event) { + if (event.getLevel() == Level.WARN) { + assertThat("no other warnings than those expected", + event.getMessage().getFormattedMessage(), + equalTo("uncaught exception in scheduled thread [" + Thread.currentThread().getName() + "]")); + assertTrue(expectThrowable); + assertNotNull(event.getThrown()); + assertTrue("only one message allowed", throwableReference.compareAndSet(null, event.getThrown())); + uncaughtExceptionHandlerLatch.countDown(); + } + } + + @Override + public void assertMatched() { } }); - } catch (Throwable t) { - consumer.accept(Optional.of(t)); - return; - } - supplierLatch.await(); + appender.start(); + Loggers.addAppender(schedulerLogger, appender); + try { + try { + runner.accept(job); + } catch (Throwable t) { + consumer.accept(Optional.of(t)); + return; + } + + supplierLatch.await(); - if (expectThrowable) { - uncaughtExceptionHandlerLatch.await(); + if (expectThrowable) { + uncaughtExceptionHandlerLatch.await(); + } + } finally { + Loggers.removeAppender(schedulerLogger, appender); + appender.stop(); } + consumer.accept(Optional.ofNullable(throwableReference.get())); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); } finally { Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 904003623b7e8..68775b5af5cfc 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -203,10 +203,15 @@ public static Builder builder(BiConsumer scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS), + buildScheduler(scheduledThreadPoolExecutor), () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS)); } + private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor) { + return (command, delay, executor) -> + Scheduler.wrapAsScheduledCancellable(scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + } + private final int bulkActions; private final long bulkSize; @@ -345,7 +350,9 @@ private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler if (flushInterval == null) { return new Scheduler.Cancellable() { @Override - public void cancel() {} + public boolean cancel() { + return false; + } @Override public boolean isCancelled() { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java index a7163a9084566..f745a1bded599 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/Retry.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/Retry.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -32,7 +31,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.concurrent.ScheduledFuture; import java.util.function.BiConsumer; import java.util.function.Predicate; @@ -121,7 +119,7 @@ static class RetryHandler implements ActionListener { // needed to construct the next bulk request based on the response to the previous one // volatile as we're called from a scheduled thread private volatile BulkRequest currentBulkRequest; - private volatile ScheduledFuture scheduledRequestFuture; + private volatile Scheduler.Cancellable retryCancellable; RetryHandler(BackoffPolicy backoffPolicy, BiConsumer> consumer, ActionListener listener, Scheduler scheduler) { @@ -155,7 +153,9 @@ public void onFailure(Exception e) { try { listener.onFailure(e); } finally { - FutureUtils.cancel(scheduledRequestFuture); + if (retryCancellable != null) { + retryCancellable.cancel(); + } } } @@ -164,7 +164,7 @@ private void retry(BulkRequest bulkRequestForRetry) { TimeValue next = backoff.next(); logger.trace("Retry of bulk request scheduled in {} ms.", next.millis()); Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry)); - scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command); + retryCancellable = scheduler.schedule(command, next, ThreadPool.Names.SAME); } private BulkRequest createBulkRequestForRetry(BulkResponse bulkItemResponses) { @@ -198,7 +198,9 @@ private void finishHim() { try { listener.onResponse(getAccumulatedResponse()); } finally { - FutureUtils.cancel(scheduledRequestFuture); + if (retryCancellable != null) { + retryCancellable.cancel(); + } } } diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 7387b03ee822d..d442069b57f93 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -42,7 +42,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; @@ -68,7 +68,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; final class TransportClientNodesService implements Closeable { @@ -100,7 +99,7 @@ final class TransportClientNodesService implements Closeable { private final NodeSampler nodesSampler; - private volatile ScheduledFuture nodesSamplerFuture; + private volatile Scheduler.Cancellable nodesSamplerCancellable; private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt()); @@ -146,7 +145,7 @@ final class TransportClientNodesService implements Closeable { this.nodesSampler = new SimpleNodeSampler(); } this.hostFailureListener = hostFailureListener; - this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler()); + this.nodesSamplerCancellable = threadPool.schedule(new ScheduledNodeSampler(), nodesSamplerInterval, ThreadPool.Names.GENERIC); } public List transportAddresses() { @@ -325,7 +324,9 @@ public void close() { return; } closed = true; - FutureUtils.cancel(nodesSamplerFuture); + if (nodesSamplerCancellable != null) { + nodesSamplerCancellable.cancel(); + } for (DiscoveryNode node : nodes) { transportService.disconnectFromNode(node); } @@ -392,7 +393,7 @@ public void run() { try { nodesSampler.sample(); if (!closed) { - nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, this); + nodesSamplerCancellable = threadPool.schedule(this, nodesSamplerInterval, ThreadPool.Names.GENERIC); } } catch (Exception e) { logger.warn("failed to sample", e); diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index e8261ca9f09cf..f06d69057ccea 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -132,7 +132,7 @@ public void onMaster() { } try { // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running - threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob()); + threadPool.schedule(new SubmitReschedulingClusterInfoUpdatedJob(), updateFrequency, executorName()); if (clusterService.state().getNodes().getDataNodes().size() > 1) { // Submit an info update job to be run immediately threadPool.executor(executorName()).execute(() -> maybeRefresh()); @@ -224,7 +224,7 @@ public void run() { logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString()); } try { - threadPool.schedule(updateFrequency, executorName(), this); + threadPool.schedule(this, updateFrequency, executorName()); } catch (EsRejectedExecutionException ex) { logger.debug("Reschedule cluster info service was rejected", ex); } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index 90526aaa9fd21..d6c2824fdbb10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -31,10 +31,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.discovery.zen.MasterFaultDetection; import org.elasticsearch.discovery.zen.NodesFaultDetection; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -42,7 +42,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import static org.elasticsearch.common.settings.Setting.Property; import static org.elasticsearch.common.settings.Setting.positiveTimeSetting; @@ -71,7 +70,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { private final TimeValue reconnectInterval; - private volatile ScheduledFuture backgroundFuture = null; + private volatile Scheduler.Cancellable backgroundCancellable = null; @Inject public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) { @@ -187,19 +186,21 @@ protected void doRun() { @Override public void onAfter() { if (lifecycle.started()) { - backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, this); + backgroundCancellable = threadPool.schedule(this, reconnectInterval, ThreadPool.Names.GENERIC); } } } @Override protected void doStart() { - backgroundFuture = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ConnectionChecker()); + backgroundCancellable = threadPool.schedule(new ConnectionChecker(), reconnectInterval, ThreadPool.Names.GENERIC); } @Override protected void doStop() { - FutureUtils.cancel(backgroundFuture); + if (backgroundCancellable != null) { + backgroundCancellable.cancel(); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java index 4589cc0ea6967..82d459f7f3cce 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java @@ -32,10 +32,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -69,7 +68,7 @@ public class DelayedAllocationService extends AbstractLifecycleComponent impleme class DelayedRerouteTask extends ClusterStateUpdateTask { final TimeValue nextDelay; // delay until submitting the reroute command final long baseTimestampNanos; // timestamp (in nanos) upon which delay was calculated - volatile ScheduledFuture future; + volatile Scheduler.Cancellable cancellable; final AtomicBoolean cancelScheduling = new AtomicBoolean(); DelayedRerouteTask(TimeValue nextDelay, long baseTimestampNanos) { @@ -83,12 +82,14 @@ public long scheduledTimeToRunInNanos() { public void cancelScheduling() { cancelScheduling.set(true); - FutureUtils.cancel(future); + if (cancellable != null) { + cancellable.cancel(); + } removeIfSameTask(this); } public void schedule() { - future = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() { + cancellable = threadPool.schedule(new AbstractRunnable() { @Override protected void doRun() throws Exception { if (cancelScheduling.get()) { @@ -102,7 +103,7 @@ public void onFailure(Exception e) { logger.warn("failed to submit schedule/execute reroute post unassigned shard", e); removeIfSameTask(DelayedRerouteTask.this); } - }); + }, nextDelay, ThreadPool.Names.SAME); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 3ccbe5ebb7495..002f843eb6016 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -42,9 +42,9 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; @@ -55,7 +55,6 @@ import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -280,7 +279,7 @@ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutC public void run() { if (timeout != null) { NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); - notifyTimeout.future = threadPool.schedule(timeout, ThreadPool.Names.GENERIC, notifyTimeout); + notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, ThreadPool.Names.GENERIC); onGoingTimeouts.add(notifyTimeout); } timeoutClusterStateListeners.add(listener); @@ -541,7 +540,7 @@ protected void warnAboutSlowTaskIfNeeded(TimeValue executionTime, String source) class NotifyTimeout implements Runnable { final TimeoutClusterStateListener listener; final TimeValue timeout; - volatile ScheduledFuture future; + volatile Scheduler.Cancellable cancellable; NotifyTimeout(TimeoutClusterStateListener listener, TimeValue timeout) { this.listener = listener; @@ -549,12 +548,14 @@ class NotifyTimeout implements Runnable { } public void cancel() { - FutureUtils.cancel(future); + if (cancellable != null) { + cancellable.cancel(); + } } @Override public void run() { - if (future != null && future.isCancelled()) { + if (cancellable != null && cancellable.isCancelled()) { return; } if (lifecycle.stoppedOrClosed()) { diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index b81d32e9d81e5..34fcf8a714940 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -45,10 +45,10 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; @@ -57,7 +57,6 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -563,7 +562,7 @@ private static class AckCountDownListener implements Discovery.AckListener { private final DiscoveryNode masterNode; private final ThreadPool threadPool; private final long clusterStateVersion; - private volatile Future ackTimeoutCallback; + private volatile Scheduler.Cancellable ackTimeoutCallback; private Exception lastFailure; AckCountDownListener(AckedClusterStateTaskListener ackedTaskListener, long clusterStateVersion, DiscoveryNodes nodes, @@ -595,10 +594,10 @@ public void onCommit(TimeValue commitTime) { } else if (countDown.countDown()) { finish(); } else { - this.ackTimeoutCallback = threadPool.schedule(timeLeft, ThreadPool.Names.GENERIC, this::onTimeout); + this.ackTimeoutCallback = threadPool.schedule(this::onTimeout, timeLeft, ThreadPool.Names.GENERIC); // re-check if onNodeAck has not completed while we were scheduling the timeout if (countDown.isCountedDown()) { - FutureUtils.cancel(ackTimeoutCallback); + ackTimeoutCallback.cancel(); } } } @@ -623,7 +622,9 @@ public void onNodeAck(DiscoveryNode node, @Nullable Exception e) { private void finish() { logger.trace("all expected nodes acknowledged cluster_state update (version: {})", clusterStateVersion); - FutureUtils.cancel(ackTimeoutCallback); + if (ackTimeoutCallback != null) { + ackTimeoutCallback.cancel(); + } ackedTaskListener.onAllNodesAcked(lastFailure); } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index e06e1a41907db..3c1716cda1522 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -21,11 +21,11 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.util.Objects; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -37,7 +37,7 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { private final ThreadPool threadPool; private final AtomicBoolean closed = new AtomicBoolean(false); private final boolean autoReschedule; - private volatile ScheduledFuture scheduledFuture; + private volatile Scheduler.Cancellable cancellable; private volatile boolean isScheduledOrRunning; private volatile Exception lastThrownException; private volatile TimeValue interval; @@ -56,7 +56,7 @@ protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue inte */ public synchronized void setInterval(TimeValue interval) { this.interval = interval; - if (scheduledFuture != null) { + if (cancellable != null) { rescheduleIfNecessary(); } } @@ -84,18 +84,18 @@ public synchronized void rescheduleIfNecessary() { if (isClosed()) { return; } - if (scheduledFuture != null) { - FutureUtils.cancel(scheduledFuture); + if (cancellable != null) { + cancellable.cancel(); } if (interval.millis() > 0 && mustReschedule()) { if (logger.isTraceEnabled()) { logger.trace("scheduling {} every {}", toString(), interval); } - scheduledFuture = threadPool.schedule(interval, getThreadPool(), this); + cancellable = threadPool.schedule(this, interval, getThreadPool()); isScheduledOrRunning = true; } else { logger.trace("scheduled {} disabled", toString()); - scheduledFuture = null; + cancellable = null; isScheduledOrRunning = false; } } @@ -110,8 +110,10 @@ public boolean isScheduled() { * Cancel any scheduled run, but do not prevent subsequent restarts. */ public synchronized void cancel() { - FutureUtils.cancel(scheduledFuture); - scheduledFuture = null; + if (cancellable != null) { + cancellable.cancel(); + cancellable = null; + } isScheduledOrRunning = false; } @@ -132,7 +134,7 @@ public boolean isClosed() { @Override public final void run() { synchronized (this) { - scheduledFuture = null; + cancellable = null; isScheduledOrRunning = autoReschedule; } try { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index cb358a0596d25..29cd7f6682a64 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -118,8 +118,11 @@ public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int * Checks if the runnable arose from asynchronous submission of a task to an executor. If an uncaught exception was thrown * during the execution of this task, we need to inspect this runnable and see if it is an error that should be propagated * to the uncaught exception handler. + * + * @param runnable the runnable to inspect, should be a RunnableFuture + * @return non fatal exception or null if no exception. */ - public static void rethrowErrors(Runnable runnable) { + public static Throwable rethrowErrors(Runnable runnable) { if (runnable instanceof RunnableFuture) { try { ((RunnableFuture) runnable).get(); @@ -143,8 +146,13 @@ public static void rethrowErrors(Runnable runnable) { // restore the interrupt status Thread.currentThread().interrupt(); } + if (e instanceof ExecutionException) { + return e.getCause(); + } } } + + return null; } private static final class DirectExecutorService extends AbstractExecutorService { diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java index 0dd0f10069bc4..a36168f2f79ed 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java @@ -127,7 +127,7 @@ private void innerStart(final DiscoveryNode masterNode) { this.masterPinger = new MasterPinger(); // we start pinging slightly later to allow the chosen master to complete it's own master election - threadPool.schedule(pingInterval, ThreadPool.Names.SAME, masterPinger); + threadPool.schedule(masterPinger, pingInterval, ThreadPool.Names.SAME); } public void stop(String reason) { @@ -173,7 +173,7 @@ protected void handleTransportDisconnect(DiscoveryNode node) { } this.masterPinger = new MasterPinger(); // we use schedule with a 0 time value to run the pinger on the pool as it will run on later - threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, masterPinger); + threadPool.schedule(masterPinger, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME); } catch (Exception e) { logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode); notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)"); @@ -217,7 +217,7 @@ public void run() { final DiscoveryNode masterToPing = masterNode; if (masterToPing == null) { // master is null, should not happen, but we are still running, so reschedule - threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); + threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME); return; } @@ -242,7 +242,7 @@ public void handleResponse(MasterPingResponseResponse response) { // check if the master node did not get switched on us..., if it did, we simply return with no reschedule if (masterToPing.equals(MasterFaultDetection.this.masterNode())) { // we don't stop on disconnection from master, we keep pinging it - threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); + threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java index 9dc9cc78179d3..11190094d661e 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodesFaultDetection.java @@ -131,7 +131,7 @@ public void updateNodesAndPing(ClusterState clusterState) { // it's OK to overwrite an existing nodeFD - it will just stop and the new one will pick things up. nodesFD.put(node, fd); // we use schedule with a 0 time value to run the pinger on the pool as it will run on later - threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd); + threadPool.schedule(fd, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME); } } } @@ -160,7 +160,7 @@ protected void handleTransportDisconnect(DiscoveryNode node) { transportService.connectToNode(node); nodesFD.put(node, fd); // we use schedule with a 0 time value to run the pinger on the pool as it will run on later - threadPool.schedule(TimeValue.timeValueMillis(0), ThreadPool.Names.SAME, fd); + threadPool.schedule(fd, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME); } catch (Exception e) { logger.trace("[node ] [{}] transport disconnected (with verified connect)", node); // clean up if needed, just to be safe.. @@ -239,7 +239,7 @@ public void handleResponse(PingResponse response) { return; } retryCount = 0; - threadPool.schedule(pingInterval, ThreadPool.Names.SAME, NodeFD.this); + threadPool.schedule(NodeFD.this, pingInterval, ThreadPool.Names.SAME); } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java index 459666e0c8443..153937deda7a1 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.java @@ -303,9 +303,9 @@ protected void doRun() throws Exception { } }; threadPool.generic().execute(pingSender); - threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender); - threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender); - threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() { + threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC); + threadPool.schedule(pingSender, TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC); + threadPool.schedule(new AbstractRunnable() { @Override protected void doRun() throws Exception { finishPingingRound(pingingRound); @@ -315,7 +315,7 @@ protected void doRun() throws Exception { public void onFailure(Exception e) { logger.warn("unexpected error while finishing pinging round", e); } - }); + }, scheduleDuration, ThreadPool.Names.GENERIC); } // for testing @@ -556,8 +556,8 @@ private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) temporalResponses.add(request.pingResponse); // add to any ongoing pinging activePingingRounds.values().forEach(p -> p.addPingResponseToCollection(request.pingResponse)); - threadPool.schedule(TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME, - () -> temporalResponses.remove(request.pingResponse)); + threadPool.schedule(() -> temporalResponses.remove(request.pingResponse), + TimeValue.timeValueMillis(request.timeout.millis() * 2), ThreadPool.Names.SAME); List pingResponses = CollectionUtils.iterableAsArrayList(temporalResponses); pingResponses.add(createPingResponse(contextProvider.clusterState())); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index e94751e6f0cd9..f4bb12d2e621a 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -204,12 +204,12 @@ private void performStateRecovery(boolean enforceRecoverAfterTime, String reason if (enforceRecoverAfterTime && recoverAfterTime != null) { if (scheduledRecovery.compareAndSet(false, true)) { logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason); - threadPool.schedule(recoverAfterTime, ThreadPool.Names.GENERIC, () -> { + threadPool.schedule(() -> { if (recovered.compareAndSet(false, true)) { logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime); gateway.performStateRecovery(recoveryListener); } - }); + }, recoverAfterTime, ThreadPool.Names.GENERIC); } } else { if (recovered.compareAndSet(false, true)) { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java index 67e0f5400b389..a62dd939b4d4d 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java @@ -156,7 +156,7 @@ public void onFailure(Exception e) { TimeValue delay = retries.next(); logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e); countSearchRetry.run(); - threadPool.schedule(delay, ThreadPool.Names.SAME, retryWithContext); + threadPool.schedule(retryWithContext, delay, ThreadPool.Names.SAME); } else { logger.warn(() -> new ParameterizedMessage( "giving up on search because we retried [{}] times without success", retryCount), e); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java index 17bf59a104a80..ae2a6a552cba4 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java @@ -24,11 +24,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -173,10 +172,10 @@ TimeValue throttledUntil() { if (delayed == null) { return timeValueNanos(0); } - if (delayed.future == null) { + if (delayed.scheduled == null) { return timeValueNanos(0); } - return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS))); + return timeValueNanos(max(0, delayed.scheduled.getDelay(TimeUnit.NANOSECONDS))); } /** @@ -249,16 +248,16 @@ class DelayedPrepareBulkRequest { private final ThreadPool threadPool; private final Runnable command; private final float requestsPerSecond; - private final ScheduledFuture future; + private final Scheduler.ScheduledCancellable scheduled; DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) { this.threadPool = threadPool; this.requestsPerSecond = requestsPerSecond; this.command = command; - this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, () -> { + this.scheduled = threadPool.schedule(() -> { throttledNanos.addAndGet(delay.nanos()); command.run(); - }); + }, delay, ThreadPool.Names.GENERIC); } DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) { @@ -272,9 +271,9 @@ DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) { return this; } - long remainingDelay = future.getDelay(TimeUnit.NANOSECONDS); + long remainingDelay = scheduled.getDelay(TimeUnit.NANOSECONDS); // Actually reschedule the task - if (false == FutureUtils.cancel(future)) { + if (scheduled == null || false == scheduled.cancel()) { // Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here. logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", task.getId()); return this; diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 5019c90135f1c..05151c92ad1d6 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -212,7 +212,7 @@ public class IndicesService extends AbstractLifecycleComponent @Override protected void doStart() { // Start thread that will manage cleaning the field data cache periodically - threadPool.schedule(this.cleanInterval, ThreadPool.Names.SAME, this.cacheCleaner); + threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME); } public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, NamedXContentRegistry xContentRegistry, @@ -1178,7 +1178,7 @@ public void run() { } // Reschedule itself to run again if not closed if (closed.get() == false) { - threadPool.schedule(interval, ThreadPool.Names.SAME, this); + threadPool.schedule(this, interval, ThreadPool.Names.SAME); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index e55a452e6e9bf..0b7238bc2e25d 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -163,7 +163,7 @@ protected void retryRecovery(final long recoveryId, final String reason, TimeVal private void retryRecovery(final long recoveryId, final TimeValue retryAfter, final TimeValue activityTimeout) { RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout); if (newTarget != null) { - threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId())); + threadPool.schedule(new RecoveryRunner(newTarget.recoveryId()), retryAfter, ThreadPool.Names.GENERIC); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 6b442750c1898..d08d7b6d0bd98 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -76,8 +76,8 @@ private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue acti assert existingTarget == null : "found two RecoveryStatus instances with the same id"; logger.trace("{} started recovery from {}, id [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode(), recoveryTarget.recoveryId()); - threadPool.schedule(activityTimeout, ThreadPool.Names.GENERIC, - new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout)); + threadPool.schedule(new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout), + activityTimeout, ThreadPool.Names.GENERIC); } /** @@ -289,7 +289,7 @@ protected void doRun() throws Exception { } lastSeenAccessTime = accessTime; logger.trace("[monitor] rescheduling check for [{}]. last access time is [{}]", recoveryId, lastSeenAccessTime); - threadPool.schedule(checkInterval, ThreadPool.Names.GENERIC, this); + threadPool.schedule(this, checkInterval, ThreadPool.Names.GENERIC); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 1031308d7a70a..9e7ae26b6b76b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -90,7 +90,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool, new Processor.Parameters( env, scriptService, analysisRegistry, threadPool.getThreadContext(), threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule( + (delay, command) -> threadPool.scheduleDeprecated( TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command ), this ) diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java index b05e87db91943..77a873316a53d 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResultsService.java @@ -190,7 +190,7 @@ public void onFailure(Exception e) { } else { TimeValue wait = backoff.next(); logger.warn(() -> new ParameterizedMessage("failed to store task result, retrying in [{}]", wait), e); - threadPool.schedule(wait, ThreadPool.Names.SAME, () -> doStoreResult(backoff, index, listener)); + threadPool.schedule(() -> doStoreResult(backoff, index, listener), wait, ThreadPool.Names.SAME); } } }); diff --git a/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java b/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java new file mode 100644 index 0000000000000..9b5f658c6243e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/threadpool/CancellableAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.util.concurrent.FutureUtils; + +import java.util.concurrent.Future; + +class CancellableAdapter implements Scheduler.Cancellable { + private Future future; + + CancellableAdapter(Future future) { + assert future != null; + this.future = future; + } + + @Override + public boolean cancel() { + return FutureUtils.cancel(future); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/ScheduledCancellableAdapter.java b/server/src/main/java/org/elasticsearch/threadpool/ScheduledCancellableAdapter.java new file mode 100644 index 0000000000000..25c958ed04fb4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/threadpool/ScheduledCancellableAdapter.java @@ -0,0 +1,100 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.util.concurrent.FutureUtils; + +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +class ScheduledCancellableAdapter implements Scheduler.ScheduledCancellable { + private final ScheduledFuture scheduledFuture; + + ScheduledCancellableAdapter(ScheduledFuture scheduledFuture) { + assert scheduledFuture != null; + this.scheduledFuture = scheduledFuture; + } + + @Override + public long getDelay(TimeUnit unit) { + return scheduledFuture.getDelay(unit); + } + + @Override + public int compareTo(Delayed other) { + // unwrap other by calling on it. + return -other.compareTo(scheduledFuture); + } + + @Override + public boolean cancel() { + return FutureUtils.cancel(scheduledFuture); + } + + @Override + public boolean isCancelled() { + return scheduledFuture.isCancelled(); + } + + static ScheduledFuture toScheduledFuture(Scheduler.ScheduledCancellable cancellable) { + if (cancellable instanceof ScheduledCancellableAdapter) + return ((ScheduledCancellableAdapter) cancellable).scheduledFuture; + else + return new ScheduledFuture() { + @Override + public long getDelay(TimeUnit unit) { + return cancellable.getDelay(unit); + } + + @Override + public int compareTo(Delayed o) { + return -o.compareTo(cancellable); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + assert mayInterruptIfRunning == false; + return cancellable.cancel(); + } + + @Override + public boolean isCancelled() { + return cancellable.isCancelled(); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException(); + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @Override + public Object get(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + }; + } +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index 1b7c74ed6eec4..34a6efe82038d 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -19,6 +19,9 @@ package org.elasticsearch.threadpool; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -27,6 +30,8 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import java.util.concurrent.Delayed; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -39,6 +44,14 @@ */ public interface Scheduler { + /** + * Create a scheduler that can be used client side. Server side, please use ThreadPool.schedule instead. + * + * Notice that if any scheduled jobs fail with an exception, they will be logged as a warning. This includes jobs started + * using execute, submit and schedule. + * @param settings the settings to use + * @return executor + */ static ScheduledThreadPoolExecutor initScheduler(Settings settings) { final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); @@ -78,6 +91,24 @@ default Runnable preserveContext(Runnable command) { return command; } + /** + * Schedules a one-shot command to be run after a given delay. The command is not run in the context of the calling thread. + * To preserve the context of the calling thread you may call {@link #preserveContext(Runnable)} on the runnable before passing + * it to this method. + * The command runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow + * to execute on a different executor, in which case blocking calls are allowed. + * + * @param command the command to run + * @param delay delay before the task executes + * @param executor the name of the executor that has to execute this task. Ignored in the default implementation but can be used + * by subclasses that support multiple executors. + * @return a ScheduledFuture who's get will return when the task has been added to its target thread pool and throws an exception if + * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool + * the ScheduledFuture cannot interact with it. + * @throws EsRejectedExecutionException if the task cannot be scheduled for execution + */ + ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor); + /** * Schedules a one-shot command to be run after a given delay. The command is not run in the context of the calling thread. * To preserve the context of the calling thread you may call {@link #preserveContext(Runnable)} on the runnable before passing @@ -93,8 +124,12 @@ default Runnable preserveContext(Runnable command) { * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool * the ScheduledFuture cannot interact with it. * @throws EsRejectedExecutionException if the task cannot be scheduled for execution + * @deprecated use {@link #schedule(Runnable, TimeValue, String)} instead */ - ScheduledFuture schedule(TimeValue delay, String executor, Runnable command); + @Deprecated + default ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + return ScheduledCancellableAdapter.toScheduledFuture(schedule(command, delay, executor)); + } /** * Schedules a periodic action that runs on scheduler thread. Do not run blocking calls on the scheduler thread. Subclasses may allow @@ -111,6 +146,25 @@ default Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, return new ReschedulingRunnable(command, interval, executor, this, (e) -> {}, (e) -> {}); } + /** + * Utility method to wrap a Future as a Cancellable + * @param future the future to wrap + * @return a cancellable delegating to the future + */ + static Cancellable wrapAsCancellable(Future future) { + return new CancellableAdapter(future); + } + + /** + * Utility method to wrap a ScheduledFuture as a ScheduledCancellable + * @param scheduledFuture the scheduled future to wrap + * @return a SchedulecCancellable delegating to the scheduledFuture + */ + static ScheduledCancellable wrapAsScheduledCancellable(ScheduledFuture scheduledFuture) { + return new ScheduledCancellableAdapter(scheduledFuture); + } + + /** * This interface represents an object whose execution may be cancelled during runtime. */ @@ -119,7 +173,7 @@ interface Cancellable { /** * Cancel the execution of this object. This method is idempotent. */ - void cancel(); + boolean cancel(); /** * Check if the execution has been cancelled @@ -128,6 +182,11 @@ interface Cancellable { boolean isCancelled(); } + /** + * A scheduled cancellable allow cancelling and reading the remaining delay of a scheduled task. + */ + interface ScheduledCancellable extends Delayed, Cancellable { } + /** * This class encapsulates the scheduling of a {@link Runnable} that needs to be repeated on a interval. For example, checking a value * for cleanup every second could be done by passing in a Runnable that can perform the check and the specified interval between @@ -165,12 +224,14 @@ final class ReschedulingRunnable extends AbstractRunnable implements Cancellable this.scheduler = scheduler; this.rejectionConsumer = rejectionConsumer; this.failureConsumer = failureConsumer; - scheduler.schedule(interval, executor, this); + scheduler.schedule(this, interval, executor); } @Override - public void cancel() { + public boolean cancel() { + final boolean result = run; run = false; + return result; } @Override @@ -202,7 +263,7 @@ public void onAfter() { // if this has not been cancelled reschedule it to run again if (run) { try { - scheduler.schedule(interval, executor, this); + scheduler.schedule(this, interval, executor); } catch (final EsRejectedExecutionException e) { onRejection(e); } @@ -211,9 +272,10 @@ public void onAfter() { } /** - * This subclass ensures to properly bubble up Throwable instances of type Error. + * This subclass ensures to properly bubble up Throwable instances of type Error and logs exceptions thrown in submitted/scheduled tasks */ class SafeScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + private static final Logger logger = LogManager.getLogger(SafeScheduledThreadPoolExecutor.class); @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") public SafeScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { @@ -232,7 +294,12 @@ public SafeScheduledThreadPoolExecutor(int corePoolSize) { @Override protected void afterExecute(Runnable r, Throwable t) { - EsExecutors.rethrowErrors(r); + Throwable exception = EsExecutors.rethrowErrors(r); + if (exception != null) { + logger.warn(() -> + new ParameterizedMessage("uncaught exception in scheduled thread [{}]", Thread.currentThread().getName()), + exception); + } } } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 00ff4327892cf..d22f0166ab921 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -335,21 +335,32 @@ public ExecutorService executor(String name) { * context of the calling thread you may call threadPool.getThreadContext().preserveContext on the runnable before passing * it to this method. * + * @param command the command to run * @param delay delay before the task executes * @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes * the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the * command completes. - * @param command the command to run * @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if * the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool * the ScheduledFuture will cannot interact with it. * @throws org.elasticsearch.common.util.concurrent.EsRejectedExecutionException if the task cannot be scheduled for execution */ - public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + @Override + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { if (!Names.SAME.equals(executor)) { command = new ThreadedRunnable(command, executor(executor)); } - return scheduler.schedule(new ThreadPool.LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS); + return new ScheduledCancellableAdapter(scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + } + + + /** + * Only in 6.7, to be used in the one case where the old style schedule is necessary to stay API backwards compatible. + * @deprecated + */ + @Deprecated + public ScheduledFuture scheduleDeprecated(TimeValue delay, String executor, Runnable command) { + return ScheduledCancellableAdapter.toScheduledFuture(schedule(command, delay, executor)); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index ad7b1a61b8d6f..07cdb2a18cc2e 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -326,7 +326,7 @@ private List initiateConnection(DiscoveryNode node, ConnectionProfil } TimeValue connectTimeout = connectionProfile.getConnectTimeout(); - threadPool.schedule(connectTimeout, ThreadPool.Names.GENERIC, channelsConnectedListener::onTimeout); + threadPool.schedule(channelsConnectedListener::onTimeout, connectTimeout, ThreadPool.Names.GENERIC); return channels; } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java index 3497b29d6d0d7..8db953e4e9598 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java @@ -73,8 +73,10 @@ void sendHandshake(long requestId, DiscoveryNode node, TcpChannel channel, TimeV final Version minCompatVersion = version.minimumCompatibilityVersion(); handshakeRequestSender.sendRequest(node, channel, requestId, minCompatVersion); - threadPool.schedule(timeout, ThreadPool.Names.GENERIC, - () -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]"))); + threadPool.schedule( + () -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")), + timeout, + ThreadPool.Names.GENERIC); success = true; } catch (Exception e) { handler.handleLocalException(new ConnectTransportException(node, "failure to send " + HANDSHAKE_ACTION_NAME, e)); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java index b8d06e7e1174e..404699be2c4e0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java @@ -158,7 +158,7 @@ private ScheduledPing(TimeValue pingInterval) { void ensureStarted() { if (isStarted.get() == false && isStarted.compareAndSet(false, true)) { - threadPool.schedule(pingInterval, ThreadPool.Names.GENERIC, this); + threadPool.schedule(this, pingInterval, ThreadPool.Names.GENERIC); } } @@ -186,7 +186,7 @@ protected void doRunInLifecycle() { @Override protected void onAfterInLifecycle() { try { - threadPool.schedule(pingInterval, ThreadPool.Names.GENERIC, this); + threadPool.schedule(this, pingInterval, ThreadPool.Names.GENERIC); } catch (EsRejectedExecutionException ex) { if (ex.isExecutorShutdown()) { logger.debug("couldn't schedule new ping execution, executor is shutting down", ex); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 676575b8ecd49..c6f37338401b2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -42,14 +42,15 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -65,7 +66,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -621,7 +621,7 @@ private void sendRequestInternal(final Transport.C } if (timeoutHandler != null) { assert options.timeout() != null; - timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler); + timeoutHandler.scheduleTimeout(options.timeout()); } connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream } catch (final Exception e) { @@ -988,7 +988,7 @@ final class TimeoutHandler implements Runnable { private final long sentTime = threadPool.relativeTimeInMillis(); private final String action; private final DiscoveryNode node; - volatile ScheduledFuture future; + volatile Scheduler.Cancellable cancellable; TimeoutHandler(long requestId, DiscoveryNode node, String action) { this.requestId = requestId; @@ -1023,13 +1023,19 @@ public void run() { public void cancel() { assert responseHandlers.contains(requestId) == false : "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers"; - FutureUtils.cancel(future); + if (cancellable != null) { + cancellable.cancel(); + } } @Override public String toString() { return "timeout handler for [" + requestId + "][" + action + "]"; } + + private void scheduleTimeout(TimeValue timeout) { + this.cancellable = threadPool.schedule(this, timeout, ThreadPool.Names.GENERIC); + } } static class TimeoutInfoHolder { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index 6a7d9bc02ec3e..e2527397a780a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -96,7 +96,7 @@ public void testAwaitOnCloseCallsOnClose() throws Exception { BiConsumer> consumer = (request, listener) -> {}; BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(), 0, 10, new ByteSizeValue(1000), null, - (delay, executor, command) -> null, () -> called.set(true), BulkRequest::new); + (command, delay, executor) -> null, () -> called.set(true), BulkRequest::new); assertFalse(called.get()); bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS); diff --git a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java index db624798bb71c..a76fced9772f5 100644 --- a/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java +++ b/server/src/test/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskStateTests.java @@ -33,10 +33,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; @@ -149,9 +146,9 @@ public void testDelayAndRethrottle() throws IOException, InterruptedException { int batchSizeForMaxDelay = (int) (maxDelay.seconds() * originalRequestsPerSecond); ThreadPool threadPool = new TestThreadPool(getTestName()) { @Override - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { assertThat(delay.nanos(), both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(maxDelay.nanos()))); - return super.schedule(delay, name, command); + return super.schedule(command, delay, name); } }; try { @@ -202,8 +199,8 @@ public void onFailure(Exception e) { public void testDelayNeverNegative() throws IOException { // Thread pool that returns a ScheduledFuture that claims to have a negative delay ThreadPool threadPool = new TestThreadPool("test") { - public ScheduledFuture schedule(TimeValue delay, String name, Runnable command) { - return new ScheduledFuture() { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String name) { + return new ScheduledCancellable() { @Override public long getDelay(TimeUnit unit) { return -1; @@ -215,7 +212,7 @@ public int compareTo(Delayed o) { } @Override - public boolean cancel(boolean mayInterruptIfRunning) { + public boolean cancel() { throw new UnsupportedOperationException(); } @@ -223,21 +220,6 @@ public boolean cancel(boolean mayInterruptIfRunning) { public boolean isCancelled() { throw new UnsupportedOperationException(); } - - @Override - public boolean isDone() { - throw new UnsupportedOperationException(); - } - - @Override - public Void get() throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - throw new UnsupportedOperationException(); - } }; } }; diff --git a/server/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java b/server/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java index b412aa5755d4f..5b60c82feb14f 100644 --- a/server/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/monitor/jvm/JvmGcMonitorServiceSettingsTests.java @@ -175,7 +175,8 @@ interface TriFunction { private static class MockCancellable implements Cancellable { @Override - public void cancel() { + public boolean cancel() { + return true; } @Override diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java index f13a5f4e3bddb..785552124ea26 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java @@ -33,7 +33,6 @@ import org.junit.Before; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -83,7 +82,7 @@ public void testDoesNotRescheduleUntilExecutionFinished() throws Exception { ReschedulingRunnable reschedulingRunnable = new ReschedulingRunnable(runnable, delay, Names.GENERIC, threadPool, (e) -> {}, (e) -> {}); // this call was made during construction of the runnable - verify(threadPool, times(1)).schedule(delay, Names.GENERIC, reschedulingRunnable); + verify(threadPool, times(1)).schedule(reschedulingRunnable, delay, Names.GENERIC); // create a thread and start the runnable Thread runThread = new Thread() { @@ -103,7 +102,7 @@ public void run() { runThread.join(); // validate schedule was called again - verify(threadPool, times(2)).schedule(delay, Names.GENERIC, reschedulingRunnable); + verify(threadPool, times(2)).schedule(reschedulingRunnable, delay, Names.GENERIC); } public void testThatRunnableIsRescheduled() throws Exception { @@ -251,7 +250,7 @@ public void testOnRejectionCausesCancellation() throws Exception { terminate(threadPool); threadPool = new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "fixed delay tests").build()) { @Override - public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { if (command instanceof ReschedulingRunnable) { ((ReschedulingRunnable) command).onRejection(new EsRejectedExecutionException()); } else { diff --git a/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java new file mode 100644 index 0000000000000..ef7371693c3e1 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java @@ -0,0 +1,193 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +public class SchedulerTests extends ESTestCase { + + public void testCancelOnThreadPool() { + ThreadPool threadPool = new TestThreadPool("test"); + AtomicLong executed = new AtomicLong(); + try { + ThreadPool.THREAD_POOL_TYPES.keySet().forEach(type -> + scheduleAndCancel(threadPool, executed, type)); + assertEquals(0, executed.get()); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + + private void scheduleAndCancel(ThreadPool threadPool, AtomicLong executed, String type) { + Scheduler.ScheduledCancellable scheduled = threadPool.schedule(executed::incrementAndGet, TimeValue.timeValueSeconds(20), type); + assertEquals(1, schedulerQueueSize(threadPool)); + assertFalse(scheduled.isCancelled()); + assertTrue(scheduled.cancel()); + assertTrue(scheduled.isCancelled()); + assertEquals("Cancel must auto-remove", 0, schedulerQueueSize(threadPool)); + } + + private int schedulerQueueSize(ThreadPool threadPool) { + return ((Scheduler.SafeScheduledThreadPoolExecutor) threadPool.scheduler()).getQueue().size(); + } + + public void testCancelOnScheduler() { + ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY); + Scheduler scheduler = (command, delay, name) -> + Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + + AtomicLong executed = new AtomicLong(); + try { + Scheduler.ScheduledCancellable scheduled = + scheduler.schedule(executed::incrementAndGet, TimeValue.timeValueSeconds(20), ThreadPool.Names.SAME); + assertEquals(1, executor.getQueue().size()); + assertFalse(scheduled.isCancelled()); + assertTrue(scheduled.cancel()); + assertTrue(scheduled.isCancelled()); + assertEquals("Cancel must auto-remove", 0, executor.getQueue().size()); + assertEquals(0, executed.get()); + } finally { + Scheduler.terminate(executor, 10, TimeUnit.SECONDS); + } + } + + + public void testDelay() throws InterruptedException { + ThreadPool threadPool = new TestThreadPool("test"); + try { + List jobs = LongStream.range(20,30) + .mapToObj(delay -> threadPool.schedule(() -> {}, + TimeValue.timeValueSeconds(delay), + ThreadPool.Names.SAME)) + .collect(Collectors.toCollection(ArrayList::new)); + + Collections.reverse(jobs); + + List initialDelays = verifyJobDelays(jobs); + Thread.sleep(50); + List laterDelays = verifyJobDelays(jobs); + + assertThat(laterDelays, + Matchers.contains(initialDelays.stream().map(Matchers::lessThan).collect(Collectors.toList()))); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + + private List verifyJobDelays(List jobs) { + List delays = new ArrayList<>(jobs.size()); + Scheduler.ScheduledCancellable previous = null; + for (Scheduler.ScheduledCancellable job : jobs) { + if (previous != null) { + long previousDelay = previous.getDelay(TimeUnit.MILLISECONDS); + long delay = job.getDelay(TimeUnit.MILLISECONDS); + assertThat(delay, Matchers.lessThan(previousDelay)); + assertThat(job, Matchers.lessThan(previous)); + } + assertThat(job.getDelay(TimeUnit.SECONDS), Matchers.greaterThan(1L)); + assertThat(job.getDelay(TimeUnit.SECONDS), Matchers.lessThanOrEqualTo(30L)); + + delays.add(job.getDelay(TimeUnit.NANOSECONDS)); + previous = job; + } + + return delays; + } + + // simple test for successful scheduling, exceptions tested more thoroughly in EvilThreadPoolTests + public void testScheduledOnThreadPool() throws InterruptedException { + ThreadPool threadPool = new TestThreadPool("test"); + CountDownLatch missingExecutions = new CountDownLatch(ThreadPool.THREAD_POOL_TYPES.keySet().size()); + try { + ThreadPool.THREAD_POOL_TYPES.keySet() + .forEach(type -> + threadPool.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), type)); + + assertTrue(missingExecutions.await(30, TimeUnit.SECONDS)); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + + // simple test for successful scheduling, exceptions tested more thoroughly in EvilThreadPoolTests + public void testScheduledOnScheduler() throws InterruptedException { + ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY); + Scheduler scheduler = (command, delay, name) -> + Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); + + CountDownLatch missingExecutions = new CountDownLatch(1); + try { + scheduler.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME); + assertTrue(missingExecutions.await(30, TimeUnit.SECONDS)); + } finally { + Scheduler.terminate(executor, 10, TimeUnit.SECONDS); + } + } + + @SuppressForbidden(reason = "this tests that the deprecated method still works") + public void testDeprecatedSchedule() throws ExecutionException, InterruptedException { + verifyDeprecatedSchedule(((threadPool, runnable) + -> threadPool.schedule(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable))); + } + + public void testThreadPoolScheduleDeprecated() throws ExecutionException, InterruptedException { + verifyDeprecatedSchedule(((threadPool, runnable) + -> threadPool.scheduleDeprecated(TimeValue.timeValueMillis(randomInt(10)), ThreadPool.Names.SAME, runnable))); + } + + private void verifyDeprecatedSchedule(BiFunction> scheduleFunction) throws InterruptedException, ExecutionException { + ThreadPool threadPool = new TestThreadPool("test"); + CountDownLatch missingExecutions = new CountDownLatch(1); + try { + scheduleFunction.apply(threadPool, missingExecutions::countDown) + .get(); + assertEquals(0, missingExecutions.getCount()); + + ExecutionException exception = expectThrows(ExecutionException.class, + "schedule(...).get() must throw exception from runnable", + () -> scheduleFunction.apply(threadPool, + () -> { throw new IllegalArgumentException("FAIL"); } + ).get()); + + assertEquals(IllegalArgumentException.class, exception.getCause().getClass()); + } finally { + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java index a56db579cec80..e400292873411 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportKeepAliveTests.java @@ -32,7 +32,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.Deque; -import java.util.concurrent.ScheduledFuture; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -212,7 +211,7 @@ private CapturingThreadPool() { } @Override - public ScheduledFuture schedule(TimeValue delay, String executor, Runnable task) { + public ScheduledCancellable schedule(Runnable task, TimeValue delay, String executor) { scheduledTasks.add(new Tuple<>(delay, task)); return null; } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index eb4e55853d5a1..f14afe3c36534 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -344,7 +344,7 @@ public ExecutorService executor(String name) { } @Override - public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + public ScheduledCancellable schedule(Runnable command, TimeValue delay, String executor) { final int NOT_STARTED = 0; final int STARTED = 1; final int CANCELLED = 2; @@ -364,7 +364,7 @@ public String toString() { } })); - return new ScheduledFuture() { + return new ScheduledCancellable() { @Override public long getDelay(TimeUnit unit) { throw new UnsupportedOperationException(); @@ -376,8 +376,7 @@ public int compareTo(Delayed o) { } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - assert mayInterruptIfRunning == false; + public boolean cancel() { return taskState.compareAndSet(NOT_STARTED, CANCELLED); } @@ -386,20 +385,6 @@ public boolean isCancelled() { return taskState.get() == CANCELLED; } - @Override - public boolean isDone() { - throw new UnsupportedOperationException(); - } - - @Override - public Object get() { - throw new UnsupportedOperationException(); - } - - @Override - public Object get(long timeout, TimeUnit unit) { - throw new UnsupportedOperationException(); - } }; } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index b187379563519..8ceca92420fb8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -375,7 +375,7 @@ protected void doRun() throws IOException { runnable.run(); } else { requestsToSendWhenCleared.add(runnable); - threadPool.schedule(delay, ThreadPool.Names.GENERIC, runnable); + threadPool.schedule(runnable, delay, ThreadPool.Names.GENERIC); } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 956171ba9b7c3..7e632a3f35782 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -104,7 +104,7 @@ protected AllocatedPersistentTask createTask(long id, String type, String action Client followerClient = wrapClient(client, params.getHeaders()); BiConsumer scheduler = (delay, command) -> { try { - threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, command); + threadPool.schedule(command, delay, Ccr.CCR_THREAD_POOL_NAME); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { logger.debug("couldn't schedule command, executor is shutting down", e); @@ -310,7 +310,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) { logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", shardFollowNodeTask), e); - threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); + threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME); } else { shardFollowNodeTask.markAsFailed(e); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 4af9e7c23a276..629127c454cef 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -98,7 +98,7 @@ private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testR BiConsumer scheduler = (delay, task) -> { assert delay.millis() < 100 : "The delay should be kept to a minimum, so that this test does not take to long to run"; if (stopped.get() == false) { - threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); + threadPool.schedule(task, delay, ThreadPool.Names.GENERIC); } }; List receivedOperations = Collections.synchronizedList(new ArrayList<>()); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 53214a729b0fd..f751348fbfe93 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -382,7 +382,7 @@ private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ); final String recordedLeaderIndexHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); - BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); + BiConsumer scheduler = (delay, task) -> threadPool.schedule(task, delay, ThreadPool.Names.GENERIC); AtomicBoolean stopped = new AtomicBoolean(false); LongSet fetchOperations = new LongHashSet(); return new ShardFollowNodeTask( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 190933b1e9316..a06763eb0d9ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -13,7 +13,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.joda.time.DateTime; @@ -21,7 +21,6 @@ import java.util.Objects; import java.util.Random; -import java.util.concurrent.ScheduledFuture; import java.util.function.Supplier; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -45,7 +44,7 @@ public class MlDailyMaintenanceService implements Releasable { */ private final Supplier schedulerProvider; - private volatile ScheduledFuture future; + private volatile Scheduler.Cancellable cancellable; MlDailyMaintenanceService(ThreadPool threadPool, Client client, Supplier scheduleProvider) { this.threadPool = Objects.requireNonNull(threadPool); @@ -82,13 +81,13 @@ public void start() { public void stop() { LOGGER.debug("Stopping ML daily maintenance service"); - if (future != null && future.isCancelled() == false) { - FutureUtils.cancel(future); + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); } } public boolean isStarted() { - return future != null; + return cancellable != null; } @Override @@ -98,7 +97,7 @@ public void close() { private void scheduleNext() { try { - future = threadPool.schedule(schedulerProvider.get(), ThreadPool.Names.GENERIC, this::triggerTasks); + cancellable = threadPool.schedule(this::triggerTasks, schedulerProvider.get(), ThreadPool.Names.GENERIC); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { LOGGER.debug("failed to schedule next maintenance task; shutting down", e); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index ed5c3bbc3bcc3..1d26d78ae3668 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -16,11 +16,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; @@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -153,7 +152,8 @@ public void isolateDatafeed(long allocationId) { // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running. private void innerRun(Holder holder, long startTime, Long endTime) { - holder.future = threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { + holder.cancellable = + Scheduler.wrapAsCancellable(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME).submit(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -204,14 +204,14 @@ protected void doRun() { } } } - }); + })); } void doDatafeedRealtime(long delayInMsSinceEpoch, String jobId, Holder holder) { if (holder.isRunning() && !holder.isIsolated()) { TimeValue delay = computeNextDelay(delayInMsSinceEpoch); logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId); - holder.future = threadPool.schedule(delay, MachineLearning.DATAFEED_THREAD_POOL_NAME, new AbstractRunnable() { + holder.cancellable = threadPool.schedule(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -248,7 +248,7 @@ protected void doRun() { doDatafeedRealtime(nextDelayInMsSinceEpoch, jobId, holder); } } - }); + }, delay, MachineLearning.DATAFEED_THREAD_POOL_NAME); } } @@ -297,7 +297,7 @@ public class Holder { private final boolean autoCloseJob; private final ProblemTracker problemTracker; private final Consumer finishHandler; - volatile Future future; + volatile Scheduler.Cancellable cancellable; private volatile boolean isRelocating; Holder(TransportStartDatafeedAction.DatafeedTask task, String datafeedId, DatafeedJob datafeedJob, @@ -341,7 +341,9 @@ public void stop(String source, TimeValue timeout, Exception e) { logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeedId, datafeedJob.getJobId(), acquired); runningDatafeedsOnThisNode.remove(allocationId); - FutureUtils.cancel(future); + if (cancellable != null) { + cancellable.cancel(); + } auditor.info(datafeedJob.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); finishHandler.accept(e); logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index ab45c7f6017c5..755f5245c05ba 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -18,7 +18,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; @@ -108,7 +108,7 @@ public class AutoDetectResultProcessor { private volatile Date latestDateForEstablishedModelMemoryCalc; private volatile long latestEstablishedModelMemory; private volatile boolean haveNewLatestModelSizeStats; - private Future scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods + private Scheduler.Cancellable scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, JobResultsProvider jobResultsProvider, @@ -387,8 +387,8 @@ synchronized void scheduleEstablishedModelMemoryUpdate(TimeValue delay) { if (scheduledEstablishedModelMemoryUpdate == null) { try { - scheduledEstablishedModelMemoryUpdate = client.threadPool().schedule(delay, MachineLearning.UTILITY_THREAD_POOL_NAME, - () -> runEstablishedModelMemoryUpdate(false)); + scheduledEstablishedModelMemoryUpdate = client.threadPool().schedule( + () -> runEstablishedModelMemoryUpdate(false), delay, MachineLearning.UTILITY_THREAD_POOL_NAME); LOGGER.trace("[{}] Scheduled established model memory update to run in [{}]", jobId, delay); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { @@ -416,7 +416,7 @@ private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting if (scheduledEstablishedModelMemoryUpdate != null) { if (cancelExisting) { LOGGER.debug("[{}] Bringing forward previously scheduled established model memory update", jobId); - FutureUtils.cancel(scheduledEstablishedModelMemoryUpdate); + scheduledEstablishedModelMemoryUpdate.cancel(); } scheduledEstablishedModelMemoryUpdate = null; updateEstablishedModelMemoryOnJob(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 7aa2d93f1201f..d94e9a01fcd36 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -10,11 +10,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.hamcrest.Matchers.is; @@ -46,8 +46,8 @@ public void setUpMocks() { }).when(executorService).execute(any(Runnable.class)); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); - ScheduledFuture scheduledFuture = mock(ScheduledFuture.class); - when(threadPool.schedule(any(), any(), any())).thenReturn(scheduledFuture); + Scheduler.ScheduledCancellable scheduledCancellable = mock(Scheduler.ScheduledCancellable.class); + when(threadPool.schedule(any(Runnable.class), any(), any())).thenReturn(scheduledCancellable); when(clusterService.getClusterName()).thenReturn(CLUSTER_NAME); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index eaa69de2c23b5..1b8eb62488860 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; @@ -48,7 +49,7 @@ import java.util.Collections; import java.util.Date; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -114,7 +115,7 @@ public void setUpTests() { ExecutorService executorService = mock(ExecutorService.class); doAnswer(invocation -> { ((Runnable) invocation.getArguments()[0]).run(); - return null; + return mock(Future.class); }).when(executorService).submit(any(Runnable.class)); when(threadPool.executor(MachineLearning.DATAFEED_THREAD_POOL_NAME)).thenReturn(executorService); when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); @@ -148,7 +149,7 @@ public void testLookbackOnly_WarnsWhenNoDataIsRetrieved() throws Exception { datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - verify(threadPool, never()).schedule(any(), any(), any()); + verify(threadPool, never()).schedule(any(Runnable.class), any(), any()); verify(auditor).warning("job_id", "Datafeed lookback retrieved no data"); } @@ -159,7 +160,7 @@ public void testStart_GivenNewlyCreatedJobLookback() throws Exception { datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - verify(threadPool, never()).schedule(any(), any(), any()); + verify(threadPool, never()).schedule(any(Runnable.class), any(), any()); } public void testStart_extractionProblem() throws Exception { @@ -169,7 +170,7 @@ public void testStart_extractionProblem() throws Exception { datafeedManager.run(task, handler); verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME); - verify(threadPool, never()).schedule(any(), any(), any()); + verify(threadPool, never()).schedule(any(Runnable.class), any(), any()); verify(auditor, times(1)).error(eq("job_id"), anyString()); } @@ -178,12 +179,12 @@ public void testStart_emptyDataCountException() throws Exception { int[] counter = new int[] {0}; doAnswer(invocationOnMock -> { if (counter[0]++ < 10) { - Runnable r = (Runnable) invocationOnMock.getArguments()[2]; + Runnable r = (Runnable) invocationOnMock.getArguments()[0]; currentTime += 600000; r.run(); } - return mock(ScheduledFuture.class); - }).when(threadPool).schedule(any(), any(), any()); + return mock(Scheduler.ScheduledCancellable.class); + }).when(threadPool).schedule(any(Runnable.class), any(), any()); when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); when(datafeedJob.runRealtime()).thenThrow(new DatafeedJob.EmptyDataCountException(0L)); @@ -192,7 +193,7 @@ public void testStart_emptyDataCountException() throws Exception { DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null); datafeedManager.run(task, handler); - verify(threadPool, times(11)).schedule(any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); + verify(threadPool, times(11)).schedule(any(), any(), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME)); verify(auditor, times(1)).warning(eq("job_id"), anyString()); } @@ -248,7 +249,7 @@ public void testStart_GivenNewlyCreatedJobLookBackAndRealtime() throws Exception verify(handler).accept(null); assertThat(datafeedManager.isRunning(task.getAllocationId()), is(false)); } else { - verify(threadPool, times(1)).schedule(eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME), any()); + verify(threadPool, times(1)).schedule(any(), eq(new TimeValue(1)), eq(MachineLearning.DATAFEED_THREAD_POOL_NAME)); assertThat(datafeedManager.isRunning(task.getAllocationId()), is(true)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index e524cd1feb390..850eba2c5e871 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -537,7 +537,8 @@ public void testKill() throws TimeoutException { } private void setupScheduleDelayTime(TimeValue delay) { - when(threadPool.schedule(any(TimeValue.class), anyString(), any(Runnable.class))) - .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[2], delay.nanos(), TimeUnit.NANOSECONDS)); + when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), anyString())) + .thenAnswer(i -> Scheduler.wrapAsScheduledCancellable(executor.schedule((Runnable) i.getArguments()[0], + delay.nanos(), TimeUnit.NANOSECONDS))); } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/cleaner/CleanerService.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/cleaner/CleanerService.java index e6af45ee2bca0..571bfc7c1a964 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/cleaner/CleanerService.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/cleaner/CleanerService.java @@ -13,8 +13,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.monitoring.MonitoringField; import org.joda.time.DateTime; @@ -22,7 +22,6 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ScheduledFuture; /** * {@code CleanerService} takes care of deleting old monitoring indices. @@ -57,7 +56,7 @@ public CleanerService(Settings settings, ClusterSettings clusterSettings, Thread @Override protected void doStart() { logger.debug("starting cleaning service"); - threadPool.schedule(executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName(), runnable); + threadPool.schedule(runnable, executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName()); logger.debug("cleaning service started"); } @@ -153,7 +152,7 @@ public interface Listener { */ class IndicesCleaner extends AbstractLifecycleRunnable { - private volatile ScheduledFuture future; + private volatile Scheduler.Cancellable cancellable; /** * Enable automatic logging and stopping of the runnable based on the {@link #lifecycle}. @@ -197,7 +196,7 @@ protected void onAfterInLifecycle() { logger.debug("scheduling next execution in [{}] seconds", delay.seconds()); try { - future = threadPool.schedule(delay, executorName(), this); + cancellable = threadPool.schedule(this, delay, executorName()); } catch (EsRejectedExecutionException e) { if (e.isExecutorShutdown()) { logger.debug("couldn't schedule new execution of the cleaner, executor is shutting down", e); @@ -215,13 +214,13 @@ public void onFailure(Exception e) { /** * Cancel/stop the cleaning service. *

- * This will kill any scheduled {@link #future} from running. It's possible that this will be executed concurrently with the + * This will kill any scheduled {@link #cancellable} from running. It's possible that this will be executed concurrently with the * {@link #onAfter() rescheduling code}, at which point it will be stopped during the next execution if the service is * stopped. */ public void cancel() { - if (future != null && future.isCancelled() == false) { - FutureUtils.cancel(future); + if (cancellable != null && cancellable.isCancelled() == false) { + cancellable.cancel(); } } } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java index 193b33b7d8fe9..fe2b581d58c16 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java @@ -130,7 +130,7 @@ protected void doAuthenticate(UsernamePasswordToken token, ActionListener userActionList () -> sessionFactory.unauthenticatedSession(username, contextPreservingListener(new LdapSessionActionListener("lookup", username, sessionListener))), logger); threadPool.generic().execute(cancellableLdapRunnable); - threadPool.schedule(executionTimeout, Names.SAME, cancellableLdapRunnable::maybeTimeout); + threadPool.schedule(cancellableLdapRunnable::maybeTimeout, executionTimeout, Names.SAME); } else { userActionListener.onResponse(null); }