Skip to content

[3.7] bpo-33613, test_semaphore_tracker_sigint: fix race condition (GH-7850) #9055

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions Lib/multiprocessing/semaphore_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

__all__ = ['ensure_running', 'register', 'unregister']

_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)


class SemaphoreTracker(object):

Expand All @@ -43,10 +46,16 @@ def ensure_running(self):
with self._lock:
if self._pid is not None:
# semaphore tracker was launched before, is it still running?
pid, status = os.waitpid(self._pid, os.WNOHANG)
if not pid:
# => still alive
return
try:
pid, _ = os.waitpid(self._pid, os.WNOHANG)
except ChildProcessError:
# The process terminated
pass
else:
if not pid:
# => still alive
return

# => dead, launch it again
os.close(self._fd)
self._fd = None
Expand All @@ -68,7 +77,19 @@ def ensure_running(self):
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd % r]
pid = util.spawnv_passfds(exe, args, fds_to_pass)
# bpo-33613: Register a signal mask that will block the signals.
# This signal mask will be inherited by the child that is going
# to be spawned and will protect the child from a race condition
# that can make the child die before it registers signal handlers
# for SIGINT and SIGTERM. The mask is unregistered after spawning
# the child.
try:
if _HAVE_SIGMASK:
signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
pid = util.spawnv_passfds(exe, args, fds_to_pass)
finally:
if _HAVE_SIGMASK:
signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)
except:
os.close(w)
raise
Expand Down Expand Up @@ -104,12 +125,13 @@ def _send(self, cmd, name):
unregister = _semaphore_tracker.unregister
getfd = _semaphore_tracker.getfd


def main(fd):
'''Run semaphore tracker.'''
# protect the process from ^C and "killall python" etc
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
if _HAVE_SIGMASK:
signal.pthread_sigmask(signal.SIG_UNBLOCK, _IGNORED_SIGNALS)

for f in (sys.stdin, sys.stdout):
try:
Expand Down
27 changes: 21 additions & 6 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import struct
import operator
import weakref
import warnings
import test.support
import test.support.script_helper
from test import support
Expand Down Expand Up @@ -4482,17 +4483,19 @@ def check_semaphore_tracker_death(self, signum, should_die):
# bpo-31310: if the semaphore tracker process has died, it should
# be restarted implicitly.
from multiprocessing.semaphore_tracker import _semaphore_tracker
_semaphore_tracker.ensure_running()
pid = _semaphore_tracker._pid
if pid is not None:
os.kill(pid, signal.SIGKILL)
os.waitpid(pid, 0)
with warnings.catch_warnings(record=True) as all_warn:
_semaphore_tracker.ensure_running()
pid = _semaphore_tracker._pid

os.kill(pid, signum)
time.sleep(1.0) # give it time to die

ctx = multiprocessing.get_context("spawn")
with contextlib.ExitStack() as stack:
if should_die:
stack.enter_context(self.assertWarnsRegex(
UserWarning,
"semaphore_tracker: process died"))
with warnings.catch_warnings(record=True) as all_warn:
sem = ctx.Semaphore()
sem.acquire()
sem.release()
Expand All @@ -4502,11 +4505,23 @@ def check_semaphore_tracker_death(self, signum, should_die):
del sem
gc.collect()
self.assertIsNone(wr())
if should_die:
self.assertEqual(len(all_warn), 1)
the_warn = all_warn[0]
issubclass(the_warn.category, UserWarning)
self.assertTrue("semaphore_tracker: process died"
in str(the_warn.message))
else:
self.assertEqual(len(all_warn), 0)

def test_semaphore_tracker_sigint(self):
# Catchable signal (ignored by semaphore tracker)
self.check_semaphore_tracker_death(signal.SIGINT, False)

def test_semaphore_tracker_sigterm(self):
# Catchable signal (ignored by semaphore tracker)
self.check_semaphore_tracker_death(signal.SIGTERM, False)

def test_semaphore_tracker_sigkill(self):
# Uncatchable signal.
self.check_semaphore_tracker_death(signal.SIGKILL, True)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fix a race condition in ``multiprocessing.semaphore_tracker`` when the
tracker receives SIGINT before it can register signal handlers for ignoring
it.