Skip to content

Commit 7438792

Browse files
authored
bpo-30064: Refactor sock_* asyncio API (#10419)
1 parent 9404e77 commit 7438792

File tree

5 files changed

+167
-446
lines changed

5 files changed

+167
-446
lines changed

Lib/asyncio/selector_events.py

+49-40
Original file line numberDiff line numberDiff line change
@@ -358,26 +358,29 @@ async def sock_recv(self, sock, n):
358358
"""
359359
if self._debug and sock.gettimeout() != 0:
360360
raise ValueError("the socket must be non-blocking")
361+
try:
362+
return sock.recv(n)
363+
except (BlockingIOError, InterruptedError):
364+
pass
361365
fut = self.create_future()
362-
self._sock_recv(fut, None, sock, n)
366+
fd = sock.fileno()
367+
self.add_reader(fd, self._sock_recv, fut, sock, n)
368+
fut.add_done_callback(
369+
functools.partial(self._sock_read_done, fd))
363370
return await fut
364371

365-
def _sock_recv(self, fut, registered_fd, sock, n):
372+
def _sock_read_done(self, fd, fut):
373+
self.remove_reader(fd)
374+
375+
def _sock_recv(self, fut, sock, n):
366376
# _sock_recv() can add itself as an I/O callback if the operation can't
367377
# be done immediately. Don't use it directly, call sock_recv().
368-
if registered_fd is not None:
369-
# Remove the callback early. It should be rare that the
370-
# selector says the fd is ready but the call still returns
371-
# EAGAIN, and I am willing to take a hit in that case in
372-
# order to simplify the common case.
373-
self.remove_reader(registered_fd)
374-
if fut.cancelled():
378+
if fut.done():
375379
return
376380
try:
377381
data = sock.recv(n)
378382
except (BlockingIOError, InterruptedError):
379-
fd = sock.fileno()
380-
self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
383+
return # try again next time
381384
except Exception as exc:
382385
fut.set_exception(exc)
383386
else:
@@ -391,27 +394,27 @@ async def sock_recv_into(self, sock, buf):
391394
"""
392395
if self._debug and sock.gettimeout() != 0:
393396
raise ValueError("the socket must be non-blocking")
397+
try:
398+
return sock.recv_into(buf)
399+
except (BlockingIOError, InterruptedError):
400+
pass
394401
fut = self.create_future()
395-
self._sock_recv_into(fut, None, sock, buf)
402+
fd = sock.fileno()
403+
self.add_reader(fd, self._sock_recv_into, fut, sock, buf)
404+
fut.add_done_callback(
405+
functools.partial(self._sock_read_done, fd))
396406
return await fut
397407

398-
def _sock_recv_into(self, fut, registered_fd, sock, buf):
408+
def _sock_recv_into(self, fut, sock, buf):
399409
# _sock_recv_into() can add itself as an I/O callback if the operation
400410
# can't be done immediately. Don't use it directly, call
401411
# sock_recv_into().
402-
if registered_fd is not None:
403-
# Remove the callback early. It should be rare that the
404-
# selector says the FD is ready but the call still returns
405-
# EAGAIN, and I am willing to take a hit in that case in
406-
# order to simplify the common case.
407-
self.remove_reader(registered_fd)
408-
if fut.cancelled():
412+
if fut.done():
409413
return
410414
try:
411415
nbytes = sock.recv_into(buf)
412416
except (BlockingIOError, InterruptedError):
413-
fd = sock.fileno()
414-
self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
417+
return # try again next time
415418
except Exception as exc:
416419
fut.set_exception(exc)
417420
else:
@@ -428,34 +431,40 @@ async def sock_sendall(self, sock, data):
428431
"""
429432
if self._debug and sock.gettimeout() != 0:
430433
raise ValueError("the socket must be non-blocking")
431-
fut = self.create_future()
432-
if data:
433-
self._sock_sendall(fut, None, sock, data)
434+
try:
435+
n = sock.send(data)
436+
except (BlockingIOError, InterruptedError):
437+
n = 0
438+
439+
if n == len(data):
440+
# all data sent
441+
return
434442
else:
435-
fut.set_result(None)
443+
data = bytearray(memoryview(data)[n:])
444+
445+
fut = self.create_future()
446+
fd = sock.fileno()
447+
fut.add_done_callback(
448+
functools.partial(self._sock_write_done, fd))
449+
self.add_writer(fd, self._sock_sendall, fut, sock, data)
436450
return await fut
437451

438-
def _sock_sendall(self, fut, registered_fd, sock, data):
439-
if registered_fd is not None:
440-
self.remove_writer(registered_fd)
441-
if fut.cancelled():
452+
def _sock_sendall(self, fut, sock, data):
453+
if fut.done():
454+
# Future cancellation can be scheduled on previous loop iteration
442455
return
443-
444456
try:
445457
n = sock.send(data)
446458
except (BlockingIOError, InterruptedError):
447-
n = 0
459+
return
448460
except Exception as exc:
449461
fut.set_exception(exc)
450462
return
451463

452464
if n == len(data):
453465
fut.set_result(None)
454466
else:
455-
if n:
456-
data = data[n:]
457-
fd = sock.fileno()
458-
self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
467+
del data[:n]
459468

460469
async def sock_connect(self, sock, address):
461470
"""Connect to a remote socket at address.
@@ -484,18 +493,18 @@ def _sock_connect(self, fut, sock, address):
484493
# becomes writable to be notified when the connection succeed or
485494
# fails.
486495
fut.add_done_callback(
487-
functools.partial(self._sock_connect_done, fd))
496+
functools.partial(self._sock_write_done, fd))
488497
self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
489498
except Exception as exc:
490499
fut.set_exception(exc)
491500
else:
492501
fut.set_result(None)
493502

494-
def _sock_connect_done(self, fd, fut):
503+
def _sock_write_done(self, fd, fut):
495504
self.remove_writer(fd)
496505

497506
def _sock_connect_cb(self, fut, sock, address):
498-
if fut.cancelled():
507+
if fut.done():
499508
return
500509

501510
try:
@@ -529,7 +538,7 @@ def _sock_accept(self, fut, registered, sock):
529538
fd = sock.fileno()
530539
if registered:
531540
self.remove_reader(fd)
532-
if fut.cancelled():
541+
if fut.done():
533542
return
534543
try:
535544
conn, address = sock.accept()

0 commit comments

Comments
 (0)