Skip to content

Commit a713ff8

Browse files
committed
fixup! pythongh-105829: Add test to demonstrate deadlock
Change test strategy. We now force the main thread to block during the wake-up call by mocking the wake-up object and artificially limiting to a single wake-up before blocking. This allows us to reduce some timeouts, number of tasks and lower the total runtime of the test. It should also guarantee a blocking main thread on all platforms, regardless of any pipe buffer sizes. The drawback is that the test is now a bit opinionated on how we fix this issue (i.e. just making the wake-up pipe non blocking would not satisfy this test even though it is a valid fix for the issue).
1 parent 6104264 commit a713ff8

File tree

1 file changed

+24
-18
lines changed

1 file changed

+24
-18
lines changed

Lib/test/test_concurrent_futures/test_deadlock.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import contextlib
2+
import queue
23
import signal
34
import sys
45
import time
56
import unittest
67
import unittest.mock
78
from pickle import PicklingError
89
from concurrent import futures
9-
from concurrent.futures.process import BrokenProcessPool
10+
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
1011

1112
from test import support
1213

@@ -265,30 +266,35 @@ def timeout(_signum, _frame):
265266
thread_run = futures.process._ExecutorManagerThread.run
266267
def mock_run(self):
267268
# Delay thread startup so the wakeup pipe can fill up and block
268-
time.sleep(5)
269+
time.sleep(3)
269270
thread_run(self)
270271

271-
def adjust_and_check_jobs_needed_to_block_pipe(connection):
272-
try:
273-
# Try to reduce pipe size to speed up test. Only works on Unix systems
274-
import fcntl
275-
from fcntl import F_SETPIPE_SZ
276-
pipe_size = fcntl.fcntl(connection.fileno(), F_SETPIPE_SZ, 1024)
277-
except ImportError:
278-
# Assume 64k pipe if we fail, makes test take longer
279-
pipe_size = 65536
272+
class MockWakeup(_ThreadWakeup):
273+
"""Mock wakeup object to force the wakeup to block"""
274+
def __init__(self):
275+
super().__init__()
276+
self._dummy_queue = queue.Queue(maxsize=1)
280277

281-
# We send 4 bytes per job (one zero sized bytes object)
282-
return pipe_size // 4 + 100 # Add some margin
278+
def wakeup(self):
279+
self._dummy_queue.put(None, block=True)
280+
super().wakeup()
283281

284-
with unittest.mock.patch.object(futures.process._ExecutorManagerThread, 'run', mock_run):
282+
def clear(self):
283+
try:
284+
while True:
285+
self._dummy_queue.get_nowait()
286+
except queue.Empty:
287+
super().clear()
288+
289+
with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
290+
'run', mock_run),
291+
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
292+
MockWakeup)):
285293
with self.executor_type(max_workers=2,
286294
mp_context=self.get_context()) as executor:
287295
self.executor = executor # Allow clean up in fail_on_deadlock
288296

289-
# Try to speed up the test by reducing the size of the wakeup pipe
290-
job_num = adjust_and_check_jobs_needed_to_block_pipe(
291-
executor._executor_manager_thread_wakeup._writer)
297+
job_num = 100
292298
job_data = range(job_num)
293299

294300
# Need to use sigalarm for timeout detection because
@@ -299,7 +305,7 @@ def adjust_and_check_jobs_needed_to_block_pipe(connection):
299305
# the wakeup call that deadlocked on a blocking pipe.
300306
old_handler = signal.signal(signal.SIGALRM, timeout)
301307
try:
302-
signal.alarm(int(support.LONG_TIMEOUT))
308+
signal.alarm(int(self.TIMEOUT))
303309
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
304310
finally:
305311
signal.alarm(0)

0 commit comments

Comments
 (0)