
gh-109917, gh-105829: Fix concurrent.futures _ThreadWakeup.wakeup() #110129


Closed
wants to merge 2 commits into from
32 changes: 23 additions & 9 deletions Lib/concurrent/futures/process.py
@@ -66,29 +66,43 @@


class _ThreadWakeup:
# Constant overridden by tests to make them faster
_wakeup_msg = b'x'
Contributor:

I see no reason not to define this as b"". Giving it a value makes it seem like it matters in some way, and it is also more bytes to send.


def __init__(self):
self._closed = False
self._reader, self._writer = mp.Pipe(duplex=False)
self._awaken = False

def close(self):
# Please note that we do not take the shutdown lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
# clear() even if you hold the shutdown lock. Otherwise we
# might try to read from the closed pipe.
if not self._closed:
self._closed = True
self._writer.close()
self._reader.close()
if self._closed:
return
self._closed = True
self._writer.close()
self._reader.close()
Contributor:

This change seems to serve no purpose

Member Author:

Oh, it's just that for me, it's more readable.

Contributor:

It's distracting in this PR though...


def wakeup(self):
if not self._closed:
self._writer.send_bytes(b"")
if self._closed:
return
if self._awaken:
Contributor:

By adding reads and writes of self._awaken we now have a race in the code. Since it is only assignment and reading, it should be fine if one is happy to rely on the atomicity of these bytecode primitives as currently implemented in CPython. Opinions differ on that point, I think, and I'm no expert, but it should probably be considered somewhat carefully.

# gh-105829: Send a single message to not block if the pipe is
# full. wait_result_broken_or_wakeup() ignores the message anyway,
# it just calls clear().
Contributor:

This comment is misleading as this change has no impact on the gh-105829 issue in the current state. As long as the shutdown lock is not taken by both wakeup and clear it does not matter if we block on the pipe here. It might in some sense actually be desirable as it could perhaps give the manager thread a better chance of running and moving some objects to the worker queue.

As it stands, this is just a potential optimization to avoid sending unnecessary wakeup messages. Also, as mentioned before, it is not entirely clear to me that this does not introduce some sort of bug.

return
self._awaken = True
self._writer.send_bytes(self._wakeup_msg)

def clear(self):
if not self._closed:
while self._reader.poll():
self._reader.recv_bytes()
if self._closed:
return
while self._reader.poll():
self._reader.recv_bytes()
self._awaken = False


def _python_exit():
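For reference, a minimal sketch (not part of this PR) of how the _awaken flag could instead be guarded with an explicit lock, addressing the atomicity concern raised in the review comment on wakeup() above. The _LockedThreadWakeup name and the lock are hypothetical additions; everything else mirrors the diff.

    import threading
    import multiprocessing as mp

    class _LockedThreadWakeup:
        _wakeup_msg = b'x'

        def __init__(self):
            self._closed = False
            self._reader, self._writer = mp.Pipe(duplex=False)
            self._awaken = False
            self._lock = threading.Lock()

        def wakeup(self):
            with self._lock:
                if self._closed or self._awaken:
                    return
                self._awaken = True
            # Send outside the lock so a send blocked on a full pipe can
            # never prevent clear() from making progress.
            self._writer.send_bytes(self._wakeup_msg)

        def clear(self):
            if self._closed:
                return
            while self._reader.poll():
                self._reader.recv_bytes()
            with self._lock:
                self._awaken = False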
75 changes: 3 additions & 72 deletions Lib/test/test_concurrent_futures/test_deadlock.py
@@ -1,6 +1,4 @@
import contextlib
import queue
import signal
import sys
import time
import unittest
@@ -203,7 +201,7 @@ def test_shutdown_deadlock(self):
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock
f = executor.submit(_crash, delay=.1)
executor.shutdown(wait=True)
with self.assertRaises(BrokenProcessPool):
@@ -216,7 +214,7 @@ def test_shutdown_deadlock_pickle(self):
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock

# Start the executor and get the executor_manager_thread to collect
# the threads and avoid dangling thread that should be cleaned up
@@ -244,79 +242,12 @@ def test_crash_big_data(self):
data = "a" * support.PIPE_MAX_SIZE
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock
with self.assertRaises(BrokenProcessPool):
list(executor.map(_crash_with_data, [data] * 10))

executor.shutdown(wait=True)

def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
# fill up and block. See: https://github.com/python/cpython/issues/105829

# Lots of cargo culting while writing this test, apologies if
# something is really stupid...

self.executor.shutdown(wait=True)

if not hasattr(signal, 'alarm'):
raise unittest.SkipTest(
"Tested platform does not support the alarm signal")

def timeout(_signum, _frame):
import faulthandler
faulthandler.dump_traceback()

raise RuntimeError("timed out while submitting jobs?")

thread_run = futures.process._ExecutorManagerThread.run
def mock_run(self):
# Delay thread startup so the wakeup pipe can fill up and block
time.sleep(3)
thread_run(self)

class MockWakeup(_ThreadWakeup):
"""Mock wakeup object to force the wakeup to block"""
def __init__(self):
super().__init__()
self._dummy_queue = queue.Queue(maxsize=1)

def wakeup(self):
self._dummy_queue.put(None, block=True)
super().wakeup()

def clear(self):
try:
while True:
self._dummy_queue.get_nowait()
except queue.Empty:
super().clear()

with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
'run', mock_run),
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
MockWakeup)):
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock

job_num = 100
job_data = range(job_num)

# Need to use sigalarm for timeout detection because
# Executor.submit is not guarded by any timeout (both
# self._work_ids.put(self._queue_count) and
# self._executor_manager_thread_wakeup.wakeup() might
# timeout, maybe more?). In this specific case it was
# the wakeup call that deadlocked on a blocking pipe.
old_handler = signal.signal(signal.SIGALRM, timeout)
try:
signal.alarm(int(self.TIMEOUT))
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
Contributor:

This test should be kept in some form regardless of the implementation strategy used to solve the deadlock as we want to test that we have actually removed this bug. See 995a14c for a version that could be used to test for the issue if we use an implementation that relies on making sure we never block on the pipe.

Member Author:

I don't understand the purpose of the test. The test is supposed to check the wakeup() implementation, but instead it overrides wakeup() with a different implementation which blocks if it's called more than once. My change makes sure that calling wakeup() twice or more is safe.

The test also overrides _ExecutorManagerThread.run(), whereas this method is responsible for checking the wakeup pipe and stopping the loop when the wakeup pipe becomes non-empty. The test is supposed to check that concurrent.futures doesn't hang when it gets a lot of jobs, but instead it mocks the parts of concurrent.futures which caused the bug.

Member Author:

I wrote a new functional test instead.

Contributor:

The purpose of the test is to test that we don't deadlock when the pipe blocks. In this version the test is a bit opinionated about what type of implementation it accepts. This was noted in the commit message but should probably have been documented in a comment in the test as well to be extra clear.

The point of this test version is to force wakeup to block and see that we still don't deadlock. This is done by augmenting the normal wakeup operation with an additional .put to a size 1 queue that will block on the second wakeup, which is more or less guaranteed since we also delay the startup of the _ExecutorManagerThread.

The initial test was less opinionated and only made sure to delay the _ExecutorManagerThread startup enough to allow the wakeup pipe to fill. The drawback with that version was runtime.

Contributor:

Also, to be extra clear, as I noted in #109917 (comment), I believe the instability issue we are having in the test right now is due to the ordering of the dummy queue operation and the real wakeup pipe operations. Both primitives are thread safe, but the two updates are not performed atomically as a single operation and may interleave arbitrarily. With the current order of operations this can lead to an incorrect state where the dummy queue is full but the wakeup pipe is empty. With the suggested swap I think this can no longer happen in any possible interleaving of operations. There is no indication that the actual implementation is incorrect; this looks like purely a test issue.



create_executor_tests(globals(), ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,
46 changes: 46 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
@@ -216,6 +216,52 @@ def mock_start_new_thread(func, *args):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()

def test_wakeup(self):
Contributor:

This is still a deadlock test and should probably remain in the deadlock suite

Contributor:

Also, the test name is very generic and does not describe the purpose of the test

# gh-105829: Check that calling _ExecutorManagerThread wakeup() many
# times in ProcessPoolExecutor.submit() does not block if the
# _ThreadWakeup pipe becomes full.

def get_pipe_size(connection):
try:
import fcntl
return fcntl.fcntl(connection.fileno(), fcntl.F_GETPIPE_SZ)
Contributor:

Not all platforms support F_GETPIPE_SZ, so this will fail on macOS, etc. Fixed by:

                import fcntl
                from fcntl import F_GETPIPE_SZ  # Does not exist in all fcntl implementations
                return fcntl.fcntl(connection.fileno(), F_GETPIPE_SZ)

except ImportError:
# Assume 64 KiB pipe if we fail, makes test take longer
return 65_536

executor = self.executor
with executor:
# Submit a job to start the executor manager thread
# future = self.executor.submit(str, 12)
# future.result()
Contributor:

Remove commented code


# Wrap _ThreadWakeup.wakeup() to count how many times it has been
# called
thread_wakeup = executor._executor_manager_thread_wakeup
orig_wakeup = thread_wakeup.wakeup
nwakeup = 0
def wrap_wakeup():
nonlocal nwakeup
nwakeup += 1
orig_wakeup()
thread_wakeup.wakeup = wrap_wakeup
Contributor:

This substitution is now done after the executor has started. Is this safe, and if so, under what assumptions?


# Use longer "wakeup message" to make the hang more likely
# and to speed up the test
njob = self.worker_count * 2 # at least 2 jobs per worker
pipe_size = get_pipe_size(thread_wakeup._writer)
msg_len = min(pipe_size // njob, 512)
thread_wakeup._wakeup_msg = b'x' * msg_len
msg_size = 4 + len(thread_wakeup._wakeup_msg)

njob = pipe_size // msg_size + 10 # Add some margin
Contributor:

This seems quite confusing to me. First we specify the number of jobs we want and compute a message size based on that. Then we loop back around and recompute the number of jobs based on the message size. This feels circular and redundant. Personally, if we are going to set the message size ourselves, I would stop messing with the pipe and just make sure the data we send is overkill.

            # Use longer "wakeup message" to make the hang more likely
            # and to speed up the test. The true size will be slightly
            # larger than this due to overhead but that is irrelevant
            # for our purposes.
            msg_size = 2048
            thread_wakeup._wakeup_msg = b'x' * msg_size
            njob = 2 ** 20 // msg_size  # A megabyte should fill up most pipes
            job_data = range(njob)

With this, the test still takes just 0.2s and works the same on all platforms.

job_data = range(njob)
if support.verbose:
print(f"run {njob:,} jobs")

self.assertEqual(len(list(executor.map(int, job_data))), njob)
Contributor:

This test won't fail gracefully without using SIGALRM or some other mechanism to trigger a timeout in case of deadlock. Right now you have to wait a really long time (15 min?) for the test to fail.
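A minimal sketch of such a timeout guard, modeled on the signal.alarm() handling in the removed test above; the fail_fast_after helper is hypothetical and only works on platforms that provide signal.alarm (not Windows).

    import contextlib
    import signal

    @contextlib.contextmanager
    def fail_fast_after(seconds):
        # Fail with an exception instead of hanging until the test-suite timeout.
        def on_timeout(_signum, _frame):
            raise RuntimeError(f"no progress after {seconds}s, likely deadlocked")
        old_handler = signal.signal(signal.SIGALRM, on_timeout)
        try:
            signal.alarm(seconds)
            yield
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)

    # Possible usage in the test:
    # with fail_fast_after(int(self.TIMEOUT)):
    #     self.assertEqual(len(list(executor.map(int, job_data))), njob)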

self.assertGreaterEqual(nwakeup, njob)
Contributor:

Another issue here is that the test relies on the startup time of the ExecutorManagerThread, but we don't do anything here to make sure that this is sufficiently slow. I tested what happens if the thread had already started when we reach the test point, and the test passed even with a broken implementation.

So we should either make sure the startup is slow or make sure the test fails regardless. I tried it with my suggested size/jobs settings above, and that was sufficient to trigger a failure even if the thread had started. As always, it is hard to judge how reliable that will be on other systems, but I suspect it is overkill enough to fail in most cases.
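One way to force a slow startup, sketched from the run() override in the removed test above; the slow_run name is hypothetical, the rest mirrors that test.

    import time
    import unittest.mock
    from concurrent.futures import process

    thread_run = process._ExecutorManagerThread.run

    def slow_run(self):
        # Delay manager-thread startup so submit() has time to fill the
        # wakeup pipe before anyone starts draining it.
        time.sleep(3)
        thread_run(self)

    # Possible usage around the executor construction in the test:
    # with unittest.mock.patch.object(process._ExecutorManagerThread,
    #                                 'run', slow_run):
    #     ...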



create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,