diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 0885a8f9d6479..0206bd88245b3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -270,7 +270,7 @@ public void persistJob(BiConsumer handler) { } @Nullable - FlushAcknowledgement waitFlushToCompletion(String flushId) { + FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedException { LOGGER.debug("[{}] waiting for flush", job.getId()); FlushAcknowledgement flushAcknowledgement; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index da5e70112f045..023493c1d4174 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -485,7 +485,7 @@ public void awaitCompletion() throws TimeoutException { * @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired */ @Nullable - public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) { + public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) throws InterruptedException { return failed ? null : flushListener.waitForFlush(flushId, timeout); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java index 66b43b5e36fa4..1340556fbcdb1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java @@ -22,18 +22,14 @@ class FlushListener { final AtomicBoolean cleared = new AtomicBoolean(false); @Nullable - FlushAcknowledgement waitForFlush(String flushId, Duration timeout) { + FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException { if (cleared.get()) { return null; } FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId)); - try { - if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { - return holder.flushAcknowledgement; - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + return holder.flushAcknowledgement; } return null; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index ab24aadb9dc3a..fda96ca29a695 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -108,7 +108,7 @@ public void testWriteUpdateProcessMessage() throws IOException { verifyNoMoreInteractions(process); } - public void testFlushJob() throws IOException { + public void testFlushJob() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); @@ -123,7 +123,7 @@ public void testFlushJob() throws IOException { } } - public void testWaitForFlushReturnsIfParserFails() throws IOException { + public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class); @@ -144,7 +144,7 @@ public void testFlushJob_throwsIfProcessIsDead() throws IOException { assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage()); } - public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException { + public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index 8eb0317ba0dbe..af05a05cfb710 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -514,7 +514,7 @@ public void testPersisterThrowingDoesntBlockProcessing() { verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE)); } - public void testParsingErrorSetsFailed() { + public void testParsingErrorSetsFailed() throws InterruptedException { @SuppressWarnings("unchecked") Iterator iterator = mock(Iterator.class); when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java index 70ba757f148fe..3bcedb523923e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java @@ -22,8 +22,12 @@ public void testAcknowledgeFlush() throws Exception { FlushListener listener = new FlushListener(); AtomicReference flushAcknowledgementHolder = new AtomicReference<>(); new Thread(() -> { - FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000)); - flushAcknowledgementHolder.set(flushAcknowledgement); + try { + FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000)); + flushAcknowledgementHolder.set(flushAcknowledgement); + } catch (InterruptedException _ex) { + Thread.currentThread().interrupt(); + } }).start(); assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id"))); assertNull(flushAcknowledgementHolder.get()); @@ -46,8 +50,12 @@ public void testClear() throws Exception { AtomicReference flushAcknowledgementHolder = new AtomicReference<>(); flushAcknowledgementHolders.add(flushAcknowledgementHolder); new Thread(() -> { - FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000)); - flushAcknowledgementHolder.set(flushAcknowledgement); + try { + FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000)); + flushAcknowledgementHolder.set(flushAcknowledgement); + } catch (InterruptedException _ex) { + Thread.currentThread().interrupt(); + } }).start(); } assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));