Skip to content

Commit ba088c8

Browse files
encukouhattya
andauthored
pythongh-71936: Fix race condition in multiprocessing.Pool (pythonGH-124973)
* pythongh-71936: Fix race condition in multiprocessing.Pool Proxes of shared objects register a Finalizer in BaseProxy._incref(), and it will call BaseProxy._decref() when it is GCed. This may cause a race condition with Pool(maxtasksperchild=None) on Windows. A connection would be closed and raised TypeError when a GC occurs between _ConnectionBase._check_writable() and _ConnectionBase._send_bytes() in _ConnectionBase.send() in the second or later task, and a new object is allocated that shares the id() of a previously deleted one. Instead of using the id() of the token (or the proxy), use a unique, non-reusable number. Co-Authored-By: Akinori Hattori <[email protected]>
1 parent 1e40c5b commit ba088c8

File tree

3 files changed

+22
-13
lines changed

3 files changed

+22
-13
lines changed

Lib/multiprocessing/managers.py

+20-13
Original file line numberDiff line numberDiff line change
@@ -759,22 +759,29 @@ class BaseProxy(object):
759759
_address_to_local = {}
760760
_mutex = util.ForkAwareThreadLock()
761761

762+
# Each instance gets a `_serial` number. Unlike `id(...)`, this number
763+
# is never reused.
764+
_next_serial = 1
765+
762766
def __init__(self, token, serializer, manager=None,
763767
authkey=None, exposed=None, incref=True, manager_owned=False):
764768
with BaseProxy._mutex:
765-
tls_idset = BaseProxy._address_to_local.get(token.address, None)
766-
if tls_idset is None:
767-
tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
768-
BaseProxy._address_to_local[token.address] = tls_idset
769+
tls_serials = BaseProxy._address_to_local.get(token.address, None)
770+
if tls_serials is None:
771+
tls_serials = util.ForkAwareLocal(), ProcessLocalSet()
772+
BaseProxy._address_to_local[token.address] = tls_serials
773+
774+
self._serial = BaseProxy._next_serial
775+
BaseProxy._next_serial += 1
769776

770777
# self._tls is used to record the connection used by this
771778
# thread to communicate with the manager at token.address
772-
self._tls = tls_idset[0]
779+
self._tls = tls_serials[0]
773780

774-
# self._idset is used to record the identities of all shared
775-
# objects for which the current process owns references and
781+
# self._all_serials is a set used to record the identities of all
782+
# shared objects for which the current process owns references and
776783
# which are in the manager at token.address
777-
self._idset = tls_idset[1]
784+
self._all_serials = tls_serials[1]
778785

779786
self._token = token
780787
self._id = self._token.id
@@ -857,20 +864,20 @@ def _incref(self):
857864
dispatch(conn, None, 'incref', (self._id,))
858865
util.debug('INCREF %r', self._token.id)
859866

860-
self._idset.add(self._id)
867+
self._all_serials.add(self._serial)
861868

862869
state = self._manager and self._manager._state
863870

864871
self._close = util.Finalize(
865872
self, BaseProxy._decref,
866-
args=(self._token, self._authkey, state,
867-
self._tls, self._idset, self._Client),
873+
args=(self._token, self._serial, self._authkey, state,
874+
self._tls, self._all_serials, self._Client),
868875
exitpriority=10
869876
)
870877

871878
@staticmethod
872-
def _decref(token, authkey, state, tls, idset, _Client):
873-
idset.discard(token.id)
879+
def _decref(token, serial, authkey, state, tls, idset, _Client):
880+
idset.discard(serial)
874881

875882
# check whether manager is still alive
876883
if state is None or state.value == State.STARTED:

Misc/ACKS

+1
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,7 @@ Larry Hastings
733733
Tim Hatch
734734
Zac Hatfield-Dodds
735735
Shane Hathaway
736+
Akinori Hattori
736737
Michael Haubenwallner
737738
Janko Hauser
738739
Flavian Hautbois
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a race condition in :class:`multiprocessing.pool.Pool`.

0 commit comments

Comments
 (0)