From 2b88fa1a5bf0946645e7e1210312ebddc0f968ec Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sat, 6 Apr 2024 10:01:45 +0100 Subject: [PATCH] GH-117536: ensure asyncgens GCd before runner teardown get aclosed --- Lib/asyncio/base_events.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index f0e690b61a73dd..58a3b20fc4065b 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -444,6 +444,12 @@ def __init__(self): # A weak set of all asynchronous generators that are # being iterated by the loop. self._asyncgens = weakref.WeakSet() + # A strong set of asynchronous generators that are being closed + # by asyncgen_finalizer_hook + self._closing_asyncgens = set() + # A strong set of Handles for callbacks about to schedule async gen + # aclose tasks + self._asyncgens_aclose_handles = set() # Set to True when `loop.shutdown_asyncgens` is called. self._asyncgens_shutdown_called = False # Set to True when `loop.shutdown_default_executor` is called. @@ -556,9 +562,15 @@ def _check_default_executor(self): raise RuntimeError('Executor shutdown has been called') def _asyncgen_finalizer_hook(self, agen): - self._asyncgens.discard(agen) + self._closing_asyncgens.add(agen) + + def do_aclose(): + self._closing_asyncgens.discard(agen) + self._asyncgens_aclose_handles.discard(handle) + self.create_task(agen.aclose()) + if not self.is_closed(): - self.call_soon_threadsafe(self.create_task, agen.aclose()) + handle = self.call_soon_threadsafe(do_aclose) def _asyncgen_firstiter_hook(self, agen): if self._asyncgens_shutdown_called: @@ -573,13 +585,18 @@ async def shutdown_asyncgens(self): """Shutdown all active asynchronous generators.""" self._asyncgens_shutdown_called = True - if not len(self._asyncgens): + closing_agens = list(self._asyncgens) + closing_agens.extend(self._closing_asyncgens.copy()) + self._closing_asyncgens.clear() + self._asyncgens.clear() + + if not closing_agens: # If Python version is <3.6 or we don't have any asynchronous # generators alive. return - closing_agens = list(self._asyncgens) - self._asyncgens.clear() + for handle in self._asyncgens_aclose_handles.copy(): + handle.cancel() results = await tasks.gather( *[ag.aclose() for ag in closing_agens],