From 04295d851ef10488b7ad87b338272b2b30814915 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 11 Apr 2024 13:55:13 +0200 Subject: [PATCH 1/4] gh-117536: Fix asyncio _asyncgen_finalizer_hook() Make the finalization of asynchronous generators more reliable. Store a strong reference to the asynchronous generator which is being closed to make sure that shutdown_asyncgens() can close it even if asyncio.run() cancels all tasks. --- Lib/asyncio/base_events.py | 25 ++++++++++++++++--- Lib/test/test_asyncio/test_base_events.py | 21 ++++++++++++++++ ...-04-11-14-11-02.gh-issue-117536.0jp7ep.rst | 2 ++ 3 files changed, 44 insertions(+), 4 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2024-04-11-14-11-02.gh-issue-117536.0jp7ep.rst diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index f0e690b61a73dd..2072772124a922 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -444,6 +444,11 @@ def __init__(self): # A weak set of all asynchronous generators that are # being iterated by the loop. self._asyncgens = weakref.WeakSet() + + # Strong references to asynchronous generators which are being being + # closed by the loop: see _asyncgen_finalizer_hook(). + self._close_asyncgens = set() + # Set to True when `loop.shutdown_asyncgens` is called. self._asyncgens_shutdown_called = False # Set to True when `loop.shutdown_default_executor` is called. @@ -555,10 +560,22 @@ def _check_default_executor(self): if self._executor_shutdown_called: raise RuntimeError('Executor shutdown has been called') + async def _asyncgen_close(self, agen): + await agen.aclose() + self._close_asyncgens.discard(agen) + def _asyncgen_finalizer_hook(self, agen): + if self.is_closed(): + self._asyncgens.discard(agen) + return + + # gh-117536: Store a strong reference to the asynchronous generator + # to make sure that shutdown_asyncgens() can close it even if + # asyncio.run() cancels all tasks. + self._close_asyncgens.add(agen) self._asyncgens.discard(agen) - if not self.is_closed(): - self.call_soon_threadsafe(self.create_task, agen.aclose()) + + self.call_soon_threadsafe(self.create_task, self._asyncgen_close(agen)) def _asyncgen_firstiter_hook(self, agen): if self._asyncgens_shutdown_called: @@ -573,12 +590,12 @@ async def shutdown_asyncgens(self): """Shutdown all active asynchronous generators.""" self._asyncgens_shutdown_called = True - if not len(self._asyncgens): + closing_agens = list(set(self._asyncgens) | self._close_asyncgens) + if not closing_agens: # If Python version is <3.6 or we don't have any asynchronous # generators alive. return - closing_agens = list(self._asyncgens) self._asyncgens.clear() results = await tasks.gather( diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index c14a0bb180d79b..05924215fce72d 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1044,6 +1044,27 @@ def test_asyncgen_finalization_by_gc_in_other_thread(self): test_utils.run_briefly(self.loop) self.assertTrue(status['finalized']) + def test_shutdown_asyncgens(self): + # gh-117536: Test shutdown_asyncgens() using asyncio.run() which + # may cancel the task which closes the asynchronous generator before + # calling shutdown_asyncgens(). + + ns = {'state': 'not started'} + async def agen(): + try: + ns['state'] = 'started' + yield 0 + yield 1 + finally: + ns['state'] = 'finalized' + + async def reproducer(): + async for item in agen(): + break + + asyncio.run(reproducer()) + self.assertEqual(ns['state'], 'finalized') + class MyProto(asyncio.Protocol): done = None diff --git a/Misc/NEWS.d/next/Library/2024-04-11-14-11-02.gh-issue-117536.0jp7ep.rst b/Misc/NEWS.d/next/Library/2024-04-11-14-11-02.gh-issue-117536.0jp7ep.rst new file mode 100644 index 00000000000000..f2e28b12a5c17c --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-04-11-14-11-02.gh-issue-117536.0jp7ep.rst @@ -0,0 +1,2 @@ +:mod:`asyncio`: Make the finalization of asynchronous generators more +reliable. Patch by Victor Stinner. From a46668d665bfb321583fc32d96ac218df008daae Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 11 Apr 2024 15:38:26 +0200 Subject: [PATCH 2/4] Update test_asyncgen --- Lib/test/test_asyncgen.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/Lib/test/test_asyncgen.py b/Lib/test/test_asyncgen.py index 39605dca3886c8..923d7f8eb1af0d 100644 --- a/Lib/test/test_asyncgen.py +++ b/Lib/test/test_asyncgen.py @@ -1570,13 +1570,8 @@ async def main(): message, = messages self.assertIsInstance(message['exception'], ZeroDivisionError) - self.assertIn('unhandled exception during asyncio.run() shutdown', + self.assertIn('an error occurred during closing of asynchronous generator ', message['message']) - with self.assertWarnsRegex(RuntimeWarning, - f"coroutine method 'aclose' of '{async_iterate.__qualname__}' " - f"was never awaited"): - del message, messages - gc_collect() def test_async_gen_expression_01(self): async def arange(n): @@ -1630,10 +1625,6 @@ async def main(): asyncio.run(main()) self.assertEqual([], messages) - with self.assertWarnsRegex(RuntimeWarning, - f"coroutine method 'aclose' of '{async_iterate.__qualname__}' " - f"was never awaited"): - gc_collect() def test_async_gen_await_same_anext_coro_twice(self): async def async_iterate(): From 176b870e454b50c2d45bb3c7e0c66f6897ed2c83 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 11 Apr 2024 17:55:38 +0200 Subject: [PATCH 3/4] Add a second test --- Lib/test/test_asyncio/test_base_events.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 05924215fce72d..01d163d021a400 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1058,11 +1058,27 @@ async def agen(): finally: ns['state'] = 'finalized' - async def reproducer(): + async def reproducer(agen): async for item in agen(): break - asyncio.run(reproducer()) + asyncio.run(reproducer(agen)) + self.assertEqual(ns['state'], 'finalized') + + # Similar than previous test, but the generator uses 'await' in the + # finally block. + ns['state'] = 'not started' + async def agen_await(): + try: + ns['state'] = 'started' + yield 0 + yield 1 + finally: + ns['state'] = 'await' + await asyncio.sleep(0) + ns['state'] = 'finalized' + + asyncio.run(reproducer(agen_await)) self.assertEqual(ns['state'], 'finalized') From 4f52b05cfc46f28b90be82c20a13605e685c9826 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Thu, 11 Apr 2024 17:57:38 +0200 Subject: [PATCH 4/4] Use different function name to avoid confusion --- Lib/test/test_asyncio/test_base_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 01d163d021a400..bbcd0efcf01d2f 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -1050,7 +1050,7 @@ def test_shutdown_asyncgens(self): # calling shutdown_asyncgens(). ns = {'state': 'not started'} - async def agen(): + async def agen_basic(): try: ns['state'] = 'started' yield 0 @@ -1062,7 +1062,7 @@ async def reproducer(agen): async for item in agen(): break - asyncio.run(reproducer(agen)) + asyncio.run(reproducer(agen_basic)) self.assertEqual(ns['state'], 'finalized') # Similar than previous test, but the generator uses 'await' in the