-
Notifications
You must be signed in to change notification settings - Fork 531
/
Copy pathextractor.py
564 lines (510 loc) · 21.9 KB
/
extractor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Extraction of archives
"""
import os
import re
import shutil
import sys
import tarfile
import tempfile
from pathlib import Path
import filetype
import zstandard
from rpmfile.cli import main as rpmextract
from cve_bin_tool.async_utils import (
ChangeDirContext,
FileIO,
aio_glob,
aio_inpath,
aio_makedirs,
aio_mkdtemp,
aio_rmdir,
aio_run_command,
aio_unpack_archive,
async_wrap,
run_coroutine,
)
from .error_handler import (
ErrorHandler,
ErrorMode,
ExtractionFailed,
ExtractionToolNotFound,
UnknownArchiveType,
)
from .log import LOGGER
# Run rpmfile in a thread: wrap rpmfile's CLI entry point with async_wrap so
# that the extractor coroutines below can ``await rpmextract(...)``.
rpmextract = async_wrap(rpmextract)
# Extractor dictionary keys: each value of BaseExtractor.file_extractors is a
# dict with these two keys. EXTENSIONS holds the file-name suffixes an
# extractor handles; MIMES holds the MIME types (compared against the result
# of filetype.guess) it handles.
EXTENSIONS = "extensions"
MIMES = "mimes"
class BaseExtractor:
    """Extracts tar, rpm, etc. files

    ``file_extractors`` maps every ``extract_file_*`` coroutine to the file
    extensions and MIME types it is responsible for.  Each extractor takes
    the archive path and the directory to extract into; callers treat any
    return value other than ``0`` as a failure.

    Several extractors invoke external tools (``ar x``, ``7z x``, ...)
    without naming an output directory, so those tools write into the current
    working directory.  ``TempDirExtractorContext.aio_extract`` changes into
    the extraction directory before calling an extractor for that reason.
    """

    def __init__(self, logger=None, error_mode=ErrorMode.TruncTrace):
        """Set up logging, error handling mode and the extractor table.

        Args:
            logger: Logger to use; defaults to a child of the package LOGGER
                named after the concrete class.
            error_mode: ErrorMode passed to the ErrorHandler blocks that
                report missing tools and extraction failures.
        """
        # Sets up logger and if we should extract files or just report
        self.logger = logger or LOGGER.getChild(self.__class__.__name__)
        self.error_mode = error_mode
        # Populated by subclasses that manage a temporary directory;
        # extract_file_pkg reads it for its intermediate 7z output.
        self.tempdir = None
        # Register the custom LZMA matcher (see the Lzma class defined later
        # in this module) so filetype.guess can report "application/lzma".
        filetype.add_type(Lzma())
        self.file_extractors = {
            self.extract_file_tar: {
                EXTENSIONS: [
                    ".tgz",
                    ".tar.gz",
                    ".tar",
                    ".tar.xz",
                    ".tar.bz2",
                    ".xz",
                    ".bz2",
                    ".gz",
                ],
                MIMES: [
                    "application/x-tar",
                    "application/gzip",
                ],
            },
            self.extract_file_rpm: {EXTENSIONS: [".rpm"], MIMES: []},
            self.extract_file_deb: {EXTENSIONS: [".deb", ".ipk"], MIMES: []},
            self.extract_file_cab: {EXTENSIONS: [".cab"], MIMES: []},
            self.extract_file_apk: {EXTENSIONS: [".apk"], MIMES: []},
            self.extract_file_zst: {EXTENSIONS: [".zst"], MIMES: []},
            self.extract_file_pkg: {EXTENSIONS: [".pkg"], MIMES: []},
            self.extract_file_zip: {
                EXTENSIONS: [
                    ".exe",
                    ".zip",
                    ".jar",
                    ".msi",
                    ".egg",
                    ".whl",
                    ".war",
                    ".ear",
                    ".aar",
                ],
                MIMES: [
                    "application/x-msdownload",
                    "application/x-7z-compressed",
                    "application/x-lzip",
                    "application/lzma",
                ],
            },
        }

    def can_extract(self, filename):
        """Check if the filename is something we know how to extract

        Returns:
            bool: True when the file suffix, or (for regular files) the MIME
            type guessed from its content, is handled by one of the
            extractors.  Symlinks and unreadable files yield False.
        """
        path = Path(filename)
        # Do not try to extract symlinks
        try:
            if path.is_symlink():
                return False
        except PermissionError:
            return False
        handled = self.file_extractors.values()
        suffix = path.suffix
        if any(suffix in spec[EXTENSIONS] for spec in handled):
            return True
        if os.path.isfile(filename):
            try:
                guess = filetype.guess(filename)
            except PermissionError:
                return False
            if guess is not None:
                return any(guess.MIME in spec[MIMES] for spec in handled)
        return False

    def tar_member_filter(self, members, extraction_path):
        """Generator function to serve as a backported filter for tarfile extraction
        based on https://docs.python.org/3/library/tarfile.html#examples

        Yields only the regular-file members whose resolved destination lies
        inside ``extraction_path``; everything else (links, devices, and
        names that escape the directory) is skipped.

        Args:
            members: Iterable of tarfile.TarInfo objects (an open TarFile
                works).
            extraction_path: Directory the archive is being extracted into.
        """
        root = str(Path(extraction_path).resolve())
        # Compare against the root *with* a trailing separator so that a
        # sibling directory sharing the same prefix (e.g. "<root>-other") is
        # not mistaken for a location inside the extraction directory.
        root_prefix = root if root.endswith(os.sep) else root + os.sep
        for tarmember in members:
            if not tarmember.isfile():
                continue
            destination = str(Path(root, tarmember.name).resolve())
            if destination.startswith(root_prefix):
                yield tarmember

    async def extract_file_tar(self, filename, extraction_path):
        """Extract tar files

        Returns:
            The exit code recorded by the ErrorHandler wrapping the
            extraction, or ExtractionToolNotFound on Windows when the
            interpreter has no ``tarfile.data_filter``.
        """
        # make sure we have full path for later checks
        extraction_path = str(Path(extraction_path).resolve())
        with ErrorHandler(mode=ErrorMode.Ignore) as e:
            # Python 3.12 has a data filter we can use in extract
            # tarfile has this available in older versions as well
            if hasattr(tarfile, "data_filter"):
                with tarfile.open(filename) as tar:
                    tar.extractall(path=extraction_path, filter="data")  # nosec
                # nosec line because bandit doesn't understand filters yet
            elif sys.platform == "win32":
                # Windows users must use python 3.12 or later because the
                # workaround below fails on windows
                # Patches welcome if you can fix this!
                self.logger.error(
                    "Install python 3.12 or later to support tarfile extraction"
                )
                # NOTE(review): this returns the exception class rather than
                # an int; callers only test "!= 0" so it reads as a failure.
                return ExtractionToolNotFound
            # Some versions may need us to implement a filter to avoid unsafe behaviour
            # we could consider logging a warning here
            else:
                with tarfile.open(filename) as tar:
                    tar.extractall(
                        path=extraction_path,
                        members=self.tar_member_filter(tar, extraction_path),
                    )  # nosec
        return e.exit_code

    async def extract_file_rpm(self, filename, extraction_path):
        """Extract rpm packages

        On Linux uses ``rpm2cpio`` + ``cpio`` when both are available and
        falls back to the wrapped rpmfile extractor otherwise.  On other
        platforms uses ``7z``.

        Returns:
            int: 0 on success (or when no tool was found and only an error
            was logged), 1 on failure.
        """
        extraction_path_pathlib = Path(extraction_path)
        if sys.platform.startswith("linux"):
            if not await aio_inpath("rpm2cpio") or not await aio_inpath("cpio"):
                await rpmextract("-xC", extraction_path, filename)
            else:
                stdout, stderr, _ = await aio_run_command(["rpm2cpio", filename])
                if stderr or not stdout:
                    return 1
                cpio_path = str(extraction_path_pathlib / "data.cpio")
                async with FileIO(cpio_path, "wb") as f:
                    await f.write(stdout)
                stdout, stderr, _ = await aio_run_command(
                    ["cpio", "-idm", "--file", cpio_path]
                )
                # NOTE(review): success is inferred from empty stdout and
                # non-empty stderr, presumably because cpio prints its
                # summary on stderr -- confirm.
                if stdout or not stderr:
                    return 1
        else:
            if not await aio_inpath("7z"):
                with ErrorHandler(mode=self.error_mode, logger=self.logger):
                    # ExtractionToolNotFound
                    self.logger.error(f"No extraction tool found for {filename}")
                    self.logger.error("rpm2cpio or 7z can be used to extract rpm files")
            else:
                stdout, stderr, _ = await aio_run_command(["7z", "x", filename])
                if stderr or not stdout:
                    return 1
                filenames = await aio_glob(str(extraction_path_pathlib / "*.cpio"))
                if not filenames:
                    # No plain cpio payload: look for a zstd-compressed one
                    # and decompress it first.
                    filenames = await aio_glob(
                        str(extraction_path_pathlib / "*.cpio.zstd")
                    )
                    if not filenames:
                        # Nothing usable came out of the rpm; report failure
                        # instead of raising IndexError below.
                        return 1
                    filename = filenames[0]
                    exit_code = await self.extract_file_zst(filename, extraction_path)
                    if exit_code:
                        return 1
                    filenames = await aio_glob(str(extraction_path_pathlib / "*.cpio"))
                    if not filenames:
                        return 1
                filename = filenames[0]
                stdout, stderr, _ = await aio_run_command(["7z", "x", filename])
                if stderr or not stdout:
                    return 1
        return 0

    async def extract_file_zst(self, filename: str, extraction_path: str) -> int:
        """Extract zstd compressed files

        ``*.cpio.zstd`` files are decompressed in-process into
        ``extraction_path``; anything else is assumed to be a zstd-compressed
        tarball and handed to ``tar`` or ``7z``.

        Returns:
            int: 1 when the external tool wrote to stderr, otherwise the exit
            code recorded by the ErrorHandler.
        """
        dctx = zstandard.ZstdDecompressor()
        with ErrorHandler(mode=ErrorMode.Ignore) as e:
            if filename.endswith(".cpio.zstd"):
                with open(filename, "rb") as compressed:
                    # Path.stem drops only the final ".zstd" suffix.
                    output_path = Path(extraction_path) / Path(filename).stem
                    with open(output_path, "wb") as destination:
                        dctx.copy_stream(compressed, destination)
            else:
                # assume it's a tar.zstd so use tar with unzstd
                if await aio_inpath("tar"):
                    stdout, stderr, _ = await aio_run_command(
                        ["tar", "--use-compress-program=unzstd", "-xvf", filename]
                    )
                    # Assume anything in stderr is bad
                    if stderr:
                        return 1
                elif await aio_inpath("7z"):
                    stdout, stderr, _ = await aio_run_command(["7z", "x", filename])
                    if stderr:
                        return 1
                else:
                    # ExtractionToolNotFound
                    self.logger.error(f"No extraction tool found for {filename}")
                    self.logger.error(
                        "tar or 7zip-zstd is required to extract tar.zstd files"
                    )
        return e.exit_code

    async def extract_file_pkg(self, filename: str, extraction_path: str) -> int:
        """Extract pkg files

        Prefers ``7z`` on Windows, otherwise ``tar`` with ``7z`` as a
        fallback.

        Returns:
            int: 0 on success, 1 on failure or when no tool is available.
        """

        async def _extract_through_7z() -> int:
            """Extract file using `7z`"""
            # Two passes: the first unpacks the outer container into
            # self.tempdir, the second unpacks the resulting inner archive
            # (named after the file stem) into extraction_path.
            temp = str(Path(self.tempdir) / Path(filename).stem)
            stdout, stderr, _ = await aio_run_command(
                ["7z", "x", filename, f"-o{self.tempdir}"]
            )
            stdout, stderr, _ = await aio_run_command(
                ["7z", "x", temp, f"-o{extraction_path}"]
            )
            if not stdout:
                return 1
            return 0

        if sys.platform.startswith("win"):
            if await aio_inpath("7z"):
                return await _extract_through_7z()

        # Tarfile wasn't used here because it can't open [.pkg] files directly
        # and failed to manage distinct compression types in different versions of FreeBSD packages.
        # Reference: https://github.com/intel/cve-bin-tool/pull/1580#discussion_r829346602
        if await aio_inpath("tar"):
            stdout, stderr, return_code = await aio_run_command(
                ["tar", "xf", filename, "-C", extraction_path]
            )
            if (stderr or not stdout) and return_code != 0:
                return 1
            return 0

        if await aio_inpath("7z"):
            return await _extract_through_7z()
        return 1

    async def extract_file_deb(self, filename, extraction_path):
        """Extract debian packages

        The outer container is unpacked with ``ar`` (or as a gzip tarball
        when ``file`` does not report a Debian binary package), then the
        contained ``data.tar.*`` member is extracted.

        Returns:
            int: 0 on success, non-zero on failure, including the case where
            no ``data.tar.*`` member was produced.
        """
        is_ar = True
        is_zst = False
        process_can_fail = True
        if await aio_inpath("file"):
            stdout, stderr, return_code = await aio_run_command(
                ["file", filename], process_can_fail
            )
            if not re.search(b"Debian binary package", stdout):
                is_ar = False
            if re.search(b"data compression zst", stdout):
                is_zst = True
        if is_ar:
            if not await aio_inpath("ar"):
                with ErrorHandler(mode=self.error_mode, logger=self.logger):
                    # ExtractionToolNotFound
                    self.logger.error(f"No extraction tool found for {filename}")
                    self.logger.error("'ar' is required to extract deb files")
            else:
                stdout, stderr, _ = await aio_run_command(["ar", "x", filename])
                if stderr:
                    return 1
        else:
            self.logger.debug(f"Extracting {filename} as a tar.gzip file")
            with ErrorHandler(mode=ErrorMode.Ignore) as e:
                await aio_unpack_archive(filename, extraction_path, format="gztar")

        datafile = await aio_glob(str(Path(extraction_path) / "data.tar.*"))
        if not datafile:
            # The outer unpack produced no payload (e.g. 'ar' is missing or
            # the archive is malformed); report failure instead of raising
            # IndexError on datafile[0].
            return 1
        if is_zst:
            return await self.extract_file_zst(datafile[0], extraction_path)
        with ErrorHandler(mode=ErrorMode.Ignore) as e:
            await aio_unpack_archive(datafile[0], extraction_path)
        return e.exit_code

    async def extract_file_apk(self, filename, extraction_path):
        """Check whether it is alpine or android package

        The first available tool among ``unzip``, ``7z``, ``zipinfo`` and
        ``file`` is used to decide whether the file is a zip archive; if it
        is, extraction is delegated to extract_file_zip, otherwise the file
        is unpacked as a gzip tarball.
        """
        is_tar = True
        process_can_fail = True
        if await aio_inpath("unzip"):
            stdout, stderr, return_code = await aio_run_command(
                ["unzip", "-l", filename], process_can_fail
            )
            if return_code == 0:
                is_tar = False
        elif await aio_inpath("7z"):
            stdout, stderr, return_code = await aio_run_command(
                ["7z", "t", filename], process_can_fail
            )
            if re.search(b"Type = Zip", stdout):
                is_tar = False
        elif await aio_inpath("zipinfo"):
            stdout, stderr, return_code = await aio_run_command(
                ["zipinfo", filename], process_can_fail
            )
            if return_code == 0:
                is_tar = False
        elif await aio_inpath("file"):
            stdout, stderr, return_code = await aio_run_command(
                ["file", filename], process_can_fail
            )
            if re.search(b"Zip archive data", stdout):
                is_tar = False
        if is_tar:
            self.logger.debug(f"Extracting {filename} as a tar.gzip file")
            with ErrorHandler(mode=ErrorMode.Ignore) as e:
                await aio_unpack_archive(filename, extraction_path, format="gztar")
            return e.exit_code
        else:
            return await self.extract_file_zip(filename, extraction_path)

    async def extract_file_cab(self, filename, extraction_path):
        """Extract cab files

        Uses ``cabextract`` on Linux and ``Expand`` elsewhere.

        Returns:
            int: 1 when the tool wrote to stderr or produced no stdout,
            otherwise 0 (also when the tool is missing and only an error was
            logged).
        """
        if sys.platform.startswith("linux"):
            if not await aio_inpath("cabextract"):
                with ErrorHandler(mode=self.error_mode, logger=self.logger):
                    # ExtractionToolNotFound
                    self.logger.error(f"No extraction tool found for {filename}")
                    self.logger.error("'cabextract' is required to extract cab files")
            else:
                stdout, stderr, _ = await aio_run_command(
                    ["cabextract", "-d", extraction_path, filename]
                )
                if stderr or not stdout:
                    return 1
        else:
            if not await aio_inpath("Expand"):
                with ErrorHandler(mode=self.error_mode, logger=self.logger):
                    # ExtractionToolNotFound
                    self.logger.error(f"No extraction tool found for {filename}")
                    self.logger.error("'Expand' is required to extract cab files")
            else:
                stdout, stderr, _ = await aio_run_command(
                    ["Expand", filename, "-R -F:*", extraction_path]
                )
                if stderr or not stdout:
                    return 1
        return 0

    @staticmethod
    async def extract_file_zip(filename, extraction_path, process_can_fail=True):
        """Extracts ZIP files using an invalid key to prevent
        freezing during extraction if they are password protected.
        Providing a key during extraction has no effect if the zip file is
        not password protected and extraction will happen as normal.

        Password-protected archives and ``.exe`` files that fail to unpack
        are reported as success (0) so that they do not count as extraction
        failures.
        """
        is_exe = filename.endswith(".exe")
        key = "StaticInvalidKey"
        if await aio_inpath("unzip"):
            stdout, stderr, _ = await aio_run_command(
                ["unzip", "-P", key, "-n", "-d", extraction_path, filename],
                process_can_fail,
            )
            if stderr:
                if "incorrect password" in stderr.decode():
                    LOGGER.error(
                        f"Failed to extract {filename}: The file is password protected"
                    )
                    return 0
                if is_exe:
                    return 0  # not all .exe files are zipfiles, no need for error
                return 1
        elif await aio_inpath("7z"):
            stdout, stderr, _ = await aio_run_command(
                ["7z", "x", f"-p{key}", filename], process_can_fail
            )
            if stderr or not stdout:
                if "Wrong password" in stderr.decode():
                    LOGGER.error(
                        f"Failed to extract {filename}: The file is password protected"
                    )
                    return 0
                if is_exe:
                    return 0  # not all .exe files are zipfiles, no need for error
                return 1
        else:
            with ErrorHandler(mode=ErrorMode.Ignore) as e:
                await aio_unpack_archive(filename, extraction_path)
            return e.exit_code
        return 0
class TempDirExtractorContext(BaseExtractor):
    """Extracts tar, rpm, etc. files into a temporary directory.

    Usable as either a synchronous (``with``) or asynchronous
    (``async with``) context manager: entering creates a temporary directory
    prefixed ``cve-bin-tool-`` and exiting removes it together with
    everything extracted into it.
    """

    def __init__(self, raise_failure=False, *args, **kwargs):
        """Initialise the extractor.

        Args:
            raise_failure: When true, a failed extraction raises
                ExtractionFailed inside an ErrorHandler (using this
                instance's error mode) instead of only logging a warning.
            *args, **kwargs: Forwarded to BaseExtractor.
        """
        super().__init__(*args, **kwargs)
        self.raise_failure = raise_failure

    async def _extract_with(self, extractor, filename, filename_pathlib):
        """Run one extractor on *filename* inside a fresh ``<name>.extracted`` directory.

        Returns the path of that directory, whether or not extraction
        succeeded (failures are logged or raised according to
        ``raise_failure``).
        """
        extracted_path = str(
            Path(self.tempdir) / f"{filename_pathlib.name}.extracted"
        )
        # Start from an empty directory if a previous run left one behind.
        if Path(extracted_path).exists():
            await aio_rmdir(extracted_path)
        await aio_makedirs(extracted_path, 0o700)
        # Extractors that shell out without an output directory write into
        # the current working directory, so switch into the target first.
        async with ChangeDirContext(extracted_path):
            if await extractor(filename, extracted_path) != 0:
                if self.raise_failure:
                    with ErrorHandler(mode=self.error_mode, logger=self.logger):
                        raise ExtractionFailed(filename)
                else:
                    self.logger.warning(f"Failure extracting {filename}")
            else:
                self.logger.debug(f"Extracted {filename} to {extracted_path}")
        return extracted_path

    async def aio_extract(self, filename):
        """Run the extractor

        Extractors are tried in ``file_extractors`` order; for each one the
        file-name extension is checked first and then the MIME type guessed
        from the file content.

        Returns:
            str: Path of the directory the file was extracted into, or None
            when no extractor matches and the ErrorHandler suppresses the
            resulting UnknownArchiveType.
        """
        filename_pathlib = Path(filename)
        # Resolve path in case of cwd change
        filename = str(filename_pathlib.resolve())
        # The content-based guess reads the file, so compute it at most once
        # and only if the first extractor's extensions do not already match.
        guess = None
        guess_done = False
        for extractor, handled in self.file_extractors.items():
            if filename.endswith(tuple(handled[EXTENSIONS])):
                return await self._extract_with(extractor, filename, filename_pathlib)
            if not guess_done:
                guess = filetype.guess(filename)
                guess_done = True
            if guess is not None and guess.MIME in handled[MIMES]:
                return await self._extract_with(extractor, filename, filename_pathlib)
        with ErrorHandler(mode=self.error_mode, logger=self.logger):
            raise UnknownArchiveType(filename)

    async def __aenter__(self):
        """Create a temporary directory to extract files to."""
        self.tempdir = await aio_mkdtemp(prefix="cve-bin-tool-")
        return self

    async def __aexit__(self, exc_type, exc, exc_tb):
        """Removes all extraction directories that need to be cleaned up."""
        # removing directory can raise exception so wrap it around ErrorHandler.
        with ErrorHandler(mode=self.error_mode, logger=self.logger):
            await aio_rmdir(self.tempdir)

    def extract(self, filename):
        """
        Run the extractor.
        Args:
            filename (str): The name of the file to extract.
        Returns:
            str: The path to the extracted files.
        """
        return run_coroutine(self.aio_extract(filename))

    def __enter__(self):
        """
        Create a temporary directory to extract files to.
        Returns:
            TempDirExtractorContext: The current instance with a temporary directory created.
        """
        self.tempdir = tempfile.mkdtemp(prefix="cve-bin-tool-")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Remove all extraction directories that need to be cleaned up.
        Args:
            exc_type (type): The exception type.
            exc_val (Exception): The exception instance.
            exc_tb (traceback): The traceback object.
        Returns:
            None
        """
        # Removing the directory can raise, so wrap it in ErrorHandler.
        with ErrorHandler(mode=self.error_mode, logger=self.logger):
            shutil.rmtree(self.tempdir)
# Custom LZMA type for binary recognition and extraction: cve-bin-tool otherwise fails to extract files of this type.
# Built on the python library filetype, defined at https://github.com/h2non/filetype.py
# The type follows the pattern of the examples in https://github.com/h2non/filetype.py/tree/master/filetype/types
# It is registered with filetype in BaseExtractor.__init__ via filetype.add_type(Lzma())
class Lzma(filetype.Type):
    """filetype matcher that recognises LZMA-compressed data."""

    MIME = "application/lzma"
    EXTENSION = "lzma"

    def __init__(self):
        super().__init__(mime=Lzma.MIME, extension=Lzma.EXTENSION)

    def match(self, buf):
        """
        Tell whether the buffer starts with the LZMA file signature.
        Args:
            buf (bytes): The leading bytes of the file being inspected.
        Returns:
            bool: True if the first four bytes are 5D 00 00 00, False otherwise
            (including when fewer than four bytes are available).
        """
        signature = (0x5D, 0x00, 0x00, 0x00)
        if len(buf) <= 3:
            return False
        for offset, expected in enumerate(signature):
            if buf[offset] != expected:
                return False
        return True
def Extractor(*args, **kwargs):
    """Build the context object in which extraction is done.

    All positional and keyword arguments are forwarded unchanged to
    TempDirExtractorContext.
    """
    context = TempDirExtractorContext(*args, **kwargs)
    return context