Skip to content

Commit bbb2a63

Browse files
committed
pythongh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock
This fixes issue python#105829, python#105829 The _ExecutorManagerThread wake-up code could deadlock if the wake-up pipe filled up and blocked. The relevant code looked like this: class _ThreadWakeup: def wakeup(self): if not self._closed: self._writer.send_bytes(b"") def clear(self): if not self._closed: while self._reader.poll(): self._reader.recv_bytes() class ProcessPoolExecutor(_base.Executor): def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: ... # Wake up queue management thread self._executor_manager_thread_wakeup.wakeup() class _ExecutorManagerThread(threading.Thread): def wait_result_broken_or_wakeup(self): ... with self.shutdown_lock: self.thread_wakeup.clear() The shutdown_lock must be taken for both reads and writes of the wake-up pipe. If a read or a write of the pipe blocks, the code will deadlock. It looks like reads can't block (a poll() is done before doing any reads) but writes have not protection against blocking. If the _ExecutorManagerThread cannot keep up and clear the wake-up pipe it will fill up and block. This seems to have been rather easy to do in the real world as long as the number of tasks is more than 100000 or so. With this change we make the writes to the wake-up pipe non blocking. If the pipe blocks we will simply skip the write. This should be OK since the reason for the problem is that both reader and writer must hold the shutdown_lock when accessing the pipe. That should imply that we don't need to send anything if the pipe is full, the reader can't be reading it concurrently, it will eventually wake up due to the data already in the pipe.
1 parent 8f440a2 commit bbb2a63

File tree

1 file changed

+12
-1
lines changed

1 file changed

+12
-1
lines changed

Lib/concurrent/futures/process.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class _ThreadWakeup:
6969
def __init__(self):
7070
self._closed = False
7171
self._reader, self._writer = mp.Pipe(duplex=False)
72+
os.set_blocking(self._writer.fileno(), False)
7273

7374
def close(self):
7475
if not self._closed:
@@ -78,7 +79,17 @@ def close(self):
7879

7980
def wakeup(self):
8081
if not self._closed:
81-
self._writer.send_bytes(b"")
82+
try:
83+
self._writer.send_bytes(b"")
84+
except BlockingIOError:
85+
# Assuming BlockingIOError is only raised when there
86+
# is data in the pipe then we can skip the wake-up
87+
# here because we are holding the shutdown_lock and
88+
# the clear() call is also protected by this
89+
# lock. This means the reader will wake up again (or
90+
# is already awake) due to the existing data in the
91+
# pipe.
92+
pass
8293

8394
def clear(self):
8495
if not self._closed:

0 commit comments

Comments
 (0)