Skip to content

Commit 7870ae2

Browse files
authored
Ensure threads running before closing node (#43240)
Several tests in NodeTests submit tasks to the thread pool and then close the node. They are designed to check how running tasks are affected by node close. These tests can cause CI failures because a submitted task may not yet be running when the node is closed; it then executes after the thread context has been closed, which triggers an unexpected exception. This change waits until the submitted task is actually running before closing the node, so we avoid the unexpected exception and can still test these cases. Testing task submission while a node is closing is also important, so an additional (currently muted) test has been added that covers a task being submitted concurrently with node close and ensures nothing unexpected is triggered in that case. Relates #42774 Relates #42577
1 parent 8db55ea commit 7870ae2

File tree

1 file changed

+54
-2
lines changed

1 file changed

+54
-2
lines changed

server/src/test/java/org/elasticsearch/node/NodeTests.java

+54-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
5151
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5252

53-
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577")
5453
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS")
5554
public class NodeTests extends ESTestCase {
5655

@@ -154,35 +153,87 @@ public void testCloseOnOutstandingTask() throws Exception {
154153
node.start();
155154
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
156155
AtomicBoolean shouldRun = new AtomicBoolean(true);
156+
final CountDownLatch threadRunning = new CountDownLatch(1);
157157
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
158+
threadRunning.countDown();
158159
while (shouldRun.get());
159160
});
161+
threadRunning.await();
160162
node.close();
161163
shouldRun.set(false);
162164
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
163165
}
164166

167+
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577")
168+
public void testCloseRaceWithTaskExecution() throws Exception {
169+
Node node = new MockNode(baseSettings().build(), basePlugins());
170+
node.start();
171+
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
172+
AtomicBoolean shouldRun = new AtomicBoolean(true);
173+
final CountDownLatch running = new CountDownLatch(3);
174+
Thread submitThread = new Thread(() -> {
175+
running.countDown();
176+
try {
177+
running.await();
178+
} catch (InterruptedException e) {
179+
throw new AssertionError("interrupted while waiting", e);
180+
}
181+
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
182+
while (shouldRun.get());
183+
});
184+
});
185+
Thread closeThread = new Thread(() -> {
186+
running.countDown();
187+
try {
188+
running.await();
189+
} catch (InterruptedException e) {
190+
throw new AssertionError("interrupted while waiting", e);
191+
}
192+
try {
193+
node.close();
194+
} catch (IOException e) {
195+
throw new AssertionError("node close failed", e);
196+
}
197+
});
198+
submitThread.start();
199+
closeThread.start();
200+
running.countDown();
201+
running.await();
202+
203+
submitThread.join();
204+
closeThread.join();
205+
206+
shouldRun.set(false);
207+
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
208+
}
209+
165210
public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception {
166211
Node node = new MockNode(baseSettings().build(), basePlugins());
167212
node.start();
168213
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
169214
AtomicBoolean shouldRun = new AtomicBoolean(true);
215+
final CountDownLatch threadRunning = new CountDownLatch(1);
170216
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
217+
threadRunning.countDown();
171218
while (shouldRun.get());
172219
});
220+
threadRunning.await();
173221
node.close();
174222
assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS));
175223
shouldRun.set(false);
224+
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
176225
}
177226

178227
public void testCloseOnInterruptibleTask() throws Exception {
179228
Node node = new MockNode(baseSettings().build(), basePlugins());
180229
node.start();
181230
ThreadPool threadpool = node.injector().getInstance(ThreadPool.class);
182-
CountDownLatch latch = new CountDownLatch(1);
231+
final CountDownLatch threadRunning = new CountDownLatch(1);
232+
final CountDownLatch latch = new CountDownLatch(1);
183233
final CountDownLatch finishLatch = new CountDownLatch(1);
184234
final AtomicBoolean interrupted = new AtomicBoolean(false);
185235
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
236+
threadRunning.countDown();
186237
try {
187238
latch.await();
188239
} catch (InterruptedException e) {
@@ -192,6 +243,7 @@ public void testCloseOnInterruptibleTask() throws Exception {
192243
finishLatch.countDown();
193244
}
194245
});
246+
threadRunning.await();
195247
node.close();
196248
// close should not interrupt ongoing tasks
197249
assertFalse(interrupted.get());

0 commit comments

Comments
 (0)