Skip to content

Commit 7be8743

Browse files
miss-islingtonencukouhattya
authored
[3.13] pythongh-71936: Fix race condition in multiprocessing.Pool (pythonGH-124973) (pythonGH-126869)
Proxes of shared objects register a Finalizer in BaseProxy._incref(), and it will call BaseProxy._decref() when it is GCed. This may cause a race condition with Pool(maxtasksperchild=None) on Windows. A connection would be closed and raised TypeError when a GC occurs between _ConnectionBase._check_writable() and _ConnectionBase._send_bytes() in _ConnectionBase.send() in the second or later task, and a new object is allocated that shares the id() of a previously deleted one. Instead of using the id() of the token (or the proxy), use a unique, non-reusable number. (cherry picked from commit ba088c8) Co-authored-by: Petr Viktorin <[email protected]> Co-authored-by: Akinori Hattori <[email protected]>
1 parent ecda3ae commit 7be8743

File tree

3 files changed

+22
-13
lines changed

3 files changed

+22
-13
lines changed

Lib/multiprocessing/managers.py

+20-13
Original file line numberDiff line numberDiff line change
@@ -758,22 +758,29 @@ class BaseProxy(object):
758758
_address_to_local = {}
759759
_mutex = util.ForkAwareThreadLock()
760760

761+
# Each instance gets a `_serial` number. Unlike `id(...)`, this number
762+
# is never reused.
763+
_next_serial = 1
764+
761765
def __init__(self, token, serializer, manager=None,
762766
authkey=None, exposed=None, incref=True, manager_owned=False):
763767
with BaseProxy._mutex:
764-
tls_idset = BaseProxy._address_to_local.get(token.address, None)
765-
if tls_idset is None:
766-
tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
767-
BaseProxy._address_to_local[token.address] = tls_idset
768+
tls_serials = BaseProxy._address_to_local.get(token.address, None)
769+
if tls_serials is None:
770+
tls_serials = util.ForkAwareLocal(), ProcessLocalSet()
771+
BaseProxy._address_to_local[token.address] = tls_serials
772+
773+
self._serial = BaseProxy._next_serial
774+
BaseProxy._next_serial += 1
768775

769776
# self._tls is used to record the connection used by this
770777
# thread to communicate with the manager at token.address
771-
self._tls = tls_idset[0]
778+
self._tls = tls_serials[0]
772779

773-
# self._idset is used to record the identities of all shared
774-
# objects for which the current process owns references and
780+
# self._all_serials is a set used to record the identities of all
781+
# shared objects for which the current process owns references and
775782
# which are in the manager at token.address
776-
self._idset = tls_idset[1]
783+
self._all_serials = tls_serials[1]
777784

778785
self._token = token
779786
self._id = self._token.id
@@ -856,20 +863,20 @@ def _incref(self):
856863
dispatch(conn, None, 'incref', (self._id,))
857864
util.debug('INCREF %r', self._token.id)
858865

859-
self._idset.add(self._id)
866+
self._all_serials.add(self._serial)
860867

861868
state = self._manager and self._manager._state
862869

863870
self._close = util.Finalize(
864871
self, BaseProxy._decref,
865-
args=(self._token, self._authkey, state,
866-
self._tls, self._idset, self._Client),
872+
args=(self._token, self._serial, self._authkey, state,
873+
self._tls, self._all_serials, self._Client),
867874
exitpriority=10
868875
)
869876

870877
@staticmethod
871-
def _decref(token, authkey, state, tls, idset, _Client):
872-
idset.discard(token.id)
878+
def _decref(token, serial, authkey, state, tls, idset, _Client):
879+
idset.discard(serial)
873880

874881
# check whether manager is still alive
875882
if state is None or state.value == State.STARTED:

Misc/ACKS

+1
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,7 @@ Larry Hastings
730730
Tim Hatch
731731
Zac Hatfield-Dodds
732732
Shane Hathaway
733+
Akinori Hattori
733734
Michael Haubenwallner
734735
Janko Hauser
735736
Flavian Hautbois
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a race condition in :class:`multiprocessing.pool.Pool`.

0 commit comments

Comments
 (0)