Skip to content

Commit 4b803b1

Browse files
fantixhexin02
and
hexin02
committed
Fix call_soon_threadsafe thread safety
Don't start the idle handler in other threads or signal handlers, leaving the job to `_on_wake()`. Co-authored-by: hexin02 <[email protected]>
1 parent b0526cd commit 4b803b1

File tree

3 files changed

+39
-3
lines changed

3 files changed

+39
-3
lines changed

tests/test_base.py

+28
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import fcntl
33
import logging
44
import os
5+
import random
56
import sys
67
import threading
78
import time
@@ -702,6 +703,33 @@ async def foo():
702703
self.loop.run_until_complete(
703704
self.loop.shutdown_default_executor())
704705

706+
def test_call_soon_threadsafe_safety(self):
707+
ITERATIONS = 4096
708+
counter = [0]
709+
710+
def cb():
711+
counter[0] += 1
712+
if counter[0] < ITERATIONS - 512:
713+
h = self.loop.call_later(0.01, lambda: None)
714+
self.loop.call_later(
715+
0.0005 + random.random() * 0.0005, h.cancel
716+
)
717+
718+
def scheduler():
719+
loop = self.loop
720+
for i in range(ITERATIONS):
721+
if loop.is_running():
722+
loop.call_soon_threadsafe(cb)
723+
time.sleep(0.001)
724+
loop.call_soon_threadsafe(loop.stop)
725+
726+
thread = threading.Thread(target=scheduler)
727+
728+
self.loop.call_soon(thread.start)
729+
self.loop.run_forever()
730+
thread.join()
731+
self.assertEqual(counter[0], ITERATIONS)
732+
705733

706734
class TestBaseUV(_TestBase, UVTestCase):
707735

uvloop/loop.pxd

+1
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ cdef class Loop:
145145
cdef _exec_queued_writes(self)
146146

147147
cdef inline _call_soon(self, object callback, object args, object context)
148+
cdef inline _append_ready_handle(self, Handle handle)
148149
cdef inline _call_soon_handle(self, Handle handle)
149150

150151
cdef _call_later(self, uint64_t delay, object callback, object args,

uvloop/loop.pyx

+10-3
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ cdef class Loop:
427427
if handle._cancelled:
428428
self.remove_signal_handler(sig) # Remove it properly.
429429
else:
430-
self._call_soon_handle(handle)
430+
self._append_ready_handle(handle)
431431
self.handler_async.send()
432432

433433
cdef _on_wake(self):
@@ -667,10 +667,13 @@ cdef class Loop:
667667
self._call_soon_handle(handle)
668668
return handle
669669

670-
cdef inline _call_soon_handle(self, Handle handle):
670+
cdef inline _append_ready_handle(self, Handle handle):
671671
self._check_closed()
672672
self._ready.append(handle)
673673
self._ready_len += 1
674+
675+
cdef inline _call_soon_handle(self, Handle handle):
676+
self._append_ready_handle(handle)
674677
if not self.handler_idle.running:
675678
self.handler_idle.start()
676679

@@ -1281,7 +1284,11 @@ cdef class Loop:
12811284
"""Like call_soon(), but thread-safe."""
12821285
if not args:
12831286
args = None
1284-
handle = self._call_soon(callback, args, context)
1287+
cdef Handle handle = new_Handle(self, callback, args, context)
1288+
self._append_ready_handle(handle) # deque append is atomic
1289+
# libuv async handler is thread-safe while the idle handler is not -
1290+
# we only set the async handler here, which will start the idle handler
1291+
# in _on_wake() from the loop and eventually call the callback.
12851292
self.handler_async.send()
12861293
return handle
12871294

0 commit comments

Comments
 (0)