Skip to content

Commit 2f5a108

Browse files
committed
Add support for reconnecting automatically.
Fix #414.
1 parent ebc8448 commit 2f5a108

File tree

6 files changed

+149
-7
lines changed

6 files changed

+149
-7
lines changed

docs/howto/logging.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ Here's what websockets logs at each level.
210210

211211
* Server starting and stopping
212212
* Server establishing and closing connections
213+
* Client reconnecting automatically
213214

214215
:attr:`~logging.DEBUG`
215216
......................

docs/project/changelog.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ They may change at any time.
5151

5252
* Added compatibility with Python 3.10.
5353

54+
* Added support for reconnecting automatically by using
55+
:func:`~legacy.client.connect` as an asynchronous iterator.
56+
5457
* Added ``open_timeout`` to :func:`~legacy.client.connect`.
5558

5659
* Improved logging.

docs/spelling_wordlist.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ autoscaler
66
awaitable
77
aymeric
88
backend
9+
backoff
910
backpressure
1011
balancer
1112
balancers
@@ -52,6 +53,7 @@ pong
5253
pongs
5354
proxying
5455
pythonic
56+
reconnection
5557
redis
5658
retransmit
5759
runtime

src/websockets/legacy/client.py

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,18 @@
99
import logging
1010
import warnings
1111
from types import TracebackType
12-
from typing import Any, Callable, Generator, List, Optional, Sequence, Tuple, Type, cast
12+
from typing import (
13+
Any,
14+
AsyncIterator,
15+
Callable,
16+
Generator,
17+
List,
18+
Optional,
19+
Sequence,
20+
Tuple,
21+
Type,
22+
cast,
23+
)
1324

1425
from ..datastructures import Headers, HeadersLike
1526
from ..exceptions import (
@@ -413,12 +424,23 @@ class Connect:
413424
Awaiting :func:`connect` yields a :class:`WebSocketClientProtocol` which
414425
can then be used to send and receive messages.
415426
416-
:func:`connect` can also be used as a asynchronous context manager::
427+
:func:`connect` can be used as a asynchronous context manager::
417428
418429
async with connect(...) as websocket:
419430
...
420431
421-
In that case, the connection is closed when exiting the context.
432+
The connection is closed automatically when exiting the context.
433+
434+
:func:`connect` can be used as an infinite asynchronous iterator to
435+
reconnect automatically on errors::
436+
437+
async for websocket in connect(...):
438+
...
439+
440+
You must catch all exceptions, or else you will exit the loop prematurely.
441+
As above, connections are closed automatically. Connection attempts are
442+
delayed with exponential backoff, starting at three seconds and
443+
increasing up to one minute.
422444
423445
:func:`connect` is a wrapper around the event loop's
424446
:meth:`~asyncio.loop.create_connection` method. Unknown keyword arguments
@@ -577,6 +599,10 @@ def __init__(
577599
)
578600

579601
self.open_timeout = open_timeout
602+
if logger is None:
603+
logger = logging.getLogger("websockets.client")
604+
self.logger = logger
605+
580606
# This is a coroutine function.
581607
self._create_connection = create_connection
582608
self._wsuri = wsuri
@@ -615,7 +641,38 @@ def handle_redirect(self, uri: str) -> None:
615641
# Set the new WebSocket URI. This suffices for same-origin redirects.
616642
self._wsuri = new_wsuri
617643

618-
# async with connect(...)
644+
# async for ... in connect(...):
645+
646+
BACKOFF_MIN = 2.0
647+
BACKOFF_MAX = 60.0
648+
BACKOFF_FACTOR = 1.5
649+
650+
async def __aiter__(self) -> AsyncIterator[WebSocketClientProtocol]:
651+
backoff_delay = self.BACKOFF_MIN
652+
while True:
653+
try:
654+
async with self as protocol:
655+
yield protocol
656+
# Remove this branch when dropping support for Python < 3.8
657+
# because CancelledError no longer inherits Exception.
658+
except asyncio.CancelledError: # pragma: no cover
659+
raise
660+
except Exception:
661+
# Connection timed out - increase backoff delay
662+
backoff_delay = backoff_delay * self.BACKOFF_FACTOR
663+
backoff_delay = min(backoff_delay, self.BACKOFF_MAX)
664+
self.logger.info(
665+
"! connect failed; retrying in %d seconds",
666+
int(backoff_delay),
667+
exc_info=True,
668+
)
669+
await asyncio.sleep(backoff_delay)
670+
continue
671+
else:
672+
# Connection succeeded - reset backoff delay
673+
backoff_delay = self.BACKOFF_MIN
674+
675+
# async with connect(...) as ...:
619676

620677
async def __aenter__(self) -> WebSocketClientProtocol:
621678
return await self
@@ -628,7 +685,7 @@ async def __aexit__(
628685
) -> None:
629686
await self.protocol.close()
630687

631-
# await connect(...)
688+
# ... = await connect(...)
632689

633690
def __await__(self) -> Generator[Any, None, WebSocketClientProtocol]:
634691
# Create a suitable iterator by calling __await__ on a coroutine.
@@ -665,7 +722,7 @@ async def __await_impl__(self) -> WebSocketClientProtocol:
665722
else:
666723
raise SecurityError("too many redirects")
667724

668-
# yield from connect(...)
725+
# ... = yield from connect(...)
669726

670727
__iter__ = __await__
671728

src/websockets/legacy/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ class Serve:
904904
:exc:`~websockets.exceptions.ConnectionClosedOK` exception on their
905905
current or next interaction with the WebSocket connection.
906906
907-
:func:`serve` can also be used as an asynchronous context manager::
907+
:func:`serve` can be used as an asynchronous context manager::
908908
909909
stop = asyncio.Future() # set this future to exit the server
910910

tests/legacy/test_client_server.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,6 +1474,85 @@ async def run_client():
14741474
self.assertEqual(messages, self.MESSAGES)
14751475

14761476

1477+
class ReconnectionTests(ClientServerTestsMixin, AsyncioTestCase):
1478+
async def echo_handler(ws, path):
1479+
async for msg in ws:
1480+
await ws.send(msg)
1481+
1482+
service_available = True
1483+
1484+
async def maybe_service_unavailable(path, headers):
1485+
if not ReconnectionTests.service_available:
1486+
return http.HTTPStatus.SERVICE_UNAVAILABLE, [], b""
1487+
1488+
async def disable_server(self, duration):
1489+
ReconnectionTests.service_available = False
1490+
await asyncio.sleep(duration)
1491+
ReconnectionTests.service_available = True
1492+
1493+
@with_server(handler=echo_handler, process_request=maybe_service_unavailable)
1494+
def test_reconnect(self):
1495+
# Big, ugly integration test :-(
1496+
1497+
async def run_client():
1498+
iteration = 0
1499+
connect_inst = connect(get_server_uri(self.server))
1500+
connect_inst.BACKOFF_MIN = 10 * MS
1501+
connect_inst.BACKOFF_MAX = 200 * MS
1502+
async for ws in connect_inst:
1503+
await ws.send("spam")
1504+
msg = await ws.recv()
1505+
self.assertEqual(msg, "spam")
1506+
1507+
iteration += 1
1508+
if iteration == 1:
1509+
# Exit block normally.
1510+
pass
1511+
elif iteration == 2:
1512+
# Disable server for a little bit
1513+
asyncio.create_task(self.disable_server(70 * MS))
1514+
await asyncio.sleep(0)
1515+
elif iteration == 3:
1516+
# Exit block after catching connection error.
1517+
server_ws = next(iter(self.server.websockets))
1518+
await server_ws.close()
1519+
with self.assertRaises(ConnectionClosed):
1520+
await ws.recv()
1521+
else:
1522+
# Exit block with an exception.
1523+
raise Exception("BOOM!")
1524+
1525+
with self.assertLogs("websockets", logging.INFO) as logs:
1526+
with self.assertRaisesRegex(Exception, "BOOM!"):
1527+
self.loop.run_until_complete(run_client())
1528+
1529+
self.assertEqual(
1530+
[record.getMessage() for record in logs.records],
1531+
[
1532+
# Iteration 1
1533+
"connection open",
1534+
"connection closed",
1535+
# Iteration 2
1536+
"connection open",
1537+
"connection closed",
1538+
# Iteration 3
1539+
"connection failed (503 Service Unavailable)",
1540+
"connection closed",
1541+
"! connect failed; retrying in 0 seconds",
1542+
"connection failed (503 Service Unavailable)",
1543+
"connection closed",
1544+
"! connect failed; retrying in 0 seconds",
1545+
"connection failed (503 Service Unavailable)",
1546+
"connection closed",
1547+
"! connect failed; retrying in 0 seconds",
1548+
"connection open",
1549+
"connection closed",
1550+
# Iteration 4
1551+
"connection open",
1552+
],
1553+
)
1554+
1555+
14771556
class LoggerTests(ClientServerTestsMixin, AsyncioTestCase):
14781557
def test_logger_client(self):
14791558
with self.assertLogs("test.server", logging.DEBUG) as server_logs:

0 commit comments

Comments
 (0)