
bpo-21109: Add SafeTarFile #15244

Draft: wants to merge 15 commits into main

148 changes: 147 additions & 1 deletion Doc/library/tarfile.rst
@@ -8,7 +8,7 @@
.. sectionauthor:: Lars Gustäbel <[email protected]>

**Source code:** :source:`Lib/tarfile.py`

--------------

The :mod:`tarfile` module makes it possible to read and write tar
@@ -512,6 +512,137 @@ be finalized; only the internally used file object will be closed. See the



.. _safetarfile-objects:

SafeTarFile Objects
-------------------

In general, it is not a good idea to extract tar archives from sources you do
not completely trust. Archives that were created carelessly or maliciously may
contain file system objects in configurations that pose a variety of risks to
the system when they are extracted, for example overwriting existing files in
unanticipated locations. See the warning for :meth:`TarFile.extractall`.

The :class:`SafeTarFile` class is a drop-in replacement for the :class:`TarFile`
class that tries to guard against a number of unwanted side effects. It does
this by identifying bad archive members and preventing them from being
extracted. By default, :class:`SafeTarFile` raises a :exc:`SecurityError` when
it encounters a bad archive member and a :exc:`LimitError` when a limit is
exceeded. :exc:`LimitError` is a subclass of :exc:`SecurityError`.
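:exc:`LimitError` derives from :exc:`SecurityError`, so a handler for :exc:`SecurityError` also catches exceeded limits. Any more specific ``except LimitError`` clause therefore has to come first. The sketch below shows this with stand-in classes that mirror the ones this PR adds to ``Lib/tarfile.py``. The stand-ins and the ``describe_error()`` helper are hypothetical. They exist only so that the example runs against the current standard library.

```python
import tarfile


class SecurityError(tarfile.TarError):
    """Stand-in mirroring the PR's SecurityError (not in the stdlib)."""

    def __init__(self, tarinfo, warning):
        super().__init__(tarinfo, warning)
        self.tarinfo = tarinfo
        self.warning = warning


class LimitError(SecurityError):
    """Stand-in mirroring the PR's LimitError; it has no associated member."""

    def __init__(self, warning):
        super().__init__(None, warning)


def describe_error(error):
    """Hypothetical helper: raise *error* and report which clause caught it."""
    try:
        raise error
    except LimitError as exc:  # must precede the SecurityError clause
        return "limit exceeded: %s" % exc.warning
    except SecurityError as exc:
        return "bad member %s: %s" % (exc.tarinfo.name, exc.warning)


print(describe_error(LimitError("file limit exceeded")))
# prints: limit exceeded: file limit exceeded
```

If the two clauses were swapped, the ``SecurityError`` clause would handle the :exc:`LimitError` as well. Its ``tarinfo`` attribute is ``None``, so that clause would fail when it tries to read ``tarinfo.name``.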

.. note::

   There is no additional benefit in using :class:`SafeTarFile` for the
   creation of tar archives.

.. versionadded:: 3.5
   Added the :class:`SafeTarFile` class.

.. class:: SafeTarFile(..., ignore_warnings=None, max_files=100000, max_total=1073741824)

   In addition to the arguments it shares with the :class:`TarFile` class,
   :class:`SafeTarFile` accepts the following keyword arguments:

   *ignore_warnings* is an iterable of warning constants, one for each warning
   you want to ignore. By default, no warnings are ignored. See the first part
   of :ref:`safetarfile-configuration` for the constants.

   *max_files* is the maximum number of members the tar archive may contain.
   The default is ``100000``. To disable the limit, pass :const:`None` or
   ``0``.

   *max_total* is the maximum total size in bytes that all members together
   may occupy when extracted. The default is 1 GiB (``1073741824`` bytes). To
   disable the limit, pass :const:`None` or ``0``.
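The limit semantics can be sketched with the standard :mod:`tarfile` module alone. ``check_limits()`` and ``make_archive()`` below are hypothetical helpers and are not part of this PR. ``check_limits()`` counts members and sums :attr:`TarInfo.size` cumulatively while iterating. It treats ``None`` or ``0`` as "no limit" and raises as soon as either limit is exceeded. ``make_archive()`` builds a small in-memory archive to try it on.

```python
import io
import tarfile

# Hypothetical stand-ins with the same values as the PR's LIMIT_* constants.
LIMIT_MAX_FILES = "file limit exceeded"
LIMIT_MAX_SIZE = "space limit exceeded"


class LimitExceeded(Exception):
    """Hypothetical exception used only by this sketch."""


def check_limits(tar, max_files=100000, max_total=1024**3):
    """Yield the members of *tar*, enforcing a member count and a total size.

    A limit of None or 0 disables the corresponding check.
    """
    count = 0
    total = 0
    for tarinfo in tar:
        count += 1
        if max_files and count > max_files:
            raise LimitExceeded(LIMIT_MAX_FILES)
        total += tarinfo.size  # cumulative over all members seen so far
        if max_total and total > max_total:
            raise LimitExceeded(LIMIT_MAX_SIZE)
        yield tarinfo


def make_archive(sizes):
    """Build an uncompressed in-memory archive with one file per size."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for i, size in enumerate(sizes):
            info = tarfile.TarInfo("file%d" % i)
            info.size = size
            tar.addfile(info, io.BytesIO(b"x" * size))
    buf.seek(0)
    return buf


with tarfile.open(fileobj=make_archive([10, 10, 10]), mode="r") as tar:
    print([t.name for t in check_limits(tar, max_files=3, max_total=30)])
    # prints ['file0', 'file1', 'file2']
```

With the same three 10-byte files, ``max_total=25`` raises on the third member because the running total reaches 30. ``max_files=2`` raises on the third member for the same reason.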

.. method:: SafeTarFile.analyze()

   Check the archive for possible issues and yield a 2-tuple for each member.
   The tuple consists of the member's :class:`TarInfo` object and a
   :class:`set` that is either empty (good) or contains one or more of the
   warnings described in :ref:`safetarfile-configuration` (bad). Ignored
   warnings are not included. No :exc:`SecurityError` is raised for bad
   members, but a :exc:`LimitError` is raised if a limit is exceeded.

.. method:: SafeTarFile.filter()

   Return a generator that yields only the :class:`TarInfo` objects that are
   not marked as bad, for example to extract only the good parts of an
   archive. A :exc:`LimitError` is still raised if a limit is exceeded.

.. method:: SafeTarFile.is_safe()

   Analyze the archive and return :const:`True` if no issues were found and
   it should be safe to extract the archive to the file system. If a limit is
   exceeded, :const:`False` is returned. Neither :exc:`SecurityError` nor
   :exc:`LimitError` is raised.
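The relationship between ``analyze()``, ``filter()`` and ``is_safe()`` can be sketched as three views over one per-member check. This is a minimal sketch and not the PR's implementation. The function names and the single hypothetical ``check_member()`` are assumptions. *members* can be any re-iterable source of :class:`TarInfo` objects, such as a list or an open :class:`TarFile`. A one-shot generator will not work because each view iterates it again.

```python
import posixpath
import tarfile


def check_member(tarinfo):
    """Hypothetical single check: flag names that escape the destination."""
    name = posixpath.normpath(tarinfo.name)
    if name == ".." or name.startswith("../"):
        yield "relative name"


def analyze(members):
    """Yield (tarinfo, warnings) for every member; never raises for warnings."""
    for tarinfo in members:
        yield tarinfo, set(check_member(tarinfo))


def filter_good(members):
    """Yield only the members whose warning set is empty."""
    for tarinfo, warnings in analyze(members):
        if not warnings:
            yield tarinfo


def is_safe(members):
    """Return True if no member produced a warning."""
    return not any(warnings for _, warnings in analyze(members))


members = [tarfile.TarInfo("docs/readme.txt"), tarfile.TarInfo("../evil.sh")]
print(is_safe(members))                         # prints False
print([t.name for t in filter_good(members)])   # prints ['docs/readme.txt']
```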



.. _safetarfile-configuration:

SafeTarFile configuration
~~~~~~~~~~~~~~~~~~~~~~~~~

There are two different types of checks built into :class:`SafeTarFile`. The
first type takes care of archive members whose configuration poses a risk to
the system when they are extracted. Each of these checks can be switched off
by passing a list of the following constants as the *ignore_warnings* argument
to the :class:`SafeTarFile` constructor. These constants are also stored in
the :attr:`warning` attribute of a :exc:`SecurityError`.

.. data:: WARN_ABSOLUTE_NAME

   An absolute member name (a name starting with ``"/"``).

.. data:: WARN_RELATIVE_NAME

   A relative member name that breaks out of the destination directory, for
   example ``"../foo"``.

.. data:: WARN_DUPLICATE_NAME

   A member name that occurs more than once in the archive.

.. data:: WARN_ABSOLUTE_LINKNAME

   A symbolic or hard link whose :attr:`~TarInfo.linkname` is an absolute
   path.

.. data:: WARN_RELATIVE_LINKNAME

   A symbolic or hard link whose target breaks out of the destination
   directory.

.. data:: WARN_SETUID_SET

   A regular file with the set-user-ID permission bit set.

.. data:: WARN_SETGID_SET

   A regular file with the set-group-ID permission bit set.

.. data:: WARN_CHARACTER_DEVICE

   A character device node.

.. data:: WARN_BLOCK_DEVICE

   A block device node.
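The per-member checks behind several of the ``WARN_*`` constants can be sketched as a single function over a :class:`TarInfo` object. ``classify()`` is a hypothetical helper, not the PR's ``_check_all()``. It covers only the name, permission-bit and device checks, because duplicate names and link targets need state from earlier members. It returns the constants' string values. It uses :mod:`posixpath` because tar member names always use ``/``, and it compares whole path components so that a name such as ``..foo`` is not flagged.

```python
import posixpath
import stat
import tarfile


def classify(tarinfo):
    """Return the set of warning strings that apply to a single member."""
    warnings = set()
    name = posixpath.normpath(tarinfo.name)
    if posixpath.isabs(name):
        warnings.add("absolute name")
    elif name == ".." or name.startswith("../"):
        warnings.add("relative name")
    # Permission bits are only checked for regular files, as in the PR.
    if tarinfo.isreg() and tarinfo.mode & stat.S_ISUID:
        warnings.add("setuid set")
    if tarinfo.isreg() and tarinfo.mode & stat.S_ISGID:
        warnings.add("setgid set")
    if tarinfo.ischr():
        warnings.add("character device")
    elif tarinfo.isblk():
        warnings.add("block device")
    return warnings


tool = tarfile.TarInfo("bin/tool")
tool.mode = 0o4755                      # set-user-ID bit plus rwxr-xr-x
disk = tarfile.TarInfo("dev/sda")
disk.type = tarfile.BLKTYPE
print(classify(tool), classify(disk))   # prints {'setuid set'} {'block device'}
```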

The second type of check makes sure that the archive complies with a number of
user-defined limits, for example to prevent denial-of-service scenarios caused
by excessive use of memory or disk space. These limits can be configured using
the keyword arguments exclusive to the :class:`SafeTarFile` constructor. The
following constants are stored in the :attr:`warning` attribute of a
:exc:`LimitError`.

.. data:: LIMIT_MAX_FILES

   Maximum allowed number of archive members exceeded.

.. data:: LIMIT_MAX_SIZE

   Maximum allowed total size of unpacked contents exceeded.



.. _tarinfo-objects:

TarInfo Objects
@@ -804,6 +935,21 @@ parameter in :meth:`TarFile.add`::
   tar.add("foo", filter=reset)
   tar.close()

How to safely extract a tar archive from an untrusted source::

   import tarfile

   # We don't care about duplicate archive members.
   with tarfile.safe_open("sample.tar", ignore_warnings={tarfile.WARN_DUPLICATE_NAME}) as tar:
       if not tar.is_safe():
           print("sample.tar has the following issues:")
           for tarinfo, warnings in tar.analyze():
               if warnings:
                   print(tarinfo.name, ", ".join(warnings))
           print("extracting the good parts")
           tar.extractall(members=tar.filter())
       else:
           tar.extractall()


.. _tar-formats:

193 changes: 189 additions & 4 deletions Lib/tarfile.py
@@ -66,10 +66,14 @@
    pass

# from tarfile import *
__all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError", "ReadError",
           "CompressionError", "StreamError", "ExtractError", "HeaderError",
           "ENCODING", "USTAR_FORMAT", "GNU_FORMAT", "PAX_FORMAT",
           "DEFAULT_FORMAT", "open"]
__all__ = ["TarFile", "SafeTarFile", "TarInfo", "is_tarfile", "TarError",
           "ReadError", "CompressionError", "StreamError", "ExtractError",
           "HeaderError", "SecurityError", "LimitError", "ENCODING",
           "USTAR_FORMAT", "GNU_FORMAT", "PAX_FORMAT", "LIMIT_MAX_SIZE",
           "LIMIT_MAX_FILES", "WARN_BLOCK_DEVICE", "WARN_CHARACTER_DEVICE",
           "WARN_SETGID_SET", "WARN_SETUID_SET", "WARN_RELATIVE_LINKNAME",
           "WARN_ABSOLUTE_LINKNAME", "WARN_DUPLICATE_NAME", "WARN_RELATIVE_NAME",
           "WARN_ABSOLUTE_NAME", "DEFAULT_FORMAT", "open", "safe_open"]

#---------------------------------------------------------
# tar constants
Expand Down Expand Up @@ -143,6 +147,20 @@
"size": int
}

# SafeTarFile-related string constants.
WARN_ABSOLUTE_NAME = "absolute name"
WARN_RELATIVE_NAME = "relative name"
WARN_DUPLICATE_NAME = "duplicate name"
WARN_ABSOLUTE_LINKNAME = "absolute linkname"
WARN_RELATIVE_LINKNAME = "relative linkname"
WARN_SETUID_SET = "setuid set"
WARN_SETGID_SET = "setgid set"
WARN_CHARACTER_DEVICE = "character device"
WARN_BLOCK_DEVICE = "block device"

LIMIT_MAX_FILES = "file limit exceeded"
LIMIT_MAX_SIZE = "space limit exceeded"

#---------------------------------------------------------
# initialization
#---------------------------------------------------------
Expand Down Expand Up @@ -296,6 +314,19 @@ class InvalidHeaderError(HeaderError):
class SubsequentHeaderError(HeaderError):
    """Exception for missing and invalid extended headers."""
    pass
class SecurityError(TarError):
    """Exception for potentially dangerous contents."""
    def __init__(self, tarinfo, warning):
        self.tarinfo = tarinfo
        self.warning = warning
    def __str__(self):
        return "%s: %s" % (self.tarinfo.name, self.warning)
class LimitError(SecurityError):
    """Exception for an exceeded limit."""
    def __init__(self, warning):
        super().__init__(None, warning)
    def __str__(self):
        return self.warning

#---------------------------
# internal stream interface
Expand Down Expand Up @@ -2455,6 +2486,159 @@ def __exit__(self, type, value, traceback):
                self.fileobj.close()
            self.closed = True

class SafeTarFile(TarFile):
    """A subclass of TarFile that safeguards against malicious data.
    """

    def __init__(self, *args, ignore_warnings=None,
                 max_files=100000, max_total=1024**3, **kwargs):
        super().__init__(*args, **kwargs)

        if ignore_warnings:
            self.ignore_warnings = set(ignore_warnings)
        else:
            self.ignore_warnings = set()

        self.max_files = max_files
        self.max_total = max_total
        self.symlink_effective_name_map = {}

    def __iter__(self):
        """Safe iterator over the TarFile, that raises a SecurityError
        exception on the first warning.
        """
        for tarinfo, warnings in self.analyze():
            if warnings:
                raise SecurityError(tarinfo, warnings.pop())
            yield tarinfo

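    def analyze(self):
        """Generate (TarInfo, warnings) tuples for all members.
        """
        # Reset all per-pass state so that repeated calls (e.g. is_safe()
        # followed by filter()) give the same results.
        self.names = set()
        self.total = 0
        self.symlink_effective_name_map = {}

        for tarinfo in super().__iter__():
            warnings = set(self._check_member(tarinfo))
            yield tarinfo, warnings - self.ignore_warnings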

    def filter(self):
        """Generate the TarInfo objects of all members without warnings.
        """
        for tarinfo, warnings in self.analyze():
            if warnings:
                continue
            yield tarinfo

    def is_safe(self):
        """Return True if the archive should be safe to extract.
        """
        try:
            for tarinfo, warnings in self.analyze():
                if warnings:
                    return False
            return True
        except LimitError:
            return False

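    def _check_member(self, tarinfo):
        """Check a single TarInfo object for problems. Override this in a
        subclass if you want to add more checks.
        """
        # self.members already contains *tarinfo* at this point, so the
        # limit is only exceeded once the count is greater than max_files.
        if self.max_files and len(self.members) > self.max_files:
            raise LimitError(LIMIT_MAX_FILES)

        # Accumulate the size of all members seen so far.
        self.total += tarinfo.size
        if self.max_total and self.total > self.max_total:
            raise LimitError(LIMIT_MAX_SIZE)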

        effective_name = self._get_effective_name(tarinfo.name)
        if effective_name in self.symlink_effective_name_map:
            del self.symlink_effective_name_map[effective_name]

        yield from self._check_all(tarinfo, effective_name)

        if tarinfo.issym():
            # A symlink target is relative to the directory that contains the
            # symlink, so resolve it from there to a name relative to the
            # archive root (absolute targets stay absolute).
            cwd = os.path.dirname(effective_name)
            effective_linkname = self._get_effective_name(
                os.path.join(cwd, tarinfo.linkname))
            self.symlink_effective_name_map[effective_name] = effective_linkname
            if os.path.isabs(effective_linkname):
                relative_effective_linkname = effective_linkname
            else:
                relative_effective_linkname = os.path.relpath(effective_linkname, cwd)
            yield from self._check_symlink(effective_name, relative_effective_linkname)
        elif tarinfo.islnk():
            yield from self._check_link(tarinfo)
        elif tarinfo.ischr() or tarinfo.isblk():
            yield from self._check_device(tarinfo)

    def _get_effective_name(self, given_name):
        namelist = given_name.split("/")
        if len(namelist) > 1:
            effective_name = ""

            for i, name in enumerate(namelist):
                if name == "":
                    effective_name += "/"
                else:
                    effective_name += name

                effective_name = os.path.normpath(effective_name)
                if effective_name in self.symlink_effective_name_map:
                    effective_name = self.symlink_effective_name_map[effective_name]

                if i < len(namelist) - 1 and not effective_name.endswith("/"):
                    effective_name += "/"

            return effective_name
        else:
            return given_name

    def _check_all(self, tarinfo, effective_name):
        if os.path.isabs(effective_name):
            yield WARN_ABSOLUTE_NAME

        # Compare whole path components so that a name such as "..foo" is
        # not mistaken for a reference to the parent directory.
        name = os.path.normpath(effective_name)
        if name == os.pardir or name.startswith(os.pardir + os.sep):
            yield WARN_RELATIVE_NAME

        if effective_name in self.names:
            yield WARN_DUPLICATE_NAME
        else:
            self.names.add(effective_name)

        if tarinfo.isreg() and tarinfo.mode & stat.S_ISUID:
            yield WARN_SETUID_SET

        if tarinfo.isreg() and tarinfo.mode & stat.S_ISGID:
            yield WARN_SETGID_SET

    def _check_symlink(self, effective_name, effective_linkname):
        if os.path.isabs(effective_linkname):
            yield WARN_ABSOLUTE_LINKNAME

        linkname = os.path.join(os.path.dirname(effective_name), effective_linkname)
        linkname = os.path.normpath(linkname)

        if linkname == os.pardir or linkname.startswith(os.pardir + os.sep):
            yield WARN_RELATIVE_LINKNAME

    def _check_link(self, tarinfo):
        if os.path.isabs(tarinfo.linkname):
            yield WARN_ABSOLUTE_LINKNAME

        linkname = os.path.normpath(tarinfo.linkname)
        if linkname == os.pardir or linkname.startswith(os.pardir + os.sep):
            yield WARN_RELATIVE_LINKNAME

    def _check_device(self, tarinfo):
        if tarinfo.ischr():
            yield WARN_CHARACTER_DEVICE
        elif tarinfo.isblk():
            yield WARN_BLOCK_DEVICE

#--------------------
# exported functions
#--------------------
@@ -2470,6 +2654,7 @@ def is_tarfile(name):
        return False

open = TarFile.open
safe_open = SafeTarFile.open


def main():
Binary file added Lib/test/tarfiletestdata/sly_absolute0.tar
Binary file added Lib/test/tarfiletestdata/sly_absolute1.tar
Binary file added Lib/test/tarfiletestdata/sly_dirsymlink0.tar
Binary file added Lib/test/tarfiletestdata/sly_dirsymlink1.tar
Binary file added Lib/test/tarfiletestdata/sly_dirsymlink2.tar
Binary file added Lib/test/tarfiletestdata/sly_dirsymlink3.tar
Binary file added Lib/test/tarfiletestdata/sly_relative0.tar
Binary file added Lib/test/tarfiletestdata/sly_relative1.tar
Binary file added Lib/test/tarfiletestdata/sly_symlink.tar
File renamed without changes.
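The symlink tracking in ``SafeTarFile._check_member()`` and ``_get_effective_name()`` walks each member name one component at a time. At each step it substitutes the target of any symlink seen earlier in the archive. A simplified, standalone sketch of that idea follows. ``resolve()``, ``record_symlink()`` and ``escapes()`` are hypothetical helpers and are not part of the PR. They use ``posixpath`` because tar member names always use ``/``. They resolve a relative link target from the directory that contains the link, which is how the operating system interprets it.

```python
import posixpath


def resolve(name, symlinks):
    """Resolve archive member *name* one path component at a time.

    *symlinks* maps the resolved path of each symlink seen so far to its
    resolved target; when a prefix matches, the walk continues from there.
    """
    resolved = "/" if name.startswith("/") else "."
    for part in name.split("/"):
        if part in ("", "."):
            continue
        resolved = posixpath.normpath(posixpath.join(resolved, part))
        resolved = symlinks.get(resolved, resolved)
    return resolved


def record_symlink(link_name, target, symlinks):
    """Remember a symlink member; relative targets start from its directory."""
    parent = resolve(posixpath.dirname(link_name), symlinks)
    link_path = posixpath.normpath(
        posixpath.join(parent, posixpath.basename(link_name)))
    # posixpath.join() discards *parent* when *target* is absolute.
    symlinks[link_path] = resolve(posixpath.join(parent, target), symlinks)


def escapes(name, symlinks):
    """True if *name* resolves to an absolute path or above the root."""
    path = resolve(name, symlinks)
    return posixpath.isabs(path) or path == ".." or path.startswith("../")


links = {}
record_symlink("dir", "/etc", links)   # a symlink member dir -> /etc
print(resolve("dir/passwd", links))    # prints /etc/passwd
```

A later regular-file member ``dir/passwd`` would therefore be written to ``/etc/passwd``, which is the situation the ``sly_dirsymlink*`` test archives are named after.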