Skip to content

gh-114940: Add _Py_FOR_EACH_TSTATE_UNLOCKED(), and Friends #127077

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Include/internal/pycore_pystate.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@ extern int _PyOS_InterruptOccurred(PyThreadState *tstate);
#define HEAD_UNLOCK(runtime) \
PyMutex_Unlock(&(runtime)->interpreters.mutex)

// Iterate over the thread states linked from (interp)->threads.head, binding
// each one to a new loop-scoped variable "PyThreadState *t".
// This macro takes no lock itself.  The step expression reads t->next after
// the body has run, so the body must not free or recycle "t".
#define _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) \
    for (PyThreadState *t = (interp)->threads.head; t; t = t->next)
// Same iteration, preceded by HEAD_LOCK() on the interpreter's runtime.
// Must be paired with _Py_FOR_EACH_TSTATE_END(), which releases the lock:
// "break" is fine, but leaving the body with "return" or "goto" skips the
// unlock.  The expansion is two statements (lock, then loop), so it cannot
// be used as the unbraced body of an if/else/loop.
#define _Py_FOR_EACH_TSTATE_BEGIN(interp, t) \
    HEAD_LOCK((interp)->runtime); \
    _Py_FOR_EACH_TSTATE_UNLOCKED(interp, t)
// Release the lock taken by _Py_FOR_EACH_TSTATE_BEGIN().
#define _Py_FOR_EACH_TSTATE_END(interp) \
    HEAD_UNLOCK((interp)->runtime)


// Get the configuration of the current interpreter.
// The caller must hold the GIL.
// Export for test_peg_generator.
Expand Down
6 changes: 4 additions & 2 deletions Objects/codeobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -2871,20 +2871,22 @@ get_indices_in_use(PyInterpreterState *interp, struct flag_set *in_use)
assert(interp->stoptheworld.world_stopped);
assert(in_use->flags == NULL);
int32_t max_index = 0;
for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
int32_t idx = ((_PyThreadStateImpl *) p)->tlbc_index;
if (idx > max_index) {
max_index = idx;
}
}
_Py_FOR_EACH_TSTATE_END(interp);
in_use->size = (size_t) max_index + 1;
in_use->flags = PyMem_Calloc(in_use->size, sizeof(*in_use->flags));
if (in_use->flags == NULL) {
return -1;
}
for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
in_use->flags[((_PyThreadStateImpl *) p)->tlbc_index] = 1;
}
_Py_FOR_EACH_TSTATE_END(interp);
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion Objects/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ get_reftotal(PyInterpreterState *interp)
since we can't determine which interpreter updated it. */
Py_ssize_t total = REFTOTAL(interp);
#ifdef Py_GIL_DISABLED
for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
_Py_FOR_EACH_TSTATE_UNLOCKED(interp, p) {
/* This may race with other threads modifications to their reftotal */
_PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)p;
total += _Py_atomic_load_ssize_relaxed(&tstate_impl->reftotal);
Expand Down
2 changes: 1 addition & 1 deletion Objects/obmalloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1405,7 +1405,7 @@ get_mimalloc_allocated_blocks(PyInterpreterState *interp)
{
size_t allocated_blocks = 0;
#ifdef Py_GIL_DISABLED
for (PyThreadState *t = interp->threads.head; t != NULL; t = t->next) {
_Py_FOR_EACH_TSTATE_UNLOCKED(interp, t) {
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *)t;
for (int i = 0; i < _Py_MIMALLOC_HEAP_COUNT; i++) {
mi_heap_t *heap = &tstate->mimalloc.heaps[i];
Expand Down
3 changes: 2 additions & 1 deletion Python/ceval.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,12 @@ Py_SetRecursionLimit(int new_limit)
{
PyInterpreterState *interp = _PyInterpreterState_GET();
interp->ceval.recursion_limit = new_limit;
for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
int depth = p->py_recursion_limit - p->py_recursion_remaining;
p->py_recursion_limit = new_limit;
p->py_recursion_remaining = new_limit - depth;
}
_Py_FOR_EACH_TSTATE_END(interp);
}

/* The function _Py_EnterRecursiveCallTstate() only calls _Py_CheckRecursiveCall()
Expand Down
14 changes: 4 additions & 10 deletions Python/ceval_gil.c
Original file line number Diff line number Diff line change
Expand Up @@ -977,25 +977,19 @@ make_pending_calls(PyThreadState *tstate)
// Apply _Py_set_eval_breaker_bit(tstate, bit) to every thread state on
// interp's thread list.  The whole walk runs with the runtime head lock
// held: it is taken before the loop and released right after it.
void
_Py_set_eval_breaker_bit_all(PyInterpreterState *interp, uintptr_t bit)
{
_PyRuntimeState *runtime = &_PyRuntime;

HEAD_LOCK(runtime);
for (PyThreadState *tstate = interp->threads.head; tstate != NULL; tstate = tstate->next) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, tstate) {
_Py_set_eval_breaker_bit(tstate, bit);
}
HEAD_UNLOCK(runtime);
_Py_FOR_EACH_TSTATE_END(interp);
}

// Apply _Py_unset_eval_breaker_bit(tstate, bit) to every thread state on
// interp's thread list.  The whole walk runs with the runtime head lock
// held: it is taken before the loop and released right after it.
void
_Py_unset_eval_breaker_bit_all(PyInterpreterState *interp, uintptr_t bit)
{
_PyRuntimeState *runtime = &_PyRuntime;

HEAD_LOCK(runtime);
for (PyThreadState *tstate = interp->threads.head; tstate != NULL; tstate = tstate->next) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, tstate) {
_Py_unset_eval_breaker_bit(tstate, bit);
}
HEAD_UNLOCK(runtime);
_Py_FOR_EACH_TSTATE_END(interp);
}

void
Expand Down
25 changes: 10 additions & 15 deletions Python/gc_free_threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ gc_visit_heaps_lock_held(PyInterpreterState *interp, mi_block_visit_fun *visitor
Py_ssize_t offset_pre = offset_base + 2 * sizeof(PyObject*);

// visit each thread's heaps for GC objects
for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
_Py_FOR_EACH_TSTATE_UNLOCKED(interp, p) {
struct _mimalloc_thread_state *m = &((_PyThreadStateImpl *)p)->mimalloc;
if (!_Py_atomic_load_int(&m->initialized)) {
// The thread may not have called tstate_mimalloc_bind() yet.
Expand Down Expand Up @@ -374,8 +374,7 @@ gc_visit_stackref(_PyStackRef stackref)
static void
gc_visit_thread_stacks(PyInterpreterState *interp)
{
HEAD_LOCK(&_PyRuntime);
for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
for (_PyInterpreterFrame *f = p->current_frame; f != NULL; f = f->previous) {
PyObject *executable = PyStackRef_AsPyObjectBorrow(f->f_executable);
if (executable == NULL || !PyCode_Check(executable)) {
Expand All @@ -390,7 +389,7 @@ gc_visit_thread_stacks(PyInterpreterState *interp)
}
}
}
HEAD_UNLOCK(&_PyRuntime);
_Py_FOR_EACH_TSTATE_END(interp);
}

static void
Expand Down Expand Up @@ -429,14 +428,13 @@ process_delayed_frees(PyInterpreterState *interp)

// Merge the queues from other threads into our own queue so that we can
// process all of the pending delayed free requests at once.
HEAD_LOCK(&_PyRuntime);
for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
_PyThreadStateImpl *other = (_PyThreadStateImpl *)p;
if (other != current_tstate) {
llist_concat(&current_tstate->mem_free_queue, &other->mem_free_queue);
}
}
HEAD_UNLOCK(&_PyRuntime);
_Py_FOR_EACH_TSTATE_END(interp);

_PyMem_ProcessDelayed((PyThreadState *)current_tstate);
}
Expand Down Expand Up @@ -1226,8 +1224,7 @@ gc_collect_internal(PyInterpreterState *interp, struct collection_state *state,
state->gcstate->old[i-1].count = 0;
}

HEAD_LOCK(&_PyRuntime);
for (PyThreadState *p = interp->threads.head; p != NULL; p = p->next) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p;

// merge per-thread refcount for types into the type's actual refcount
Expand All @@ -1236,7 +1233,7 @@ gc_collect_internal(PyInterpreterState *interp, struct collection_state *state,
// merge refcounts for all queued objects
merge_queued_objects(tstate, state);
}
HEAD_UNLOCK(&_PyRuntime);
_Py_FOR_EACH_TSTATE_END(interp);

process_delayed_frees(interp);

Expand Down Expand Up @@ -1991,13 +1988,11 @@ PyUnstable_GC_VisitObjects(gcvisitobjects_t callback, void *arg)
// Call _PyObject_ClearFreeLists(&tstate->freelists, 0) for every thread
// state on interp's thread list.  The walk runs with the runtime head lock
// held: it is taken before the loop and released right after it.
void
_PyGC_ClearAllFreeLists(PyInterpreterState *interp)
{
HEAD_LOCK(&_PyRuntime);
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *)interp->threads.head;
while (tstate != NULL) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
// Each list entry is viewed through its _PyThreadStateImpl wrapper to
// reach the per-thread freelists.
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p;
_PyObject_ClearFreeLists(&tstate->freelists, 0);
tstate = (_PyThreadStateImpl *)tstate->base.next;
}
HEAD_UNLOCK(&_PyRuntime);
_Py_FOR_EACH_TSTATE_END(interp);
}

#endif // Py_GIL_DISABLED
7 changes: 2 additions & 5 deletions Python/instrumentation.c
Original file line number Diff line number Diff line change
Expand Up @@ -1006,13 +1006,10 @@ set_global_version(PyThreadState *tstate, uint32_t version)

#ifdef Py_GIL_DISABLED
// Set the version on all threads in free-threaded builds.
_PyRuntimeState *runtime = &_PyRuntime;
HEAD_LOCK(runtime);
for (tstate = interp->threads.head; tstate;
tstate = PyThreadState_Next(tstate)) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, tstate) {
set_version_raw(&tstate->eval_breaker, version);
};
HEAD_UNLOCK(runtime);
_Py_FOR_EACH_TSTATE_END(interp);
#else
// Normal builds take the current version from instrumentation_version when
// attaching a thread, so we only have to set the current thread's version.
Expand Down
98 changes: 46 additions & 52 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -790,18 +790,15 @@ interpreter_clear(PyInterpreterState *interp, PyThreadState *tstate)
}

// Clear the current/main thread state last.
HEAD_LOCK(runtime);
PyThreadState *p = interp->threads.head;
HEAD_UNLOCK(runtime);
while (p != NULL) {
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
// See https://github.com/python/cpython/issues/102126
// Must be called without HEAD_LOCK held as it can deadlock
// if any finalizer tries to acquire that lock.
HEAD_UNLOCK(runtime);
PyThreadState_Clear(p);
HEAD_LOCK(runtime);
p = p->next;
HEAD_UNLOCK(runtime);
}
_Py_FOR_EACH_TSTATE_END(interp);
if (tstate->interp == interp) {
/* We fix tstate->_status below when we for sure aren't using it
(e.g. no longer need the GIL). */
Expand Down Expand Up @@ -1796,10 +1793,9 @@ tstate_delete_common(PyThreadState *tstate, int release_gil)
static void
zapthreads(PyInterpreterState *interp)
{
    PyThreadState *tstate;
    /* No need to lock the mutex here because this should only happen
       when the threads are all really dead (XXX famous last words).

       Do not use _Py_FOR_EACH_TSTATE_UNLOCKED() here: every iteration
       deletes and frees the thread state it just visited, and that macro's
       step expression (t = t->next) would then read from the released
       thread state.  Instead, re-read the list head on each pass; the loop
       ends once the list is empty (this relies on tstate_delete_common()
       removing tstate from interp->threads). */
    while ((tstate = interp->threads.head) != NULL) {
        tstate_verify_not_active(tstate);
        tstate_delete_common(tstate, 0);
        free_threadstate((_PyThreadStateImpl *)tstate);
Expand Down Expand Up @@ -2156,7 +2152,7 @@ decrement_stoptheworld_countdown(struct _stoptheworld_state *stw)
}

#ifdef Py_GIL_DISABLED
// Interpreter for _Py_FOR_EACH_THREAD(). For global stop-the-world events,
// Interpreter for _Py_FOR_EACH_STW_INTERP(). For global stop-the-world events,
// we start with the first interpreter and then iterate over all interpreters.
// For per-interpreter stop-the-world events, we only operate on the one
// interpreter.
Expand All @@ -2171,10 +2167,9 @@ interp_for_stop_the_world(struct _stoptheworld_state *stw)
// Loops over the interpreters involved in a stop-the-world event; nest
// _Py_FOR_EACH_TSTATE_UNLOCKED() inside it to visit their threads.
// For global: all interpreters
// For per-interpreter: only that interpreter
#define _Py_FOR_EACH_THREAD(stw, i, t) \
for (i = interp_for_stop_the_world((stw)); \
i != NULL; i = ((stw->is_global) ? i->next : NULL)) \
for (t = i->threads.head; t; t = t->next)
// Binds each visited interpreter to a new loop-scoped variable
// "PyInterpreterState *i".  Starts at interp_for_stop_the_world(stw) and
// follows i->next only while (stw)->is_global is set; otherwise the body
// runs for at most that one interpreter.
#define _Py_FOR_EACH_STW_INTERP(stw, i) \
    for (PyInterpreterState *i = interp_for_stop_the_world((stw)); \
         i != NULL; i = (((stw)->is_global) ? i->next : NULL))


// Try to transition threads atomically from the "detached" state to the
Expand All @@ -2183,19 +2178,19 @@ static bool
park_detached_threads(struct _stoptheworld_state *stw)
{
int num_parked = 0;
PyInterpreterState *i;
PyThreadState *t;
_Py_FOR_EACH_THREAD(stw, i, t) {
int state = _Py_atomic_load_int_relaxed(&t->state);
if (state == _Py_THREAD_DETACHED) {
// Atomically transition to "suspended" if in "detached" state.
if (_Py_atomic_compare_exchange_int(&t->state,
&state, _Py_THREAD_SUSPENDED)) {
num_parked++;
_Py_FOR_EACH_STW_INTERP(stw, i) {
_Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
int state = _Py_atomic_load_int_relaxed(&t->state);
if (state == _Py_THREAD_DETACHED) {
// Atomically transition to "suspended" if in "detached" state.
if (_Py_atomic_compare_exchange_int(
&t->state, &state, _Py_THREAD_SUSPENDED)) {
num_parked++;
}
}
else if (state == _Py_THREAD_ATTACHED && t != stw->requester) {
_Py_set_eval_breaker_bit(t, _PY_EVAL_PLEASE_STOP_BIT);
}
}
else if (state == _Py_THREAD_ATTACHED && t != stw->requester) {
_Py_set_eval_breaker_bit(t, _PY_EVAL_PLEASE_STOP_BIT);
}
}
stw->thread_countdown -= num_parked;
Expand All @@ -2222,12 +2217,12 @@ stop_the_world(struct _stoptheworld_state *stw)
stw->stop_event = (PyEvent){0}; // zero-initialize (unset)
stw->requester = _PyThreadState_GET(); // may be NULL

PyInterpreterState *i;
PyThreadState *t;
_Py_FOR_EACH_THREAD(stw, i, t) {
if (t != stw->requester) {
// Count all the other threads (we don't wait on ourself).
stw->thread_countdown++;
_Py_FOR_EACH_STW_INTERP(stw, i) {
_Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
if (t != stw->requester) {
// Count all the other threads (we don't wait on ourself).
stw->thread_countdown++;
}
}
}

Expand Down Expand Up @@ -2268,14 +2263,14 @@ start_the_world(struct _stoptheworld_state *stw)
stw->requested = 0;
stw->world_stopped = 0;
// Switch threads back to the detached state.
PyInterpreterState *i;
PyThreadState *t;
_Py_FOR_EACH_THREAD(stw, i, t) {
if (t != stw->requester) {
assert(_Py_atomic_load_int_relaxed(&t->state) ==
_Py_THREAD_SUSPENDED);
_Py_atomic_store_int(&t->state, _Py_THREAD_DETACHED);
_PyParkingLot_UnparkAll(&t->state);
_Py_FOR_EACH_STW_INTERP(stw, i) {
_Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
if (t != stw->requester) {
assert(_Py_atomic_load_int_relaxed(&t->state) ==
_Py_THREAD_SUSPENDED);
_Py_atomic_store_int(&t->state, _Py_THREAD_DETACHED);
_PyParkingLot_UnparkAll(&t->state);
}
}
}
stw->requester = NULL;
Expand Down Expand Up @@ -2339,7 +2334,6 @@ _PyEval_StartTheWorld(PyInterpreterState *interp)
int
PyThreadState_SetAsyncExc(unsigned long id, PyObject *exc)
{
_PyRuntimeState *runtime = &_PyRuntime;
PyInterpreterState *interp = _PyInterpreterState_GET();

/* Although the GIL is held, a few C API functions can be called
Expand All @@ -2348,12 +2342,16 @@ PyThreadState_SetAsyncExc(unsigned long id, PyObject *exc)
* list of thread states we're traversing, so to prevent that we lock
* head_mutex for the duration.
*/
HEAD_LOCK(runtime);
for (PyThreadState *tstate = interp->threads.head; tstate != NULL; tstate = tstate->next) {
if (tstate->thread_id != id) {
continue;
PyThreadState *tstate = NULL;
_Py_FOR_EACH_TSTATE_BEGIN(interp, t) {
if (t->thread_id == id) {
tstate = t;
break;
}
}
_Py_FOR_EACH_TSTATE_END(interp);

if (tstate != NULL) {
/* Tricky: we need to decref the current value
* (if any) in tstate->async_exc, but that can in turn
* allow arbitrary Python code to run, including
Expand All @@ -2363,14 +2361,12 @@ PyThreadState_SetAsyncExc(unsigned long id, PyObject *exc)
*/
Py_XINCREF(exc);
PyObject *old_exc = _Py_atomic_exchange_ptr(&tstate->async_exc, exc);
HEAD_UNLOCK(runtime);

Py_XDECREF(old_exc);
_Py_set_eval_breaker_bit(tstate, _PY_ASYNC_EXCEPTION_BIT);
return 1;
}
HEAD_UNLOCK(runtime);
return 0;

return tstate != NULL;
}

//---------------------------------
Expand Down Expand Up @@ -2510,8 +2506,7 @@ _PyThread_CurrentFrames(void)
HEAD_LOCK(runtime);
PyInterpreterState *i;
for (i = runtime->interpreters.head; i != NULL; i = i->next) {
PyThreadState *t;
for (t = i->threads.head; t != NULL; t = t->next) {
_Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
_PyInterpreterFrame *frame = t->current_frame;
frame = _PyFrame_GetFirstComplete(frame);
if (frame == NULL) {
Expand Down Expand Up @@ -2576,8 +2571,7 @@ _PyThread_CurrentExceptions(void)
HEAD_LOCK(runtime);
PyInterpreterState *i;
for (i = runtime->interpreters.head; i != NULL; i = i->next) {
PyThreadState *t;
for (t = i->threads.head; t != NULL; t = t->next) {
_Py_FOR_EACH_TSTATE_UNLOCKED(i, t) {
_PyErr_StackItem *err_info = _PyErr_GetTopmostException(t);
if (err_info == NULL) {
continue;
Expand Down
Loading