Skip to content

bpo-33530: Implement Happy Eyeballs in asyncio, v2 #7237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a755bbb
Implement Happy Eyeballs in asyncio.
twisteroidambassador May 24, 2018
fc29450
Use module import instead of individual imports.
twisteroidambassador May 29, 2018
d792c43
Change TODO.
twisteroidambassador May 29, 2018
f9111d0
Rename helpers.py to staggered.py.
twisteroidambassador May 29, 2018
ded34e0
Add create_connection()'s new arguments in documentation.
twisteroidambassador May 29, 2018
b069c95
Add blurb.
twisteroidambassador May 29, 2018
c5d3a92
Implement Happy Eyeballs in asyncio.
twisteroidambassador May 24, 2018
38c7caa
Use module import instead of individual imports.
twisteroidambassador May 29, 2018
70ec96d
Change TODO.
twisteroidambassador May 29, 2018
cef0a76
Rename helpers.py to staggered.py.
twisteroidambassador May 29, 2018
73a4a5a
Add create_connection()'s new arguments in documentation.
twisteroidambassador May 29, 2018
b3a6e1c
Add blurb.
twisteroidambassador May 29, 2018
632166d
Merge remote-tracking branch 'origin/happy-eyeballs' into happy-eyeballs
twisteroidambassador May 30, 2018
b8d7e41
Remove _roundrobin as a standalone function.
twisteroidambassador Jun 3, 2018
ecdc83a
Rename delay to happy_eyeballs_delay and decouple from interleave.
twisteroidambassador Jun 3, 2018
5fa5a9b
Update docs.
twisteroidambassador Jun 5, 2018
321e4ac
Update comments.
twisteroidambassador Jun 5, 2018
b4227ed
Import staggered_race into main asyncio package.
twisteroidambassador Jun 5, 2018
cffacc7
Change `delay` to `happy_eyeballs_delay` in AbstractEventLoop.
twisteroidambassador Jun 26, 2018
3e304dd
Change `delay` to `happy_eyeballs_delay` in documentation and blurb.
twisteroidambassador Jun 26, 2018
a8c39b9
Add suggested value for `happy_eyeballs_delay` in documentation.
twisteroidambassador Jun 26, 2018
f6f2219
Tune RST markup
asvetlov Aug 1, 2018
f0e37e6
Keep .staggered as private API
asvetlov May 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,27 @@ Creating connections
If given, these should all be integers from the corresponding
:mod:`socket` module constants.

* *happy_eyeballs_delay*, if given, enables Happy Eyeballs for this
connection. It should
be a floating-point number representing the amount of time in seconds
to wait for a connection attempt to complete, before starting the next
attempt in parallel. This is the "Connection Attempt Delay" as defined
in :rfc:`8305`. A sensible default value recommended by the RFC is ``0.25``
(250 milliseconds).

* *interleave* controls address reordering when a host name resolves to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's worth naming the parameter interleave_af to make it more explicit.

multiple IP addresses.
If ``0`` or unspecified, no reordering is done, and addresses are
tried in the order returned by :meth:`getaddrinfo`. If a positive integer
is specified, the addresses are interleaved by address family, and the
given integer is interpreted as "First Address Family Count" as defined
in :rfc:`8305`. The default is ``0`` if *happy_eyeballs_delay* is not
specified, and ``1`` if it is.

* *sock*, if given, should be an existing, already connected
:class:`socket.socket` object to be used by the transport.
If *sock* is given, none of *host*, *port*, *family*, *proto*, *flags*
If *sock* is given, none of *host*, *port*, *family*, *proto*, *flags*,
*happy_eyeballs_delay*, *interleave*
and *local_addr* should be specified.

* *local_addr*, if given, is a ``(local_host, local_port)`` tuple used
Expand All @@ -353,6 +371,10 @@ Creating connections
to wait for the SSL handshake to complete before aborting the connection.
``10.0`` seconds if ``None`` (default).

.. versionadded:: 3.8

The *happy_eyeballs_delay* and *interleave* parameters.

.. versionadded:: 3.7

The *ssl_handshake_timeout* parameter.
Expand Down
125 changes: 89 additions & 36 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import collections
import collections.abc
import concurrent.futures
import functools
import heapq
import itertools
import logging
Expand All @@ -40,6 +41,7 @@
from . import futures
from . import protocols
from . import sslproto
from . import staggered
from . import tasks
from . import transports
from .log import logger
Expand Down Expand Up @@ -147,6 +149,28 @@ def _ipaddr_info(host, port, family, type, proto):
return None


def _interleave_addrinfos(addrinfos, first_address_family_count=1):
"""Interleave list of addrinfo tuples by family."""
# Group addresses by family
addrinfos_by_family = collections.OrderedDict()
for addr in addrinfos:
family = addr[0]
if family not in addrinfos_by_family:
addrinfos_by_family[family] = []
addrinfos_by_family[family].append(addr)
addrinfos_lists = list(addrinfos_by_family.values())

reordered = []
if first_address_family_count > 1:
reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
del addrinfos_lists[0][:first_address_family_count - 1]
reordered.extend(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed _roundrobin as a standalone function and folded it into _interleave_addrinfo. Do we still want to move _interleave_addrinfo to staggered.py? Now that the interleave option works independently of happy_eyeballs_delay, it feels more natural to leave _interleave_addrinfo here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's OK to keep it here now

a for a in itertools.chain.from_iterable(
itertools.zip_longest(*addrinfos_lists)
) if a is not None)
return reordered


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

base_events.py is already overly long. Would it make sense to move these two functions to staggered.py (keeping them private)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to group all Happy Eyeballs-related functions in the same module (in that case the name staggered.py leaves something to be desired). On the other hand, if interleave is to be made meaningful without using happy eyeballs, then it feels weird to have _interleave_addrinfos somewhere else.

def _run_until_complete_cb(fut):
if not fut.cancelled():
exc = fut.exception()
Expand Down Expand Up @@ -844,12 +868,49 @@ def _check_sendfile_params(self, sock, file, offset, count):
"offset must be a non-negative integer (got {!r})".format(
offset))

async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
"""Create, bind and connect one socket."""
my_exceptions = []
exceptions.append(my_exceptions)
family, type_, proto, _, address = addr_info
sock = None
try:
sock = socket.socket(family=family, type=type_, proto=proto)
sock.setblocking(False)
if local_addr_infos is not None:
for _, _, _, _, laddr in local_addr_infos:
try:
sock.bind(laddr)
break
except OSError as exc:
msg = (
f'error while attempting to bind on '
f'address {laddr!r}: '
f'{exc.strerror.lower()}'
)
exc = OSError(exc.errno, msg)
my_exceptions.append(exc)
else: # all bind attempts failed
raise my_exceptions.pop()
await self.sock_connect(sock, address)
return sock
except OSError as exc:
my_exceptions.append(exc)
if sock is not None:
sock.close()
raise
except:
if sock is not None:
sock.close()
raise

async def create_connection(
self, protocol_factory, host=None, port=None,
*, ssl=None, family=0,
proto=0, flags=0, sock=None,
local_addr=None, server_hostname=None,
ssl_handshake_timeout=None):
ssl_handshake_timeout=None,
happy_eyeballs_delay=None, interleave=None):
"""Connect to a TCP server.

Create a streaming transport connection to a given Internet host and
Expand Down Expand Up @@ -884,6 +945,10 @@ async def create_connection(
raise ValueError(
'ssl_handshake_timeout is only meaningful with ssl')

if happy_eyeballs_delay is not None and interleave is None:
# If using happy eyeballs, default to interleave addresses by family
interleave = 1

if host is not None or port is not None:
if sock is not None:
raise ValueError(
Expand All @@ -902,43 +967,31 @@ async def create_connection(
flags=flags, loop=self)
if not laddr_infos:
raise OSError('getaddrinfo() returned empty list')
else:
laddr_infos = None

if interleave:
infos = _interleave_addrinfos(infos, interleave)

exceptions = []
for family, type, proto, cname, address in infos:
try:
sock = socket.socket(family=family, type=type, proto=proto)
sock.setblocking(False)
if local_addr is not None:
for _, _, _, _, laddr in laddr_infos:
try:
sock.bind(laddr)
break
except OSError as exc:
msg = (
f'error while attempting to bind on '
f'address {laddr!r}: '
f'{exc.strerror.lower()}'
)
exc = OSError(exc.errno, msg)
exceptions.append(exc)
else:
sock.close()
sock = None
continue
if self._debug:
logger.debug("connect %r to %r", sock, address)
await self.sock_connect(sock, address)
except OSError as exc:
if sock is not None:
sock.close()
exceptions.append(exc)
except:
if sock is not None:
sock.close()
raise
else:
break
else:
if happy_eyeballs_delay is None:
# not using happy eyeballs
for addrinfo in infos:
try:
sock = await self._connect_sock(
exceptions, addrinfo, laddr_infos)
break
except OSError:
continue
else: # using happy eyeballs
sock, _, _ = await staggered.staggered_race(
(functools.partial(self._connect_sock,
exceptions, addrinfo, laddr_infos)
for addrinfo in infos),
happy_eyeballs_delay, loop=self)

if sock is None:
exceptions = [exc for sub in exceptions for exc in sub]
if len(exceptions) == 1:
raise exceptions[0]
else:
Expand Down
3 changes: 2 additions & 1 deletion Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ async def create_connection(
*, ssl=None, family=0, proto=0,
flags=0, sock=None, local_addr=None,
server_hostname=None,
ssl_handshake_timeout=None):
ssl_handshake_timeout=None,
happy_eyeballs_delay=None, interleave=None):
raise NotImplementedError

async def create_server(
Expand Down
147 changes: 147 additions & 0 deletions Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Support for running coroutines in parallel with staggered start times."""

__all__ = 'staggered_race',

import contextlib
import typing

from . import events
from . import futures
from . import locks
from . import tasks


async def staggered_race(
coro_fns: typing.Iterable[typing.Callable[[], typing.Awaitable]],
delay: typing.Optional[float],
*,
loop: events.AbstractEventLoop = None,
) -> typing.Tuple[
typing.Any,
typing.Optional[int],
typing.List[typing.Optional[Exception]]
]:
"""Run coroutines with staggered start times and take the first to finish.
This method takes an iterable of coroutine functions. The first one is
started immediately. From then on, whenever the immediately preceding one
fails (raises an exception), or when *delay* seconds has passed, the next
coroutine is started. This continues until one of the coroutines complete
successfully, in which case all others are cancelled, or until all
coroutines fail.
The coroutines provided should be well-behaved in the following way:
* They should only ``return`` if completed successfully.
* They should always raise an exception if they did not complete
successfully. In particular, if they handle cancellation, they should
probably reraise, like this::
try:
# do work
except asyncio.CancelledError:
# undo partially completed work
raise
Args:
coro_fns: an iterable of coroutine functions, i.e. callables that
return a coroutine object when called. Use ``functools.partial`` or
lambdas to pass arguments.
delay: amount of time, in seconds, between starting coroutines. If
``None``, the coroutines will run sequentially.
loop: the event loop to use.
Returns:
tuple *(winner_result, winner_index, exceptions)* where
- *winner_result*: the result of the winning coroutine, or ``None``
if no coroutines won.
- *winner_index*: the index of the winning coroutine in
``coro_fns``, or ``None`` if no coroutines won. If the winning
coroutine may return None on success, *winner_index* can be used
to definitively determine whether any coroutine won.
- *exceptions*: list of exceptions returned by the coroutines.
``len(exceptions)`` is equal to the number of coroutines actually
started, and the order is the same as in ``coro_fns``. The winning
coroutine's entry is ``None``.
"""
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
loop = loop or events.get_running_loop()
enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
exceptions = []
running_tasks = []

async def run_one_coro(
previous_failed: typing.Optional[locks.Event]) -> None:
# Wait for the previous task to finish, or for delay seconds
if previous_failed is not None:
with contextlib.suppress(futures.TimeoutError):
# Use asyncio.wait_for() instead of asyncio.wait() here, so
# that if we get cancelled at this point, Event.wait() is also
# cancelled, otherwise there will be a "Task destroyed but it is
# pending" later.
await tasks.wait_for(previous_failed.wait(), delay)
# Get the next coroutine to run
try:
this_index, coro_fn = next(enum_coro_fns)
except StopIteration:
return
# Start task that will run the next coroutine
this_failed = locks.Event()
next_task = loop.create_task(run_one_coro(this_failed))
running_tasks.append(next_task)
assert len(running_tasks) == this_index + 2
# Prepare place to put this coroutine's exceptions if not won
exceptions.append(None)
assert len(exceptions) == this_index + 1

try:
result = await coro_fn()
except Exception as e:
exceptions[this_index] = e
this_failed.set() # Kickstart the next coroutine
else:
# Store winner's results
nonlocal winner_index, winner_result
assert winner_index is None
winner_index = this_index
winner_result = result
# Cancel all other tasks. We take care to not cancel the current
# task as well. If we do so, then since there is no `await` after
# here and CancelledError are usually thrown at one, we will
# encounter a curious corner case where the current task will end
# up as done() == True, cancelled() == False, exception() ==
# asyncio.CancelledError. This behavior is specified in
# https://bugs.python.org/issue30048
for i, t in enumerate(running_tasks):
if i != this_index:
t.cancel()

first_task = loop.create_task(run_one_coro(None))
running_tasks.append(first_task)
try:
# Wait for a growing list of tasks to all finish: poor man's version of
# curio's TaskGroup or trio's nursery
done_count = 0
while done_count != len(running_tasks):
done, _ = await tasks.wait(running_tasks)
done_count = len(done)
# If run_one_coro raises an unhandled exception, it's probably a
# programming error, and I want to see it.
if __debug__:
for d in done:
if d.done() and not d.cancelled() and d.exception():
raise d.exception()
return winner_result, winner_index, exceptions
finally:
# Make sure no tasks are left running if we leave this function
for t in running_tasks:
t.cancel()
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Implemented Happy Eyeballs in `asyncio.create_connection()`. Added two new
arguments, *happy_eyeballs_delay* and *interleave*,
to specify Happy Eyeballs behavior.