diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 22ebff57a4ba8..55064dd3e38d7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -117,7 +117,13 @@ public Integer getMaxEmptySearches() { } public void finishReportingTimingStats() { - timingStatsReporter.finishReporting(); + try { + timingStatsReporter.finishReporting(); + } catch (Exception e) { + // We don't want the exception to propagate out of this method as it can leave the datafeed in the "stopping" state forever. + // Since persisting datafeed timing stats is not critical, we just log a warning here. + LOGGER.warn("[{}] Datafeed timing stats could not be reported due to: {}", jobId, e); + } } Long runLookBack(long startTime, Long endTime) throws Exception { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 2b7a40abc2e36..7a057f67eb390 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -361,7 +361,7 @@ public void stop(String source, TimeValue timeout, Exception e, boolean autoClos acquired = datafeedJobLock.tryLock(timeout.millis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e1) { Thread.currentThread().interrupt(); - } finally { + } finally { // It is crucial that none of the calls this "finally" block makes throws an exception for minor problems. logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeedId, datafeedJob.getJobId(), acquired); runningDatafeedsOnThisNode.remove(allocationId); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index b28480354d9b6..1abb1291b17a7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -60,6 +61,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -454,6 +456,15 @@ public void testFlushAnalysisProblemIsConflict() { assertThat(analysisProblemException.shouldStop, is(true)); } + public void testFinishReportingTimingStats() { + doThrow(new EsRejectedExecutionException()).when(timingStatsReporter).finishReporting(); + + long frequencyMs = 100; + long queryDelayMs = 1000; + DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean()); + datafeedJob.finishReportingTimingStats(); + } + private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, boolean haveSeenDataPreviously) { Supplier currentTimeSupplier = () -> currentTime;