Skip to content

Commit 68676e2

Browse files
committed
[ML] Fix thread leak when waiting for job flush (#32196) (#32541)
1 parent fed37da commit 68676e2

File tree

6 files changed

+21
-17
lines changed

6 files changed

+21
-17
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public void persistJob(BiConsumer<Void, Exception> handler) {
269269
}
270270

271271
@Nullable
272-
FlushAcknowledgement waitFlushToCompletion(String flushId) {
272+
FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedException {
273273
LOGGER.debug("[{}] waiting for flush", job.getId());
274274

275275
FlushAcknowledgement flushAcknowledgement;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ public void awaitCompletion() throws TimeoutException {
482482
* @return The {@link FlushAcknowledgement} if the flush has completed or the parsing finished; {@code null} if the timeout expired
483483
*/
484484
@Nullable
485-
public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) {
485+
public FlushAcknowledgement waitForFlushAcknowledgement(String flushId, Duration timeout) throws InterruptedException {
486486
return failed ? null : flushListener.waitForFlush(flushId, timeout);
487487
}
488488

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,14 @@ class FlushListener {
2222
final AtomicBoolean cleared = new AtomicBoolean(false);
2323

2424
@Nullable
25-
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) {
25+
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException {
2626
if (cleared.get()) {
2727
return null;
2828
}
2929

3030
FlushAcknowledgementHolder holder = awaitingFlushed.computeIfAbsent(flushId, (key) -> new FlushAcknowledgementHolder(flushId));
31-
try {
32-
if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
33-
return holder.flushAcknowledgement;
34-
}
35-
} catch (InterruptedException e) {
36-
Thread.currentThread().interrupt();
31+
if (holder.latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
32+
return holder.flushAcknowledgement;
3733
}
3834
return null;
3935
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void testWriteUpdateProcessMessage() throws IOException {
108108
verifyNoMoreInteractions(process);
109109
}
110110

111-
public void testFlushJob() throws IOException {
111+
public void testFlushJob() throws IOException, InterruptedException {
112112
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
113113
when(process.isProcessAlive()).thenReturn(true);
114114
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
@@ -123,7 +123,7 @@ public void testFlushJob() throws IOException {
123123
}
124124
}
125125

126-
public void testWaitForFlushReturnsIfParserFails() throws IOException {
126+
public void testWaitForFlushReturnsIfParserFails() throws IOException, InterruptedException {
127127
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
128128
when(process.isProcessAlive()).thenReturn(true);
129129
AutoDetectResultProcessor processor = mock(AutoDetectResultProcessor.class);
@@ -144,7 +144,7 @@ public void testFlushJob_throwsIfProcessIsDead() throws IOException {
144144
assertEquals("[foo] Unexpected death of autodetect: Mock process is dead", holder[0].getMessage());
145145
}
146146

147-
public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException {
147+
public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws IOException, InterruptedException {
148148
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
149149
when(process.isProcessAlive()).thenReturn(true);
150150
AutoDetectResultProcessor autoDetectResultProcessor = Mockito.mock(AutoDetectResultProcessor.class);

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ public void testPersisterThrowingDoesntBlockProcessing() {
511511
verify(persister, times(2)).persistModelSnapshot(any(), eq(WriteRequest.RefreshPolicy.IMMEDIATE));
512512
}
513513

514-
public void testParsingErrorSetsFailed() {
514+
public void testParsingErrorSetsFailed() throws InterruptedException {
515515
@SuppressWarnings("unchecked")
516516
Iterator<AutodetectResult> iterator = mock(Iterator.class);
517517
when(iterator.hasNext()).thenThrow(new ElasticsearchParseException("this test throws"));

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@ public void testAcknowledgeFlush() throws Exception {
2222
FlushListener listener = new FlushListener();
2323
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
2424
new Thread(() -> {
25-
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
26-
flushAcknowledgementHolder.set(flushAcknowledgement);
25+
try {
26+
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush("_id", Duration.ofMillis(10000));
27+
flushAcknowledgementHolder.set(flushAcknowledgement);
28+
} catch (InterruptedException _ex) {
29+
Thread.currentThread().interrupt();
30+
}
2731
}).start();
2832
assertBusy(() -> assertTrue(listener.awaitingFlushed.containsKey("_id")));
2933
assertNull(flushAcknowledgementHolder.get());
@@ -46,8 +50,12 @@ public void testClear() throws Exception {
4650
AtomicReference<FlushAcknowledgement> flushAcknowledgementHolder = new AtomicReference<>();
4751
flushAcknowledgementHolders.add(flushAcknowledgementHolder);
4852
new Thread(() -> {
49-
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
50-
flushAcknowledgementHolder.set(flushAcknowledgement);
53+
try {
54+
FlushAcknowledgement flushAcknowledgement = listener.waitForFlush(String.valueOf(id), Duration.ofMillis(10000));
55+
flushAcknowledgementHolder.set(flushAcknowledgement);
56+
} catch (InterruptedException _ex) {
57+
Thread.currentThread().interrupt();
58+
}
5159
}).start();
5260
}
5361
assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));

0 commit comments

Comments
 (0)