|
19 | 19 | import threading
|
20 | 20 | import time
|
21 | 21 | import unittest
|
| 22 | +import unittest.mock |
22 | 23 | import weakref
|
23 | 24 | from pickle import PicklingError
|
24 | 25 |
|
@@ -1389,6 +1390,52 @@ def test_crash_big_data(self):
|
1389 | 1390 | with self.assertRaises(BrokenProcessPool):
|
1390 | 1391 | list(executor.map(_crash_with_data, [data] * 10))
|
1391 | 1392 |
|
| 1393 | + def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self): |
| 1394 | + # Issue #105829: The _ExecutorManagerThread wakeup pipe could |
| 1395 | + # fill up and block. See: https://github.com/python/cpython/issues/105829 |
| 1396 | + |
| 1397 | + # Lots of cargo culting while writing this test, apologies if |
| 1398 | + # something is really stupid... |
| 1399 | + |
| 1400 | + self.executor.shutdown(wait=True) |
| 1401 | + |
| 1402 | + if not hasattr(signal, 'alarm'): |
| 1403 | + raise unittest.SkipTest( |
| 1404 | + "Tested platform does not support the alarm signal") |
| 1405 | + |
| 1406 | + def timeout(_signum, _frame): |
| 1407 | + raise RuntimeError("timed out while submitting jobs?") |
| 1408 | + |
| 1409 | + thread_run = futures.process._ExecutorManagerThread.run |
| 1410 | + def mock_run(self): |
| 1411 | + # Delay thread startup so the wakeup pipe can fill up and block |
| 1412 | + time.sleep(5) |
| 1413 | + thread_run(self) |
| 1414 | + |
| 1415 | + # Should be support.PIPE_MAX_SIZE but it is way too |
| 1416 | + # pessimistic here, would take too long. Assume 64k pipe |
| 1417 | + # buffer and add some margin... |
| 1418 | + job_num = 65536 * 2 |
| 1419 | + job_data = range(job_num) |
| 1420 | + with unittest.mock.patch.object(futures.process._ExecutorManagerThread, 'run', mock_run): |
| 1421 | + with self.executor_type(max_workers=2, |
| 1422 | + mp_context=self.get_context()) as executor: |
| 1423 | + self.executor = executor # Allow clean up in fail_on_deadlock |
| 1424 | + |
| 1425 | + # Need to use sigalarm for timeout detection because |
| 1426 | + # Executor.submit is not guarded by any timeout (both |
| 1427 | + # self._work_ids.put(self._queue_count) and |
| 1428 | + # self._executor_manager_thread_wakeup.wakeup() might |
| 1429 | + # timeout, maybe more?). In this specific case it was |
| 1430 | + # the wakeup call that deadlocked on a blocking pipe. |
| 1431 | + old_handler = signal.signal(signal.SIGALRM, timeout) |
| 1432 | + try: |
| 1433 | + signal.alarm(int(support.LONG_TIMEOUT)) |
| 1434 | + self.assertEqual(job_num, len(list(executor.map(int, job_data)))) |
| 1435 | + finally: |
| 1436 | + signal.alarm(0) |
| 1437 | + signal.signal(signal.SIGALRM, old_handler) |
| 1438 | + |
1392 | 1439 |
|
1393 | 1440 | create_executor_tests(ExecutorDeadlockTest,
|
1394 | 1441 | executor_mixins=(ProcessPoolForkMixin,
|
|
0 commit comments