Skip to content

Commit 531aca8

Browse files
committed
gh-109917, gh-105829: Fix concurrent.futures _ThreadWakeup.wakeup()
Remove test_gh105829_should_not_deadlock_if_wakeup_pipe_full() of test_concurrent_futures.test_deadlock. The test is no longer relevant.
1 parent 5ae6c6d commit 531aca8

File tree

2 files changed

+22
-82
lines changed

2 files changed

+22
-82
lines changed

Lib/concurrent/futures/process.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,26 +69,37 @@ class _ThreadWakeup:
6969
def __init__(self):
7070
self._closed = False
7171
self._reader, self._writer = mp.Pipe(duplex=False)
72+
self._awaken = False
7273

7374
def close(self):
7475
# Please note that we do not take the shutdown lock when
7576
# calling clear() (to avoid deadlocking) so this method can
7677
# only be called safely from the same thread as all calls to
7778
# clear() even if you hold the shutdown lock. Otherwise we
7879
# might try to read from the closed pipe.
79-
if not self._closed:
80-
self._closed = True
81-
self._writer.close()
82-
self._reader.close()
80+
if self._closed:
81+
return
82+
self._closed = True
83+
self._writer.close()
84+
self._reader.close()
8385

8486
def wakeup(self):
85-
if not self._closed:
86-
self._writer.send_bytes(b"")
87+
if self._closed:
88+
return
89+
if self._awaken:
90+
# gh-105829: Write a single byte to not block if the pipe is full.
91+
# wait_result_broken_or_wakeup() does not even read the written
92+
# byte. It only check if something was written to the pipe.
93+
return
94+
self._writer.send_bytes(b"x")
95+
#self._awaken = True
8796

8897
def clear(self):
89-
if not self._closed:
90-
while self._reader.poll():
91-
self._reader.recv_bytes()
98+
if self._closed:
99+
return
100+
while self._reader.poll():
101+
self._reader.recv_bytes()
102+
self._awaken = False
92103

93104

94105
def _python_exit():
@@ -438,12 +449,8 @@ def wait_result_broken_or_wakeup(self):
438449
elif wakeup_reader in ready:
439450
is_broken = False
440451

441-
# No need to hold the _shutdown_lock here because:
442-
# 1. we're the only thread to use the wakeup reader
443-
# 2. we're also the only thread to call thread_wakeup.close()
444-
# 3. we want to avoid a possible deadlock when both reader and writer
445-
# would block (gh-105829)
446-
self.thread_wakeup.clear()
452+
with self.shutdown_lock:
453+
self.thread_wakeup.clear()
447454

448455
return result_item, is_broken, cause
449456

Lib/test/test_concurrent_futures/test_deadlock.py

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -250,73 +250,6 @@ def test_crash_big_data(self):
250250

251251
executor.shutdown(wait=True)
252252

253-
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
254-
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
255-
# fill up and block. See: https://github.com/python/cpython/issues/105829
256-
257-
# Lots of cargo culting while writing this test, apologies if
258-
# something is really stupid...
259-
260-
self.executor.shutdown(wait=True)
261-
262-
if not hasattr(signal, 'alarm'):
263-
raise unittest.SkipTest(
264-
"Tested platform does not support the alarm signal")
265-
266-
def timeout(_signum, _frame):
267-
import faulthandler
268-
faulthandler.dump_traceback()
269-
270-
raise RuntimeError("timed out while submitting jobs?")
271-
272-
thread_run = futures.process._ExecutorManagerThread.run
273-
def mock_run(self):
274-
# Delay thread startup so the wakeup pipe can fill up and block
275-
time.sleep(3)
276-
thread_run(self)
277-
278-
class MockWakeup(_ThreadWakeup):
279-
"""Mock wakeup object to force the wakeup to block"""
280-
def __init__(self):
281-
super().__init__()
282-
self._dummy_queue = queue.Queue(maxsize=1)
283-
284-
def wakeup(self):
285-
self._dummy_queue.put(None, block=True)
286-
super().wakeup()
287-
288-
def clear(self):
289-
try:
290-
while True:
291-
self._dummy_queue.get_nowait()
292-
except queue.Empty:
293-
super().clear()
294-
295-
with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
296-
'run', mock_run),
297-
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
298-
MockWakeup)):
299-
with self.executor_type(max_workers=2,
300-
mp_context=self.get_context()) as executor:
301-
self.executor = executor # Allow clean up in fail_on_deadlock
302-
303-
job_num = 100
304-
job_data = range(job_num)
305-
306-
# Need to use sigalarm for timeout detection because
307-
# Executor.submit is not guarded by any timeout (both
308-
# self._work_ids.put(self._queue_count) and
309-
# self._executor_manager_thread_wakeup.wakeup() might
310-
# timeout, maybe more?). In this specific case it was
311-
# the wakeup call that deadlocked on a blocking pipe.
312-
old_handler = signal.signal(signal.SIGALRM, timeout)
313-
try:
314-
signal.alarm(int(self.TIMEOUT))
315-
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
316-
finally:
317-
signal.alarm(0)
318-
signal.signal(signal.SIGALRM, old_handler)
319-
320253

321254
create_executor_tests(globals(), ExecutorDeadlockTest,
322255
executor_mixins=(ProcessPoolForkMixin,

0 commit comments

Comments
 (0)