Skip to content

Commit f6eead7

Browse files
committed
Do not close threadpool if termination fails
This commit changes the code so that the threadpool is not closed unless termination succeeds. Otherwise there can still be running tasks that rely on resources that are closed by closing the threadpool. Additionally, there is a test fix included for the NodeTests that ensures the submitted task is actually running prior to closing the node in the test. Closes elastic#42577
1 parent 56c1ed5 commit f6eead7

File tree

4 files changed

+37
-11
lines changed

4 files changed

+37
-11
lines changed

server/src/main/java/org/elasticsearch/client/transport/TransportClient.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,14 @@ public void close() {
374374
for (LifecycleComponent plugin : pluginLifecycleComponents) {
375375
closeables.add(plugin);
376376
}
377-
closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
377+
closeables.add(() -> {
378+
final ThreadPool pool = injector.getInstance(ThreadPool.class);
379+
final boolean terminated = ThreadPool.terminate(pool, 10, TimeUnit.SECONDS);
380+
if (terminated == false) {
381+
// the pool is only closed if termination succeeds, just close even if termination failed
382+
pool.close();
383+
}
384+
});
378385
IOUtils.closeWhileHandlingException(closeables);
379386
}
380387

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
412412
}
413413
}
414414
cachedTimeThread.join(unit.toMillis(timeout));
415+
result &= cachedTimeThread.isAlive() == false;
415416
return result;
416417
}
417418

@@ -699,22 +700,28 @@ private static boolean awaitTermination(
699700

700701
/**
701702
* Returns <code>true</code> if the given pool was terminated successfully. If the termination timed out,
702-
* the service is <code>null</code> this method will return <code>false</code>.
703+
* the service is <code>null</code> this method will return <code>false</code>. The pool is only closed if
704+
* the termination was successful.
703705
*/
704706
public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) {
707+
boolean terminated = false;
705708
if (pool != null) {
706-
// Leverage try-with-resources to close the threadpool
707-
try (ThreadPool c = pool) {
709+
try {
708710
pool.shutdown();
709711
if (awaitTermination(pool, timeout, timeUnit)) {
710-
return true;
712+
terminated = true;
713+
} else {
714+
// last resort
715+
pool.shutdownNow();
716+
terminated = awaitTermination(pool, timeout, timeUnit);
717+
}
718+
} finally {
719+
if (terminated) {
720+
pool.close();
711721
}
712-
// last resort
713-
pool.shutdownNow();
714-
return awaitTermination(pool, timeout, timeUnit);
715722
}
716723
}
717-
return false;
724+
return terminated;
718725
}
719726

720727
private static boolean awaitTermination(

server/src/test/java/org/elasticsearch/node/NodeTests.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
5151
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5252

53-
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577")
5453
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS")
5554
public class NodeTests extends ESTestCase {
5655

@@ -154,9 +153,12 @@ public void testCloseOnOutstandingTask() throws Exception {
154153
node.start();
155154
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
156155
AtomicBoolean shouldRun = new AtomicBoolean(true);
156+
CountDownLatch threadRunning = new CountDownLatch(1);
157157
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
158+
threadRunning.countDown();
158159
while (shouldRun.get());
159160
});
161+
threadRunning.await();
160162
node.close();
161163
shouldRun.set(false);
162164
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
@@ -167,12 +169,17 @@ public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception {
167169
node.start();
168170
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
169171
AtomicBoolean shouldRun = new AtomicBoolean(true);
172+
CountDownLatch threadRunning = new CountDownLatch(1);
170173
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
174+
threadRunning.countDown();
171175
while (shouldRun.get());
172176
});
177+
threadRunning.await();
173178
node.close();
174179
assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS));
175180
shouldRun.set(false);
181+
// call this again to ensure we terminate and close the threadpool
182+
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
176183
}
177184

178185
public void testCloseOnInterruptibleTask() throws Exception {

test/framework/src/main/java/org/elasticsearch/test/client/NoOpClient.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,15 @@ void doExecute(Action<Response> action, Request request, ActionListener<Response
5757

5858
@Override
5959
public void close() {
60+
boolean terminated = false;
6061
try {
61-
ThreadPool.terminate(threadPool(), 10, TimeUnit.SECONDS);
62+
terminated = ThreadPool.terminate(threadPool(), 10, TimeUnit.SECONDS);
6263
} catch (Exception e) {
6364
throw new ElasticsearchException(e.getMessage(), e);
6465
}
66+
67+
if (terminated == false) {
68+
throw new IllegalStateException("threadpool was not terminated after waiting for 10 seconds");
69+
}
6570
}
6671
}

0 commit comments

Comments
 (0)