Skip to content

Commit 5d4bf8c

Browse files
elfstrompitrou
andcommitted
fixup! pythongh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock
This reverts the previous fix and instead opts to remove the locking completely when clearing the wakeup pipe. We can do this because clear() and close() are both called from the same thread and nowhere else. In this version of this fix, the call to ProcessPoolExecutor.submit can still block on the wakeup pipe if it happens to fill up. This should not be an issue as there are already other cases where the submit call can block and if the wakeup pipe is full it implies there is already a lot of work items queued up. Co-authored-by: Antoine Pitrou <[email protected]>
1 parent 995a14c commit 5d4bf8c

File tree

1 file changed

+16
-15
lines changed

1 file changed

+16
-15
lines changed

Lib/concurrent/futures/process.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,27 +69,21 @@ class _ThreadWakeup:
6969
def __init__(self):
7070
self._closed = False
7171
self._reader, self._writer = mp.Pipe(duplex=False)
72-
os.set_blocking(self._writer.fileno(), False)
7372

7473
def close(self):
74+
# Please note that we do not take the shutdown lock when
75+
# calling clear() (to avoid deadlocking) so this method can
76+
# only be called safely from the same thread as all calls to
77+
# clear() even if you hold the shutdown lock. Otherwise we
78+
# might try to read from the closed pipe.
7579
if not self._closed:
7680
self._closed = True
7781
self._writer.close()
7882
self._reader.close()
7983

8084
def wakeup(self):
8185
if not self._closed:
82-
try:
83-
self._writer.send_bytes(b"")
84-
except BlockingIOError:
85-
# Assuming BlockingIOError is only raised when there
86-
# is data in the pipe then we can skip the wake-up
87-
# here because we are holding the shutdown_lock and
88-
# the clear() call is also protected by this
89-
# lock. This means the reader will wake up again (or
90-
# is already awake) due to the existing data in the
91-
# pipe.
92-
pass
86+
self._writer.send_bytes(b"")
9387

9488
def clear(self):
9589
if not self._closed:
@@ -437,8 +431,12 @@ def wait_result_broken_or_wakeup(self):
437431
elif wakeup_reader in ready:
438432
is_broken = False
439433

440-
with self.shutdown_lock:
441-
self.thread_wakeup.clear()
434+
# No need to hold the _shutdown_lock here because:
435+
# 1. we're the only thread to use the wakeup reader
436+
# 2. we're also the only thread to call thread_wakeup.close()
437+
# 3. we want to avoid a possible deadlock when both reader and writer
438+
# would block (gh-105829)
439+
self.thread_wakeup.clear()
442440

443441
return result_item, is_broken, cause
444442

@@ -717,7 +715,10 @@ def __init__(self, max_workers=None, mp_context=None,
717715
# as it could result in a deadlock if a worker process dies with the
718716
# _result_queue write lock still acquired.
719717
#
720-
# _shutdown_lock must be locked to access _ThreadWakeup.
718+
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
719+
# .wakeup(). Care must also be taken to not call clear or close from
720+
# more than one thread since _ThreadWakeup.clear() is not protected by
721+
# the _shutdown_lock
721722
self._executor_manager_thread_wakeup = _ThreadWakeup()
722723

723724
# Create communication channels for the executor

0 commit comments

Comments
 (0)