Skip to content

Commit 3137689

Browse files
committed
gh-109047: concurrent.futures catches exc on add_call_item_to_queue()
concurrent.futures: The *executor manager thread* now catches exceptions when adding an item to the *call queue*. During Python finalization, creating a new thread can now raise RuntimeError. Catch the exception and call terminate_broken() in this case. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the call queue is no longer working (read and write ends of the queue pipe are closed). * terminate_broken() now terminates child processes. * wait_result_broken_or_wakeup() now uses the short form of traceback.format_exception(). * _ExecutorManagerThread.terminate_broken() now holds shutdown_lock to prevent race conditions with ProcessPoolExecutor.submit(). ProcessPoolExecutor changes: * ProcessPoolExecutor.submit() now starts by checking if the executor is broken. multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() sets _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet.
1 parent 3439cb0 commit 3137689

File tree

4 files changed

+84
-23
lines changed

4 files changed

+84
-23
lines changed

Lib/concurrent/futures/process.py

+31-18
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,12 @@ def run(self):
341341
# Main loop for the executor manager thread.
342342

343343
while True:
344-
self.add_call_item_to_queue()
344+
try:
345+
self.add_call_item_to_queue()
346+
except BaseException as exc:
347+
cause = format_exception(exc)
348+
self.terminate_broken(cause)
349+
return
345350

346351
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
347352

@@ -425,8 +430,8 @@ def wait_result_broken_or_wakeup(self):
425430
try:
426431
result_item = result_reader.recv()
427432
is_broken = False
428-
except BaseException as e:
429-
cause = format_exception(type(e), e, e.__traceback__)
433+
except BaseException as exc:
434+
cause = format_exception(exc)
430435

431436
elif wakeup_reader in ready:
432437
is_broken = False
@@ -463,7 +468,7 @@ def is_shutting_down(self):
463468
return (_global_shutdown or executor is None
464469
or executor._shutdown_thread)
465470

466-
def terminate_broken(self, cause):
471+
def _terminate_broken(self, cause):
467472
# Terminate the executor because it is in a broken state. The cause
468473
# argument can be used to display more information on the error that
469474
# lead the executor into becoming broken.
@@ -505,17 +510,14 @@ def terminate_broken(self, cause):
505510
for p in self.processes.values():
506511
p.terminate()
507512

508-
# Prevent queue writing to a pipe which is no longer read.
509-
# https://github.com/python/cpython/issues/94777
510-
self.call_queue._reader.close()
511-
512-
# gh-107219: Close the connection writer which can unblock
513-
# Queue._feed() if it was stuck in send_bytes().
514-
if sys.platform == 'win32':
515-
self.call_queue._writer.close()
513+
self.call_queue._terminate_broken()
516514

517515
# clean up resources
518-
self.join_executor_internals()
516+
self._join_executor_internals(broken=True)
517+
518+
def terminate_broken(self, cause):
519+
with self.shutdown_lock:
520+
self._terminate_broken(cause)
519521

520522
def flag_executor_shutting_down(self):
521523
# Flag the executor as shutting down and cancel remaining tasks if
@@ -558,15 +560,21 @@ def shutdown_workers(self):
558560
break
559561

560562
def join_executor_internals(self):
561-
self.shutdown_workers()
563+
with self.shutdown_lock:
564+
self._join_executor_internals()
565+
566+
def _join_executor_internals(self, broken=False):
567+
if not broken:
568+
self.shutdown_workers()
562569
# Release the queue's resources as soon as possible.
563570
self.call_queue.close()
564571
self.call_queue.join_thread()
565-
with self.shutdown_lock:
566-
self.thread_wakeup.close()
572+
self.thread_wakeup.close()
567573
# If .join() is not called on the created processes then
568574
# some ctx.Queue methods may deadlock on Mac OS X.
569575
for p in self.processes.values():
576+
if broken:
577+
p.terminate()
570578
p.join()
571579

572580
def get_n_children_alive(self):
@@ -773,7 +781,13 @@ def _launch_processes(self):
773781
for _ in range(len(self._processes), self._max_workers):
774782
self._spawn_process()
775783

784+
def _check_broken(self):
785+
if self._broken:
786+
raise BrokenProcessPool(self._broken)
787+
776788
def _spawn_process(self):
789+
self._check_broken()
790+
777791
p = self._mp_context.Process(
778792
target=_process_worker,
779793
args=(self._call_queue,
@@ -786,8 +800,7 @@ def _spawn_process(self):
786800

787801
def submit(self, fn, /, *args, **kwargs):
788802
with self._shutdown_lock:
789-
if self._broken:
790-
raise BrokenProcessPool(self._broken)
803+
self._check_broken()
791804
if self._shutdown_thread:
792805
raise RuntimeError('cannot schedule new futures after shutdown')
793806
if _global_shutdown:

Lib/multiprocessing/queues.py

+23-5
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,20 @@ def cancel_join_thread(self):
158158
except AttributeError:
159159
pass
160160

161+
def _terminate_broken(self):
162+
# Close a Queue on error.
163+
164+
# gh-94777: Prevent queue writing to a pipe which is no longer read.
165+
self._reader.close()
166+
167+
# gh-107219: Close the connection writer which can unblock
168+
# Queue._feed() if it was stuck in send_bytes().
169+
if sys.platform == 'win32':
170+
self.call_queue._writer.close()
171+
172+
self.close()
173+
self.join_thread()
174+
161175
def _start_thread(self):
162176
debug('Queue._start_thread()')
163177

@@ -169,13 +183,17 @@ def _start_thread(self):
169183
self._wlock, self._reader.close, self._writer.close,
170184
self._ignore_epipe, self._on_queue_feeder_error,
171185
self._sem),
172-
name='QueueFeederThread'
186+
name='QueueFeederThread',
187+
daemon=True,
173188
)
174-
self._thread.daemon = True
175189

176-
debug('doing self._thread.start()')
177-
self._thread.start()
178-
debug('... done self._thread.start()')
190+
try:
191+
debug('doing self._thread.start()')
192+
self._thread.start()
193+
debug('... done self._thread.start()')
194+
except:
195+
self._thread = None
196+
raise
179197

180198
if not self._joincancelled:
181199
self._jointhread = Finalize(

Lib/test/test_concurrent_futures/test_process_pool.py

+26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import sys
3+
import threading
34
import time
45
import unittest
56
from concurrent import futures
@@ -187,6 +188,31 @@ def test_max_tasks_early_shutdown(self):
187188
for i, future in enumerate(futures):
188189
self.assertEqual(future.result(), mul(i, i))
189190

191+
def test_python_finalization_error(self):
192+
context = self.get_context()
193+
194+
# gh-109047: Create _ExecutorManagerThread, but block
195+
# QueueFeederThread. Mock the threading.start_new_thread() function
196+
# to inject RuntimeError: simulate the error raised during Python
197+
# finalization.
198+
orig_start_new_thread = threading._start_new_thread
199+
nthread = 0
200+
def mock_start_new_thread(func, *args):
201+
nonlocal nthread
202+
if nthread >= 1:
203+
raise RuntimeError("can't create new thread at "
204+
"interpreter shutdown")
205+
nthread += 1
206+
return orig_start_new_thread(func, *args)
207+
208+
with support.swap_attr(threading, '_start_new_thread',
209+
mock_start_new_thread):
210+
executor = self.executor_type(max_workers=2, mp_context=context)
211+
with executor:
212+
with self.assertRaises(BrokenProcessPool):
213+
list(executor.map(mul, [(2, 3)] * 10))
214+
executor.shutdown()
215+
190216

191217
create_executor_tests(globals(), ProcessPoolExecutorTest,
192218
executor_mixins=(ProcessPoolForkMixin,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
2+
when adding an item to the *call queue*. During Python finalization, creating a
3+
new thread can now raise :exc:`RuntimeError`. Catch the exception and call
4+
``terminate_broken()`` in this case. Patch by Victor Stinner.

0 commit comments

Comments
 (0)