diff --git a/src/macaron/artifact/local_artifact.py b/src/macaron/artifact/local_artifact.py index ed37c335a..582799824 100644 --- a/src/macaron/artifact/local_artifact.py +++ b/src/macaron/artifact/local_artifact.py @@ -1,16 +1,21 @@ -# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module declares types and utilities for handling local artifacts.""" import fnmatch import glob +import hashlib +import logging import os from packageurl import PackageURL from macaron.artifact.maven import construct_maven_repository_path from macaron.errors import LocalArtifactFinderError +from macaron.slsa_analyzer.package_registry import MavenCentralRegistry + +logger: logging.Logger = logging.getLogger(__name__) def construct_local_artifact_dirs_glob_pattern_maven_purl(maven_purl: PackageURL) -> list[str] | None: @@ -247,3 +252,53 @@ def get_local_artifact_dirs( ) raise LocalArtifactFinderError(f"Unsupported PURL type {purl_type}") + + +def get_local_artifact_hash(purl: PackageURL, artifact_dirs: list[str], hash_algorithm_name: str) -> str | None: + """Compute the hash of the local artifact. + + Parameters + ---------- + purl: PackageURL + The PURL of the artifact being sought. + artifact_dirs: list[str] + The possible locations of the artifact. + hash_algorithm_name: str + The hash algorithm to use. + + Returns + ------- + str | None + The hash, or None if not found. 
+ """ + if not artifact_dirs: + logger.debug("No artifact directories provided.") + return None + + if not purl.version: + logger.debug("PURL is missing version.") + return None + + artifact_target = None + if purl.type == "maven": + artifact_target = MavenCentralRegistry.get_artifact_file_name(purl) + + if not artifact_target: + logger.debug("PURL type not supported: %s", purl.type) + return None + + for artifact_dir in artifact_dirs: + full_path = os.path.join(artifact_dir, artifact_target) + if not os.path.exists(full_path): + continue + + with open(full_path, "rb") as file: + try: + hash_result = hashlib.file_digest(file, hash_algorithm_name) + except ValueError as error: + logger.debug("Error while hashing file: %s", error) + continue + + return hash_result.hexdigest() + + return None diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index baf3682cd..8f37ccc3b 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -4,11 +4,14 @@ """This module handles the cloning and analyzing a Git repo.""" import glob +import hashlib +import json import logging import os import re import sys import tempfile +import urllib.parse from collections.abc import Mapping from datetime import datetime, timezone from pathlib import Path @@ -20,7 +23,10 @@ from sqlalchemy.orm import Session from macaron import __version__ -from macaron.artifact.local_artifact import get_local_artifact_dirs +from macaron.artifact.local_artifact import ( + get_local_artifact_dirs, + get_local_artifact_hash, +) from macaron.config.global_config import global_config from macaron.config.target_config import Configuration from macaron.database.database_manager import DatabaseManager, get_db_manager, get_db_session @@ -41,6 +47,7 @@ ProvenanceError, PURLNotFoundError, ) +from macaron.json_tools import json_extract from macaron.output_reporter.reporter import FileReporter from macaron.output_reporter.results import Record, 
Report, SCMStatus from macaron.provenance import provenance_verifier @@ -66,12 +73,15 @@ from macaron.slsa_analyzer.checks import * # pylint: disable=wildcard-import,unused-wildcard-import # noqa: F401,F403 from macaron.slsa_analyzer.ci_service import CI_SERVICES from macaron.slsa_analyzer.database_store import store_analyze_context_to_db -from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService +from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService, GitHub from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR -from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES +from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES, MavenCentralRegistry, PyPIRegistry +from macaron.slsa_analyzer.package_registry.pypi_registry import find_or_create_pypi_asset from macaron.slsa_analyzer.provenance.expectations.expectation_registry import ExpectationRegistry from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV01Payload +from macaron.slsa_analyzer.provenance.intoto.errors import LoadIntotoAttestationError +from macaron.slsa_analyzer.provenance.loader import load_provenance_payload from macaron.slsa_analyzer.provenance.slsa import SLSAProvenanceData from macaron.slsa_analyzer.registry import registry from macaron.slsa_analyzer.specs.ci_spec import CIInfo @@ -403,6 +413,17 @@ def run_single( status=SCMStatus.ANALYSIS_FAILED, ) + local_artifact_dirs = None + if parsed_purl and parsed_purl.type in self.local_artifact_repo_mapper: + local_artifact_repo_path = self.local_artifact_repo_mapper[parsed_purl.type] + try: + local_artifact_dirs = get_local_artifact_dirs( + purl=parsed_purl, + local_artifact_repo_path=local_artifact_repo_path, + ) + except LocalArtifactFinderError as error: + logger.debug(error) + # Prepare the repo. 
git_obj = None commit_finder_outcome = CommitFinderInfo.NOT_USED @@ -480,6 +501,39 @@ def run_single( git_service = self._determine_git_service(analyze_ctx) self._determine_ci_services(analyze_ctx, git_service) self._determine_build_tools(analyze_ctx, git_service) + + # Try to find an attestation from GitHub, if applicable. + if parsed_purl and not provenance_payload and analysis_target.repo_path and isinstance(git_service, GitHub): + # Try to discover GitHub attestation for the target software component. + url = None + try: + url = urllib.parse.urlparse(analysis_target.repo_path) + except TypeError as error: + logger.debug("Failed to parse repository path as URL: %s", error) + if url and url.hostname == "github.com": + artifact_hash = self.get_artifact_hash( + parsed_purl, local_artifact_dirs, hashlib.sha256(), package_registries_info + ) + if artifact_hash: + git_attestation_dict = git_service.api_client.get_attestation( + analyze_ctx.component.repository.full_name, artifact_hash + ) + if git_attestation_dict: + git_attestation_list = json_extract(git_attestation_dict, ["attestations"], list) + if git_attestation_list: + git_attestation = git_attestation_list[0] + + with tempfile.TemporaryDirectory() as temp_dir: + attestation_file = os.path.join(temp_dir, "attestation") + with open(attestation_file, "w", encoding="UTF-8") as file: + json.dump(git_attestation, file) + + try: + payload = load_provenance_payload(attestation_file) + provenance_payload = payload + except LoadIntotoAttestationError as error: + logger.debug("Failed to load provenance payload: %s", error) + if parsed_purl is not None: self._verify_repository_link(parsed_purl, analyze_ctx) self._determine_package_registries(analyze_ctx, package_registries_info) @@ -541,16 +595,8 @@ def run_single( analyze_ctx.dynamic_data["validate_malware"] = validate_malware - if parsed_purl and parsed_purl.type in self.local_artifact_repo_mapper: - local_artifact_repo_path = 
self.local_artifact_repo_mapper[parsed_purl.type] - try: - local_artifact_dirs = get_local_artifact_dirs( - purl=parsed_purl, - local_artifact_repo_path=local_artifact_repo_path, - ) - analyze_ctx.dynamic_data["local_artifact_paths"].extend(local_artifact_dirs) - except LocalArtifactFinderError as error: - logger.debug(error) + if local_artifact_dirs: + analyze_ctx.dynamic_data["local_artifact_paths"].extend(local_artifact_dirs) analyze_ctx.check_results = registry.scan(analyze_ctx) @@ -940,6 +986,99 @@ def create_analyze_ctx(self, component: Component) -> AnalyzeContext: return analyze_ctx + def get_artifact_hash( + self, + purl: PackageURL, + cached_artifacts: list[str] | None, + hash_algorithm: Any, + package_registries_info: list[PackageRegistryInfo], + ) -> str | None: + """Get the hash of the artifact found from the passed PURL using local or remote files. + + Parameters + ---------- + purl: PackageURL + The PURL of the artifact. + cached_artifacts: list[str] | None + The list of local files that match the PURL. + hash_algorithm: Any + The hash algorithm to use. + package_registries_info: list[PackageRegistryInfo] + The list of package registry information. + + Returns + ------- + str | None + The hash of the artifact, or None if not found. + """ + if cached_artifacts: + # Try to get the hash from a local file. + artifact_hash = get_local_artifact_hash(purl, cached_artifacts, hash_algorithm.name) + + if artifact_hash: + return artifact_hash + + # Download the artifact. 
+ if purl.type == "maven": + maven_registry = next( + ( + package_registry + for package_registry in PACKAGE_REGISTRIES + if isinstance(package_registry, MavenCentralRegistry) + ), + None, + ) + if not maven_registry: + return None + + return maven_registry.get_artifact_hash(purl, hash_algorithm) + + if purl.type == "pypi": + pypi_registry = next( + ( + package_registry + for package_registry in PACKAGE_REGISTRIES + if isinstance(package_registry, PyPIRegistry) + ), + None, + ) + if not pypi_registry: + logger.debug("Missing registry for PyPI") + return None + + registry_info = next( + ( + info + for info in package_registries_info + if info.package_registry == pypi_registry and info.build_tool_name in {"pip", "poetry"} + ), + None, + ) + if not registry_info: + logger.debug("Missing registry information for PyPI") + return None + + pypi_asset = find_or_create_pypi_asset(purl.name, purl.version, registry_info) + if not pypi_asset: + return None + + pypi_asset.has_repository = True + if not pypi_asset.download(""): + return None + + artifact_hash = pypi_asset.get_sha256() + if artifact_hash: + return artifact_hash + + source_url = pypi_asset.get_sourcecode_url("bdist_wheel") + if not source_url: + return None + + return pypi_registry.get_artifact_hash(source_url, hash_algorithm) + + logger.debug("Purl type '%s' not yet supported for GitHub attestation discovery.", purl.type) + return None + def _determine_git_service(self, analyze_ctx: AnalyzeContext) -> BaseGitService: """Determine the Git service used by the software component.""" remote_path = analyze_ctx.component.repository.remote_path if analyze_ctx.component.repository else None diff --git a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py index c69de3bde..e37847a07 100644 --- a/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py +++ b/src/macaron/slsa_analyzer/checks/detect_malicious_metadata_check.py @@ 
-32,7 +32,11 @@ from macaron.slsa_analyzer.checks.check_result import CheckResultData, CheckResultType, Confidence, JustificationType from macaron.slsa_analyzer.package_registry.deps_dev import APIAccessError, DepsDevService from macaron.slsa_analyzer.package_registry.osv_dev import OSVDevService -from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, PyPIRegistry +from macaron.slsa_analyzer.package_registry.pypi_registry import ( + PyPIPackageJsonAsset, + PyPIRegistry, + find_or_create_pypi_asset, +) from macaron.slsa_analyzer.registry import registry from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo @@ -258,28 +262,16 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: case PackageRegistryInfo( build_tool_name="pip" | "poetry", build_tool_purl_type="pypi", - package_registry=PyPIRegistry() as pypi_registry, + package_registry=PyPIRegistry(), ) as pypi_registry_info: - # Retrieve the pre-existing AssetLocator object for the PyPI package JSON object, if it exists. - pypi_package_json = next( - ( - asset - for asset in pypi_registry_info.metadata - if isinstance(asset, PyPIPackageJsonAsset) - and asset.component_name == ctx.component.name - and asset.component_version == ctx.component.version - ), - None, + # Retrieve the pre-existing asset, or create a new one. + pypi_package_json = find_or_create_pypi_asset( + ctx.component.name, ctx.component.version, pypi_registry_info ) - if not pypi_package_json: - # Create an AssetLocator object for the PyPI package JSON object. 
- pypi_package_json = PyPIPackageJsonAsset( - component_name=ctx.component.name, - component_version=ctx.component.version, - has_repository=ctx.component.repository is not None, - pypi_registry=pypi_registry, - package_json={}, - ) + if pypi_package_json is None: + return CheckResultData(result_tables=[], result_type=CheckResultType.UNKNOWN) + + pypi_package_json.has_repository = ctx.component.repository is not None pypi_registry_info.metadata.append(pypi_package_json) diff --git a/src/macaron/slsa_analyzer/git_service/api_client.py b/src/macaron/slsa_analyzer/git_service/api_client.py index 8e987e6ca..681a1f4e0 100644 --- a/src/macaron/slsa_analyzer/git_service/api_client.py +++ b/src/macaron/slsa_analyzer/git_service/api_client.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """The module provides API clients for VCS services, such as GitHub.""" @@ -659,6 +659,25 @@ def download_asset(self, url: str, download_path: str) -> bool: return True + def get_attestation(self, full_name: str, artifact_hash: str) -> dict: + """Download and return the attestation associated with the passed artifact hash, if any. + + Parameters + ---------- + full_name : str + The full name of the repo. + artifact_hash: str + The SHA256 hash of an artifact. + + Returns + ------- + dict + The attestation data, or an empty dict if not found. + """ + url = f"{GhAPIClient._REPO_END_POINT}/{full_name}/attestations/sha256:{artifact_hash}" + response_data = send_get_http(url, self.headers) + return response_data or {} + def get_default_gh_client(access_token: str) -> GhAPIClient: """Return a GhAPIClient instance with default values. 
diff --git a/src/macaron/slsa_analyzer/package_registry/maven_central_registry.py b/src/macaron/slsa_analyzer/package_registry/maven_central_registry.py index 131051b66..bc419f921 100644 --- a/src/macaron/slsa_analyzer/package_registry/maven_central_registry.py +++ b/src/macaron/slsa_analyzer/package_registry/maven_central_registry.py @@ -6,10 +6,13 @@ import logging import urllib.parse from datetime import datetime, timezone +from typing import Any import requests from packageurl import PackageURL +from requests import RequestException +from macaron.artifact.maven import construct_maven_repository_path from macaron.config.defaults import defaults from macaron.errors import ConfigurationError, InvalidHTTPResponseError from macaron.slsa_analyzer.package_registry.package_registry import PackageRegistry @@ -236,3 +239,84 @@ def find_publish_timestamp(self, purl: str) -> datetime: raise InvalidHTTPResponseError(f"The timestamp returned by {url} is invalid") from error raise InvalidHTTPResponseError(f"Invalid response from Maven central for {url}.") + + @staticmethod + def get_artifact_file_name(purl: PackageURL) -> str | None: + """Return the artifact file name of the passed PURL based on the Maven registry standard. + + Parameters + ---------- + purl: PackageURL + The PURL of the artifact. + + Returns + ------- + str | None + The artifact file name, or None if invalid. + """ + if not purl.version: + return None + + return purl.name + "-" + purl.version + ".jar" + + def get_artifact_hash(self, purl: PackageURL, hash_algorithm: Any) -> str | None: + """Return the hash of the artifact found by the passed purl relevant to the registry's URL. + + Parameters + ---------- + purl: PackageURL + The purl of the artifact. + hash_algorithm: Any + The hash algorithm to use. + + Returns + ------- + str | None + The hash of the artifact, or None if not found. 
+ """ + if not purl.namespace: + return None + + file_name = MavenCentralRegistry.get_artifact_file_name(purl) + if not (purl.version and file_name): + return None + + # Maven supports but does not require a sha256 hash of uploaded artifacts. + artifact_path = construct_maven_repository_path(purl.namespace, purl.name, purl.version) + artifact_url = self.registry_url + "/" + artifact_path + "/" + file_name + sha256_url = artifact_url + ".sha256" + logger.debug("Search for artifact hash using URL: %s", [sha256_url, artifact_url]) + + response = send_get_http_raw(sha256_url, {}) + sha256_hash = None + if response and (sha256_hash := response.text): + # As Maven hashes are user provided and not verified they serve as a reference only. + logger.debug("Found hash of artifact: %s", sha256_hash) + + try: + response = requests.get(artifact_url, stream=True, timeout=40) + response.raise_for_status() + except requests.exceptions.HTTPError as http_err: + logger.debug("HTTP error occurred: %s", http_err) + return None + + if response.status_code != 200: + return None + + # Download file and compute hash as chunks are received. + try: + for chunk in response.iter_content(): + hash_algorithm.update(chunk) + except RequestException as error: + # Something went wrong with the request, abort. 
+ logger.debug("Error while streaming target file: %s", error) + response.close() + return None + + artifact_hash: str = hash_algorithm.hexdigest() + if sha256_hash and artifact_hash != sha256_hash: + logger.debug("Artifact hash and discovered hash do not match: %s != %s", artifact_hash, sha256_hash) + return None + + logger.debug("Computed hash of artifact: %s", artifact_hash) + return artifact_hash diff --git a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py index 20f75db08..0852d554c 100644 --- a/src/macaron/slsa_analyzer/package_registry/pypi_registry.py +++ b/src/macaron/slsa_analyzer/package_registry/pypi_registry.py @@ -11,6 +11,7 @@ import zipfile from dataclasses import dataclass from datetime import datetime +from typing import Any import requests from bs4 import BeautifulSoup, Tag @@ -21,6 +22,7 @@ from macaron.json_tools import json_extract from macaron.malware_analyzer.datetime_parser import parse_datetime from macaron.slsa_analyzer.package_registry.package_registry import PackageRegistry +from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo from macaron.util import send_get_http_raw logger: logging.Logger = logging.getLogger(__name__) @@ -231,6 +233,45 @@ def fetch_sourcecode(self, src_url: str) -> dict[str, str] | None: logger.debug("Successfully fetch the source code from PyPI") return py_files_content + def get_artifact_hash(self, artifact_url: str, hash_algorithm: Any) -> str | None: + """Return the hash of the artifact found at the passed URL. + + Parameters + ---------- + artifact_url + The URL of the artifact. + hash_algorithm: Any + The hash algorithm to use. + + Returns + ------- + str | None + The hash of the artifact, or None if not found. 
+ """ + try: + response = requests.get(artifact_url, stream=True, timeout=40) + response.raise_for_status() + except requests.exceptions.HTTPError as http_err: + logger.debug("HTTP error occurred: %s", http_err) + return None + + if response.status_code != 200: + logger.debug("Invalid response: %s", response.status_code) + return None + + try: + for chunk in response.iter_content(): + hash_algorithm.update(chunk) + except RequestException as error: + # Something went wrong with the request, abort. + logger.debug("Error while streaming source file: %s", error) + response.close() + return None + + artifact_hash: str = hash_algorithm.hexdigest() + logger.debug("Computed artifact hash: %s", artifact_hash) + return artifact_hash + def get_package_page(self, package_name: str) -> str | None: """Implement custom API to get package main page. @@ -430,15 +471,19 @@ def get_latest_version(self) -> str | None: """ return json_extract(self.package_json, ["info", "version"], str) - def get_sourcecode_url(self) -> str | None: + def get_sourcecode_url(self, package_type: str = "sdist") -> str | None: """Get the url of the source distribution. + Parameters + ---------- + package_type: str + The package type to retrieve the URL of. + Returns ------- str | None The URL of the source distribution. """ - urls: list | None = None if self.component_version: urls = json_extract(self.package_json, ["releases", self.component_version], list) else: @@ -447,7 +492,7 @@ def get_sourcecode_url(self) -> str | None: if not urls: return None for distribution in urls: - if distribution.get("packagetype") != "sdist": + if distribution.get("packagetype") != package_type: continue # We intentionally check if the url is None and use empty string if that's the case. 
source_url: str = distribution.get("url") or "" @@ -497,3 +542,59 @@ def get_sourcecode(self) -> dict[str, str] | None: source_code: dict[str, str] | None = self.pypi_registry.fetch_sourcecode(url) return source_code return None + + def get_sha256(self) -> str | None: + """Get the sha256 hash of the artifact from its payload. + + Returns + ------- + str | None + The sha256 hash of the artifact, or None if not found. + """ + if not self.package_json and not self.download(""): + return None + + if not self.component_version: + artifact_hash = json_extract(self.package_json, ["urls", 0, "digests", "sha256"], str) + else: + artifact_hash = json_extract( + self.package_json, ["releases", self.component_version, "digests", "sha256"], str + ) + logger.debug("Found sha256 hash: %s", artifact_hash) + return artifact_hash + + +def find_or_create_pypi_asset( + asset_name: str, asset_version: str | None, pypi_registry_info: PackageRegistryInfo +) -> PyPIPackageJsonAsset | None: + """Find the asset in the provided package registry information, or create it. + + Parameters + ---------- + asset_name: str + The name of the asset. + asset_version: str | None + The version of the asset. + pypi_registry_info: + The package registry information. + + Returns + ------- + PyPIPackageJsonAsset | None + The asset, or None if not found. 
+ """ + pypi_package_json = next( + (asset for asset in pypi_registry_info.metadata if isinstance(asset, PyPIPackageJsonAsset)), + None, + ) + if pypi_package_json: + return pypi_package_json + + package_registry = pypi_registry_info.package_registry + if not isinstance(package_registry, PyPIRegistry): + logger.debug("Failed to create PyPIPackageJson asset.") + return None + + asset = PyPIPackageJsonAsset(asset_name, asset_version, False, package_registry, {}) + pypi_registry_info.metadata.append(asset) + return asset diff --git a/src/macaron/slsa_analyzer/provenance/loader.py b/src/macaron/slsa_analyzer/provenance/loader.py index 65dfee1bb..d75faa726 100644 --- a/src/macaron/slsa_analyzer/provenance/loader.py +++ b/src/macaron/slsa_analyzer/provenance/loader.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """This module contains the loaders for SLSA provenances.""" @@ -12,7 +12,7 @@ from urllib.parse import urlparse from macaron.config.defaults import defaults -from macaron.json_tools import JsonType +from macaron.json_tools import JsonType, json_extract from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, validate_intoto_payload from macaron.slsa_analyzer.provenance.intoto.errors import LoadIntotoAttestationError, ValidateInTotoPayloadError from macaron.util import send_get_http_raw @@ -83,6 +83,10 @@ def _load_provenance_file_content( # Some provenances, such as Witness may not include the DSSE envelope `dsseEnvelope` # property but contain its value directly. provenance_payload = provenance.get("payload", None) + if not provenance_payload: + # GitHub Attestation. + # TODO Check if old method (above) actually works. 
+ provenance_payload = json_extract(provenance, ["bundle", "dsseEnvelope", "payload"], str) if not provenance_payload: raise LoadIntotoAttestationError( 'Cannot find the "payload" field in the decoded provenance.', diff --git a/tests/integration/cases/github_maven_attestation/policy.dl b/tests/integration/cases/github_maven_attestation/policy.dl new file mode 100644 index 000000000..9df46219b --- /dev/null +++ b/tests/integration/cases/github_maven_attestation/policy.dl @@ -0,0 +1,10 @@ +/* Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. */ +/* Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. */ + +#include "prelude.dl" + +Policy("test_policy", component_id, "") :- + check_passed(component_id, "mcn_provenance_available_1"). + +apply_policy_to("test_policy", component_id) :- + is_component(component_id, "pkg:maven/io.liftwizard/liftwizard-checkstyle@2.1.22"). diff --git a/tests/integration/cases/github_maven_attestation/test.yaml b/tests/integration/cases/github_maven_attestation/test.yaml new file mode 100644 index 000000000..9913d930e --- /dev/null +++ b/tests/integration/cases/github_maven_attestation/test.yaml @@ -0,0 +1,22 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +description: | + Discovering attestation of a Maven artifact on GitHub + +tags: +- macaron-python-package + +steps: +- name: Run macaron analyze + kind: analyze + options: + command_args: + - -purl + - pkg:maven/io.liftwizard/liftwizard-checkstyle@2.1.22 + - -rp + - https://github.com/liftwizard/liftwizard +- name: Run macaron verify-policy to verify passed/failed checks + kind: verify + options: + policy: policy.dl diff --git a/tests/integration/cases/github_maven_attestation_local/policy.dl b/tests/integration/cases/github_maven_attestation_local/policy.dl new file mode 100644 index 000000000..ff31abf90 --- /dev/null +++ b/tests/integration/cases/github_maven_attestation_local/policy.dl @@ -0,0 +1,10 @@ +/* Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. */ +/* Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. */ + +#include "prelude.dl" + +Policy("test_policy", component_id, "") :- + check_failed(component_id, "mcn_provenance_available_1"). + +apply_policy_to("test_policy", component_id) :- + is_component(component_id, "pkg:maven/io.liftwizard/liftwizard-checkstyle@2.1.22"). diff --git a/tests/integration/cases/github_maven_attestation_local/test.yaml b/tests/integration/cases/github_maven_attestation_local/test.yaml new file mode 100644 index 000000000..d66a089b2 --- /dev/null +++ b/tests/integration/cases/github_maven_attestation_local/test.yaml @@ -0,0 +1,28 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +description: | + Discovering GitHub attestation of a local Maven artifact but failing because the artifact is wrong + +tags: +- macaron-python-package + +steps: +- name: Download artifact POM instead of the JAR + kind: shell + options: + cmd: curl --create-dirs -o ./output/.m2/repository/io/liftwizard/liftwizard-checkstyle/2.1.22/liftwizard-checkstyle-2.1.22.jar https://repo1.maven.org/maven2/io/liftwizard/liftwizard-checkstyle/2.1.22/liftwizard-checkstyle-2.1.22.pom +- name: Run macaron analyze + kind: analyze + options: + command_args: + - -purl + - pkg:maven/io.liftwizard/liftwizard-checkstyle@2.1.22 + - -rp + - https://github.com/liftwizard/liftwizard + - --local-maven-repo + - ./output/.m2 +- name: Run macaron verify-policy to verify no provenance was found + kind: verify + options: + policy: policy.dl diff --git a/tests/integration/cases/github_pypi_attestation/policy.dl b/tests/integration/cases/github_pypi_attestation/policy.dl new file mode 100644 index 000000000..26cef7913 --- /dev/null +++ b/tests/integration/cases/github_pypi_attestation/policy.dl @@ -0,0 +1,10 @@ +/* Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. */ +/* Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. */ + +#include "prelude.dl" + +Policy("test_policy", component_id, "") :- + check_passed(component_id, "mcn_provenance_available_1"). + +apply_policy_to("test_policy", component_id) :- + is_component(component_id, "pkg:pypi/toga@0.5.0"). diff --git a/tests/integration/cases/github_pypi_attestation/test.yaml b/tests/integration/cases/github_pypi_attestation/test.yaml new file mode 100644 index 000000000..173361662 --- /dev/null +++ b/tests/integration/cases/github_pypi_attestation/test.yaml @@ -0,0 +1,20 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+
+description: |
+  Discovering attestation of a PyPI artifact on GitHub
+
+tags:
+- macaron-python-package
+
+steps:
+- name: Run macaron analyze
+  kind: analyze
+  options:
+    command_args:
+    - -purl
+    - pkg:pypi/toga@0.5.0
+- name: Run macaron verify-policy to verify passed/failed checks
+  kind: verify
+  options:
+    policy: policy.dl
diff --git a/tests/slsa_analyzer/package_registry/test_maven_central_registry.py b/tests/slsa_analyzer/package_registry/test_maven_central_registry.py
index 62b9fdca0..2feffae96 100644
--- a/tests/slsa_analyzer/package_registry/test_maven_central_registry.py
+++ b/tests/slsa_analyzer/package_registry/test_maven_central_registry.py
@@ -7,11 +7,14 @@
 import os
 import urllib.parse
 from datetime import datetime
+from hashlib import sha256
 from pathlib import Path
 
 import pytest
+from packageurl import PackageURL
 from pytest_httpserver import HTTPServer
 
+from macaron.artifact.maven import construct_maven_repository_path
 from macaron.config.defaults import load_defaults
 from macaron.errors import ConfigurationError, InvalidHTTPResponseError
 from macaron.slsa_analyzer.package_registry.maven_central_registry import MavenCentralRegistry
@@ -35,6 +38,28 @@ def maven_central_instance() -> MavenCentralRegistry:
     )
 
 
+@pytest.fixture(name="maven_service")
+def maven_service_(httpserver: HTTPServer, tmp_path: Path) -> None:
+    """Set up the Maven httpserver."""
+    base_url_parsed = urllib.parse.urlparse(httpserver.url_for(""))
+
+    user_config_input = f"""
+    [package_registry.maven_central]
+    request_timeout = 20
+    search_netloc = {base_url_parsed.netloc}
+    search_scheme = {base_url_parsed.scheme}
+    registry_url_netloc = {base_url_parsed.netloc}
+    registry_url_scheme = {base_url_parsed.scheme}
+    """
+    user_config_path = os.path.join(tmp_path, "config.ini")
+    with open(user_config_path, "w", encoding="utf-8") as user_config_file:
+        user_config_file.write(user_config_input)
+    # We don't have to worry about modifying the ``defaults`` object causing test
+    # pollution here, since we reload the ``defaults`` object before every test with the
+    # ``setup_test`` fixture.
+    load_defaults(user_config_path)
+
+
 def test_load_defaults(tmp_path: Path) -> None:
     """Test the ``load_defaults`` method."""
     user_config_path = os.path.join(tmp_path, "config.ini")
@@ -150,31 +175,14 @@ def test_is_detected(
 def test_find_publish_timestamp(
     resources_path: Path,
     httpserver: HTTPServer,
-    tmp_path: Path,
+    maven_service: None,  # pylint: disable=unused-argument
     purl: str,
     mc_json_path: str,
     query_string: str,
     expected_timestamp: str,
 ) -> None:
     """Test that the function finds the timestamp correctly."""
-    base_url_parsed = urllib.parse.urlparse(httpserver.url_for(""))
-
     maven_central = MavenCentralRegistry()
-
-    # Set up responses of solrsearch endpoints using the httpserver plugin.
-    user_config_input = f"""
-    [package_registry.maven_central]
-    request_timeout = 20
-    search_netloc = {base_url_parsed.netloc}
-    search_scheme = {base_url_parsed.scheme}
-    """
-    user_config_path = os.path.join(tmp_path, "config.ini")
-    with open(user_config_path, "w", encoding="utf-8") as user_config_file:
-        user_config_file.write(user_config_input)
-    # We don't have to worry about modifying the ``defaults`` object causing test
-    # pollution here, since we reload the ``defaults`` object before every test with the
-    # ``setup_test`` fixture.
-    load_defaults(user_config_path)
     maven_central.load_defaults()
 
     with open(os.path.join(resources_path, "maven_central_files", mc_json_path), encoding="utf8") as page:
@@ -208,35 +216,19 @@ def test_find_publish_timestamp(
 def test_find_publish_timestamp_errors(
     resources_path: Path,
     httpserver: HTTPServer,
-    tmp_path: Path,
+    maven_service: None,  # pylint: disable=unused-argument
     purl: str,
     mc_json_path: str,
     expected_msg: str,
 ) -> None:
     """Test that the function handles errors correctly."""
-    base_url_parsed = urllib.parse.urlparse(httpserver.url_for(""))
-
     maven_central = MavenCentralRegistry()
-
-    # Set up responses of solrsearch endpoints using the httpserver plugin.
-    user_config_input = f"""
-    [package_registry.maven_central]
-    request_timeout = 20
-    search_netloc = {base_url_parsed.netloc}
-    search_scheme = {base_url_parsed.scheme}
-    """
-    user_config_path = os.path.join(tmp_path, "config.ini")
-    with open(user_config_path, "w", encoding="utf-8") as user_config_file:
-        user_config_file.write(user_config_input)
-    # We don't have to worry about modifying the ``defaults`` object causing test
-    # pollution here, since we reload the ``defaults`` object before every test with the
-    # ``setup_test`` fixture.
-    load_defaults(user_config_path)
     maven_central.load_defaults()
 
     with open(os.path.join(resources_path, "maven_central_files", mc_json_path), encoding="utf8") as page:
         mc_json_response = json.load(page)
 
+    # Set up responses of solrsearch endpoints using the httpserver plugin.
     httpserver.expect_request(
         "/solrsearch/select",
         query_string="q=g:org.apache.logging.log4j+AND+a:log4j-core+AND+v:3.0.0-beta2&core=gav&rows=1&wt=json",
@@ -245,3 +237,65 @@ def test_find_publish_timestamp_errors(
     pat = f"^{expected_msg}"
     with pytest.raises(InvalidHTTPResponseError, match=pat):
         maven_central.find_publish_timestamp(purl=purl)
+
+
+def test_get_artifact_file_name() -> None:
+    """Test the artifact file name function."""
+    assert not MavenCentralRegistry().get_artifact_file_name(PackageURL.from_string("pkg:maven/test/example"))
+
+    assert (
+        MavenCentralRegistry().get_artifact_file_name(PackageURL.from_string("pkg:maven/text/example@1"))
+        == "example-1.jar"
+    )
+
+
+@pytest.mark.parametrize("purl_string", ["pkg:maven/example", "pkg:maven/example/test", "pkg:maven/example/test@1"])
+def test_get_artifact_hash_failures(
+    httpserver: HTTPServer, maven_service: None, purl_string: str  # pylint: disable=unused-argument
+) -> None:
+    """Test failures of get artifact hash."""
+    purl = PackageURL.from_string(purl_string)
+
+    maven_registry = MavenCentralRegistry()
+    maven_registry.load_defaults()
+
+    # The registry endpoints are only mocked when the PURL has a namespace, a version and a file name.
+    if purl.namespace and purl.version and (file_name := MavenCentralRegistry().get_artifact_file_name(purl)):
+        artifact_path = "/" + construct_maven_repository_path(purl.namespace, purl.name, purl.version) + "/" + file_name
+        hash_algorithm = sha256()
+        hash_algorithm.update(b"example_data")
+        expected_hash = hash_algorithm.hexdigest()
+        # The served checksum is of b"example_data", while the served artifact is b"example_data_2".
+        httpserver.expect_request(artifact_path + ".sha256").respond_with_data(expected_hash)
+        httpserver.expect_request(artifact_path).respond_with_data(b"example_data_2")
+
+    result = maven_registry.get_artifact_hash(purl, sha256())
+
+    assert not result
+
+
+def test_get_artifact_hash_success(
+    httpserver: HTTPServer, maven_service: None  # pylint: disable=unused-argument
+) -> None:
+    """Test success of get artifact hash."""
+    purl = PackageURL.from_string("pkg:maven/example/test@1")
+    assert purl.namespace
+    assert purl.version
+
+    maven_registry = MavenCentralRegistry()
+    maven_registry.load_defaults()
+
+    file_name = MavenCentralRegistry().get_artifact_file_name(purl)
+    assert file_name
+
+    artifact_path = "/" + construct_maven_repository_path(purl.namespace, purl.name, purl.version) + "/" + file_name
+    hash_algorithm = sha256()
+    hash_algorithm.update(b"example_data")
+    expected_hash = hash_algorithm.hexdigest()
+    # The served checksum and the served artifact are both derived from b"example_data".
+    httpserver.expect_request(artifact_path + ".sha256").respond_with_data(expected_hash)
+    httpserver.expect_request(artifact_path).respond_with_data(b"example_data")
+
+    result = maven_registry.get_artifact_hash(purl, sha256())
+
+    assert result