Skip to content

Commit 29094a2

Browse files
committed
Add support for reconnecting automatically.
Fix #414.
1 parent cb11516 commit 29094a2

File tree

6 files changed

+149
-7
lines changed

6 files changed

+149
-7
lines changed

docs/howto/logging.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ Here's what websockets logs at each level.
210210

211211
* Server starting and stopping
212212
* Server establishing and closing connections
213+
* Client reconnecting automatically
213214

214215
:attr:`~logging.DEBUG`
215216
......................

docs/project/changelog.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ They may change at any time.
5151

5252
* Added compatibility with Python 3.10.
5353

54+
* Added support for reconnecting automatically by using
55+
:func:`~legacy.client.connect` as an asynchronous iterator.
56+
5457
* Added ``open_timeout`` to :func:`~legacy.client.connect`.
5558

5659
* Improved logging.

docs/spelling_wordlist.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ autoscaler
66
awaitable
77
aymeric
88
backend
9+
backoff
910
backpressure
1011
balancer
1112
balancers
@@ -52,6 +53,7 @@ pong
5253
pongs
5354
proxying
5455
pythonic
56+
reconnection
5557
redis
5658
retransmit
5759
runtime

src/websockets/legacy/client.py

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,18 @@
99
import logging
1010
import warnings
1111
from types import TracebackType
12-
from typing import Any, Callable, Generator, List, Optional, Sequence, Tuple, Type, cast
12+
from typing import (
13+
Any,
14+
AsyncIterator,
15+
Callable,
16+
Generator,
17+
List,
18+
Optional,
19+
Sequence,
20+
Tuple,
21+
Type,
22+
cast,
23+
)
1324

1425
from ..datastructures import Headers, HeadersLike
1526
from ..exceptions import (
@@ -412,12 +423,23 @@ class Connect:
412423
Awaiting :func:`connect` yields a :class:`WebSocketClientProtocol` which
413424
can then be used to send and receive messages.
414425
415-
:func:`connect` can also be used as a asynchronous context manager::
426+
:func:`connect` can be used as a asynchronous context manager::
416427
417428
async with connect(...) as websocket:
418429
...
419430
420-
In that case, the connection is closed when exiting the context.
431+
The connection is closed automatically when exiting the context.
432+
433+
:func:`connect` can be used as an infinite asynchronous iterator to
434+
reconnect automatically on errors::
435+
436+
async for websocket in connect(...):
437+
...
438+
439+
You must catch all exceptions, or else you will exit the loop prematurely.
440+
As above, connections are closed automatically. Connection attempts are
441+
delayed with exponential backoff, starting at three seconds and
442+
increasing up to one minute.
421443
422444
:func:`connect` is a wrapper around the event loop's
423445
:meth:`~asyncio.loop.create_connection` method. Unknown keyword arguments
@@ -577,6 +599,10 @@ def __init__(
577599
)
578600

579601
self.open_timeout = open_timeout
602+
if logger is None:
603+
logger = logging.getLogger("websockets.client")
604+
self.logger = logger
605+
580606
# This is a coroutine function.
581607
self._create_connection = create_connection
582608
self._wsuri = wsuri
@@ -615,7 +641,38 @@ def handle_redirect(self, uri: str) -> None:
615641
# Set the new WebSocket URI. This suffices for same-origin redirects.
616642
self._wsuri = new_wsuri
617643

618-
# async with connect(...)
644+
# async for ... in connect(...):
645+
646+
BACKOFF_MIN = 2.0
647+
BACKOFF_MAX = 60.0
648+
BACKOFF_FACTOR = 1.5
649+
650+
async def __aiter__(self) -> AsyncIterator[WebSocketClientProtocol]:
651+
backoff_delay = self.BACKOFF_MIN
652+
while True:
653+
try:
654+
async with self as protocol:
655+
yield protocol
656+
# Remove this branch when dropping support for Python < 3.8
657+
# because CancelledError no longer inherits Exception.
658+
except asyncio.CancelledError: # pragma: no cover
659+
raise
660+
except Exception:
661+
# Connection timed out - increase backoff delay
662+
backoff_delay = backoff_delay * self.BACKOFF_FACTOR
663+
backoff_delay = min(backoff_delay, self.BACKOFF_MAX)
664+
self.logger.info(
665+
"! connect failed; retrying in %d seconds",
666+
int(backoff_delay),
667+
exc_info=True,
668+
)
669+
await asyncio.sleep(backoff_delay)
670+
continue
671+
else:
672+
# Connection succeeded - reset backoff delay
673+
backoff_delay = self.BACKOFF_MIN
674+
675+
# async with connect(...) as ...:
619676

620677
async def __aenter__(self) -> WebSocketClientProtocol:
621678
return await self
@@ -628,7 +685,7 @@ async def __aexit__(
628685
) -> None:
629686
await self.protocol.close()
630687

631-
# await connect(...)
688+
# ... = await connect(...)
632689

633690
def __await__(self) -> Generator[Any, None, WebSocketClientProtocol]:
634691
# Create a suitable iterator by calling __await__ on a coroutine.
@@ -665,7 +722,7 @@ async def __await_impl__(self) -> WebSocketClientProtocol:
665722
else:
666723
raise SecurityError("too many redirects")
667724

668-
# yield from connect(...)
725+
# ... = yield from connect(...)
669726

670727
__iter__ = __await__
671728

src/websockets/legacy/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -904,7 +904,7 @@ class Serve:
904904
:exc:`~websockets.exceptions.ConnectionClosedOK` exception on their
905905
current or next interaction with the WebSocket connection.
906906
907-
:func:`serve` can also be used as an asynchronous context manager::
907+
:func:`serve` can be used as an asynchronous context manager::
908908
909909
stop = asyncio.Future() # set this future to exit the server
910910

tests/legacy/test_client_server.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,6 +1471,85 @@ async def run_client():
14711471
self.assertEqual(messages, self.MESSAGES)
14721472

14731473

1474+
class ReconnectionTests(ClientServerTestsMixin, AsyncioTestCase):
1475+
async def echo_handler(ws, path):
1476+
async for msg in ws:
1477+
await ws.send(msg)
1478+
1479+
service_available = True
1480+
1481+
async def maybe_service_unavailable(path, headers):
1482+
if not ReconnectionTests.service_available:
1483+
return http.HTTPStatus.SERVICE_UNAVAILABLE, [], b""
1484+
1485+
async def disable_server(self, duration):
1486+
ReconnectionTests.service_available = False
1487+
await asyncio.sleep(duration)
1488+
ReconnectionTests.service_available = True
1489+
1490+
@with_server(handler=echo_handler, process_request=maybe_service_unavailable)
1491+
def test_reconnect(self):
1492+
# Big, ugly integration test :-(
1493+
1494+
async def run_client():
1495+
iteration = 0
1496+
connect_inst = connect(get_server_uri(self.server))
1497+
connect_inst.BACKOFF_MIN = 10 * MS
1498+
connect_inst.BACKOFF_MAX = 200 * MS
1499+
async for ws in connect_inst:
1500+
await ws.send("spam")
1501+
msg = await ws.recv()
1502+
self.assertEqual(msg, "spam")
1503+
1504+
iteration += 1
1505+
if iteration == 1:
1506+
# Exit block normally.
1507+
pass
1508+
elif iteration == 2:
1509+
# Disable server for a little bit
1510+
asyncio.create_task(self.disable_server(70 * MS))
1511+
await asyncio.sleep(0)
1512+
elif iteration == 3:
1513+
# Exit block after catching connection error.
1514+
server_ws = next(iter(self.server.websockets))
1515+
await server_ws.close()
1516+
with self.assertRaises(ConnectionClosed):
1517+
await ws.recv()
1518+
else:
1519+
# Exit block with an exception.
1520+
raise Exception("BOOM!")
1521+
1522+
with self.assertLogs("websockets", logging.INFO) as logs:
1523+
with self.assertRaisesRegex(Exception, "BOOM!"):
1524+
self.loop.run_until_complete(run_client())
1525+
1526+
self.assertEqual(
1527+
[record.getMessage() for record in logs.records],
1528+
[
1529+
# Iteration 1
1530+
"connection open",
1531+
"connection closed",
1532+
# Iteration 2
1533+
"connection open",
1534+
"connection closed",
1535+
# Iteration 3
1536+
"connection failed (503 Service Unavailable)",
1537+
"connection closed",
1538+
"! connect failed; retrying in 0 seconds",
1539+
"connection failed (503 Service Unavailable)",
1540+
"connection closed",
1541+
"! connect failed; retrying in 0 seconds",
1542+
"connection failed (503 Service Unavailable)",
1543+
"connection closed",
1544+
"! connect failed; retrying in 0 seconds",
1545+
"connection open",
1546+
"connection closed",
1547+
# Iteration 4
1548+
"connection open",
1549+
],
1550+
)
1551+
1552+
14741553
class LoggerTests(ClientServerTestsMixin, AsyncioTestCase):
14751554
def test_logger_client(self):
14761555
with self.assertLogs("test.server", logging.DEBUG) as server_logs:

0 commit comments

Comments
 (0)