@@ -54,48 +54,164 @@ def _new_module(name):
 # A dict mapping module names to weakrefs of _ModuleLock instances
 # Dictionary protected by the global import lock
 _module_locks = {}
-# A dict mapping thread ids to _ModuleLock instances
+
+# A dict mapping thread IDs to lists of _ModuleLock instances. This maps a
+# thread to the module locks it is blocking on acquiring. The values are
+# lists because a single thread could perform a re-entrant import and be "in
+# the process" of blocking on locks for more than one module. A thread can
+# be "in the process" because a thread cannot actually block on acquiring
+# more than one lock but it can have set up bookkeeping that reflects that
+# it intends to block on acquiring more than one lock.
 _blocking_on = {}


+class _BlockingOnManager:
+    """A context manager responsible for updating ``_blocking_on``."""
+    def __init__(self, thread_id, lock):
+        self.thread_id = thread_id
+        self.lock = lock
+
+    def __enter__(self):
+        """Mark the running thread as waiting for self.lock via _blocking_on."""
+        # Interactions with _blocking_on are *not* protected by the global
+        # import lock here because each thread only touches the state that it
+        # owns (state keyed on its thread id). The global import lock is
+        # re-entrant (i.e., a single thread may take it more than once) so it
+        # wouldn't help us be correct in the face of re-entrancy either.
+
+        self.blocked_on = _blocking_on.setdefault(self.thread_id, [])
+        self.blocked_on.append(self.lock)
+
+    def __exit__(self, *args, **kwargs):
+        """Remove self.lock from this thread's _blocking_on list."""
+        self.blocked_on.remove(self.lock)
+
+
 class _DeadlockError(RuntimeError):
     pass


+
+def _has_deadlocked(target_id, *, seen_ids, candidate_ids, blocking_on):
+    """Check if 'target_id' is holding the same lock as another thread(s).
+
+    The search within 'blocking_on' starts with the threads listed in
+    'candidate_ids'. 'seen_ids' contains any threads that are considered
+    already traversed in the search.
+
+    Keyword arguments:
+    target_id -- The thread id to try to reach.
+    seen_ids -- A set of threads that have already been visited.
+    candidate_ids -- The thread ids from which to begin.
+    blocking_on -- A dict representing the thread/blocking-on graph. This may
+                   be the same object as the global '_blocking_on' but it is
+                   a parameter to reduce the impact that global mutable
+                   state has on the result of this function.
+    """
+    if target_id in candidate_ids:
+        # If we have already reached the target_id, we're done - signal that it
+        # is reachable.
+        return True
+
+    # Otherwise, try to reach the target_id from each of the given candidate_ids.
+    for tid in candidate_ids:
+        if not (candidate_blocking_on := blocking_on.get(tid)):
+            # There are no edges out from this node, skip it.
+            continue
+        elif tid in seen_ids:
+            # bpo 38091: the chain of tid's we encounter here eventually leads
+            # to a fixed point or a cycle, but does not reach target_id.
+            # This means we would not actually deadlock. This can happen if
+            # other threads are at the beginning of acquire() below.
+            return False
+        seen_ids.add(tid)
+
+        # Follow the edges out from this thread.
+        edges = [lock.owner for lock in candidate_blocking_on]
+        if _has_deadlocked(target_id, seen_ids=seen_ids, candidate_ids=edges,
+                           blocking_on=blocking_on):
+            return True
+
+    return False
+
+
 class _ModuleLock:
     """A recursive lock implementation which is able to detect deadlocks
     (e.g. thread 1 trying to take locks A then B, and thread 2 trying to
     take locks B then A).
     """

     def __init__(self, name):
-        self.lock = _thread.allocate_lock()
+        # Create an RLock for protecting the import process for the
+        # corresponding module. Since it is an RLock, a single thread will be
+        # able to take it more than once. This is necessary to support
+        # re-entrancy in the import system that arises from (at least) signal
+        # handlers and the garbage collector. Consider the case of:
+        #
+        #  import foo
+        #  -> ...
+        #     -> importlib._bootstrap._ModuleLock.acquire
+        #        -> ...
+        #           -> <garbage collector>
+        #              -> __del__
+        #                 -> import foo
+        #                    -> ...
+        #                       -> importlib._bootstrap._ModuleLock.acquire
+        #                          -> _BlockingOnManager.__enter__
+        #
+        # If a different thread than the running one holds the lock then the
+        # thread will have to block on taking the lock, which is what we want
+        # for thread safety.
+        self.lock = _thread.RLock()
         self.wakeup = _thread.allocate_lock()
+
+        # The name of the module for which this is a lock.
         self.name = name
+
+        # Can end up being set to None if this lock is not owned by any thread
+        # or the thread identifier for the owning thread.
         self.owner = None
-        self.count = 0
-        self.waiters = 0
+
+        # Represent the number of times the owning thread has acquired this lock
+        # via a list of True. This supports RLock-like ("re-entrant lock")
+        # behavior, necessary in case a single thread is following a circular
+        # import dependency and needs to take the lock for a single module
+        # more than once.
+        #
+        # Counts are represented as a list of True because list.append(True)
+        # and list.pop() are both atomic and thread-safe in CPython and it's hard
+        # to find another primitive with the same properties.
+        self.count = []
+
+        # This is a count of the number of threads that are blocking on
+        # self.wakeup.acquire() awaiting to get their turn holding this module
+        # lock. When the module lock is released, if this is greater than
+        # zero, it is decremented and `self.wakeup` is released one time. The
+        # intent is that this will let one other thread make more progress on
+        # acquiring this module lock. This repeats until all the threads have
+        # gotten a turn.
+        #
+        # This is incremented in self.acquire() when a thread notices it is
+        # going to have to wait for another thread to finish.
+        #
+        # See the comment above count for explanation of the representation.
+        self.waiters = []

     def has_deadlock(self):
-        # Deadlock avoidance for concurrent circular imports.
-        me = _thread.get_ident()
-        tid = self.owner
-        seen = set()
-        while True:
-            lock = _blocking_on.get(tid)
-            if lock is None:
-                return False
-            tid = lock.owner
-            if tid == me:
-                return True
-            if tid in seen:
-                # bpo 38091: the chain of tid's we encounter here
-                # eventually leads to a fixpoint or a cycle, but
-                # does not reach 'me'. This means we would not
-                # actually deadlock. This can happen if other
-                # threads are at the beginning of acquire() below.
-                return False
-            seen.add(tid)
+        # To avoid deadlocks for concurrent or re-entrant circular imports,
+        # look at _blocking_on to see if any threads are blocking
+        # on getting the import lock for any module for which the import lock
+        # is held by this thread.
+        return _has_deadlocked(
+            # Try to find this thread.
+            target_id=_thread.get_ident(),
+            seen_ids=set(),
+            # Start from the thread that holds the import lock for this
+            # module.
+            candidate_ids=[self.owner],
+            # Use the global "blocking on" state.
+            blocking_on=_blocking_on,
+        )

     def acquire(self):
         """
@@ -104,35 +220,78 @@ def acquire(self):
         Otherwise, the lock is always acquired and True is returned.
         """
         tid = _thread.get_ident()
-        _blocking_on[tid] = self
-        try:
+        with _BlockingOnManager(tid, self):
             while True:
+                # Protect interaction with state on self with a per-module
+                # lock. This makes it safe for more than one thread to try to
+                # acquire the lock for a single module at the same time.
                 with self.lock:
-                    if self.count == 0 or self.owner == tid:
+                    if self.count == [] or self.owner == tid:
+                        # If the lock for this module is unowned then we can
+                        # take the lock immediately and succeed. If the lock
+                        # for this module is owned by the running thread then
+                        # we can also allow the acquire to succeed. This
+                        # supports circular imports (thread T imports module A
+                        # which imports module B which imports module A).
                         self.owner = tid
-                        self.count += 1
+                        self.count.append(True)
                         return True
+
+                    # At this point we know the lock is held (because count !=
+                    # 0) by another thread (because owner != tid). We'll have
+                    # to get in line to take the module lock.
+
+                    # But first, check to see if this thread would create a
+                    # deadlock by acquiring this module lock. If it would
+                    # then just stop with an error.
+                    #
+                    # It's not clear who is expected to handle this error.
+                    # There is one handler in _lock_unlock_module but many
+                    # times this method is called when entering the context
+                    # manager _ModuleLockManager instead - so _DeadlockError
+                    # will just propagate up to application code.
+                    #
+                    # This seems to be more than just a hypothetical -
+                    # https://stackoverflow.com/questions/59509154
+                    # https://github.com/encode/django-rest-framework/issues/7078
                     if self.has_deadlock():
-                        raise _DeadlockError('deadlock detected by %r' % self)
+                        raise _DeadlockError(f'deadlock detected by {self!r}')
+
+                    # Check to see if we're going to be able to acquire the
+                    # lock. If we are going to have to wait then increment
+                    # the waiters so `self.release` will know to unblock us
+                    # later on. We do this part non-blockingly so we don't
+                    # get stuck here before we increment waiters. We have
+                    # this extra acquire call (in addition to the one below,
+                    # outside the self.lock context manager) to make sure
+                    # self.wakeup is held when the next acquire is called (so
+                    # we block). This is probably needlessly complex and we
+                    # should just take self.wakeup in the return codepath
+                    # above.
                     if self.wakeup.acquire(False):
-                        self.waiters += 1
-                # Wait for a release() call
+                        self.waiters.append(None)
+
+                # Now take the lock in a blocking fashion. This won't
+                # complete until the thread holding this lock
+                # (self.owner) calls self.release.
                 self.wakeup.acquire()
+
+                # Taking the lock has served its purpose (making us wait), so we can
+                # give it up now. We'll take it w/o blocking again on the
+                # next iteration around this 'while' loop.
                 self.wakeup.release()
-        finally:
-            del _blocking_on[tid]

     def release(self):
         tid = _thread.get_ident()
         with self.lock:
             if self.owner != tid:
                 raise RuntimeError('cannot release un-acquired lock')
-            assert self.count > 0
-            self.count -= 1
-            if self.count == 0:
+            assert len(self.count) > 0
+            self.count.pop()
+            if not len(self.count):
                 self.owner = None
-                if self.waiters:
-                    self.waiters -= 1
+                if len(self.waiters) > 0:
+                    self.waiters.pop()
                     self.wakeup.release()

     def __repr__(self):
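As a side note for reviewers, the following standalone sketch (separate from the patch; `ToyModuleLock` is a made-up, heavily simplified class) illustrates the list-of-`True` accounting the patch switches to: each nested acquire by the owning thread appends one element, each release pops one, and the lock is considered free again once the list is empty. Waiter bookkeeping and the `wakeup` handshake are deliberately omitted here.

```python
import _thread


class ToyModuleLock:
    """Simplified re-entrant counting in the style of the patched _ModuleLock.

    This toy ignores waiters/wakeup entirely; it only demonstrates how the
    list-of-True count tracks nested acquisitions by the owning thread.
    """
    def __init__(self):
        self.owner = None
        self.count = []   # one True per nested acquire by the owning thread

    def acquire(self):
        tid = _thread.get_ident()
        if self.count == [] or self.owner == tid:
            self.owner = tid
            self.count.append(True)
            return True
        raise RuntimeError('toy lock: contention is not modelled here')

    def release(self):
        if self.owner != _thread.get_ident():
            raise RuntimeError('cannot release un-acquired lock')
        self.count.pop()
        if not self.count:
            self.owner = None


lock = ToyModuleLock()
lock.acquire()          # outer import
lock.acquire()          # re-entrant import (e.g. from a circular dependency)
print(len(lock.count))  # 2
lock.release()
lock.release()
print(lock.owner)       # None: fully released
```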