@@ -825,6 +825,7 @@ def clear_dns_cache(
825
825
async def _resolve_host (
826
826
self , host : str , port : int , traces : Optional [List ["Trace" ]] = None
827
827
) -> List [Dict [str , Any ]]:
828
+ """Resolve host and return list of addresses."""
828
829
if is_ip_address (host ):
829
830
return [
830
831
{
@@ -852,8 +853,7 @@ async def _resolve_host(
852
853
return res
853
854
854
855
key = (host , port )
855
-
856
- if (key in self ._cached_hosts ) and (not self ._cached_hosts .expired (key )):
856
+ if key in self ._cached_hosts and not self ._cached_hosts .expired (key ):
857
857
# get result early, before any await (#4014)
858
858
result = self ._cached_hosts .next_addrs (key )
859
859
@@ -862,6 +862,39 @@ async def _resolve_host(
862
862
await trace .send_dns_cache_hit (host )
863
863
return result
864
864
865
+ #
866
+ # If multiple connectors are resolving the same host, we wait
867
+ # for the first one to resolve and then use the result for all of them.
868
+ # We use a throttle event to ensure that we only resolve the host once
869
+ # and then use the result for all the waiters.
870
+ #
871
+ # In this case we need to create a task to ensure that we can shield
872
+ # the task from cancellation as cancelling this lookup should not cancel
873
+ # the underlying lookup or else the cancel event will get broadcast to
874
+ # all the waiters across all connections.
875
+ #
876
+ resolved_host_task = asyncio .create_task (
877
+ self ._resolve_host_with_throttle (key , host , port , traces )
878
+ )
879
+ try :
880
+ return await asyncio .shield (resolved_host_task )
881
+ except asyncio .CancelledError :
882
+
883
+ def drop_exception (fut : "asyncio.Future[List[Dict[str, Any]]]" ) -> None :
884
+ with suppress (Exception , asyncio .CancelledError ):
885
+ fut .result ()
886
+
887
+ resolved_host_task .add_done_callback (drop_exception )
888
+ raise
889
+
890
+ async def _resolve_host_with_throttle (
891
+ self ,
892
+ key : Tuple [str , int ],
893
+ host : str ,
894
+ port : int ,
895
+ traces : Optional [List ["Trace" ]],
896
+ ) -> List [Dict [str , Any ]]:
897
+ """Resolve host with a dns events throttle."""
865
898
if key in self ._throttle_dns_events :
866
899
# get event early, before any await (#4014)
867
900
event = self ._throttle_dns_events [key ]
@@ -1163,22 +1196,11 @@ async def _create_direct_connection(
1163
1196
host = host .rstrip ("." ) + "."
1164
1197
port = req .port
1165
1198
assert port is not None
1166
- host_resolved = asyncio .ensure_future (
1167
- self ._resolve_host (host , port , traces = traces ), loop = self ._loop
1168
- )
1169
1199
try :
1170
1200
# Cancelling this lookup should not cancel the underlying lookup
1171
1201
# or else the cancel event will get broadcast to all the waiters
1172
1202
# across all connections.
1173
- hosts = await asyncio .shield (host_resolved )
1174
- except asyncio .CancelledError :
1175
-
1176
- def drop_exception (fut : "asyncio.Future[List[Dict[str, Any]]]" ) -> None :
1177
- with suppress (Exception , asyncio .CancelledError ):
1178
- fut .result ()
1179
-
1180
- host_resolved .add_done_callback (drop_exception )
1181
- raise
1203
+ hosts = await self ._resolve_host (host , port , traces = traces )
1182
1204
except OSError as exc :
1183
1205
if exc .errno is None and isinstance (exc , asyncio .TimeoutError ):
1184
1206
raise
0 commit comments