Skip to content

Commit 97ea901

Browse files
authored
[3.11] gh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock (GH-108513) (#109783)
This fixes issue GH-105829, #105829 (cherry picked from commit 405b063)
1 parent e6a9cbd commit 97ea901

File tree

3 files changed

+87
-4
lines changed

3 files changed

+87
-4
lines changed

Lib/concurrent/futures/process.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ def __init__(self):
6969
self._reader, self._writer = mp.Pipe(duplex=False)
7070

7171
def close(self):
72+
# Please note that we do not take the shutdown lock when
73+
# calling clear() (to avoid deadlocking) so this method can
74+
# only be called safely from the same thread as all calls to
75+
# clear() even if you hold the shutdown lock. Otherwise we
76+
# might try to read from the closed pipe.
7277
if not self._closed:
7378
self._closed = True
7479
self._writer.close()
@@ -424,8 +429,12 @@ def wait_result_broken_or_wakeup(self):
424429
elif wakeup_reader in ready:
425430
is_broken = False
426431

427-
with self.shutdown_lock:
428-
self.thread_wakeup.clear()
432+
# No need to hold the _shutdown_lock here because:
433+
# 1. we're the only thread to use the wakeup reader
434+
# 2. we're also the only thread to call thread_wakeup.close()
435+
# 3. we want to avoid a possible deadlock when both reader and writer
436+
# would block (gh-105829)
437+
self.thread_wakeup.clear()
429438

430439
return result_item, is_broken, cause
431440

@@ -708,7 +717,10 @@ def __init__(self, max_workers=None, mp_context=None,
708717
# as it could result in a deadlock if a worker process dies with the
709718
# _result_queue write lock still acquired.
710719
#
711-
# _shutdown_lock must be locked to access _ThreadWakeup.
720+
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
721+
# .wakeup(). Care must also be taken to not call clear or close from
722+
# more than one thread since _ThreadWakeup.clear() is not protected by
723+
# the _shutdown_lock
712724
self._executor_manager_thread_wakeup = _ThreadWakeup()
713725

714726
# Create communication channels for the executor

Lib/test/test_concurrent_futures/test_deadlock.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import contextlib
2+
import queue
3+
import signal
24
import sys
35
import time
46
import unittest
7+
import unittest.mock
58
from pickle import PicklingError
69
from concurrent import futures
7-
from concurrent.futures.process import BrokenProcessPool
10+
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
811

912
from test import support
1013

@@ -239,6 +242,73 @@ def test_crash_big_data(self):
239242
with self.assertRaises(BrokenProcessPool):
240243
list(executor.map(_crash_with_data, [data] * 10))
241244

245+
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
246+
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
247+
# fill up and block. See: https://github.com/python/cpython/issues/105829
248+
249+
# Lots of cargo culting while writing this test, apologies if
250+
# something is really stupid...
251+
252+
self.executor.shutdown(wait=True)
253+
254+
if not hasattr(signal, 'alarm'):
255+
raise unittest.SkipTest(
256+
"Tested platform does not support the alarm signal")
257+
258+
def timeout(_signum, _frame):
259+
import faulthandler
260+
faulthandler.dump_traceback()
261+
262+
raise RuntimeError("timed out while submitting jobs?")
263+
264+
thread_run = futures.process._ExecutorManagerThread.run
265+
def mock_run(self):
266+
# Delay thread startup so the wakeup pipe can fill up and block
267+
time.sleep(3)
268+
thread_run(self)
269+
270+
class MockWakeup(_ThreadWakeup):
271+
"""Mock wakeup object to force the wakeup to block"""
272+
def __init__(self):
273+
super().__init__()
274+
self._dummy_queue = queue.Queue(maxsize=1)
275+
276+
def wakeup(self):
277+
self._dummy_queue.put(None, block=True)
278+
super().wakeup()
279+
280+
def clear(self):
281+
try:
282+
while True:
283+
self._dummy_queue.get_nowait()
284+
except queue.Empty:
285+
super().clear()
286+
287+
with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
288+
'run', mock_run),
289+
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
290+
MockWakeup)):
291+
with self.executor_type(max_workers=2,
292+
mp_context=self.get_context()) as executor:
293+
self.executor = executor # Allow clean up in fail_on_deadlock
294+
295+
job_num = 100
296+
job_data = range(job_num)
297+
298+
# Need to use sigalarm for timeout detection because
299+
# Executor.submit is not guarded by any timeout (both
300+
# self._work_ids.put(self._queue_count) and
301+
# self._executor_manager_thread_wakeup.wakeup() might
302+
# timeout, maybe more?). In this specific case it was
303+
# the wakeup call that deadlocked on a blocking pipe.
304+
old_handler = signal.signal(signal.SIGALRM, timeout)
305+
try:
306+
signal.alarm(int(self.TIMEOUT))
307+
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
308+
finally:
309+
signal.alarm(0)
310+
signal.signal(signal.SIGALRM, old_handler)
311+
242312

243313
create_executor_tests(globals(), ExecutorDeadlockTest,
244314
executor_mixins=(ProcessPoolForkMixin,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix concurrent.futures.ProcessPoolExecutor deadlock

0 commit comments

Comments
 (0)