Skip to content

Commit a5900ec

Browse files
authored
bpo-40221: Update multiprocessing to use _at_fork_reinit (GH-19511)
1 parent e560f90 commit a5900ec

File tree

2 files changed

+10
-9
lines changed

2 files changed

+10
-9
lines changed

Lib/multiprocessing/queues.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ def __init__(self, maxsize=0, *, ctx):
4949
self._sem = ctx.BoundedSemaphore(maxsize)
5050
# For use by concurrent.futures
5151
self._ignore_epipe = False
52-
53-
self._after_fork()
52+
self._reset()
5453

5554
if sys.platform != 'win32':
5655
register_after_fork(self, Queue._after_fork)
@@ -63,11 +62,17 @@ def __getstate__(self):
6362
def __setstate__(self, state):
6463
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
6564
self._rlock, self._wlock, self._sem, self._opid) = state
66-
self._after_fork()
65+
self._reset()
6766

6867
def _after_fork(self):
6968
debug('Queue._after_fork()')
70-
self._notempty = threading.Condition(threading.Lock())
69+
self._reset(after_fork=True)
70+
71+
def _reset(self, after_fork=False):
72+
if after_fork:
73+
self._notempty._at_fork_reinit()
74+
else:
75+
self._notempty = threading.Condition(threading.Lock())
7176
self._buffer = collections.deque()
7277
self._thread = None
7378
self._jointhread = None

Lib/multiprocessing/resource_sharer.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ class _ResourceSharer(object):
6363
def __init__(self):
6464
self._key = 0
6565
self._cache = {}
66-
self._old_locks = []
6766
self._lock = threading.Lock()
6867
self._listener = None
6968
self._address = None
@@ -113,10 +112,7 @@ def _afterfork(self):
113112
for key, (send, close) in self._cache.items():
114113
close()
115114
self._cache.clear()
116-
# If self._lock was locked at the time of the fork, it may be broken
117-
# -- see issue 6721. Replace it without letting it be gc'ed.
118-
self._old_locks.append(self._lock)
119-
self._lock = threading.Lock()
115+
self._lock._at_fork_reinit()
120116
if self._listener is not None:
121117
self._listener.close()
122118
self._listener = None

0 commit comments

Comments
 (0)