@@ -444,6 +444,12 @@ def __init__(self):
444
444
# A weak set of all asynchronous generators that are
445
445
# being iterated by the loop.
446
446
self ._asyncgens = weakref .WeakSet ()
447
+ # A strong set of asynchronous generators that are being closed
448
+ # by asyncgen_finalizer_hook
449
+ self ._closing_asyncgens = set ()
450
+ # A strong set of Handles for callbacks about to schedule async gen
451
+ # aclose tasks
452
+ self ._asyncgens_aclose_handles = set ()
447
453
# Set to True when `loop.shutdown_asyncgens` is called.
448
454
self ._asyncgens_shutdown_called = False
449
455
# Set to True when `loop.shutdown_default_executor` is called.
@@ -556,9 +562,15 @@ def _check_default_executor(self):
556
562
raise RuntimeError ('Executor shutdown has been called' )
557
563
558
564
def _asyncgen_finalizer_hook (self , agen ):
559
- self ._asyncgens .discard (agen )
565
+ self ._closing_asyncgens .add (agen )
566
+
567
+ def do_aclose ():
568
+ self ._closing_asyncgens .discard (agen )
569
+ self ._asyncgens_aclose_handles .discard (handle )
570
+ self .create_task (agen .aclose ())
571
+
560
572
if not self .is_closed ():
561
- self .call_soon_threadsafe (self . create_task , agen . aclose () )
573
+ handle = self .call_soon_threadsafe (do_aclose )
562
574
563
575
def _asyncgen_firstiter_hook (self , agen ):
564
576
if self ._asyncgens_shutdown_called :
@@ -573,13 +585,18 @@ async def shutdown_asyncgens(self):
573
585
"""Shutdown all active asynchronous generators."""
574
586
self ._asyncgens_shutdown_called = True
575
587
576
- if not len (self ._asyncgens ):
588
+ closing_agens = list (self ._asyncgens )
589
+ closing_agens .extend (self ._closing_asyncgens .copy ())
590
+ self ._closing_asyncgens .clear ()
591
+ self ._asyncgens .clear ()
592
+
593
+ if not closing_agens :
577
594
# If Python version is <3.6 or we don't have any asynchronous
578
595
# generators alive.
579
596
return
580
597
581
- closing_agens = list ( self ._asyncgens )
582
- self . _asyncgens . clear ()
598
+ for handle in self ._asyncgens_aclose_handles . copy ():
599
+ handle . cancel ()
583
600
584
601
results = await tasks .gather (
585
602
* [ag .aclose () for ag in closing_agens ],
0 commit comments