Skip to content

Commit 4230697

Browse files
[3.12] gh-110206: Fix multiprocessing test_notify_all (GH-130933) (#130951)
The test could deadlock trying join on the worker processes due to a combination of behaviors: * The use of `assertReachesEventually` did not ensure that workers actually woken.release() because the SyncManager's Semaphore does not implement get_value. * This mean that the test could finish and the variable "sleeping" would got out of scope and be collected. This unregisters the proxy leading to failures in the worker or possibly the manager. * The subsequent call to `p.join()` during cleanUp therefore never finished. This takes two approaches to fix this: 1) Use woken.acquire() to ensure that the workers actually finish calling woken.release() 2) At the end of the test, wait until the workers are finished, while `cond`, `sleeping`, and `woken` are still valid. (cherry picked from commit c476410) Co-authored-by: Sam Gross <[email protected]>
1 parent 56fa6f3 commit 4230697

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

Lib/test/_test_multiprocessing.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,18 +1649,19 @@ def test_notify_all(self):
16491649
woken = self.Semaphore(0)
16501650

16511651
# start some threads/processes which will timeout
1652+
workers = []
16521653
for i in range(3):
16531654
p = self.Process(target=self.f,
16541655
args=(cond, sleeping, woken, TIMEOUT1))
16551656
p.daemon = True
16561657
p.start()
1657-
self.addCleanup(p.join)
1658+
workers.append(p)
16581659

16591660
t = threading.Thread(target=self.f,
16601661
args=(cond, sleeping, woken, TIMEOUT1))
16611662
t.daemon = True
16621663
t.start()
1663-
self.addCleanup(t.join)
1664+
workers.append(t)
16641665

16651666
# wait for them all to sleep
16661667
for i in range(6):
@@ -1679,12 +1680,12 @@ def test_notify_all(self):
16791680
p = self.Process(target=self.f, args=(cond, sleeping, woken))
16801681
p.daemon = True
16811682
p.start()
1682-
self.addCleanup(p.join)
1683+
workers.append(p)
16831684

16841685
t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
16851686
t.daemon = True
16861687
t.start()
1687-
self.addCleanup(t.join)
1688+
workers.append(t)
16881689

16891690
# wait for them to all sleep
16901691
for i in range(6):
@@ -1700,11 +1701,17 @@ def test_notify_all(self):
17001701
cond.release()
17011702

17021703
# check they have all woken
1703-
self.assertReachesEventually(lambda: get_value(woken), 6)
1704+
for i in range(6):
1705+
woken.acquire()
1706+
self.assertReturnsIfImplemented(0, get_value, woken)
17041707

17051708
# check state is not mucked up
17061709
self.check_invariant(cond)
17071710

1711+
for w in workers:
1712+
# NOTE: join_process and join_thread are the same
1713+
threading_helper.join_thread(w)
1714+
17081715
def test_notify_n(self):
17091716
cond = self.Condition()
17101717
sleeping = self.Semaphore(0)

0 commit comments

Comments
 (0)