Skip to content

fix incorrect main thread id value in mp.Process #453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion tests/test_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async def coro():
with self.assertRaisesRegex(TypeError, 'coroutines cannot be used'):
self.loop.add_signal_handler(signal.SIGHUP, coro)

def test_wakeup_fd_unchanged(self):
def test_signals_wakeup_fd_unchanged(self):
async def runner():
PROG = R"""\
import uvloop
Expand Down Expand Up @@ -349,6 +349,42 @@ async def f(): pass

self.loop.run_until_complete(runner())

def test_signals_fork_in_thread(self):
# Refs #452, when forked from a thread, the main-thread-only signal
# operations failed thread ID checks because we didn't update
# MAIN_THREAD_ID after fork. It's now a lazy value set when needed and
# cleared after fork.
PROG = R"""\
import asyncio
import multiprocessing
import signal
import sys
import threading
import uvloop

multiprocessing.set_start_method('fork')

def subprocess():
loop = """ + self.NEW_LOOP + """
loop.add_signal_handler(signal.SIGINT, lambda *a: None)

def run():
loop = """ + self.NEW_LOOP + """
loop.add_signal_handler(signal.SIGINT, lambda *a: None)
p = multiprocessing.Process(target=subprocess)
t = threading.Thread(target=p.start)
t.start()
t.join()
p.join()
sys.exit(p.exitcode)

run()
"""

subprocess.check_call([
sys.executable, b'-W', b'ignore', b'-c', PROG,
])


class Test_UV_Signals(_TestSignal, tb.UVTestCase):
NEW_LOOP = 'uvloop.new_event_loop()'
Expand Down
11 changes: 11 additions & 0 deletions uvloop/includes/fork_handler.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
volatile uint64_t MAIN_THREAD_ID = 0;
volatile int8_t MAIN_THREAD_ID_SET = 0;

typedef void (*OnForkHandler)();

Expand All @@ -9,6 +11,10 @@ Note: Fork handler needs to be in C (not cython) otherwise it would require
GIL to be present, but some forks can exec non-python processes.
*/
void handleAtFork(void) {
// Reset the MAIN_THREAD_ID on fork, because the main thread ID is not
// always the same after fork, especially when forked from within a thread.
MAIN_THREAD_ID_SET = 0;

if (__forkHandler != NULL) {
__forkHandler();
}
Expand All @@ -25,3 +31,8 @@ void resetForkHandler(void)
{
__forkHandler = NULL;
}

void setMainThreadID(uint64_t id) {
MAIN_THREAD_ID = id;
MAIN_THREAD_ID_SET = 1;
}
2 changes: 1 addition & 1 deletion uvloop/includes/stdlib.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ cdef int ssl_SSL_ERROR_WANT_READ = ssl.SSL_ERROR_WANT_READ
cdef int ssl_SSL_ERROR_WANT_WRITE = ssl.SSL_ERROR_WANT_WRITE
cdef int ssl_SSL_ERROR_SYSCALL = ssl.SSL_ERROR_SYSCALL

cdef uint64_t MAIN_THREAD_ID = <uint64_t><int64_t>threading.main_thread().ident
cdef threading_Thread = threading.Thread
cdef threading_main_thread = threading.main_thread

cdef int subprocess_PIPE = subprocess.PIPE
cdef int subprocess_STDOUT = subprocess.STDOUT
Expand Down
5 changes: 5 additions & 0 deletions uvloop/includes/system.pxd
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from libc.stdint cimport int8_t, uint64_t

cdef extern from "arpa/inet.h" nogil:

int ntohl(int)
Expand Down Expand Up @@ -85,7 +87,10 @@ cdef extern from "includes/compat.h" nogil:

cdef extern from "includes/fork_handler.h":

uint64_t MAIN_THREAD_ID
int8_t MAIN_THREAD_ID_SET
ctypedef void (*OnForkHandler)()
void handleAtFork()
void setForkHandler(OnForkHandler handler)
void resetForkHandler()
void setMainThreadID(uint64_t id)
1 change: 0 additions & 1 deletion uvloop/loop.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ cdef class Loop:
bint _stopping

uint64_t _thread_id
bint _thread_is_main

object _task_factory
object _exception_handler
Expand Down
13 changes: 6 additions & 7 deletions uvloop/loop.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ cdef class Loop:

self._closed = 0
self._debug = 0
self._thread_is_main = 0
self._thread_id = 0
self._running = 0
self._stopping = 0
Expand Down Expand Up @@ -216,7 +215,11 @@ cdef class Loop:
self._servers = set()

cdef inline _is_main_thread(self):
return MAIN_THREAD_ID == PyThread_get_thread_ident()
cdef uint64_t main_thread_id = system.MAIN_THREAD_ID
if system.MAIN_THREAD_ID_SET == 0:
main_thread_id = <uint64_t><int64_t>threading_main_thread().ident
system.setMainThreadID(main_thread_id)
return main_thread_id == PyThread_get_thread_ident()

def __init__(self):
self.set_debug((not sys_ignore_environment
Expand Down Expand Up @@ -520,7 +523,6 @@ cdef class Loop:
self._last_error = None

self._thread_id = PyThread_get_thread_ident()
self._thread_is_main = MAIN_THREAD_ID == self._thread_id
self._running = 1

self.handler_check__exec_writes.start()
Expand All @@ -541,7 +543,6 @@ cdef class Loop:

self._pause_signals()

self._thread_is_main = 0
self._thread_id = 0
self._running = 0
self._stopping = 0
Expand Down Expand Up @@ -3287,16 +3288,14 @@ cdef Loop __forking_loop = None


cdef void __get_fork_handler() nogil:
global __forking
global __forking_loop

with gil:
if (__forking and __forking_loop is not None and
__forking_loop.active_process_handler is not None):
__forking_loop.active_process_handler._after_fork()

cdef __install_atfork():
global __atfork_installed

if __atfork_installed:
return
__atfork_installed = 1
Expand Down