Skip to content

Commit 0169a30

Browse files
committed
Queue decrefs
1 parent ad58b75 commit 0169a30

File tree

3 files changed

+67
-15
lines changed

3 files changed

+67
-15
lines changed

Include/internal/pycore_pymem.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,13 @@ static inline void _PyObject_XDecRefDelayed(PyObject *obj)
132132
// Periodically process delayed free requests.
133133
extern void _PyMem_ProcessDelayed(PyThreadState *tstate);
134134

135+
136+
// Periodically process delayed free requests when the world is stopped.
137+
// Notify of any objects which should be freed.
138+
typedef void (*delayed_dealloc_cb)(PyObject *, void *);
139+
extern void _PyMem_ProcessDelayedNoDealloc(PyThreadState *tstate,
140+
delayed_dealloc_cb cb, void *state);
141+
135142
// Abandon all thread-local delayed free requests and push them to the
136143
// interpreter's queue.
137144
extern void _PyMem_AbandonDelayed(PyThreadState *tstate);

Objects/obmalloc.c

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,10 +1093,20 @@ struct _mem_work_chunk {
10931093
};
10941094

10951095
static void
1096-
free_work_item(uintptr_t ptr)
1096+
free_work_item(uintptr_t ptr, delayed_dealloc_cb cb, void *state)
10971097
{
10981098
if (ptr & 0x01) {
1099-
Py_DECREF((PyObject*)(char *)(ptr - 1));
1099+
PyObject *obj = (PyObject*)(char *)(ptr - 1);
1100+
if (cb == NULL) {
1101+
assert(!_PyInterpreterState_GET()->stoptheworld.world_stopped);
1102+
Py_DECREF(obj);
1103+
return;
1104+
}
1105+
1106+
Py_ssize_t refcount = _Py_ExplicitMergeRefcount(obj, -1);
1107+
if (refcount == 0) {
1108+
cb(obj, state);
1109+
}
11001110
}
11011111
else {
11021112
PyMem_Free((void *)ptr);
@@ -1107,15 +1117,16 @@ static void
11071117
free_delayed(uintptr_t ptr)
11081118
{
11091119
#ifndef Py_GIL_DISABLED
1110-
free_work_item(ptr);
1120+
free_work_item(ptr, NULL, NULL);
11111121
#else
11121122
PyInterpreterState *interp = _PyInterpreterState_GET();
11131123
if (_PyInterpreterState_GetFinalizing(interp) != NULL ||
11141124
interp->stoptheworld.world_stopped)
11151125
{
11161126
// Free immediately during interpreter shutdown or if the world is
11171127
// stopped.
1118-
free_work_item(ptr);
1128+
assert(!interp->stoptheworld.world_stopped || !(ptr & 0x01));
1129+
free_work_item(ptr, NULL, NULL);
11191130
return;
11201131
}
11211132

@@ -1142,7 +1153,8 @@ free_delayed(uintptr_t ptr)
11421153
if (buf == NULL) {
11431154
// failed to allocate a buffer, free immediately
11441155
_PyEval_StopTheWorld(tstate->base.interp);
1145-
free_work_item(ptr);
1156+
// TODO: Fix me
1157+
free_work_item(ptr, NULL, NULL);
11461158
_PyEval_StartTheWorld(tstate->base.interp);
11471159
return;
11481160
}
@@ -1185,7 +1197,7 @@ work_queue_first(struct llist_node *head)
11851197

11861198
static void
11871199
process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
1188-
bool keep_empty)
1200+
bool keep_empty, delayed_dealloc_cb cb, void *state)
11891201
{
11901202
while (!llist_empty(head)) {
11911203
struct _mem_work_chunk *buf = work_queue_first(head);
@@ -1196,7 +1208,7 @@ process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
11961208
return;
11971209
}
11981210

1199-
free_work_item(item->ptr);
1211+
free_work_item(item->ptr, cb, state);
12001212
buf->rd_idx++;
12011213
}
12021214

@@ -1214,15 +1226,16 @@ process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
12141226

12151227
static void
12161228
process_interp_queue(struct _Py_mem_interp_free_queue *queue,
1217-
struct _qsbr_thread_state *qsbr)
1229+
struct _qsbr_thread_state *qsbr, delayed_dealloc_cb cb,
1230+
void *state)
12181231
{
12191232
if (!_Py_atomic_load_int_relaxed(&queue->has_work)) {
12201233
return;
12211234
}
12221235

12231236
// Try to acquire the lock, but don't block if it's already held.
12241237
if (_PyMutex_LockTimed(&queue->mutex, 0, 0) == PY_LOCK_ACQUIRED) {
1225-
process_queue(&queue->head, qsbr, false);
1238+
process_queue(&queue->head, qsbr, false, cb, state);
12261239

12271240
int more_work = !llist_empty(&queue->head);
12281241
_Py_atomic_store_int_relaxed(&queue->has_work, more_work);
@@ -1238,10 +1251,23 @@ _PyMem_ProcessDelayed(PyThreadState *tstate)
12381251
_PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate;
12391252

12401253
// Process thread-local work
1241-
process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true);
1254+
process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true, NULL, NULL);
1255+
1256+
// Process shared interpreter work
1257+
process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, NULL, NULL);
1258+
}
1259+
1260+
void
1261+
_PyMem_ProcessDelayedNoDealloc(PyThreadState *tstate, delayed_dealloc_cb cb, void *state)
1262+
{
1263+
PyInterpreterState *interp = tstate->interp;
1264+
_PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate;
1265+
1266+
// Process thread-local work
1267+
process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true, cb, state);
12421268

12431269
// Process shared interpreter work
1244-
process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr);
1270+
process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr, cb, state);
12451271
}
12461272

12471273
void
@@ -1283,7 +1309,7 @@ _PyMem_FiniDelayed(PyInterpreterState *interp)
12831309
// Free the remaining items immediately. There should be no other
12841310
// threads accessing the memory at this point during shutdown.
12851311
struct _mem_work_item *item = &buf->array[buf->rd_idx];
1286-
free_work_item(item->ptr);
1312+
free_work_item(item->ptr, NULL, NULL);
12871313
buf->rd_idx++;
12881314
}
12891315

Python/gc_free_threading.c

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,25 @@ merge_queued_objects(_PyThreadStateImpl *tstate, struct collection_state *state)
417417
}
418418

419419
static void
420-
process_delayed_frees(PyInterpreterState *interp)
420+
queue_freed_object(PyObject *obj, void *arg)
421+
{
422+
struct collection_state *state = (struct collection_state *)arg;
423+
424+
// GC objects with zero refcount are handled subsequently by the
425+
// GC as if they were cyclic trash, but we have to handle dead
426+
// non-GC objects here. Add one to the refcount so that we can
427+
// decref and deallocate the object once we start the world again.
428+
if (!_PyObject_GC_IS_TRACKED(obj)) {
429+
obj->ob_ref_shared += (1 << _Py_REF_SHARED_SHIFT);
430+
#ifdef Py_REF_DEBUG
431+
_Py_IncRefTotal(_PyThreadState_GET());
432+
#endif
433+
worklist_push(&state->objs_to_decref, obj);
434+
}
435+
}
436+
437+
static void
438+
process_delayed_frees(PyInterpreterState *interp, struct collection_state *state)
421439
{
422440
// In STW status, we can observe the latest write sequence by
423441
// advancing the write sequence immediately.
@@ -426,8 +444,9 @@ process_delayed_frees(PyInterpreterState *interp)
426444
_Py_qsbr_quiescent_state(current_tstate->qsbr);
427445
HEAD_LOCK(&_PyRuntime);
428446
PyThreadState *tstate = interp->threads.head;
447+
429448
while (tstate != NULL) {
430-
_PyMem_ProcessDelayed(tstate);
449+
_PyMem_ProcessDelayedNoDealloc(tstate, queue_freed_object, state);
431450
tstate = (PyThreadState *)tstate->next;
432451
}
433452
HEAD_UNLOCK(&_PyRuntime);
@@ -1233,7 +1252,7 @@ gc_collect_internal(PyInterpreterState *interp, struct collection_state *state,
12331252
}
12341253
HEAD_UNLOCK(&_PyRuntime);
12351254

1236-
process_delayed_frees(interp);
1255+
process_delayed_frees(interp, state);
12371256

12381257
// Find unreachable objects
12391258
int err = deduce_unreachable_heap(interp, state);

0 commit comments

Comments
 (0)