@@ -96,6 +96,7 @@ def __init__(
96
96
) -> None :
97
97
self ._params = params
98
98
self .on_stop : Optional [Callable [[], Coroutine [Any , Any , None ]]] = on_stop
99
+ self ._on_stop_task : Optional [asyncio .Task [None ]] = None
99
100
self ._socket : Optional [socket .socket ] = None
100
101
self ._frame_helper : Optional [APIFrameHelper ] = None
101
102
self ._api_version : Optional [APIVersion ] = None
@@ -142,6 +143,10 @@ def _cleanup(self) -> None:
142
143
self ._connect_task .cancel ()
143
144
self ._connect_task = None
144
145
146
+ if self ._keep_alive_task is not None :
147
+ self ._keep_alive_task .cancel ()
148
+ self ._keep_alive_task = None
149
+
145
150
if self ._frame_helper is not None :
146
151
self ._frame_helper .close ()
147
152
self ._frame_helper = None
@@ -151,8 +156,19 @@ def _cleanup(self) -> None:
151
156
self ._socket = None
152
157
153
158
if self .on_stop and self ._connect_complete :
159
+
160
+ def _remove_on_stop_task ():
161
+ """Remove the stop task from the reconnect loop.
162
+
163
+ We need to do this because the asyncio does not hold
164
+ a strong reference to the task, so it can be garbage
165
+ collected unexpectedly.
166
+ """
167
+ self ._on_stop_task = None
168
+
154
169
# Ensure on_stop is called only once
155
- asyncio .create_task (self .on_stop ())
170
+ self ._on_stop_task = asyncio .create_task (self .on_stop ())
171
+ self ._on_stop_task .add_done_callback (_remove_on_stop_task )
156
172
self .on_stop = None
157
173
158
174
# Note: we don't explicitly cancel the ping/read task here
@@ -318,7 +334,7 @@ async def _keep_alive_loop() -> None:
318
334
self ._report_fatal_error (err )
319
335
return
320
336
321
- asyncio .create_task (_keep_alive_loop ())
337
+ self . _keep_alive_task = asyncio .create_task (_keep_alive_loop ())
322
338
323
339
async def connect (self , * , login : bool ) -> None :
324
340
if self ._connection_state != ConnectionState .INITIALIZED :
0 commit comments