Skip to content

Commit a77c413

Browse files
committed
gh-109047: concurrent.futures catches exc on add_call_item_to_queue()
concurrent.futures: The *executor manager thread* now catches exceptions when adding an item to the *call queue*. During Python finalization, creating a new thread can now raise RuntimeError. Catch the exception and call terminate_broken() in this case. Add test_python_finalization_error() to test_concurrent_futures. concurrent.futures._ExecutorManagerThread changes: * terminate_broken() no longer calls shutdown_workers() since the queue is no longer working anymore (read and write ends of the queue pipe are closed). * terminate_broken() now terminates child processes. * wait_result_broken_or_wakeup() now uses the short form (1 argument, not 3) of traceback.format_exception(). * _ExecutorManagerThread.terminate_broken() now holds shutdown_lock to prevent race conditons with ProcessPoolExecutor.submit(). multiprocessing.Queue changes: * Add _terminate_broken() method. * _start_thread() sets _thread to None on exception to prevent leaking "dangling threads" even if the thread was not started yet.
1 parent 3439cb0 commit a77c413

File tree

4 files changed

+83
-22
lines changed

4 files changed

+83
-22
lines changed

Lib/concurrent/futures/process.py

+30-17
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,14 @@ def run(self):
341341
# Main loop for the executor manager thread.
342342

343343
while True:
344-
self.add_call_item_to_queue()
344+
# gh-109047: During Python finalization, if self.call_queue.put()
345+
# tries to try to create a thread, it can fail with RuntimeError.
346+
try:
347+
self.add_call_item_to_queue()
348+
except BaseException as exc:
349+
cause = format_exception(exc)
350+
self.terminate_broken(cause)
351+
return
345352

346353
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
347354

@@ -425,8 +432,8 @@ def wait_result_broken_or_wakeup(self):
425432
try:
426433
result_item = result_reader.recv()
427434
is_broken = False
428-
except BaseException as e:
429-
cause = format_exception(type(e), e, e.__traceback__)
435+
except BaseException as exc:
436+
cause = format_exception(exc)
430437

431438
elif wakeup_reader in ready:
432439
is_broken = False
@@ -463,7 +470,7 @@ def is_shutting_down(self):
463470
return (_global_shutdown or executor is None
464471
or executor._shutdown_thread)
465472

466-
def terminate_broken(self, cause):
473+
def _terminate_broken_unlocked(self, cause):
467474
# Terminate the executor because it is in a broken state. The cause
468475
# argument can be used to display more information on the error that
469476
# lead the executor into becoming broken.
@@ -490,7 +497,7 @@ def terminate_broken(self, cause):
490497
for work_id, work_item in self.pending_work_items.items():
491498
try:
492499
work_item.future.set_exception(bpe)
493-
except _base.InvalidStateError as exc:
500+
except _base.InvalidStateError:
494501
# set_exception() fails if the future is cancelled: ignore it.
495502
# Trying to check if the future is cancelled before calling
496503
# set_exception() would leave a race condition if the future is
@@ -505,17 +512,14 @@ def terminate_broken(self, cause):
505512
for p in self.processes.values():
506513
p.terminate()
507514

508-
# Prevent queue writing to a pipe which is no longer read.
509-
# https://github.com/python/cpython/issues/94777
510-
self.call_queue._reader.close()
511-
512-
# gh-107219: Close the connection writer which can unblock
513-
# Queue._feed() if it was stuck in send_bytes().
514-
if sys.platform == 'win32':
515-
self.call_queue._writer.close()
515+
self.call_queue._terminate_broken()
516516

517517
# clean up resources
518-
self.join_executor_internals()
518+
self._join_executor_internals_unlocked(broken=True)
519+
520+
def terminate_broken(self, cause):
521+
with self.shutdown_lock:
522+
self._terminate_broken_unlocked(cause)
519523

520524
def flag_executor_shutting_down(self):
521525
# Flag the executor as shutting down and cancel remaining tasks if
@@ -558,15 +562,24 @@ def shutdown_workers(self):
558562
break
559563

560564
def join_executor_internals(self):
561-
self.shutdown_workers()
565+
with self.shutdown_lock:
566+
self._join_executor_internals_unlocked()
567+
568+
def _join_executor_internals_unlocked(self, broken=False):
569+
# If broken, call_queue was closed and is no longer usable
570+
if not broken:
571+
self.shutdown_workers()
572+
562573
# Release the queue's resources as soon as possible.
563574
self.call_queue.close()
564575
self.call_queue.join_thread()
565-
with self.shutdown_lock:
566-
self.thread_wakeup.close()
576+
self.thread_wakeup.close()
577+
567578
# If .join() is not called on the created processes then
568579
# some ctx.Queue methods may deadlock on Mac OS X.
569580
for p in self.processes.values():
581+
if broken:
582+
p.terminate()
570583
p.join()
571584

572585
def get_n_children_alive(self):

Lib/multiprocessing/queues.py

+23-5
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,20 @@ def cancel_join_thread(self):
158158
except AttributeError:
159159
pass
160160

161+
def _terminate_broken(self):
162+
# Close a Queue on error.
163+
164+
# gh-94777: Prevent queue writing to a pipe which is no longer read.
165+
self._reader.close()
166+
167+
# gh-107219: Close the connection writer which can unblock
168+
# Queue._feed() if it was stuck in send_bytes().
169+
if sys.platform == 'win32':
170+
self.call_queue._writer.close()
171+
172+
self.close()
173+
self.join_thread()
174+
161175
def _start_thread(self):
162176
debug('Queue._start_thread()')
163177

@@ -169,13 +183,17 @@ def _start_thread(self):
169183
self._wlock, self._reader.close, self._writer.close,
170184
self._ignore_epipe, self._on_queue_feeder_error,
171185
self._sem),
172-
name='QueueFeederThread'
186+
name='QueueFeederThread',
187+
daemon=True,
173188
)
174-
self._thread.daemon = True
175189

176-
debug('doing self._thread.start()')
177-
self._thread.start()
178-
debug('... done self._thread.start()')
190+
try:
191+
debug('doing self._thread.start()')
192+
self._thread.start()
193+
debug('... done self._thread.start()')
194+
except:
195+
self._thread = None
196+
raise
179197

180198
if not self._joincancelled:
181199
self._jointhread = Finalize(

Lib/test/test_concurrent_futures/test_process_pool.py

+26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import sys
3+
import threading
34
import time
45
import unittest
56
from concurrent import futures
@@ -187,6 +188,31 @@ def test_max_tasks_early_shutdown(self):
187188
for i, future in enumerate(futures):
188189
self.assertEqual(future.result(), mul(i, i))
189190

191+
def test_python_finalization_error(self):
192+
context = self.get_context()
193+
194+
# gh-109047: Create _ExecutorManagerThread, but block
195+
# QueueFeederThread. Mock the threading.start_new_thread() function
196+
# to inject RuntimeError: simulate the error raised during Python
197+
# finalization.
198+
orig_start_new_thread = threading._start_new_thread
199+
nthread = 0
200+
def mock_start_new_thread(func, *args):
201+
nonlocal nthread
202+
if nthread >= 1:
203+
raise RuntimeError("can't create new thread at "
204+
"interpreter shutdown")
205+
nthread += 1
206+
return orig_start_new_thread(func, *args)
207+
208+
with support.swap_attr(threading, '_start_new_thread',
209+
mock_start_new_thread):
210+
executor = self.executor_type(max_workers=2, mp_context=context)
211+
with executor:
212+
with self.assertRaises(BrokenProcessPool):
213+
list(executor.map(mul, [(2, 3)] * 10))
214+
executor.shutdown()
215+
190216

191217
create_executor_tests(globals(), ProcessPoolExecutorTest,
192218
executor_mixins=(ProcessPoolForkMixin,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
:mod:`concurrent.futures`: The *executor manager thread* now catches exceptions
2+
when adding an item to the *call queue*. During Python finalization, creating a
3+
new thread can now raise :exc:`RuntimeError`. Catch the exception and call
4+
``terminate_broken()`` in this case. Patch by Victor Stinner.

0 commit comments

Comments
 (0)