Skip to content

Commit e3acd45

Browse files
gpshead authored and ebonnal committed
pythongh-97514: Authenticate the forkserver control socket. (pythonGH-99309)
This adds authentication to the forkserver control socket. In the past only filesystem permissions protected this socket from code injection into the forkserver process by limiting access to the same UID. That protection didn't exist when Linux abstract namespace sockets were used (see issue), meaning that any process in the same system network namespace could inject code. We've since stopped using abstract namespace sockets by default, but protecting our control sockets regardless of type is a good idea.

This reuses the HMAC based shared key auth already used by `multiprocessing.connection` sockets for other purposes.

Doing this is useful so that filesystem permissions are not relied upon and trust isn't implied by default between all processes running as the same UID with access to the unix socket.

### pyperformance benchmarks

No significant changes. Including `concurrent_imap` which exercises `multiprocessing.Pool.imap` in that suite.

### Microbenchmarks

This does _slightly_ slow down forkserver use. How much so appears to depend on the platform. Modern platforms and simple platforms are less impacted. This PR adds additional IPC round trips to the control socket to tell forkserver to spawn a new process. Systems with potentially high latency IPC are naturally impacted more. Typically a 1-4% slowdown on a very targeted process creation microbenchmark, with a worst case overloaded system slowdown of 20%. No evidence that these slowdowns appear in any practical sense. See the PR for details.
1 parent 50dfba5 commit e3acd45

File tree

6 files changed

+141
-16
lines changed

6 files changed

+141
-16
lines changed

Doc/whatsnew/3.14.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,11 @@ multiprocessing
421421
:func:`multiprocessing.get_context` (preferred) or change the default via
422422
:func:`multiprocessing.set_start_method`.
423423
(Contributed by Gregory P. Smith in :gh:`84559`.)
424+
* :mod:`multiprocessing`'s ``"forkserver"`` start method now authenticates
425+
its control socket to avoid solely relying on filesystem permissions
426+
to restrict what other processes could cause the forkserver to spawn workers
427+
and run code.
428+
(Contributed by Gregory P. Smith for :gh:`97514`.)
424429
* The :ref:`multiprocessing proxy objects <multiprocessing-proxy_objects>`
425430
for *list* and *dict* types gain previously overlooked missing methods:
426431

Lib/multiprocessing/connection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ def close(self):
181181
finally:
182182
self._handle = None
183183

184+
def _detach(self):
185+
"""Stop managing the underlying file descriptor or handle."""
186+
self._handle = None
187+
184188
def send_bytes(self, buf, offset=0, size=None):
185189
"""Send the bytes data from a bytes-like object"""
186190
self._check_closed()

Lib/multiprocessing/forkserver.py

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import threading
1010
import warnings
1111

12+
from . import AuthenticationError
1213
from . import connection
1314
from . import process
1415
from .context import reduction
@@ -25,6 +26,7 @@
2526

2627
MAXFDS_TO_SEND = 256
2728
SIGNED_STRUCT = struct.Struct('q') # large enough for pid_t
29+
_AUTHKEY_LEN = 32 # <= PIPE_BUF so it fits a single write to an empty pipe.
2830

2931
#
3032
# Forkserver class
@@ -33,6 +35,7 @@
3335
class ForkServer(object):
3436

3537
def __init__(self):
38+
self._forkserver_authkey = None
3639
self._forkserver_address = None
3740
self._forkserver_alive_fd = None
3841
self._forkserver_pid = None
@@ -59,6 +62,7 @@ def _stop_unlocked(self):
5962
if not util.is_abstract_socket_namespace(self._forkserver_address):
6063
os.unlink(self._forkserver_address)
6164
self._forkserver_address = None
65+
self._forkserver_authkey = None
6266

6367
def set_forkserver_preload(self, modules_names):
6468
'''Set list of module names to try to load in forkserver process.'''
@@ -83,6 +87,7 @@ def connect_to_new_process(self, fds):
8387
process data.
8488
'''
8589
self.ensure_running()
90+
assert self._forkserver_authkey
8691
if len(fds) + 4 >= MAXFDS_TO_SEND:
8792
raise ValueError('too many fds')
8893
with socket.socket(socket.AF_UNIX) as client:
@@ -93,6 +98,18 @@ def connect_to_new_process(self, fds):
9398
resource_tracker.getfd()]
9499
allfds += fds
95100
try:
101+
client.setblocking(True)
102+
wrapped_client = connection.Connection(client.fileno())
103+
# The other side of this exchange happens in the child as
104+
# implemented in main().
105+
try:
106+
connection.answer_challenge(
107+
wrapped_client, self._forkserver_authkey)
108+
connection.deliver_challenge(
109+
wrapped_client, self._forkserver_authkey)
110+
finally:
111+
wrapped_client._detach()
112+
del wrapped_client
96113
reduction.sendfds(client, allfds)
97114
return parent_r, parent_w
98115
except:
@@ -120,6 +137,7 @@ def ensure_running(self):
120137
return
121138
# dead, launch it again
122139
os.close(self._forkserver_alive_fd)
140+
self._forkserver_authkey = None
123141
self._forkserver_address = None
124142
self._forkserver_alive_fd = None
125143
self._forkserver_pid = None
@@ -130,9 +148,9 @@ def ensure_running(self):
130148
if self._preload_modules:
131149
desired_keys = {'main_path', 'sys_path'}
132150
data = spawn.get_preparation_data('ignore')
133-
data = {x: y for x, y in data.items() if x in desired_keys}
151+
main_kws = {x: y for x, y in data.items() if x in desired_keys}
134152
else:
135-
data = {}
153+
main_kws = {}
136154

137155
with socket.socket(socket.AF_UNIX) as listener:
138156
address = connection.arbitrary_address('AF_UNIX')
@@ -144,19 +162,31 @@ def ensure_running(self):
144162
# all client processes own the write end of the "alive" pipe;
145163
# when they all terminate the read end becomes ready.
146164
alive_r, alive_w = os.pipe()
165+
# A short lived pipe to initialize the forkserver authkey.
166+
authkey_r, authkey_w = os.pipe()
147167
try:
148-
fds_to_pass = [listener.fileno(), alive_r]
168+
fds_to_pass = [listener.fileno(), alive_r, authkey_r]
169+
main_kws['authkey_r'] = authkey_r
149170
cmd %= (listener.fileno(), alive_r, self._preload_modules,
150-
data)
171+
main_kws)
151172
exe = spawn.get_executable()
152173
args = [exe] + util._args_from_interpreter_flags()
153174
args += ['-c', cmd]
154175
pid = util.spawnv_passfds(exe, args, fds_to_pass)
155176
except:
156177
os.close(alive_w)
178+
os.close(authkey_w)
157179
raise
158180
finally:
159181
os.close(alive_r)
182+
os.close(authkey_r)
183+
# Authenticate our control socket to prevent access from
184+
# processes we have not shared this key with.
185+
try:
186+
self._forkserver_authkey = os.urandom(_AUTHKEY_LEN)
187+
os.write(authkey_w, self._forkserver_authkey)
188+
finally:
189+
os.close(authkey_w)
160190
self._forkserver_address = address
161191
self._forkserver_alive_fd = alive_w
162192
self._forkserver_pid = pid
@@ -165,8 +195,18 @@ def ensure_running(self):
165195
#
166196
#
167197

168-
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
169-
'''Run forkserver.'''
198+
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None,
199+
*, authkey_r=None):
200+
"""Run forkserver."""
201+
if authkey_r is not None:
202+
try:
203+
authkey = os.read(authkey_r, _AUTHKEY_LEN)
204+
assert len(authkey) == _AUTHKEY_LEN, f'{len(authkey)} < {_AUTHKEY_LEN}'
205+
finally:
206+
os.close(authkey_r)
207+
else:
208+
authkey = b''
209+
170210
if preload:
171211
if sys_path is not None:
172212
sys.path[:] = sys_path
@@ -257,8 +297,24 @@ def sigchld_handler(*_unused):
257297
if listener in rfds:
258298
# Incoming fork request
259299
with listener.accept()[0] as s:
260-
# Receive fds from client
261-
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
300+
try:
301+
if authkey:
302+
wrapped_s = connection.Connection(s.fileno())
303+
# The other side of this exchange happens in
304+
# connect_to_new_process().
305+
try:
306+
connection.deliver_challenge(
307+
wrapped_s, authkey)
308+
connection.answer_challenge(
309+
wrapped_s, authkey)
310+
finally:
311+
wrapped_s._detach()
312+
del wrapped_s
313+
# Receive fds from client
314+
fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
315+
except (EOFError, BrokenPipeError, AuthenticationError):
316+
s.close()
317+
continue
262318
if len(fds) > MAXFDS_TO_SEND:
263319
raise RuntimeError(
264320
"Too many ({0:n}) fds to send".format(

Lib/multiprocessing/reduction.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,12 @@ def detach(self):
139139
__all__ += ['DupFd', 'sendfds', 'recvfds']
140140
import array
141141

142-
# On MacOSX we should acknowledge receipt of fds -- see Issue14669
143-
ACKNOWLEDGE = sys.platform == 'darwin'
144-
145142
def sendfds(sock, fds):
146143
'''Send an array of fds over an AF_UNIX socket.'''
147144
fds = array.array('i', fds)
148145
msg = bytes([len(fds) % 256])
149146
sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
150-
if ACKNOWLEDGE and sock.recv(1) != b'A':
147+
if sock.recv(1) != b'A':
151148
raise RuntimeError('did not receive acknowledgement of fd')
152149

153150
def recvfds(sock, size):
@@ -158,8 +155,11 @@ def recvfds(sock, size):
158155
if not msg and not ancdata:
159156
raise EOFError
160157
try:
161-
if ACKNOWLEDGE:
162-
sock.send(b'A')
158+
# We send/recv an Ack byte after the fds to work around an old
159+
# macOS bug; it isn't clear if this is still required but it
160+
# makes unit testing fd sending easier.
161+
# See: https://github.com/python/cpython/issues/58874
162+
sock.send(b'A') # Acknowledge
163163
if len(ancdata) != 1:
164164
raise RuntimeError('received %d items of ancdata' %
165165
len(ancdata))

Lib/test/_test_multiprocessing.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -846,8 +846,8 @@ def test_error_on_stdio_flush_2(self):
846846
finally:
847847
setattr(sys, stream_name, old_stream)
848848

849-
@classmethod
850-
def _sleep_and_set_event(self, evt, delay=0.0):
849+
@staticmethod
850+
def _sleep_and_set_event(evt, delay=0.0):
851851
time.sleep(delay)
852852
evt.set()
853853

@@ -898,6 +898,56 @@ def test_forkserver_sigkill(self):
898898
if os.name != 'nt':
899899
self.check_forkserver_death(signal.SIGKILL)
900900

901+
def test_forkserver_auth_is_enabled(self):
902+
if self.TYPE == "threads":
903+
self.skipTest(f"test not appropriate for {self.TYPE}")
904+
if multiprocessing.get_start_method() != "forkserver":
905+
self.skipTest("forkserver start method specific")
906+
907+
forkserver = multiprocessing.forkserver._forkserver
908+
forkserver.ensure_running()
909+
self.assertTrue(forkserver._forkserver_pid)
910+
authkey = forkserver._forkserver_authkey
911+
self.assertTrue(authkey)
912+
self.assertGreater(len(authkey), 15)
913+
addr = forkserver._forkserver_address
914+
self.assertTrue(addr)
915+
916+
# Demonstrate that a raw auth handshake, as Client performs, does not
917+
# raise an error.
918+
client = multiprocessing.connection.Client(addr, authkey=authkey)
919+
client.close()
920+
921+
# That worked, now launch a quick process.
922+
proc = self.Process(target=sys.exit)
923+
proc.start()
924+
proc.join()
925+
self.assertEqual(proc.exitcode, 0)
926+
927+
def test_forkserver_without_auth_fails(self):
928+
if self.TYPE == "threads":
929+
self.skipTest(f"test not appropriate for {self.TYPE}")
930+
if multiprocessing.get_start_method() != "forkserver":
931+
self.skipTest("forkserver start method specific")
932+
933+
forkserver = multiprocessing.forkserver._forkserver
934+
forkserver.ensure_running()
935+
self.assertTrue(forkserver._forkserver_pid)
936+
authkey_len = len(forkserver._forkserver_authkey)
937+
with unittest.mock.patch.object(
938+
forkserver, '_forkserver_authkey', None):
939+
# With an incorrect authkey we should get an auth rejection
940+
# rather than a protocol error.
941+
forkserver._forkserver_authkey = b'T' * authkey_len
942+
proc = self.Process(target=sys.exit)
943+
with self.assertRaises(multiprocessing.AuthenticationError):
944+
proc.start()
945+
del proc
946+
947+
# authkey restored, launching processes should work again.
948+
proc = self.Process(target=sys.exit)
949+
proc.start()
950+
proc.join()
901951

902952
#
903953
#
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Authentication was added to the :mod:`multiprocessing` forkserver start
2+
method control socket so that only processes with the authentication key
3+
generated by the process that spawned the forkserver can control it. This
4+
is an enhancement over the other :gh:`97514` fixes so that access is no
5+
longer limited only by filesystem permissions.
6+
7+
The file descriptor exchange of control pipes with the forked worker process
8+
now requires an explicit acknowledgement byte to be sent over the socket after
9+
the exchange on all forkserver supporting platforms. That makes testing the
10+
above much easier.

0 commit comments

Comments
 (0)