diff --git a/Doc/library/asyncio-graph.rst b/Doc/library/asyncio-graph.rst new file mode 100644 index 00000000000000..fc8edeb426c567 --- /dev/null +++ b/Doc/library/asyncio-graph.rst @@ -0,0 +1,145 @@ +.. currentmodule:: asyncio + + +.. _asyncio-graph: + +======================== +Call Graph Introspection +======================== + +**Source code:** :source:`Lib/asyncio/graph.py` + +------------------------------------- + +asyncio has powerful runtime call graph introspection utilities +to trace the entire call graph of a running *coroutine* or *task*, or +a suspended *future*. These utilities and the underlying machinery +can be used from within a Python program or by external profilers +and debuggers. + +.. versionadded:: next + + +.. function:: print_call_graph(future=None, /, *, file=None, depth=1, limit=None) + + Print the async call graph for the current task or the provided + :class:`Task` or :class:`Future`. + + This function prints entries starting from the top frame and going + down towards the invocation point. + + The function receives an optional *future* argument. + If not passed, the current running task will be used. + + If the function is called on *the current task*, the optional + keyword-only *depth* argument can be used to skip the specified + number of frames from top of the stack. + + If the optional keyword-only *limit* argument is provided, each call stack + in the resulting graph is truncated to include at most ``abs(limit)`` + entries. If *limit* is positive, the entries left are the closest to + the invocation point. If *limit* is negative, the topmost entries are + left. If *limit* is omitted or ``None``, all entries are present. + If *limit* is ``0``, the call stack is not printed at all, only + "awaited by" information is printed. + + If *file* is omitted or ``None``, the function will print + to :data:`sys.stdout`. + + **Example:** + + The following Python code: + + .. code-block:: python + + import asyncio + + async def test(): + asyncio.print_call_graph() + + async def main(): + async with asyncio.TaskGroup() as g: + g.create_task(test()) + + asyncio.run(main()) + + will print:: + + * Task(name='Task-2', id=0x1039f0fe0) + + Call stack: + | File 't2.py', line 4, in async test() + + Awaited by: + * Task(name='Task-1', id=0x103a5e060) + + Call stack: + | File 'taskgroups.py', line 107, in async TaskGroup.__aexit__() + | File 't2.py', line 7, in async main() + +.. function:: format_call_graph(future=None, /, *, depth=1, limit=None) + + Like :func:`print_call_graph`, but returns a string. + If *future* is ``None`` and there's no current task, + the function returns an empty string. + + +.. function:: capture_call_graph(future=None, /, *, depth=1, limit=None) + + Capture the async call graph for the current task or the provided + :class:`Task` or :class:`Future`. + + The function receives an optional *future* argument. + If not passed, the current running task will be used. If there's no + current task, the function returns ``None``. + + If the function is called on *the current task*, the optional + keyword-only *depth* argument can be used to skip the specified + number of frames from top of the stack. + + Returns a ``FutureCallGraph`` data class object: + + * ``FutureCallGraph(future, call_stack, awaited_by)`` + + Where *future* is a reference to a :class:`Future` or + a :class:`Task` (or their subclasses.) + + ``call_stack`` is a tuple of ``FrameCallGraphEntry`` objects. + + ``awaited_by`` is a tuple of ``FutureCallGraph`` objects. 
+ + * ``FrameCallGraphEntry(frame)`` + + Where *frame* is a frame object of a regular Python function + in the call stack. + + +Low-level utility functions +=========================== + +To introspect an async call graph, asyncio requires cooperation from +control flow structures, such as :func:`shield` or :class:`TaskGroup`. +Any time an intermediate :class:`Future` object with low-level APIs like +:meth:`Future.add_done_callback() <asyncio.Future.add_done_callback>` is +involved, the following two functions should be used to inform asyncio +about how exactly such intermediate future objects are connected with +the tasks they wrap or control. + + +.. function:: future_add_to_awaited_by(future, waiter, /) + + Record that *future* is awaited on by *waiter*. + + Both *future* and *waiter* must be instances of + :class:`Future` or :class:`Task` or their subclasses, + otherwise the call has no effect. + + A call to ``future_add_to_awaited_by()`` must be followed by an + eventual call to the :func:`future_discard_from_awaited_by` function + with the same arguments. + + +.. function:: future_discard_from_awaited_by(future, waiter, /) + + Record that *future* is no longer awaited on by *waiter*. + + Both *future* and *waiter* must be instances of + :class:`Future` or :class:`Task` or their subclasses, otherwise + the call has no effect. diff --git a/Doc/library/asyncio.rst b/Doc/library/asyncio.rst index 5f83b3a2658da4..7d368dae49dc1d 100644 --- a/Doc/library/asyncio.rst +++ b/Doc/library/asyncio.rst @@ -99,6 +99,7 @@ You can experiment with an ``asyncio`` concurrent context in the :term:`REPL`: asyncio-subprocess.rst asyncio-queue.rst asyncio-exceptions.rst + asyncio-graph.rst .. toctree:: :caption: Low-level APIs diff --git a/Doc/library/inspect.rst b/Doc/library/inspect.rst index 0085207d3055f2..544efed1a76b96 100644 --- a/Doc/library/inspect.rst +++ b/Doc/library/inspect.rst @@ -150,6 +150,12 @@ attributes (see :ref:`import-mod-attrs` for module attributes): | | f_locals | local namespace seen by | | | | this frame | +-----------------+-------------------+---------------------------+ +| | f_generator | returns the generator or | +| | | coroutine object that | +| | | owns this frame, or | +| | | ``None`` if the frame is | +| | | of a regular function | ++-----------------+-------------------+---------------------------+ | | f_trace | tracing function for this | | | | frame, or ``None`` | +-----------------+-------------------+---------------------------+ @@ -310,6 +316,10 @@ attributes (see :ref:`import-mod-attrs` for module attributes): Add ``__builtins__`` attribute to functions. +.. versionchanged:: next + + Add ``f_generator`` attribute to frames. + .. function:: getmembers(object[, predicate]) Return all the members of an object in a list of ``(name, value)`` diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 0dcecd4944f2f6..89fb319eadd2f7 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -749,6 +749,11 @@ asyncio reduces memory usage. (Contributed by Kumar Aditya in :gh:`107803`.) +* :mod:`asyncio` has new utility functions for introspecting and printing + the program's call graph: :func:`asyncio.capture_call_graph` and + :func:`asyncio.print_call_graph`. + (Contributed by Yury Selivanov, Pablo Galindo Salgado, and Ɓukasz Langa + in :gh:`91048`.)
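A short programmatic sketch of consuming the captured graph (not part of this patch; the ``walk``, ``leaf`` and ``main`` helpers are illustrative only). It relies solely on :func:`asyncio.capture_call_graph` and the ``FutureCallGraph`` / ``FrameCallGraphEntry`` fields documented above:

.. code-block:: python

    import asyncio

    def walk(node, indent=0):
        # Recursively render a FutureCallGraph: first the future/task itself,
        # then its captured frames, then everything that awaits on it.
        print(" " * indent + repr(node.future))
        for entry in node.call_stack:
            frame = entry.frame
            print(" " * indent
                  + f"  {frame.f_code.co_qualname} "
                  + f"({frame.f_code.co_filename}:{frame.f_lineno})")
        for parent in node.awaited_by:
            walk(parent, indent + 4)

    async def leaf():
        # Introspect the currently running task.
        graph = asyncio.capture_call_graph()
        if graph is not None:
            walk(graph)

    async def main():
        async with asyncio.TaskGroup() as tg:
            tg.create_task(leaf())

    asyncio.run(main())

``print_call_graph()`` renders its indented ``Call stack`` / ``Awaited by`` listing from the same data; a custom walker like the one above can present it differently, for example for structured logging.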
base64 ------ diff --git a/Include/internal/pycore_debug_offsets.h b/Include/internal/pycore_debug_offsets.h index 184f4b9360b6d3..34debf35d14df4 100644 --- a/Include/internal/pycore_debug_offsets.h +++ b/Include/internal/pycore_debug_offsets.h @@ -11,6 +11,41 @@ extern "C" { #define _Py_Debug_Cookie "xdebugpy" +#if defined(__APPLE__) +# include +#endif + +// Macros to burn global values in custom sections so out-of-process +// profilers can locate them easily. + +#define GENERATE_DEBUG_SECTION(name, declaration) \ + _GENERATE_DEBUG_SECTION_WINDOWS(name) \ + _GENERATE_DEBUG_SECTION_APPLE(name) \ + declaration \ + _GENERATE_DEBUG_SECTION_LINUX(name) + +#if defined(MS_WINDOWS) +#define _GENERATE_DEBUG_SECTION_WINDOWS(name) \ + _Pragma(Py_STRINGIFY(section(Py_STRINGIFY(name), read, write))) \ + __declspec(allocate(Py_STRINGIFY(name))) +#else +#define _GENERATE_DEBUG_SECTION_WINDOWS(name) +#endif + +#if defined(__APPLE__) +#define _GENERATE_DEBUG_SECTION_APPLE(name) \ + __attribute__((section(SEG_DATA "," Py_STRINGIFY(name)))) +#else +#define _GENERATE_DEBUG_SECTION_APPLE(name) +#endif + +#if defined(__linux__) && (defined(__GNUC__) || defined(__clang__)) +#define _GENERATE_DEBUG_SECTION_LINUX(name) \ + __attribute__((section("." Py_STRINGIFY(name)))) +#else +#define _GENERATE_DEBUG_SECTION_LINUX(name) +#endif + #ifdef Py_GIL_DISABLED # define _Py_Debug_gilruntimestate_enabled offsetof(struct _gil_runtime_state, enabled) # define _Py_Debug_Free_Threaded 1 @@ -69,6 +104,7 @@ typedef struct _Py_DebugOffsets { uint64_t instr_ptr; uint64_t localsplus; uint64_t owner; + uint64_t stackpointer; } interpreter_frame; // Code object offset; @@ -113,6 +149,14 @@ typedef struct _Py_DebugOffsets { uint64_t ob_size; } list_object; + // PySet object offset; + struct _set_object { + uint64_t size; + uint64_t used; + uint64_t table; + uint64_t mask; + } set_object; + // PyDict object offset; struct _dict_object { uint64_t size; @@ -153,6 +197,14 @@ typedef struct _Py_DebugOffsets { uint64_t size; uint64_t collecting; } gc; + + // Generator object offset; + struct _gen_object { + uint64_t size; + uint64_t gi_name; + uint64_t gi_iframe; + uint64_t gi_frame_state; + } gen_object; } _Py_DebugOffsets; @@ -198,6 +250,7 @@ typedef struct _Py_DebugOffsets { .instr_ptr = offsetof(_PyInterpreterFrame, instr_ptr), \ .localsplus = offsetof(_PyInterpreterFrame, localsplus), \ .owner = offsetof(_PyInterpreterFrame, owner), \ + .stackpointer = offsetof(_PyInterpreterFrame, stackpointer), \ }, \ .code_object = { \ .size = sizeof(PyCodeObject), \ @@ -231,6 +284,12 @@ typedef struct _Py_DebugOffsets { .ob_item = offsetof(PyListObject, ob_item), \ .ob_size = offsetof(PyListObject, ob_base.ob_size), \ }, \ + .set_object = { \ + .size = sizeof(PySetObject), \ + .used = offsetof(PySetObject, used), \ + .table = offsetof(PySetObject, table), \ + .mask = offsetof(PySetObject, mask), \ + }, \ .dict_object = { \ .size = sizeof(PyDictObject), \ .ma_keys = offsetof(PyDictObject, ma_keys), \ @@ -260,6 +319,12 @@ typedef struct _Py_DebugOffsets { .size = sizeof(struct _gc_runtime_state), \ .collecting = offsetof(struct _gc_runtime_state, collecting), \ }, \ + .gen_object = { \ + .size = sizeof(PyGenObject), \ + .gi_name = offsetof(PyGenObject, gi_name), \ + .gi_iframe = offsetof(PyGenObject, gi_iframe), \ + .gi_frame_state = offsetof(PyGenObject, gi_frame_state), \ + }, \ } diff --git a/Include/internal/pycore_tstate.h b/Include/internal/pycore_tstate.h index b8bea72baeaaf5..74e1452763e56c 100644 --- a/Include/internal/pycore_tstate.h +++ 
b/Include/internal/pycore_tstate.h @@ -22,6 +22,7 @@ typedef struct _PyThreadStateImpl { PyThreadState base; PyObject *asyncio_running_loop; // Strong reference + PyObject *asyncio_running_task; // Strong reference struct _qsbr_thread_state *qsbr; // only used by free-threaded build struct llist_node mem_free_queue; // delayed free queue diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index edb615b1b6b1c6..4be7112fa017d4 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -10,6 +10,7 @@ from .events import * from .exceptions import * from .futures import * +from .graph import * from .locks import * from .protocols import * from .runners import * @@ -27,6 +28,7 @@ events.__all__ + exceptions.__all__ + futures.__all__ + + graph.__all__ + locks.__all__ + protocols.__all__ + runners.__all__ + diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py index 359b7a5e3f9eea..d1df6707302277 100644 --- a/Lib/asyncio/futures.py +++ b/Lib/asyncio/futures.py @@ -2,6 +2,7 @@ __all__ = ( 'Future', 'wrap_future', 'isfuture', + 'future_add_to_awaited_by', 'future_discard_from_awaited_by', ) import concurrent.futures @@ -66,6 +67,9 @@ class Future: # `yield Future()` (incorrect). _asyncio_future_blocking = False + # Used by the capture_call_stack() API. + __asyncio_awaited_by = None + __log_traceback = False def __init__(self, *, loop=None): @@ -115,6 +119,12 @@ def _log_traceback(self, val): raise ValueError('_log_traceback can only be set to False') self.__log_traceback = False + @property + def _asyncio_awaited_by(self): + if self.__asyncio_awaited_by is None: + return None + return frozenset(self.__asyncio_awaited_by) + def get_loop(self): """Return the event loop the Future is bound to.""" loop = self._loop @@ -415,6 +425,49 @@ def wrap_future(future, *, loop=None): return new_future +def future_add_to_awaited_by(fut, waiter, /): + """Record that `fut` is awaited on by `waiter`.""" + # For the sake of keeping the implementation minimal and assuming + # that most of asyncio users use the built-in Futures and Tasks + # (or their subclasses), we only support native Future objects + # and their subclasses. + # + # Longer version: tracking requires storing the caller-callee + # dependency somewhere. One obvious choice is to store that + # information right in the future itself in a dedicated attribute. + # This means that we'd have to require all duck-type compatible + # futures to implement a specific attribute used by asyncio for + # the book keeping. Another solution would be to store that in + # a global dictionary. The downside here is that that would create + # strong references and any scenario where the "add" call isn't + # followed by a "discard" call would lead to a memory leak. + # Using WeakDict would resolve that issue, but would complicate + # the C code (_asynciomodule.c). The bottom line here is that + # it's not clear that all this work would be worth the effort. + # + # Note that there's an accelerated version of this function + # shadowing this implementation later in this file. + if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture): + if fut._Future__asyncio_awaited_by is None: + fut._Future__asyncio_awaited_by = set() + fut._Future__asyncio_awaited_by.add(waiter) + + +def future_discard_from_awaited_by(fut, waiter, /): + """Record that `fut` is no longer awaited on by `waiter`.""" + # See the comment in "future_add_to_awaited_by()" body for + # details on implementation. 
+ # + # Note that there's an accelerated version of this function + # shadowing this implementation later in this file. + if isinstance(fut, _PyFuture) and isinstance(waiter, _PyFuture): + if fut._Future__asyncio_awaited_by is not None: + fut._Future__asyncio_awaited_by.discard(waiter) + + +_py_future_add_to_awaited_by = future_add_to_awaited_by +_py_future_discard_from_awaited_by = future_discard_from_awaited_by + try: import _asyncio except ImportError: @@ -422,3 +475,7 @@ def wrap_future(future, *, loop=None): else: # _CFuture is needed for tests. Future = _CFuture = _asyncio.Future + future_add_to_awaited_by = _asyncio.future_add_to_awaited_by + future_discard_from_awaited_by = _asyncio.future_discard_from_awaited_by + _c_future_add_to_awaited_by = future_add_to_awaited_by + _c_future_discard_from_awaited_by = future_discard_from_awaited_by diff --git a/Lib/asyncio/graph.py b/Lib/asyncio/graph.py new file mode 100644 index 00000000000000..d8df7c9919abbf --- /dev/null +++ b/Lib/asyncio/graph.py @@ -0,0 +1,278 @@ +"""Introspection utils for tasks call graphs.""" + +import dataclasses +import sys +import types + +from . import events +from . import futures +from . import tasks + +__all__ = ( + 'capture_call_graph', + 'format_call_graph', + 'print_call_graph', + 'FrameCallGraphEntry', + 'FutureCallGraph', +) + +if False: # for type checkers + from typing import TextIO + +# Sadly, we can't re-use the traceback module's datastructures as those +# are tailored for error reporting, whereas we need to represent an +# async call graph. +# +# Going with pretty verbose names as we'd like to export them to the +# top level asyncio namespace, and want to avoid future name clashes. + + +@dataclasses.dataclass(frozen=True, slots=True) +class FrameCallGraphEntry: + frame: types.FrameType + + +@dataclasses.dataclass(frozen=True, slots=True) +class FutureCallGraph: + future: futures.Future + call_stack: tuple["FrameCallGraphEntry", ...] + awaited_by: tuple["FutureCallGraph", ...] + + +def _build_graph_for_future( + future: futures.Future, + *, + limit: int | None = None, +) -> FutureCallGraph: + if not isinstance(future, futures.Future): + raise TypeError( + f"{future!r} object does not appear to be compatible " + f"with asyncio.Future" + ) + + coro = None + if get_coro := getattr(future, 'get_coro', None): + coro = get_coro() if limit != 0 else None + + st: list[FrameCallGraphEntry] = [] + awaited_by: list[FutureCallGraph] = [] + + while coro is not None: + if hasattr(coro, 'cr_await'): + # A native coroutine or duck-type compatible iterator + st.append(FrameCallGraphEntry(coro.cr_frame)) + coro = coro.cr_await + elif hasattr(coro, 'ag_await'): + # A native async generator or duck-type compatible iterator + st.append(FrameCallGraphEntry(coro.cr_frame)) + coro = coro.ag_await + else: + break + + if future._asyncio_awaited_by: + for parent in future._asyncio_awaited_by: + awaited_by.append(_build_graph_for_future(parent, limit=limit)) + + if limit is not None: + if limit > 0: + st = st[:limit] + elif limit < 0: + st = st[limit:] + st.reverse() + return FutureCallGraph(future, tuple(st), tuple(awaited_by)) + + +def capture_call_graph( + future: futures.Future | None = None, + /, + *, + depth: int = 1, + limit: int | None = None, +) -> FutureCallGraph | None: + """Capture the async call graph for the current task or the provided Future. 
+ + The graph is represented with three data structures: + + * FutureCallGraph(future, call_stack, awaited_by) + + Where 'future' is an instance of asyncio.Future or asyncio.Task. + + 'call_stack' is a tuple of FrameGraphEntry objects. + + 'awaited_by' is a tuple of FutureCallGraph objects. + + * FrameCallGraphEntry(frame) + + Where 'frame' is a frame object of a regular Python function + in the call stack. + + Receives an optional 'future' argument. If not passed, + the current task will be used. If there's no current task, the function + returns None. + + If "capture_call_graph()" is introspecting *the current task*, the + optional keyword-only 'depth' argument can be used to skip the specified + number of frames from top of the stack. + + If the optional keyword-only 'limit' argument is provided, each call stack + in the resulting graph is truncated to include at most ``abs(limit)`` + entries. If 'limit' is positive, the entries left are the closest to + the invocation point. If 'limit' is negative, the topmost entries are + left. If 'limit' is omitted or None, all entries are present. + If 'limit' is 0, the call stack is not captured at all, only + "awaited by" information is present. + """ + + loop = events._get_running_loop() + + if future is not None: + # Check if we're in a context of a running event loop; + # if yes - check if the passed future is the currently + # running task or not. + if loop is None or future is not tasks.current_task(loop=loop): + return _build_graph_for_future(future, limit=limit) + # else: future is the current task, move on. + else: + if loop is None: + raise RuntimeError( + 'capture_call_graph() is called outside of a running ' + 'event loop and no *future* to introspect was provided') + future = tasks.current_task(loop=loop) + + if future is None: + # This isn't a generic call stack introspection utility. If we + # can't determine the current task and none was provided, we + # just return. + return None + + if not isinstance(future, futures.Future): + raise TypeError( + f"{future!r} object does not appear to be compatible " + f"with asyncio.Future" + ) + + call_stack: list[FrameCallGraphEntry] = [] + + f = sys._getframe(depth) if limit != 0 else None + try: + while f is not None: + is_async = f.f_generator is not None + call_stack.append(FrameCallGraphEntry(f)) + + if is_async: + if f.f_back is not None and f.f_back.f_generator is None: + # We've reached the bottom of the coroutine stack, which + # must be the Task that runs it. + break + + f = f.f_back + finally: + del f + + awaited_by = [] + if future._asyncio_awaited_by: + for parent in future._asyncio_awaited_by: + awaited_by.append(_build_graph_for_future(parent, limit=limit)) + + if limit is not None: + limit *= -1 + if limit > 0: + call_stack = call_stack[:limit] + elif limit < 0: + call_stack = call_stack[limit:] + + return FutureCallGraph(future, tuple(call_stack), tuple(awaited_by)) + + +def format_call_graph( + future: futures.Future | None = None, + /, + *, + depth: int = 1, + limit: int | None = None, +) -> str: + """Return the async call graph as a string for `future`. + + If `future` is not provided, format the call graph for the current task. 
+ """ + + def render_level(st: FutureCallGraph, buf: list[str], level: int) -> None: + def add_line(line: str) -> None: + buf.append(level * ' ' + line) + + if isinstance(st.future, tasks.Task): + add_line( + f'* Task(name={st.future.get_name()!r}, id={id(st.future):#x})' + ) + else: + add_line( + f'* Future(id={id(st.future):#x})' + ) + + if st.call_stack: + add_line( + f' + Call stack:' + ) + for ste in st.call_stack: + f = ste.frame + + if f.f_generator is None: + f = ste.frame + add_line( + f' | File {f.f_code.co_filename!r},' + f' line {f.f_lineno}, in' + f' {f.f_code.co_qualname}()' + ) + else: + c = f.f_generator + + try: + f = c.cr_frame + code = c.cr_code + tag = 'async' + except AttributeError: + try: + f = c.ag_frame + code = c.ag_code + tag = 'async generator' + except AttributeError: + f = c.gi_frame + code = c.gi_code + tag = 'generator' + + add_line( + f' | File {f.f_code.co_filename!r},' + f' line {f.f_lineno}, in' + f' {tag} {code.co_qualname}()' + ) + + if st.awaited_by: + add_line( + f' + Awaited by:' + ) + for fut in st.awaited_by: + render_level(fut, buf, level + 1) + + graph = capture_call_graph(future, depth=depth + 1, limit=limit) + if graph is None: + return "" + + buf: list[str] = [] + try: + render_level(graph, buf, 0) + finally: + # 'graph' has references to frames so we should + # make sure it's GC'ed as soon as we don't need it. + del graph + return '\n'.join(buf) + +def print_call_graph( + future: futures.Future | None = None, + /, + *, + file: TextIO | None = None, + depth: int = 1, + limit: int | None = None, +) -> None: + """Print the async call graph for the current task or the provided Future.""" + print(format_call_graph(future, depth=depth, limit=limit), file=file) diff --git a/Lib/asyncio/taskgroups.py b/Lib/asyncio/taskgroups.py index 8fda6c8d55e16c..1633478d1c87c2 100644 --- a/Lib/asyncio/taskgroups.py +++ b/Lib/asyncio/taskgroups.py @@ -6,6 +6,7 @@ from . import events from . import exceptions +from . import futures from . import tasks @@ -197,6 +198,8 @@ def create_task(self, coro, *, name=None, context=None): else: task = self._loop.create_task(coro, name=name, context=context) + futures.future_add_to_awaited_by(task, self._parent_task) + # Always schedule the done callback even if the task is # already done (e.g. if the coro was able to complete eagerly), # otherwise if the task completes with an exception then it will cancel @@ -228,6 +231,8 @@ def _abort(self): def _on_task_done(self, task): self._tasks.discard(task) + futures.future_discard_from_awaited_by(task, self._parent_task) + if self._on_completed_fut is not None and not self._tasks: if not self._on_completed_fut.done(): self._on_completed_fut.set_result(True) diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py index 2112dd4b99d17f..a25854cc4bd69e 100644 --- a/Lib/asyncio/tasks.py +++ b/Lib/asyncio/tasks.py @@ -322,6 +322,7 @@ def __step_run_and_handle_result(self, exc): self._loop.call_soon( self.__step, new_exc, context=self._context) else: + futures.future_add_to_awaited_by(result, self) result._asyncio_future_blocking = False result.add_done_callback( self.__wakeup, context=self._context) @@ -356,6 +357,7 @@ def __step_run_and_handle_result(self, exc): self = None # Needed to break cycles when an exception occurs. 
def __wakeup(self, future): + futures.future_discard_from_awaited_by(future, self) try: future.result() except BaseException as exc: @@ -502,6 +504,7 @@ async def _wait(fs, timeout, return_when, loop): if timeout is not None: timeout_handle = loop.call_later(timeout, _release_waiter, waiter) counter = len(fs) + cur_task = current_task() def _on_completion(f): nonlocal counter @@ -514,9 +517,11 @@ def _on_completion(f): timeout_handle.cancel() if not waiter.done(): waiter.set_result(None) + futures.future_discard_from_awaited_by(f, cur_task) for f in fs: f.add_done_callback(_on_completion) + futures.future_add_to_awaited_by(f, cur_task) try: await waiter @@ -802,10 +807,19 @@ def gather(*coros_or_futures, return_exceptions=False): outer.set_result([]) return outer - def _done_callback(fut): + loop = events._get_running_loop() + if loop is not None: + cur_task = current_task(loop) + else: + cur_task = None + + def _done_callback(fut, cur_task=cur_task): nonlocal nfinished nfinished += 1 + if cur_task is not None: + futures.future_discard_from_awaited_by(fut, cur_task) + if outer is None or outer.done(): if not fut.cancelled(): # Mark exception retrieved. @@ -862,7 +876,6 @@ def _done_callback(fut): nfuts = 0 nfinished = 0 done_futs = [] - loop = None outer = None # bpo-46672 for arg in coros_or_futures: if arg not in arg_to_fut: @@ -875,12 +888,13 @@ def _done_callback(fut): # can't control it, disable the "destroy pending task" # warning. fut._log_destroy_pending = False - nfuts += 1 arg_to_fut[arg] = fut if fut.done(): done_futs.append(fut) else: + if cur_task is not None: + futures.future_add_to_awaited_by(fut, cur_task) fut.add_done_callback(_done_callback) else: @@ -940,7 +954,15 @@ def shield(arg): loop = futures._get_loop(inner) outer = loop.create_future() - def _inner_done_callback(inner): + if loop is not None and (cur_task := current_task(loop)) is not None: + futures.future_add_to_awaited_by(inner, cur_task) + else: + cur_task = None + + def _inner_done_callback(inner, cur_task=cur_task): + if cur_task is not None: + futures.future_discard_from_awaited_by(inner, cur_task) + if outer.cancelled(): if not inner.cancelled(): # Mark inner's result as retrieved. 
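For third-party code that connects futures through ``add_done_callback()`` — the same situation the ``shield()`` and ``gather()`` changes above handle — a hedged sketch of the expected bookkeeping might look as follows. The ``relay()`` and ``on_done()`` helpers are hypothetical and not part of the patch; only ``future_add_to_awaited_by()`` and ``future_discard_from_awaited_by()`` are the new public API:

.. code-block:: python

    import asyncio
    from asyncio import (
        future_add_to_awaited_by,
        future_discard_from_awaited_by,
    )

    async def relay(inner: asyncio.Future):
        """Await `inner` through an intermediate future while keeping the
        call graph intact, mirroring the shield()/gather() pairing."""
        loop = asyncio.get_running_loop()
        outer = loop.create_future()
        cur_task = asyncio.current_task()
        if cur_task is not None:
            # Record the awaited-by edge before handing control to the
            # callback machinery...
            future_add_to_awaited_by(inner, cur_task)

        def on_done(f):
            # ...and drop it again once the inner future resolves.
            if cur_task is not None:
                future_discard_from_awaited_by(f, cur_task)
            if not outer.done():
                if f.cancelled():
                    outer.cancel()
                elif f.exception() is not None:
                    outer.set_exception(f.exception())
                else:
                    outer.set_result(f.result())

        inner.add_done_callback(on_done)
        return await outer

As with the changes to ``taskgroups.py`` and ``tasks.py`` above, every ``future_add_to_awaited_by()`` call is eventually matched by a ``future_discard_from_awaited_by()`` call with the same arguments, so the reported graph does not retain stale edges.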
diff --git a/Lib/test/test_asyncio/test_graph.py b/Lib/test/test_asyncio/test_graph.py new file mode 100644 index 00000000000000..fd2160d4ca3137 --- /dev/null +++ b/Lib/test/test_asyncio/test_graph.py @@ -0,0 +1,436 @@ +import asyncio +import io +import unittest + + +# To prevent a warning "test altered the execution environment" +def tearDownModule(): + asyncio._set_event_loop_policy(None) + + +def capture_test_stack(*, fut=None, depth=1): + + def walk(s): + ret = [ + (f"T<{n}>" if '-' not in (n := s.future.get_name()) else 'T') + if isinstance(s.future, asyncio.Task) else 'F' + ] + + ret.append( + [ + ( + f"s {entry.frame.f_code.co_name}" + if entry.frame.f_generator is None else + ( + f"a {entry.frame.f_generator.cr_code.co_name}" + if hasattr(entry.frame.f_generator, 'cr_code') else + f"ag {entry.frame.f_generator.ag_code.co_name}" + ) + ) for entry in s.call_stack + ] + ) + + ret.append( + sorted([ + walk(ab) for ab in s.awaited_by + ], key=lambda entry: entry[0]) + ) + + return ret + + buf = io.StringIO() + asyncio.print_call_graph(fut, file=buf, depth=depth+1) + + stack = asyncio.capture_call_graph(fut, depth=depth) + return walk(stack), buf.getvalue() + + +class CallStackTestBase: + + async def test_stack_tgroup(self): + + stack_for_c5 = None + + def c5(): + nonlocal stack_for_c5 + stack_for_c5 = capture_test_stack(depth=2) + + async def c4(): + await asyncio.sleep(0) + c5() + + async def c3(): + await c4() + + async def c2(): + await c3() + + async def c1(task): + await task + + async def main(): + async with asyncio.TaskGroup() as tg: + task = tg.create_task(c2(), name="c2_root") + tg.create_task(c1(task), name="sub_main_1") + tg.create_task(c1(task), name="sub_main_2") + + await main() + + self.assertEqual(stack_for_c5[0], [ + # task name + 'T', + # call stack + ['s c5', 'a c4', 'a c3', 'a c2'], + # awaited by + [ + ['T', + ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], [] + ], + ['T', + ['a c1'], + [ + ['T', + ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], [] + ] + ] + ], + ['T', + ['a c1'], + [ + ['T', + ['a _aexit', 'a __aexit__', 'a main', 'a test_stack_tgroup'], [] + ] + ] + ] + ] + ]) + + self.assertIn( + ' async CallStackTestBase.test_stack_tgroup()', + stack_for_c5[1]) + + + async def test_stack_async_gen(self): + + stack_for_gen_nested_call = None + + async def gen_nested_call(): + nonlocal stack_for_gen_nested_call + stack_for_gen_nested_call = capture_test_stack() + + async def gen(): + for num in range(2): + yield num + if num == 1: + await gen_nested_call() + + async def main(): + async for el in gen(): + pass + + await main() + + self.assertEqual(stack_for_gen_nested_call[0], [ + 'T', + [ + 's capture_test_stack', + 'a gen_nested_call', + 'ag gen', + 'a main', + 'a test_stack_async_gen' + ], + [] + ]) + + self.assertIn( + 'async generator CallStackTestBase.test_stack_async_gen..gen()', + stack_for_gen_nested_call[1]) + + async def test_stack_gather(self): + + stack_for_deep = None + + async def deep(): + await asyncio.sleep(0) + nonlocal stack_for_deep + stack_for_deep = capture_test_stack() + + async def c1(): + await asyncio.sleep(0) + await deep() + + async def c2(): + await asyncio.sleep(0) + + async def main(): + await asyncio.gather(c1(), c2()) + + await main() + + self.assertEqual(stack_for_deep[0], [ + 'T', + ['s capture_test_stack', 'a deep', 'a c1'], + [ + ['T', ['a main', 'a test_stack_gather'], []] + ] + ]) + + async def test_stack_shield(self): + + stack_for_shield = None + + async def deep(): + await asyncio.sleep(0) + 
nonlocal stack_for_shield + stack_for_shield = capture_test_stack() + + async def c1(): + await asyncio.sleep(0) + await deep() + + async def main(): + await asyncio.shield(c1()) + + await main() + + self.assertEqual(stack_for_shield[0], [ + 'T', + ['s capture_test_stack', 'a deep', 'a c1'], + [ + ['T', ['a main', 'a test_stack_shield'], []] + ] + ]) + + async def test_stack_timeout(self): + + stack_for_inner = None + + async def inner(): + await asyncio.sleep(0) + nonlocal stack_for_inner + stack_for_inner = capture_test_stack() + + async def c1(): + async with asyncio.timeout(1): + await asyncio.sleep(0) + await inner() + + async def main(): + await asyncio.shield(c1()) + + await main() + + self.assertEqual(stack_for_inner[0], [ + 'T', + ['s capture_test_stack', 'a inner', 'a c1'], + [ + ['T', ['a main', 'a test_stack_timeout'], []] + ] + ]) + + async def test_stack_wait(self): + + stack_for_inner = None + + async def inner(): + await asyncio.sleep(0) + nonlocal stack_for_inner + stack_for_inner = capture_test_stack() + + async def c1(): + async with asyncio.timeout(1): + await asyncio.sleep(0) + await inner() + + async def c2(): + for i in range(3): + await asyncio.sleep(0) + + async def main(t1, t2): + while True: + _, pending = await asyncio.wait([t1, t2]) + if not pending: + break + + t1 = asyncio.create_task(c1()) + t2 = asyncio.create_task(c2()) + try: + await main(t1, t2) + finally: + await t1 + await t2 + + self.assertEqual(stack_for_inner[0], [ + 'T', + ['s capture_test_stack', 'a inner', 'a c1'], + [ + ['T', + ['a _wait', 'a wait', 'a main', 'a test_stack_wait'], + [] + ] + ] + ]) + + async def test_stack_task(self): + + stack_for_inner = None + + async def inner(): + await asyncio.sleep(0) + nonlocal stack_for_inner + stack_for_inner = capture_test_stack() + + async def c1(): + await inner() + + async def c2(): + await asyncio.create_task(c1(), name='there there') + + async def main(): + await c2() + + await main() + + self.assertEqual(stack_for_inner[0], [ + 'T', + ['s capture_test_stack', 'a inner', 'a c1'], + [['T', ['a c2', 'a main', 'a test_stack_task'], []]] + ]) + + async def test_stack_future(self): + + stack_for_fut = None + + async def a2(fut): + await fut + + async def a1(fut): + await a2(fut) + + async def b1(fut): + await fut + + async def main(): + nonlocal stack_for_fut + + fut = asyncio.Future() + async with asyncio.TaskGroup() as g: + g.create_task(a1(fut), name="task A") + g.create_task(b1(fut), name='task B') + + for _ in range(5): + # Do a few iterations to ensure that both a1 and b1 + # await on the future + await asyncio.sleep(0) + + stack_for_fut = capture_test_stack(fut=fut) + fut.set_result(None) + + await main() + + self.assertEqual(stack_for_fut[0], + ['F', + [], + [ + ['T', + ['a a2', 'a a1'], + [['T', ['a test_stack_future'], []]] + ], + ['T', + ['a b1'], + [['T', ['a test_stack_future'], []]] + ], + ]] + ) + + self.assertTrue(stack_for_fut[1].startswith('* Future(id=')) + + +@unittest.skipIf( + not hasattr(asyncio.futures, "_c_future_add_to_awaited_by"), + "C-accelerated asyncio call graph backend missing", +) +class TestCallStackC(CallStackTestBase, unittest.IsolatedAsyncioTestCase): + def setUp(self): + futures = asyncio.futures + tasks = asyncio.tasks + + self._Future = asyncio.Future + asyncio.Future = futures.Future = futures._CFuture + + self._Task = asyncio.Task + asyncio.Task = tasks.Task = tasks._CTask + + self._future_add_to_awaited_by = asyncio.future_add_to_awaited_by + futures.future_add_to_awaited_by = 
futures._c_future_add_to_awaited_by + asyncio.future_add_to_awaited_by = futures.future_add_to_awaited_by + + self._future_discard_from_awaited_by = asyncio.future_discard_from_awaited_by + futures.future_discard_from_awaited_by = futures._c_future_discard_from_awaited_by + asyncio.future_discard_from_awaited_by = futures.future_discard_from_awaited_by + + + def tearDown(self): + futures = asyncio.futures + tasks = asyncio.tasks + + futures.future_discard_from_awaited_by = self._future_discard_from_awaited_by + asyncio.future_discard_from_awaited_by = self._future_discard_from_awaited_by + del self._future_discard_from_awaited_by + + futures.future_add_to_awaited_by = self._future_add_to_awaited_by + asyncio.future_add_to_awaited_by = self._future_add_to_awaited_by + del self._future_add_to_awaited_by + + asyncio.Task = self._Task + tasks.Task = self._Task + del self._Task + + asyncio.Future = self._Future + futures.Future = self._Future + del self._Future + + +@unittest.skipIf( + not hasattr(asyncio.futures, "_py_future_add_to_awaited_by"), + "Pure Python asyncio call graph backend missing", +) +class TestCallStackPy(CallStackTestBase, unittest.IsolatedAsyncioTestCase): + def setUp(self): + futures = asyncio.futures + tasks = asyncio.tasks + + self._Future = asyncio.Future + asyncio.Future = futures.Future = futures._PyFuture + + self._Task = asyncio.Task + asyncio.Task = tasks.Task = tasks._PyTask + + self._future_add_to_awaited_by = asyncio.future_add_to_awaited_by + futures.future_add_to_awaited_by = futures._py_future_add_to_awaited_by + asyncio.future_add_to_awaited_by = futures.future_add_to_awaited_by + + self._future_discard_from_awaited_by = asyncio.future_discard_from_awaited_by + futures.future_discard_from_awaited_by = futures._py_future_discard_from_awaited_by + asyncio.future_discard_from_awaited_by = futures.future_discard_from_awaited_by + + + def tearDown(self): + futures = asyncio.futures + tasks = asyncio.tasks + + futures.future_discard_from_awaited_by = self._future_discard_from_awaited_by + asyncio.future_discard_from_awaited_by = self._future_discard_from_awaited_by + del self._future_discard_from_awaited_by + + futures.future_add_to_awaited_by = self._future_add_to_awaited_by + asyncio.future_add_to_awaited_by = self._future_add_to_awaited_by + del self._future_add_to_awaited_by + + asyncio.Task = self._Task + tasks.Task = self._Task + del self._Task + + asyncio.Future = self._Future + futures.Future = self._Future + del self._Future diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index d896fec73d1971..eceae532422f3c 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -13,8 +13,10 @@ try: from _testexternalinspection import PROCESS_VM_READV_SUPPORTED from _testexternalinspection import get_stack_trace + from _testexternalinspection import get_async_stack_trace except ImportError: - raise unittest.SkipTest("Test only runs when _testexternalinspection is available") + raise unittest.SkipTest( + "Test only runs when _testexternalinspection is available") def _make_test_script(script_dir, script_basename, source): to_return = make_script(script_dir, script_basename, source) @@ -23,12 +25,14 @@ def _make_test_script(script_dir, script_basename, source): class TestGetStackTrace(unittest.TestCase): - @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS") - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, 
"Test only runs on Linux with process_vm_readv support") + @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", + "Test only runs on Linux and MacOS") + @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support") def test_remote_stack_trace(self): # Spawn a process with some realistic Python code script = textwrap.dedent("""\ - import time, sys, os + import time, sys def bar(): for x in range(100): if x == 50: @@ -37,8 +41,8 @@ def baz(): foo() def foo(): - fifo = sys.argv[1] - with open(sys.argv[1], "w") as fifo: + fifo_path = sys.argv[1] + with open(fifo_path, "w") as fifo: fifo.write("ready") time.sleep(1000) @@ -74,8 +78,218 @@ def foo(): ] self.assertEqual(stack_trace, expected_stack_trace) - @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS") - @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support") + @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", + "Test only runs on Linux and MacOS") + @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support") + def test_async_remote_stack_trace(self): + # Spawn a process with some realistic Python code + script = textwrap.dedent("""\ + import asyncio + import time + import sys + + def c5(): + fifo_path = sys.argv[1] + with open(fifo_path, "w") as fifo: + fifo.write("ready") + time.sleep(10000) + + async def c4(): + await asyncio.sleep(0) + c5() + + async def c3(): + await c4() + + async def c2(): + await c3() + + async def c1(task): + await task + + async def main(): + async with asyncio.TaskGroup() as tg: + task = tg.create_task(c2(), name="c2_root") + tg.create_task(c1(task), name="sub_main_1") + tg.create_task(c1(task), name="sub_main_2") + + def new_eager_loop(): + loop = asyncio.new_event_loop() + eager_task_factory = asyncio.create_eager_task_factory( + asyncio.Task) + loop.set_task_factory(eager_task_factory) + return loop + + asyncio.run(main(), loop_factory={TASK_FACTORY}) + """) + stack_trace = None + for task_factory_variant in "asyncio.new_event_loop", "new_eager_loop": + with ( + self.subTest(task_factory_variant=task_factory_variant), + os_helper.temp_dir() as work_dir, + ): + script_dir = os.path.join(work_dir, "script_pkg") + os.mkdir(script_dir) + fifo = f"{work_dir}/the_fifo" + os.mkfifo(fifo) + script_name = _make_test_script( + script_dir, 'script', + script.format(TASK_FACTORY=task_factory_variant)) + try: + p = subprocess.Popen( + [sys.executable, script_name, str(fifo)] + ) + with open(fifo, "r") as fifo_file: + response = fifo_file.read() + self.assertEqual(response, "ready") + stack_trace = get_async_stack_trace(p.pid) + except PermissionError: + self.skipTest( + "Insufficient permissions to read the stack trace") + finally: + os.remove(fifo) + p.kill() + p.terminate() + p.wait(timeout=SHORT_TIMEOUT) + + # sets are unordered, so we want to sort "awaited_by"s + stack_trace[2].sort(key=lambda x: x[1]) + + root_task = "Task-1" + expected_stack_trace = [ + ["c5", "c4", "c3", "c2"], + "c2_root", + [ + [["main"], root_task, []], + [["c1"], "sub_main_1", [[["main"], root_task, []]]], + [["c1"], "sub_main_2", [[["main"], root_task, []]]], + ], + ] + self.assertEqual(stack_trace, expected_stack_trace) + + @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", + "Test only runs on Linux and 
MacOS") + @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support") + def test_asyncgen_remote_stack_trace(self): + # Spawn a process with some realistic Python code + script = textwrap.dedent("""\ + import asyncio + import time + import sys + + async def gen_nested_call(): + fifo_path = sys.argv[1] + with open(fifo_path, "w") as fifo: + fifo.write("ready") + time.sleep(10000) + + async def gen(): + for num in range(2): + yield num + if num == 1: + await gen_nested_call() + + async def main(): + async for el in gen(): + pass + + asyncio.run(main()) + """) + stack_trace = None + with os_helper.temp_dir() as work_dir: + script_dir = os.path.join(work_dir, "script_pkg") + os.mkdir(script_dir) + fifo = f"{work_dir}/the_fifo" + os.mkfifo(fifo) + script_name = _make_test_script(script_dir, 'script', script) + try: + p = subprocess.Popen([sys.executable, script_name, str(fifo)]) + with open(fifo, "r") as fifo_file: + response = fifo_file.read() + self.assertEqual(response, "ready") + stack_trace = get_async_stack_trace(p.pid) + except PermissionError: + self.skipTest("Insufficient permissions to read the stack trace") + finally: + os.remove(fifo) + p.kill() + p.terminate() + p.wait(timeout=SHORT_TIMEOUT) + + # sets are unordered, so we want to sort "awaited_by"s + stack_trace[2].sort(key=lambda x: x[1]) + + expected_stack_trace = [ + ['gen_nested_call', 'gen', 'main'], 'Task-1', [] + ] + self.assertEqual(stack_trace, expected_stack_trace) + + @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", + "Test only runs on Linux and MacOS") + @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support") + def test_async_gather_remote_stack_trace(self): + # Spawn a process with some realistic Python code + script = textwrap.dedent("""\ + import asyncio + import time + import sys + + async def deep(): + await asyncio.sleep(0) + fifo_path = sys.argv[1] + with open(fifo_path, "w") as fifo: + fifo.write("ready") + time.sleep(10000) + + async def c1(): + await asyncio.sleep(0) + await deep() + + async def c2(): + await asyncio.sleep(0) + + async def main(): + await asyncio.gather(c1(), c2()) + + asyncio.run(main()) + """) + stack_trace = None + with os_helper.temp_dir() as work_dir: + script_dir = os.path.join(work_dir, "script_pkg") + os.mkdir(script_dir) + fifo = f"{work_dir}/the_fifo" + os.mkfifo(fifo) + script_name = _make_test_script(script_dir, 'script', script) + try: + p = subprocess.Popen([sys.executable, script_name, str(fifo)]) + with open(fifo, "r") as fifo_file: + response = fifo_file.read() + self.assertEqual(response, "ready") + stack_trace = get_async_stack_trace(p.pid) + except PermissionError: + self.skipTest( + "Insufficient permissions to read the stack trace") + finally: + os.remove(fifo) + p.kill() + p.terminate() + p.wait(timeout=SHORT_TIMEOUT) + + # sets are unordered, so we want to sort "awaited_by"s + stack_trace[2].sort(key=lambda x: x[1]) + + expected_stack_trace = [ + ['deep', 'c1'], 'Task-2', [[['main'], 'Task-1', []]] + ] + self.assertEqual(stack_trace, expected_stack_trace) + + @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", + "Test only runs on Linux and MacOS") + @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support") def test_self_trace(self): stack_trace = get_stack_trace(os.getpid()) 
self.assertEqual(stack_trace[0], "test_self_trace") diff --git a/Lib/test/test_frame.py b/Lib/test/test_frame.py index 11f191700ccef0..7bd13eada8fedf 100644 --- a/Lib/test/test_frame.py +++ b/Lib/test/test_frame.py @@ -222,6 +222,56 @@ def test_f_lineno_del_segfault(self): with self.assertRaises(AttributeError): del f.f_lineno + def test_f_generator(self): + # Test f_generator in different contexts. + + def t0(): + def nested(): + frame = sys._getframe() + return frame.f_generator + + def gen(): + yield nested() + + g = gen() + try: + return next(g) + finally: + g.close() + + def t1(): + frame = sys._getframe() + return frame.f_generator + + def t2(): + frame = sys._getframe() + yield frame.f_generator + + async def t3(): + frame = sys._getframe() + return frame.f_generator + + # For regular functions f_generator is None + self.assertIsNone(t0()) + self.assertIsNone(t1()) + + # For generators f_generator is equal to self + g = t2() + try: + frame_g = next(g) + self.assertIs(g, frame_g) + finally: + g.close() + + # Ditto for coroutines + c = t3() + try: + c.send(None) + except StopIteration as ex: + self.assertIs(ex.value, c) + else: + raise AssertionError('coroutine did not exit') + class ReprTest(unittest.TestCase): """ diff --git a/Misc/NEWS.d/next/Library/2024-10-02-11-17-23.gh-issue-91048.QWY-b1.rst b/Misc/NEWS.d/next/Library/2024-10-02-11-17-23.gh-issue-91048.QWY-b1.rst new file mode 100644 index 00000000000000..c2faf470ffc9cf --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-10-02-11-17-23.gh-issue-91048.QWY-b1.rst @@ -0,0 +1,2 @@ +Add :func:`asyncio.capture_call_graph` and +:func:`asyncio.print_call_graph` functions. diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 48f0ef95934fa4..bba7416b398101 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -40,12 +40,17 @@ typedef enum { PyObject *prefix##_source_tb; \ PyObject *prefix##_cancel_msg; \ PyObject *prefix##_cancelled_exc; \ + PyObject *prefix##_awaited_by; \ fut_state prefix##_state; \ - /* These bitfields need to be at the end of the struct - so that these and bitfields from TaskObj are contiguous. + /* Used by profilers to make traversing the stack from an external \ + process faster. */ \ + char prefix##_is_task; \ + char prefix##_awaited_by_is_set; \ + /* These bitfields need to be at the end of the struct \ + so that these and bitfields from TaskObj are contiguous. \ */ \ unsigned prefix##_log_tb: 1; \ - unsigned prefix##_blocking: 1; + unsigned prefix##_blocking: 1; \ typedef struct { FutureObj_HEAD(fut) @@ -69,12 +74,24 @@ typedef struct { PyObject *sw_arg; } TaskStepMethWrapper; - #define Future_CheckExact(state, obj) Py_IS_TYPE(obj, state->FutureType) #define Task_CheckExact(state, obj) Py_IS_TYPE(obj, state->TaskType) -#define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType) -#define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType) +#define Future_Check(state, obj) \ + (Future_CheckExact(state, obj) \ + || PyObject_TypeCheck(obj, state->FutureType)) + +#define Task_Check(state, obj) \ + (Task_CheckExact(state, obj) \ + || PyObject_TypeCheck(obj, state->TaskType)) + +// This macro is optimized to quickly return for native Future *or* Task +// objects by inlining fast "exact" checks to be called first. 
+#define TaskOrFuture_Check(state, obj) \ + (Task_CheckExact(state, obj) \ + || Future_CheckExact(state, obj) \ + || PyObject_TypeCheck(obj, state->FutureType) \ + || PyObject_TypeCheck(obj, state->TaskType)) #ifdef Py_GIL_DISABLED # define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex) @@ -84,6 +101,37 @@ typedef struct { # define ASYNCIO_STATE_UNLOCK(state) ((void)state) #endif +typedef struct _Py_AsyncioModuleDebugOffsets { + struct _asyncio_task_object { + uint64_t size; + uint64_t task_name; + uint64_t task_awaited_by; + uint64_t task_is_task; + uint64_t task_awaited_by_is_set; + uint64_t task_coro; + } asyncio_task_object; + struct _asyncio_thread_state { + uint64_t size; + uint64_t asyncio_running_loop; + uint64_t asyncio_running_task; + } asyncio_thread_state; +} Py_AsyncioModuleDebugOffsets; + +GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug) + = {.asyncio_task_object = { + .size = sizeof(TaskObj), + .task_name = offsetof(TaskObj, task_name), + .task_awaited_by = offsetof(TaskObj, task_awaited_by), + .task_is_task = offsetof(TaskObj, task_is_task), + .task_awaited_by_is_set = offsetof(TaskObj, task_awaited_by_is_set), + .task_coro = offsetof(TaskObj, task_coro), + }, + .asyncio_thread_state = { + .size = sizeof(_PyThreadStateImpl), + .asyncio_running_loop = offsetof(_PyThreadStateImpl, asyncio_running_loop), + .asyncio_running_task = offsetof(_PyThreadStateImpl, asyncio_running_task), + }}; + /* State of the _asyncio module */ typedef struct { #ifdef Py_GIL_DISABLED @@ -185,6 +233,22 @@ static PyObject * task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *result); +static void +clear_task_coro(TaskObj *task) +{ + Py_CLEAR(task->task_coro); +} + + +static void +set_task_coro(TaskObj *task, PyObject *coro) +{ + assert(coro != NULL); + Py_INCREF(coro); + Py_XSETREF(task->task_coro, coro); +} + + static int _is_coroutine(asyncio_state *state, PyObject *coro) { @@ -437,10 +501,13 @@ future_init(FutureObj *fut, PyObject *loop) Py_CLEAR(fut->fut_source_tb); Py_CLEAR(fut->fut_cancel_msg); Py_CLEAR(fut->fut_cancelled_exc); + Py_CLEAR(fut->fut_awaited_by); fut->fut_state = STATE_PENDING; fut->fut_log_tb = 0; fut->fut_blocking = 0; + fut->fut_awaited_by_is_set = 0; + fut->fut_is_task = 0; if (loop == Py_None) { asyncio_state *state = get_asyncio_state_by_def((PyObject *)fut); @@ -480,6 +547,115 @@ future_init(FutureObj *fut, PyObject *loop) return 0; } +static int +future_awaited_by_add(asyncio_state *state, PyObject *fut, PyObject *thing) +{ + if (!TaskOrFuture_Check(state, fut) || !TaskOrFuture_Check(state, thing)) { + // We only want to support native asyncio Futures. + // For further insight see the comment in the Python + // implementation of "future_add_to_awaited_by()". + return 0; + } + + FutureObj *_fut = (FutureObj *)fut; + + /* Most futures/task are only awaited by one entity, so we want + to avoid always creating a set for `fut_awaited_by`. 
+ */ + if (_fut->fut_awaited_by == NULL) { + assert(!_fut->fut_awaited_by_is_set); + Py_INCREF(thing); + _fut->fut_awaited_by = thing; + return 0; + } + + if (_fut->fut_awaited_by_is_set) { + assert(PySet_CheckExact(_fut->fut_awaited_by)); + return PySet_Add(_fut->fut_awaited_by, thing); + } + + PyObject *set = PySet_New(NULL); + if (set == NULL) { + return -1; + } + if (PySet_Add(set, thing)) { + Py_DECREF(set); + return -1; + } + if (PySet_Add(set, _fut->fut_awaited_by)) { + Py_DECREF(set); + return -1; + } + Py_SETREF(_fut->fut_awaited_by, set); + _fut->fut_awaited_by_is_set = 1; + return 0; +} + +static int +future_awaited_by_discard(asyncio_state *state, PyObject *fut, PyObject *thing) +{ + if (!TaskOrFuture_Check(state, fut) || !TaskOrFuture_Check(state, thing)) { + // We only want to support native asyncio Futures. + // For further insight see the comment in the Python + // implementation of "future_add_to_awaited_by()". + return 0; + } + + FutureObj *_fut = (FutureObj *)fut; + + /* Following the semantics of 'set.discard()' here in not + raising an error if `thing` isn't in the `awaited_by` "set". + */ + if (_fut->fut_awaited_by == NULL) { + return 0; + } + if (_fut->fut_awaited_by == thing) { + Py_CLEAR(_fut->fut_awaited_by); + return 0; + } + if (_fut->fut_awaited_by_is_set) { + assert(PySet_CheckExact(_fut->fut_awaited_by)); + int err = PySet_Discard(_fut->fut_awaited_by, thing); + if (err < 0) { + return -1; + } else { + return 0; + } + } + return 0; +} + +/*[clinic input] +@critical_section +@getter +_asyncio.Future._asyncio_awaited_by +[clinic start generated code]*/ + +static PyObject * +_asyncio_Future__asyncio_awaited_by_get_impl(FutureObj *self) +/*[clinic end generated code: output=932af76d385d2e2a input=64c1783df2d44d2b]*/ +{ + /* Implementation of a Python getter. */ + if (self->fut_awaited_by == NULL) { + Py_RETURN_NONE; + } + if (self->fut_awaited_by_is_set) { + /* Already a set, just wrap it into a frozen set and return. */ + assert(PySet_CheckExact(self->fut_awaited_by)); + return PyFrozenSet_New(self->fut_awaited_by); + } + + PyObject *set = PyFrozenSet_New(NULL); + if (set == NULL) { + return NULL; + } + if (PySet_Add(set, self->fut_awaited_by)) { + Py_DECREF(set); + return NULL; + } + return set; +} + static PyObject * future_set_result(asyncio_state *state, FutureObj *fut, PyObject *res) { @@ -780,6 +956,8 @@ FutureObj_clear(FutureObj *fut) Py_CLEAR(fut->fut_source_tb); Py_CLEAR(fut->fut_cancel_msg); Py_CLEAR(fut->fut_cancelled_exc); + Py_CLEAR(fut->fut_awaited_by); + fut->fut_awaited_by_is_set = 0; PyObject_ClearManagedDict((PyObject *)fut); return 0; } @@ -798,6 +976,7 @@ FutureObj_traverse(FutureObj *fut, visitproc visit, void *arg) Py_VISIT(fut->fut_source_tb); Py_VISIT(fut->fut_cancel_msg); Py_VISIT(fut->fut_cancelled_exc); + Py_VISIT(fut->fut_awaited_by); PyObject_VisitManagedDict((PyObject *)fut, visit, arg); return 0; } @@ -1577,6 +1756,7 @@ static PyGetSetDef FutureType_getsetlist[] = { _ASYNCIO_FUTURE__LOG_TRACEBACK_GETSETDEF _ASYNCIO_FUTURE__SOURCE_TRACEBACK_GETSETDEF _ASYNCIO_FUTURE__CANCEL_MESSAGE_GETSETDEF + _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {NULL} /* Sentinel */ }; @@ -2053,7 +2233,50 @@ enter_task(asyncio_state *state, PyObject *loop, PyObject *task) Py_DECREF(item); return -1; } - Py_DECREF(item); + + assert(task == item); + Py_CLEAR(item); + + // This block is needed to enable `asyncio.capture_call_graph()` API. 
+ // We want to be enable debuggers and profilers to be able to quickly + // introspect the asyncio running state from another process. + // When we do that, we need to essentially traverse the address space + // of a Python process and understand what every Python thread in it is + // currently doing, mainly: + // + // * current frame + // * current asyncio task + // + // A naive solution would be to require profilers and debuggers to + // find the current task in the "_asynciomodule" module state, but + // unfortunately that would require a lot of complicated remote + // memory reads and logic, as Python's dict is a notoriously complex + // and ever-changing data structure. + // + // So the easier solution is to put a strong reference to the currently + // running `asyncio.Task` on the interpreter thread state (we already + // have some asyncio state there.) + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); + if (ts->asyncio_running_loop == loop) { + // Protect from a situation when someone calls this method + // from another thread. This shouldn't ever happen though, + // as `enter_task` and `leave_task` can either be called by: + // + // - `asyncio.Task` itself, in `Task.__step()`. That method + // can only be called by the event loop itself. + // + // - third-party Task "from scratch" implementations, that + // our `capture_call_graph` API doesn't support anyway. + // + // That said, we still want to make sure we don't end up in + // a broken state, so we check that we're in the correct thread + // by comparing the *loop* argument to the event loop running + // in the current thread. If they match we know we're in the + // right thread, as asyncio event loops don't change threads. + assert(ts->asyncio_running_task == NULL); + ts->asyncio_running_task = Py_NewRef(task); + } + return 0; } @@ -2078,7 +2301,6 @@ leave_task_predicate(PyObject *item, void *task) static int leave_task(asyncio_state *state, PyObject *loop, PyObject *task) -/*[clinic end generated code: output=0ebf6db4b858fb41 input=51296a46313d1ad8]*/ { int res = _PyDict_DelItemIf(state->current_tasks, loop, leave_task_predicate, task); @@ -2086,6 +2308,14 @@ leave_task(asyncio_state *state, PyObject *loop, PyObject *task) // task was not found return err_leave_task(Py_None, task); } + + // See the comment in `enter_task` for the explanation of why + // the following is needed. 
+ _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); + if (ts->asyncio_running_loop == NULL || ts->asyncio_running_loop == loop) { + Py_CLEAR(ts->asyncio_running_task); + } + return res; } @@ -2158,6 +2388,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, if (future_init((FutureObj*)self, loop)) { return -1; } + self->task_is_task = 1; asyncio_state *state = get_asyncio_state_by_def((PyObject *)self); int is_coro = is_coroutine(state, coro); @@ -2185,8 +2416,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, self->task_must_cancel = 0; self->task_log_destroy_pending = 1; self->task_num_cancels_requested = 0; - Py_INCREF(coro); - Py_XSETREF(self->task_coro, coro); + set_task_coro(self, coro); if (name == Py_None) { // optimization: defer task name formatting @@ -2234,8 +2464,8 @@ static int TaskObj_clear(TaskObj *task) { (void)FutureObj_clear((FutureObj*) task); + clear_task_coro(task); Py_CLEAR(task->task_context); - Py_CLEAR(task->task_coro); Py_CLEAR(task->task_name); Py_CLEAR(task->task_fut_waiter); return 0; @@ -2260,6 +2490,7 @@ TaskObj_traverse(TaskObj *task, visitproc visit, void *arg) Py_VISIT(fut->fut_source_tb); Py_VISIT(fut->fut_cancel_msg); Py_VISIT(fut->fut_cancelled_exc); + Py_VISIT(fut->fut_awaited_by); PyObject_VisitManagedDict((PyObject *)fut, visit, arg); return 0; } @@ -3050,6 +3281,10 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu goto yield_insteadof_yf; } + if (future_awaited_by_add(state, result, (PyObject *)task)) { + goto fail; + } + fut->fut_blocking = 0; /* result.add_done_callback(task._wakeup) */ @@ -3139,6 +3374,10 @@ task_step_handle_result_impl(asyncio_state *state, TaskObj *task, PyObject *resu goto yield_insteadof_yf; } + if (future_awaited_by_add(state, result, (PyObject *)task)) { + goto fail; + } + /* result._asyncio_future_blocking = False */ if (PyObject_SetAttr( result, &_Py_ID(_asyncio_future_blocking), Py_False) == -1) { @@ -3335,7 +3574,7 @@ task_eager_start(asyncio_state *state, TaskObj *task) register_task(state, task); } else { // This seems to really help performance on pyperformance benchmarks - Py_CLEAR(task->task_coro); + clear_task_coro(task); } return retval; @@ -3350,6 +3589,11 @@ task_wakeup_lock_held(TaskObj *task, PyObject *o) assert(o); asyncio_state *state = get_asyncio_state_by_def((PyObject *)task); + + if (future_awaited_by_discard(state, o, (PyObject *)task)) { + return NULL; + } + if (Future_CheckExact(state, o) || Task_CheckExact(state, o)) { PyObject *fut_result = NULL; int res; @@ -3833,6 +4077,50 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) return res; } +/*[clinic input] +_asyncio.future_add_to_awaited_by + + fut: object + waiter: object + / + +Record that `fut` is awaited on by `waiter`. 
+ +[clinic start generated code]*/ + +static PyObject * +_asyncio_future_add_to_awaited_by_impl(PyObject *module, PyObject *fut, + PyObject *waiter) +/*[clinic end generated code: output=0ab9a1a63389e4df input=06e6eaac51f532b9]*/ +{ + asyncio_state *state = get_asyncio_state(module); + if (future_awaited_by_add(state, fut, waiter)) { + return NULL; + } + Py_RETURN_NONE; +} + +/*[clinic input] +_asyncio.future_discard_from_awaited_by + + fut: object + waiter: object + / + +[clinic start generated code]*/ + +static PyObject * +_asyncio_future_discard_from_awaited_by_impl(PyObject *module, PyObject *fut, + PyObject *waiter) +/*[clinic end generated code: output=a03b0b4323b779de input=3833f7639e88e483]*/ +{ + asyncio_state *state = get_asyncio_state(module); + if (future_awaited_by_discard(state, fut, waiter)) { + return NULL; + } + Py_RETURN_NONE; +} + static int module_traverse(PyObject *mod, visitproc visit, void *arg) { @@ -3896,6 +4184,7 @@ module_clear(PyObject *mod) // those get cleared in PyThreadState_Clear. _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); Py_CLEAR(ts->asyncio_running_loop); + Py_CLEAR(ts->asyncio_running_task); return 0; } @@ -3926,7 +4215,6 @@ module_init(asyncio_state *state) goto fail; } - state->context_kwname = Py_BuildValue("(s)", "context"); if (state->context_kwname == NULL) { goto fail; @@ -4007,6 +4295,8 @@ static PyMethodDef asyncio_methods[] = { _ASYNCIO__LEAVE_TASK_METHODDEF _ASYNCIO__SWAP_CURRENT_TASK_METHODDEF _ASYNCIO_ALL_TASKS_METHODDEF + _ASYNCIO_FUTURE_ADD_TO_AWAITED_BY_METHODDEF + _ASYNCIO_FUTURE_DISCARD_FROM_AWAITED_BY_METHODDEF {NULL, NULL} }; diff --git a/Modules/_testexternalinspection.c b/Modules/_testexternalinspection.c index 8a92d5cdd894be..0c31d1b7a3486c 100644 --- a/Modules/_testexternalinspection.c +++ b/Modules/_testexternalinspection.c @@ -59,10 +59,30 @@ # define HAVE_PROCESS_VM_READV 0 #endif +struct _Py_AsyncioModuleDebugOffsets { + struct _asyncio_task_object { + uint64_t size; + uint64_t task_name; + uint64_t task_awaited_by; + uint64_t task_is_task; + uint64_t task_awaited_by_is_set; + uint64_t task_coro; + } asyncio_task_object; + struct _asyncio_thread_state { + uint64_t size; + uint64_t asyncio_running_loop; + uint64_t asyncio_running_task; + } asyncio_thread_state; +}; + #if defined(__APPLE__) && TARGET_OS_OSX -static void* -analyze_macho64(mach_port_t proc_ref, void* base, void* map) -{ +static uintptr_t +return_section_address( + const char* section, + mach_port_t proc_ref, + uintptr_t base, + void* map +) { struct mach_header_64* hdr = (struct mach_header_64*)map; int ncmds = hdr->ncmds; @@ -72,35 +92,40 @@ analyze_macho64(mach_port_t proc_ref, void* base, void* map) mach_vm_size_t size = 0; mach_msg_type_number_t count = sizeof(vm_region_basic_info_data_64_t); mach_vm_address_t address = (mach_vm_address_t)base; - vm_region_basic_info_data_64_t region_info; + vm_region_basic_info_data_64_t r_info; mach_port_t object_name; + uintptr_t vmaddr = 0; for (int i = 0; cmd_cnt < 2 && i < ncmds; i++) { + if (cmd->cmd == LC_SEGMENT_64 && strcmp(cmd->segname, "__TEXT") == 0) { + vmaddr = cmd->vmaddr; + } if (cmd->cmd == LC_SEGMENT_64 && strcmp(cmd->segname, "__DATA") == 0) { while (cmd->filesize != size) { address += size; - if (mach_vm_region( - proc_ref, - &address, - &size, - VM_REGION_BASIC_INFO_64, - (vm_region_info_t)®ion_info, // cppcheck-suppress [uninitvar] - &count, - &object_name) - != KERN_SUCCESS) - { - PyErr_SetString(PyExc_RuntimeError, "Cannot get any more VM maps.\n"); - return NULL; + 
kern_return_t ret = mach_vm_region( + proc_ref, + &address, + &size, + VM_REGION_BASIC_INFO_64, + (vm_region_info_t)&r_info, // cppcheck-suppress [uninitvar] + &count, + &object_name + ); + if (ret != KERN_SUCCESS) { + PyErr_SetString( + PyExc_RuntimeError, "Cannot get any more VM maps.\n"); + return 0; } } - base = (void*)address - cmd->vmaddr; int nsects = cmd->nsects; - struct section_64* sec = - (struct section_64*)((void*)cmd + sizeof(struct segment_command_64)); + struct section_64* sec = (struct section_64*)( + (void*)cmd + sizeof(struct segment_command_64) + ); for (int j = 0; j < nsects; j++) { - if (strcmp(sec[j].sectname, "PyRuntime") == 0) { - return base + sec[j].addr; + if (strcmp(sec[j].sectname, section) == 0) { + return base + sec[j].addr - vmaddr; } } cmd_cnt++; @@ -108,33 +133,39 @@ analyze_macho64(mach_port_t proc_ref, void* base, void* map) cmd = (struct segment_command_64*)((void*)cmd + cmd->cmdsize); } - return NULL; + return 0; } -static void* -analyze_macho(char* path, void* base, mach_vm_size_t size, mach_port_t proc_ref) -{ +static uintptr_t +search_section_in_file( + const char* secname, + char* path, + uintptr_t base, + mach_vm_size_t size, + mach_port_t proc_ref +) { int fd = open(path, O_RDONLY); if (fd == -1) { PyErr_Format(PyExc_RuntimeError, "Cannot open binary %s\n", path); - return NULL; + return 0; } struct stat fs; if (fstat(fd, &fs) == -1) { - PyErr_Format(PyExc_RuntimeError, "Cannot get size of binary %s\n", path); + PyErr_Format( + PyExc_RuntimeError, "Cannot get size of binary %s\n", path); close(fd); - return NULL; + return 0; } void* map = mmap(0, fs.st_size, PROT_READ, MAP_SHARED, fd, 0); if (map == MAP_FAILED) { PyErr_Format(PyExc_RuntimeError, "Cannot map binary %s\n", path); close(fd); - return NULL; + return 0; } - void* result = NULL; + uintptr_t result = 0; struct mach_header_64* hdr = (struct mach_header_64*)map; switch (hdr->magic) { @@ -142,11 +173,13 @@ analyze_macho(char* path, void* base, mach_vm_size_t size, mach_port_t proc_ref) case MH_CIGAM: case FAT_MAGIC: case FAT_CIGAM: - PyErr_SetString(PyExc_RuntimeError, "32-bit Mach-O binaries are not supported"); + PyErr_SetString( + PyExc_RuntimeError, + "32-bit Mach-O binaries are not supported"); break; case MH_MAGIC_64: case MH_CIGAM_64: - result = analyze_macho64(proc_ref, base, map); + result = return_section_address(secname, proc_ref, base, map); break; default: PyErr_SetString(PyExc_RuntimeError, "Unknown Mach-O magic"); @@ -174,9 +207,8 @@ pid_to_task(pid_t pid) return task; } -static void* -get_py_runtime_macos(pid_t pid) -{ +static uintptr_t +search_map_for_section(pid_t pid, const char* secname, const char* substr) { mach_vm_address_t address = 0; mach_vm_size_t size = 0; mach_msg_type_number_t count = sizeof(vm_region_basic_info_data_64_t); @@ -186,12 +218,11 @@ get_py_runtime_macos(pid_t pid) mach_port_t proc_ref = pid_to_task(pid); if (proc_ref == 0) { PyErr_SetString(PyExc_PermissionError, "Cannot get task for PID"); - return NULL; + return 0; } int match_found = 0; char map_filename[MAXPATHLEN + 1]; - void* result_address = NULL; while (mach_vm_region( proc_ref, &address, @@ -199,15 +230,21 @@ get_py_runtime_macos(pid_t pid) VM_REGION_BASIC_INFO_64, (vm_region_info_t)®ion_info, &count, - &object_name) - == KERN_SUCCESS) + &object_name) == KERN_SUCCESS) { - int path_len = proc_regionfilename(pid, address, map_filename, MAXPATHLEN); + int path_len = proc_regionfilename( + pid, address, map_filename, MAXPATHLEN); if (path_len == 0) { address += size; continue; } + if 
((region_info.protection & VM_PROT_READ) == 0 + || (region_info.protection & VM_PROT_EXECUTE) == 0) { + address += size; + continue; + } + char* filename = strrchr(map_filename, '/'); if (filename != NULL) { filename++; // Move past the '/' @@ -215,26 +252,22 @@ get_py_runtime_macos(pid_t pid) filename = map_filename; // No path, use the whole string } - // Check if the filename starts with "python" or "libpython" - if (!match_found && strncmp(filename, "python", 6) == 0) { - match_found = 1; - result_address = analyze_macho(map_filename, (void*)address, size, proc_ref); - } - if (strncmp(filename, "libpython", 9) == 0) { + if (!match_found && strncmp(filename, substr, strlen(substr)) == 0) { match_found = 1; - result_address = analyze_macho(map_filename, (void*)address, size, proc_ref); - break; + return search_section_in_file( + secname, map_filename, address, size, proc_ref); } address += size; } - return result_address; + return 0; } + #endif #ifdef __linux__ -void* -find_python_map_start_address(pid_t pid, char* result_filename) +static uintptr_t +find_map_start_address(pid_t pid, char* result_filename, const char* map) { char maps_file_path[64]; sprintf(maps_file_path, "/proc/%d/maps", pid); @@ -242,17 +275,20 @@ find_python_map_start_address(pid_t pid, char* result_filename) FILE* maps_file = fopen(maps_file_path, "r"); if (maps_file == NULL) { PyErr_SetFromErrno(PyExc_OSError); - return NULL; + return 0; } int match_found = 0; char line[256]; char map_filename[PATH_MAX]; - void* result_address = 0; + uintptr_t result_address = 0; while (fgets(line, sizeof(line), maps_file) != NULL) { unsigned long start_address = 0; - sscanf(line, "%lx-%*x %*s %*s %*s %*s %s", &start_address, map_filename); + sscanf( + line, "%lx-%*x %*s %*s %*s %*s %s", + &start_address, map_filename + ); char* filename = strrchr(map_filename, '/'); if (filename != NULL) { filename++; // Move past the '/' @@ -260,15 +296,9 @@ find_python_map_start_address(pid_t pid, char* result_filename) filename = map_filename; // No path, use the whole string } - // Check if the filename starts with "python" or "libpython" - if (!match_found && strncmp(filename, "python", 6) == 0) { - match_found = 1; - result_address = (void*)start_address; - strcpy(result_filename, map_filename); - } - if (strncmp(filename, "libpython", 9) == 0) { + if (!match_found && strncmp(filename, map, strlen(map)) == 0) { match_found = 1; - result_address = (void*)start_address; + result_address = start_address; strcpy(result_filename, map_filename); break; } @@ -283,18 +313,17 @@ find_python_map_start_address(pid_t pid, char* result_filename) return result_address; } -void* -get_py_runtime_linux(pid_t pid) +static uintptr_t +search_map_for_section(pid_t pid, const char* secname, const char* map) { char elf_file[256]; - void* start_address = (void*)find_python_map_start_address(pid, elf_file); + uintptr_t start_address = find_map_start_address(pid, elf_file, map); if (start_address == 0) { - PyErr_SetString(PyExc_RuntimeError, "No memory map associated with python or libpython found"); - return NULL; + return 0; } - void* result = NULL; + uintptr_t result = 0; void* file_memory = NULL; int fd = open(elf_file, O_RDONLY); @@ -317,20 +346,29 @@ get_py_runtime_linux(pid_t pid) Elf_Ehdr* elf_header = (Elf_Ehdr*)file_memory; - Elf_Shdr* section_header_table = (Elf_Shdr*)(file_memory + elf_header->e_shoff); + Elf_Shdr* section_header_table = + (Elf_Shdr*)(file_memory + elf_header->e_shoff); Elf_Shdr* shstrtab_section = 
§ion_header_table[elf_header->e_shstrndx]; char* shstrtab = (char*)(file_memory + shstrtab_section->sh_offset); - Elf_Shdr* py_runtime_section = NULL; + Elf_Shdr* section = NULL; for (int i = 0; i < elf_header->e_shnum; i++) { - if (strcmp(".PyRuntime", shstrtab + section_header_table[i].sh_name) == 0) { - py_runtime_section = §ion_header_table[i]; + const char* this_sec_name = ( + shstrtab + + section_header_table[i].sh_name + + 1 // "+1" accounts for the leading "." + ); + + if (strcmp(secname, this_sec_name) == 0) { + section = §ion_header_table[i]; break; } } - Elf_Phdr* program_header_table = (Elf_Phdr*)(file_memory + elf_header->e_phoff); + Elf_Phdr* program_header_table = + (Elf_Phdr*)(file_memory + elf_header->e_phoff); + // Find the first PT_LOAD segment Elf_Phdr* first_load_segment = NULL; for (int i = 0; i < elf_header->e_phnum; i++) { @@ -340,10 +378,12 @@ get_py_runtime_linux(pid_t pid) } } - if (py_runtime_section != NULL && first_load_segment != NULL) { - uintptr_t elf_load_addr = first_load_segment->p_vaddr - - (first_load_segment->p_vaddr % first_load_segment->p_align); - result = start_address + py_runtime_section->sh_addr - elf_load_addr; + if (section != NULL && first_load_segment != NULL) { + uintptr_t elf_load_addr = + first_load_segment->p_vaddr - ( + first_load_segment->p_vaddr % first_load_segment->p_align + ); + result = start_address + (uintptr_t)section->sh_addr - elf_load_addr; } exit: @@ -355,10 +395,28 @@ get_py_runtime_linux(pid_t pid) } return result; } + #endif -ssize_t -read_memory(pid_t pid, void* remote_address, size_t len, void* dst) +static uintptr_t +get_py_runtime(pid_t pid) +{ + uintptr_t address = search_map_for_section(pid, "PyRuntime", "libpython"); + if (address == 0) { + address = search_map_for_section(pid, "PyRuntime", "python"); + } + return address; +} + +static uintptr_t +get_async_debug(pid_t pid) +{ + return search_map_for_section(pid, "AsyncioDebug", "_asyncio.cpython"); +} + + +static ssize_t +read_memory(pid_t pid, uintptr_t remote_address, size_t len, void* dst) { ssize_t total_bytes_read = 0; #if defined(__linux__) && HAVE_PROCESS_VM_READV @@ -394,13 +452,19 @@ read_memory(pid_t pid, void* remote_address, size_t len, void* dst) if (kr != KERN_SUCCESS) { switch (kr) { case KERN_PROTECTION_FAILURE: - PyErr_SetString(PyExc_PermissionError, "Not enough permissions to read memory"); + PyErr_SetString( + PyExc_PermissionError, + "Not enough permissions to read memory"); break; case KERN_INVALID_ARGUMENT: - PyErr_SetString(PyExc_PermissionError, "Invalid argument to mach_vm_read_overwrite"); + PyErr_SetString( + PyExc_PermissionError, + "Invalid argument to mach_vm_read_overwrite"); break; default: - PyErr_SetString(PyExc_RuntimeError, "Unknown error reading memory"); + PyErr_SetString( + PyExc_RuntimeError, + "Unknown error reading memory"); } return -1; } @@ -411,13 +475,22 @@ read_memory(pid_t pid, void* remote_address, size_t len, void* dst) return total_bytes_read; } -int -read_string(pid_t pid, _Py_DebugOffsets* debug_offsets, void* address, char* buffer, Py_ssize_t size) -{ +static int +read_string( + pid_t pid, + _Py_DebugOffsets* debug_offsets, + uintptr_t address, + char* buffer, + Py_ssize_t size +) { Py_ssize_t len; - ssize_t bytes_read = - read_memory(pid, address + debug_offsets->unicode_object.length, sizeof(Py_ssize_t), &len); - if (bytes_read == -1) { + ssize_t bytes_read = read_memory( + pid, + address + debug_offsets->unicode_object.length, + sizeof(Py_ssize_t), + &len + ); + if (bytes_read < 0) { return -1; } if (len 
>= size) { @@ -426,51 +499,645 @@ read_string(pid_t pid, _Py_DebugOffsets* debug_offsets, void* address, char* buf } size_t offset = debug_offsets->unicode_object.asciiobject_size; bytes_read = read_memory(pid, address + offset, len, buffer); - if (bytes_read == -1) { + if (bytes_read < 0) { return -1; } buffer[len] = '\0'; return 0; } -void* -get_py_runtime(pid_t pid) + +static inline int +read_ptr(pid_t pid, uintptr_t address, uintptr_t *ptr_addr) { -#if defined(__linux__) - return get_py_runtime_linux(pid); -#elif defined(__APPLE__) && TARGET_OS_OSX - return get_py_runtime_macos(pid); -#else - return NULL; -#endif + int bytes_read = read_memory(pid, address, sizeof(void*), ptr_addr); + if (bytes_read < 0) { + return -1; + } + return 0; +} + +static inline int +read_ssize_t(pid_t pid, uintptr_t address, Py_ssize_t *size) +{ + int bytes_read = read_memory(pid, address, sizeof(Py_ssize_t), size); + if (bytes_read < 0) { + return -1; + } + return 0; } static int -parse_code_object( - int pid, - PyObject* result, - struct _Py_DebugOffsets* offsets, - void* address, - void** previous_frame) +read_py_ptr(pid_t pid, uintptr_t address, uintptr_t *ptr_addr) +{ + if (read_ptr(pid, address, ptr_addr)) { + return -1; + } + *ptr_addr &= ~Py_TAG_BITS; + return 0; +} + +static int +read_char(pid_t pid, uintptr_t address, char *result) +{ + int bytes_read = read_memory(pid, address, sizeof(char), result); + if (bytes_read < 0) { + return -1; + } + return 0; +} + +static int +read_int(pid_t pid, uintptr_t address, int *result) +{ + int bytes_read = read_memory(pid, address, sizeof(int), result); + if (bytes_read < 0) { + return -1; + } + return 0; +} + +static int +read_pyobj(pid_t pid, uintptr_t address, PyObject *ptr_addr) +{ + int bytes_read = read_memory(pid, address, sizeof(PyObject), ptr_addr); + if (bytes_read < 0) { + return -1; + } + return 0; +} + +static PyObject * +read_py_str( + pid_t pid, + _Py_DebugOffsets* debug_offsets, + uintptr_t address, + ssize_t max_len +) { + assert(max_len > 0); + + PyObject *result = NULL; + + char *buf = (char *)PyMem_RawMalloc(max_len); + if (buf == NULL) { + PyErr_NoMemory(); + return NULL; + } + if (read_string(pid, debug_offsets, address, buf, max_len)) { + goto err; + } + + result = PyUnicode_FromString(buf); + if (result == NULL) { + goto err; + } + + PyMem_RawFree(buf); + assert(result != NULL); + return result; + +err: + PyMem_RawFree(buf); + return NULL; +} + +static long +read_py_long(pid_t pid, _Py_DebugOffsets* offsets, uintptr_t address) { - void* address_of_function_name; - read_memory( + unsigned int shift = PYLONG_BITS_IN_DIGIT; + + ssize_t size; + uintptr_t lv_tag; + + int bytes_read = read_memory( + pid, address + offsets->long_object.lv_tag, + sizeof(uintptr_t), + &lv_tag); + if (bytes_read < 0) { + return -1; + } + + int negative = (lv_tag & 3) == 2; + size = lv_tag >> 3; + + if (size == 0) { + return 0; + } + + char *digits = (char *)PyMem_RawMalloc(size * sizeof(digit)); + if (!digits) { + PyErr_NoMemory(); + return -1; + } + + bytes_read = read_memory( + pid, + address + offsets->long_object.ob_digit, + sizeof(digit) * size, + digits + ); + if (bytes_read < 0) { + goto error; + } + + long value = 0; + + for (ssize_t i = 0; i < size; ++i) { + long long factor; + if (__builtin_mul_overflow(digits[i], (1UL << (ssize_t)(shift * i)), + &factor) + ) { + goto error; + } + if (__builtin_add_overflow(value, factor, &value)) { + goto error; + } + } + PyMem_RawFree(digits); + if (negative) { + value *= -1; + } + return value; +error: + 
PyMem_RawFree(digits); + return -1; +} + +static PyObject * +parse_task_name( + int pid, + _Py_DebugOffsets* offsets, + struct _Py_AsyncioModuleDebugOffsets* async_offsets, + uintptr_t task_address +) { + uintptr_t task_name_addr; + int err = read_py_ptr( + pid, + task_address + async_offsets->asyncio_task_object.task_name, + &task_name_addr); + if (err) { + return NULL; + } + + // The task name can be a long or a string so we need to check the type + + PyObject task_name_obj; + err = read_pyobj( + pid, + task_name_addr, + &task_name_obj); + if (err) { + return NULL; + } + + int flags; + err = read_int( + pid, + (uintptr_t)task_name_obj.ob_type + offsets->type_object.tp_flags, + &flags); + if (err) { + return NULL; + } + + if ((flags & Py_TPFLAGS_LONG_SUBCLASS)) { + long res = read_py_long(pid, offsets, task_name_addr); + if (res == -1) { + PyErr_SetString(PyExc_RuntimeError, "Failed to get task name"); + return NULL; + } + return PyUnicode_FromFormat("Task-%d", res); + } + + if(!(flags & Py_TPFLAGS_UNICODE_SUBCLASS)) { + PyErr_SetString(PyExc_RuntimeError, "Invalid task name object"); + return NULL; + } + + return read_py_str( + pid, + offsets, + task_name_addr, + 255 + ); +} + +static int +parse_coro_chain( + int pid, + struct _Py_DebugOffsets* offsets, + struct _Py_AsyncioModuleDebugOffsets* async_offsets, + uintptr_t coro_address, + PyObject *render_to +) { + assert((void*)coro_address != NULL); + + uintptr_t gen_type_addr; + int err = read_ptr( + pid, + coro_address + sizeof(void*), + &gen_type_addr); + if (err) { + return -1; + } + + uintptr_t gen_name_addr; + err = read_py_ptr( + pid, + coro_address + offsets->gen_object.gi_name, + &gen_name_addr); + if (err) { + return -1; + } + + PyObject *name = read_py_str( + pid, + offsets, + gen_name_addr, + 255 + ); + if (name == NULL) { + return -1; + } + + if (PyList_Append(render_to, name)) { + return -1; + } + Py_DECREF(name); + + int gi_frame_state; + err = read_int( + pid, + coro_address + offsets->gen_object.gi_frame_state, + &gi_frame_state); + + if (gi_frame_state == FRAME_SUSPENDED_YIELD_FROM) { + char owner; + err = read_char( pid, - (void*)(address + offsets->code_object.name), - sizeof(void*), - &address_of_function_name); + coro_address + offsets->gen_object.gi_iframe + + offsets->interpreter_frame.owner, + &owner + ); + if (err) { + return -1; + } + if (owner != FRAME_OWNED_BY_GENERATOR) { + PyErr_SetString( + PyExc_RuntimeError, + "generator doesn't own its frame \\_o_/"); + return -1; + } - if (address_of_function_name == NULL) { - PyErr_SetString(PyExc_RuntimeError, "No function name found"); + uintptr_t stackpointer_addr; + err = read_py_ptr( + pid, + coro_address + offsets->gen_object.gi_iframe + + offsets->interpreter_frame.stackpointer, + &stackpointer_addr); + if (err) { + return -1; + } + + if ((void*)stackpointer_addr != NULL) { + uintptr_t gi_await_addr; + err = read_py_ptr( + pid, + stackpointer_addr - sizeof(void*), + &gi_await_addr); + if (err) { + return -1; + } + + if ((void*)gi_await_addr != NULL) { + uintptr_t gi_await_addr_type_addr; + int err = read_ptr( + pid, + gi_await_addr + sizeof(void*), + &gi_await_addr_type_addr); + if (err) { + return -1; + } + + if (gen_type_addr == gi_await_addr_type_addr) { + /* This needs an explanation. We always start with parsing + native coroutine / generator frames. Ultimately they + are awaiting on something. That something can be + a native coroutine frame or... an iterator. + If it's the latter -- we can't continue building + our chain. 
So the condition to bail out of this is + to do that when the type of the current coroutine + doesn't match the type of whatever it points to + in its cr_await. + */ + err = parse_coro_chain( + pid, + offsets, + async_offsets, + gi_await_addr, + render_to + ); + if (err) { + return -1; + } + } + } + } + + } + + return 0; +} + + +static int +parse_task_awaited_by( + int pid, + struct _Py_DebugOffsets* offsets, + struct _Py_AsyncioModuleDebugOffsets* async_offsets, + uintptr_t task_address, + PyObject *awaited_by +); + + +static int +parse_task( + int pid, + struct _Py_DebugOffsets* offsets, + struct _Py_AsyncioModuleDebugOffsets* async_offsets, + uintptr_t task_address, + PyObject *render_to +) { + char is_task; + int err = read_char( + pid, + task_address + async_offsets->asyncio_task_object.task_is_task, + &is_task); + if (err) { + return -1; + } + + uintptr_t refcnt; + read_ptr(pid, task_address + sizeof(Py_ssize_t), &refcnt); + + PyObject* result = PyList_New(0); + if (result == NULL) { + return -1; + } + + PyObject *call_stack = PyList_New(0); + if (call_stack == NULL) { + goto err; + } + if (PyList_Append(result, call_stack)) { + Py_DECREF(call_stack); + goto err; + } + /* we can operate on a borrowed one to simplify cleanup */ + Py_DECREF(call_stack); + + if (is_task) { + PyObject *tn = parse_task_name( + pid, offsets, async_offsets, task_address); + if (tn == NULL) { + goto err; + } + if (PyList_Append(result, tn)) { + Py_DECREF(tn); + goto err; + } + Py_DECREF(tn); + + uintptr_t coro_addr; + err = read_py_ptr( + pid, + task_address + async_offsets->asyncio_task_object.task_coro, + &coro_addr); + if (err) { + goto err; + } + + if ((void*)coro_addr != NULL) { + err = parse_coro_chain( + pid, + offsets, + async_offsets, + coro_addr, + call_stack + ); + if (err) { + goto err; + } + + if (PyList_Reverse(call_stack)) { + goto err; + } + } + } + + if (PyList_Append(render_to, result)) { + goto err; + } + Py_DECREF(result); + + PyObject *awaited_by = PyList_New(0); + if (awaited_by == NULL) { + goto err; + } + if (PyList_Append(result, awaited_by)) { + Py_DECREF(awaited_by); + goto err; + } + /* we can operate on a borrowed one to simplify cleanup */ + Py_DECREF(awaited_by); + + if (parse_task_awaited_by(pid, offsets, async_offsets, + task_address, awaited_by) + ) { + goto err; + } + + return 0; + +err: + Py_DECREF(result); + return -1; +} + +static int +parse_tasks_in_set( + int pid, + struct _Py_DebugOffsets* offsets, + struct _Py_AsyncioModuleDebugOffsets* async_offsets, + uintptr_t set_addr, + PyObject *awaited_by +) { + uintptr_t set_obj; + if (read_py_ptr( + pid, + set_addr, + &set_obj) + ) { return -1; } - char function_name[256]; - if (read_string(pid, offsets, address_of_function_name, function_name, sizeof(function_name)) != 0) { + Py_ssize_t num_els; + if (read_ssize_t( + pid, + set_obj + offsets->set_object.used, + &num_els) + ) { return -1; } - PyObject* py_function_name = PyUnicode_FromString(function_name); + Py_ssize_t set_len; + if (read_ssize_t( + pid, + set_obj + offsets->set_object.mask, + &set_len) + ) { + return -1; + } + set_len++; // The set contains the `mask+1` element slots. 
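
``parse_coro_chain()`` above rebuilds a suspended task's call stack by starting from the task's outermost coroutine and repeatedly following what it is awaiting on, bailing out as soon as the awaited object is no longer a coroutine/generator of the same type. An in-process analogue of the same walk, using only public attributes (a rough sketch; the remote reader works on raw memory instead):

.. code-block:: python

    def coro_call_stack(coro):
        """Collect coroutine names by walking the cr_await chain of a
        suspended coroutine (in-process sketch of parse_coro_chain)."""
        names = []
        while coro is not None and hasattr(coro, 'cr_await'):
            names.append(coro.__name__)
            # Stop once the awaited object is not a coroutine itself,
            # e.g. a Future or an arbitrary iterator.
            coro = coro.cr_await
        return names   # outermost first; the C code reverses this in parse_task()
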
+ + uintptr_t table_ptr; + if (read_ptr( + pid, + set_obj + offsets->set_object.table, + &table_ptr) + ) { + return -1; + } + + Py_ssize_t i = 0; + Py_ssize_t els = 0; + while (i < set_len) { + uintptr_t key_addr; + if (read_py_ptr(pid, table_ptr, &key_addr)) { + return -1; + } + + if ((void*)key_addr != NULL) { + Py_ssize_t ref_cnt; + if (read_ssize_t(pid, table_ptr, &ref_cnt)) { + return -1; + } + + if (ref_cnt) { + // if 'ref_cnt=0' it's a set dummy marker + + if (parse_task( + pid, + offsets, + async_offsets, + key_addr, + awaited_by) + ) { + return -1; + } + + if (++els == num_els) { + break; + } + } + } + + table_ptr += sizeof(void*) * 2; + i++; + } + return 0; +} + + +static int +parse_task_awaited_by( + int pid, + struct _Py_DebugOffsets* offsets, + struct _Py_AsyncioModuleDebugOffsets* async_offsets, + uintptr_t task_address, + PyObject *awaited_by +) { + uintptr_t task_ab_addr; + int err = read_py_ptr( + pid, + task_address + async_offsets->asyncio_task_object.task_awaited_by, + &task_ab_addr); + if (err) { + return -1; + } + + if ((void*)task_ab_addr == NULL) { + return 0; + } + + char awaited_by_is_a_set; + err = read_char( + pid, + task_address + async_offsets->asyncio_task_object.task_awaited_by_is_set, + &awaited_by_is_a_set); + if (err) { + return -1; + } + + if (awaited_by_is_a_set) { + if (parse_tasks_in_set( + pid, + offsets, + async_offsets, + task_address + async_offsets->asyncio_task_object.task_awaited_by, + awaited_by) + ) { + return -1; + } + } else { + uintptr_t sub_task; + if (read_py_ptr( + pid, + task_address + async_offsets->asyncio_task_object.task_awaited_by, + &sub_task) + ) { + return -1; + } + + if (parse_task( + pid, + offsets, + async_offsets, + sub_task, + awaited_by) + ) { + return -1; + } + } + + return 0; +} + +static int +parse_code_object( + int pid, + PyObject* result, + struct _Py_DebugOffsets* offsets, + uintptr_t address, + uintptr_t* previous_frame +) { + uintptr_t address_of_function_name; + int bytes_read = read_memory( + pid, + address + offsets->code_object.name, + sizeof(void*), + &address_of_function_name + ); + if (bytes_read < 0) { + return -1; + } + + if ((void*)address_of_function_name == NULL) { + PyErr_SetString(PyExc_RuntimeError, "No function name found"); + return -1; + } + + PyObject* py_function_name = read_py_str( + pid, offsets, address_of_function_name, 256); if (py_function_name == NULL) { return -1; } @@ -486,25 +1153,26 @@ parse_code_object( static int parse_frame_object( - int pid, - PyObject* result, - struct _Py_DebugOffsets* offsets, - void* address, - void** previous_frame) -{ + int pid, + PyObject* result, + struct _Py_DebugOffsets* offsets, + uintptr_t address, + uintptr_t* previous_frame +) { + int err; + ssize_t bytes_read = read_memory( - pid, - (void*)(address + offsets->interpreter_frame.previous), - sizeof(void*), - previous_frame); - if (bytes_read == -1) { + pid, + address + offsets->interpreter_frame.previous, + sizeof(void*), + previous_frame + ); + if (bytes_read < 0) { return -1; } char owner; - bytes_read = - read_memory(pid, (void*)(address + offsets->interpreter_frame.owner), sizeof(char), &owner); - if (bytes_read < 0) { + if (read_char(pid, address + offsets->interpreter_frame.owner, &owner)) { return -1; } @@ -513,27 +1181,256 @@ parse_frame_object( } uintptr_t address_of_code_object; + err = read_py_ptr( + pid, + address + offsets->interpreter_frame.executable, + &address_of_code_object + ); + if (err) { + return -1; + } + + if ((void*)address_of_code_object == NULL) { + return 0; + } + + 
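
``parse_tasks_in_set()`` reads the remote ``PySetObject`` layout directly: ``used`` live entries spread over ``mask + 1`` table slots, each slot being a ``(key, hash)`` pair. Roughly, the traversal amounts to the following (a sketch; ``read_ptr()``/``read_ssize_t()`` are hypothetical remote-read helpers and the 16-byte slot size assumes a 64-bit build):

.. code-block:: python

    def iter_remote_set(pid, set_addr, offsets):
        """Yield object addresses stored in a set living in another process."""
        used = read_ssize_t(pid, set_addr + offsets.set_object.used)
        mask = read_ssize_t(pid, set_addr + offsets.set_object.mask)
        table = read_ptr(pid, set_addr + offsets.set_object.table)

        found = 0
        for slot in range(mask + 1):               # the table has mask + 1 slots
            key = read_ptr(pid, table + slot * 16)  # each slot is (key, hash)
            if not key:
                continue                            # empty slot
            # The real reader also skips the set's dummy marker here.
            yield key
            found += 1
            if found == used:
                break
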
return parse_code_object( + pid, result, offsets, address_of_code_object, previous_frame); +} + +static int +parse_async_frame_object( + int pid, + PyObject* result, + struct _Py_DebugOffsets* offsets, + uintptr_t address, + uintptr_t* previous_frame, + uintptr_t* code_object +) { + int err; + + ssize_t bytes_read = read_memory( + pid, + address + offsets->interpreter_frame.previous, + sizeof(void*), + previous_frame + ); + if (bytes_read < 0) { + return -1; + } + + char owner; bytes_read = read_memory( + pid, address + offsets->interpreter_frame.owner, sizeof(char), &owner); + if (bytes_read < 0) { + return -1; + } + + if (owner == FRAME_OWNED_BY_CSTACK || owner == FRAME_OWNED_BY_INTERPRETER) { + return 0; // C frame + } + + if (owner != FRAME_OWNED_BY_GENERATOR + && owner != FRAME_OWNED_BY_THREAD) { + PyErr_Format(PyExc_RuntimeError, "Unhandled frame owner %d.\n", owner); + return -1; + } + + err = read_py_ptr( + pid, + address + offsets->interpreter_frame.executable, + code_object + ); + if (err) { + return -1; + } + + assert(code_object != NULL); + if ((void*)*code_object == NULL) { + return 0; + } + + if (parse_code_object( + pid, result, offsets, *code_object, previous_frame)) { + return -1; + } + + return 1; +} + +static int +read_offsets( + int pid, + uintptr_t *runtime_start_address, + _Py_DebugOffsets* debug_offsets +) { + *runtime_start_address = get_py_runtime(pid); + assert(runtime_start_address != NULL); + if ((void*)*runtime_start_address == NULL) { + if (!PyErr_Occurred()) { + PyErr_SetString( + PyExc_RuntimeError, "Failed to get .PyRuntime address"); + } + return -1; + } + size_t size = sizeof(struct _Py_DebugOffsets); + ssize_t bytes_read = read_memory( + pid, *runtime_start_address, size, debug_offsets); + if (bytes_read < 0) { + return -1; + } + return 0; +} + +static int +read_async_debug( + int pid, + struct _Py_AsyncioModuleDebugOffsets* async_debug +) { + uintptr_t async_debug_addr = get_async_debug(pid); + if (!async_debug_addr) { + return -1; + } + size_t size = sizeof(struct _Py_AsyncioModuleDebugOffsets); + ssize_t bytes_read = read_memory( + pid, async_debug_addr, size, async_debug); + if (bytes_read < 0) { + return -1; + } + return 0; +} + +static int +find_running_frame( + int pid, + uintptr_t runtime_start_address, + _Py_DebugOffsets* local_debug_offsets, + uintptr_t *frame +) { + off_t interpreter_state_list_head = + local_debug_offsets->runtime_state.interpreters_head; + + uintptr_t address_of_interpreter_state; + int bytes_read = read_memory( pid, - (void*)(address + offsets->interpreter_frame.executable), + runtime_start_address + interpreter_state_list_head, sizeof(void*), - &address_of_code_object); + &address_of_interpreter_state); + if (bytes_read < 0) { + return -1; + } + + if (address_of_interpreter_state == 0) { + PyErr_SetString(PyExc_RuntimeError, "No interpreter state found"); + return -1; + } + + uintptr_t address_of_thread; + bytes_read = read_memory( + pid, + address_of_interpreter_state + + local_debug_offsets->interpreter_state.threads_head, + sizeof(void*), + &address_of_thread); + if (bytes_read < 0) { + return -1; + } + + // No Python frames are available for us (can happen at tear-down). 
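
Before any pointer chasing can happen, ``read_offsets()`` and ``read_async_debug()`` pull two small offset tables out of the target process: ``_Py_DebugOffsets`` from the ``PyRuntime`` section and the new ``_Py_AsyncioModuleDebugOffsets`` from the ``AsyncioDebug`` section of the ``_asyncio`` extension. Since the asyncio table is nine consecutive ``uint64_t`` fields, an external tool could decode it along these lines (sketch; ``find_section()``/``read_memory()`` are hypothetical helpers and little-endian layout is assumed):

.. code-block:: python

    import struct
    from collections import namedtuple

    AsyncioOffsets = namedtuple('AsyncioOffsets', [
        'task_size', 'task_name', 'task_awaited_by', 'task_is_task',
        'task_awaited_by_is_set', 'task_coro',
        'ts_size', 'ts_running_loop', 'ts_running_task',
    ])

    def read_asyncio_offsets(pid):
        """Decode _Py_AsyncioModuleDebugOffsets from the AsyncioDebug section."""
        addr = find_section(pid, "AsyncioDebug", "_asyncio.cpython")
        if not addr:
            raise RuntimeError("target has no AsyncioDebug section")
        raw = read_memory(pid, addr, 9 * 8)   # nine uint64_t fields
        return AsyncioOffsets(*struct.unpack("<9Q", raw))
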
+ if ((void*)address_of_thread != NULL) { + int err = read_ptr( + pid, + address_of_thread + local_debug_offsets->thread_state.current_frame, + frame); + if (err) { + return -1; + } + return 0; + } + + *frame = (uintptr_t)NULL; + return 0; +} + +static int +find_running_task( + int pid, + uintptr_t runtime_start_address, + _Py_DebugOffsets *local_debug_offsets, + struct _Py_AsyncioModuleDebugOffsets *async_offsets, + uintptr_t *running_task_addr +) { + *running_task_addr = (uintptr_t)NULL; + + off_t interpreter_state_list_head = + local_debug_offsets->runtime_state.interpreters_head; + + uintptr_t address_of_interpreter_state; + int bytes_read = read_memory( + pid, + runtime_start_address + interpreter_state_list_head, + sizeof(void*), + &address_of_interpreter_state); + if (bytes_read < 0) { + return -1; + } + + if (address_of_interpreter_state == 0) { + PyErr_SetString(PyExc_RuntimeError, "No interpreter state found"); + return -1; + } + + uintptr_t address_of_thread; + bytes_read = read_memory( + pid, + address_of_interpreter_state + + local_debug_offsets->interpreter_state.threads_head, + sizeof(void*), + &address_of_thread); + if (bytes_read < 0) { + return -1; + } + + uintptr_t address_of_running_loop; + // No Python frames are available for us (can happen at tear-down). + if ((void*)address_of_thread == NULL) { + return 0; + } + + bytes_read = read_py_ptr( + pid, + address_of_thread + + async_offsets->asyncio_thread_state.asyncio_running_loop, + &address_of_running_loop); if (bytes_read == -1) { return -1; } - if (address_of_code_object == 0) { + // no asyncio loop is now running + if ((void*)address_of_running_loop == NULL) { return 0; } - address_of_code_object &= ~Py_TAG_BITS; - return parse_code_object(pid, result, offsets, (void *)address_of_code_object, previous_frame); + + int err = read_ptr( + pid, + address_of_thread + + async_offsets->asyncio_thread_state.asyncio_running_task, + running_task_addr); + if (err) { + return -1; + } + + return 0; } static PyObject* get_stack_trace(PyObject* self, PyObject* args) { -#if (!defined(__linux__) && !defined(__APPLE__)) || (defined(__linux__) && !HAVE_PROCESS_VM_READV) - PyErr_SetString(PyExc_RuntimeError, "get_stack_trace is not supported on this platform"); +#if (!defined(__linux__) && !defined(__APPLE__)) || \ + (defined(__linux__) && !HAVE_PROCESS_VM_READV) + PyErr_SetString( + PyExc_RuntimeError, + "get_stack_trace is not supported on this platform"); return NULL; #endif int pid; @@ -542,88 +1439,205 @@ get_stack_trace(PyObject* self, PyObject* args) return NULL; } - void* runtime_start_address = get_py_runtime(pid); - if (runtime_start_address == NULL) { - if (!PyErr_Occurred()) { - PyErr_SetString(PyExc_RuntimeError, "Failed to get .PyRuntime address"); - } + uintptr_t runtime_start_address = get_py_runtime(pid); + struct _Py_DebugOffsets local_debug_offsets; + + if (read_offsets(pid, &runtime_start_address, &local_debug_offsets)) { return NULL; } - size_t size = sizeof(struct _Py_DebugOffsets); - struct _Py_DebugOffsets local_debug_offsets; - ssize_t bytes_read = read_memory(pid, runtime_start_address, size, &local_debug_offsets); - if (bytes_read == -1) { + uintptr_t address_of_current_frame; + if (find_running_frame( + pid, runtime_start_address, &local_debug_offsets, + &address_of_current_frame) + ) { return NULL; } - off_t interpreter_state_list_head = local_debug_offsets.runtime_state.interpreters_head; - void* address_of_interpreter_state; - bytes_read = read_memory( - pid, - (void*)(runtime_start_address + 
interpreter_state_list_head), - sizeof(void*), - &address_of_interpreter_state); - if (bytes_read == -1) { + PyObject* result = PyList_New(0); + if (result == NULL) { return NULL; } - if (address_of_interpreter_state == NULL) { - PyErr_SetString(PyExc_RuntimeError, "No interpreter state found"); + while ((void*)address_of_current_frame != NULL) { + if (parse_frame_object( + pid, + result, + &local_debug_offsets, + address_of_current_frame, + &address_of_current_frame) + < 0) + { + Py_DECREF(result); + return NULL; + } + } + + return result; +} + +static PyObject* +get_async_stack_trace(PyObject* self, PyObject* args) +{ +#if (!defined(__linux__) && !defined(__APPLE__)) || \ + (defined(__linux__) && !HAVE_PROCESS_VM_READV) + PyErr_SetString( + PyExc_RuntimeError, + "get_stack_trace is not supported on this platform"); + return NULL; +#endif + int pid; + + if (!PyArg_ParseTuple(args, "i", &pid)) { return NULL; } - void* address_of_thread; - bytes_read = read_memory( - pid, - (void*)(address_of_interpreter_state + local_debug_offsets.interpreter_state.threads_head), - sizeof(void*), - &address_of_thread); - if (bytes_read == -1) { + uintptr_t runtime_start_address = get_py_runtime(pid); + struct _Py_DebugOffsets local_debug_offsets; + + if (read_offsets(pid, &runtime_start_address, &local_debug_offsets)) { return NULL; } - PyObject* result = PyList_New(0); + struct _Py_AsyncioModuleDebugOffsets local_async_debug; + if (read_async_debug(pid, &local_async_debug)) { + return NULL; + } + + PyObject* result = PyList_New(1); if (result == NULL) { return NULL; } + PyObject* calls = PyList_New(0); + if (calls == NULL) { + return NULL; + } + if (PyList_SetItem(result, 0, calls)) { /* steals ref to 'calls' */ + Py_DECREF(result); + Py_DECREF(calls); + return NULL; + } - // No Python frames are available for us (can happen at tear-down). 
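
``find_running_task()`` is the consumer of the strong reference that ``enter_task()`` now leaves on the thread state: starting from the ``PyRuntime`` address, a handful of pointer reads lands on the currently running task, with no dictionary walking involved. Stripped of error handling, the chain looks like this (sketch; ``read_ptr()`` is a hypothetical remote-read helper and the field names follow the offsets tables above):

.. code-block:: python

    def running_task_address(pid, rt_addr, offsets, aio_offsets):
        """PyRuntime -> interpreter -> thread state -> asyncio_running_task."""
        interp = read_ptr(pid, rt_addr + offsets.runtime_state.interpreters_head)
        tstate = read_ptr(pid, interp + offsets.interpreter_state.threads_head)
        if not tstate:
            return 0          # no thread state (can happen at tear-down)
        loop = read_ptr(pid, tstate + aio_offsets.ts_running_loop)
        if not loop:
            return 0          # no event loop running in this thread
        return read_ptr(pid, tstate + aio_offsets.ts_running_task)
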
- if (address_of_thread != NULL) { - void* address_of_current_frame; - (void)read_memory( - pid, - (void*)(address_of_thread + local_debug_offsets.thread_state.current_frame), - sizeof(void*), - &address_of_current_frame); - while (address_of_current_frame != NULL) { - if (parse_frame_object( - pid, - result, - &local_debug_offsets, - address_of_current_frame, - &address_of_current_frame) - < 0) - { - Py_DECREF(result); - return NULL; - } + uintptr_t running_task_addr = (uintptr_t)NULL; + if (find_running_task( + pid, runtime_start_address, &local_debug_offsets, &local_async_debug, + &running_task_addr) + ) { + goto result_err; + } + + if ((void*)running_task_addr == NULL) { + PyErr_SetString(PyExc_RuntimeError, "No running task found"); + goto result_err; + } + + uintptr_t running_coro_addr; + if (read_py_ptr( + pid, + running_task_addr + local_async_debug.asyncio_task_object.task_coro, + &running_coro_addr + )) { + goto result_err; + } + + if ((void*)running_coro_addr == NULL) { + PyErr_SetString(PyExc_RuntimeError, "Running task coro is NULL"); + goto result_err; + } + + // note: genobject's gi_iframe is an embedded struct so the address to + // the offset leads directly to its first field: f_executable + uintptr_t address_of_running_task_code_obj; + if (read_py_ptr( + pid, + running_coro_addr + local_debug_offsets.gen_object.gi_iframe, + &address_of_running_task_code_obj + )) { + goto result_err; + } + + if ((void*)address_of_running_task_code_obj == NULL) { + PyErr_SetString(PyExc_RuntimeError, "Running task code object is NULL"); + goto result_err; + } + + uintptr_t address_of_current_frame; + if (find_running_frame( + pid, runtime_start_address, &local_debug_offsets, + &address_of_current_frame) + ) { + goto result_err; + } + + uintptr_t address_of_code_object; + while ((void*)address_of_current_frame != NULL) { + int res = parse_async_frame_object( + pid, + calls, + &local_debug_offsets, + address_of_current_frame, + &address_of_current_frame, + &address_of_code_object + ); + + if (res < 0) { + goto result_err; + } + + if (address_of_code_object == address_of_running_task_code_obj) { + break; } } + PyObject *tn = parse_task_name( + pid, &local_debug_offsets, &local_async_debug, running_task_addr); + if (tn == NULL) { + goto result_err; + } + if (PyList_Append(result, tn)) { + Py_DECREF(tn); + goto result_err; + } + Py_DECREF(tn); + + PyObject* awaited_by = PyList_New(0); + if (awaited_by == NULL) { + goto result_err; + } + if (PyList_Append(result, awaited_by)) { + Py_DECREF(awaited_by); + goto result_err; + } + Py_DECREF(awaited_by); + + if (parse_task_awaited_by( + pid, &local_debug_offsets, &local_async_debug, + running_task_addr, awaited_by) + ) { + goto result_err; + } + return result; + +result_err: + Py_DECREF(result); + return NULL; } + static PyMethodDef methods[] = { - {"get_stack_trace", get_stack_trace, METH_VARARGS, "Get the Python stack from a given PID"}, - {NULL, NULL, 0, NULL}, + {"get_stack_trace", get_stack_trace, METH_VARARGS, + "Get the Python stack from a given PID"}, + {"get_async_stack_trace", get_async_stack_trace, METH_VARARGS, + "Get the asyncio stack from a given PID"}, + {NULL, NULL, 0, NULL}, }; static struct PyModuleDef module = { - .m_base = PyModuleDef_HEAD_INIT, - .m_name = "_testexternalinspection", - .m_size = -1, - .m_methods = methods, + .m_base = PyModuleDef_HEAD_INIT, + .m_name = "_testexternalinspection", + .m_size = -1, + .m_methods = methods, }; PyMODINIT_FUNC @@ -636,7 +1650,8 @@ PyInit__testexternalinspection(void) #ifdef Py_GIL_DISABLED 
PyUnstable_Module_SetGIL(mod, Py_MOD_GIL_NOT_USED); #endif - int rc = PyModule_AddIntConstant(mod, "PROCESS_VM_READV_SUPPORTED", HAVE_PROCESS_VM_READV); + int rc = PyModule_AddIntConstant( + mod, "PROCESS_VM_READV_SUPPORTED", HAVE_PROCESS_VM_READV); if (rc < 0) { Py_DECREF(mod); return NULL; diff --git a/Modules/clinic/_asynciomodule.c.h b/Modules/clinic/_asynciomodule.c.h index 794585572b13b9..c6b7e39788be71 100644 --- a/Modules/clinic/_asynciomodule.c.h +++ b/Modules/clinic/_asynciomodule.c.h @@ -9,6 +9,31 @@ preserve #include "pycore_critical_section.h"// Py_BEGIN_CRITICAL_SECTION() #include "pycore_modsupport.h" // _PyArg_UnpackKeywords() +#if !defined(_asyncio_Future__asyncio_awaited_by_DOCSTR) +# define _asyncio_Future__asyncio_awaited_by_DOCSTR NULL +#endif +#if defined(_ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF) +# undef _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF +# define _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {"_asyncio_awaited_by", (getter)_asyncio_Future__asyncio_awaited_by_get, (setter)_asyncio_Future__asyncio_awaited_by_set, _asyncio_Future__asyncio_awaited_by_DOCSTR}, +#else +# define _ASYNCIO_FUTURE__ASYNCIO_AWAITED_BY_GETSETDEF {"_asyncio_awaited_by", (getter)_asyncio_Future__asyncio_awaited_by_get, NULL, _asyncio_Future__asyncio_awaited_by_DOCSTR}, +#endif + +static PyObject * +_asyncio_Future__asyncio_awaited_by_get_impl(FutureObj *self); + +static PyObject * +_asyncio_Future__asyncio_awaited_by_get(PyObject *self, void *Py_UNUSED(context)) +{ + PyObject *return_value = NULL; + + Py_BEGIN_CRITICAL_SECTION(self); + return_value = _asyncio_Future__asyncio_awaited_by_get_impl((FutureObj *)self); + Py_END_CRITICAL_SECTION(); + + return return_value; +} + PyDoc_STRVAR(_asyncio_Future___init____doc__, "Future(*, loop=None)\n" "--\n" @@ -2088,4 +2113,65 @@ _asyncio_all_tasks(PyObject *module, PyObject *const *args, Py_ssize_t nargs, Py exit: return return_value; } -/*[clinic end generated code: output=ec2fa1d60b094978 input=a9049054013a1b77]*/ + +PyDoc_STRVAR(_asyncio_future_add_to_awaited_by__doc__, +"future_add_to_awaited_by($module, fut, waiter, /)\n" +"--\n" +"\n" +"Record that `fut` is awaited on by `waiter`."); + +#define _ASYNCIO_FUTURE_ADD_TO_AWAITED_BY_METHODDEF \ + {"future_add_to_awaited_by", _PyCFunction_CAST(_asyncio_future_add_to_awaited_by), METH_FASTCALL, _asyncio_future_add_to_awaited_by__doc__}, + +static PyObject * +_asyncio_future_add_to_awaited_by_impl(PyObject *module, PyObject *fut, + PyObject *waiter); + +static PyObject * +_asyncio_future_add_to_awaited_by(PyObject *module, PyObject *const *args, Py_ssize_t nargs) +{ + PyObject *return_value = NULL; + PyObject *fut; + PyObject *waiter; + + if (!_PyArg_CheckPositional("future_add_to_awaited_by", nargs, 2, 2)) { + goto exit; + } + fut = args[0]; + waiter = args[1]; + return_value = _asyncio_future_add_to_awaited_by_impl(module, fut, waiter); + +exit: + return return_value; +} + +PyDoc_STRVAR(_asyncio_future_discard_from_awaited_by__doc__, +"future_discard_from_awaited_by($module, fut, waiter, /)\n" +"--\n" +"\n"); + +#define _ASYNCIO_FUTURE_DISCARD_FROM_AWAITED_BY_METHODDEF \ + {"future_discard_from_awaited_by", _PyCFunction_CAST(_asyncio_future_discard_from_awaited_by), METH_FASTCALL, _asyncio_future_discard_from_awaited_by__doc__}, + +static PyObject * +_asyncio_future_discard_from_awaited_by_impl(PyObject *module, PyObject *fut, + PyObject *waiter); + +static PyObject * +_asyncio_future_discard_from_awaited_by(PyObject *module, PyObject *const *args, Py_ssize_t nargs) +{ + PyObject 
*return_value = NULL; + PyObject *fut; + PyObject *waiter; + + if (!_PyArg_CheckPositional("future_discard_from_awaited_by", nargs, 2, 2)) { + goto exit; + } + fut = args[0]; + waiter = args[1]; + return_value = _asyncio_future_discard_from_awaited_by_impl(module, fut, waiter); + +exit: + return return_value; +} +/*[clinic end generated code: output=fe4ffe08404ad566 input=a9049054013a1b77]*/ diff --git a/Objects/frameobject.c b/Objects/frameobject.c index d6b4065e1302bc..44b3a2a75626c9 100644 --- a/Objects/frameobject.c +++ b/Objects/frameobject.c @@ -1672,6 +1672,15 @@ frame_settrace(PyFrameObject *f, PyObject* v, void *closure) return 0; } +static PyObject * +frame_getgenerator(PyFrameObject *f, void *arg) { + if (f->f_frame->owner == FRAME_OWNED_BY_GENERATOR) { + PyObject *gen = (PyObject *)_PyGen_GetGeneratorFromFrame(f->f_frame); + return Py_NewRef(gen); + } + Py_RETURN_NONE; +} + static PyGetSetDef frame_getsetlist[] = { {"f_back", (getter)frame_getback, NULL, NULL}, @@ -1684,6 +1693,7 @@ static PyGetSetDef frame_getsetlist[] = { {"f_builtins", (getter)frame_getbuiltins, NULL, NULL}, {"f_code", (getter)frame_getcode, NULL, NULL}, {"f_trace_opcodes", (getter)frame_gettrace_opcodes, (setter)frame_settrace_opcodes, NULL}, + {"f_generator", (getter)frame_getgenerator, NULL, NULL}, {0} }; diff --git a/Objects/genobject.c b/Objects/genobject.c index b32140766c4a38..df66881cf1e37d 100644 --- a/Objects/genobject.c +++ b/Objects/genobject.c @@ -1146,7 +1146,6 @@ cr_getcode(PyObject *coro, void *Py_UNUSED(ignored)) return _gen_getcode(_PyGen_CAST(coro), "cr_code"); } - static PyGetSetDef coro_getsetlist[] = { {"__name__", gen_get_name, gen_set_name, PyDoc_STR("name of the coroutine")}, diff --git a/Python/pylifecycle.c b/Python/pylifecycle.c index ea8a291a8e5eb4..f6526725d5dccc 100644 --- a/Python/pylifecycle.c +++ b/Python/pylifecycle.c @@ -111,23 +111,7 @@ static void call_ll_exitfuncs(_PyRuntimeState *runtime); _Py_COMP_DIAG_PUSH _Py_COMP_DIAG_IGNORE_DEPR_DECLS -#if defined(MS_WINDOWS) - -#pragma section("PyRuntime", read, write) -__declspec(allocate("PyRuntime")) - -#elif defined(__APPLE__) - -__attribute__(( - section(SEG_DATA ",PyRuntime") -)) - -#endif - -_PyRuntimeState _PyRuntime -#if defined(__linux__) && (defined(__GNUC__) || defined(__clang__)) -__attribute__ ((section (".PyRuntime"))) -#endif +GENERATE_DEBUG_SECTION(PyRuntime, _PyRuntimeState _PyRuntime) = _PyRuntimeState_INIT(_PyRuntime, _Py_Debug_Cookie); _Py_COMP_DIAG_POP diff --git a/Python/pystate.c b/Python/pystate.c index e5003021b83f00..26047edb459480 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1515,6 +1515,7 @@ init_threadstate(_PyThreadStateImpl *_tstate, tstate->dict_global_version = 0; _tstate->asyncio_running_loop = NULL; + _tstate->asyncio_running_task = NULL; tstate->delete_later = NULL; @@ -1697,6 +1698,7 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(tstate->threading_local_sentinel); Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop); + Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task); Py_CLEAR(tstate->dict); Py_CLEAR(tstate->async_exc); diff --git a/Tools/c-analyzer/cpython/ignored.tsv b/Tools/c-analyzer/cpython/ignored.tsv index da2cfedfd802c8..1aabe262eac480 100644 --- a/Tools/c-analyzer/cpython/ignored.tsv +++ b/Tools/c-analyzer/cpython/ignored.tsv @@ -53,6 +53,9 @@ Python/pyhash.c - _Py_HashSecret - ## thread-safe hashtable (internal locks) Python/parking_lot.c - buckets - +## data needed for introspecting asyncio state from debuggers and profilers 
+Modules/_asynciomodule.c - AsyncioDebug - + ################################## ## state tied to Py_Main()
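
On the in-process side, the patch also adds a small but useful hook: ``frame_getgenerator()`` in ``Objects/frameobject.c`` exposes the generator or coroutine that owns a frame as ``frame.f_generator``, which lets introspection code hop from a frame back to the coroutine that owns it (and from there to the owning task). A quick way to observe the new attribute, assuming it behaves as implemented above:

.. code-block:: python

    import asyncio

    async def leaf():
        await asyncio.sleep(3600)

    async def main():
        task = asyncio.create_task(leaf())
        await asyncio.sleep(0)            # let leaf() start and suspend
        frame = task.get_stack()[0]       # the suspended coroutine's frame
        print(frame.f_generator is task.get_coro())   # expected: True
        task.cancel()

    asyncio.run(main())
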