Skip to content

bpo-30064: Refactor sock_* asyncio API #10419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Nov 12, 2018
89 changes: 49 additions & 40 deletions Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,26 +358,29 @@ async def sock_recv(self, sock, n):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
try:
return sock.recv(n)
except (BlockingIOError, InterruptedError):
pass
fut = self.create_future()
self._sock_recv(fut, None, sock, n)
fd = sock.fileno()
self.add_reader(fd, self._sock_recv, fut, sock, n)
fut.add_done_callback(
functools.partial(self._sock_read_done, fd))
return await fut

def _sock_recv(self, fut, registered_fd, sock, n):
def _sock_read_done(self, fd, fut):
self.remove_reader(fd)

def _sock_recv(self, fut, sock, n):
# _sock_recv() can add itself as an I/O callback if the operation can't
# be done immediately. Don't use it directly, call sock_recv().
if registered_fd is not None:
# Remove the callback early. It should be rare that the
# selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
self.remove_reader(registered_fd)
if fut.cancelled():
if fut.done():
return
try:
data = sock.recv(n)
except (BlockingIOError, InterruptedError):
fd = sock.fileno()
self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
return # try again next time
except Exception as exc:
fut.set_exception(exc)
else:
Expand All @@ -391,27 +394,27 @@ async def sock_recv_into(self, sock, buf):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
try:
return sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
pass
fut = self.create_future()
self._sock_recv_into(fut, None, sock, buf)
fd = sock.fileno()
self.add_reader(fd, self._sock_recv_into, fut, sock, buf)
fut.add_done_callback(
functools.partial(self._sock_read_done, fd))
return await fut

def _sock_recv_into(self, fut, registered_fd, sock, buf):
def _sock_recv_into(self, fut, sock, buf):
# _sock_recv_into() can add itself as an I/O callback if the operation
# can't be done immediately. Don't use it directly, call
# sock_recv_into().
if registered_fd is not None:
# Remove the callback early. It should be rare that the
# selector says the FD is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
self.remove_reader(registered_fd)
if fut.cancelled():
if fut.done():
return
try:
nbytes = sock.recv_into(buf)
except (BlockingIOError, InterruptedError):
fd = sock.fileno()
self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
return # try again next time
except Exception as exc:
fut.set_exception(exc)
else:
Expand All @@ -428,34 +431,40 @@ async def sock_sendall(self, sock, data):
"""
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
if data:
self._sock_sendall(fut, None, sock, data)
try:
n = sock.send(data)
except (BlockingIOError, InterruptedError):
n = 0

if n == len(data):
# all data sent
return
else:
fut.set_result(None)
data = bytearray(memoryview(data)[n:])

fut = self.create_future()
fd = sock.fileno()
fut.add_done_callback(
functools.partial(self._sock_write_done, fd))
self.add_writer(fd, self._sock_sendall, fut, sock, data)
return await fut

def _sock_sendall(self, fut, registered_fd, sock, data):
if registered_fd is not None:
self.remove_writer(registered_fd)
if fut.cancelled():
def _sock_sendall(self, fut, sock, data):
if fut.done():
# Future cancellation can be scheduled on previous loop iteration
return

try:
n = sock.send(data)
except (BlockingIOError, InterruptedError):
n = 0
return
except Exception as exc:
fut.set_exception(exc)
return

if n == len(data):
fut.set_result(None)
else:
if n:
data = data[n:]
fd = sock.fileno()
self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
del data[:n]

async def sock_connect(self, sock, address):
"""Connect to a remote socket at address.
Expand Down Expand Up @@ -484,18 +493,18 @@ def _sock_connect(self, fut, sock, address):
# becomes writable to be notified when the connection succeed or
# fails.
fut.add_done_callback(
functools.partial(self._sock_connect_done, fd))
functools.partial(self._sock_write_done, fd))
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result(None)

def _sock_connect_done(self, fd, fut):
def _sock_write_done(self, fd, fut):
self.remove_writer(fd)

def _sock_connect_cb(self, fut, sock, address):
if fut.cancelled():
if fut.done():
return

try:
Expand Down Expand Up @@ -529,7 +538,7 @@ def _sock_accept(self, fut, registered, sock):
fd = sock.fileno()
if registered:
self.remove_reader(fd)
if fut.cancelled():
if fut.done():
return
try:
conn, address = sock.accept()
Expand Down
Loading