gh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock #108513

Merged
merged 8 commits on Sep 22, 2023

47 changes: 47 additions & 0 deletions Lib/test/test_concurrent_futures.py
@@ -19,6 +19,7 @@
import threading
import time
import unittest
import unittest.mock
import weakref
from pickle import PicklingError

@@ -1389,6 +1390,52 @@ def test_crash_big_data(self):
        with self.assertRaises(BrokenProcessPool):
            list(executor.map(_crash_with_data, [data] * 10))

    def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
        # Issue #105829: The _ExecutorManagerThread wakeup pipe could
        # fill up and block. See: https://github.com/python/cpython/issues/105829

        # Lots of cargo culting while writing this test, apologies if
        # something is really stupid...

        self.executor.shutdown(wait=True)

        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            raise RuntimeError("timed out while submitting jobs?")

        thread_run = futures.process._ExecutorManagerThread.run
        def mock_run(self):
            # Delay thread startup so the wakeup pipe can fill up and block
            time.sleep(5)
            thread_run(self)

        # Should be support.PIPE_MAX_SIZE but it is way too
        # pessimistic here, would take too long. Assume 64k pipe
        # buffer and add some margin...
        job_num = 65536 * 2
        job_data = range(job_num)
        with unittest.mock.patch.object(futures.process._ExecutorManagerThread, 'run', mock_run):
            with self.executor_type(max_workers=2,
                                    mp_context=self.get_context()) as executor:
                self.executor = executor  # Allow clean up in fail_on_deadlock

                # Need to use SIGALRM for timeout detection because
                # Executor.submit is not guarded by any timeout (both
                # self._work_ids.put(self._queue_count) and
                # self._executor_manager_thread_wakeup.wakeup() might
                # block, maybe more?). In this specific case it was
                # the wakeup call that deadlocked on a blocking pipe.
                old_handler = signal.signal(signal.SIGALRM, timeout)
                try:
                    signal.alarm(int(support.LONG_TIMEOUT))
                    self.assertEqual(job_num, len(list(executor.map(int, job_data))))
                finally:
                    signal.alarm(0)
                    signal.signal(signal.SIGALRM, old_handler)


create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
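
To make the failure mode the test exercises concrete: an OS pipe has a fixed-size kernel buffer, and a blocking write stalls as soon as that buffer is full and nothing is reading. Every submit() writes a few bytes to the manager thread's wakeup pipe, so if the manager thread has not started reading yet (which mock_run simulates with its sleep), enough submissions fill the buffer and the submitting thread hangs inside wakeup(). The standalone sketch below (POSIX assumed, not part of this patch) fills a pipe with a non-blocking writer purely to show where a blocking writer would have stalled.

import os

r, w = os.pipe()
os.set_blocking(w, False)   # non-blocking so this demo cannot hang

written = 0
try:
    while True:
        written += os.write(w, b"x" * 4096)
except BlockingIOError:
    # Kernel buffer is full; a *blocking* writer would now be stuck
    # until something reads from r -- the gh-105829 deadlock scenario.
    pass

print(f"pipe buffer saturated after ~{written} bytes")
os.close(r)
os.close(w)

On Linux this typically reports about 64 KiB, which is where the test's "assume 64k pipe buffer and add some margin" sizing of job_num comes from.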
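
The accompanying change to Lib/concurrent/futures/process.py is not included in this excerpt, so the following should not be read as the code that was merged. It is only a sketch of one common way to make a wakeup call that can never block the submitting thread: put the write end of the pipe in non-blocking mode and treat a full pipe as "a wakeup is already pending". The class and method names here are hypothetical.

import os


class NonBlockingWakeup:
    # Hypothetical helper, not CPython's _ThreadWakeup; it only illustrates
    # the "never block on wakeup" idea.

    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()
        os.set_blocking(self._read_fd, False)
        os.set_blocking(self._write_fd, False)

    def wakeup(self):
        try:
            os.write(self._write_fd, b"\0")
        except BlockingIOError:
            # Pipe already full: the reader is guaranteed to wake up,
            # so dropping this byte loses nothing.
            pass

    def clear(self):
        # Drain pending wakeup bytes once the reader has woken up.
        try:
            while os.read(self._read_fd, 4096):
                pass
        except BlockingIOError:
            pass

    def close(self):
        os.close(self._read_fd)
        os.close(self._write_fd)

With a writer that behaves like this, the executor.map(int, job_data) call in the test could enqueue all 131072 jobs without ever blocking on the wakeup pipe, no matter when the manager thread starts reading.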