Skip to content

bugfix: write to another transport in resume_writing() fails #498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions tests/test_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,59 @@ async def runner():
self.assertIsNone(
self.loop.run_until_complete(connection_lost_called))

def test_resume_writing_write_different_transport(self):
loop = self.loop

class P1(asyncio.Protocol):
def __init__(self, t2):
self.t2 = t2
self.paused = False
self.waiter = loop.create_future()

def data_received(self, data):
self.waiter.set_result(data)

def pause_writing(self):
self.paused = True

def resume_writing(self):
self.paused = False
self.t2.write(b'hello')

s1, s2 = socket.socketpair()
s1.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
s2.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)

async def _test(t1, p1, t2):
t1.set_write_buffer_limits(1024, 1023)

# fill s1 up first
t2.pause_reading()
while not p1.paused:
t1.write(b' ' * 1024)

# trigger resume_writing() in _exec_queued_writes() with tight loop
t2.resume_reading()
while p1.paused:
t1.write(b' ')
await asyncio.sleep(0)

# t2.write() in p1.resume_writing() should work fine
data = await asyncio.wait_for(p1.waiter, 5)
self.assertEqual(data, b'hello')

async def test():
t2, _ = await loop.create_connection(asyncio.Protocol, sock=s2)
t1, p1 = await loop.create_connection(lambda: P1(t2), sock=s1)
try:
await _test(t1, p1, t2)
finally:
t1.close()
t2.close()

with s1, s2:
loop.run_until_complete(test())


class Test_UV_TCP(_TestTCP, tb.UVTestCase):

Expand Down
2 changes: 1 addition & 1 deletion uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ cdef class Loop:
object _exception_handler
object _default_executor
object _ready
set _queued_streams
set _queued_streams, _executing_streams
Py_ssize_t _ready_len

set _servers
Expand Down
28 changes: 12 additions & 16 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ cdef class Loop:
self._default_executor = None

self._queued_streams = set()
self._executing_streams = set()
self._ready = col_deque()
self._ready_len = 0

Expand Down Expand Up @@ -645,25 +646,20 @@ cdef class Loop:

cdef:
UVStream stream
int queued_len

if UVLOOP_DEBUG:
queued_len = len(self._queued_streams)

for pystream in self._queued_streams:
stream = <UVStream>pystream
stream._exec_write()

if UVLOOP_DEBUG:
if len(self._queued_streams) != queued_len:
raise RuntimeError(
'loop._queued_streams are not empty after '
'_exec_queued_writes')

self._queued_streams.clear()
streams = self._queued_streams
self._queued_streams = self._executing_streams
self._executing_streams = streams
try:
for pystream in streams:
stream = <UVStream>pystream
stream._exec_write()
finally:
streams.clear()

if self.handler_check__exec_writes.running:
self.handler_check__exec_writes.stop()
if len(self._queued_streams) == 0:
self.handler_check__exec_writes.stop()

cdef inline _call_soon(self, object callback, object args, object context):
cdef Handle handle
Expand Down