From 8db88a23c6ea402b59cb0cf63a3a173b68a3ec3c Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Thu, 24 Apr 2025 14:25:17 +1000 Subject: [PATCH 01/15] feat: add pypi attestation discovery Signed-off-by: Ben Selwyn-Smith --- pyproject.toml | 1 + src/macaron/config/defaults.ini | 3 +- .../provenance/provenance_extractor.py | 68 ++++++- src/macaron/provenance/provenance_finder.py | 42 ++++ .../repo_finder/repo_finder_deps_dev.py | 153 +++++++------- src/macaron/repo_finder/repo_finder_enums.py | 14 +- src/macaron/slsa_analyzer/analyzer.py | 6 +- .../package_registry/deps_dev.py | 89 +++++--- .../provenance/intoto/__init__.py | 13 +- .../slsa_analyzer/provenance/loader.py | 49 ++++- .../specs/inferred_provenance.py | 2 +- .../specs/pypi_certificate_predicate.py | 19 ++ tests/conftest.py | 34 +++- .../pypi_attestation_discovery/policy.dl | 13 ++ .../pypi_attestation_discovery/test.yaml | 20 ++ tests/provenance/test_provenance_finder.py | 16 +- .../repo_finder/test_repo_finder_deps_dev.py | 192 ++++++++++++++++++ .../package_registry/test_deps_dev.py | 61 ++---- 18 files changed, 624 insertions(+), 171 deletions(-) create mode 100644 src/macaron/slsa_analyzer/specs/pypi_certificate_predicate.py create mode 100644 tests/integration/cases/pypi_attestation_discovery/policy.dl create mode 100644 tests/integration/cases/pypi_attestation_discovery/test.yaml create mode 100644 tests/repo_finder/test_repo_finder_deps_dev.py diff --git a/pyproject.toml b/pyproject.toml index 8b07c34a5..26a360d71 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ "cyclonedx-python-lib[validation] >=7.3.4,<8.0.0", "beautifulsoup4 >= 4.12.0,<5.0.0", "problog >= 2.2.6,<3.0.0", + "pypi-attestations >= 0.0.23,<1.0.0", ] keywords = [] # https://pypi.org/classifiers/ diff --git a/src/macaron/config/defaults.ini b/src/macaron/config/defaults.ini index c575cb76e..c46e09ce1 100644 --- a/src/macaron/config/defaults.ini +++ b/src/macaron/config/defaults.ini @@ -542,7 +542,8 @@ inspector_url_scheme = https [deps_dev] url_netloc = api.deps.dev url_scheme = https -purl_endpoint = v3alpha/purl +api_endpoint = v3alpha +purl_endpoint = purl [osv_dev] url_netloc = api.osv.dev diff --git a/src/macaron/provenance/provenance_extractor.py b/src/macaron/provenance/provenance_extractor.py index 623f6d304..4bfc8ec03 100644 --- a/src/macaron/provenance/provenance_extractor.py +++ b/src/macaron/provenance/provenance_extractor.py @@ -43,8 +43,11 @@ def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str If the extraction process fails for any reason. """ predicate_type = payload.statement.get("predicateType") - if isinstance(payload, InTotoV1Payload) and predicate_type == "https://slsa.dev/provenance/v1": - return _extract_from_slsa_v1(payload) + if isinstance(payload, InTotoV1Payload): + if predicate_type == "https://slsa.dev/provenance/v1": + return _extract_from_slsa_v1(payload) + if predicate_type == "https://docs.pypi.org/attestations/publish/v1": + return _extract_from_pypi_v1(payload) if isinstance(payload, InTotoV01Payload): if predicate_type == "https://slsa.dev/provenance/v0.2": @@ -195,6 +198,29 @@ def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str | None, str | N return repo, commit or None +def _extract_from_pypi_v1(payload: InTotoV1Payload) -> tuple[str | None, str | None]: + """Extract the repository and commit metadata from the pypi provenance file found at the passed path. + + Parameters + ---------- + payload: InTotoPayload + The payload to extract from. + + Returns + ------- + tuple[str, str] + The repository URL and commit hash if found, a pair of empty strings otherwise. + """ + predicate: dict[str, JsonType] | None = payload.statement.get("predicate") + if not predicate: + logger.debug("No predicate in payload statement.") + return None, None + + repo = json_extract(predicate, ["sourceUri"], str) + digest = json_extract(predicate, ["sourceDigest"], str) + return repo, digest + + def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str | None, str | None]: """Extract the repository and commit metadata from the witness provenance file found at the passed path. @@ -300,7 +326,7 @@ def check_if_input_purl_provenance_conflict( provenance_repo_url: str | None, purl: PackageURL, ) -> bool: - """Test if the input repository type PURL's repo and commit match the contents of the provenance. + """Test if the input repository type PURL's repo matches the contents of the provenance. Parameters ---------- @@ -620,6 +646,41 @@ def get_build_invocation(self, statement: InTotoV01Statement | InTotoV1Statement return gl_workflow, gl_job_url +class PyPICertificateDefinition(ProvenanceBuildDefinition): + """Class representing the derived PyPI certificate build definition. + + This class implements the abstract methods from the `ProvenanceBuildDefinition` + to extract build invocation details specific to the GitHub Actions build type. + """ + + #: Determines the expected ``buildType`` field in the provenance predicate. + expected_build_type = "pypi_certificate" + + def get_build_invocation(self, statement: InTotoV01Statement | InTotoV1Statement) -> tuple[str | None, str | None]: + """Retrieve the build invocation information from the given statement. + + Parameters + ---------- + statement : InTotoV1Statement | InTotoV01Statement + The provenance statement from which to extract the build invocation + details. This statement contains the metadata about the build process + and its associated artifacts. + + Returns + ------- + tuple[str | None, str | None] + A tuple containing two elements: + - The first element is the build invocation entry point (e.g., workflow name), or None if not found. + - The second element is the invocation URL or identifier (e.g., job URL), or None if not found. + """ + if statement["predicate"] is None: + return None, None + + gha_workflow = json_extract(statement["predicate"], ["workflow"], str) + invocation_url = json_extract(statement["predicate"], ["invocationUrl"], str) + return gha_workflow, invocation_url + + class ProvenancePredicate: """Class providing utility methods for handling provenance predicates. @@ -685,6 +746,7 @@ def find_build_def(statement: InTotoV01Statement | InTotoV1Statement) -> Provena SLSAGCBBuildDefinitionV1(), SLSAOCIBuildDefinitionV1(), WitnessGitLabBuildDefinitionV01(), + PyPICertificateDefinition(), ] for build_def in build_defs: diff --git a/src/macaron/provenance/provenance_finder.py b/src/macaron/provenance/provenance_finder.py index b02423eec..0e494a411 100644 --- a/src/macaron/provenance/provenance_finder.py +++ b/src/macaron/provenance/provenance_finder.py @@ -2,6 +2,7 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains methods for finding provenance files.""" +import json import logging import os import tempfile @@ -12,6 +13,7 @@ from macaron.config.defaults import defaults from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type +from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder from macaron.slsa_analyzer.analyze_context import AnalyzeContext from macaron.slsa_analyzer.checks.provenance_available_check import ProvenanceAvailableException from macaron.slsa_analyzer.ci_service import GitHubActions @@ -78,6 +80,10 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]: discovery_functions = [partial(find_gav_provenance, purl, self.jfrog_registry)] return self._find_provenance(discovery_functions) + if purl.type == "pypi": + discovery_functions = [partial(find_pypi_provenance, purl)] + return self._find_provenance(discovery_functions) + # TODO add other possible discovery functions. logger.debug("Provenance finding not supported for PURL type: %s", purl.type) return [] @@ -275,6 +281,42 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[ return provenances[:1] +def find_pypi_provenance(purl: PackageURL) -> list[InTotoPayload]: + """Find and download the PyPI based provenance for the passed PURL. + + Parameters + ---------- + purl: PackageURL + The PURL of the analysis target. + + Returns + ------- + list[InTotoPayload] | None + The provenance payload if found, or an empty list otherwise. + + Raises + ------ + ProvenanceAvailableException + If the discovered provenance file size exceeds the configured limit. + """ + attestation, verified = DepsDevRepoFinder.get_attestation(purl) + if not attestation: + return [] + + with tempfile.TemporaryDirectory() as temp_dir: + file_name = os.path.join(temp_dir, f"{purl.name}") + with open(file_name, "w", encoding="utf-8") as file: + json.dump(attestation, file) + + try: + payload = load_provenance_payload(file_name) + payload.verified = verified + return [payload] + except LoadIntotoAttestationError as load_error: + logger.error("Error while loading provenance: %s", load_error) + return [] + + def find_provenance_from_ci( analyze_ctx: AnalyzeContext, git_obj: Git | None, download_path: str ) -> InTotoPayload | None: diff --git a/src/macaron/repo_finder/repo_finder_deps_dev.py b/src/macaron/repo_finder/repo_finder_deps_dev.py index 35d257408..6f5066b37 100644 --- a/src/macaron/repo_finder/repo_finder_deps_dev.py +++ b/src/macaron/repo_finder/repo_finder_deps_dev.py @@ -4,18 +4,21 @@ """This module contains the PythonRepoFinderDD class to be used for finding repositories using deps.dev.""" import json import logging +import urllib.parse from enum import StrEnum from typing import Any from urllib.parse import quote as encode from packageurl import PackageURL +from macaron.errors import APIAccessError from macaron.json_tools import json_extract from macaron.repo_finder.repo_finder_base import BaseRepoFinder from macaron.repo_finder.repo_finder_enums import RepoFinderInfo from macaron.repo_finder.repo_validator import find_valid_repository_url from macaron.slsa_analyzer.git_url import clean_url -from macaron.util import send_get_http_raw +from macaron.slsa_analyzer.package_registry.deps_dev import DepsDevService +from macaron.util import send_get_http, send_get_http_raw logger: logging.Logger = logging.getLogger(__name__) @@ -37,9 +40,6 @@ class DepsDevType(StrEnum): class DepsDevRepoFinder(BaseRepoFinder): """This class is used to find repositories using Google's Open Source Insights A.K.A. deps.dev.""" - # See https://docs.deps.dev/api/v3alpha/ - BASE_URL = "https://api.deps.dev/v3alpha/purl/" - def find_repo(self, purl: PackageURL) -> tuple[str, RepoFinderInfo]: """ Attempt to retrieve a repository URL that matches the passed artifact. @@ -54,17 +54,12 @@ def find_repo(self, purl: PackageURL) -> tuple[str, RepoFinderInfo]: tuple[str, RepoFinderOutcome] : A tuple of the found URL (or an empty string), and the outcome of the Repo Finder. """ - request_urls, outcome = self._create_urls(purl) - if not request_urls: - logger.debug("No urls found for: %s", purl) - return "", outcome - - json_data = self._retrieve_json(request_urls[0]) - if not json_data: - logger.debug("Failed to retrieve json data for: %s", purl) - return "", RepoFinderInfo.DDEV_JSON_FETCH_ERROR + try: + json_data = DepsDevService.get_package_info(encode(str(purl), safe="")) + except APIAccessError: + return "", RepoFinderInfo.DDEV_API_ERROR - urls, outcome = self._read_json(json_data) + urls, outcome = DepsDevRepoFinder.extract_links(json_data) if not urls: logger.debug("Failed to extract repository URLs from json data: %s", purl) return "", outcome @@ -75,7 +70,7 @@ def find_repo(self, purl: PackageURL) -> tuple[str, RepoFinderInfo]: logger.debug("Found valid url: %s", url) return url, RepoFinderInfo.FOUND - return "", RepoFinderInfo.DDEV_NO_URLS + return "", RepoFinderInfo.DDEV_NO_VALID_URLS @staticmethod def get_project_info(project_url: str) -> dict[str, Any] | None: @@ -98,7 +93,9 @@ def get_project_info(project_url: str) -> dict[str, Any] | None: project_key = clean_repo_url.hostname + clean_repo_url.path - request_url = f"https://api.deps.dev/v3alpha/projects/{encode(project_key, safe='')}" + api_endpoint = DepsDevService.get_endpoint(purl=False, path=f"projects/{encode(project_key, safe='')}") + request_url = urllib.parse.urlunsplit(api_endpoint) + response = send_get_http_raw(request_url) if not (response and response.text): logger.debug("Failed to retrieve additional repo info for: %s", project_url) @@ -130,17 +127,10 @@ def get_latest_version(purl: PackageURL) -> tuple[PackageURL | None, RepoFinderI namespace = purl.namespace + "/" if purl.namespace else "" purl = PackageURL.from_string(f"pkg:{purl.type}/{namespace}{purl.name}") - url = f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}" - response = send_get_http_raw(url) - - if not response: - return None, RepoFinderInfo.DDEV_BAD_RESPONSE - try: - metadata: dict = json.loads(response.text) - except ValueError as error: - logger.debug("Failed to parse response from deps.dev: %s", error) - return None, RepoFinderInfo.DDEV_JSON_FETCH_ERROR + metadata = DepsDevService.get_package_info(encode(str(purl), safe="")) + except APIAccessError: + return None, RepoFinderInfo.DDEV_API_ERROR versions_keys = ["package", "versions"] if "package" in metadata else ["version"] versions = json_extract(metadata, versions_keys, list) @@ -149,6 +139,8 @@ def get_latest_version(purl: PackageURL) -> tuple[PackageURL | None, RepoFinderI latest_version = None for version_result in reversed(versions): + if not isinstance(version_result, dict) or "isDefault" not in version_result: + continue if version_result["isDefault"]: # Accept the version as the latest if it is marked with the "isDefault" property. latest_version = json_extract(version_result, ["versionKey", "version"], str) @@ -164,11 +156,9 @@ def get_latest_version(purl: PackageURL) -> tuple[PackageURL | None, RepoFinderI RepoFinderInfo.FOUND_FROM_LATEST, ) - def _create_urls(self, purl: PackageURL) -> tuple[list[str], RepoFinderInfo]: - """ - Create the urls to search for the metadata relating to the passed artifact, and report on that process. - - If a version is not specified, remote API calls will be used to try and find one. + @staticmethod + def get_attestation(purl: PackageURL) -> tuple[dict | None, bool]: + """Retrieve the attestation associated with the passed PURL. Parameters ---------- @@ -177,61 +167,80 @@ def _create_urls(self, purl: PackageURL) -> tuple[list[str], RepoFinderInfo]: Returns ------- - tuple[list[str], RepoFinderInfo] - A tuple of: the list of created URLs, and the information on the Repo Finder outcome. + tuple[dict | None, bool] + The attestation, or None if not found, and a flag for whether it is verified. """ - outcome = None if not purl.version: - latest_purl, outcome = DepsDevRepoFinder.get_latest_version(purl) + latest_purl, _ = DepsDevRepoFinder.get_latest_version(purl) if not latest_purl: - return [], outcome + return None, False purl = latest_purl + if not purl.version: + # Should be unreachable. + return None, False - return [f"{DepsDevRepoFinder.BASE_URL}{encode(str(purl), safe='')}"], outcome or RepoFinderInfo.FOUND - - def _retrieve_json(self, url: str) -> str: - """ - Attempt to retrieve the json file located at the passed URL. - - Parameters - ---------- - url : str - The URL for the GET request. - - Returns - ------- - str : - The retrieved file data or an empty string. - """ - response = send_get_http_raw(url, {}) - - if not response: - return "" - - return response.text + api_endpoint = DepsDevService.get_endpoint( + purl=False, path="/".join(["systems", purl.type, "packages", purl.name, "versions", purl.version]) + ) + target_url = urllib.parse.urlunsplit(api_endpoint) + + result = send_get_http(target_url, headers={}) + if not result: + return None, False + + result_attestations = json_extract(result, ["attestations"], list) + if not result_attestations: + logger.debug("No attestations in result.") + return None, False + if len(result_attestations) > 1: + logger.debug("More than one attestation in result: %s", len(result_attestations)) + + attestation_url = json_extract(result_attestations, [0, "url"], str) + if not attestation_url: + logger.debug("No attestation reported for %s", purl) + return None, False + + attestation_data = send_get_http(attestation_url, headers={}) + if not attestation_data: + return None, False + + bundle = json_extract(attestation_data, ["attestation_bundles"], list) + if not bundle: + logger.debug("No attestation bundle in response.") + return None, False + if len(bundle) > 1: + logger.debug("Bundle length greater than one: %s", len(bundle)) + + attestations = json_extract(bundle[0], ["attestations"], list) + if not attestations: + logger.debug("No attestations in response.") + return None, False + if len(attestations) > 1: + logger.debug("More than one attestation: %s", len(attestations)) + + if not isinstance(attestations[0], dict): + logger.debug("Attestation invalid.") + return None, False + + return attestations[0], json_extract(result_attestations, [0, "verified"], bool) or False - def _read_json(self, json_data: str) -> tuple[list[str], RepoFinderInfo]: + @staticmethod + def extract_links(json_data: dict) -> tuple[list[str], RepoFinderInfo]: """ - Parse the deps.dev json file and extract the repository links. + Extract the repository links from the deps.dev json data. Parameters ---------- - json_data : str - The json metadata as a string. + json_data : dict + The json metadata. Returns ------- tuple[list[str], RepoFinderOutcome] : The extracted contents as a list, and the outcome to report. """ - try: - parsed = json.loads(json_data) - except ValueError as error: - logger.debug("Failed to parse response from deps.dev: %s", error) - return [], RepoFinderInfo.DDEV_JSON_FETCH_ERROR - - links_keys = ["version", "links"] if "version" in parsed else ["links"] - links = json_extract(parsed, links_keys, list) + links_keys = ["version", "links"] if "version" in json_data else ["links"] + links = json_extract(json_data, links_keys, list) if not links: logger.debug("Could not extract 'version' or 'links' from deps.dev response.") return [], RepoFinderInfo.DDEV_JSON_INVALID @@ -242,4 +251,8 @@ def _read_json(self, json_data: str) -> tuple[list[str], RepoFinderInfo]: if url and isinstance(url, str): result.append(url) + if not result: + logger.debug("No str entries in 'links' list.") + return [], RepoFinderInfo.DDEV_NO_URLS + return result, RepoFinderInfo.FOUND diff --git a/src/macaron/repo_finder/repo_finder_enums.py b/src/macaron/repo_finder/repo_finder_enums.py index 43e8d5e8b..87c258491 100644 --- a/src/macaron/repo_finder/repo_finder_enums.py +++ b/src/macaron/repo_finder/repo_finder_enums.py @@ -45,17 +45,17 @@ class RepoFinderInfo(Enum): #: Reported for all other bad status codes that a host could return. E.g. 500, etc. HTTP_OTHER = "HTTP other" - #: Reported if deps.dev produces no response to the HTTP request. - DDEV_BAD_RESPONSE = "deps.dev bad response" - - #: Reported if deps.dev returns JSON data that cannot be parsed. - DDEV_JSON_FETCH_ERROR = "deps.dev fetch error" + #: Reported if deps.dev produces an invalid response to an API request. + DDEV_API_ERROR = "deps.dev bad response" #: Reported if deps.dev returns JSON data that is missing expected fields. DDEV_JSON_INVALID = "deps.dev JSON invalid" - #: Reported if deps.dev returns data that does not contain the desired SCM URL. E.g. The repository URL. - DDEV_NO_URLS = "deps.dev no URLs" + #: Reported if deps.dev returns JSON data with no repository URLs. + DDEV_NO_URLS = "deps.dev no urls" + + #: Reported if deps.dev returns JSON data with no valid repository URLs. + DDEV_NO_VALID_URLS = "deps.dev no valid URLs" #: Reported if there was an error with the request sent to the PyPI registry. PYPI_HTTP_ERROR = "PyPI HTTP error" diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index baf3682cd..e3957e875 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -363,7 +363,9 @@ def run_single( provenances = provenance_finder.find_provenance(parsed_purl) if provenances: provenance_payload = provenances[0] - if verify_provenance: + if provenance_payload.verified: + provenance_is_verified = True + elif verify_provenance: provenance_is_verified = provenance_verifier.verify_provenance(parsed_purl, provenances) # Try to extract the repository URL and commit digest from the Provenance, if it exists. @@ -425,7 +427,7 @@ def run_single( found_commit=final_digest, ) - # Check if only one of the repo or digest came from direct input. + # Check if repo came from direct input. if parsed_purl: if check_if_input_purl_provenance_conflict( bool(repo_path_input), diff --git a/src/macaron/slsa_analyzer/package_registry/deps_dev.py b/src/macaron/slsa_analyzer/package_registry/deps_dev.py index a38d6d60f..4c95f00ac 100644 --- a/src/macaron/slsa_analyzer/package_registry/deps_dev.py +++ b/src/macaron/slsa_analyzer/package_registry/deps_dev.py @@ -7,7 +7,7 @@ import logging import urllib.parse from json.decoder import JSONDecodeError -from urllib.parse import quote as encode +from typing import Any from macaron.config.defaults import defaults from macaron.errors import APIAccessError @@ -20,28 +20,24 @@ class DepsDevService: """The deps.dev service class.""" @staticmethod - def get_package_info(purl: str) -> dict | None: - """Check if the package identified by the PackageURL (PURL) exists and return its information. + def get_endpoint(purl: bool = True, path: str | None = None) -> Any: + """Build the API endpoint for the deps.dev service and return it. Parameters ---------- - purl: str - The PackageURL (PURL). + purl: bool + A flag to determine whether the PURL or BASE endpoint should be returned. + path: str | None + A path to be added to the URL. Returns ------- - dict | None - The package metadata or None if it doesn't exist. - - Raises - ------ - APIAccessError - If the service is misconfigured, the API is invalid, a network error happens, - or unexpected response is returned by the API. + Any + The API endpoint. """ section_name = "deps_dev" if not defaults.has_section(section_name): - return None + raise APIAccessError(f"The {section_name} section is missing in the .ini configuration file.") section = defaults[section_name] url_netloc = section.get("url_netloc") @@ -50,34 +46,73 @@ def get_package_info(purl: str) -> dict | None: f'The "url_netloc" key is missing in section [{section_name}] of the .ini configuration file.' ) url_scheme = section.get("url_scheme", "https") - purl_endpoint = section.get("purl_endpoint") - if not purl_endpoint: + + api_endpoint = section.get("api_endpoint") + if not api_endpoint: raise APIAccessError( - f'The "purl_endpoint" key is missing in section [{section_name}] of the .ini configuration file.' + f'The "api_endpoint" key is missing in section [{section_name}] of the .ini configuration file.' ) - - path_params = "/".join([purl_endpoint, encode(purl, safe="")]) - try: - url = urllib.parse.urlunsplit( - urllib.parse.SplitResult( + endpoint_path = [api_endpoint] + if path: + endpoint_path.append(path) + if not purl: + try: + return urllib.parse.SplitResult( scheme=url_scheme, netloc=url_netloc, - path=path_params, + path="/".join(endpoint_path), query="", fragment="", ) + except ValueError as error: + raise APIAccessError("Failed to construct the API URL.") from error + + purl_endpoint = section.get("purl_endpoint") + if not purl_endpoint: + raise APIAccessError( + f'The "purl_endpoint" key is missing in section [{section_name}] of the .ini configuration file.' + ) + endpoint_path.insert(1, purl_endpoint) + try: + return urllib.parse.SplitResult( + scheme=url_scheme, + netloc=url_netloc, + path="/".join(endpoint_path), + query="", + fragment="", ) except ValueError as error: raise APIAccessError("Failed to construct the API URL.") from error + @staticmethod + def get_package_info(purl: str) -> dict: + """Check if the package identified by the PackageURL (PURL) exists and return its information. + + Parameters + ---------- + purl: str + The PackageURL (PURL). + + Returns + ------- + dict + The package metadata. + + Raises + ------ + APIAccessError + If the service is misconfigured, the API is invalid, a network error happens, + or unexpected response is returned by the API. + """ + api_endpoint = DepsDevService.get_endpoint(path=purl) + url = urllib.parse.urlunsplit(api_endpoint) + response = send_get_http_raw(url) if response and response.text: try: metadata: dict = json.loads(response.text) + return metadata except JSONDecodeError as error: raise APIAccessError(f"Failed to process response from deps.dev for {url}.") from error - if not metadata: - raise APIAccessError(f"Empty response returned by {url} .") - return metadata - return None + raise APIAccessError(f"No valid response from deps.dev for {url}") diff --git a/src/macaron/slsa_analyzer/provenance/intoto/__init__.py b/src/macaron/slsa_analyzer/provenance/intoto/__init__.py index 03b3f16f4..06e895e84 100644 --- a/src/macaron/slsa_analyzer/provenance/intoto/__init__.py +++ b/src/macaron/slsa_analyzer/provenance/intoto/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """In-toto provenance schemas and validation.""" @@ -6,7 +6,8 @@ from __future__ import annotations from collections.abc import Mapping -from typing import NamedTuple, Protocol, TypeVar +from dataclasses import dataclass +from typing import Protocol, TypeVar from packageurl import PackageURL @@ -21,7 +22,8 @@ StatementT = TypeVar("StatementT", bound=Mapping) -class InTotoV01Payload(NamedTuple): +@dataclass +class InTotoV01Payload: """The provenance payload following in-toto v0.1 schema. The payload is a field within a DSSE envelope, having the type "Statement". @@ -36,9 +38,11 @@ class InTotoV01Payload(NamedTuple): """ statement: v01.InTotoV01Statement + verified: bool = False -class InTotoV1Payload(NamedTuple): +@dataclass +class InTotoV1Payload: """The provenance payload following in-toto v1 schema. The payload is a field within a DSSE envelope, having the type "Statement". @@ -53,6 +57,7 @@ class InTotoV1Payload(NamedTuple): """ statement: v1.InTotoV1Statement + verified: bool = False # The payload is a field within a DSSE envelope, having the type "Statement". diff --git a/src/macaron/slsa_analyzer/provenance/loader.py b/src/macaron/slsa_analyzer/provenance/loader.py index 65dfee1bb..11da837e2 100644 --- a/src/macaron/slsa_analyzer/provenance/loader.py +++ b/src/macaron/slsa_analyzer/provenance/loader.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the loaders for SLSA provenances.""" @@ -11,15 +11,27 @@ import zlib from urllib.parse import urlparse +from pypi_attestations import Attestation + from macaron.config.defaults import defaults -from macaron.json_tools import JsonType +from macaron.json_tools import JsonType, json_extract from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, validate_intoto_payload from macaron.slsa_analyzer.provenance.intoto.errors import LoadIntotoAttestationError, ValidateInTotoPayloadError +from macaron.slsa_analyzer.specs.pypi_certificate_predicate import PyPICertificatePredicate from macaron.util import send_get_http_raw logger: logging.Logger = logging.getLogger(__name__) +# See: https://github.com/sigstore/fulcio/blob/main/docs/oid-info.md +_OID_NAMES = { + "source_repo": "1.3.6.1.4.1.57264.1.12", + "source_digest": "1.3.6.1.4.1.57264.1.13", + "workflow": "1.3.6.1.4.1.57264.1.18", + "invocation": "1.3.6.1.4.1.57264.1.21", +} + + def _try_read_url_link_file(file_content: bytes) -> str | None: parser = configparser.ConfigParser() try: @@ -66,9 +78,11 @@ def _load_provenance_file_content( try: try: decompressed_file_content = gzip.decompress(file_content) - provenance = json.loads(decompressed_file_content.decode()) + decoded_file_content = decompressed_file_content.decode() + provenance = json.loads(decoded_file_content) except (gzip.BadGzipFile, EOFError, zlib.error): - provenance = json.loads(file_content.decode()) + decoded_file_content = file_content.decode() + provenance = json.loads(decoded_file_content) except (json.JSONDecodeError, TypeError, UnicodeDecodeError) as error: raise LoadIntotoAttestationError( "Cannot deserialize the file content as JSON.", @@ -83,6 +97,13 @@ def _load_provenance_file_content( # Some provenances, such as Witness may not include the DSSE envelope `dsseEnvelope` # property but contain its value directly. provenance_payload = provenance.get("payload", None) + if not provenance_payload: + # GitHub Attestation. + # TODO Check if old method (above) actually works. + provenance_payload = json_extract(provenance, ["bundle", "dsseEnvelope", "payload"], str) + if not provenance_payload: + # PyPI Attestation. + provenance_payload = json_extract(provenance, ["envelope", "statement"], str) if not provenance_payload: raise LoadIntotoAttestationError( 'Cannot find the "payload" field in the decoded provenance.', @@ -103,6 +124,26 @@ def _load_provenance_file_content( if not isinstance(json_payload, dict): raise LoadIntotoAttestationError("The provenance payload is not a JSON object.") + if json_payload["predicate"]: + return json_payload + + # For provenance without a predicate (e.g. PyPI), try to use the provenance certificate instead. + attestation_model = Attestation.model_validate_json(decoded_file_content) + certificate_claims = attestation_model.certificate_claims + source_repo = certificate_claims[_OID_NAMES["source_repo"]] + workflow = certificate_claims[_OID_NAMES["workflow"]] + workflow = workflow.replace(source_repo + "/", "") + if "@" in workflow: + workflow = workflow[: workflow.index("@")] + + invocation = certificate_claims[_OID_NAMES["invocation"]] + if "/attempts" in invocation: + invocation = invocation[: invocation.index("/attempts")] + + pypi_predicate = PyPICertificatePredicate.build_predicate( + source_repo, certificate_claims[_OID_NAMES["source_digest"]], workflow, invocation + ) + json_payload["predicate"] = pypi_predicate return json_payload diff --git a/src/macaron/slsa_analyzer/specs/inferred_provenance.py b/src/macaron/slsa_analyzer/specs/inferred_provenance.py index 302083768..ee23b021f 100644 --- a/src/macaron/slsa_analyzer/specs/inferred_provenance.py +++ b/src/macaron/slsa_analyzer/specs/inferred_provenance.py @@ -24,7 +24,7 @@ def __init__(self) -> None: "builder": {"id": ""}, "buildType": "", "invocation": { - "configSource": {"uri": "", "digest": {"sha1": ""}, "entryPoint": ""}, + "configSource": {"uri": "", "digest": {"sha1": ""}, "entryPoint": ""}, "parameters": {}, "environment": {}, }, diff --git a/src/macaron/slsa_analyzer/specs/pypi_certificate_predicate.py b/src/macaron/slsa_analyzer/specs/pypi_certificate_predicate.py new file mode 100644 index 000000000..49d16f93f --- /dev/null +++ b/src/macaron/slsa_analyzer/specs/pypi_certificate_predicate.py @@ -0,0 +1,19 @@ +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module contains the spec for predicates derived from a PyPI attestation certificate.""" + + +class PyPICertificatePredicate: + """This class implements the PyPI certificate predicate.""" + + @staticmethod + def build_predicate(source_url: str, source_digest: str, build_workflow: str, invocation_url: str) -> dict: + """Build a predicate using passed parameters.""" + return { + "buildType": "pypi_certificate", + "sourceUri": f"{source_url}", + "sourceDigest": f"{source_digest}", + "workflow": f"{build_workflow}", + "invocationUrl": f"{invocation_url}", + } diff --git a/tests/conftest.py b/tests/conftest.py index b47aa7269..7e97461d0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,11 +1,14 @@ -# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """Fixtures for tests.""" +import os +import urllib.parse from pathlib import Path from typing import Any, NoReturn import pytest +from pytest_httpserver import HTTPServer import macaron from macaron.code_analyzer.call_graph import BaseNode, CallGraph @@ -459,3 +462,32 @@ def build_github_actions_call_graph_for_commands(commands: list[str]) -> CallGra ) return gh_cg + + +@pytest.fixture(name="deps_dev_service_mock") +def deps_dev_service_mock_(httpserver: HTTPServer, tmp_path: Path) -> dict: + """Create the mocked deps.dev service.""" + api_endpoint = "v999" + purl_endpoint = "purl" + base_url_parsed = urllib.parse.urlparse(httpserver.url_for("")) + config = f""" + [deps_dev] + url_netloc = {base_url_parsed.netloc} + url_scheme = {base_url_parsed.scheme} + api_endpoint = {api_endpoint} + purl_endpoint = {purl_endpoint} + """ + user_config_path = os.path.join(tmp_path, "config.ini") + with open(user_config_path, "w", encoding="utf-8") as user_config_file: + user_config_file.write(config) + # We don't have to worry about modifying the ``defaults`` object causing test + # pollution here, since we reload the ``defaults`` object before every test with the + # ``setup_test`` fixture. + load_defaults(user_config_path) + return { + "api": api_endpoint, + "purl": purl_endpoint, + "base_hostname": base_url_parsed.hostname, + "base_scheme": base_url_parsed.scheme, + "base_netloc": base_url_parsed.netloc, + } diff --git a/tests/integration/cases/pypi_attestation_discovery/policy.dl b/tests/integration/cases/pypi_attestation_discovery/policy.dl new file mode 100644 index 000000000..613ed70f8 --- /dev/null +++ b/tests/integration/cases/pypi_attestation_discovery/policy.dl @@ -0,0 +1,13 @@ +/* Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. */ +/* Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. */ + +#include "prelude.dl" + +Policy("test_policy", component_id, "") :- + check_passed(component_id, "mcn_provenance_verified_1"), + check_passed(component_id, "mcn_provenance_available_1"), + check_passed(component_id, "mcn_provenance_derived_repo_1"), + check_passed(component_id, "mcn_provenance_derived_commit_1"). + +apply_policy_to("test_policy", component_id) :- + is_component(component_id, "pkg:pypi/ultralytics@8.3.70"). diff --git a/tests/integration/cases/pypi_attestation_discovery/test.yaml b/tests/integration/cases/pypi_attestation_discovery/test.yaml new file mode 100644 index 000000000..2ec49b682 --- /dev/null +++ b/tests/integration/cases/pypi_attestation_discovery/test.yaml @@ -0,0 +1,20 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +description: | + Analyzing a PyPI PURL that has provenance available on the PyPI registry. + +tags: +- macaron-python-package + +steps: +- name: Run macaron analyze + kind: analyze + options: + command_args: + - -purl + - pkg:pypi/ultralytics@8.3.70 +- name: Run macaron verify-policy to verify passed/failed checks + kind: verify + options: + policy: policy.dl diff --git a/tests/provenance/test_provenance_finder.py b/tests/provenance/test_provenance_finder.py index 3cd610c0c..ba19a7e4a 100644 --- a/tests/provenance/test_provenance_finder.py +++ b/tests/provenance/test_provenance_finder.py @@ -14,7 +14,11 @@ from pydriller import Git from macaron.code_analyzer.call_graph import BaseNode, CallGraph -from macaron.provenance.provenance_finder import find_gav_provenance, find_npm_provenance, find_provenance_from_ci +from macaron.provenance.provenance_finder import ( + find_gav_provenance, + find_npm_provenance, + find_provenance_from_ci, +) from macaron.slsa_analyzer.ci_service import BaseCIService, CircleCI, GitHubActions, GitLabCI, Jenkins, Travis from macaron.slsa_analyzer.git_service.api_client import GhAPIClient from macaron.slsa_analyzer.package_registry import JFrogMavenRegistry, NPMRegistry @@ -206,9 +210,7 @@ def test_provenance_on_supported_ci(macaron_path: Path, test_dir: Path) -> None: assert provenance is None -def test_provenance_available_on_npm_registry( - test_dir: Path, -) -> None: +def test_provenance_available_on_npm_registry(test_dir: Path) -> None: """Test provenance published on npm registry.""" purl = PackageURL.from_string("pkg:npm/@sigstore/mock@0.1.0") npm_registry = MockNPMRegistry() @@ -220,11 +222,9 @@ def test_provenance_available_on_npm_registry( assert provenance -def test_provenance_available_on_jfrog_registry( - test_dir: Path, -) -> None: +def test_provenance_available_on_jfrog_registry(test_dir: Path) -> None: """Test provenance published on jfrog registry.""" - purl = PackageURL.from_string("pkg:/maven/io.micronaut/micronaut-core@4.2.3") + purl = PackageURL.from_string("pkg:maven/io.micronaut/micronaut-core@4.2.3") jfrog_registry = MockJFrogRegistry(str(test_dir)) provenance = find_gav_provenance(purl, jfrog_registry) diff --git a/tests/repo_finder/test_repo_finder_deps_dev.py b/tests/repo_finder/test_repo_finder_deps_dev.py new file mode 100644 index 000000000..1633b0bf1 --- /dev/null +++ b/tests/repo_finder/test_repo_finder_deps_dev.py @@ -0,0 +1,192 @@ +# Copyright (c) 2025 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module tests the deps.dev repo finder.""" + +import pytest +from packageurl import PackageURL +from pytest_httpserver import HTTPServer + +from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder +from macaron.repo_finder.repo_finder_enums import RepoFinderInfo + + +def test_find_repo_url_failure(deps_dev_service_mock: dict) -> None: + """Test find repo function.""" + purl = PackageURL.from_string(f"pkg:pypi/example{deps_dev_service_mock['api']}") + result, outcome = DepsDevRepoFinder().find_repo(purl) + assert not result + assert outcome == RepoFinderInfo.DDEV_API_ERROR + + +@pytest.mark.parametrize( + ("data", "expected_outcome"), + [ + ('{"foo": "bar"}', RepoFinderInfo.DDEV_JSON_INVALID), + ('{"links": [{"url": 1}]}', RepoFinderInfo.DDEV_NO_URLS), + ('{"links": [{"url": "test://test.test"}]}', RepoFinderInfo.DDEV_NO_VALID_URLS), + ], +) +def test_find_repo_links_failures( + httpserver: HTTPServer, deps_dev_service_mock: dict, data: str, expected_outcome: RepoFinderInfo +) -> None: + """Test invalid links.""" + purl = PackageURL.from_string("pkg:pypi/example@2") + target_url = ( + f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/pkg:{purl.type}/{purl.name}@{purl.version}" + ) + + httpserver.expect_request(target_url).respond_with_data(data) + result, outcome = DepsDevRepoFinder().find_repo(purl) + + assert not result + assert outcome == expected_outcome + + +def test_find_repo_success(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: + """Test repo finder success.""" + purl = PackageURL.from_string("pkg:pypi/example@2") + target_url = ( + f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/pkg:{purl.type}/{purl.name}@{purl.version}" + ) + + httpserver.expect_request(target_url).respond_with_data('{"links": [{"url": "http://github.com/oracle/macaron"}]}') + result, outcome = DepsDevRepoFinder().find_repo(purl) + + assert result + assert outcome == RepoFinderInfo.FOUND + + +@pytest.mark.parametrize( + ("repo_url", "server_url", "data"), + [ + ("http::::://130/test", "", ""), + ("http://github.com/oracle/macaron", "", ""), + ("", "/oracle/macaron", "INVALID JSON"), + ], +) +def test_get_project_info_failures( + httpserver: HTTPServer, deps_dev_service_mock: dict, repo_url: str, server_url: str, data: str +) -> None: + """Test get project info failures.""" + if not repo_url: + repo_url = f"{deps_dev_service_mock['base_scheme']}://{deps_dev_service_mock['base_netloc']}{server_url}" + + if server_url: + target_url = f"/{deps_dev_service_mock['api']}/projects/{deps_dev_service_mock['base_hostname']}{server_url}" + httpserver.expect_request(target_url).respond_with_data(data) + + assert not DepsDevRepoFinder().get_project_info(repo_url) + + +def test_get_project_info_success(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: + """Test get project info success.""" + path = "/oracle/macaron" + repo_url = f"{deps_dev_service_mock['base_scheme']}://{deps_dev_service_mock['base_netloc']}{path}" + target_url = f"/{deps_dev_service_mock['api']}/projects/{deps_dev_service_mock['base_hostname']}{path}" + httpserver.expect_request(target_url).respond_with_data('{"foo": "bar"}') + + assert DepsDevRepoFinder().get_project_info(repo_url) + + +@pytest.mark.parametrize( + ("purl_string", "server_url", "data", "expected_outcome"), + [ + ("pkg:pypi/test@3", False, "", RepoFinderInfo.DDEV_API_ERROR), + ("pkg:pypi/test@3", True, '{"foo": "bar"}', RepoFinderInfo.DDEV_JSON_INVALID), + ("pkg:pypi/test@3", True, '{"version": [1]}', RepoFinderInfo.DDEV_JSON_INVALID), + ], +) +def test_get_latest_version_failures( + httpserver: HTTPServer, + deps_dev_service_mock: dict, + purl_string: str, + server_url: bool, + data: str, + expected_outcome: RepoFinderInfo, +) -> None: + """Test get latest version failures.""" + purl = PackageURL.from_string(purl_string) + + if server_url: + target_url = f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/pkg:{purl.type}/{purl.name}" + httpserver.expect_request(target_url).respond_with_data(data) + + result, outcome = DepsDevRepoFinder().get_latest_version(purl) + + assert not result + assert outcome == expected_outcome + + +def test_get_latest_version_success(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: + """Test get latest version success.""" + purl = PackageURL.from_string("pkg:pypi/test@3") + target_url = f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/pkg:{purl.type}/{purl.name}" + httpserver.expect_request(target_url).respond_with_data( + '{"version": [{"versionKey":{"version": "4"}, "isDefault":true}]}' + ) + result, outcome = DepsDevRepoFinder().get_latest_version(purl) + assert result + assert outcome == RepoFinderInfo.FOUND_FROM_LATEST + + +@pytest.mark.parametrize( + ("purl_string", "server_url", "data"), + [ + ("pkg:pypi/test", False, ""), + ("pkg:pypi/test@3", False, ""), + ("pkg:pypi/test@3", True, '{"foo": "bar"}'), + ("pkg:pypi/test@3", True, '{"attestations": [1, 2]}'), + ("pkg:pypi/test@3", True, '{"attestations": [{"url": "*replace_url*/bad_endpoint"}]}'), + ("pkg:pypi/test@3", True, '{"attestations": [{"url": "*replace_url*"}]}'), + ("pkg:pypi/test@3", True, '{"attestations": [{"url": "*replace_url*"}], "attestation_bundles": [1,2]}'), + ( + "pkg:pypi/test@3", + True, + '{"attestations": [{"url": "*replace_url*"}], "attestation_bundles": [{"attestations": [1]}]}', + ), + ], +) +def test_get_attestation_failures( + httpserver: HTTPServer, deps_dev_service_mock: dict, purl_string: str, server_url: bool, data: str +) -> None: + """Test get attestation failures.""" + purl = PackageURL.from_string(purl_string) + + if server_url: + assert purl.version + target_url = ( + f"/{deps_dev_service_mock['api']}/" + + f"{'/'.join(['systems', purl.type, 'packages', purl.name, 'versions', purl.version])}" + ) + if "*replace_url*" in data: + attestation_url = ( + f"{deps_dev_service_mock['base_scheme']}://{deps_dev_service_mock['base_netloc']}{target_url}" + ) + data = data.replace("*replace_url*", attestation_url) + + httpserver.expect_request(target_url).respond_with_data(data) + + result, _ = DepsDevRepoFinder().get_attestation(purl) + assert not result + + +def test_get_attestation_success(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: + """Test get attestation success.""" + purl = PackageURL.from_string("pkg:pypi/test@3") + target_url = ( + f"/{deps_dev_service_mock['api']}/" + + f"{'/'.join(['systems', purl.type, 'packages', purl.name, 'versions', purl.version or ''])}" + ) + attestation_url = f"{deps_dev_service_mock['base_scheme']}://{deps_dev_service_mock['base_netloc']}{target_url}" + data = """ + { + "attestations": [{"url": "*replace_url*", "verified": true}], + "attestation_bundles": [{"attestations": [{"foo": "bar"}]}] + } + """ + data = data.replace("*replace_url*", attestation_url) + httpserver.expect_request(target_url).respond_with_data(data) + result, verified = DepsDevRepoFinder().get_attestation(purl) + assert result + assert verified diff --git a/tests/slsa_analyzer/package_registry/test_deps_dev.py b/tests/slsa_analyzer/package_registry/test_deps_dev.py index da8f0810a..cfe7dc601 100644 --- a/tests/slsa_analyzer/package_registry/test_deps_dev.py +++ b/tests/slsa_analyzer/package_registry/test_deps_dev.py @@ -3,66 +3,41 @@ """Tests for the deps.dev service.""" -import os -import urllib -from pathlib import Path - import pytest from pytest_httpserver import HTTPServer -from werkzeug import Response -from macaron.config.defaults import load_defaults from macaron.slsa_analyzer.package_registry.deps_dev import APIAccessError, DepsDevService @pytest.mark.parametrize( ("purl", "data", "expected"), [ - ("pkg%3Apypi%2Fultralytics%408.3.46", "", None), - ("pkg%3Apypi%2Fultralytics", '{"foo": "bar"}', {"foo": "bar"}), + ("pkg:pypi/ultralytics", '{"foo": "bar"}', {"foo": "bar"}), ], ) -def test_get_package_info(httpserver: HTTPServer, tmp_path: Path, purl: str, data: str, expected: dict | None) -> None: +def test_get_package_info( + httpserver: HTTPServer, purl: str, data: str, expected: dict, deps_dev_service_mock: dict +) -> None: """Test getting package info.""" - base_url_parsed = urllib.parse.urlparse(httpserver.url_for("")) - user_config_input = f""" - [deps_dev] - url_netloc = {base_url_parsed.netloc} - url_scheme = {base_url_parsed.scheme} - """ - - user_config_path = os.path.join(tmp_path, "config.ini") - with open(user_config_path, "w", encoding="utf-8") as user_config_file: - user_config_file.write(user_config_input) - # We don't have to worry about modifying the ``defaults`` object causing test - # pollution here, since we reload the ``defaults`` object before every test with the - # ``setup_test`` fixture. - load_defaults(user_config_path) - - httpserver.expect_request(f"/v3alpha/purl/{purl}").respond_with_response(Response(data)) + httpserver.expect_request( + f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/{purl}" + ).respond_with_data(data) assert DepsDevService.get_package_info(purl) == expected -def test_get_package_info_exception(httpserver: HTTPServer, tmp_path: Path) -> None: +def test_get_package_info_exception(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: """Test if the function correctly returns an exception.""" - base_url_parsed = urllib.parse.urlparse(httpserver.url_for("")) - user_config_input = f""" - [deps_dev] - url_netloc = {base_url_parsed.netloc} - url_scheme = {base_url_parsed.scheme} - """ + purl = "pkg:pypi/example" - user_config_path = os.path.join(tmp_path, "config.ini") - with open(user_config_path, "w", encoding="utf-8") as user_config_file: - user_config_file.write(user_config_input) - # We don't have to worry about modifying the ``defaults`` object causing test - # pollution here, since we reload the ``defaults`` object before every test with the - # ``setup_test`` fixture. - load_defaults(user_config_path) + # Return bad JSON data. + httpserver.expect_request( + f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/{purl}" + ).respond_with_data("Not Valid") - purl = "pkg%3Apypi%2Fexample" - httpserver.expect_request(f"/v3alpha/purl/{purl}").respond_with_data("Not Valid") - - with pytest.raises(APIAccessError): + with pytest.raises(APIAccessError, match="^Failed to process"): DepsDevService.get_package_info(purl) + + # Request an invalid resource. + with pytest.raises(APIAccessError, match="^No valid response"): + DepsDevService.get_package_info("pkg:pypi/test") From 53d5807f049c9939ed2d9b978526b19ddd58da95 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Thu, 24 Apr 2025 16:28:45 +1000 Subject: [PATCH 02/15] chore: keep encoding of purls consistent Signed-off-by: Ben Selwyn-Smith --- src/macaron/provenance/provenance_verifier.py | 6 ++++++ src/macaron/slsa_analyzer/package_registry/deps_dev.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/src/macaron/provenance/provenance_verifier.py b/src/macaron/provenance/provenance_verifier.py index 44b2193eb..174d09c6d 100644 --- a/src/macaron/provenance/provenance_verifier.py +++ b/src/macaron/provenance/provenance_verifier.py @@ -82,10 +82,12 @@ def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> signed_subjects = provenance[1].statement.get("subject") if not signed_subjects: + logger.debug("Missing signed subjects.") return False unsigned_subjects = provenance[0].statement.get("subject") if not unsigned_subjects: + logger.debug("Missing unsigned subjects.") return False found_signed_subject = None @@ -97,6 +99,7 @@ def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> break if not found_signed_subject: + logger.debug("Missing signed subject.") return False found_unsigned_subject = None @@ -108,15 +111,18 @@ def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) -> break if not found_unsigned_subject: + logger.debug("Missing unsigned subject.") return False signed_digest = found_signed_subject.get("digest") unsigned_digest = found_unsigned_subject.get("digest") if not (signed_digest and unsigned_digest): + logger.debug("Missing %ssigned digest.", "un" if signed_digest else "") return False # For signed and unsigned to match, the digests must be identical. if signed_digest != unsigned_digest: + logger.debug("Signed and unsigned digests do not match.") return False key = list(signed_digest.keys())[0] diff --git a/src/macaron/slsa_analyzer/package_registry/deps_dev.py b/src/macaron/slsa_analyzer/package_registry/deps_dev.py index 4c95f00ac..a7bcb85d9 100644 --- a/src/macaron/slsa_analyzer/package_registry/deps_dev.py +++ b/src/macaron/slsa_analyzer/package_registry/deps_dev.py @@ -8,6 +8,8 @@ import urllib.parse from json.decoder import JSONDecodeError from typing import Any +from urllib.parse import quote as encode +from urllib.parse import unquote as decode from macaron.config.defaults import defaults from macaron.errors import APIAccessError @@ -104,6 +106,10 @@ def get_package_info(purl: str) -> dict: If the service is misconfigured, the API is invalid, a network error happens, or unexpected response is returned by the API. """ + if "%" in purl: + purl = decode(purl) + purl = encode(purl, safe="") + api_endpoint = DepsDevService.get_endpoint(path=purl) url = urllib.parse.urlunsplit(api_endpoint) From 8797d7ab2ba2d650942cbf81de68a02823d86a84 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Thu, 24 Apr 2025 21:20:04 +1000 Subject: [PATCH 03/15] chore: add config to repo finder tests Signed-off-by: Ben Selwyn-Smith --- .../cases/repo_finder_remote_calls/repo_finder.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/integration/cases/repo_finder_remote_calls/repo_finder.py b/tests/integration/cases/repo_finder_remote_calls/repo_finder.py index bb0313ac0..177b3cbac 100644 --- a/tests/integration/cases/repo_finder_remote_calls/repo_finder.py +++ b/tests/integration/cases/repo_finder_remote_calls/repo_finder.py @@ -43,6 +43,13 @@ def test_repo_finder() -> int: defaults.add_section("git_service.gitlab") defaults.set("git_service.gitlab", "hostname", "gitlab.com") + if not defaults.has_section("deps_dev"): + defaults.add_section("deps_dev") + defaults.set("deps_dev", "url_netloc", "api.deps.dev") + defaults.set("deps_dev", "url_scheme", "https") + defaults.set("deps_dev", "api_endpoint", "v3alpha") + defaults.set("deps_dev", "purl_endpoint", "purl") + # Test Java package with SCM metadata in artifact POM. match, outcome = find_repo(PackageURL.from_string("pkg:maven/com.fasterxml.jackson.core/jackson-databind@2.14.2")) if not match or outcome != RepoFinderInfo.FOUND: From b6b7fd3276a919d16ee7721353e53b8e71c0db6c Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Thu, 1 May 2025 09:22:49 +1000 Subject: [PATCH 04/15] chore: use SplitResult type hint Signed-off-by: Ben Selwyn-Smith --- src/macaron/slsa_analyzer/package_registry/deps_dev.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/macaron/slsa_analyzer/package_registry/deps_dev.py b/src/macaron/slsa_analyzer/package_registry/deps_dev.py index a7bcb85d9..69db6dab3 100644 --- a/src/macaron/slsa_analyzer/package_registry/deps_dev.py +++ b/src/macaron/slsa_analyzer/package_registry/deps_dev.py @@ -7,7 +7,6 @@ import logging import urllib.parse from json.decoder import JSONDecodeError -from typing import Any from urllib.parse import quote as encode from urllib.parse import unquote as decode @@ -22,7 +21,7 @@ class DepsDevService: """The deps.dev service class.""" @staticmethod - def get_endpoint(purl: bool = True, path: str | None = None) -> Any: + def get_endpoint(purl: bool = True, path: str | None = None) -> urllib.parse.SplitResult: """Build the API endpoint for the deps.dev service and return it. Parameters From c670305cf763348f60b545befebf861dd6faa2a7 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Fri, 2 May 2025 11:45:50 +1000 Subject: [PATCH 05/15] chore: replace library used for x509 extraction Signed-off-by: Ben Selwyn-Smith --- .../slsa_analyzer/provenance/loader.py | 93 ++++++++++++++++--- 1 file changed, 80 insertions(+), 13 deletions(-) diff --git a/src/macaron/slsa_analyzer/provenance/loader.py b/src/macaron/slsa_analyzer/provenance/loader.py index 11da837e2..bbbaf2739 100644 --- a/src/macaron/slsa_analyzer/provenance/loader.py +++ b/src/macaron/slsa_analyzer/provenance/loader.py @@ -11,7 +11,7 @@ import zlib from urllib.parse import urlparse -from pypi_attestations import Attestation +from cryptography import x509 from macaron.config.defaults import defaults from macaron.json_tools import JsonType, json_extract @@ -30,6 +30,12 @@ "workflow": "1.3.6.1.4.1.57264.1.18", "invocation": "1.3.6.1.4.1.57264.1.21", } +_OID_IDS = { + "1.3.6.1.4.1.57264.1.12": "source_repo", + "1.3.6.1.4.1.57264.1.13": "source_digest", + "1.3.6.1.4.1.57264.1.18": "workflow", + "1.3.6.1.4.1.57264.1.21": "invocation", +} def _try_read_url_link_file(file_content: bytes) -> str | None: @@ -128,25 +134,86 @@ def _load_provenance_file_content( return json_payload # For provenance without a predicate (e.g. PyPI), try to use the provenance certificate instead. - attestation_model = Attestation.model_validate_json(decoded_file_content) - certificate_claims = attestation_model.certificate_claims - source_repo = certificate_claims[_OID_NAMES["source_repo"]] - workflow = certificate_claims[_OID_NAMES["workflow"]] - workflow = workflow.replace(source_repo + "/", "") - if "@" in workflow: - workflow = workflow[: workflow.index("@")] - - invocation = certificate_claims[_OID_NAMES["invocation"]] - if "/attempts" in invocation: - invocation = invocation[: invocation.index("/attempts")] + raw_certificate = json_extract(provenance, ["verification_material", "certificate"], str) + if not raw_certificate: + raise LoadIntotoAttestationError("Failed to extract certificate data.") + try: + decoded_certificate = base64.b64decode(raw_certificate) + certificate_claims = get_x509_certificate_values(decoded_certificate) + except UnicodeDecodeError as error: + raise LoadIntotoAttestationError("Cannot decode the payload.") from error + except ValueError as error: + logger.debug(error) + raise LoadIntotoAttestationError("Error parsing certificate.") from error pypi_predicate = PyPICertificatePredicate.build_predicate( - source_repo, certificate_claims[_OID_NAMES["source_digest"]], workflow, invocation + certificate_claims["source_repo"], + certificate_claims["source_digest"], + certificate_claims["workflow"], + certificate_claims["invocation"], ) json_payload["predicate"] = pypi_predicate return json_payload +def get_x509_certificate_values(decoded_certificate: bytes) -> dict: + """Retrieve the values of interest from an x509 certificate. + + Parameters + ---------- + decoded_certificate: bytes + The decoded certificate bytes. + + Returns + ------- + dict + A dictionary of the extracted values. + + Raises + ------ + ValueError + If the values could not be extracted. + """ + certificate = x509.load_der_x509_certificate(decoded_certificate) + certificate_claims = {} + for extension in certificate.extensions: + if extension.oid.dotted_string not in _OID_IDS: + continue + + claim_name = _OID_IDS[extension.oid.dotted_string] + certificate_claims[claim_name] = extension.value.value + + for name in _OID_NAMES: + if name not in certificate_claims: + raise ValueError(f"Missing certificate value: {name}") + + # Values are DER encoded UTF-8 strings. Removing the first two bytes seems to be sufficient. + value: str = certificate_claims[name][2:].decode("UTF-8") + if name == "source_digest" and len(value) != 40: + # Expect a 40 character hex value. + raise ValueError(f"Digest is not 40 characters long: {value}. Original: {certificate_claims[name]}") + if name != "source_digest" and not value.startswith("http"): + # Expect a URL with scheme. + raise ValueError(f"URL has invalid scheme: {value}. Original: {certificate_claims[name]}") + + # Accept value. + certificate_claims[name] = value + + # Apply final formatting. + workflow = certificate_claims["workflow"] + workflow = workflow.replace(certificate_claims["source_repo"] + "/", "") + if "@" in workflow: + workflow = workflow[: workflow.index("@")] + certificate_claims["workflow"] = workflow + + if "/attempts" in certificate_claims["invocation"]: + certificate_claims["invocation"] = certificate_claims["invocation"][ + : certificate_claims["invocation"].index("/attempts") + ] + + return certificate_claims + + def load_provenance_file(filepath: str) -> dict[str, JsonType]: """Load a provenance file and obtain the payload. From b2cfab14157e53557df0a74433f86a6c82c802cc Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Fri, 2 May 2025 14:08:48 +1000 Subject: [PATCH 06/15] chore: update pyproject.toml Signed-off-by: Ben Selwyn-Smith --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 26a360d71..6cae94f7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ dependencies = [ "cyclonedx-python-lib[validation] >=7.3.4,<8.0.0", "beautifulsoup4 >= 4.12.0,<5.0.0", "problog >= 2.2.6,<3.0.0", - "pypi-attestations >= 0.0.23,<1.0.0", + "cryptography >=44.0.0,<45.0.0", ] keywords = [] # https://pypi.org/classifiers/ From 4944928c338d681a96207d165fd1fc546455ebc4 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Fri, 2 May 2025 14:09:19 +1000 Subject: [PATCH 07/15] chore: remove comment Signed-off-by: Ben Selwyn-Smith --- src/macaron/provenance/provenance_finder.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/macaron/provenance/provenance_finder.py b/src/macaron/provenance/provenance_finder.py index 0e494a411..853a3a3cd 100644 --- a/src/macaron/provenance/provenance_finder.py +++ b/src/macaron/provenance/provenance_finder.py @@ -293,11 +293,6 @@ def find_pypi_provenance(purl: PackageURL) -> list[InTotoPayload]: ------- list[InTotoPayload] | None The provenance payload if found, or an empty list otherwise. - - Raises - ------ - ProvenanceAvailableException - If the discovered provenance file size exceeds the configured limit. """ attestation, verified = DepsDevRepoFinder.get_attestation(purl) if not attestation: From 198c853e7a4cb5241919802270bdfced63748104 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Wed, 7 May 2025 14:10:58 +1000 Subject: [PATCH 08/15] chore: address PR feedback Signed-off-by: Ben Selwyn-Smith --- .../repo_finder/repo_finder_deps_dev.py | 43 +++---- .../package_registry/deps_dev.py | 119 +++++++++++++----- .../package_registry/pypi_registry.py | 34 +++++ .../slsa_analyzer/provenance/loader.py | 78 ++++++------ .../specs/pypi_certificate_predicate.py | 21 +++- .../repo_finder/test_repo_finder_deps_dev.py | 54 ++++---- .../package_registry/test_deps_dev.py | 16 +-- 7 files changed, 227 insertions(+), 138 deletions(-) diff --git a/src/macaron/repo_finder/repo_finder_deps_dev.py b/src/macaron/repo_finder/repo_finder_deps_dev.py index 6f5066b37..3da0a017d 100644 --- a/src/macaron/repo_finder/repo_finder_deps_dev.py +++ b/src/macaron/repo_finder/repo_finder_deps_dev.py @@ -17,6 +17,7 @@ from macaron.repo_finder.repo_finder_enums import RepoFinderInfo from macaron.repo_finder.repo_validator import find_valid_repository_url from macaron.slsa_analyzer.git_url import clean_url +from macaron.slsa_analyzer.package_registry import PyPIRegistry from macaron.slsa_analyzer.package_registry.deps_dev import DepsDevService from macaron.util import send_get_http, send_get_http_raw @@ -55,7 +56,7 @@ def find_repo(self, purl: PackageURL) -> tuple[str, RepoFinderInfo]: A tuple of the found URL (or an empty string), and the outcome of the Repo Finder. """ try: - json_data = DepsDevService.get_package_info(encode(str(purl), safe="")) + json_data = DepsDevService.get_package_info(str(purl)) except APIAccessError: return "", RepoFinderInfo.DDEV_API_ERROR @@ -93,7 +94,7 @@ def get_project_info(project_url: str) -> dict[str, Any] | None: project_key = clean_repo_url.hostname + clean_repo_url.path - api_endpoint = DepsDevService.get_endpoint(purl=False, path=f"projects/{encode(project_key, safe='')}") + api_endpoint = DepsDevService.get_endpoint(f"projects/{encode(project_key, safe='')}") request_url = urllib.parse.urlunsplit(api_endpoint) response = send_get_http_raw(request_url) @@ -128,7 +129,7 @@ def get_latest_version(purl: PackageURL) -> tuple[PackageURL | None, RepoFinderI purl = PackageURL.from_string(f"pkg:{purl.type}/{namespace}{purl.name}") try: - metadata = DepsDevService.get_package_info(encode(str(purl), safe="")) + metadata = DepsDevService.get_package_info(purl) except APIAccessError: return None, RepoFinderInfo.DDEV_API_ERROR @@ -170,19 +171,18 @@ def get_attestation(purl: PackageURL) -> tuple[dict | None, bool]: tuple[dict | None, bool] The attestation, or None if not found, and a flag for whether it is verified. """ + if purl.type != "pypi": + logger.debug("PURL type (%s) attestation not yet supported via deps.dev.") + return None, False + if not purl.version: latest_purl, _ = DepsDevRepoFinder.get_latest_version(purl) if not latest_purl: return None, False purl = latest_purl - if not purl.version: - # Should be unreachable. - return None, False - api_endpoint = DepsDevService.get_endpoint( - purl=False, path="/".join(["systems", purl.type, "packages", purl.name, "versions", purl.version]) - ) - target_url = urllib.parse.urlunsplit(api_endpoint) + purl_endpoint = DepsDevService().get_purl_endpoint(purl) + target_url = urllib.parse.urlunsplit(purl_endpoint) result = send_get_http(target_url, headers={}) if not result: @@ -204,25 +204,10 @@ def get_attestation(purl: PackageURL) -> tuple[dict | None, bool]: if not attestation_data: return None, False - bundle = json_extract(attestation_data, ["attestation_bundles"], list) - if not bundle: - logger.debug("No attestation bundle in response.") - return None, False - if len(bundle) > 1: - logger.debug("Bundle length greater than one: %s", len(bundle)) - - attestations = json_extract(bundle[0], ["attestations"], list) - if not attestations: - logger.debug("No attestations in response.") - return None, False - if len(attestations) > 1: - logger.debug("More than one attestation: %s", len(attestations)) - - if not isinstance(attestations[0], dict): - logger.debug("Attestation invalid.") - return None, False - - return attestations[0], json_extract(result_attestations, [0, "verified"], bool) or False + return ( + PyPIRegistry().extract_attestation(attestation_data), + json_extract(result_attestations, [0, "verified"], bool) or False, + ) @staticmethod def extract_links(json_data: dict) -> tuple[list[str], RepoFinderInfo]: diff --git a/src/macaron/slsa_analyzer/package_registry/deps_dev.py b/src/macaron/slsa_analyzer/package_registry/deps_dev.py index 69db6dab3..12af90253 100644 --- a/src/macaron/slsa_analyzer/package_registry/deps_dev.py +++ b/src/macaron/slsa_analyzer/package_registry/deps_dev.py @@ -7,8 +7,8 @@ import logging import urllib.parse from json.decoder import JSONDecodeError -from urllib.parse import quote as encode -from urllib.parse import unquote as decode + +from packageurl import PackageURL from macaron.config.defaults import defaults from macaron.errors import APIAccessError @@ -21,19 +21,59 @@ class DepsDevService: """The deps.dev service class.""" @staticmethod - def get_endpoint(purl: bool = True, path: str | None = None) -> urllib.parse.SplitResult: + def get_purl_endpoint(purl: PackageURL | str) -> urllib.parse.SplitResult: + """Build the purl API endpoint for the deps.dev service and return it. + + Parameters + ---------- + purl: PackageURL | str + The PURL to append to the API endpoint. + + Returns + ------- + urllib.parse.SplitResult + The purl API endpoint. + + Raises + ------ + APIAccessError + If building the API endpoint fails. + """ + encoded_purl = DepsDevService.encode_purl(purl) + if not encoded_purl: + raise APIAccessError("The PURL could not be encoded.") + + purl_endpoint = defaults.get("deps_dev", "purl_endpoint", fallback="") + if not purl_endpoint: + raise APIAccessError( + 'The "purl_endpoint" key is missing in section [deps_dev] of the .ini configuration file.' + ) + + base_url = DepsDevService.get_endpoint() + + try: + return urllib.parse.SplitResult( + scheme=base_url.scheme, + netloc=base_url.netloc, + path="/".join([base_url.path, purl_endpoint, encoded_purl]), + query="", + fragment="", + ) + except ValueError as error: + raise APIAccessError("Failed to construct the PURL API URL.") from error + + @staticmethod + def get_endpoint(path: str | None = None) -> urllib.parse.SplitResult: """Build the API endpoint for the deps.dev service and return it. Parameters ---------- - purl: bool - A flag to determine whether the PURL or BASE endpoint should be returned. path: str | None - A path to be added to the URL. + A path to be appended to the API endpoint. Returns ------- - Any + urllib.parse.SplitResult The API endpoint. """ section_name = "deps_dev" @@ -56,24 +96,7 @@ def get_endpoint(purl: bool = True, path: str | None = None) -> urllib.parse.Spl endpoint_path = [api_endpoint] if path: endpoint_path.append(path) - if not purl: - try: - return urllib.parse.SplitResult( - scheme=url_scheme, - netloc=url_netloc, - path="/".join(endpoint_path), - query="", - fragment="", - ) - except ValueError as error: - raise APIAccessError("Failed to construct the API URL.") from error - - purl_endpoint = section.get("purl_endpoint") - if not purl_endpoint: - raise APIAccessError( - f'The "purl_endpoint" key is missing in section [{section_name}] of the .ini configuration file.' - ) - endpoint_path.insert(1, purl_endpoint) + try: return urllib.parse.SplitResult( scheme=url_scheme, @@ -86,12 +109,48 @@ def get_endpoint(purl: bool = True, path: str | None = None) -> urllib.parse.Spl raise APIAccessError("Failed to construct the API URL.") from error @staticmethod - def get_package_info(purl: str) -> dict: + def encode_purl(purl: PackageURL | str) -> str | None: + """Encode a PURL to match the deps.dev requirements. + + The fragment (subpath) and query (qualifiers) PURL sections are not accepted by deps.dev. + See: https://docs.deps.dev/api/v3alpha/index.html#purllookup. + The documentation claims that all special characters must be percent-encoded. This is not strictly true, as '@' + and ':' are accepted as is. The forward slashes in the PURL must be encoded to distinguish them from URL parts. + + Parameters + ---------- + purl: PackageURL | str + The PURL to encode. + + Returns + ------- + str | None + The encoded PURL. + """ + try: + original_purl = purl if isinstance(purl, PackageURL) else PackageURL.from_string(purl) + new_purl = PackageURL( + type=original_purl.type, + namespace=original_purl.namespace, + name=original_purl.name, + version=original_purl.version, + ) + except ValueError as error: + logger.debug(error) + return None + + # We rely on packageurl calling urllib to encode PURLs for all special characters except forward slash: "/". + encoded = str(new_purl).replace("/", "%2F") + + return encoded + + @staticmethod + def get_package_info(purl: PackageURL | str) -> dict: """Check if the package identified by the PackageURL (PURL) exists and return its information. Parameters ---------- - purl: str + purl: PackageURL | str The PackageURL (PURL). Returns @@ -105,11 +164,7 @@ def get_package_info(purl: str) -> dict: If the service is misconfigured, the API is invalid, a network error happens, or unexpected response is returned by the API. """ - if "%" in purl: - purl = decode(purl) - purl = encode(purl, safe="") - - api_endpoint = DepsDevService.get_endpoint(path=purl) + api_endpoint = DepsDevService.get_purl_endpoint(purl) url = urllib.parse.urlunsplit(api_endpoint) response = send_get_http_raw(url) diff --git a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py index 20f75db08..b0b0275b5 100644 --- a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py +++ b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py @@ -335,6 +335,40 @@ def get_maintainer_join_date(self, username: str) -> datetime | None: return res.replace(tzinfo=None) if res else None + @staticmethod + def extract_attestation(attestation_data: dict) -> dict | None: + """Extract the first attestation file from a PyPI attestation response. + + Parameters + ---------- + attestation_data: dict + The JSON data representing a bundle of attestations. + + Returns + ------- + dict | None + The first attestation, or None if not found. + """ + bundle = json_extract(attestation_data, ["attestation_bundles"], list) + if not bundle: + logger.debug("No attestation bundle in response.") + return None + if len(bundle) > 1: + logger.debug("Bundle length greater than one: %s", len(bundle)) + + attestations = json_extract(bundle[0], ["attestations"], list) + if not attestations: + logger.debug("No attestations in response.") + return None + if len(attestations) > 1: + logger.debug("More than one attestation: %s", len(attestations)) + + if not isinstance(attestations[0], dict): + logger.debug("Attestation invalid.") + return None + + return attestations[0] + @dataclass class PyPIPackageJsonAsset: diff --git a/src/macaron/slsa_analyzer/provenance/loader.py b/src/macaron/slsa_analyzer/provenance/loader.py index bbbaf2739..db15dddef 100644 --- a/src/macaron/slsa_analyzer/provenance/loader.py +++ b/src/macaron/slsa_analyzer/provenance/loader.py @@ -12,6 +12,7 @@ from urllib.parse import urlparse from cryptography import x509 +from cryptography.x509 import DuplicateExtension, UnsupportedGeneralNameType from macaron.config.defaults import defaults from macaron.json_tools import JsonType, json_extract @@ -24,12 +25,6 @@ # See: https://github.com/sigstore/fulcio/blob/main/docs/oid-info.md -_OID_NAMES = { - "source_repo": "1.3.6.1.4.1.57264.1.12", - "source_digest": "1.3.6.1.4.1.57264.1.13", - "workflow": "1.3.6.1.4.1.57264.1.18", - "invocation": "1.3.6.1.4.1.57264.1.21", -} _OID_IDS = { "1.3.6.1.4.1.57264.1.12": "source_repo", "1.3.6.1.4.1.57264.1.13": "source_digest", @@ -123,14 +118,17 @@ def _load_provenance_file_content( try: json_payload = json.loads(decoded_payload) except (json.JSONDecodeError, TypeError) as error: - raise LoadIntotoAttestationError( - "Cannot deserialize the provenance payload as JSON.", - ) from error + raise LoadIntotoAttestationError("Cannot deserialize the provenance payload as JSON.") from error if not isinstance(json_payload, dict): raise LoadIntotoAttestationError("The provenance payload is not a JSON object.") if json_payload["predicate"]: + predicate_type = json_extract(json_payload, ["predicateType"], str) + if not predicate_type: + raise LoadIntotoAttestationError("Missing predicateType in payload.") + if predicate_type == "https://docs.pypi.org/attestations/publish/v1": + raise LoadIntotoAttestationError("PyPI attestation should not have a predicate.") return json_payload # For provenance without a predicate (e.g. PyPI), try to use the provenance certificate instead. @@ -139,65 +137,66 @@ def _load_provenance_file_content( raise LoadIntotoAttestationError("Failed to extract certificate data.") try: decoded_certificate = base64.b64decode(raw_certificate) - certificate_claims = get_x509_certificate_values(decoded_certificate) + certificate_predicate = get_x509_der_certificate_values(decoded_certificate) except UnicodeDecodeError as error: raise LoadIntotoAttestationError("Cannot decode the payload.") from error except ValueError as error: logger.debug(error) raise LoadIntotoAttestationError("Error parsing certificate.") from error - pypi_predicate = PyPICertificatePredicate.build_predicate( - certificate_claims["source_repo"], - certificate_claims["source_digest"], - certificate_claims["workflow"], - certificate_claims["invocation"], - ) - json_payload["predicate"] = pypi_predicate + json_payload["predicate"] = certificate_predicate return json_payload -def get_x509_certificate_values(decoded_certificate: bytes) -> dict: - """Retrieve the values of interest from an x509 certificate. +def get_x509_der_certificate_values(x509_der_certificate: bytes) -> PyPICertificatePredicate: + """Retrieve the values of interest from an x509 certificate in the form of a predicate. + + The passed certificate should be following the DER specification. + See https://peps.python.org/pep-0740/#provenance-objects. + Parameters ---------- - decoded_certificate: bytes - The decoded certificate bytes. + x509_der_certificate: bytes + The certificate bytes. Returns ------- - dict - A dictionary of the extracted values. + PyPICertificatePredicate + A predicate created from the extracted values. Raises ------ ValueError If the values could not be extracted. + """ - certificate = x509.load_der_x509_certificate(decoded_certificate) + certificate = x509.load_der_x509_certificate(x509_der_certificate) certificate_claims = {} - for extension in certificate.extensions: + try: + extensions = certificate.extensions + except (DuplicateExtension, UnsupportedGeneralNameType) as error: + raise ValueError("Certificate extension error:") from error + + for extension in extensions: if extension.oid.dotted_string not in _OID_IDS: continue + # These extensions should be of the UnrecognizedExtension type. + # See: https://cryptography.io/en/latest/x509/reference/#cryptography.x509.UnrecognizedExtension claim_name = _OID_IDS[extension.oid.dotted_string] - certificate_claims[claim_name] = extension.value.value - - for name in _OID_NAMES: - if name not in certificate_claims: - raise ValueError(f"Missing certificate value: {name}") # Values are DER encoded UTF-8 strings. Removing the first two bytes seems to be sufficient. - value: str = certificate_claims[name][2:].decode("UTF-8") - if name == "source_digest" and len(value) != 40: + value: str = extension.value.value[2:].decode("UTF-8") + if claim_name == "source_digest" and len(value) != 40: # Expect a 40 character hex value. - raise ValueError(f"Digest is not 40 characters long: {value}. Original: {certificate_claims[name]}") - if name != "source_digest" and not value.startswith("http"): + raise ValueError(f"Digest is not 40 characters long: {value}. Original: {extension.value.value}") + if claim_name != "source_digest" and not value.startswith("http"): # Expect a URL with scheme. - raise ValueError(f"URL has invalid scheme: {value}. Original: {certificate_claims[name]}") + raise ValueError(f"URL has invalid scheme: {value}. Original: {extension.value.value}") # Accept value. - certificate_claims[name] = value + certificate_claims[claim_name] = value # Apply final formatting. workflow = certificate_claims["workflow"] @@ -211,7 +210,12 @@ def get_x509_certificate_values(decoded_certificate: bytes) -> dict: : certificate_claims["invocation"].index("/attempts") ] - return certificate_claims + return PyPICertificatePredicate( + certificate_claims["source_repo"], + certificate_claims["source_digest"], + certificate_claims["workflow"], + certificate_claims["invocation"], + ) def load_provenance_file(filepath: str) -> dict[str, JsonType]: diff --git a/src/macaron/slsa_analyzer/specs/pypi_certificate_predicate.py b/src/macaron/slsa_analyzer/specs/pypi_certificate_predicate.py index 49d16f93f..2ae7cfb6e 100644 --- a/src/macaron/slsa_analyzer/specs/pypi_certificate_predicate.py +++ b/src/macaron/slsa_analyzer/specs/pypi_certificate_predicate.py @@ -2,18 +2,27 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the spec for predicates derived from a PyPI attestation certificate.""" +from dataclasses import dataclass +@dataclass(frozen=True) class PyPICertificatePredicate: """This class implements the PyPI certificate predicate.""" - @staticmethod - def build_predicate(source_url: str, source_digest: str, build_workflow: str, invocation_url: str) -> dict: + source_url: str + + source_digest: str + + build_workflow: str + + invocation_url: str + + def build_predicate(self) -> dict: """Build a predicate using passed parameters.""" return { "buildType": "pypi_certificate", - "sourceUri": f"{source_url}", - "sourceDigest": f"{source_digest}", - "workflow": f"{build_workflow}", - "invocationUrl": f"{invocation_url}", + "sourceUri": f"{self.source_url}", + "sourceDigest": f"{self.source_digest}", + "workflow": f"{self.build_workflow}", + "invocationUrl": f"{self.invocation_url}", } diff --git a/tests/repo_finder/test_repo_finder_deps_dev.py b/tests/repo_finder/test_repo_finder_deps_dev.py index 1633b0bf1..bf71d8a75 100644 --- a/tests/repo_finder/test_repo_finder_deps_dev.py +++ b/tests/repo_finder/test_repo_finder_deps_dev.py @@ -2,7 +2,6 @@ # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module tests the deps.dev repo finder.""" - import pytest from packageurl import PackageURL from pytest_httpserver import HTTPServer @@ -58,23 +57,23 @@ def test_find_repo_success(httpserver: HTTPServer, deps_dev_service_mock: dict) @pytest.mark.parametrize( - ("repo_url", "server_url", "data"), + "repo_url", [ - ("http::::://130/test", "", ""), - ("http://github.com/oracle/macaron", "", ""), - ("", "/oracle/macaron", "INVALID JSON"), + "http::::://130/test", + "http://github.com/oracle/macaron", ], ) -def test_get_project_info_failures( - httpserver: HTTPServer, deps_dev_service_mock: dict, repo_url: str, server_url: str, data: str -) -> None: - """Test get project info failures.""" - if not repo_url: - repo_url = f"{deps_dev_service_mock['base_scheme']}://{deps_dev_service_mock['base_netloc']}{server_url}" +def test_get_project_info_invalid_url(repo_url: str) -> None: + """Test get project info invalid url.""" + assert not DepsDevRepoFinder().get_project_info(repo_url) - if server_url: - target_url = f"/{deps_dev_service_mock['api']}/projects/{deps_dev_service_mock['base_hostname']}{server_url}" - httpserver.expect_request(target_url).respond_with_data(data) + +def test_get_project_info_invalid_json(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: + """Test get project info invalid json.""" + server_url = "/oracle/macaron" + repo_url = f"{deps_dev_service_mock['base_scheme']}://{deps_dev_service_mock['base_netloc']}{server_url}" + target_url = f"/{deps_dev_service_mock['api']}/projects/{deps_dev_service_mock['base_hostname']}{server_url}" + httpserver.expect_request(target_url).respond_with_data("INVALID JSON") assert not DepsDevRepoFinder().get_project_info(repo_url) @@ -155,10 +154,7 @@ def test_get_attestation_failures( if server_url: assert purl.version - target_url = ( - f"/{deps_dev_service_mock['api']}/" - + f"{'/'.join(['systems', purl.type, 'packages', purl.name, 'versions', purl.version])}" - ) + target_url = f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/{purl}" if "*replace_url*" in data: attestation_url = ( f"{deps_dev_service_mock['base_scheme']}://{deps_dev_service_mock['base_netloc']}{target_url}" @@ -174,15 +170,25 @@ def test_get_attestation_failures( def test_get_attestation_success(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: """Test get attestation success.""" purl = PackageURL.from_string("pkg:pypi/test@3") - target_url = ( - f"/{deps_dev_service_mock['api']}/" - + f"{'/'.join(['systems', purl.type, 'packages', purl.name, 'versions', purl.version or ''])}" - ) + target_url = f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/{purl}" attestation_url = f"{deps_dev_service_mock['base_scheme']}://{deps_dev_service_mock['base_netloc']}{target_url}" data = """ { - "attestations": [{"url": "*replace_url*", "verified": true}], - "attestation_bundles": [{"attestations": [{"foo": "bar"}]}] + "attestations": [ + { + "url": "*replace_url*", + "verified": true + } + ], + "attestation_bundles": [ + { + "attestations": [ + { + "foo": "bar" + } + ] + } + ] } """ data = data.replace("*replace_url*", attestation_url) diff --git a/tests/slsa_analyzer/package_registry/test_deps_dev.py b/tests/slsa_analyzer/package_registry/test_deps_dev.py index cfe7dc601..700e6ff91 100644 --- a/tests/slsa_analyzer/package_registry/test_deps_dev.py +++ b/tests/slsa_analyzer/package_registry/test_deps_dev.py @@ -4,26 +4,22 @@ """Tests for the deps.dev service.""" import pytest +from packageurl import PackageURL from pytest_httpserver import HTTPServer from macaron.slsa_analyzer.package_registry.deps_dev import APIAccessError, DepsDevService -@pytest.mark.parametrize( - ("purl", "data", "expected"), - [ - ("pkg:pypi/ultralytics", '{"foo": "bar"}', {"foo": "bar"}), - ], -) -def test_get_package_info( - httpserver: HTTPServer, purl: str, data: str, expected: dict, deps_dev_service_mock: dict -) -> None: +def test_get_package_info(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: """Test getting package info.""" + purl = "pkg:npm/@test/%:_@\"'$£!^&*()-test-example@v3.5.0-jar" httpserver.expect_request( f"/{deps_dev_service_mock['api']}/{deps_dev_service_mock['purl']}/{purl}" - ).respond_with_data(data) + ).respond_with_data('{"foo": "bar"}') + expected = {"foo": "bar"} assert DepsDevService.get_package_info(purl) == expected + assert DepsDevService.get_package_info(PackageURL.from_string(purl)) == expected def test_get_package_info_exception(httpserver: HTTPServer, deps_dev_service_mock: dict) -> None: From 446b1b4615c5fa4a4fcd642aabc3f5f04d16e6d1 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Wed, 7 May 2025 14:57:40 +1000 Subject: [PATCH 09/15] chore: minor fix Signed-off-by: Ben Selwyn-Smith --- tests/repo_finder/test_repo_finder_deps_dev.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/repo_finder/test_repo_finder_deps_dev.py b/tests/repo_finder/test_repo_finder_deps_dev.py index bf71d8a75..9df584fc5 100644 --- a/tests/repo_finder/test_repo_finder_deps_dev.py +++ b/tests/repo_finder/test_repo_finder_deps_dev.py @@ -63,7 +63,9 @@ def test_find_repo_success(httpserver: HTTPServer, deps_dev_service_mock: dict) "http://github.com/oracle/macaron", ], ) -def test_get_project_info_invalid_url(repo_url: str) -> None: +def test_get_project_info_invalid_url( + deps_dev_service_mock: dict, repo_url: str # pylint: disable=unused-argument +) -> None: """Test get project info invalid url.""" assert not DepsDevRepoFinder().get_project_info(repo_url) From b78a06dac02fc9f905da657acdf50ea3be5ae197 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Wed, 7 May 2025 15:45:12 +1000 Subject: [PATCH 10/15] chore: minor fix Signed-off-by: Ben Selwyn-Smith --- src/macaron/repo_finder/repo_finder_deps_dev.py | 6 +++++- src/macaron/slsa_analyzer/provenance/loader.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/macaron/repo_finder/repo_finder_deps_dev.py b/src/macaron/repo_finder/repo_finder_deps_dev.py index 3da0a017d..1e5ae9b2d 100644 --- a/src/macaron/repo_finder/repo_finder_deps_dev.py +++ b/src/macaron/repo_finder/repo_finder_deps_dev.py @@ -188,7 +188,11 @@ def get_attestation(purl: PackageURL) -> tuple[dict | None, bool]: if not result: return None, False - result_attestations = json_extract(result, ["attestations"], list) + attestation_keys = ["attestations"] + if "version" in result: + attestation_keys.insert(0, "version") + + result_attestations = json_extract(result, attestation_keys, list) if not result_attestations: logger.debug("No attestations in result.") return None, False diff --git a/src/macaron/slsa_analyzer/provenance/loader.py b/src/macaron/slsa_analyzer/provenance/loader.py index db15dddef..0495c9cb2 100644 --- a/src/macaron/slsa_analyzer/provenance/loader.py +++ b/src/macaron/slsa_analyzer/provenance/loader.py @@ -144,7 +144,7 @@ def _load_provenance_file_content( logger.debug(error) raise LoadIntotoAttestationError("Error parsing certificate.") from error - json_payload["predicate"] = certificate_predicate + json_payload["predicate"] = certificate_predicate.build_predicate() return json_payload From 424a44cbb910e135a5854543fe3bcaea3e267e5c Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Thu, 8 May 2025 10:13:19 +1000 Subject: [PATCH 11/15] chore: add custom predicate doc string Signed-off-by: Ben Selwyn-Smith --- src/macaron/provenance/provenance_extractor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/macaron/provenance/provenance_extractor.py b/src/macaron/provenance/provenance_extractor.py index 4bfc8ec03..f2c54c607 100644 --- a/src/macaron/provenance/provenance_extractor.py +++ b/src/macaron/provenance/provenance_extractor.py @@ -201,6 +201,9 @@ def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str | None, str | N def _extract_from_pypi_v1(payload: InTotoV1Payload) -> tuple[str | None, str | None]: """Extract the repository and commit metadata from the pypi provenance file found at the passed path. + This payload represents a custom predicate created from the certificate of a PyPI v1 attestation file. + By design, these attestations come without a predicate. + Parameters ---------- payload: InTotoPayload From 0d7280a7c3c4ce46d49913a2c4fff64408ab469b Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Thu, 8 May 2025 23:13:26 +1000 Subject: [PATCH 12/15] chore: address PR feedback Signed-off-by: Ben Selwyn-Smith --- src/macaron/slsa_analyzer/provenance/loader.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/macaron/slsa_analyzer/provenance/loader.py b/src/macaron/slsa_analyzer/provenance/loader.py index 0495c9cb2..a0c6337d2 100644 --- a/src/macaron/slsa_analyzer/provenance/loader.py +++ b/src/macaron/slsa_analyzer/provenance/loader.py @@ -123,14 +123,18 @@ def _load_provenance_file_content( if not isinstance(json_payload, dict): raise LoadIntotoAttestationError("The provenance payload is not a JSON object.") - if json_payload["predicate"]: - predicate_type = json_extract(json_payload, ["predicateType"], str) - if not predicate_type: - raise LoadIntotoAttestationError("Missing predicateType in payload.") + predicate_type = json_extract(json_payload, ["predicateType"], str) + if not predicate_type: + raise LoadIntotoAttestationError("The payload is missing a predicate type.") + + if "predicate" in json_payload: if predicate_type == "https://docs.pypi.org/attestations/publish/v1": raise LoadIntotoAttestationError("PyPI attestation should not have a predicate.") return json_payload + if predicate_type != "https://docs.pypi.org/attestations/publish/v1": + raise LoadIntotoAttestationError(f"The payload predicate type '{predicate_type}' requires a predicate.") + # For provenance without a predicate (e.g. PyPI), try to use the provenance certificate instead. raw_certificate = json_extract(provenance, ["verification_material", "certificate"], str) if not raw_certificate: @@ -198,6 +202,10 @@ def get_x509_der_certificate_values(x509_der_certificate: bytes) -> PyPICertific # Accept value. certificate_claims[claim_name] = value + # Expect all values to have been found. + if len(certificate_claims) != len(_OID_IDS): + raise ValueError(f"Missing certificate claim(s). Found {len(certificate_claims)} of {len(_OID_IDS)}") + # Apply final formatting. workflow = certificate_claims["workflow"] workflow = workflow.replace(certificate_claims["source_repo"] + "/", "") From 25d0efb2fac4a63d7a5e1a05c490cf9d50434654 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Thu, 8 May 2025 23:17:19 +1000 Subject: [PATCH 13/15] chore: add example PURL endpoint Signed-off-by: Ben Selwyn-Smith --- src/macaron/repo_finder/repo_finder_deps_dev.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/macaron/repo_finder/repo_finder_deps_dev.py b/src/macaron/repo_finder/repo_finder_deps_dev.py index 1e5ae9b2d..3375151d0 100644 --- a/src/macaron/repo_finder/repo_finder_deps_dev.py +++ b/src/macaron/repo_finder/repo_finder_deps_dev.py @@ -181,6 +181,8 @@ def get_attestation(purl: PackageURL) -> tuple[dict | None, bool]: return None, False purl = latest_purl + # Example of a PURL endpoint for deps.dev with '/' encoded as '%2F': + # https://api.deps.dev/v3alpha/purl/pkg:npm%2F@sigstore%2Fmock@0.7.5 purl_endpoint = DepsDevService().get_purl_endpoint(purl) target_url = urllib.parse.urlunsplit(purl_endpoint) From 545a603d6f4f9b4ac447e16081647fa3a1216882 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Fri, 9 May 2025 08:41:24 +1000 Subject: [PATCH 14/15] chore: fix predicate check Signed-off-by: Ben Selwyn-Smith --- src/macaron/slsa_analyzer/provenance/loader.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/macaron/slsa_analyzer/provenance/loader.py b/src/macaron/slsa_analyzer/provenance/loader.py index a0c6337d2..9aa4fb1e9 100644 --- a/src/macaron/slsa_analyzer/provenance/loader.py +++ b/src/macaron/slsa_analyzer/provenance/loader.py @@ -127,7 +127,8 @@ def _load_provenance_file_content( if not predicate_type: raise LoadIntotoAttestationError("The payload is missing a predicate type.") - if "predicate" in json_payload: + predicate = json_extract(json_payload, ["predicate"], dict) + if predicate: if predicate_type == "https://docs.pypi.org/attestations/publish/v1": raise LoadIntotoAttestationError("PyPI attestation should not have a predicate.") return json_payload From 57f4346fb60a6de7963174c41669991841c3cc03 Mon Sep 17 00:00:00 2001 From: Ben Selwyn-Smith Date: Mon, 12 May 2025 20:21:30 +1000 Subject: [PATCH 15/15] chore: remove github attestation related change Signed-off-by: Ben Selwyn-Smith --- src/macaron/slsa_analyzer/provenance/loader.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/macaron/slsa_analyzer/provenance/loader.py b/src/macaron/slsa_analyzer/provenance/loader.py index 9aa4fb1e9..19c256315 100644 --- a/src/macaron/slsa_analyzer/provenance/loader.py +++ b/src/macaron/slsa_analyzer/provenance/loader.py @@ -98,10 +98,6 @@ def _load_provenance_file_content( # Some provenances, such as Witness may not include the DSSE envelope `dsseEnvelope` # property but contain its value directly. provenance_payload = provenance.get("payload", None) - if not provenance_payload: - # GitHub Attestation. - # TODO Check if old method (above) actually works. - provenance_payload = json_extract(provenance, ["bundle", "dsseEnvelope", "payload"], str) if not provenance_payload: # PyPI Attestation. provenance_payload = json_extract(provenance, ["envelope", "statement"], str)