From 7121b4ab2c372f7d7436fdbd7b72198b97bd308e Mon Sep 17 00:00:00 2001 From: Dong-hee Na Date: Wed, 15 Apr 2020 00:24:20 +0900 Subject: [PATCH 1/2] bpo-40221: Update multiprocessing to use _at_fork_reinit --- Lib/multiprocessing/queues.py | 12 ++++++++---- Lib/multiprocessing/resource_sharer.py | 6 +----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 835070118387ea..9e6f411941ec75 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -49,8 +49,8 @@ def __init__(self, maxsize=0, *, ctx): self._sem = ctx.BoundedSemaphore(maxsize) # For use by concurrent.futures self._ignore_epipe = False - - self._after_fork() + self._notempty = threading.Condition(threading.Lock()) + self._reset() if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) @@ -63,11 +63,15 @@ def __getstate__(self): def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state - self._after_fork() + self._notempty = threading.Condition(threading.Lock()) + self._reset() def _after_fork(self): debug('Queue._after_fork()') - self._notempty = threading.Condition(threading.Lock()) + self._notempty._at_fork_reinit() + self._reset() + + def _reset(self): self._buffer = collections.deque() self._thread = None self._jointhread = None diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py index 8d5c9900f69fed..66076509a1202e 100644 --- a/Lib/multiprocessing/resource_sharer.py +++ b/Lib/multiprocessing/resource_sharer.py @@ -63,7 +63,6 @@ class _ResourceSharer(object): def __init__(self): self._key = 0 self._cache = {} - self._old_locks = [] self._lock = threading.Lock() self._listener = None self._address = None @@ -113,10 +112,7 @@ def _afterfork(self): for key, (send, close) in self._cache.items(): close() self._cache.clear() - # If self._lock was locked at the time of the fork, it may be broken - # -- see issue 6721. Replace it without letting it be gc'ed. - self._old_locks.append(self._lock) - self._lock = threading.Lock() + self._lock._at_fork_reinit() if self._listener is not None: self._listener.close() self._listener = None From fe3f6e3e7021701e0349b85aa47b5ba14a74a2d6 Mon Sep 17 00:00:00 2001 From: Dong-hee Na Date: Wed, 15 Apr 2020 01:05:19 +0900 Subject: [PATCH 2/2] bpo-40221: Apply code review --- Lib/multiprocessing/queues.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 9e6f411941ec75..c0a284d10c8070 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -49,7 +49,6 @@ def __init__(self, maxsize=0, *, ctx): self._sem = ctx.BoundedSemaphore(maxsize) # For use by concurrent.futures self._ignore_epipe = False - self._notempty = threading.Condition(threading.Lock()) self._reset() if sys.platform != 'win32': @@ -63,15 +62,17 @@ def __getstate__(self): def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state - self._notempty = threading.Condition(threading.Lock()) self._reset() def _after_fork(self): debug('Queue._after_fork()') - self._notempty._at_fork_reinit() - self._reset() + self._reset(after_fork=True) - def _reset(self): + def _reset(self, after_fork=False): + if after_fork: + self._notempty._at_fork_reinit() + else: + self._notempty = threading.Condition(threading.Lock()) self._buffer = collections.deque() self._thread = None self._jointhread = None