Skip to content

Ping at regular intervals. #449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ Server

.. automodule:: websockets.server

.. autofunction:: serve(ws_handler, host=None, port=None, *, create_protocol=None, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origins=None, extensions=None, subprotocols=None, extra_headers=None, compression='deflate', **kwds)
.. autofunction:: serve(ws_handler, host=None, port=None, *, create_protocol=None, ping_interval=20, ping_timeout=20, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origins=None, extensions=None, subprotocols=None, extra_headers=None, compression='deflate', **kwds)

.. autofunction:: unix_serve(ws_handler, path, *, create_protocol=None, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origins=None, extensions=None, subprotocols=None, extra_headers=None, compression='deflate', **kwds)
.. autofunction:: unix_serve(ws_handler, path, *, create_protocol=None, ping_interval=20, ping_timeout=20, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origins=None, extensions=None, subprotocols=None, extra_headers=None, compression='deflate', **kwds)


.. autoclass:: WebSocketServer
Expand All @@ -43,7 +43,7 @@ Server
.. automethod:: wait_closed()
.. autoattribute:: sockets

.. autoclass:: WebSocketServerProtocol(ws_handler, ws_server, *, host=None, port=None, secure=None, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origins=None, extensions=None, subprotocols=None, extra_headers=None)
.. autoclass:: WebSocketServerProtocol(ws_handler, ws_server, *, host=None, port=None, secure=None, ping_interval=20, ping_timeout=20, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origins=None, extensions=None, subprotocols=None, extra_headers=None)

.. automethod:: handshake(origins=None, available_extensions=None, available_subprotocols=None, extra_headers=None)
.. automethod:: process_request(path, request_headers)
Expand All @@ -54,9 +54,9 @@ Client

.. automodule:: websockets.client

.. autofunction:: connect(uri, *, create_protocol=None, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origin=None, extensions=None, subprotocols=None, extra_headers=None, compression='deflate', **kwds)
.. autofunction:: connect(uri, *, create_protocol=None, ping_interval=20, ping_timeout=20, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origin=None, extensions=None, subprotocols=None, extra_headers=None, compression='deflate', **kwds)

.. autoclass:: WebSocketClientProtocol(*, host=None, port=None, secure=None, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origin=None, extensions=None, subprotocols=None, extra_headers=None)
.. autoclass:: WebSocketClientProtocol(*, host=None, port=None, secure=None, ping_interval=20, ping_timeout=20, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None, origin=None, extensions=None, subprotocols=None, extra_headers=None)

.. automethod:: handshake(wsuri, origin=None, available_extensions=None, available_subprotocols=None, extra_headers=None)

Expand All @@ -65,7 +65,7 @@ Shared

.. automodule:: websockets.protocol

.. autoclass:: WebSocketCommonProtocol(*, host=None, port=None, secure=None, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None)
.. autoclass:: WebSocketCommonProtocol(*, host=None, port=None, secure=None, ping_interval=20, ping_timeout=20, timeout=10, max_size=2 ** 20, max_queue=2 ** 5, read_limit=2 ** 16, write_limit=2 ** 16, loop=None)

.. automethod:: close(code=1000, reason='')

Expand Down
5 changes: 4 additions & 1 deletion docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ Changelog

*In development*

* Added an interactive client: `python -m websockets <uri>`
* websockets sends Ping frames at regular intervals and closes the connection
if it doesn't receive a matching Pong frame. See :class:`~protocol.WebSocketCommonProtocol` for details.

* Added an interactive client: ``python -m websockets <uri>``.

* Changed the ``origins`` argument to represent the lack of an origin with
``None`` rather than ``''``.
Expand Down
22 changes: 0 additions & 22 deletions docs/cheatsheet.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,6 @@ in particular. Fortunately Python's official documentation provides advice to

.. _develop with asyncio: https://docs.python.org/3/library/asyncio-dev.html

Keeping connections open
------------------------

Pinging the other side once in a while is a good way to check whether the
connection is still working, and also to keep it open in case something kills
idle connections after some time::

while True:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=20)
except asyncio.TimeoutError:
# No data in 20 seconds, check the connection.
try:
pong_waiter = await ws.ping()
await asyncio.wait_for(pong_waiter, timeout=10)
except asyncio.TimeoutError:
# No response to ping in 10 seconds, disconnect.
break
else:
# do something with msg
...

Passing additional arguments to the connection handler
------------------------------------------------------

Expand Down
11 changes: 11 additions & 0 deletions docs/design.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ two tasks:
with an exception other than :exc:`~asyncio.CancelledError`. See :ref:`data
transfer <data-transfer>` below.

- :attr:`~protocol.WebSocketCommonProtocol.keepalive_ping_task` runs
:meth:`~protocol.WebSocketCommonProtocol.keepalive_ping()` which sends Ping
frames at regular intervals and ensures that corresponding Pong frames are
received. It is cancelled when the connection terminates. It never exits
with an exception other than :exc:`~asyncio.CancelledError`.

- :attr:`~protocol.WebSocketCommonProtocol.close_connection_task` runs
:meth:`~protocol.WebSocketCommonProtocol.close_connection()` which waits for
the data transfer to terminate, then takes care of closing the TCP
Expand Down Expand Up @@ -302,6 +308,11 @@ easier to implement the timeout on the closing handshake. Canceling
of canceling :attr:`~protocol.WebSocketCommonProtocol.close_connection_task`
and failing to close the TCP connection, thus leaking resources.

Then :attr:`~protocol.WebSocketCommonProtocol.close_connection_task` cancels
:attr:`~protocol.WebSocketCommonProtocol.keepalive_ping`. This task has no
protocol compliance responsibilities. Terminating it to avoid leaking it is
the only concern.

Terminating the TCP connection can take up to ``2 * timeout`` on the server
side and ``3 * timeout`` on the client side. Clients start by waiting for the
server to close the connection, hence the extra ``timeout``. Then both sides
Expand Down
Binary file modified docs/lifecycle.graffle
Binary file not shown.
2 changes: 1 addition & 1 deletion docs/lifecycle.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 10 additions & 5 deletions websockets/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,10 @@ class Connect:
a ``wss://`` URI, if this argument isn't provided explicitly, it's set to
``True``, which means Python's default :class:`~ssl.SSLContext` is used.

The behavior of the ``timeout``, ``max_size``, and ``max_queue``,
``read_limit``, and ``write_limit`` optional arguments is described in the
documentation of :class:`~websockets.protocol.WebSocketCommonProtocol`.
The behavior of the ``ping_interval``, ``ping_timeout``, ``timeout``,
``max_size``, ``max_queue``, ``read_limit``, and ``write_limit`` optional
arguments is described in the documentation of
:class:`~websockets.protocol.WebSocketCommonProtocol`.

The ``create_protocol`` parameter allows customizing the asyncio protocol
that manages the connection. It should be a callable or class accepting
Expand Down Expand Up @@ -326,7 +327,9 @@ class Connect:

def __init__(self, uri, *,
create_protocol=None,
timeout=10, max_size=2 ** 20, max_queue=2 ** 5,
ping_interval=20, ping_timeout=20,
timeout=10,
max_size=2 ** 20, max_queue=2 ** 5,
read_limit=2 ** 16, write_limit=2 ** 16,
loop=None, legacy_recv=False, klass=None,
origin=None, extensions=None, subprotocols=None,
Expand Down Expand Up @@ -364,7 +367,9 @@ def __init__(self, uri, *,

factory = lambda: create_protocol(
host=wsuri.host, port=wsuri.port, secure=wsuri.secure,
timeout=timeout, max_size=max_size, max_queue=max_queue,
ping_interval=ping_interval, ping_timeout=ping_timeout,
timeout=timeout,
max_size=max_size, max_queue=max_queue,
read_limit=read_limit, write_limit=write_limit,
loop=loop, legacy_recv=legacy_recv,
origin=origin, extensions=extensions, subprotocols=subprotocols,
Expand Down
76 changes: 73 additions & 3 deletions websockets/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,27 @@ class WebSocketCommonProtocol(asyncio.StreamReaderProtocol):
The ``host``, ``port`` and ``secure`` parameters are simply stored as
attributes for handlers that need them.

Once the connection is open, a `Ping frame`_ is sent every
``ping_interval`` seconds. This serves as a keepalive. It helps keeping
the connection open, especially in the presence of proxies with short
timeouts. Set ``ping_interval`` to ``None`` to disable this behavior.

.. _Ping frame: https://tools.ietf.org/html/rfc6455#section-5.5.2

If the corresponding `Pong frame`_ isn't received within ``ping_timeout``
seconds, the connection is considered unusable and is closed with status
code 1011. This ensures that the remote endpoint remains responsive. Set
``ping_timeout`` to ``None`` to disable this behavior.

.. _Pong frame: https://tools.ietf.org/html/rfc6455#section-5.5.3

The ``timeout`` parameter defines a maximum wait time in seconds for
completing the closing handshake and terminating the TCP connection.
:meth:`close()` completes in at most ``4 * timeout`` on the server side
and ``5 * timeout`` on the client side.

``timeout`` is a parameter of the protocol because websockets usually
calls :meth:`close()` implicitly:
``timeout`` needs to be a parameter of the protocol because websockets
usually calls :meth:`close()` implicitly:

- on the server side, when the connection handler terminates,
- on the client side, when exiting the context manager for the connection.
Expand Down Expand Up @@ -148,12 +162,16 @@ class WebSocketCommonProtocol(asyncio.StreamReaderProtocol):

def __init__(self, *,
host=None, port=None, secure=None,
timeout=10, max_size=2 ** 20, max_queue=2 ** 5,
ping_interval=20, ping_timeout=20,
timeout=10,
max_size=2 ** 20, max_queue=2 ** 5,
read_limit=2 ** 16, write_limit=2 ** 16,
loop=None, legacy_recv=False):
self.host = host
self.port = port
self.secure = secure
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.timeout = timeout
self.max_size = max_size
self.max_queue = max_queue
Expand Down Expand Up @@ -218,6 +236,9 @@ def __init__(self, *,
# Exception that occurred during data transfer, if any.
self.transfer_data_exc = None

# Task sending keepalive pings.
self.keepalive_ping_task = None

# Task closing the TCP connection.
self.close_connection_task = None

Expand Down Expand Up @@ -247,6 +268,9 @@ def connection_open(self):
# Start the task that receives incoming WebSocket messages.
self.transfer_data_task = asyncio_ensure_future(
self.transfer_data(), loop=self.loop)
# Start the task that sends pings at regular intervals.
self.keepalive_ping_task = asyncio_ensure_future(
self.keepalive_ping(), loop=self.loop)
# Start the task that eventually closes the TCP connection.
self.close_connection_task = asyncio_ensure_future(
self.close_connection(), loop=self.loop)
Expand Down Expand Up @@ -798,6 +822,48 @@ def write_close_frame(self, data=b''):
# 7.1.2. Start the WebSocket Closing Handshake
yield from self.write_frame(OP_CLOSE, data, State.CLOSING)

@asyncio.coroutine
def keepalive_ping(self):
"""
Send a Ping frame and wait for a Pong frame at regular intervals.

This coroutine exits when the connection terminates and one of the
following happens:
- :meth:`ping` raises :exc:`ConnectionClosed`, or
- :meth:`close_connection` cancels :attr:`keepalive_ping_task`.

"""
if self.ping_interval is None:
return

try:
while True:
yield from asyncio.sleep(self.ping_interval, loop=self.loop)

# ping() cannot raise ConnectionClosed, only CancelledError:
# - If the connection is CLOSING, keepalive_ping_task will be
# cancelled by close_connection() before ping() returns.
# - If the connection is CLOSED, keepalive_ping_task must be
# cancelled already.
ping_waiter = yield from self.ping()

if self.ping_timeout is not None:
try:
yield from asyncio.wait_for(
ping_waiter, self.ping_timeout, loop=self.loop)
except asyncio.TimeoutError:
logger.debug(
"%s ! timed out waiting for pong", self.side)
self.fail_connection(1011)
break

except asyncio.CancelledError:
raise

except Exception as exc:
logger.warning(
"Unexpected exception in keepalive ping task", exc_info=True)

@asyncio.coroutine
def close_connection(self):
"""
Expand All @@ -819,6 +885,10 @@ def close_connection(self):
except asyncio.CancelledError:
pass

# Cancel the keepalive ping task.
if self.keepalive_ping_task is not None:
self.keepalive_ping_task.cancel()

# Cancel all pending pings because they'll never receive a pong.
for ping in self.pings.values():
ping.cancel()
Expand Down
15 changes: 10 additions & 5 deletions websockets/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,9 +656,10 @@ class Serve:
:class:`WebSocketServerProtocol` instance. It defaults to
:class:`WebSocketServerProtocol`.

The behavior of the ``timeout``, ``max_size``, and ``max_queue``,
``read_limit``, and ``write_limit`` optional arguments is described in the
documentation of :class:`~websockets.protocol.WebSocketCommonProtocol`.
The behavior of the ``ping_interval``, ``ping_timeout``, ``timeout``,
``max_size``, ``max_queue``, ``read_limit``, and ``write_limit`` optional
arguments is described in the documentation of
:class:`~websockets.protocol.WebSocketCommonProtocol`.

:func:`serve` also accepts the following optional arguments:

Expand Down Expand Up @@ -701,7 +702,9 @@ class Serve:

def __init__(self, ws_handler, host=None, port=None, *,
path=None, create_protocol=None,
timeout=10, max_size=2 ** 20, max_queue=2 ** 5,
ping_interval=20, ping_timeout=20,
timeout=10,
max_size=2 ** 20, max_queue=2 ** 5,
read_limit=2 ** 16, write_limit=2 ** 16,
loop=None, legacy_recv=False, klass=None,
origins=None, extensions=None, subprotocols=None,
Expand Down Expand Up @@ -735,7 +738,9 @@ def __init__(self, ws_handler, host=None, port=None, *,
factory = lambda: create_protocol(
ws_handler, ws_server,
host=host, port=port, secure=secure,
timeout=timeout, max_size=max_size, max_queue=max_queue,
ping_interval=ping_interval, ping_timeout=ping_timeout,
timeout=timeout,
max_size=max_size, max_queue=max_queue,
read_limit=read_limit, write_limit=write_limit,
loop=loop, legacy_recv=legacy_recv,
origins=origins, extensions=extensions, subprotocols=subprotocols,
Expand Down
8 changes: 6 additions & 2 deletions websockets/test_client_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,18 @@ def run_loop_once(self):
self.loop.run_forever()

def start_server(self, **kwds):
# Don't enable compression by default in tests.
# Disable compression by default in tests.
kwds.setdefault('compression', None)
# Disable pings by default in tests.
kwds.setdefault('ping_interval', None)
start_server = serve(handler, 'localhost', 0, **kwds)
self.server = self.loop.run_until_complete(start_server)

def start_client(self, resource_name='/', user_info=None, **kwds):
# Don't enable compression by default in tests.
# Disable compression by default in tests.
kwds.setdefault('compression', None)
# Disable pings by default in tests.
kwds.setdefault('ping_interval', None)
secure = kwds.get('ssl') is not None
server_uri = get_server_uri(
self.server, secure, resource_name, user_info)
Expand Down
Loading