Skip to content

Commit 1aa24ee

Browse files
vstinnerelfstrom
andcommitted
gh-109917, gh-105829: Fix concurrent.futures _ThreadWakeup.wakeup()
Replace test_gh105829_should_not_deadlock_if_wakeup_pipe_full() test which was mocking too many concurrent.futures internals with a new test_wakeup() functional test. Co-Authored-By: elfstrom <[email protected]>
1 parent cbdacc7 commit 1aa24ee

File tree

3 files changed

+72
-81
lines changed

3 files changed

+72
-81
lines changed

Lib/concurrent/futures/process.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,29 +66,43 @@
6666

6767

6868
class _ThreadWakeup:
69+
# Constant overriden by tests to make them faster
70+
_wakeup_message = b'x'
71+
6972
def __init__(self):
7073
self._closed = False
7174
self._reader, self._writer = mp.Pipe(duplex=False)
75+
self._awaken = False
7276

7377
def close(self):
7478
# Please note that we do not take the shutdown lock when
7579
# calling clear() (to avoid deadlocking) so this method can
7680
# only be called safely from the same thread as all calls to
7781
# clear() even if you hold the shutdown lock. Otherwise we
7882
# might try to read from the closed pipe.
79-
if not self._closed:
80-
self._closed = True
81-
self._writer.close()
82-
self._reader.close()
83+
if self._closed:
84+
return
85+
self._closed = True
86+
self._writer.close()
87+
self._reader.close()
8388

8489
def wakeup(self):
85-
if not self._closed:
86-
self._writer.send_bytes(b"")
90+
if self._closed:
91+
return
92+
if self._awaken:
93+
# gh-105829: Send a single message to not block if the pipe is
94+
# full. wait_result_broken_or_wakeup() ignores the message anyway,
95+
# it just calls clear().
96+
return
97+
self._awaken = True
98+
self._writer.send_bytes(self._wakeup_message)
8799

88100
def clear(self):
89-
if not self._closed:
90-
while self._reader.poll():
91-
self._reader.recv_bytes()
101+
if self._closed:
102+
return
103+
while self._reader.poll():
104+
self._reader.recv_bytes()
105+
self._awaken = False
92106

93107

94108
def _python_exit():

Lib/test/test_concurrent_futures/test_deadlock.py

Lines changed: 3 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
import contextlib
2-
import queue
3-
import signal
42
import sys
53
import time
64
import unittest
@@ -203,7 +201,7 @@ def test_shutdown_deadlock(self):
203201
self.executor.shutdown(wait=True)
204202
with self.executor_type(max_workers=2,
205203
mp_context=self.get_context()) as executor:
206-
self.executor = executor # Allow clean up in fail_on_deadlock
204+
self.executor = executor # Allow clean up in _fail_on_deadlock
207205
f = executor.submit(_crash, delay=.1)
208206
executor.shutdown(wait=True)
209207
with self.assertRaises(BrokenProcessPool):
@@ -216,7 +214,7 @@ def test_shutdown_deadlock_pickle(self):
216214
self.executor.shutdown(wait=True)
217215
with self.executor_type(max_workers=2,
218216
mp_context=self.get_context()) as executor:
219-
self.executor = executor # Allow clean up in fail_on_deadlock
217+
self.executor = executor # Allow clean up in _fail_on_deadlock
220218

221219
# Start the executor and get the executor_manager_thread to collect
222220
# the threads and avoid dangling thread that should be cleaned up
@@ -244,79 +242,12 @@ def test_crash_big_data(self):
244242
data = "a" * support.PIPE_MAX_SIZE
245243
with self.executor_type(max_workers=2,
246244
mp_context=self.get_context()) as executor:
247-
self.executor = executor # Allow clean up in fail_on_deadlock
245+
self.executor = executor # Allow clean up in _fail_on_deadlock
248246
with self.assertRaises(BrokenProcessPool):
249247
list(executor.map(_crash_with_data, [data] * 10))
250248

251249
executor.shutdown(wait=True)
252250

253-
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
254-
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
255-
# fill up and block. See: https://github.com/python/cpython/issues/105829
256-
257-
# Lots of cargo culting while writing this test, apologies if
258-
# something is really stupid...
259-
260-
self.executor.shutdown(wait=True)
261-
262-
if not hasattr(signal, 'alarm'):
263-
raise unittest.SkipTest(
264-
"Tested platform does not support the alarm signal")
265-
266-
def timeout(_signum, _frame):
267-
import faulthandler
268-
faulthandler.dump_traceback()
269-
270-
raise RuntimeError("timed out while submitting jobs?")
271-
272-
thread_run = futures.process._ExecutorManagerThread.run
273-
def mock_run(self):
274-
# Delay thread startup so the wakeup pipe can fill up and block
275-
time.sleep(3)
276-
thread_run(self)
277-
278-
class MockWakeup(_ThreadWakeup):
279-
"""Mock wakeup object to force the wakeup to block"""
280-
def __init__(self):
281-
super().__init__()
282-
self._dummy_queue = queue.Queue(maxsize=1)
283-
284-
def wakeup(self):
285-
self._dummy_queue.put(None, block=True)
286-
super().wakeup()
287-
288-
def clear(self):
289-
try:
290-
while True:
291-
self._dummy_queue.get_nowait()
292-
except queue.Empty:
293-
super().clear()
294-
295-
with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
296-
'run', mock_run),
297-
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
298-
MockWakeup)):
299-
with self.executor_type(max_workers=2,
300-
mp_context=self.get_context()) as executor:
301-
self.executor = executor # Allow clean up in fail_on_deadlock
302-
303-
job_num = 100
304-
job_data = range(job_num)
305-
306-
# Need to use sigalarm for timeout detection because
307-
# Executor.submit is not guarded by any timeout (both
308-
# self._work_ids.put(self._queue_count) and
309-
# self._executor_manager_thread_wakeup.wakeup() might
310-
# timeout, maybe more?). In this specific case it was
311-
# the wakeup call that deadlocked on a blocking pipe.
312-
old_handler = signal.signal(signal.SIGALRM, timeout)
313-
try:
314-
signal.alarm(int(self.TIMEOUT))
315-
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
316-
finally:
317-
signal.alarm(0)
318-
signal.signal(signal.SIGALRM, old_handler)
319-
320251

321252
create_executor_tests(globals(), ExecutorDeadlockTest,
322253
executor_mixins=(ProcessPoolForkMixin,

Lib/test/test_concurrent_futures/test_process_pool.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,52 @@ def mock_start_new_thread(func, *args):
216216
list(executor.map(mul, [(2, 3)] * 10))
217217
executor.shutdown()
218218

219+
def test_wakeup(self):
220+
# gh-105829: Check that calling _ExecutorManagerThread wakeup() many
221+
# times in ProcessPoolExecutor.submit() does not block if the
222+
# _ThreadWakeup pipe becomes full.
223+
224+
def get_pipe_size(connection):
225+
try:
226+
import fcntl
227+
return fcntl.fcntl(connection.fileno(), fcntl.F_GETPIPE_SZ)
228+
except ImportError:
229+
# Assume 64 KiB pipe if we fail, makes test take longer
230+
return 65_536
231+
232+
executor = self.executor
233+
with executor:
234+
# Summit a job to start the executor manager thread
235+
# future = self.executor.submit(str, 12)
236+
# future.result()
237+
238+
# Wrap _ThreadWakeup.wakeup() to count how many times it has been
239+
# called
240+
thread_wakeup = executor._executor_manager_thread_wakeup
241+
orig_wakeup = thread_wakeup.wakeup
242+
nwakeup = 0
243+
def wrap_wakeup():
244+
nonlocal nwakeup
245+
nwakeup += 1
246+
orig_wakeup()
247+
thread_wakeup.wakeup = wrap_wakeup
248+
249+
# Use longer "wakeup message" to make the hang more likely
250+
# and to speed up the test
251+
njob = self.worker_count * 2 # at least 2 jobs per worker
252+
pipe_size = get_pipe_size(thread_wakeup._writer)
253+
msg_len = min(pipe_size // njob, 512)
254+
thread_wakeup._wakeup_message = b'x' * msg_len
255+
msg_size = 4 + len(thread_wakeup._wakeup_message)
256+
257+
njob = pipe_size // msg_size
258+
job_data = range(njob)
259+
if support.verbose:
260+
print(f"run {njob:,} jobs")
261+
262+
self.assertEqual(len(list(executor.map(int, job_data))), njob)
263+
self.assertGreaterEqual(nwakeup, njob)
264+
219265

220266
create_executor_tests(globals(), ProcessPoolExecutorTest,
221267
executor_mixins=(ProcessPoolForkMixin,

0 commit comments

Comments
 (0)