Skip to content

Commit d2deffe

Browse files
authored
bugfix: write to another transport in resume_writing() fails (#498)
Fixes #496
1 parent 25b5f1e commit d2deffe

File tree

3 files changed

+66
-17
lines changed

3 files changed

+66
-17
lines changed

tests/test_tcp.py

+53
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,59 @@ async def runner():
654654
self.assertIsNone(
655655
self.loop.run_until_complete(connection_lost_called))
656656

657+
def test_resume_writing_write_different_transport(self):
658+
loop = self.loop
659+
660+
class P1(asyncio.Protocol):
661+
def __init__(self, t2):
662+
self.t2 = t2
663+
self.paused = False
664+
self.waiter = loop.create_future()
665+
666+
def data_received(self, data):
667+
self.waiter.set_result(data)
668+
669+
def pause_writing(self):
670+
self.paused = True
671+
672+
def resume_writing(self):
673+
self.paused = False
674+
self.t2.write(b'hello')
675+
676+
s1, s2 = socket.socketpair()
677+
s1.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
678+
s2.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
679+
680+
async def _test(t1, p1, t2):
681+
t1.set_write_buffer_limits(1024, 1023)
682+
683+
# fill s1 up first
684+
t2.pause_reading()
685+
while not p1.paused:
686+
t1.write(b' ' * 1024)
687+
688+
# trigger resume_writing() in _exec_queued_writes() with tight loop
689+
t2.resume_reading()
690+
while p1.paused:
691+
t1.write(b' ')
692+
await asyncio.sleep(0)
693+
694+
# t2.write() in p1.resume_writing() should work fine
695+
data = await asyncio.wait_for(p1.waiter, 5)
696+
self.assertEqual(data, b'hello')
697+
698+
async def test():
699+
t2, _ = await loop.create_connection(asyncio.Protocol, sock=s2)
700+
t1, p1 = await loop.create_connection(lambda: P1(t2), sock=s1)
701+
try:
702+
await _test(t1, p1, t2)
703+
finally:
704+
t1.close()
705+
t2.close()
706+
707+
with s1, s2:
708+
loop.run_until_complete(test())
709+
657710

658711
class Test_UV_TCP(_TestTCP, tb.UVTestCase):
659712

uvloop/loop.pxd

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ cdef class Loop:
4949
object _exception_handler
5050
object _default_executor
5151
object _ready
52-
set _queued_streams
52+
set _queued_streams, _executing_streams
5353
Py_ssize_t _ready_len
5454

5555
set _servers

uvloop/loop.pyx

+12-16
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ cdef class Loop:
177177
self._default_executor = None
178178

179179
self._queued_streams = set()
180+
self._executing_streams = set()
180181
self._ready = col_deque()
181182
self._ready_len = 0
182183

@@ -645,25 +646,20 @@ cdef class Loop:
645646

646647
cdef:
647648
UVStream stream
648-
int queued_len
649649

650-
if UVLOOP_DEBUG:
651-
queued_len = len(self._queued_streams)
652-
653-
for pystream in self._queued_streams:
654-
stream = <UVStream>pystream
655-
stream._exec_write()
656-
657-
if UVLOOP_DEBUG:
658-
if len(self._queued_streams) != queued_len:
659-
raise RuntimeError(
660-
'loop._queued_streams are not empty after '
661-
'_exec_queued_writes')
662-
663-
self._queued_streams.clear()
650+
streams = self._queued_streams
651+
self._queued_streams = self._executing_streams
652+
self._executing_streams = streams
653+
try:
654+
for pystream in streams:
655+
stream = <UVStream>pystream
656+
stream._exec_write()
657+
finally:
658+
streams.clear()
664659

665660
if self.handler_check__exec_writes.running:
666-
self.handler_check__exec_writes.stop()
661+
if len(self._queued_streams) == 0:
662+
self.handler_check__exec_writes.stop()
667663

668664
cdef inline _call_soon(self, object callback, object args, object context):
669665
cdef Handle handle

0 commit comments

Comments
 (0)