diff --git a/pipenv/patched/README.md b/pipenv/patched/README.md index 1ec7d90023..7e660dedcf 100644 --- a/pipenv/patched/README.md +++ b/pipenv/patched/README.md @@ -3,8 +3,6 @@ - Pip is modified, to make it work with Pipenv's custom virtualenv locations. - Pip is modified, to make it work with pip-tool modifications. - Pip is modified, to make it resolve deep extras links. -- Safety is hacked together to always work on any system. -- TOML libraries are upgraded to... work. Don't touch. diff --git a/pipenv/patched/patched.txt b/pipenv/patched/patched.txt index 369c758520..662f25f211 100644 --- a/pipenv/patched/patched.txt +++ b/pipenv/patched/patched.txt @@ -1,2 +1 @@ pip==24.3.1 -safety==2.3.2 diff --git a/pipenv/patched/safety/LICENSE b/pipenv/patched/safety/LICENSE deleted file mode 100644 index 55a1eb037e..0000000000 --- a/pipenv/patched/safety/LICENSE +++ /dev/null @@ -1,11 +0,0 @@ - -MIT License - -Copyright (c) 2016, pyup.io - -Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
- diff --git a/pipenv/patched/safety/VERSION b/pipenv/patched/safety/VERSION deleted file mode 100644 index f90b1afc08..0000000000 --- a/pipenv/patched/safety/VERSION +++ /dev/null @@ -1 +0,0 @@ -2.3.2 diff --git a/pipenv/patched/safety/__init__.py b/pipenv/patched/safety/__init__.py deleted file mode 100644 index da2f5d50d9..0000000000 --- a/pipenv/patched/safety/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -# -*- coding: utf-8 -*- - -__author__ = """pyup.io""" -__email__ = 'support@pyup.io' - -import os - -ROOT = os.path.dirname(os.path.abspath(__file__)) - -with open(os.path.join(ROOT, 'VERSION')) as version_file: - VERSION = version_file.read().strip() diff --git a/pipenv/patched/safety/__main__.py b/pipenv/patched/safety/__main__.py deleted file mode 100644 index be36e88b5e..0000000000 --- a/pipenv/patched/safety/__main__.py +++ /dev/null @@ -1,8 +0,0 @@ -"""Allow safety to be executable through `python -m safety`.""" -from __future__ import absolute_import - -from pipenv.patched.safety.cli import cli - - -if __name__ == "__main__": # pragma: no cover - cli(prog_name="safety") diff --git a/pipenv/patched/safety/alerts/__init__.py b/pipenv/patched/safety/alerts/__init__.py deleted file mode 100644 index 787035f131..0000000000 --- a/pipenv/patched/safety/alerts/__init__.py +++ /dev/null @@ -1,42 +0,0 @@ -import sys -import json -from typing import Any -import pipenv.vendor.click as click - -from dataclasses import dataclass - -from . 
import github -from pipenv.patched.safety.util import SafetyPolicyFile - -@dataclass -class Alert: - report: Any - key: str - policy: Any = None - requirements_files: Any = None - -@click.group(help="Send alerts based on the results of a Safety scan.") -@click.option('--check-report', help='JSON output of Safety Check to work with.', type=click.File('r'), default=sys.stdin) -@click.option("--policy-file", type=SafetyPolicyFile(), default='.safety-policy.yml', - help="Define the policy file to be used") -@click.option("--key", envvar="SAFETY_API_KEY", - help="API Key for pyup.io's vulnerability database. Can be set as SAFETY_API_KEY " - "environment variable.", required=True) -@click.pass_context -def alert(ctx, check_report, policy_file, key): - with check_report: - # TODO: This breaks --help for subcommands - try: - safety_report = json.load(check_report) - except json.decoder.JSONDecodeError as e: - click.secho("Error decoding input JSON: {}".format(e.msg), fg='red') - sys.exit(1) - - if not 'report_meta' in safety_report: - click.secho("You must pass in a valid Safety Check JSON report", fg='red') - sys.exit(1) - - ctx.obj = Alert(report=safety_report, policy=policy_file if policy_file else {}, key=key) - -alert.add_command(github.github_pr) -alert.add_command(github.github_issue) diff --git a/pipenv/patched/safety/alerts/github.py b/pipenv/patched/safety/alerts/github.py deleted file mode 100644 index 7ee5ba0416..0000000000 --- a/pipenv/patched/safety/alerts/github.py +++ /dev/null @@ -1,298 +0,0 @@ -import re -import sys - -import pipenv.vendor.click as click - -try: - import github as pygithub -except ImportError: - pygithub = None - -from . 
import utils, requirements - -def create_branch(repo, base_branch, new_branch): - ref = repo.get_git_ref("heads/" + base_branch) - repo.create_git_ref(ref="refs/heads/" + new_branch, sha=ref.object.sha) - -def delete_branch(repo, branch): - ref = repo.get_git_ref(f"heads/{branch}") - ref.delete() - -@click.command() -@click.option('--repo', help='GitHub standard repo path (eg, my-org/my-project)') -@click.option('--token', help='GitHub Access Token') -@click.option('--base-url', help='Optional custom Base URL, if you\'re using GitHub enterprise', default=None) -@click.pass_obj -@utils.require_files_report -def github_pr(obj, repo, token, base_url): - """ - Create a GitHub PR to fix any vulnerabilities using PyUp's remediation data. - - Normally, this is run by a GitHub action. If you're running this manually, ensure that your local repo is up to date and on HEAD - otherwise you'll see strange results. - """ - if pygithub is None: - click.secho("pygithub is not installed. Did you install Safety with GitHub support? Try pip install safety[github]", fg='red') - sys.exit(1) - - # TODO: Improve access to our config in future. 
- branch_prefix = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('branch-prefix', 'pyup/') - pr_prefix = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('pr-prefix', '[PyUp] ') - assignees = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('assignees', []) - labels = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('labels', ['security']) - label_severity = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('label-severity', True) - ignore_cvss_severity_below = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('ignore-cvss-severity-below', 0) - ignore_cvss_unknown_severity = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('ignore-cvss-unknown-severity', False) - - gh = pygithub.Github(token, **({"base_url": base_url} if base_url else {})) - repo = gh.get_repo(repo) - try: - self_user = gh.get_user().login - except pygithub.GithubException: - # If we're using a token from an action (or integration) we can't call `get_user()`. Fall back - # to assuming we're running under an action - self_user = "web-flow" - - pulls = repo.get_pulls(state='open', sort='created', base=repo.default_branch) - pending_updates = set(obj.report['remediations'].keys()) - - # TODO: Refactor this loop into a fn to iterate over remediations nicely - for name, contents in obj.requirements_files.items(): - raw_contents = contents - contents = contents.decode('utf-8') # TODO - encoding? 
- parsed_req_file = requirements.RequirementFile(name, contents) - for pkg, remediation in obj.report['remediations'].items(): - if remediation['recommended_version'] is None: - print(f"The GitHub PR alerter only currently supports remediations that have a recommended_version: {pkg}") - continue - - # We have a single remediation that can have multiple vulnerabilities - vulns = [x for x in obj.report['vulnerabilities'] if x['package_name'] == pkg and x['analyzed_version'] == remediation['current_version']] - - if ignore_cvss_unknown_severity and all(x['severity'] is None for x in vulns): - print("All vulnerabilities have unknown severity, and ignore_cvss_unknown_severity is set.") - continue - - highest_base_score = 0 - for vuln in vulns: - if vuln['severity'] is not None: - highest_base_score = max(highest_base_score, (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10)) - - if ignore_cvss_severity_below: - at_least_one_match = False - for vuln in vulns: - # Consider a None severity as a match, since it's controlled by a different flag - # If we can't find a base_score but we have severity data, assume it's critical for now. - if vuln['severity'] is None or (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10) >= ignore_cvss_severity_below: - at_least_one_match = True - - if not at_least_one_match: - print(f"None of the vulnerabilities found have a score greater than or equal to the ignore_cvss_severity_below of {ignore_cvss_severity_below}") - continue - - for parsed_req in parsed_req_file.requirements: - if parsed_req.name == pkg: - updated_contents = parsed_req.update_version(contents, remediation['recommended_version']) - pending_updates.discard(pkg) - - new_branch = branch_prefix + utils.generate_branch_name(pkg, remediation) - skip_create = False - - # Few possible cases: - # 1. No existing PRs exist for this change (don't need to handle) - # 2. 
An existing PR exists, and it's out of date (eg, recommended 0.5.1 and we want 0.5.2) - # 3. An existing PR exists, and it's not mergable anymore (eg, needs a rebase) - # 4. An existing PR exists, and everything's up to date. - # 5. An existing PR exists, but it's not needed anymore (perhaps we've been updated to a later version) - # 6. No existing PRs exist, but a branch does exist (perhaps the PR was closed but a stale branch left behind) - # In any case, we only act if we've been the only committer to the branch. - for pr in pulls: - if not pr.head.ref.startswith(branch_prefix): - continue - - authors = [commit.committer.login for commit in pr.get_commits()] - only_us = all([x == self_user for x in authors]) - - try: - _, pr_pkg, pr_ver = pr.head.ref.split('/') - except ValueError: - # It's possible that something weird has manually been done, so skip that - print('Found an invalid branch name on an open PR, that matches our prefix. Skipping.') - continue - - if pr_pkg != pkg: - continue - - # Case 4 - if pr_pkg == pkg and pr_ver == remediation['recommended_version'] and pr.mergeable: - print(f"An up to date PR #{pr.number} for {pkg} was found, no action will be taken.") - - skip_create = True - continue - - if not only_us: - print(f"There are other committers on the PR #{pr.number} for {pkg}. 
No further action will be taken.") - continue - - # Case 2 - if pr_pkg == pkg and pr_ver != remediation['recommended_version']: - print(f"Closing stale PR #{pr.number} for {pkg} as a newer recommended version became") - - pr.create_issue_comment("This PR has been replaced, since a newer recommended version became available.") - pr.edit(state='closed') - delete_branch(repo, pr.head.ref) - - # Case 3 - if not pr.mergeable: - print(f"Closing PR #{pr.number} for {pkg} as it has become unmergable and we were the only committer") - - pr.create_issue_comment("This PR has been replaced since it became unmergable.") - pr.edit(state='closed') - delete_branch(repo, pr.head.ref) - - if updated_contents == contents: - print(f"Couldn't update {pkg} to {remediation['recommended_version']}") - continue - - if skip_create: - continue - - try: - create_branch(repo, repo.default_branch, new_branch) - except pygithub.GithubException as e: - if e.data['message'] == "Reference already exists": - # There might be a stale branch. If the bot is the only committer, nuke it. - comparison = repo.compare(repo.default_branch, new_branch) - authors = [commit.committer.login for commit in comparison.commits] - only_us = all([x == self_user for x in authors]) - - if only_us: - delete_branch(repo, new_branch) - create_branch(repo, repo.default_branch, new_branch) - else: - print(f"The branch '{new_branch}' already exists - but there is no matching PR and this branch has committers other than us. This remediation will be skipped.") - continue - else: - raise e - - try: - repo.update_file( - path=name, - message=utils.generate_commit_message(pkg, remediation), - content=updated_contents, - branch=new_branch, - sha=utils.git_sha1(raw_contents) - ) - except pygithub.GithubException as e: - if "does not match" in e.data['message']: - click.secho(f"GitHub blocked a commit on our branch to the requirements file, {name}, as the local hash we computed didn't match the version on {repo.default_branch}. 
Make sure you're running safety against the latest code on your default branch.", fg='red') - continue - else: - raise e - - pr = repo.create_pull(title=pr_prefix + utils.generate_title(pkg, remediation, vulns), body=utils.generate_body(pkg, remediation, vulns, api_key=obj.key), head=new_branch, base=repo.default_branch) - print(f"Created Pull Request to update {pkg}") - - for assignee in assignees: - pr.add_to_assignees(assignee) - - for label in labels: - pr.add_to_labels(label) - - if label_severity: - score_as_label = utils.cvss3_score_to_label(highest_base_score) - if score_as_label: - pr.add_to_labels(score_as_label) - - if len(pending_updates) > 0: - click.secho("The following remediations were not followed: {}".format(', '.join(pending_updates)), fg='red') - -@click.command() -@click.option('--repo', help='GitHub standard repo path (eg, my-org/my-project)') -@click.option('--token', help='GitHub Access Token') -@click.option('--base-url', help='Optional custom Base URL, if you\'re using GitHub enterprise', default=None) -@click.pass_obj -@utils.require_files_report # TODO: For now, it can be removed in the future to support env scans. -def github_issue(obj, repo, token, base_url): - """ - Create a GitHub Issue for any vulnerabilities found using PyUp's remediation data. - - Normally, this is run by a GitHub action. If you're running this manually, ensure that your local repo is up to date and on HEAD - otherwise you'll see strange results. - """ - if pygithub is None: - click.secho("pygithub is not installed. Did you install Safety with GitHub support? Try pip install safety[github]", fg='red') - sys.exit(1) - - # TODO: Improve access to our config in future. 
- issue_prefix = obj.policy.get('alert', {}).get('security', {}).get('github-issue', {}).get('issue-prefix', '[PyUp] ') - assignees = obj.policy.get('alert', {}).get('security', {}).get('github-issue', {}).get('assignees', []) - labels = obj.policy.get('alert', {}).get('security', {}).get('github-issue', {}).get('labels', ['security']) - label_severity = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('label-severity', True) - ignore_cvss_severity_below = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('ignore-cvss-severity-below', 0) - ignore_cvss_unknown_severity = obj.policy.get('alert', {}).get('security', {}).get('github-pr', {}).get('ignore-cvss-unknown-severity', False) - - gh = pygithub.Github(token, **({"base_url": base_url} if base_url else {})) - repo = gh.get_repo(repo) - - issues = list(repo.get_issues(state='open', sort='created')) - ISSUE_TITLE_REGEX = re.escape(issue_prefix) + r"Security Vulnerability in (.+)" - - for name, contents in obj.requirements_files.items(): - raw_contents = contents - contents = contents.decode('utf-8') # TODO - encoding? 
- parsed_req_file = requirements.RequirementFile(name, contents) - for pkg, remediation in obj.report['remediations'].items(): - if remediation['recommended_version'] is None: - print(f"The GitHub Issue alerter only currently supports remediations that have a recommended_version: {pkg}") - continue - - # We have a single remediation that can have multiple vulnerabilities - vulns = [x for x in obj.report['vulnerabilities'] if x['package_name'] == pkg and x['analyzed_version'] == remediation['current_version']] - - if ignore_cvss_unknown_severity and all(x['severity'] is None for x in vulns): - print("All vulnerabilities have unknown severity, and ignore_cvss_unknown_severity is set.") - continue - - highest_base_score = 0 - for vuln in vulns: - if vuln['severity'] is not None: - highest_base_score = max(highest_base_score, (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10)) - - if ignore_cvss_severity_below: - at_least_one_match = False - for vuln in vulns: - # Consider a None severity as a match, since it's controlled by a different flag - # If we can't find a base_score but we have severity data, assume it's critical for now. - if vuln['severity'] is None or (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10) >= ignore_cvss_severity_below: - at_least_one_match = True - - if not at_least_one_match: - print(f"None of the vulnerabilities found have a score greater than or equal to the ignore_cvss_severity_below of {ignore_cvss_severity_below}") - continue - - for parsed_req in parsed_req_file.requirements: - if parsed_req.name == pkg: - skip = False - for issue in issues: - match = re.match(ISSUE_TITLE_REGEX, issue.title) - if match: - if match.group(1) == pkg: - skip = True - - # For now, we just skip issues if they already exist - we don't try and update them. 
- if skip: - print(f"An issue already exists for {pkg} - skipping") - continue - - pr = repo.create_issue(title=issue_prefix + utils.generate_issue_title(pkg, remediation), body=utils.generate_issue_body(pkg, remediation, vulns, api_key=obj.key)) - print(f"Created issue to update {pkg}") - - for assignee in assignees: - pr.add_to_assignees(assignee) - - for label in labels: - pr.add_to_labels(label) - - if label_severity: - score_as_label = utils.cvss3_score_to_label(highest_base_score) - if score_as_label: - pr.add_to_labels(score_as_label) diff --git a/pipenv/patched/safety/alerts/requirements.py b/pipenv/patched/safety/alerts/requirements.py deleted file mode 100644 index 0a6fda199b..0000000000 --- a/pipenv/patched/safety/alerts/requirements.py +++ /dev/null @@ -1,339 +0,0 @@ -from __future__ import unicode_literals - -from pipenv.vendor.packaging.version import parse as parse_version -from pipenv.vendor.packaging.specifiers import SpecifierSet -import pipenv.patched.pip._vendor.requests as requests - -from datetime import datetime -from pipenv.vendor.dparse import parse, parser, updater, filetypes -from pipenv.vendor.dparse.dependencies import Dependency -from pipenv.vendor.dparse.parser import setuptools_parse_requirements_backport as parse_requirements - - -class RequirementFile(object): - def __init__(self, path, content, sha=None): - self.path = path - self.content = content - self.sha = sha - self._requirements = None - self._other_files = None - self._is_valid = None - self.is_pipfile = False - self.is_pipfile_lock = False - self.is_setup_cfg = False - - def __str__(self): - return "RequirementFile(path='{path}', sha='{sha}', content='{content}')".format( - path=self.path, - content=self.content[:30] + "[truncated]" if len(self.content) > 30 else self.content, - sha=self.sha - ) - - @property - def is_valid(self): - if self._is_valid is None: - self._parse() - return self._is_valid - - @property - def requirements(self): - if not self._requirements: - 
self._parse() - return self._requirements - - @property - def other_files(self): - if not self._other_files: - self._parse() - return self._other_files - - @staticmethod - def parse_index_server(line): - return parser.Parser.parse_index_server(line) - - def _hash_parser(self, line): - return parser.Parser.parse_hashes(line) - - def _parse_requirements_txt(self): - self.parse_dependencies(filetypes.requirements_txt) - - def _parse_conda_yml(self): - self.parse_dependencies(filetypes.conda_yml) - - def _parse_tox_ini(self): - self.parse_dependencies(filetypes.tox_ini) - - def _parse_pipfile(self): - self.parse_dependencies(filetypes.pipfile) - self.is_pipfile = True - - def _parse_pipfile_lock(self): - self.parse_dependencies(filetypes.pipfile_lock) - self.is_pipfile_lock = True - - def _parse_setup_cfg(self): - self.parse_dependencies(filetypes.setup_cfg) - self.is_setup_cfg = True - - def _parse(self): - self._requirements, self._other_files = [], [] - if self.path.endswith('.yml') or self.path.endswith(".yaml"): - self._parse_conda_yml() - elif self.path.endswith('.ini'): - self._parse_tox_ini() - elif self.path.endswith("Pipfile"): - self._parse_pipfile() - elif self.path.endswith("Pipfile.lock"): - self._parse_pipfile_lock() - elif self.path.endswith('setup.cfg'): - self._parse_setup_cfg() - else: - self._parse_requirements_txt() - self._is_valid = len(self._requirements) > 0 or len(self._other_files) > 0 - - def parse_dependencies(self, file_type): - result = parse( - self.content, - path=self.path, - sha=self.sha, - file_type=file_type, - marker=( - ("pyup: ignore file", "pyup:ignore file"), # file marker - ("pyup: ignore", "pyup:ignore"), # line marker - ) - ) - for dep in result.dependencies: - req = Requirement( - name=dep.name, - specs=dep.specs, - line=dep.line, - lineno=dep.line_numbers[0] if dep.line_numbers else 0, - extras=dep.extras, - file_type=file_type, - ) - req.index_server = dep.index_server - if self.is_pipfile: - req.pipfile = self.path - 
req.hashes = dep.hashes - self._requirements.append(req) - self._other_files = result.resolved_files - - def iter_lines(self, lineno=0): - for line in self.content.splitlines()[lineno:]: - yield line - - @classmethod - def resolve_file(cls, file_path, line): - return parser.Parser.resolve_file(file_path, line) - - -class Requirement(object): - def __init__(self, name, specs, line, lineno, extras, file_type): - self.name = name - self.key = name.lower() - self.specs = specs - self.line = line - self.lineno = lineno - self.index_server = None - self.extras = extras - self.hashes = [] - self.file_type = file_type - self.pipfile = None - - self.hashCmp = ( - self.key, - self.specs, - frozenset(self.extras), - ) - - self._is_insecure = None - self._changelog = None - - if len(self.specs._specs) == 1 and next(iter(self.specs._specs))._spec[0] == "~=": - # convert compatible releases to something more easily consumed, - # e.g. '~=1.2.3' is equivalent to '>=1.2.3,<1.3.0', while '~=1.2' - # is equivalent to '>=1.2,<2.0' - min_version = next(iter(self.specs._specs))._spec[1] - max_version = list(parse_version(min_version).release) - max_version[-1] = 0 - max_version[-2] = max_version[-2] + 1 - max_version = '.'.join(str(x) for x in max_version) - - self.specs = SpecifierSet('>=%s,<%s' % (min_version, max_version)) - - def __eq__(self, other): - return ( - isinstance(other, Requirement) and - self.hashCmp == other.hashCmp - ) - - def __ne__(self, other): - return not self == other - - def __str__(self): - return "Requirement.parse({line}, {lineno})".format(line=self.line, lineno=self.lineno) - - def __repr__(self): - return self.__str__() - - @property - def is_pinned(self): - if len(self.specs._specs) == 1 and next(iter(self.specs._specs))._spec[0] == "==": - return True - return False - - @property - def is_open_ranged(self): - if len(self.specs._specs) == 1 and next(iter(self.specs._specs))._spec[0] == ">=": - return True - return False - - @property - def is_ranged(self): 
- return len(self.specs._specs) >= 1 and not self.is_pinned - - @property - def is_loose(self): - return len(self.specs._specs) == 0 - - @staticmethod - def convert_semver(version): - semver = {'major': 0, "minor": 0, "patch": 0} - version = version.split(".") - # don't be overly clever here. repitition makes it more readable and works exactly how - # it is supposed to - try: - semver['major'] = int(version[0]) - semver['minor'] = int(version[1]) - semver['patch'] = int(version[2]) - except (IndexError, ValueError): - pass - return semver - - @property - def can_update_semver(self): - # return early if there's no update filter set - if "pyup: update" not in self.line: - return True - update = self.line.split("pyup: update")[1].strip().split("#")[0] - current_version = Requirement.convert_semver(next(iter(self.specs._specs))._spec[1]) - next_version = Requirement.convert_semver(self.latest_version) - if update == "major": - if current_version['major'] < next_version['major']: - return True - elif update == 'minor': - if current_version['major'] < next_version['major'] \ - or current_version['minor'] < next_version['minor']: - return True - return False - - @property - def filter(self): - rqfilter = False - if "rq.filter:" in self.line: - rqfilter = self.line.split("rq.filter:")[1].strip().split("#")[0] - elif "pyup:" in self.line: - if "pyup: update" not in self.line: - rqfilter = self.line.split("pyup:")[1].strip().split("#")[0] - # unset the filter once the date set in 'until' is reached - if "until" in rqfilter: - rqfilter, until = [l.strip() for l in rqfilter.split("until")] - try: - until = datetime.strptime(until, "%Y-%m-%d") - if until < datetime.now(): - rqfilter = False - except ValueError: - # wrong date formatting - pass - if rqfilter: - try: - rqfilter, = parse_requirements("filter " + rqfilter) - if len(rqfilter.specifier._specs) > 0: - return rqfilter.specifier - except ValueError: - pass - return False - - @property - def version(self): - if 
self.is_pinned: - return next(iter(self.specs._specs))._spec[1] - - specs = self.specs - if self.filter: - specs = SpecifierSet( - ",".join(["".join(s._spec) for s in list(specs._specs) + list(self.filter._specs)]) - ) - return self.get_latest_version_within_specs( - specs, - versions=self.package.versions, - prereleases=self.prereleases - ) - - def get_hashes(self, version): - r = requests.get('https://pypi.org/pypi/{name}/{version}/json'.format( - name=self.key, - version=version - )) - hashes = [] - data = r.json() - - for item in data.get("urls", {}): - sha256 = item.get("digests", {}).get("sha256", False) - if sha256: - hashes.append({"hash": sha256, "method": "sha256"}) - return hashes - - def update_version(self, content, version, update_hashes=True): - if self.file_type == filetypes.tox_ini: - updater_class = updater.ToxINIUpdater - elif self.file_type == filetypes.conda_yml: - updater_class = updater.CondaYMLUpdater - elif self.file_type == filetypes.requirements_txt: - updater_class = updater.RequirementsTXTUpdater - elif self.file_type == filetypes.pipfile: - updater_class = updater.PipfileUpdater - elif self.file_type == filetypes.pipfile_lock: - updater_class = updater.PipfileLockUpdater - elif self.file_type == filetypes.setup_cfg: - updater_class = updater.SetupCFGUpdater - else: - raise NotImplementedError - - dep = Dependency( - name=self.name, - specs=self.specs, - line=self.line, - line_numbers=[self.lineno, ] if self.lineno != 0 else None, - dependency_type=self.file_type, - hashes=self.hashes, - extras=self.extras - ) - hashes = [] - if self.hashes and update_hashes: - hashes = self.get_hashes(version) - - return updater_class.update( - content=content, - dependency=dep, - version=version, - hashes=hashes, - spec="==" - ) - - @classmethod - def parse(cls, s, lineno, file_type=filetypes.requirements_txt): - # setuptools requires a space before the comment. If this isn't the case, add it. 
- if "\t#" in s: - parsed, = parse_requirements(s.replace("\t#", "\t #")) - else: - parsed, = parse_requirements(s) - - return cls( - name=parsed.name, - specs=parsed.specifier, - line=s, - lineno=lineno, - extras=parsed.extras, - file_type=file_type - ) diff --git a/pipenv/patched/safety/alerts/utils.py b/pipenv/patched/safety/alerts/utils.py deleted file mode 100644 index 056be8fc9e..0000000000 --- a/pipenv/patched/safety/alerts/utils.py +++ /dev/null @@ -1,132 +0,0 @@ -import hashlib -import os -import sys - -from functools import wraps -from pipenv.vendor.packaging.version import parse as parse_version -from pathlib import Path - -import pipenv.vendor.click as click - -# Jinja2 will only be installed if the optional deps are installed. -# It's fine if our functions fail, but don't let this top level -# import error out. -try: - import jinja2 -except ImportError: - jinja2 = None - -import pipenv.patched.pip._vendor.requests as requests - - -def highest_base_score(vulns): - highest_base_score = 0 - for vuln in vulns: - if vuln['severity'] is not None: - highest_base_score = max(highest_base_score, (vuln['severity'].get('cvssv3', {}) or {}).get('base_score', 10)) - - return highest_base_score - -def generate_branch_name(pkg, remediation): - return pkg + "/" + remediation['recommended_version'] - -def generate_issue_title(pkg, remediation): - return f"Security Vulnerability in {pkg}" - -def generate_title(pkg, remediation, vulns): - suffix = "y" if len(vulns) == 1 else "ies" - return f"Update {pkg} from {remediation['current_version']} to {remediation['recommended_version']} to fix {len(vulns)} vulnerabilit{suffix}" - -def generate_body(pkg, remediation, vulns, *, api_key): - changelog = fetch_changelog(pkg, remediation['current_version'], remediation['recommended_version'], api_key=api_key) - - p = Path(__file__).parent / 'templates' - env = jinja2.Environment(loader=jinja2.FileSystemLoader(Path(p))) - template = env.get_template('pr.jinja2') - - overall_impact = 
cvss3_score_to_label(highest_base_score(vulns)) - result = template.render({"pkg": pkg, "remediation": remediation, "vulns": vulns, "changelog": changelog, "overall_impact": overall_impact, "summary_changelog": False }) - - # GitHub has a PR body length limit of 65536. If we're going over that, skip the changelog and just use a link. - if len(result) > 65500: - return template.render({"pkg": pkg, "remediation": remediation, "vulns": vulns, "changelog": changelog, "overall_impact": overall_impact, "summary_changelog": True }) - - return result - -def generate_issue_body(pkg, remediation, vulns, *, api_key): - changelog = fetch_changelog(pkg, remediation['current_version'], remediation['recommended_version'], api_key=api_key) - - p = Path(__file__).parent / 'templates' - env = jinja2.Environment(loader=jinja2.FileSystemLoader(Path(p))) - template = env.get_template('issue.jinja2') - - overall_impact = cvss3_score_to_label(highest_base_score(vulns)) - result = template.render({"pkg": pkg, "remediation": remediation, "vulns": vulns, "changelog": changelog, "overall_impact": overall_impact, "summary_changelog": False }) - - # GitHub has a PR body length limit of 65536. If we're going over that, skip the changelog and just use a link. 
- if len(result) > 65500: - return template.render({"pkg": pkg, "remediation": remediation, "vulns": vulns, "changelog": changelog, "overall_impact": overall_impact, "summary_changelog": True }) - -def generate_commit_message(pkg, remediation): - return f"Update {pkg} from {remediation['current_version']} to {remediation['recommended_version']}" - -def git_sha1(raw_contents): - return hashlib.sha1(b"blob " + str(len(raw_contents)).encode('ascii') + b"\0" + raw_contents).hexdigest() - -def fetch_changelog(package, from_version, to_version, *, api_key): - from_version = parse_version(from_version) - to_version = parse_version(to_version) - changelog = {} - - r = requests.get( - "https://pyup.io/api/v1/changelogs/{}/".format(package), - headers={"X-Api-Key": api_key} - ) - - if r.status_code == 200: - data = r.json() - if data: - # sort the changelog by release - sorted_log = sorted(data.items(), key=lambda v: parse_version(v[0]), reverse=True) - - # go over each release and add it to the log if it's within the "upgrade - # range" e.g. update from 1.2 to 1.3 includes a changelog for 1.2.1 but - # not for 0.4. 
- for version, log in sorted_log: - parsed_version = parse_version(version) - if parsed_version > from_version and parsed_version <= to_version: - changelog[version] = log - - return changelog - -def cvss3_score_to_label(score): - if score >= 0.1 and score <= 3.9: - return 'low' - elif score >= 4.0 and score <= 6.9: - return 'medium' - elif score >= 7.0 and score <= 8.9: - return 'high' - elif score >= 9.0: - return 'critical' - - return None - -def require_files_report(func): - @wraps(func) - def inner(obj, *args, **kwargs): - if obj.report['report_meta']['scan_target'] != "files": - click.secho("This report was generated against an environment, but this alerter requires a file.", fg='red') - sys.exit(1) - - files = obj.report['report_meta']['scanned'] - obj.requirements_files = {} - for f in files: - if not os.path.exists(f): - cwd = os.getcwd() - click.secho("A requirements file scanned in the report, {}, does not exist (looking in {}).".format(f, cwd), fg='red') - sys.exit(1) - - obj.requirements_files[f] = open(f, "rb").read() - - return func(obj, *args, **kwargs) - return inner diff --git a/pipenv/patched/safety/cli.py b/pipenv/patched/safety/cli.py deleted file mode 100644 index ac4b5b9193..0000000000 --- a/pipenv/patched/safety/cli.py +++ /dev/null @@ -1,381 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import - -import json -import logging -import os -import sys -import tempfile - -import pipenv.vendor.click as click - -from pipenv.patched.safety import safety -from pipenv.patched.safety.alerts import alert -from pipenv.patched.safety.constants import EXIT_CODE_VULNERABILITIES_FOUND, EXIT_CODE_OK, EXIT_CODE_FAILURE -from pipenv.patched.safety.errors import SafetyException, SafetyError -from pipenv.patched.safety.formatter import SafetyFormatter -from pipenv.patched.safety.output_utils import should_add_nl -from pipenv.patched.safety.safety import get_packages, read_vulnerabilities, fetch_policy, post_results -from 
pipenv.patched.safety.util import get_proxy_dict, get_packages_licenses, output_exception, \ - MutuallyExclusiveOption, DependentOption, transform_ignore, SafetyPolicyFile, active_color_if_needed, \ - get_processed_options, get_safety_version, json_alias, bare_alias, SafetyContext, is_a_remote_mirror, \ - filter_announcements - -LOG = logging.getLogger(__name__) - - -@click.group() -@click.option('--debug/--no-debug', default=False) -@click.option('--telemetry/--disable-telemetry', default=True, hidden=True) -@click.option('--disable-optional-telemetry-data', default=False, cls=MutuallyExclusiveOption, - mutually_exclusive=["telemetry", "disable-telemetry"], is_flag=True, show_default=True) -@click.version_option(version=get_safety_version()) -@click.pass_context -def cli(ctx, debug, telemetry, disable_optional_telemetry_data): - """ - Safety checks Python dependencies for known security vulnerabilities and suggests the proper - remediations for vulnerabilities detected. Safety can be run on developer machines, in CI/CD pipelines and - on production systems. - """ - SafetyContext().safety_source = 'cli' - ctx.telemetry = telemetry and not disable_optional_telemetry_data - level = logging.CRITICAL - if debug: - level = logging.DEBUG - - logging.basicConfig(format='%(asctime)s %(name)s => %(message)s', level=level) - - LOG.info(f'Telemetry enabled: {ctx.telemetry}') - - @ctx.call_on_close - def clean_up_on_close(): - LOG.debug('Calling clean up on close function.') - safety.close_session() - - -@cli.command() -@click.option("--key", default="", envvar="SAFETY_API_KEY", - help="API Key for pyup.io's vulnerability database. Can be set as SAFETY_API_KEY " - "environment variable. Default: empty") -@click.option("--db", default="", - help="Path to a local or remote vulnerability database. 
Default: empty") -@click.option("--full-report/--short-report", default=False, cls=MutuallyExclusiveOption, - mutually_exclusive=["output", "json", "bare"], - with_values={"output": ['json', 'bare'], "json": [True, False], "bare": [True, False]}, - help='Full reports include a security advisory (if available). Default: --short-report') -@click.option("--cache", is_flag=False, flag_value=60, default=0, - help="Cache requests to the vulnerability database locally. Default: 0 seconds", - hidden=True) -@click.option("--stdin", default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["files"], - help="Read input from stdin.", is_flag=True, show_default=True) -@click.option("files", "--file", "-r", multiple=True, type=click.File(), cls=MutuallyExclusiveOption, - mutually_exclusive=["stdin"], - help="Read input from one (or multiple) requirement files. Default: empty") -@click.option("--ignore", "-i", multiple=True, type=str, default=[], callback=transform_ignore, - help="Ignore one (or multiple) vulnerabilities by ID. 
Default: empty") -@click.option('--json', default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["output", "bare"], - with_values={"output": ['screen', 'text', 'bare', 'json'], "bare": [True, False]}, callback=json_alias, - hidden=True, is_flag=True, show_default=True) -@click.option('--bare', default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["output", "json"], - with_values={"output": ['screen', 'text', 'bare', 'json'], "json": [True, False]}, callback=bare_alias, - hidden=True, is_flag=True, show_default=True) -@click.option('--output', "-o", type=click.Choice(['screen', 'text', 'json', 'bare'], case_sensitive=False), - default='screen', callback=active_color_if_needed, envvar='SAFETY_OUTPUT') -@click.option("--proxy-protocol", "-pr", type=click.Choice(['http', 'https']), default='https', cls=DependentOption, required_options=['proxy_host'], - help="Proxy protocol (https or http) --proxy-protocol") -@click.option("--proxy-host", "-ph", multiple=False, type=str, default=None, - help="Proxy host IP or DNS --proxy-host") -@click.option("--proxy-port", "-pp", multiple=False, type=int, default=80, cls=DependentOption, required_options=['proxy_host'], - help="Proxy port number --proxy-port") -@click.option("--exit-code/--continue-on-error", default=True, - help="Output standard exit codes. Default: --exit-code") -@click.option("--policy-file", type=SafetyPolicyFile(), default='.safety-policy.yml', - help="Define the policy file to be used") -@click.option("--audit-and-monitor/--disable-audit-and-monitor", default=True, - help="Send results back to pyup.io for viewing on your dashboard. Requires an API key.") -@click.option("--project", default=None, - help="Project to associate this scan with on pyup.io. 
Defaults to a canonicalized github style name if available, otherwise unknown") - -@click.option("--save-json", default="", help="Path to where output file will be placed, if the path is a directory, " - "Safety will use safety-report.json as filename. Default: empty") -@click.pass_context -def check(ctx, key, db, full_report, stdin, files, cache, ignore, output, json, bare, proxy_protocol, proxy_host, proxy_port, - exit_code, policy_file, save_json, audit_and_monitor, project): - """ - Find vulnerabilities in Python dependencies at the target provided. - - """ - LOG.info('Running check command') - - try: - packages = get_packages(files, stdin) - proxy_dictionary = get_proxy_dict(proxy_protocol, proxy_host, proxy_port) - - if key: - server_policies = fetch_policy(key=key, proxy=proxy_dictionary) - server_audit_and_monitor = server_policies["audit_and_monitor"] - server_safety_policy = server_policies["safety_policy"] - else: - server_audit_and_monitor = False - server_safety_policy = "" - - if server_safety_policy and policy_file: - click.secho( - "Warning: both a local policy file '{policy_filename}' and a server sent policy are present. 
" - "Continuing with the local policy file.".format(policy_filename=policy_file['filename']), - fg="yellow", - file=sys.stderr - ) - elif server_safety_policy: - with tempfile.NamedTemporaryFile(prefix='server-safety-policy-') as tmp: - tmp.write(server_safety_policy.encode('utf-8')) - tmp.seek(0) - - policy_file = SafetyPolicyFile().convert(tmp.name, param=None, ctx=None) - LOG.info('Using server side policy file') - - ignore_severity_rules = None - ignore, ignore_severity_rules, exit_code = get_processed_options(policy_file, ignore, - ignore_severity_rules, exit_code) - - is_env_scan = not stdin and not files - params = {'stdin': stdin, 'files': files, 'policy_file': policy_file, 'continue_on_error': not exit_code, - 'ignore_severity_rules': ignore_severity_rules, 'project': project, 'audit_and_monitor': server_audit_and_monitor and audit_and_monitor} - LOG.info('Calling the check function') - vulns, db_full = safety.check(packages=packages, key=key, db_mirror=db, cached=cache, ignore_vulns=ignore, - ignore_severity_rules=ignore_severity_rules, proxy=proxy_dictionary, - include_ignored=True, is_env_scan=is_env_scan, telemetry=ctx.parent.telemetry, - params=params) - LOG.debug('Vulnerabilities returned: %s', vulns) - LOG.debug('full database returned is None: %s', db_full is None) - - LOG.info('Safety is going to calculate remediations') - remediations = safety.calculate_remediations(vulns, db_full) - - announcements = [] - if not db or is_a_remote_mirror(db): - LOG.info('Not local DB used, Getting announcements') - announcements = safety.get_announcements(key=key, proxy=proxy_dictionary, telemetry=ctx.parent.telemetry) - - json_report = None - if save_json or (server_audit_and_monitor and audit_and_monitor): - default_name = 'safety-report.json' - json_report = SafetyFormatter(output='json').render_vulnerabilities(announcements, vulns, remediations, - full_report, packages) - - if server_audit_and_monitor and audit_and_monitor: - policy_contents = '' - if 
policy_file: - policy_contents = policy_file.get('raw', '') - - r = post_results(key=key, proxy=proxy_dictionary, safety_json=json_report, policy_file=policy_contents) - SafetyContext().params['audit_and_monitor_url'] = r.get('url') - - if save_json: - if os.path.isdir(save_json): - save_json = os.path.join(save_json, default_name) - - with open(save_json, 'w+') as output_json_file: - output_json_file.write(json_report) - - LOG.info('Safety is going to render the vulnerabilities report using %s output', output) - if json_report and output == 'json': - output_report = json_report - else: - output_report = SafetyFormatter(output=output).render_vulnerabilities(announcements, vulns, remediations, - full_report, packages) - - # Announcements are send to stderr if not terminal, it doesn't depend on "exit_code" value - stderr_announcements = filter_announcements(announcements=announcements, by_type='error') - if stderr_announcements and (not sys.stdout.isatty() and os.environ.get("SAFETY_OS_DESCRIPTION", None) != 'run'): - LOG.info('sys.stdout is not a tty, error announcements are going to be send to stderr') - click.secho(SafetyFormatter(output='text').render_announcements(stderr_announcements), fg="red", - file=sys.stderr) - - found_vulns = list(filter(lambda v: not v.ignored, vulns)) - LOG.info('Vulnerabilities found (Not ignored): %s', len(found_vulns)) - LOG.info('All vulnerabilities found (ignored and Not ignored): %s', len(vulns)) - - click.secho(output_report, nl=should_add_nl(output, found_vulns), file=sys.stdout) - - if exit_code and found_vulns: - LOG.info('Exiting with default code for vulnerabilities found') - sys.exit(EXIT_CODE_VULNERABILITIES_FOUND) - - sys.exit(EXIT_CODE_OK) - - except SafetyError as e: - LOG.exception('Expected SafetyError happened: %s', e) - output_exception(e, exit_code_output=exit_code) - except Exception as e: - LOG.exception('Unexpected Exception happened: %s', e) - exception = e if isinstance(e, SafetyException) else 
SafetyException(info=e) - output_exception(exception, exit_code_output=exit_code) - - -@cli.command() -@click.option("--full-report/--short-report", default=False, cls=MutuallyExclusiveOption, mutually_exclusive=["output"], with_values={"output": ['json', 'bare']}, - help='Full reports include a security advisory (if available). Default: ' - '--short-report') -@click.option('--output', "-o", type=click.Choice(['screen', 'text', 'json', 'bare'], case_sensitive=False), - default='screen', callback=active_color_if_needed) -@click.option("file", "--file", "-f", type=click.File(), required=True, - help="Read input from an insecure report file. Default: empty") -@click.pass_context -def review(ctx, full_report, output, file): - """ - Show an output from a previous exported JSON report. - """ - LOG.info('Running check command') - report = {} - - try: - report = read_vulnerabilities(file) - except SafetyError as e: - LOG.exception('Expected SafetyError happened: %s', e) - output_exception(e, exit_code_output=True) - except Exception as e: - LOG.exception('Unexpected Exception happened: %s', e) - exception = e if isinstance(e, SafetyException) else SafetyException(info=e) - output_exception(exception, exit_code_output=True) - - params = {'file': file} - vulns, remediations, packages = safety.review(report, params=params) - - announcements = safety.get_announcements(key=None, proxy=None, telemetry=ctx.parent.telemetry) - output_report = SafetyFormatter(output=output).render_vulnerabilities(announcements, vulns, remediations, - full_report, packages) - - found_vulns = list(filter(lambda v: not v.ignored, vulns)) - click.secho(output_report, nl=should_add_nl(output, found_vulns), file=sys.stdout) - sys.exit(EXIT_CODE_OK) - - -@cli.command() -@click.option("--key", envvar="SAFETY_API_KEY", - help="API Key for pyup.io's vulnerability database. Can be set as SAFETY_API_KEY " - "environment variable. 
Default: empty") -@click.option("--db", default="", - help="Path to a local license database. Default: empty") -@click.option('--output', "-o", type=click.Choice(['screen', 'text', 'json', 'bare'], case_sensitive=False), - default='screen') -@click.option("--cache", default=0, - help='Whether license database file should be cached.' - 'Default: 0 seconds') -@click.option("files", "--file", "-r", multiple=True, type=click.File(), - help="Read input from one (or multiple) requirement files. Default: empty") -@click.option("proxyhost", "--proxy-host", "-ph", multiple=False, type=str, default=None, - help="Proxy host IP or DNS --proxy-host") -@click.option("proxyport", "--proxy-port", "-pp", multiple=False, type=int, default=80, - help="Proxy port number --proxy-port") -@click.option("proxyprotocol", "--proxy-protocol", "-pr", multiple=False, type=str, default='http', - help="Proxy protocol (https or http) --proxy-protocol") -@click.pass_context -def license(ctx, key, db, output, cache, files, proxyprotocol, proxyhost, proxyport): - """ - Find the open source licenses used by your Python dependencies. 
- """ - LOG.info('Running license command') - packages = get_packages(files, False) - - proxy_dictionary = get_proxy_dict(proxyprotocol, proxyhost, proxyport) - licenses_db = {} - - try: - licenses_db = safety.get_licenses(key=key, db_mirror=db, cached=cache, proxy=proxy_dictionary, - telemetry=ctx.parent.telemetry) - except SafetyError as e: - LOG.exception('Expected SafetyError happened: %s', e) - output_exception(e, exit_code_output=False) - except Exception as e: - LOG.exception('Unexpected Exception happened: %s', e) - exception = e if isinstance(e, SafetyException) else SafetyException(info=e) - output_exception(exception, exit_code_output=False) - - filtered_packages_licenses = get_packages_licenses(packages=packages, licenses_db=licenses_db) - - announcements = [] - if not db: - announcements = safety.get_announcements(key=key, proxy=proxy_dictionary, telemetry=ctx.parent.telemetry) - - output_report = SafetyFormatter(output=output).render_licenses(announcements, filtered_packages_licenses) - - click.secho(output_report, nl=True) - - -@cli.command() -@click.option("--path", default=".", help="Path where the generated file will be saved. Default: current directory") -@click.argument('name') -@click.pass_context -def generate(ctx, name, path): - """Create a boilerplate supported file type. - - NAME is the name of the file type to generate. Valid values are: policy_file - """ - if name != 'policy_file': - click.secho(f'This Safety version only supports "policy_file" generation. 
"{name}" is not supported.', fg='red', - file=sys.stderr) - sys.exit(EXIT_CODE_FAILURE) - - LOG.info('Running generate %s', name) - - if not os.path.exists(path): - click.secho(f'The path "{path}" does not exist.', fg='red', - file=sys.stderr) - sys.exit(EXIT_CODE_FAILURE) - - policy = os.path.join(path, '.safety-policy.yml') - ROOT = os.path.dirname(os.path.abspath(__file__)) - - try: - with open(policy, "w") as f: - f.write(open(os.path.join(ROOT, 'safety-policy-template.yml')).read()) - LOG.debug('Safety created the policy file.') - msg = f'A default Safety policy file has been generated! Review the file contents in the path {path} in the ' \ - 'file: .safety-policy.yml' - click.secho(msg, fg='green') - except Exception as exc: - if isinstance(exc, OSError): - LOG.debug('Unable to generate %s because: %s', name, exc.errno) - - click.secho(f'Unable to generate {name}, because: {str(exc)} error.', fg='red', - file=sys.stderr) - sys.exit(EXIT_CODE_FAILURE) - - -@cli.command() -@click.option("--path", default=".safety-policy.yml", help="Path where the generated file will be saved. Default: current directory") -@click.argument('name') -@click.pass_context -def validate(ctx, name, path): - """Verify the validity of a supported file type. - - NAME is the name of the file type to validate. Valid values are: policy_file - """ - if name != 'policy_file': - click.secho(f'This Safety version only supports "policy_file" validation. 
"{name}" is not supported.', fg='red', - file=sys.stderr) - sys.exit(EXIT_CODE_FAILURE) - - LOG.info('Running validate %s', name) - - if not os.path.exists(path): - click.secho(f'The path "{path}" does not exist.', fg='red', file=sys.stderr) - sys.exit(EXIT_CODE_FAILURE) - - try: - values = SafetyPolicyFile().convert(path, None, None) - except Exception as e: - click.secho(str(e).lstrip(), fg='red', file=sys.stderr) - sys.exit(EXIT_CODE_FAILURE) - - del values['raw'] - - click.secho(f'The Safety policy file was successfully parsed with the following values:', fg='green') - click.secho(json.dumps(values, indent=4, default=str)) - -cli.add_command(alert) - - -if __name__ == "__main__": - cli() diff --git a/pipenv/patched/safety/constants.py b/pipenv/patched/safety/constants.py deleted file mode 100644 index 85f41c56bf..0000000000 --- a/pipenv/patched/safety/constants.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -import os - -OPEN_MIRRORS = [ - "https://pyup.io/aws/safety/free/", -] - -API_VERSION = 'v1/' -SAFETY_ENDPOINT = 'safety/' -API_BASE_URL = 'https://pyup.io/api/' + API_VERSION + SAFETY_ENDPOINT - -API_MIRRORS = [ - API_BASE_URL -] - -REQUEST_TIMEOUT = 5 - -CACHE_FILE = os.path.join( - os.path.expanduser("~"), - ".safety", - "cache.json" -) - -# Colors -YELLOW = 'yellow' -RED = 'red' -GREEN = 'green' - - -# Exit codes -EXIT_CODE_OK = 0 -EXIT_CODE_FAILURE = 1 -EXIT_CODE_VULNERABILITIES_FOUND = 64 -EXIT_CODE_INVALID_API_KEY = 65 -EXIT_CODE_TOO_MANY_REQUESTS = 66 -EXIT_CODE_UNABLE_TO_LOAD_LOCAL_VULNERABILITY_DB = 67 -EXIT_CODE_UNABLE_TO_FETCH_VULNERABILITY_DB = 68 -EXIT_CODE_MALFORMED_DB = 69 diff --git a/pipenv/patched/safety/errors.py b/pipenv/patched/safety/errors.py deleted file mode 100644 index 1deb92bcd5..0000000000 --- a/pipenv/patched/safety/errors.py +++ /dev/null @@ -1,106 +0,0 @@ -from pipenv.patched.safety.constants import EXIT_CODE_FAILURE, EXIT_CODE_INVALID_API_KEY, EXIT_CODE_TOO_MANY_REQUESTS, \ - 
EXIT_CODE_UNABLE_TO_FETCH_VULNERABILITY_DB, EXIT_CODE_UNABLE_TO_LOAD_LOCAL_VULNERABILITY_DB, EXIT_CODE_MALFORMED_DB - - -class SafetyException(Exception): - - def __init__(self, message="Unhandled exception happened: {info}", info=""): - self.message = message.format(info=info) - super().__init__(self.message) - - def get_exit_code(self): - return EXIT_CODE_FAILURE - - -class SafetyError(Exception): - - def __init__(self, message="Unhandled Safety generic error"): - self.message = message - super().__init__(self.message) - - def get_exit_code(self): - return EXIT_CODE_FAILURE - - -class MalformedDatabase(SafetyError): - - def __init__(self, reason=None, fetched_from="server", - message="Sorry, something went wrong.\n" + - "Safety CLI can not read the data fetched from {fetched_from} because is malformed.\n"): - info = "Reason, {reason}".format(reason=reason) - self.message = message.format(fetched_from=fetched_from) + (info if reason else "") - super().__init__(self.message) - - def get_exit_code(self): - return EXIT_CODE_MALFORMED_DB - - -class DatabaseFetchError(SafetyError): - - def __init__(self, message="Unable to load vulnerability database"): - self.message = message - super().__init__(self.message) - - def get_exit_code(self): - return EXIT_CODE_UNABLE_TO_FETCH_VULNERABILITY_DB - - -class DatabaseFileNotFoundError(DatabaseFetchError): - - def __init__(self, db=None, message="Unable to find vulnerability database in {db}"): - self.db = db - self.message = message.format(db=db) - super().__init__(self.message) - - def get_exit_code(self): - return EXIT_CODE_UNABLE_TO_LOAD_LOCAL_VULNERABILITY_DB - - -class InvalidKeyError(DatabaseFetchError): - - def __init__(self, key=None, message="Your API Key '{key}' is invalid. 
See {link}.", reason=None): - self.key = key - self.link = 'https://bit.ly/3OY2wEI' - self.message = message.format(key=key, link=self.link) if key else message - info = f" Reason: {reason}" - self.message = self.message + (info if reason else "") - super().__init__(self.message) - - def get_exit_code(self): - return EXIT_CODE_INVALID_API_KEY - - -class TooManyRequestsError(DatabaseFetchError): - - def __init__(self, reason=None, - message="Too many requests."): - info = f" Reason: {reason}" - self.message = message + (info if reason else "") - super().__init__(self.message) - - def get_exit_code(self): - return EXIT_CODE_TOO_MANY_REQUESTS - - -class NetworkConnectionError(DatabaseFetchError): - - def __init__(self, message="Check your network connection, unable to reach the server."): - self.message = message - super().__init__(self.message) - - -class RequestTimeoutError(DatabaseFetchError): - - def __init__(self, message="Check your network connection, the request timed out."): - self.message = message - super().__init__(self.message) - - -class ServerError(DatabaseFetchError): - - def __init__(self, reason=None, - message="Sorry, something went wrong.\n" + "Safety CLI can not connect to the server.\n" + - "Our engineers are working quickly to resolve the issue."): - info = f" Reason: {reason}" - self.message = message + (info if reason else "") - super().__init__(self.message) diff --git a/pipenv/patched/safety/formatter.py b/pipenv/patched/safety/formatter.py deleted file mode 100644 index 132bfb9e67..0000000000 --- a/pipenv/patched/safety/formatter.py +++ /dev/null @@ -1,56 +0,0 @@ -import logging -from abc import ABCMeta, abstractmethod - -NOT_IMPLEMENTED = "You should implement this." 
- -LOG = logging.getLogger(__name__) - - -class FormatterAPI: - """ - Strategy Abstract class, with all the render methods that the concrete implementations should support - """ - - __metaclass__ = ABCMeta - - @abstractmethod - def render_vulnerabilities(self, announcements, vulnerabilities, remediations, full, packages): - raise NotImplementedError(NOT_IMPLEMENTED) # pragma: no cover - - @abstractmethod - def render_licenses(self, announcements, licenses): - raise NotImplementedError(NOT_IMPLEMENTED) # pragma: no cover - - @abstractmethod - def render_announcements(self, announcements): - raise NotImplementedError(NOT_IMPLEMENTED) # pragma: no cover - - -class SafetyFormatter(FormatterAPI): - - def render_vulnerabilities(self, announcements, vulnerabilities, remediations, full, packages): - LOG.info('Safety is going to render_vulnerabilities with format: %s', self.format) - return self.format.render_vulnerabilities(announcements, vulnerabilities, remediations, full, packages) - - def render_licenses(self, announcements, licenses): - LOG.info('Safety is going to render_licenses with format: %s', self.format) - return self.format.render_licenses(announcements, licenses) - - def render_announcements(self, announcements): - LOG.info('Safety is going to render_announcements with format: %s', self.format) - return self.format.render_announcements(announcements) - - def __init__(self, output): - from pipenv.patched.safety.formatters.screen import ScreenReport - from pipenv.patched.safety.formatters.text import TextReport - from pipenv.patched.safety.formatters.json import JsonReport - from pipenv.patched.safety.formatters.bare import BareReport - - self.format = ScreenReport() - - if output == 'json': - self.format = JsonReport() - elif output == 'bare': - self.format = BareReport() - elif output == 'text': - self.format = TextReport() diff --git a/pipenv/patched/safety/formatters/__init__.py b/pipenv/patched/safety/formatters/__init__.py deleted file mode 100644 index 
e69de29bb2..0000000000 diff --git a/pipenv/patched/safety/formatters/bare.py b/pipenv/patched/safety/formatters/bare.py deleted file mode 100644 index 1e730a5c63..0000000000 --- a/pipenv/patched/safety/formatters/bare.py +++ /dev/null @@ -1,38 +0,0 @@ -from collections import namedtuple - -from pipenv.patched.safety.formatter import FormatterAPI -from pipenv.patched.safety.util import get_basic_announcements - - -class BareReport(FormatterAPI): - """Bare report, for command line tools""" - - def render_vulnerabilities(self, announcements, vulnerabilities, remediations, full, packages): - parsed_announcements = [] - - Announcement = namedtuple("Announcement", ["name"]) - - for announcement in get_basic_announcements(announcements): - normalized_message = "-".join(announcement.get('message', 'none').lower().split()) - parsed_announcements.append(Announcement(name=normalized_message)) - - announcements_to_render = [announcement.name for announcement in parsed_announcements] - affected_packages = list(set([v.package_name for v in vulnerabilities if not v.ignored])) - - return " ".join(announcements_to_render + affected_packages) - - def render_licenses(self, announcements, packages_licenses): - parsed_announcements = [] - - for announcement in get_basic_announcements(announcements): - normalized_message = "-".join(announcement.get('message', 'none').lower().split()) - parsed_announcements.append({'license': normalized_message}) - - announcements_to_render = [announcement.get('license') for announcement in parsed_announcements] - - licenses = list(set([pkg_li.get('license') for pkg_li in packages_licenses])) - sorted_licenses = sorted(licenses) - return " ".join(announcements_to_render + sorted_licenses) - - def render_announcements(self, announcements): - print('render_announcements bare') diff --git a/pipenv/patched/safety/formatters/json.py b/pipenv/patched/safety/formatters/json.py deleted file mode 100644 index 477c9f9673..0000000000 --- 
a/pipenv/patched/safety/formatters/json.py +++ /dev/null @@ -1,84 +0,0 @@ -import logging - -import json as json_parser - -from pipenv.patched.pip._vendor.requests.models import PreparedRequest - -from pipenv.patched.safety.formatter import FormatterAPI -from pipenv.patched.safety.output_utils import get_report_brief_info -from pipenv.patched.safety.util import get_basic_announcements - -LOG = logging.getLogger(__name__) - - -class JsonReport(FormatterAPI): - """Json report, for when the output is input for something else""" - - def render_vulnerabilities(self, announcements, vulnerabilities, remediations, full, packages): - remediations_recommended = len(remediations.keys()) - LOG.debug('Rendering %s vulnerabilities, %s remediations with full_report: %s', len(vulnerabilities), - remediations_recommended, full) - vulns_ignored = [vuln.to_dict() for vuln in vulnerabilities if vuln.ignored] - vulns = [vuln.to_dict() for vuln in vulnerabilities if not vuln.ignored] - - report = get_report_brief_info(as_dict=True, report_type=1, vulnerabilities_found=len(vulns), - vulnerabilities_ignored=len(vulns_ignored), - remediations_recommended=remediations_recommended) - - remed = {} - for k, v in remediations.items(): - if k not in remed: - remed[k] = {} - - closest = v.get('closest_secure_version', {}) - upgrade = closest.get('major', None) - downgrade = closest.get('minor', None) - - recommended_version = None - - if upgrade: - recommended_version = str(upgrade) - elif downgrade: - recommended_version = str(downgrade) - - remed[k]['current_version'] = v.get('version', None) - remed[k]['vulnerabilities_found'] = v.get('vulns_found', 0) - remed[k]['recommended_version'] = recommended_version - remed[k]['other_recommended_versions'] = [other_v for other_v in v.get('secure_versions', []) if - other_v != recommended_version] - remed[k]['more_info_url'] = v.get('more_info_url', '') - - # Use Request's PreparedRequest to handle parsing, joining etc the URL since we're adding query - 
# parameters and don't know what the server might send down. - if remed[k]['more_info_url']: - req = PreparedRequest() - req.prepare_url(remed[k]['more_info_url'], {'from': remed[k]['current_version'], 'to': recommended_version}) - remed[k]['more_info_url'] = req.url - - template = { - "report_meta": report, - "scanned_packages": {p.name: p.to_dict(short_version=True) for p in packages}, - "affected_packages": {v.pkg.name: v.pkg.to_dict() for v in vulnerabilities}, - "announcements": [{'type': item.get('type'), 'message': item.get('message')} for item in - get_basic_announcements(announcements)], - "vulnerabilities": vulns, - "ignored_vulnerabilities": vulns_ignored, - "remediations": remed - } - - return json_parser.dumps(template, indent=4) - - def render_licenses(self, announcements, licenses): - unique_license_types = set([lic['license'] for lic in licenses]) - report = get_report_brief_info(as_dict=True, report_type=2, licenses_found=len(unique_license_types)) - - template = { - "report_meta": report, - "announcements": get_basic_announcements(announcements), - "licenses": licenses, - } - - return json_parser.dumps(template, indent=4) - - def render_announcements(self, announcements): - return json_parser.dumps({"announcements": get_basic_announcements(announcements)}, indent=4) diff --git a/pipenv/patched/safety/formatters/screen.py b/pipenv/patched/safety/formatters/screen.py deleted file mode 100644 index d75d1ff2a4..0000000000 --- a/pipenv/patched/safety/formatters/screen.py +++ /dev/null @@ -1,143 +0,0 @@ -import pipenv.vendor.click as click - -from pipenv.patched.safety.formatter import FormatterAPI -from pipenv.patched.safety.output_utils import build_announcements_section_content, format_long_text, \ - add_empty_line, format_vulnerability, get_final_brief, \ - build_report_brief_section, format_license, get_final_brief_license, build_remediation_section, \ - build_primary_announcement -from pipenv.patched.safety.util import get_primary_announcement, 
get_basic_announcements, get_terminal_size - - -class ScreenReport(FormatterAPI): - DIVIDER_SECTIONS = '+' + '=' * (get_terminal_size().columns - 2) + '+' - - REPORT_BANNER = DIVIDER_SECTIONS + '\n' + r""" - /$$$$$$ /$$ - /$$__ $$ | $$ - /$$$$$$$ /$$$$$$ | $$ \__//$$$$$$ /$$$$$$ /$$ /$$ - /$$_____/ |____ $$| $$$$ /$$__ $$|_ $$_/ | $$ | $$ - | $$$$$$ /$$$$$$$| $$_/ | $$$$$$$$ | $$ | $$ | $$ - \____ $$ /$$__ $$| $$ | $$_____/ | $$ /$$| $$ | $$ - /$$$$$$$/| $$$$$$$| $$ | $$$$$$$ | $$$$/| $$$$$$$ - |_______/ \_______/|__/ \_______/ \___/ \____ $$ - /$$ | $$ - | $$$$$$/ - by pyup.io \______/ - -""" + DIVIDER_SECTIONS - - ANNOUNCEMENTS_HEADING = format_long_text(click.style('ANNOUNCEMENTS', bold=True)) - - def __build_announcements_section(self, announcements): - announcements_section = [] - - basic_announcements = get_basic_announcements(announcements) - - if basic_announcements: - announcements_content = build_announcements_section_content(basic_announcements) - announcements_section = [add_empty_line(), self.ANNOUNCEMENTS_HEADING, add_empty_line(), - announcements_content, add_empty_line(), self.DIVIDER_SECTIONS] - - return announcements_section - - def render_vulnerabilities(self, announcements, vulnerabilities, remediations, full, packages): - announcements_section = self.__build_announcements_section(announcements) - primary_announcement = get_primary_announcement(announcements) - remediation_section = build_remediation_section(remediations) - end_content = [] - - if primary_announcement: - end_content = [add_empty_line(), - build_primary_announcement(primary_announcement, columns=get_terminal_size().columns), - self.DIVIDER_SECTIONS] - - table = [] - ignored = {} - total_ignored = 0 - - for n, vuln in enumerate(vulnerabilities): - if vuln.ignored: - total_ignored += 1 - ignored[vuln.package_name] = ignored.get(vuln.package_name, 0) + 1 - table.append(format_vulnerability(vuln, full)) - - report_brief_section = 
build_report_brief_section(primary_announcement=primary_announcement, report_type=1, - vulnerabilities_found=max(0, len(vulnerabilities)-total_ignored), - vulnerabilities_ignored=total_ignored, - remediations_recommended=len(remediations)) - - if vulnerabilities: - - final_brief = get_final_brief(len(vulnerabilities), len(remediations), ignored, total_ignored) - - return "\n".join( - [ScreenReport.REPORT_BANNER] + announcements_section + [report_brief_section, - add_empty_line(), - self.DIVIDER_SECTIONS, - format_long_text( - click.style('VULNERABILITIES FOUND', - bold=True, fg='red')), - self.DIVIDER_SECTIONS, - add_empty_line(), - "\n\n".join(table), - final_brief, - add_empty_line(), - self.DIVIDER_SECTIONS] + - remediation_section + end_content - ) - else: - content = format_long_text(click.style("No known security vulnerabilities found.", bold=True, fg='green')) - return "\n".join( - [ScreenReport.REPORT_BANNER] + announcements_section + [report_brief_section, - self.DIVIDER_SECTIONS, - add_empty_line(), - content, - add_empty_line(), - self.DIVIDER_SECTIONS] + - end_content - ) - - def render_licenses(self, announcements, licenses): - unique_license_types = set([lic['license'] for lic in licenses]) - - report_brief_section = build_report_brief_section(primary_announcement=get_primary_announcement(announcements), - report_type=2, licenses_found=len(unique_license_types)) - announcements_section = self.__build_announcements_section(announcements) - - if not licenses: - content = format_long_text(click.style("No packages licenses found.", bold=True, fg='red')) - return "\n".join( - [ScreenReport.REPORT_BANNER] + announcements_section + [report_brief_section, - self.DIVIDER_SECTIONS, - add_empty_line(), - content, - add_empty_line(), - self.DIVIDER_SECTIONS] - ) - - table = [] - for license in licenses: - table.append(format_license(license)) - - final_brief = get_final_brief_license(unique_license_types) - - return "\n".join( - [ScreenReport.REPORT_BANNER] + 
announcements_section + [report_brief_section, - add_empty_line(), - self.DIVIDER_SECTIONS, - format_long_text( - click.style('LICENSES FOUND', - bold=True, fg='yellow')), - self.DIVIDER_SECTIONS, - add_empty_line(), - "\n".join(table), - final_brief, - add_empty_line(), - self.DIVIDER_SECTIONS] - ) - - def render_announcements(self, announcements): - return self.__build_announcements_section(announcements) - - - diff --git a/pipenv/patched/safety/formatters/text.py b/pipenv/patched/safety/formatters/text.py deleted file mode 100644 index 4f40a961b0..0000000000 --- a/pipenv/patched/safety/formatters/text.py +++ /dev/null @@ -1,134 +0,0 @@ -import pipenv.vendor.click as click - -from pipenv.patched.safety.formatter import FormatterAPI -from pipenv.patched.safety.output_utils import build_announcements_section_content, format_vulnerability, \ - build_report_brief_section, get_final_brief_license, add_empty_line, get_final_brief, build_remediation_section, \ - build_primary_announcement -from pipenv.patched.safety.util import get_primary_announcement, get_basic_announcements - - -class TextReport(FormatterAPI): - """Basic report, intented to be used for terminals with < 80 columns""" - - SMALL_DIVIDER_SECTIONS = '+' + '=' * 78 + '+' - - TEXT_REPORT_BANNER = SMALL_DIVIDER_SECTIONS + '\n' + r""" - /$$$$$$ /$$ - /$$__ $$ | $$ - /$$$$$$$ /$$$$$$ | $$ \__//$$$$$$ /$$$$$$ /$$ /$$ - /$$_____/ |____ $$| $$$$ /$$__ $$|_ $$_/ | $$ | $$ - | $$$$$$ /$$$$$$$| $$_/ | $$$$$$$$ | $$ | $$ | $$ - \____ $$ /$$__ $$| $$ | $$_____/ | $$ /$$| $$ | $$ - /$$$$$$$/| $$$$$$$| $$ | $$$$$$$ | $$$$/| $$$$$$$ - |_______/ \_______/|__/ \_______/ \___/ \____ $$ - /$$ | $$ - | $$$$$$/ - by pyup.io \______/ - -""" + SMALL_DIVIDER_SECTIONS - - def __build_announcements_section(self, announcements): - announcements_table = [] - - basic_announcements = get_basic_announcements(announcements) - - if basic_announcements: - announcements_content = 
click.unstyle(build_announcements_section_content(basic_announcements, - columns=80, - start_line_decorator=' ' * 2, - end_line_decorator='')) - announcements_table = [add_empty_line(), 'ANNOUNCEMENTS', add_empty_line(), - announcements_content, add_empty_line(), self.SMALL_DIVIDER_SECTIONS] - - return announcements_table - - def render_vulnerabilities(self, announcements, vulnerabilities, remediations, full, packages): - primary_announcement = get_primary_announcement(announcements) - remediation_section = [click.unstyle(rem) for rem in build_remediation_section(remediations, columns=80)] - end_content = [] - - if primary_announcement: - end_content = [add_empty_line(), - build_primary_announcement(primary_announcement, columns=80, only_text=True), - self.SMALL_DIVIDER_SECTIONS] - - announcement_section = self.__build_announcements_section(announcements) - - ignored = {} - total_ignored = 0 - - for n, vuln in enumerate(vulnerabilities): - if vuln.ignored: - total_ignored += 1 - ignored[vuln.package_name] = ignored.get(vuln.package_name, 0) + 1 - - report_brief_section = click.unstyle( - build_report_brief_section(columns=80, primary_announcement=primary_announcement, - vulnerabilities_found=max(0, len(vulnerabilities)-total_ignored), - vulnerabilities_ignored=total_ignored, - remediations_recommended=len(remediations))) - - table = [self.TEXT_REPORT_BANNER] + announcement_section + [ - report_brief_section, - '', - self.SMALL_DIVIDER_SECTIONS, - ] - - if vulnerabilities: - table += [" VULNERABILITIES FOUND", self.SMALL_DIVIDER_SECTIONS] - - for vuln in vulnerabilities: - table.append('\n' + format_vulnerability(vuln, full, only_text=True, columns=80)) - - final_brief = click.unstyle(get_final_brief(len(vulnerabilities), len(remediations), ignored, total_ignored, - kwargs={'columns': 80})) - table += [final_brief, add_empty_line(), self.SMALL_DIVIDER_SECTIONS] + remediation_section + end_content - - else: - table += [add_empty_line(), " No known security 
vulnerabilities found.", add_empty_line(), - self.SMALL_DIVIDER_SECTIONS] + end_content - - return "\n".join( - table - ) - - def render_licenses(self, announcements, licenses): - unique_license_types = set([lic['license'] for lic in licenses]) - - report_brief_section = click.unstyle( - build_report_brief_section(columns=80, primary_announcement=get_primary_announcement(announcements), - licenses_found=len(unique_license_types))) - - packages_licenses = licenses - announcements_table = self.__build_announcements_section(announcements) - - final_brief = click.unstyle( - get_final_brief_license(unique_license_types, kwargs={'columns': 80})) - - table = [self.TEXT_REPORT_BANNER] + announcements_table + [ - report_brief_section, - self.SMALL_DIVIDER_SECTIONS, - " LICENSES", - self.SMALL_DIVIDER_SECTIONS, - add_empty_line(), - ] - - if not packages_licenses: - table.append(" No packages licenses found.") - table += [final_brief, add_empty_line(), self.SMALL_DIVIDER_SECTIONS] - - return "\n".join(table) - - for pkg_license in packages_licenses: - text = " {0}, version {1}, license {2}\n".format(pkg_license['package'], pkg_license['version'], - pkg_license['license']) - table.append(text) - - table += [final_brief, add_empty_line(), self.SMALL_DIVIDER_SECTIONS] - - return "\n".join(table) - - def render_announcements(self, announcements): - rows = self.__build_announcements_section(announcements) - rows.insert(0, self.SMALL_DIVIDER_SECTIONS) - return '\n'.join(rows) diff --git a/pipenv/patched/safety/models.py b/pipenv/patched/safety/models.py deleted file mode 100644 index 2e01778dc1..0000000000 --- a/pipenv/patched/safety/models.py +++ /dev/null @@ -1,110 +0,0 @@ -from collections import namedtuple -from datetime import datetime -from typing import NamedTuple - - -class DictConverter(object): - - def to_dict(self, **kwargs): - pass - - -announcement_nmt = namedtuple('Announcement', ['type', 'message']) -remediation_nmt = namedtuple('Remediation', ['Package', 
'closest_secure_version', 'secure_versions', - 'latest_package_version']) -cve_nmt = namedtuple('Cve', ['name', 'cvssv2', 'cvssv3']) -severity_nmt = namedtuple('Severity', ['source', 'cvssv2', 'cvssv3']) -vulnerability_nmt = namedtuple('Vulnerability', - ['vulnerability_id', 'package_name', 'pkg', 'ignored', 'ignored_reason', 'ignored_expires', - 'vulnerable_spec', 'all_vulnerable_specs', 'analyzed_version', 'advisory', - 'is_transitive', 'published_date', 'fixed_versions', - 'closest_versions_without_known_vulnerabilities', 'resources', 'CVE', 'severity', - 'affected_versions', 'more_info_url']) -package_nmt = namedtuple('Package', ['name', 'version', 'found', 'insecure_versions', 'secure_versions', - 'latest_version_without_known_vulnerabilities', 'latest_version', 'more_info_url']) -package_nmt.__new__.__defaults__ = (None,) * len(package_nmt._fields) # Ugly hack for now -RequirementFile = namedtuple('RequirementFile', ['path']) - - -class Package(package_nmt, DictConverter): - - def to_dict(self, **kwargs): - if kwargs.get('short_version', False): - return { - 'name': self.name, - 'version': self.version, - } - - return {'name': self.name, - 'version': self.version, - 'found': self.found, - 'insecure_versions': self.insecure_versions, - 'secure_versions': self.secure_versions, - 'latest_version_without_known_vulnerabilities': self.latest_version_without_known_vulnerabilities, - 'latest_version': self.latest_version, - 'more_info_url': self.more_info_url - } - - -class Announcement(announcement_nmt): - pass - - -class Remediation(remediation_nmt, DictConverter): - - def to_dict(self): - return {'package': self.Package.name, - 'closest_secure_version': self.closest_secure_version, - 'secure_versions': self.secure_versions, - 'latest_package_version': self.latest_package_version - } - - -class CVE(cve_nmt, DictConverter): - - def to_dict(self): - return {'name': self.name, 'cvssv2': self.cvssv2, 'cvssv3': self.cvssv3} - - -class Severity(severity_nmt, 
DictConverter): - def to_dict(self): - result = {'severity': {'source': self.source}} - - result['severity']['cvssv2'] = self.cvssv2 - result['severity']['cvssv3'] = self.cvssv3 - - return result - - -class Vulnerability(vulnerability_nmt): - - def to_dict(self): - empty_list_if_none = ['fixed_versions', 'closest_versions_without_known_vulnerabilities', 'resources'] - result = { - } - - ignore = ['pkg'] - - for field, value in zip(self._fields, self): - if field in ignore: - continue - - if value is None and field in empty_list_if_none: - value = [] - - if isinstance(value, CVE): - val = None - if value.name.startswith("CVE"): - val = value.name - result[field] = val - elif isinstance(value, DictConverter): - result.update(value.to_dict()) - elif isinstance(value, datetime): - result[field] = str(value) - else: - result[field] = value - - return result - - def get_advisory(self): - return self.advisory.replace('\r', '') if self.advisory else "No advisory found for this vulnerability." diff --git a/pipenv/patched/safety/output_utils.py b/pipenv/patched/safety/output_utils.py deleted file mode 100644 index 8aec2f474f..0000000000 --- a/pipenv/patched/safety/output_utils.py +++ /dev/null @@ -1,693 +0,0 @@ -import json -import logging -import os -import textwrap -from datetime import datetime - -import pipenv.vendor.click as click - -from pipenv.patched.safety.constants import RED, YELLOW -from pipenv.patched.safety.util import get_safety_version, Package, get_terminal_size, \ - SafetyContext, build_telemetry_data, build_git_data, is_a_remote_mirror - -LOG = logging.getLogger(__name__) - - -def build_announcements_section_content(announcements, columns=get_terminal_size().columns, - start_line_decorator=' ', end_line_decorator=' '): - section = '' - - for i, announcement in enumerate(announcements): - - color = '' - if announcement.get('type') == 'error': - color = RED - elif announcement.get('type') == 'warning': - color = YELLOW - - item = '{message}'.format( - 
message=format_long_text('* ' + announcement.get('message'), color, columns, - start_line_decorator, end_line_decorator)) - section += '{item}'.format(item=item) - - if i + 1 < len(announcements): - section += '\n' - - return section - - -def add_empty_line(): - return format_long_text('') - - -def style_lines(lines, columns, pre_processed_text='', start_line=' ' * 4, end_line=' ' * 4): - styled_text = pre_processed_text - - for line in lines: - styled_line = '' - left_padding = ' ' * line.get('left_padding', 0) - - for i, word in enumerate(line.get('words', [])): - if word.get('style', {}): - text = '' - - if i == 0: - text = left_padding # Include the line padding in the word to avoid Github issues - left_padding = '' # Clean left padding to avoid be added two times - - text += word.get('value', '') - - styled_line += click.style(text=text, **word.get('style', {})) - else: - styled_line += word.get('value', '') - - styled_text += format_long_text(styled_line, columns=columns, start_line_decorator=start_line, - end_line_decorator=end_line, - left_padding=left_padding, **line.get('format', {})) + '\n' - - return styled_text - - -def format_vulnerability(vulnerability, full_mode, only_text=False, columns=get_terminal_size().columns): - - common_format = {'left_padding': 3, 'format': {'sub_indent': ' ' * 3, 'max_lines': None}} - - styled_vulnerability = [ - {'words': [{'style': {'bold': True}, 'value': 'Vulnerability ID: '}, {'value': vulnerability.vulnerability_id}]}, - ] - - vulnerability_spec = [ - {'words': [{'style': {'bold': True}, 'value': 'Affected spec: '}, {'value': vulnerability.vulnerable_spec}]}] - - cve = vulnerability.CVE - - cvssv2_line = None - cve_lines = [] - - if cve: - if full_mode and cve.cvssv2: - b = cve.cvssv2.get("base_score", "-") - s = cve.cvssv2.get("impact_score", "-") - v = cve.cvssv2.get("vector_string", "-") - - # Reset sub_indent as the left_margin is going to be applied in this case - cvssv2_line = {'format': {'sub_indent': ''}, 
'words': [ - {'value': f'CVSS v2, BASE SCORE {b}, IMPACT SCORE {s}, VECTOR STRING {v}'}, - ]} - - if cve.cvssv3 and "base_severity" in cve.cvssv3.keys(): - cvss_base_severity_style = {'bold': True} - base_severity = cve.cvssv3.get("base_severity", "-") - - if base_severity.upper() in ['HIGH', 'CRITICAL']: - cvss_base_severity_style['fg'] = 'red' - - b = cve.cvssv3.get("base_score", "-") - - if full_mode: - s = cve.cvssv3.get("impact_score", "-") - v = cve.cvssv3.get("vector_string", "-") - - cvssv3_text = f'CVSS v3, BASE SCORE {b}, IMPACT SCORE {s}, VECTOR STRING {v}' - - else: - cvssv3_text = f'CVSS v3, BASE SCORE {b} ' - - cve_lines = [ - {'words': [{'style': {'bold': True}, 'value': '{0} is '.format(cve.name)}, - {'style': cvss_base_severity_style, - 'value': f'{base_severity} SEVERITY => '}, - {'value': cvssv3_text}, - ]}, - ] - - if cvssv2_line: - cve_lines.append(cvssv2_line) - - elif cve.name: - cve_lines = [ - {'words': [{'style': {'bold': True}, 'value': cve.name}]} - ] - - advisory_format = {'sub_indent': ' ' * 3, 'max_lines': None} if full_mode else {'sub_indent': ' ' * 3, - 'max_lines': 2} - - basic_vuln_data_lines = [ - {'format': advisory_format, 'words': [ - {'style': {'bold': True}, 'value': 'ADVISORY: '}, - {'value': vulnerability.advisory.replace('\n', '')}]} - ] - - if SafetyContext().key: - fixed_version_line = {'words': [ - {'style': {'bold': True}, 'value': 'Fixed versions: '}, - {'value': ', '.join(vulnerability.fixed_versions) if vulnerability.fixed_versions else 'No known fix'} - ]} - - basic_vuln_data_lines.append(fixed_version_line) - - more_info_line = [{'words': [{'style': {'bold': True}, 'value': 'For more information, please visit '}, - {'value': click.style(vulnerability.more_info_url)}]}] - - vuln_title = f'-> Vulnerability found in {vulnerability.package_name} version {vulnerability.analyzed_version}\n' - - styled_text = click.style(vuln_title, fg='red') - - to_print = styled_vulnerability - - if not vulnerability.ignored: - 
to_print += vulnerability_spec + basic_vuln_data_lines + cve_lines - else: - generic_reason = 'This vulnerability is being ignored' - if vulnerability.ignored_expires: - generic_reason += f" until {vulnerability.ignored_expires.strftime('%Y-%m-%d %H:%M:%S UTC')}. " \ - f"See your configurations" - - specific_reason = None - if vulnerability.ignored_reason: - specific_reason = [ - {'words': [{'style': {'bold': True}, 'value': 'Reason: '}, {'value': vulnerability.ignored_reason}]}] - - expire_section = [{'words': [ - {'style': {'bold': True, 'fg': 'green'}, 'value': f'{generic_reason}.'}, ]}] - - if specific_reason: - expire_section += specific_reason - - to_print += expire_section - - if cve: - to_print += more_info_line - - to_print = [{**common_format, **line} for line in to_print] - - content = style_lines(to_print, columns, styled_text, start_line='', end_line='', ) - - return click.unstyle(content) if only_text else content - - -def format_license(license, only_text=False, columns=get_terminal_size().columns): - to_print = [ - {'words': [{'style': {'bold': True}, 'value': license['package']}, - {'value': ' version {0} found using license '.format(license['version'])}, - {'style': {'bold': True}, 'value': license['license']} - ] - }, - ] - - content = style_lines(to_print, columns, '-> ', start_line='', end_line='') - - return click.unstyle(content) if only_text else content - - -def build_remediation_section(remediations, only_text=False, columns=get_terminal_size().columns, kwargs=None): - columns -= 2 - left_padding = ' ' * 3 - - if not kwargs: - # Reset default params in the format_long_text func - kwargs = {'left_padding': '', 'columns': columns, 'start_line_decorator': '', 'end_line_decorator': '', - 'sub_indent': left_padding} - - END_SECTION = '+' + '=' * columns + '+' - - if not remediations: - return [] - - content = '' - total_vulns = 0 - total_packages = len(remediations.keys()) - - for pkg in remediations.keys(): - total_vulns += 
remediations[pkg]['vulns_found'] - upgrade_to = remediations[pkg]['closest_secure_version']['major'] - downgrade_to = remediations[pkg]['closest_secure_version']['minor'] - fix_version = None - - if upgrade_to: - fix_version = str(upgrade_to) - elif downgrade_to: - fix_version = str(downgrade_to) - - new_line = '\n' - - other_options = [str(fix) for fix in remediations[pkg].get('secure_versions', []) if str(fix) != fix_version] - raw_recommendation = f"We recommend upgrading to version {upgrade_to} of {pkg}." - - if other_options: - raw_other_options = ', '.join(other_options) - raw_pre_other_options = 'Other versions without known vulnerabilities are:' - if len(other_options) == 1: - raw_pre_other_options = 'Other version without known vulnerabilities is' - raw_recommendation = f"{raw_recommendation} {raw_pre_other_options} " \ - f"{raw_other_options}" - - remediation_content = [ - f'{left_padding}The closest version with no known vulnerabilities is ' + click.style(upgrade_to, bold=True), - new_line, - click.style(f'{left_padding}{raw_recommendation}', bold=True, fg='green') - ] - - if not fix_version: - remediation_content = [new_line, - click.style(f'{left_padding}There is no known fix for this vulnerability.', bold=True, fg='yellow')] - - text = 'vulnerabilities' if remediations[pkg]['vulns_found'] > 1 else 'vulnerability' - - raw_rem_title = f"-> {pkg} version {remediations[pkg]['version']} was found, " \ - f"which has {remediations[pkg]['vulns_found']} {text}" - - remediation_title = click.style(raw_rem_title, fg=RED, bold=True) - - content += new_line + format_long_text(remediation_title, **kwargs) + new_line - - pre_content = remediation_content + [ - f"{left_padding}For more information, please visit {remediations[pkg]['more_info_url']}", - f'{left_padding}Always check for breaking changes when upgrading packages.', - new_line] - - for i, element in enumerate(pre_content): - content += format_long_text(element, **kwargs) - - if i + 1 < len(pre_content): - 
content += '\n' - - title = format_long_text(click.style(f'{left_padding}REMEDIATIONS', fg='green', bold=True), **kwargs) - - body = [content] - - if not is_using_api_key(): - vuln_text = 'vulnerabilities were' if total_vulns != 1 else 'vulnerability was' - pkg_text = 'packages' if total_packages > 1 else 'package' - msg = "{0} {1} found in {2} {3}. " \ - "For detailed remediation & fix recommendations, upgrade to a commercial license."\ - .format(total_vulns, vuln_text, total_packages, pkg_text) - content = '\n' + format_long_text(msg, left_padding=' ', columns=columns) + '\n' - body = [content] - - body.append(END_SECTION) - - content = [title] + body - - if only_text: - content = [click.unstyle(item) for item in content] - - return content - - -def get_final_brief(total_vulns_found, total_remediations, ignored, total_ignored, kwargs=None): - if not kwargs: - kwargs = {} - - total_vulns = max(0, total_vulns_found - total_ignored) - - vuln_text = 'vulnerabilities' if total_ignored > 1 else 'vulnerability' - pkg_text = 'packages were' if len(ignored.keys()) > 1 else 'package was' - - policy_file_text = ' using a safety policy file' if is_using_a_safety_policy_file() else '' - - vuln_brief = f" {total_vulns} vulnerabilit{'y was' if total_vulns == 1 else 'ies were'} found." - ignored_text = f' {total_ignored} {vuln_text} from {len(ignored.keys())} {pkg_text} ignored.' if ignored else '' - remediation_text = f" {total_remediations} remediation{' was' if total_remediations == 1 else 's were'} " \ - f"recommended." if is_using_api_key() else '' - - raw_brief = f"Scan was completed{policy_file_text}.{vuln_brief}{ignored_text}{remediation_text}" - - return format_long_text(raw_brief, start_line_decorator=' ', **kwargs) - - -def get_final_brief_license(licenses, kwargs=None): - if not kwargs: - kwargs = {} - - licenses_text = ' Scan was completed.' 
- - if licenses: - licenses_text = 'The following software licenses were present in your system: {0}'.format(', '.join(licenses)) - - return format_long_text("{0}".format(licenses_text), start_line_decorator=' ', **kwargs) - - -def format_long_text(text, color='', columns=get_terminal_size().columns, start_line_decorator=' ', end_line_decorator=' ', left_padding='', max_lines=None, styling=None, indent='', sub_indent=''): - if not styling: - styling = {} - - if color: - styling.update({'fg': color}) - - columns -= len(start_line_decorator) + len(end_line_decorator) - formatted_lines = [] - lines = text.replace('\r', '').splitlines() - - for line in lines: - base_format = "{:" + str(columns) + "}" - if line == '': - empty_line = base_format.format(" ") - formatted_lines.append("{0}{1}{2}".format(start_line_decorator, empty_line, end_line_decorator)) - wrapped_lines = textwrap.wrap(line, width=columns, max_lines=max_lines, initial_indent=indent, subsequent_indent=sub_indent, placeholder='...') - for wrapped_line in wrapped_lines: - try: - new_line = left_padding + wrapped_line.encode('utf-8') - except TypeError: - new_line = left_padding + wrapped_line - - if styling: - new_line = click.style(new_line, **styling) - - formatted_lines.append(f"{start_line_decorator}{new_line}{end_line_decorator}") - - return "\n".join(formatted_lines) - - -def get_printable_list_of_scanned_items(scanning_target): - context = SafetyContext() - - result = [] - scanned_items_data = [] - - if scanning_target == 'environment': - locations = set([pkg.found for pkg in context.packages if isinstance(pkg, Package)]) - - for path in locations: - result.append([{'styled': False, 'value': '-> ' + path}]) - scanned_items_data.append(path) - - if len(locations) <= 0: - msg = 'No locations found in the environment' - result.append([{'styled': False, 'value': msg}]) - scanned_items_data.append(msg) - - elif scanning_target == 'stdin': - scanned_stdin = [pkg.name for pkg in context.packages if 
isinstance(pkg, Package)] - value = 'No found packages in stdin' - scanned_items_data = [value] - - if len(scanned_stdin) > 0: - value = ', '.join(scanned_stdin) - scanned_items_data = scanned_stdin - - result.append( - [{'styled': False, 'value': value}]) - - elif scanning_target == 'files': - for file in context.params.get('files', []): - result.append([{'styled': False, 'value': f'-> {file.name}'}]) - scanned_items_data.append(file.name) - elif scanning_target == 'file': - file = context.params.get('file', None) - name = file.name if file else '' - result.append([{'styled': False, 'value': f'-> {name}'}]) - scanned_items_data.append(name) - - return result, scanned_items_data - - -REPORT_HEADING = format_long_text(click.style('REPORT', bold=True)) - - -def build_report_brief_section(columns=None, primary_announcement=None, report_type=1, **kwargs): - if not columns: - columns = get_terminal_size().columns - - styled_brief_lines = [] - - if primary_announcement: - styled_brief_lines.append( - build_primary_announcement(columns=columns, primary_announcement=primary_announcement)) - - for line in get_report_brief_info(report_type=report_type, **kwargs): - ln = '' - padding = ' ' * 2 - - for i, words in enumerate(line): - processed_words = words.get('value', '') - if words.get('style', False): - text = '' - if i == 0: - text = padding - padding = '' - text += processed_words - - processed_words = click.style(text, bold=True) - - ln += processed_words - - styled_brief_lines.append(format_long_text(ln, color='', columns=columns, start_line_decorator='', - left_padding=padding, end_line_decorator='', sub_indent=' ' * 2)) - - return "\n".join([add_empty_line(), REPORT_HEADING, add_empty_line(), '\n'.join(styled_brief_lines)]) - - -def build_report_for_review_vuln_report(as_dict=False): - ctx = SafetyContext() - report_from_file = ctx.review - packages = ctx.packages - - if as_dict: - return report_from_file - - policy_f_name = report_from_file.get('policy_file', None) - 
safety_policy_used = [] - if policy_f_name: - safety_policy_used = [ - {'style': False, 'value': '\nScanning using a security policy file'}, - {'style': True, 'value': ' {0}'.format(policy_f_name)}, - ] - - action_executed = [ - {'style': True, 'value': 'Scanning dependencies'}, - {'style': False, 'value': ' in your '}, - {'style': True, 'value': report_from_file.get('scan_target', '-') + ':'}, - ] - - scanned_items = [] - - for name in report_from_file.get('scanned', []): - scanned_items.append([{'styled': False, 'value': '-> ' + name}]) - - nl = [{'style': False, 'value': ''}] - using_sentence = build_using_sentence(report_from_file.get('api_key', None), - report_from_file.get('local_database_path_used', None)) - scanned_count_sentence = build_scanned_count_sentence(packages) - old_timestamp = report_from_file.get('timestamp', None) - - old_timestamp = [{'style': False, 'value': 'Report generated '}, {'style': True, 'value': old_timestamp}] - now = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) - current_timestamp = [{'style': False, 'value': 'Timestamp '}, {'style': True, 'value': now}] - - brief_info = [[{'style': False, 'value': 'Safety '}, - {'style': True, 'value': 'v' + report_from_file.get('safety_version', '-')}, - {'style': False, 'value': ' is scanning for '}, - {'style': True, 'value': 'Vulnerabilities'}, - {'style': True, 'value': '...'}] + safety_policy_used, action_executed - ] + [nl] + scanned_items + [nl] + [using_sentence] + [scanned_count_sentence] + [old_timestamp] + \ - [current_timestamp] - - return brief_info - - -def build_using_sentence(key, db): - key_sentence = [] - custom_integration = os.environ.get('SAFETY_CUSTOM_INTEGRATION', - 'false').lower() == 'true' - - if key: - key_sentence = [{'style': True, 'value': 'an API KEY'}, - {'style': False, 'value': ' and the '}] - db_name = 'PyUp Commercial' - elif db: - if is_a_remote_mirror(db): - if custom_integration: - return [] - db_name = f"remote URL {db}" - else: - db_name = f"local 
file {db}" - else: - db_name = 'non-commercial' - - database_sentence = [{'style': True, 'value': db_name + ' database'}] - - return [{'style': False, 'value': 'Using '}] + key_sentence + database_sentence - - -def build_scanned_count_sentence(packages): - scanned_count = 'No packages found' - if len(packages) >= 1: - scanned_count = 'Found and scanned {0} {1}'.format(len(packages), - 'packages' if len(packages) > 1 else 'package') - - return [{'style': True, 'value': scanned_count}] - - -def add_warnings_if_needed(brief_info): - ctx = SafetyContext() - warnings = [] - - if ctx.packages: - if ctx.params.get('continue_on_error', False): - warnings += [[{'style': True, - 'value': '* Continue-on-error is enabled, so returning successful (0) exit code in all cases.'}]] - - if ctx.params.get('ignore_severity_rules', False) and not is_using_api_key(): - warnings += [[{'style': True, - 'value': '* Could not filter by severity, please upgrade your account to include severity data.'}]] - - if warnings: - brief_info += [[{'style': False, 'value': ''}]] + warnings - - -def get_report_brief_info(as_dict=False, report_type=1, **kwargs): - LOG.info('get_report_brief_info: %s, %s, %s', as_dict, report_type, kwargs) - - context = SafetyContext() - - packages = [pkg for pkg in context.packages if isinstance(pkg, Package)] - brief_data = {} - command = context.command - - if command == 'review': - review = build_report_for_review_vuln_report(as_dict) - return review - - key = context.key - db = context.db_mirror - - scanning_types = {'check': {'name': 'Vulnerabilities', 'action': 'Scanning dependencies', 'scanning_target': 'environment'}, # Files, Env or Stdin - 'license': {'name': 'Licenses', 'action': 'Scanning licenses', 'scanning_target': 'environment'}, # Files or Env - 'review': {'name': 'Report', 'action': 'Reading the report', - 'scanning_target': 'file'}} # From file - - targets = ['stdin', 'environment', 'files', 'file'] - for target in targets: - if 
context.params.get(target, False): - scanning_types[command]['scanning_target'] = target - break - - scanning_target = scanning_types.get(context.command, {}).get('scanning_target', '') - brief_data['scan_target'] = scanning_target - scanned_items, data = get_printable_list_of_scanned_items(scanning_target) - brief_data['scanned'] = data - nl = [{'style': False, 'value': ''}] - - action_executed = [ - {'style': True, 'value': scanning_types.get(context.command, {}).get('action', '')}, - {'style': False, 'value': ' in your '}, - {'style': True, 'value': scanning_target + ':'}, - ] - - policy_file = context.params.get('policy_file', None) - safety_policy_used = [] - - brief_data['policy_file'] = policy_file.get('filename', '-') if policy_file else None - brief_data['policy_file_source'] = 'server' if brief_data['policy_file'] and 'server-safety-policy' in brief_data['policy_file'] else 'local' - - if policy_file and policy_file.get('filename', False): - safety_policy_used = [ - {'style': False, 'value': '\nScanning using a security policy file'}, - {'style': True, 'value': ' {0}'.format(policy_file.get('filename', '-'))}, - ] - - audit_and_monitor = [] - if context.params.get('audit_and_monitor'): - logged_url = context.params.get('audit_and_monitor_url') if context.params.get('audit_and_monitor_url') else "https://pyup.io" - audit_and_monitor = [ - {'style': False, 'value': '\nLogging scan results to'}, - {'style': True, 'value': ' {0}'.format(logged_url)}, - ] - - current_time = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S')) - - brief_data['api_key'] = bool(key) - brief_data['local_database_path'] = db if db else None - brief_data['safety_version'] = get_safety_version() - brief_data['timestamp'] = current_time - brief_data['packages_found'] = len(packages) - # Vuln report - additional_data = [] - if report_type == 1: - brief_data['vulnerabilities_found'] = kwargs.get('vulnerabilities_found', 0) - brief_data['vulnerabilities_ignored'] = 
kwargs.get('vulnerabilities_ignored', 0) - brief_data['remediations_recommended'] = 0 - - additional_data = [ - [{'style': True, 'value': str(brief_data['vulnerabilities_found'])}, - {'style': True, 'value': f' vulnerabilit{"y" if brief_data["vulnerabilities_found"] == 1 else "ies"} found'}], - [{'style': True, 'value': str(brief_data['vulnerabilities_ignored'])}, - {'style': True, 'value': f' vulnerabilit{"y" if brief_data["vulnerabilities_ignored"] == 1 else "ies"} ignored'}], - ] - - if is_using_api_key(): - brief_data['remediations_recommended'] = kwargs.get('remediations_recommended', 0) - additional_data.extend( - [[{'style': True, 'value': str(brief_data['remediations_recommended'])}, - {'style': True, 'value': - f' remediation{"" if brief_data["remediations_recommended"] == 1 else "s"} recommended'}]]) - - elif report_type == 2: - brief_data['licenses_found'] = kwargs.get('licenses_found', 0) - additional_data = [ - [{'style': True, 'value': str(brief_data['licenses_found'])}, - {'style': True, 'value': f' license {"type" if brief_data["licenses_found"] == 1 else "types"} found'}], - ] - - brief_data['telemetry'] = build_telemetry_data() - - brief_data['git'] = build_git_data() - brief_data['project'] = context.params.get('project', None) - - brief_data['json_version'] = 1 - - using_sentence = build_using_sentence(key, db) - using_sentence_section = [nl] if not using_sentence else [nl] + [build_using_sentence(key, db)] - scanned_count_sentence = build_scanned_count_sentence(packages) - - timestamp = [{'style': False, 'value': 'Timestamp '}, {'style': True, 'value': current_time}] - - brief_info = [[{'style': False, 'value': 'Safety '}, - {'style': True, 'value': 'v' + get_safety_version()}, - {'style': False, 'value': ' is scanning for '}, - {'style': True, 'value': scanning_types.get(context.command, {}).get('name', '')}, - {'style': True, 'value': '...'}] + safety_policy_used + audit_and_monitor, action_executed - ] + [nl] + scanned_items + 
using_sentence_section + [scanned_count_sentence] + [timestamp] - - brief_info.extend(additional_data) - - add_warnings_if_needed(brief_info) - - LOG.info('Brief info data: %s', brief_data) - LOG.info('Brief info, styled output: %s', '\n\n LINE ---->\n ' + '\n\n LINE ---->\n '.join(map(str, brief_info))) - - return brief_data if as_dict else brief_info - - -def build_primary_announcement(primary_announcement, columns=None, only_text=False): - lines = json.loads(primary_announcement.get('message')) - - for line in lines: - if 'words' not in line: - raise ValueError('Missing words keyword') - if len(line['words']) <= 0: - raise ValueError('No words in this line') - for word in line['words']: - if 'value' not in word or not word['value']: - raise ValueError('Empty word or without value') - - message = style_lines(lines, columns, start_line='', end_line='') - - return click.unstyle(message) if only_text else message - - -def is_using_api_key(): - return bool(SafetyContext().key) - - -def is_using_a_safety_policy_file(): - return bool(SafetyContext().params.get('policy_file', None)) - - -def should_add_nl(output, found_vulns): - if output == 'bare' and not found_vulns: - return False - - return True - diff --git a/pipenv/patched/safety/safety-policy-template.yml b/pipenv/patched/safety/safety-policy-template.yml deleted file mode 100644 index 8efb3158cf..0000000000 --- a/pipenv/patched/safety/safety-policy-template.yml +++ /dev/null @@ -1,81 +0,0 @@ -# Safety Security and License Configuration file -# We recommend checking this file into your source control in the root of your Python project -# If this file is named .safety-policy.yml and is in the same directory where you run `safety check` it will be used by default. -# Otherwise, you can use the flag `safety check --policy-file ` to specify a custom location and name for the file. 
-# To validate and review your policy file, run the validate command: `safety validate policy_file --path ` -security: # configuration for the `safety check` command - ignore-cvss-severity-below: 0 # A severity number between 0 and 10. Some helpful reference points: 9=ignore all vulnerabilities except CRITICAL severity. 7=ignore all vulnerabilities except CRITICAL & HIGH severity. 4=ignore all vulnerabilities except CRITICAL, HIGH & MEDIUM severity. - ignore-cvss-unknown-severity: False # True or False. We recommend you set this to False. - ignore-vulnerabilities: # Here you can list multiple specific vulnerabilities you want to ignore (optionally for a time period) - # We recommend making use of the optional `reason` and `expires` keys for each vulnerability that you ignore. - 25853: # Example vulnerability ID - reason: we don't use the vulnerable function # optional, for internal note purposes to communicate with your team. This reason will be reported in the Safety reports - expires: '2022-10-21' # datetime string - date this ignore will expire, best practice to use this variable - continue-on-vulnerability-error: False # Suppress non-zero exit codes when vulnerabilities are found. Enable this in pipelines and CI/CD processes if you want to pass builds that have vulnerabilities. We recommend you set this to False. -alert: # configuration for the `safety alert` command - security: - # Configuration specific to Safety's GitHub Issue alerting - github-issue: - # Same as for security - these allow controlling if this alert will fire based - # on severity information. 
- # default: not set - # ignore-cvss-severity-below: 6 - # ignore-cvss-unknown-severity: False - - # Add a label to pull requests with the cvss severity, if available - # default: true - # label-severity: True - - # Add a label to pull requests, default is 'security' - # requires private repo permissions, even on public repos - # default: security - # labels: - # - security - - # Assign users to pull requests, default is not set - # requires private repo permissions, even on public repos - # default: empty - # assignees: - # - example-user - - # Prefix to give issues when creating them. Note that changing - # this might cause duplicate issues to be created. - # default: "[PyUp] " - # issue-prefix: "[PyUp] " - - # Configuration specific to Safety's GitHub PR alerting - github-pr: - # Same as for security - these allow controlling if this alert will fire based - # on severity information. - # default: not set - # ignore-cvss-severity-below: 6 - # ignore-cvss-unknown-severity: False - - # Set the default branch (ie, main, master) - # default: empty, the default branch on GitHub - branch: '' - - # Add a label to pull requests with the cvss severity, if available - # default: true - # label-severity: True - - # Add a label to pull requests, default is 'security' - # requires private repo permissions, even on public repos - # default: security - # labels: - # - security - - # Assign users to pull requests, default is not set - # requires private repo permissions, even on public repos - # default: empty - # assignees: - # - example-user - - # Configure the branch prefix for PRs created by this alert. - # NB: Changing this will likely cause duplicate PRs. 
- # default: pyup/ - branch-prefix: pyup/ - - # Set a global prefix for PRs - # default: "[PyUp] " - pr-prefix: "[PyUp] " diff --git a/pipenv/patched/safety/safety.py b/pipenv/patched/safety/safety.py deleted file mode 100644 index e1d415e319..0000000000 --- a/pipenv/patched/safety/safety.py +++ /dev/null @@ -1,617 +0,0 @@ -# -*- coding: utf-8 -*- -import errno -import itertools -import json -import logging -import os -import sys -import time -from datetime import datetime - -import pipenv.patched.pip._vendor.requests as requests -from pipenv.vendor.packaging.specifiers import SpecifierSet -from pipenv.vendor.packaging.utils import canonicalize_name -from pipenv.vendor.packaging.version import parse as parse_version, Version, parse - -from .constants import (API_MIRRORS, CACHE_FILE, OPEN_MIRRORS, REQUEST_TIMEOUT, API_BASE_URL) -from .errors import (DatabaseFetchError, DatabaseFileNotFoundError, - InvalidKeyError, TooManyRequestsError, NetworkConnectionError, - RequestTimeoutError, ServerError, MalformedDatabase) -from .models import Vulnerability, CVE, Severity -from .util import RequirementFile, read_requirements, Package, build_telemetry_data, sync_safety_context, SafetyContext, \ - validate_expiration_date, is_a_remote_mirror - -session = requests.session() - -LOG = logging.getLogger(__name__) - - -def get_from_cache(db_name, cache_valid_seconds=0): - LOG.debug('Trying to get from cache...') - if os.path.exists(CACHE_FILE): - LOG.info('Cache file path: %s', CACHE_FILE) - with open(CACHE_FILE) as f: - try: - data = json.loads(f.read()) - LOG.debug('Trying to get the %s from the cache file', db_name) - LOG.debug('Databases in CACHE file: %s', ', '.join(data)) - if db_name in data: - LOG.debug('db_name %s', db_name) - - if "cached_at" in data[db_name]: - if data[db_name]["cached_at"] + cache_valid_seconds > time.time(): - LOG.debug('Getting the database from cache at %s, cache setting: %s', - data[db_name]["cached_at"], cache_valid_seconds) - return 
data[db_name]["db"] - - LOG.debug('Cached file is too old, it was cached at %s', data[db_name]["cached_at"]) - else: - LOG.debug('There is not the cached_at key in %s database', data[db_name]) - - except json.JSONDecodeError: - LOG.debug('JSONDecodeError trying to get the cached database.') - else: - LOG.debug("Cache file doesn't exist...") - return False - - -def write_to_cache(db_name, data): - # cache is in: ~/safety/cache.json - # and has the following form: - # { - # "insecure.json": { - # "cached_at": 12345678 - # "db": {} - # }, - # "insecure_full.json": { - # "cached_at": 12345678 - # "db": {} - # }, - # } - if not os.path.exists(os.path.dirname(CACHE_FILE)): - try: - os.makedirs(os.path.dirname(CACHE_FILE)) - with open(CACHE_FILE, "w") as _: - _.write(json.dumps({})) - LOG.debug('Cache file created') - except OSError as exc: # Guard against race condition - LOG.debug('Unable to create the cache file because: %s', exc.errno) - if exc.errno != errno.EEXIST: - raise - - with open(CACHE_FILE, "r") as f: - try: - cache = json.loads(f.read()) - except json.JSONDecodeError: - LOG.debug('JSONDecodeError in the local cache, dumping the full cache file.') - cache = {} - - with open(CACHE_FILE, "w") as f: - cache[db_name] = { - "cached_at": time.time(), - "db": data - } - f.write(json.dumps(cache)) - LOG.debug('Safety updated the cache file for %s database.', db_name) - - -def fetch_database_url(mirror, db_name, key, cached, proxy, telemetry=True): - headers = {} - if key: - headers["X-Api-Key"] = key - - if not proxy: - proxy = {} - - if cached: - cached_data = get_from_cache(db_name=db_name, cache_valid_seconds=cached) - if cached_data: - LOG.info('Database %s returned from cache.', db_name) - return cached_data - url = mirror + db_name - - telemetry_data = {'telemetry': json.dumps(build_telemetry_data(telemetry=telemetry))} - - try: - r = session.get(url=url, timeout=REQUEST_TIMEOUT, headers=headers, proxies=proxy, params=telemetry_data) - except 
requests.exceptions.ConnectionError: - raise NetworkConnectionError() - except requests.exceptions.Timeout: - raise RequestTimeoutError() - except requests.exceptions.RequestException: - raise DatabaseFetchError() - - if r.status_code == 403: - raise InvalidKeyError(key=key, reason=r.text) - - if r.status_code == 429: - raise TooManyRequestsError(reason=r.text) - - if r.status_code != 200: - raise ServerError(reason=r.reason) - - try: - data = r.json() - except json.JSONDecodeError as e: - raise MalformedDatabase(reason=e) - - if cached: - LOG.info('Writing %s to cache because cached value was %s', db_name, cached) - write_to_cache(db_name, data) - - return data - - -def fetch_policy(key, proxy): - url = f"{API_BASE_URL}policy/" - headers = {"X-Api-Key": key} - - if not proxy: - proxy = {} - - try: - LOG.debug(f'Getting policy') - r = session.get(url=url, timeout=REQUEST_TIMEOUT, headers=headers, proxies=proxy) - LOG.debug(r.text) - return r.json() - except: - import pipenv.vendor.click as click - - LOG.exception("Error fetching policy") - click.secho( - "Warning: couldn't fetch policy from pyup.io.", - fg="yellow", - file=sys.stderr - ) - - return {"safety_policy": "", "audit_and_monitor": False} - - -def post_results(key, proxy, safety_json, policy_file): - url = f"{API_BASE_URL}result/" - headers = {"X-Api-Key": key} - - if not proxy: - proxy = {} - - # safety_json is in text form already. 
policy_file is a text YAML - audit_report = { - "safety_json": json.loads(safety_json), - "policy_file": policy_file - } - - try: - LOG.debug(f'Posting results: {audit_report}') - r = session.post(url=url, timeout=REQUEST_TIMEOUT, headers=headers, proxies=proxy, json=audit_report) - LOG.debug(r.text) - - return r.json() - except: - import pipenv.vendor.click as click - - LOG.exception("Error posting results") - click.secho( - "Warning: couldn't upload results to pyup.io.", - fg="yellow", - file=sys.stderr - ) - - return {} - - -def fetch_database_file(path, db_name): - full_path = os.path.join(path, db_name) - if not os.path.exists(full_path): - raise DatabaseFileNotFoundError(db=path) - with open(full_path) as f: - return json.loads(f.read()) - - -def fetch_database(full=False, key=False, db=False, cached=0, proxy=None, telemetry=True): - if key: - mirrors = API_MIRRORS - elif db: - mirrors = [db] - else: - mirrors = OPEN_MIRRORS - - db_name = "insecure_full.json" if full else "insecure.json" - for mirror in mirrors: - # mirror can either be a local path or a URL - if is_a_remote_mirror(mirror): - data = fetch_database_url(mirror, db_name=db_name, key=key, cached=cached, proxy=proxy, telemetry=telemetry) - else: - data = fetch_database_file(mirror, db_name=db_name) - if data: - return data - raise DatabaseFetchError() - - -def get_vulnerabilities(pkg, spec, db): - for entry in db[pkg]: - for entry_spec in entry["specs"]: - if entry_spec == spec: - yield entry - - -def get_vulnerability_from(vuln_id, cve, data, specifier, db, name, pkg, ignore_vulns): - base_domain = db.get('$meta', {}).get('base_domain') - pkg_meta = db.get('$meta', {}).get('packages', {}).get(name, {}) - insecure_versions = pkg_meta.get("insecure_versions", []) - secure_versions = pkg_meta.get("secure_versions", []) - latest_version_without_known_vulnerabilities = pkg_meta.get("latest_secure_version", None) - latest_version = pkg_meta.get("latest_version", None) - pkg_refreshed = 
pkg._replace(insecure_versions=insecure_versions, secure_versions=secure_versions, - latest_version_without_known_vulnerabilities=latest_version_without_known_vulnerabilities, - latest_version=latest_version, - more_info_url=f"{base_domain}{pkg_meta.get('more_info_path', '')}") - - ignored = (ignore_vulns and vuln_id in ignore_vulns and ( - not ignore_vulns[vuln_id]['expires'] or ignore_vulns[vuln_id]['expires'] > datetime.utcnow())) - more_info_url = f"{base_domain}{data.get('more_info_path', '')}" - severity = None - - if cve and (cve.cvssv2 or cve.cvssv3): - severity = Severity(source=cve.name, cvssv2=cve.cvssv2, cvssv3=cve.cvssv3) - - return Vulnerability( - vulnerability_id=vuln_id, - package_name=name, - pkg=pkg_refreshed, - ignored=ignored, - ignored_reason=ignore_vulns.get(vuln_id, {}).get('reason', None) if ignore_vulns else None, - ignored_expires=ignore_vulns.get(vuln_id, {}).get('expires', None) if ignore_vulns else None, - vulnerable_spec=specifier, - all_vulnerable_specs=data.get("specs", []), - analyzed_version=pkg_refreshed.version, - advisory=data.get("advisory"), - is_transitive=data.get("transitive", False), - published_date=data.get("published_date"), - fixed_versions=[ver for ver in data.get("fixed_versions", []) if ver], - closest_versions_without_known_vulnerabilities=data.get("closest_secure_versions", []), - resources=data.get("vulnerability_resources"), - CVE=cve, - severity=severity, - affected_versions=data.get("affected_versions", []), - more_info_url=more_info_url - ) - - -def get_cve_from(data, db_full): - cve_data = data.get("cve", '') - - if not cve_data: - return None - - cve_id = cve_data.split(",")[0].strip() - cve_meta = db_full.get("$meta", {}).get("cve", {}).get(cve_id, {}) - return CVE(name=cve_id, cvssv2=cve_meta.get("cvssv2", None), - cvssv3=cve_meta.get("cvssv3", None)) - - -def ignore_vuln_if_needed(vuln_id, cve, ignore_vulns, ignore_severity_rules): - - if not ignore_severity_rules or not isinstance(ignore_vulns, dict): 
- return - - severity = None - - if cve: - if cve.cvssv2 and cve.cvssv2.get("base_score", None): - severity = cve.cvssv2.get("base_score", None) - - if cve.cvssv3 and cve.cvssv3.get("base_score", None): - severity = cve.cvssv3.get("base_score", None) - - ignore_severity_below = float(ignore_severity_rules.get('ignore-cvss-severity-below', 0.0)) - ignore_unknown_severity = bool(ignore_severity_rules.get('ignore-cvss-unknown-severity', False)) - - if severity: - if float(severity) < ignore_severity_below: - reason = 'Ignored by severity rule in policy file, {0} < {1}'.format(float(severity), - ignore_severity_below) - ignore_vulns[vuln_id] = {'reason': reason, 'expires': None} - elif ignore_unknown_severity: - reason = 'Unknown CVSS severity, ignored by severity rule in policy file.' - ignore_vulns[vuln_id] = {'reason': reason, 'expires': None} - - -@sync_safety_context -def check(packages, key=False, db_mirror=False, cached=0, ignore_vulns=None, ignore_severity_rules=None, proxy=None, - include_ignored=False, is_env_scan=True, telemetry=True, params=None, project=None): - SafetyContext().command = 'check' - db = fetch_database(key=key, db=db_mirror, cached=cached, proxy=proxy, telemetry=telemetry) - db_full = None - vulnerable_packages = frozenset(db.keys()) - vulnerabilities = [] - - for pkg in packages: - # Ignore recursive files not resolved - if isinstance(pkg, RequirementFile): - continue - - # normalize the package name, the safety-db is converting underscores to dashes and uses - # lowercase - name = canonicalize_name(pkg.name) - - if name in vulnerable_packages: - # we have a candidate here, build the spec set - for specifier in db[name]: - spec_set = SpecifierSet(specifiers=specifier) - if spec_set.contains(pkg.version): - if not db_full: - db_full = fetch_database(full=True, key=key, db=db_mirror, cached=cached, proxy=proxy, - telemetry=telemetry) - for data in get_vulnerabilities(pkg=name, spec=specifier, db=db_full): - vuln_id = 
data.get("id").replace("pyup.io-", "") - cve = get_cve_from(data, db_full) - - ignore_vuln_if_needed(vuln_id, cve, ignore_vulns, ignore_severity_rules) - - vulnerability = get_vulnerability_from(vuln_id, cve, data, specifier, db_full, name, pkg, - ignore_vulns) - - should_add_vuln = not (vulnerability.is_transitive and is_env_scan) - - if (include_ignored or vulnerability.vulnerability_id not in ignore_vulns) and should_add_vuln: - vulnerabilities.append(vulnerability) - - return vulnerabilities, db_full - - -def precompute_remediations(remediations, package_metadata, vulns, - ignored_vulns): - for vuln in vulns: - if vuln.ignored: - ignored_vulns.add(vuln.vulnerability_id) - continue - - if vuln.package_name in remediations.keys(): - remediations[vuln.package_name]['vulns_found'] = remediations[vuln.package_name].get('vulns_found', 0) + 1 - else: - vulns_count = 1 - package_metadata[vuln.package_name] = {'insecure_versions': vuln.pkg.insecure_versions, - 'secure_versions': vuln.pkg.secure_versions, 'version': vuln.pkg.version} - remediations[vuln.package_name] = {'vulns_found': vulns_count, 'version': vuln.pkg.version, - 'more_info_url': vuln.pkg.more_info_url} - - -def get_closest_ver(versions, version): - results = {'minor': None, 'major': None} - if not version or not versions: - return results - - sorted_versions = sorted(versions, key=lambda ver: parse_version(ver), reverse=True) - - for v in sorted_versions: - index = parse_version(v) - current_v = parse_version(version) - - if index > current_v: - results['major'] = index - - if index < current_v: - results['minor'] = index - break - - return results - - -def compute_sec_ver_for_user(package, ignored_vulns, db_full): - pkg_meta = db_full.get('$meta', {}).get('packages', {}).get(package, {}) - versions = set(pkg_meta.get("insecure_versions", []) + pkg_meta.get("secure_versions", [])) - affected_versions = [] - - for vuln in db_full.get(package, []): - vuln_id = vuln.get('id', None) - if vuln_id and vuln_id 
not in ignored_vulns: - affected_versions += vuln.get('affected_versions', []) - - affected_v = set(affected_versions) - sec_ver_for_user = list(versions.difference(affected_v)) - - return sorted(sec_ver_for_user, key=lambda ver: parse_version(ver), reverse=True) - - -def compute_sec_ver(remediations, package_metadata, ignored_vulns, db_full): - """ - Compute the secure_versions and the closest_secure_version for each remediation using the affected_versions - of each no ignored vulnerability of the same package, there is only a remediation for each package. - """ - for pkg_name in remediations.keys(): - pkg = package_metadata.get(pkg_name, {}) - - if not ignored_vulns: - secure_v = pkg.get('secure_versions', []) - else: - secure_v = compute_sec_ver_for_user(package=pkg_name, ignored_vulns=ignored_vulns, db_full=db_full) - - remediations[pkg_name]['secure_versions'] = secure_v - remediations[pkg_name]['closest_secure_version'] = get_closest_ver(secure_v, - pkg.get('version', None)) - - -def calculate_remediations(vulns, db_full): - remediations = {} - package_metadata = {} - ignored_vulns = set() - - if not db_full: - return remediations - - precompute_remediations(remediations, package_metadata, vulns, ignored_vulns) - compute_sec_ver(remediations, package_metadata, ignored_vulns, db_full) - - return remediations - - -@sync_safety_context -def review(report=None, params=None): - SafetyContext().command = 'review' - vulnerable = [] - vulnerabilities = report.get('vulnerabilities', []) + report.get('ignored_vulnerabilities', []) - remediations = {} - - for key, value in report.get('remediations', {}).items(): - recommended = value.get('recommended_version', None) - secure_v = value.get('other_recommended_versions', []) - major = None - if recommended: - secure_v.append(recommended) - major = parse(recommended) - - remediations[key] = {'vulns_found': value.get('vulnerabilities_found', 0), - 'version': value.get('current_version'), - 'secure_versions': secure_v, - 
'closest_secure_version': {'major': major, 'minor': None}, - # minor isn't supported in review - 'more_info_url': value.get('more_info_url')} - - packages = report.get('scanned_packages', []) - pkgs = {pkg_name: Package(**pkg_values) for pkg_name, pkg_values in packages.items()} - ctx = SafetyContext() - found_packages = list(pkgs.values()) - ctx.packages = found_packages - ctx.review = report.get('report_meta', []) - ctx.key = ctx.review.get('api_key', False) - cvssv2 = None - cvssv3 = None - - for vuln in vulnerabilities: - vuln['pkg'] = pkgs.get(vuln.get('package_name', None)) - XVE_ID = vuln.get('CVE', None) # Trying to get first the CVE ID - - severity = vuln.get('severity', None) - if severity and severity.get('source', False): - cvssv2 = severity.get('cvssv2', None) - cvssv3 = severity.get('cvssv3', None) - # Trying to get the PVE ID if it exists, otherwise it will be the same CVE ID of above - XVE_ID = severity.get('source', False) - vuln['severity'] = Severity(source=XVE_ID, cvssv2=cvssv2, cvssv3=cvssv3) - else: - vuln['severity'] = None - - ignored_expires = vuln.get('ignored_expires', None) - - if ignored_expires: - vuln['ignored_expires'] = validate_expiration_date(ignored_expires) - - vuln['CVE'] = CVE(name=XVE_ID, cvssv2=cvssv2, cvssv3=cvssv3) if XVE_ID else None - - vulnerable.append(Vulnerability(**vuln)) - - return vulnerable, remediations, found_packages - - -@sync_safety_context -def get_licenses(key=False, db_mirror=False, cached=0, proxy=None, telemetry=True): - key = key if key else os.environ.get("SAFETY_API_KEY", False) - - if not key and not db_mirror: - raise InvalidKeyError(message="The API-KEY was not provided.") - if db_mirror: - mirrors = [db_mirror] - else: - mirrors = API_MIRRORS - - db_name = "licenses.json" - - for mirror in mirrors: - # mirror can either be a local path or a URL - if is_a_remote_mirror(mirror): - licenses = fetch_database_url(mirror, db_name=db_name, key=key, cached=cached, proxy=proxy, - telemetry=telemetry) - 
else: - licenses = fetch_database_file(mirror, db_name=db_name) - if licenses: - return licenses - raise DatabaseFetchError() - - -def get_announcements(key, proxy, telemetry=True): - LOG.info('Getting announcements') - - announcements = [] - headers = {} - - if key: - headers["X-Api-Key"] = key - - url = f"{API_BASE_URL}announcements/" - method = 'post' - data = build_telemetry_data(telemetry=telemetry) - request_kwargs = {'headers': headers, 'proxies': proxy, 'timeout': 3} - data_keyword = 'json' - - source = os.environ.get('SAFETY_ANNOUNCEMENTS_URL', None) - - if source: - LOG.debug(f'Getting the announcement from a different source: {source}') - url = source - method = 'get' - data = { - 'telemetry': json.dumps(data)} - data_keyword = 'params' - - request_kwargs[data_keyword] = data - request_kwargs['url'] = url - - LOG.debug(f'Telemetry data sent: {data}') - - try: - request_func = getattr(session, method) - r = request_func(**request_kwargs) - LOG.debug(r.text) - except Exception as e: - LOG.info('Unexpected but HANDLED Exception happened getting the announcements: %s', e) - return announcements - - if r.status_code == 200: - try: - announcements = r.json() - if 'announcements' in announcements.keys(): - announcements = announcements.get('announcements', []) - else: - LOG.info('There is not announcements key in the JSON response, is this a wrong structure?') - announcements = [] - - except json.JSONDecodeError as e: - LOG.info('Unexpected but HANDLED Exception happened decoding the announcement response: %s', e) - - LOG.info('Announcements fetched') - - return announcements - - -def get_packages(files=False, stdin=False): - - if files: - return list(itertools.chain.from_iterable(read_requirements(f, resolve=True) for f in files)) - - if stdin: - return list(read_requirements(sys.stdin)) - - import pipenv.patched.pip._vendor.pkg_resources as pkg_resources - - return [ - Package(name=d.key, version=d.version, found=d.location, insecure_versions=[], 
secure_versions=[], - latest_version=None, latest_version_without_known_vulnerabilities=None, more_info_url=None) for d in - pkg_resources.working_set - if d.key not in {"python", "wsgiref", "argparse"} - ] - - -def read_vulnerabilities(fh): - try: - data = json.load(fh) - except json.JSONDecodeError as e: - raise MalformedDatabase(reason=e, fetched_from=fh.name) - except TypeError as e: - raise MalformedDatabase(reason=e, fetched_from=fh.name) - - return data - - -def close_session(): - LOG.debug('Closing requests session.') - session.close() diff --git a/pipenv/patched/safety/util.py b/pipenv/patched/safety/util.py deleted file mode 100644 index 7b5f6525a8..0000000000 --- a/pipenv/patched/safety/util.py +++ /dev/null @@ -1,669 +0,0 @@ -import json -import logging -import os -import platform - -import sys -from datetime import datetime -from difflib import SequenceMatcher -from threading import Lock -from typing import List - -import pipenv.vendor.click as click -from pipenv.vendor.click import BadParameter -from pipenv.vendor.dparse import parse, filetypes -from pipenv.vendor.packaging.utils import canonicalize_name -from pipenv.vendor.packaging.version import parse as parse_version -from pipenv.vendor.ruamel.yaml import YAML -from pipenv.vendor.ruamel.yaml.error import MarkedYAMLError - -from pipenv.patched.safety.constants import EXIT_CODE_FAILURE, EXIT_CODE_OK -from pipenv.patched.safety.models import Package, RequirementFile - -LOG = logging.getLogger(__name__) - - -def is_a_remote_mirror(mirror): - return mirror.startswith("http://") or mirror.startswith("https://") - - -def is_supported_by_parser(path): - supported_types = (".txt", ".in", ".yml", ".ini", "Pipfile", - "Pipfile.lock", "setup.cfg", "poetry.lock") - return path.endswith(supported_types) - - -def read_requirements(fh, resolve=True): - """ - Reads requirements from a file like object and (optionally) from referenced files. - :param fh: file like object to read from - :param resolve: boolean. 
resolves referenced files. - :return: generator - """ - is_temp_file = not hasattr(fh, 'name') - path = None - found = 'temp_file' - file_type = filetypes.requirements_txt - - if not is_temp_file and is_supported_by_parser(fh.name): - LOG.debug('not temp and a compatible file') - path = fh.name - found = path - file_type = None - - LOG.debug(f'Path: {path}') - LOG.debug(f'File Type: {file_type}') - LOG.debug('Trying to parse file using dparse...') - content = fh.read() - LOG.debug(f'Content: {content}') - dependency_file = parse(content, path=path, resolve=resolve, - file_type=file_type) - LOG.debug(f'Dependency file: {dependency_file.serialize()}') - LOG.debug(f'Parsed, dependencies: {[dep.serialize() for dep in dependency_file.resolved_dependencies]}') - for dep in dependency_file.resolved_dependencies: - try: - spec = next(iter(dep.specs))._spec - except StopIteration: - click.secho( - f"Warning: unpinned requirement '{dep.name}' found in {path}, " - "unable to check.", - fg="yellow", - file=sys.stderr - ) - return - - version = spec[1] - if spec[0] == '==': - yield Package(name=dep.name, version=version, - found=found, - insecure_versions=[], - secure_versions=[], latest_version=None, - latest_version_without_known_vulnerabilities=None, - more_info_url=None) - - -def get_proxy_dict(proxy_protocol, proxy_host, proxy_port): - if proxy_protocol and proxy_host and proxy_port: - # Safety only uses https request, so only https dict will be passed to requests - return {'https': f"{proxy_protocol}://{proxy_host}:{str(proxy_port)}"} - return None - - -def get_license_name_by_id(license_id, db): - licenses = db.get('licenses', []) - for name, id in licenses.items(): - if id == license_id: - return name - return None - - -def get_flags_from_context(): - flags = {} - context = click.get_current_context(silent=True) - - if context: - for option in context.command.params: - flags_per_opt = option.opts + option.secondary_opts - for flag in flags_per_opt: - flags[flag] = 
option.name - - return flags - - -def get_used_options(): - flags = get_flags_from_context() - used_options = {} - - for arg in sys.argv: - cleaned_arg = arg if '=' not in arg else arg.split('=')[0] - if cleaned_arg in flags: - option_used = flags.get(cleaned_arg) - - if option_used in used_options: - used_options[option_used][cleaned_arg] = used_options[option_used].get(cleaned_arg, 0) + 1 - else: - used_options[option_used] = {cleaned_arg: 1} - - return used_options - - -def get_safety_version(): - from pipenv.patched.safety import VERSION - return VERSION - - -def get_primary_announcement(announcements): - for announcement in announcements: - if announcement.get('type', '').lower() == 'primary_announcement': - try: - from pipenv.patched.safety.output_utils import build_primary_announcement - build_primary_announcement(announcement, columns=80) - except Exception as e: - LOG.debug(f'Failed to build primary announcement: {str(e)}') - return None - - return announcement - - return None - - -def get_basic_announcements(announcements): - return [announcement for announcement in announcements if - announcement.get('type', '').lower() != 'primary_announcement'] - - -def filter_announcements(announcements, by_type='error'): - return [announcement for announcement in announcements if - announcement.get('type', '').lower() == by_type] - - -def build_telemetry_data(telemetry=True): - context = SafetyContext() - - body = { - 'os_type': os.environ.get("SAFETY_OS_TYPE", None) or platform.system(), - 'os_release': os.environ.get("SAFETY_OS_RELEASE", None) or platform.release(), - 'os_description': os.environ.get("SAFETY_OS_DESCRIPTION", None) or platform.platform(), - 'python_version': platform.python_version(), - 'safety_command': context.command, - 'safety_options': get_used_options() - } if telemetry else {} - - body['safety_version'] = get_safety_version() - body['safety_source'] = os.environ.get("SAFETY_SOURCE", None) or context.safety_source - - LOG.debug(f'Telemetry 
body built: {body}') - - return body - - -def build_git_data(): - import subprocess - - def git_command(commandline): - return subprocess.run(commandline, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL).stdout.decode('utf-8').strip() - - try: - is_git = git_command(["git", "rev-parse", "--is-inside-work-tree"]) - except Exception: - is_git = False - - if is_git == "true": - result = { - "branch": "", - "tag": "", - "commit": "", - "dirty": "", - "origin": "" - } - - try: - result['branch'] = git_command(["git", "symbolic-ref", "--short", "-q", "HEAD"]) - result['tag'] = git_command(["git", "describe", "--tags", "--exact-match"]) - - commit = git_command(["git", "describe", '--match=""', '--always', '--abbrev=40', '--dirty']) - result['dirty'] = commit.endswith('-dirty') - result['commit'] = commit.split("-dirty")[0] - - result['origin'] = git_command(["git", "remote", "get-url", "origin"]) - except Exception: - pass - - return result - else: - return { - "error": "not-git-repo" - } - - -def output_exception(exception, exit_code_output=True): - click.secho(str(exception), fg="red", file=sys.stderr) - - if exit_code_output: - exit_code = EXIT_CODE_FAILURE - if hasattr(exception, 'get_exit_code'): - exit_code = exception.get_exit_code() - else: - exit_code = EXIT_CODE_OK - - sys.exit(exit_code) - - -def get_processed_options(policy_file, ignore, ignore_severity_rules, exit_code): - if policy_file: - security = policy_file.get('security', {}) - source = click.get_current_context().get_parameter_source("exit_code") - - if not ignore: - ignore = security.get('ignore-vulnerabilities', {}) - if source == click.core.ParameterSource.DEFAULT: - exit_code = not security.get('continue-on-vulnerability-error', False) - ignore_cvss_below = security.get('ignore-cvss-severity-below', 0.0) - ignore_cvss_unknown = security.get('ignore-cvss-unknown-severity', False) - ignore_severity_rules = {'ignore-cvss-severity-below': ignore_cvss_below, - 'ignore-cvss-unknown-severity': 
ignore_cvss_unknown} - - return ignore, ignore_severity_rules, exit_code - - -class MutuallyExclusiveOption(click.Option): - def __init__(self, *args, **kwargs): - self.mutually_exclusive = set(kwargs.pop('mutually_exclusive', [])) - self.with_values = kwargs.pop('with_values', {}) - help = kwargs.get('help', '') - if self.mutually_exclusive: - ex_str = ', '.join(["{0} with values {1}".format(item, self.with_values.get(item)) if item in self.with_values else item for item in self.mutually_exclusive]) - kwargs['help'] = help + ( - ' NOTE: This argument is mutually exclusive with ' - ' arguments: [' + ex_str + '].' - ) - super(MutuallyExclusiveOption, self).__init__(*args, **kwargs) - - def handle_parse_result(self, ctx, opts, args): - m_exclusive_used = self.mutually_exclusive.intersection(opts) - option_used = m_exclusive_used and self.name in opts - - exclusive_value_used = False - for used in m_exclusive_used: - value_used = opts.get(used, None) - if not isinstance(value_used, List): - value_used = [value_used] - if value_used and set(self.with_values.get(used, [])).intersection(value_used): - exclusive_value_used = True - - if option_used and (not self.with_values or exclusive_value_used): - options = ', '.join(self.opts) - prohibited = ''.join(["\n * --{0} with {1}".format(item, self.with_values.get( - item)) if item in self.with_values else f"\n * {item}" for item in self.mutually_exclusive]) - raise click.UsageError( - f"Illegal usage: `{options}` is mutually exclusive with: {prohibited}" - ) - - return super(MutuallyExclusiveOption, self).handle_parse_result( - ctx, - opts, - args - ) - - -class DependentOption(click.Option): - def __init__(self, *args, **kwargs): - self.required_options = set(kwargs.pop('required_options', [])) - help = kwargs.get('help', '') - if self.required_options: - ex_str = ', '.join(self.required_options) - kwargs['help'] = help + ( - ' NOTE: This argument requires the following flags ' - ' [' + ex_str + '].' 
- ) - super(DependentOption, self).__init__(*args, **kwargs) - - def handle_parse_result(self, ctx, opts, args): - missing_required_arguments = self.required_options.difference(opts) and self.name in opts - - if missing_required_arguments: - raise click.UsageError( - "Illegal usage: `{}` needs the " - "arguments `{}`.".format( - self.name, - ', '.join(missing_required_arguments) - ) - ) - - return super(DependentOption, self).handle_parse_result( - ctx, - opts, - args - ) - - -def transform_ignore(ctx, param, value): - if isinstance(value, tuple): - return dict(zip(value, [{'reason': '', 'expires': None} for _ in range(len(value))])) - - return {} - - -def active_color_if_needed(ctx, param, value): - if value == 'screen': - ctx.color = True - - color = os.environ.get("SAFETY_COLOR", None) - - if color is not None: - color = color.lower() - - if color == '1' or color == 'true': - ctx.color = True - elif color == '0' or color == 'false': - ctx.color = False - - return value - - -def json_alias(ctx, param, value): - if value: - os.environ['SAFETY_OUTPUT'] = 'json' - return value - - -def bare_alias(ctx, param, value): - if value: - os.environ['SAFETY_OUTPUT'] = 'bare' - return value - - -def get_terminal_size(): - from shutil import get_terminal_size as t_size - # get_terminal_size can report 0, 0 if run from pseudo-terminal prior Python 3.11 versions - - columns = t_size().columns or 80 - lines = t_size().lines or 24 - - return os.terminal_size((columns, lines)) - - -def validate_expiration_date(expiration_date): - d = None - - if expiration_date: - try: - d = datetime.strptime(expiration_date, '%Y-%m-%d') - except ValueError as e: - pass - - try: - d = datetime.strptime(expiration_date, '%Y-%m-%d %H:%M:%S') - except ValueError as e: - pass - - return d - - -class SafetyPolicyFile(click.ParamType): - """ - Custom Safety Policy file to hold validations - """ - - name = "filename" - envvar_list_splitter = os.path.pathsep - - def __init__( - self, - mode: str = "r", - 
encoding: str = None, - errors: str = "strict", - pure: bool = os.environ.get('SAFETY_PURE_YAML', 'false').lower() == 'true' - ) -> None: - self.mode = mode - self.encoding = encoding - self.errors = errors - self.basic_msg = '\n' + click.style('Unable to load the Safety Policy file "{name}".', fg='red') - self.pure = pure - - def to_info_dict(self): - info_dict = super().to_info_dict() - info_dict.update(mode=self.mode, encoding=self.encoding) - return info_dict - - def fail_if_unrecognized_keys(self, used_keys, valid_keys, param=None, ctx=None, msg='{hint}', context_hint=''): - for keyword in used_keys: - if keyword not in valid_keys: - match = None - max_ratio = 0.0 - if isinstance(keyword, str): - for option in valid_keys: - ratio = SequenceMatcher(None, keyword, option).ratio() - if ratio > max_ratio: - match = option - max_ratio = ratio - - maybe_msg = f' Maybe you meant: {match}' if max_ratio > 0.7 else \ - f' Valid keywords in this level are: {", ".join(valid_keys)}' - - self.fail(msg.format(hint=f'{context_hint}"{keyword}" is not a valid keyword.{maybe_msg}'), param, ctx) - - def fail_if_wrong_bool_value(self, keyword, value, msg='{hint}'): - if value is not None and not isinstance(value, bool): - self.fail(msg.format(hint=f"'{keyword}' value needs to be a boolean. 
" - "You can use True, False, TRUE, FALSE, true or false")) - - def convert(self, value, param, ctx): - try: - - if hasattr(value, "read") or hasattr(value, "write"): - return value - - msg = self.basic_msg.format(name=value) + '\n' + click.style('HINT:', fg='yellow') + ' {hint}' - - f, _ = click.types.open_stream( - value, self.mode, self.encoding, self.errors, atomic=False - ) - filename = '' - - try: - raw = f.read() - yaml = YAML(typ='safe', pure=self.pure) - safety_policy = yaml.load(raw) - filename = f.name - f.close() - except Exception as e: - show_parsed_hint = isinstance(e, MarkedYAMLError) - hint = str(e) - if show_parsed_hint: - hint = f'{str(e.problem).strip()} {str(e.context).strip()} {str(e.context_mark).strip()}' - - self.fail(msg.format(name=value, hint=hint), param, ctx) - - if not safety_policy or not isinstance(safety_policy, dict) or not safety_policy.get('security', None): - self.fail( - msg.format(hint='you are missing the security root tag'), param, ctx) - - security_config = safety_policy.get('security', {}) - security_keys = ['ignore-cvss-severity-below', 'ignore-cvss-unknown-severity', 'ignore-vulnerabilities', - 'continue-on-vulnerability-error'] - used_keys = security_config.keys() - - self.fail_if_unrecognized_keys(used_keys, security_keys, param=param, ctx=ctx, msg=msg, - context_hint='"security" -> ') - - ignore_cvss_security_below = security_config.get('ignore-cvss-severity-below', None) - - if ignore_cvss_security_below: - limit = 0.0 - - try: - limit = float(ignore_cvss_security_below) - except ValueError as e: - self.fail(msg.format(hint="'ignore-cvss-severity-below' value needs to be an integer or float.")) - - if limit < 0 or limit > 10: - self.fail(msg.format(hint="'ignore-cvss-severity-below' needs to be a value between 0 and 10")) - - continue_on_vulnerability_error = security_config.get('continue-on-vulnerability-error', None) - self.fail_if_wrong_bool_value('continue-on-vulnerability-error', 
continue_on_vulnerability_error, msg) - - ignore_cvss_unknown_severity = security_config.get('ignore-cvss-unknown-severity', None) - self.fail_if_wrong_bool_value('ignore-cvss-unknown-severity', ignore_cvss_unknown_severity, msg) - - ignore_vulns = safety_policy.get('security', {}).get('ignore-vulnerabilities', {}) - - if ignore_vulns: - if not isinstance(ignore_vulns, dict): - self.fail(msg.format(hint="Vulnerability IDs under the 'ignore-vulnerabilities' key, need to " - "follow the convention 'ID_NUMBER:', probably you are missing a colon.")) - - normalized = {} - - for ignored_vuln_id, config in ignore_vulns.items(): - ignored_vuln_config = config if config else {} - - if not isinstance(ignored_vuln_config, dict): - self.fail( - msg.format(hint=f"Wrong configuration under the vulnerability with ID: {ignored_vuln_id}")) - - context_msg = f'"security" -> "ignore-vulnerabilities" -> "{ignored_vuln_id}" -> ' - - self.fail_if_unrecognized_keys(ignored_vuln_config.keys(), ['reason', 'expires'], param=param, - ctx=ctx, msg=msg, context_hint=context_msg) - - reason = ignored_vuln_config.get('reason', '') - reason = str(reason) if reason else None - expires = ignored_vuln_config.get('expires', '') - expires = str(expires) if expires else None - - try: - if int(ignored_vuln_id) < 0: - raise ValueError('Negative Vulnerability ID') - except ValueError as e: - self.fail(msg.format( - hint=f"vulnerability id {ignored_vuln_id} under the 'ignore-vulnerabilities' root needs to " - f"be a positive integer") - ) - - # Validate expires - d = validate_expiration_date(expires) - - if expires and not d: - self.fail(msg.format(hint=f"{context_msg}expires: \"{expires}\" isn't a valid format " - f"for the expires keyword, " - "valid options are: YYYY-MM-DD or " - "YYYY-MM-DD HH:MM:SS") - ) - - normalized[str(ignored_vuln_id)] = {'reason': reason, 'expires': d} - - safety_policy['security']['ignore-vulnerabilities'] = normalized - safety_policy['filename'] = filename - 
safety_policy['raw'] = raw - else: - safety_policy['security']['ignore-vulnerabilities'] = {} - - return safety_policy - except BadParameter as expected_e: - raise expected_e - except Exception as e: - # Don't fail in the default case - if ctx and isinstance(e, OSError): - source = ctx.get_parameter_source("policy_file") - if e.errno == 2 and source == click.core.ParameterSource.DEFAULT and value == '.safety-policy.yml': - return None - - problem = click.style("Policy file YAML is not valid.") - hint = click.style("HINT: ", fg='yellow') + str(e) - self.fail(f"{problem}\n{hint}", param, ctx) - - def shell_complete( - self, ctx: "Context", param: "Parameter", incomplete: str - ): - """Return a special completion marker that tells the completion - system to use the shell to provide file path completions. - - :param ctx: Invocation context for this command. - :param param: The parameter that is requesting completion. - :param incomplete: Value being completed. May be empty. - - .. versionadded:: 8.0 - """ - from pipenv.vendor.click.shell_completion import CompletionItem - - return [CompletionItem(incomplete, type="file")] - - -class SingletonMeta(type): - - _instances = {} - - _lock = Lock() - - def __call__(cls, *args, **kwargs): - with cls._lock: - if cls not in cls._instances: - instance = super().__call__(*args, **kwargs) - cls._instances[cls] = instance - return cls._instances[cls] - - -class SafetyContext(metaclass=SingletonMeta): - packages = None - key = False - db_mirror = False - cached = None - ignore_vulns = None - ignore_severity_rules = None - proxy = None - include_ignored = False - telemetry = None - files = None - stdin = None - is_env_scan = None - command = None - review = None - params = {} - safety_source = 'code' - - -def sync_safety_context(f): - def new_func(*args, **kwargs): - ctx = SafetyContext() - - for attr in dir(ctx): - if attr in kwargs: - setattr(ctx, attr, kwargs.get(attr)) - - return f(*args, **kwargs) - - return new_func - - 
-@sync_safety_context -def get_packages_licenses(packages=None, licenses_db=None): - """Get the licenses for the specified packages based on their version. - - :param packages: packages list - :param licenses_db: the licenses db in the raw form. - :return: list of objects with the packages and their respectives licenses. - """ - SafetyContext().command = 'license' - - if not packages: - packages = [] - if not licenses_db: - licenses_db = {} - - packages_licenses_db = licenses_db.get('packages', {}) - filtered_packages_licenses = [] - - for pkg in packages: - # Ignore recursive files not resolved - if isinstance(pkg, RequirementFile): - continue - # normalize the package name - pkg_name = canonicalize_name(pkg.name) - # packages may have different licenses depending their version. - pkg_licenses = packages_licenses_db.get(pkg_name, []) - version_requested = parse_version(pkg.version) - license_id = None - license_name = None - for pkg_version in pkg_licenses: - license_start_version = parse_version(pkg_version['start_version']) - # Stops and return the previous stored license when a new - # license starts on a version above the requested one. 
- if version_requested >= license_start_version: - license_id = pkg_version['license_id'] - else: - # We found the license for the version requested - break - - if license_id: - license_name = get_license_name_by_id(license_id, licenses_db) - if not license_id or not license_name: - license_name = "unknown" - - filtered_packages_licenses.append({ - "package": pkg_name, - "version": pkg.version, - "license": license_name - }) - - return filtered_packages_licenses diff --git a/pipenv/routines/check.py b/pipenv/routines/check.py index c6da10b1ef..23cef38e8a 100644 --- a/pipenv/routines/check.py +++ b/pipenv/routines/check.py @@ -1,31 +1,75 @@ +import configparser import io import json import logging import os +import subprocess as sp +import shutil import sys import tempfile from contextlib import redirect_stderr, redirect_stdout from pathlib import Path + from pipenv import pep508checker -from pipenv.patched.safety.cli import cli -from pipenv.utils.processes import run_command +from pipenv.utils import console, err +from pipenv.utils.processes import run_command, subprocess_run from pipenv.utils.project import ensure_project from pipenv.utils.shell import project_python from pipenv.vendor import click, plette +def _get_safety(project, system=False, auto_install=True): + """Install safety and its dependencies.""" + python = project_python(project, system=system) + + cmd = [python, "-m", "safety"] + c = subprocess_run(cmd) + if c.returncode: + console.print( + "[yellow bold]Safety package is required for vulnerability scanning but not installed.[/yellow bold]" + ) + + install = auto_install + if not auto_install: + install = click.confirm( + "Would you like to install safety? This will not modify your Pipfile/lockfile.", + default=True, + ) + + if not install: + console.print( + "[yellow]Vulnerability scanning skipped. 
Install safety with 'pip install pipenv[safety]'[/yellow]" + ) + return "" + + console.print("[green]Installing safety...[/green]") + + # Install safety directly rather than as an extra to ensure it works in development mode + cmd = [python, "-m", "pip", "install", "safety>=3.0.0", "typer>=0.9.0", "--quiet"] + c = run_command(cmd) + + if c.returncode != 0: + err.print( + "[red]Failed to install safety. Please install it manually with 'pip install pipenv[safety]'[/red]" + ) + return "" + + console.print("[green]Safety installed successfully![/green]") + else: + console.print("[green]Safety found![/green]") + + return os.path.join(os.path.dirname(python), "safety") + + def build_safety_options( - audit_and_monitor=True, exit_code=True, output="screen", save_json="", policy_file="", safety_project=None, - temp_requirements_name="", ): options = [ - "--audit-and-monitor" if audit_and_monitor else "--disable-audit-and-monitor", "--exit-code" if exit_code else "--continue-on-error", ] formats = {"full-report": "--full-report", "minimal": "--json"} @@ -44,137 +88,40 @@ def build_safety_options( if safety_project: options.append(f"--project={safety_project}") - options.extend(["--file", temp_requirements_name]) - return options -def run_pep508_check(project, system, python): - pep508checker_path = pep508checker.__file__.rstrip("cdo") - cmd = [project_python(project, system=system), Path(pep508checker_path).as_posix()] - c = run_command(cmd, is_verbose=project.s.is_verbose()) +def run_safety_check(cmd, verbose): + if verbose: + click.echo(f"Running: {' '.join(cmd)}") + c = sp.run(cmd, capture_output=False) + return c.stdout, c.stderr, c.returncode - if c.returncode is not None: - try: - return json.loads(c.stdout.strip()) - except json.JSONDecodeError: - click.echo( - f"Failed parsing pep508 results:\n{c.stdout.strip()}\n{c.stderr.strip()}", - err=True, - ) - sys.exit(1) - return {} - - -def check_pep508_requirements(project, results, quiet): - p = 
plette.Pipfile.load(open(project.pipfile_location)) - p = plette.Lockfile.with_meta_from(p) - failed = False - - for marker, specifier in p._data["_meta"]["requires"].items(): - if marker in results: - if results[marker] != specifier: - failed = True - click.echo( - "Specifier {} does not match {} ({})." - "".format( - click.style(marker, fg="green"), - click.style(specifier, fg="cyan"), - click.style(results[marker], fg="yellow"), - ), - err=True, - ) - - if failed: - click.secho("Failed!", fg="red", err=True) - sys.exit(1) - elif not quiet and not project.s.is_quiet(): - click.secho("Passed!", fg="green") - - -def get_requirements(project, use_installed, categories): - _cmd = [project_python(project, system=False)] - if use_installed: - return run_command( - _cmd + ["-m", "pip", "list", "--format=freeze"], - is_verbose=project.s.is_verbose(), - ) - elif categories: - return run_command( - ["pipenv", "requirements", "--categories", categories], - is_verbose=project.s.is_verbose(), - ) - else: - return run_command(["pipenv", "requirements"], is_verbose=project.s.is_verbose()) +def has_safey_auth_token() -> bool: + """" + Retrieve a token from the local authentication configuration. -def create_temp_requirements(project, requirements): - temp_requirements = tempfile.NamedTemporaryFile( - mode="w+", - prefix=f"{project.virtualenv_name}", - suffix="_requirements.txt", - delete=False, - ) - temp_requirements.write(requirements.stdout.strip()) - temp_requirements.close() - return temp_requirements - - -def run_safety_check(cmd, quiet): - sys.argv = cmd[1:] - - if quiet: - out = io.StringIO() - err = io.StringIO() - exit_code = 0 - with redirect_stdout(out), redirect_stderr(err): - try: - cli(prog_name="pipenv") - except SystemExit as exit_signal: - exit_code = exit_signal.code - return out.getvalue(), err.getvalue(), exit_code - else: - cli(prog_name="pipenv") + This returns tokens saved in the local auth configuration. 
+ There are two types of tokens: access_token and id_token + Args: + name (str): The name of the token to retrieve. -def parse_safety_output(output, quiet): - try: - json_report = json.loads(output) - meta = json_report.get("report_meta", {}) - vulnerabilities_found = meta.get("vulnerabilities_found", 0) - db_type = "commercial" if meta.get("api_key", False) else "free" + Returns: + Optional[str]: The token value, or None if not found. + """ - if quiet: - click.secho( - f"{vulnerabilities_found} vulnerabilities found.", - fg="red" if vulnerabilities_found else "green", - ) - else: - fg = "red" if vulnerabilities_found else "green" - message = f"Scan complete using Safety's {db_type} vulnerability database." - click.echo() - click.secho(f"{vulnerabilities_found} vulnerabilities found.", fg=fg) - click.echo() - - for vuln in json_report.get("vulnerabilities", []): - click.echo( - "{}: {} {} open to vulnerability {} ({}). More info: {}".format( - click.style(vuln["vulnerability_id"], bold=True, fg="red"), - click.style(vuln["package_name"], fg="green"), - click.style(vuln["analyzed_version"], fg="yellow", bold=True), - click.style(vuln["vulnerability_id"], bold=True), - click.style(vuln["vulnerable_spec"], fg="yellow", bold=False), - click.style(vuln["more_info_url"], bold=True), - ) - ) - click.echo(f"{vuln['advisory']}") - click.echo() - - click.secho(message, fg="white", bold=True) - - except json.JSONDecodeError: - click.echo("Failed to parse Safety output.") + authconfig = Path("~", ".safety", "auth.ini").expanduser() + config = configparser.ConfigParser() + config.read(authconfig) + if 'auth' in config.sections() and 'access_token' in config['auth']: + value = config['auth']['access_token'] + if value: + return True + return False + def do_check( project, @@ -207,11 +154,12 @@ def do_check( pypi_mirror=pypi_mirror, ) - if not quiet and not project.s.is_quiet(): - click.secho("Checking PEP 508 requirements...", bold=True) + if not shutil.which("safety"): + 
safety_path = _get_safety(project) - - results = run_pep508_check(project, system, python) - check_pep508_requirements(project, results, quiet) + if not has_safey_auth_token(): + click.secho("Could not find safety token. Use `safety check` to manually log in to your account", bold=True) + sys.exit(1) if not project.lockfile_exists: return @@ -239,52 +187,25 @@ def do_check( err=True, ) - requirements = get_requirements(project, use_installed, categories) - temp_requirements = create_temp_requirements(project, requirements) - options = build_safety_options( - audit_and_monitor=audit_and_monitor, exit_code=exit_code, output=output, save_json=save_json, policy_file=policy_file, safety_project=safety_project, - temp_requirements_name=temp_requirements.name, ) - safety_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), "patched", "safety" - ) - cmd = [project_python(project, system=system), safety_path, "check"] + options + cmd = [shutil.which("safety") or safety_path, "scan"] + options if db: if not quiet and not project.s.is_quiet(): click.echo(f"Using {db} database") cmd.append(f"--db={db}") - elif key or project.s.PIPENV_PYUP_API_KEY: - cmd.append(f"--key={key or project.s.PIPENV_PYUP_API_KEY}") - else: - PIPENV_SAFETY_DB = ( - "https://d2qjmgddvqvu75.cloudfront.net/aws/safety/pipenv/1.0.0/" - ) - os.environ["SAFETY_ANNOUNCEMENTS_URL"] = f"{PIPENV_SAFETY_DB}announcements.json" - cmd.append(f"--db={PIPENV_SAFETY_DB}") if ignore: for cve in ignore: cmd.extend(["--ignore", cve]) - os.environ["SAFETY_CUSTOM_INTEGRATION"] = "True" - os.environ["SAFETY_SOURCE"] = "pipenv" - os.environ["SAFETY_PURE_YAML"] = "True" - - output, error, exit_code = run_safety_check(cmd, quiet) - - if quiet: - parse_safety_output(output, quiet) - else: - sys.stdout.write(output) - sys.stderr.write(error) + _, _, exit_code = run_safety_check(cmd, verbose) - temp_requirements.unlink() sys.exit(exit_code)