Skip to content

Commit 2317b87

Browse files
encukouhattya
authored andcommitted
pythongh-71936: Fix race condition in multiprocessing.Pool (pythonGH-124973)
* pythongh-71936: Fix race condition in multiprocessing.Pool Proxes of shared objects register a Finalizer in BaseProxy._incref(), and it will call BaseProxy._decref() when it is GCed. This may cause a race condition with Pool(maxtasksperchild=None) on Windows. A connection would be closed and raised TypeError when a GC occurs between _ConnectionBase._check_writable() and _ConnectionBase._send_bytes() in _ConnectionBase.send() in the second or later task, and a new object is allocated that shares the id() of a previously deleted one. Instead of using the id() of the token (or the proxy), use a unique, non-reusable number. (cherry picked from commit ba088c8) Co-authored-by: Petr Viktorin <[email protected]> Co-Authored-By: Akinori Hattori <[email protected]>
1 parent ef0a005 commit 2317b87

File tree

3 files changed

+22
-13
lines changed

3 files changed

+22
-13
lines changed

Lib/multiprocessing/managers.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -755,22 +755,29 @@ class BaseProxy(object):
755755
_address_to_local = {}
756756
_mutex = util.ForkAwareThreadLock()
757757

758+
# Each instance gets a `_serial` number. Unlike `id(...)`, this number
759+
# is never reused.
760+
_next_serial = 1
761+
758762
def __init__(self, token, serializer, manager=None,
759763
authkey=None, exposed=None, incref=True, manager_owned=False):
760764
with BaseProxy._mutex:
761-
tls_idset = BaseProxy._address_to_local.get(token.address, None)
762-
if tls_idset is None:
763-
tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
764-
BaseProxy._address_to_local[token.address] = tls_idset
765+
tls_serials = BaseProxy._address_to_local.get(token.address, None)
766+
if tls_serials is None:
767+
tls_serials = util.ForkAwareLocal(), ProcessLocalSet()
768+
BaseProxy._address_to_local[token.address] = tls_serials
769+
770+
self._serial = BaseProxy._next_serial
771+
BaseProxy._next_serial += 1
765772

766773
# self._tls is used to record the connection used by this
767774
# thread to communicate with the manager at token.address
768-
self._tls = tls_idset[0]
775+
self._tls = tls_serials[0]
769776

770-
# self._idset is used to record the identities of all shared
771-
# objects for which the current process owns references and
777+
# self._all_serials is a set used to record the identities of all
778+
# shared objects for which the current process owns references and
772779
# which are in the manager at token.address
773-
self._idset = tls_idset[1]
780+
self._all_serials = tls_serials[1]
774781

775782
self._token = token
776783
self._id = self._token.id
@@ -850,20 +857,20 @@ def _incref(self):
850857
dispatch(conn, None, 'incref', (self._id,))
851858
util.debug('INCREF %r', self._token.id)
852859

853-
self._idset.add(self._id)
860+
self._all_serials.add(self._serial)
854861

855862
state = self._manager and self._manager._state
856863

857864
self._close = util.Finalize(
858865
self, BaseProxy._decref,
859-
args=(self._token, self._authkey, state,
860-
self._tls, self._idset, self._Client),
866+
args=(self._token, self._serial, self._authkey, state,
867+
self._tls, self._all_serials, self._Client),
861868
exitpriority=10
862869
)
863870

864871
@staticmethod
865-
def _decref(token, authkey, state, tls, idset, _Client):
866-
idset.discard(token.id)
872+
def _decref(token, serial, authkey, state, tls, idset, _Client):
873+
idset.discard(serial)
867874

868875
# check whether manager is still alive
869876
if state is None or state.value == State.STARTED:

Misc/ACKS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,7 @@ Larry Hastings
723723
Tim Hatch
724724
Zac Hatfield-Dodds
725725
Shane Hathaway
726+
Akinori Hattori
726727
Michael Haubenwallner
727728
Janko Hauser
728729
Flavian Hautbois
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a race condition in :class:`multiprocessing.pool.Pool`.

0 commit comments

Comments
 (0)