Skip to content

Commit 12a7d0a

Browse files
authored
Fix race condition in websocket transport close (#133)
1 parent fce2782 commit 12a7d0a

File tree

1 file changed

+55
-5
lines changed

1 file changed

+55
-5
lines changed

gql/transport/websockets.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,8 @@ async def connect(self) -> None:
482482

483483
GRAPHQLWS_SUBPROTOCOL: Subprotocol = cast(Subprotocol, "graphql-ws")
484484

485+
log.debug("connect: starting")
486+
485487
if self.websocket is None and not self._connecting:
486488

487489
# Set connecting to True to avoid a race condition if user is trying
@@ -537,6 +539,8 @@ async def connect(self) -> None:
537539
else:
538540
raise TransportAlreadyConnected("Transport is already connected")
539541

542+
log.debug("connect: done")
543+
540544
async def _clean_close(self, e: Exception) -> None:
541545
"""Coroutine which will:
542546
@@ -569,35 +573,81 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
569573
- close the websocket connection
570574
- send the exception to all the remaining listeners
571575
"""
572-
if self.websocket:
576+
577+
log.debug("_close_coro: starting")
578+
579+
try:
580+
581+
# We should always have an active websocket connection here
582+
assert self.websocket is not None
573583

574584
# Saving exception to raise it later if trying to use the transport
575585
# after it has already closed.
576586
self.close_exception = e
577587

578588
if clean_close:
579-
await self._clean_close(e)
589+
log.debug("_close_coro: starting clean_close")
590+
try:
591+
await self._clean_close(e)
592+
except Exception as exc: # pragma: no cover
593+
log.warning("Ignoring exception in _clean_close: " + repr(exc))
594+
595+
log.debug("_close_coro: sending exception to listeners")
580596

581597
# Send an exception to all remaining listeners
582598
for query_id, listener in self.listeners.items():
583599
await listener.set_exception(e)
584600

601+
log.debug("_close_coro: close websocket connection")
602+
585603
await self.websocket.close()
586604

587-
self.websocket = None
605+
log.debug("_close_coro: websocket connection closed")
606+
607+
except Exception as exc: # pragma: no cover
608+
log.warning("Exception catched in _close_coro: " + repr(exc))
609+
610+
finally:
588611

612+
log.debug("_close_coro: start cleanup")
613+
614+
self.websocket = None
589615
self.close_task = None
616+
590617
self._wait_closed.set()
591618

619+
log.debug("_close_coro: exiting")
620+
592621
async def _fail(self, e: Exception, clean_close: bool = True) -> None:
622+
log.debug("_fail: starting with exception: " + repr(e))
623+
593624
if self.close_task is None:
594-
self.close_task = asyncio.shield(
595-
asyncio.ensure_future(self._close_coro(e, clean_close=clean_close))
625+
626+
if self.websocket is None:
627+
log.debug("_fail started with self.websocket == None -> already closed")
628+
else:
629+
self.close_task = asyncio.shield(
630+
asyncio.ensure_future(self._close_coro(e, clean_close=clean_close))
631+
)
632+
else:
633+
log.debug(
634+
"close_task is not None in _fail. Previous exception is: "
635+
+ repr(self.close_exception)
636+
+ " New exception is: "
637+
+ repr(e)
596638
)
597639

598640
async def close(self) -> None:
641+
log.debug("close: starting")
642+
599643
await self._fail(TransportClosed("Websocket GraphQL transport closed by user"))
600644
await self.wait_closed()
601645

646+
log.debug("close: done")
647+
602648
async def wait_closed(self) -> None:
649+
log.debug("wait_close: starting")
650+
603651
await self._wait_closed.wait()
652+
653+
log.debug("wait_close: done")

0 commit comments

Comments
 (0)