Skip to content

Commit 9cf695b

Browse files
committed
[3.12] pythongh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock (pythonGH-108513)
This fixes issue pythonGH-105829, python#105829 Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Antoine Pitrou <[email protected]> Co-authored-by: Chris Withers <[email protected]> Co-authored-by: Thomas Moreau <[email protected]>. (cherry picked from commit 405b063) Co-authored-by: elfstrom <[email protected]>
1 parent f6287bd commit 9cf695b

File tree

3 files changed

+87
-4
lines changed

3 files changed

+87
-4
lines changed

Lib/concurrent/futures/process.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ def __init__(self):
7171
self._reader, self._writer = mp.Pipe(duplex=False)
7272

7373
def close(self):
74+
# Please note that we do not take the shutdown lock when
75+
# calling clear() (to avoid deadlocking) so this method can
76+
# only be called safely from the same thread as all calls to
77+
# clear() even if you hold the shutdown lock. Otherwise we
78+
# might try to read from the closed pipe.
7479
if not self._closed:
7580
self._closed = True
7681
self._writer.close()
@@ -426,8 +431,12 @@ def wait_result_broken_or_wakeup(self):
426431
elif wakeup_reader in ready:
427432
is_broken = False
428433

429-
with self.shutdown_lock:
430-
self.thread_wakeup.clear()
434+
# No need to hold the _shutdown_lock here because:
435+
# 1. we're the only thread to use the wakeup reader
436+
# 2. we're also the only thread to call thread_wakeup.close()
437+
# 3. we want to avoid a possible deadlock when both reader and writer
438+
# would block (gh-105829)
439+
self.thread_wakeup.clear()
431440

432441
return result_item, is_broken, cause
433442

@@ -706,7 +715,10 @@ def __init__(self, max_workers=None, mp_context=None,
706715
# as it could result in a deadlock if a worker process dies with the
707716
# _result_queue write lock still acquired.
708717
#
709-
# _shutdown_lock must be locked to access _ThreadWakeup.
718+
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
719+
# .wakeup(). Care must also be taken to not call clear or close from
720+
# more than one thread since _ThreadWakeup.clear() is not protected by
721+
# the _shutdown_lock
710722
self._executor_manager_thread_wakeup = _ThreadWakeup()
711723

712724
# Create communication channels for the executor

Lib/test/test_concurrent_futures/test_deadlock.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import contextlib
2+
import queue
3+
import signal
24
import sys
35
import time
46
import unittest
7+
import unittest.mock
58
from pickle import PicklingError
69
from concurrent import futures
7-
from concurrent.futures.process import BrokenProcessPool
10+
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
811

912
from test import support
1013

@@ -239,6 +242,73 @@ def test_crash_big_data(self):
239242
with self.assertRaises(BrokenProcessPool):
240243
list(executor.map(_crash_with_data, [data] * 10))
241244

245+
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
246+
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
247+
# fill up and block. See: https://github.com/python/cpython/issues/105829
248+
249+
# Lots of cargo culting while writing this test, apologies if
250+
# something is really stupid...
251+
252+
self.executor.shutdown(wait=True)
253+
254+
if not hasattr(signal, 'alarm'):
255+
raise unittest.SkipTest(
256+
"Tested platform does not support the alarm signal")
257+
258+
def timeout(_signum, _frame):
259+
import faulthandler
260+
faulthandler.dump_traceback()
261+
262+
raise RuntimeError("timed out while submitting jobs?")
263+
264+
thread_run = futures.process._ExecutorManagerThread.run
265+
def mock_run(self):
266+
# Delay thread startup so the wakeup pipe can fill up and block
267+
time.sleep(3)
268+
thread_run(self)
269+
270+
class MockWakeup(_ThreadWakeup):
271+
"""Mock wakeup object to force the wakeup to block"""
272+
def __init__(self):
273+
super().__init__()
274+
self._dummy_queue = queue.Queue(maxsize=1)
275+
276+
def wakeup(self):
277+
self._dummy_queue.put(None, block=True)
278+
super().wakeup()
279+
280+
def clear(self):
281+
try:
282+
while True:
283+
self._dummy_queue.get_nowait()
284+
except queue.Empty:
285+
super().clear()
286+
287+
with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
288+
'run', mock_run),
289+
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
290+
MockWakeup)):
291+
with self.executor_type(max_workers=2,
292+
mp_context=self.get_context()) as executor:
293+
self.executor = executor # Allow clean up in fail_on_deadlock
294+
295+
job_num = 100
296+
job_data = range(job_num)
297+
298+
# Need to use sigalarm for timeout detection because
299+
# Executor.submit is not guarded by any timeout (both
300+
# self._work_ids.put(self._queue_count) and
301+
# self._executor_manager_thread_wakeup.wakeup() might
302+
# timeout, maybe more?). In this specific case it was
303+
# the wakeup call that deadlocked on a blocking pipe.
304+
old_handler = signal.signal(signal.SIGALRM, timeout)
305+
try:
306+
signal.alarm(int(self.TIMEOUT))
307+
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
308+
finally:
309+
signal.alarm(0)
310+
signal.signal(signal.SIGALRM, old_handler)
311+
242312

243313
create_executor_tests(globals(), ExecutorDeadlockTest,
244314
executor_mixins=(ProcessPoolForkMixin,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix concurrent.futures.ProcessPoolExecutor deadlock

0 commit comments

Comments
 (0)