Skip to content

feat: add GitHub attestation discovery #1020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion src/macaron/artifact/local_artifact.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module declares types and utilities for handling local artifacts."""

import fnmatch
import glob
import hashlib
import logging
import os

from packageurl import PackageURL

from macaron.artifact.maven import construct_maven_repository_path
from macaron.errors import LocalArtifactFinderError
from macaron.slsa_analyzer.package_registry import MavenCentralRegistry

logger: logging.Logger = logging.getLogger(__name__)


def construct_local_artifact_dirs_glob_pattern_maven_purl(maven_purl: PackageURL) -> list[str] | None:
Expand Down Expand Up @@ -247,3 +252,53 @@ def get_local_artifact_dirs(
)

raise LocalArtifactFinderError(f"Unsupported PURL type {purl_type}")


def get_local_artifact_hash(purl: PackageURL, artifact_dirs: list[str], hash_algorithm_name: str) -> str | None:
    """Compute the hash of the local artifact.

    The artifact file name is derived from the PURL and looked up in each candidate directory in order.
    The digest of the first candidate that can be read and hashed is returned.

    Parameters
    ----------
    purl: PackageURL
        The PURL of the artifact being sought.
    artifact_dirs: list[str]
        The possible locations of the artifact.
    hash_algorithm_name: str
        The hash algorithm to use.

    Returns
    -------
    str | None
        The hex digest of the artifact, or None if no artifact could be found and hashed.
    """
    if not artifact_dirs:
        logger.debug("No artifact directories provided.")
        return None

    if not purl.version:
        logger.debug("PURL is missing version.")
        return None

    # Only Maven artifacts can currently be mapped to a file name.
    artifact_target = None
    if purl.type == "maven":
        artifact_target = MavenCentralRegistry.get_artifact_file_name(purl)

    if not artifact_target:
        logger.debug("PURL type not supported: %s", purl.type)
        return None

    for artifact_dir in artifact_dirs:
        full_path = os.path.join(artifact_dir, artifact_target)
        # Require a regular file: a directory with the same name would make open() raise.
        if not os.path.isfile(full_path):
            continue

        try:
            with open(full_path, "rb") as file:
                hash_result = hashlib.file_digest(file, hash_algorithm_name)
        except OSError as error:
            # The file vanished or is unreadable; another directory may still hold a usable copy.
            logger.debug("Error while reading file: %s", error)
            continue
        except ValueError as error:
            logger.debug("Error while hashing file: %s", error)
            continue

        return hash_result.hexdigest()

    return None
165 changes: 152 additions & 13 deletions src/macaron/slsa_analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
"""This module handles the cloning and analyzing a Git repo."""

import glob
import hashlib
import json
import logging
import os
import re
import sys
import tempfile
import urllib.parse
from collections.abc import Mapping
from datetime import datetime, timezone
from pathlib import Path
Expand All @@ -20,7 +23,10 @@
from sqlalchemy.orm import Session

from macaron import __version__
from macaron.artifact.local_artifact import get_local_artifact_dirs
from macaron.artifact.local_artifact import (
get_local_artifact_dirs,
get_local_artifact_hash,
)
from macaron.config.global_config import global_config
from macaron.config.target_config import Configuration
from macaron.database.database_manager import DatabaseManager, get_db_manager, get_db_session
Expand All @@ -41,6 +47,7 @@
ProvenanceError,
PURLNotFoundError,
)
from macaron.json_tools import json_extract
from macaron.output_reporter.reporter import FileReporter
from macaron.output_reporter.results import Record, Report, SCMStatus
from macaron.provenance import provenance_verifier
Expand All @@ -66,12 +73,15 @@
from macaron.slsa_analyzer.checks import * # pylint: disable=wildcard-import,unused-wildcard-import # noqa: F401,F403
from macaron.slsa_analyzer.ci_service import CI_SERVICES
from macaron.slsa_analyzer.database_store import store_analyze_context_to_db
from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService
from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService, GitHub
from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService
from macaron.slsa_analyzer.git_url import GIT_REPOS_DIR
from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES
from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES, MavenCentralRegistry, PyPIRegistry
from macaron.slsa_analyzer.package_registry.pypi_registry import find_or_create_pypi_asset
from macaron.slsa_analyzer.provenance.expectations.expectation_registry import ExpectationRegistry
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV01Payload
from macaron.slsa_analyzer.provenance.intoto.errors import LoadIntotoAttestationError
from macaron.slsa_analyzer.provenance.loader import load_provenance_payload
from macaron.slsa_analyzer.provenance.slsa import SLSAProvenanceData
from macaron.slsa_analyzer.registry import registry
from macaron.slsa_analyzer.specs.ci_spec import CIInfo
Expand Down Expand Up @@ -403,6 +413,17 @@ def run_single(
status=SCMStatus.ANALYSIS_FAILED,
)

local_artifact_dirs = None
if parsed_purl and parsed_purl.type in self.local_artifact_repo_mapper:
local_artifact_repo_path = self.local_artifact_repo_mapper[parsed_purl.type]
try:
local_artifact_dirs = get_local_artifact_dirs(
purl=parsed_purl,
local_artifact_repo_path=local_artifact_repo_path,
)
except LocalArtifactFinderError as error:
logger.debug(error)

# Prepare the repo.
git_obj = None
commit_finder_outcome = CommitFinderInfo.NOT_USED
Expand Down Expand Up @@ -480,6 +501,39 @@ def run_single(
git_service = self._determine_git_service(analyze_ctx)
self._determine_ci_services(analyze_ctx, git_service)
self._determine_build_tools(analyze_ctx, git_service)

# Try to find an attestation from GitHub, if applicable.
if parsed_purl and not provenance_payload and analysis_target.repo_path and isinstance(git_service, GitHub):
# Try to discover GitHub attestation for the target software component.
url = None
try:
url = urllib.parse.urlparse(analysis_target.repo_path)
except TypeError as error:
logger.debug("Failed to parse repository path as URL: %s", error)
if url and url.hostname == "github.com":
artifact_hash = self.get_artifact_hash(
parsed_purl, local_artifact_dirs, hashlib.sha256(), package_registries_info
)
if artifact_hash:
git_attestation_dict = git_service.api_client.get_attestation(
analyze_ctx.component.repository.full_name, artifact_hash
)
if git_attestation_dict:
git_attestation_list = json_extract(git_attestation_dict, ["attestations"], list)
if git_attestation_list:
git_attestation = git_attestation_list[0]

with tempfile.TemporaryDirectory() as temp_dir:
attestation_file = os.path.join(temp_dir, "attestation")
with open(attestation_file, "w", encoding="UTF-8") as file:
json.dump(git_attestation, file)

try:
payload = load_provenance_payload(attestation_file)
provenance_payload = payload
except LoadIntotoAttestationError as error:
logger.debug("Failed to load provenance payload: %s", error)

if parsed_purl is not None:
self._verify_repository_link(parsed_purl, analyze_ctx)
self._determine_package_registries(analyze_ctx, package_registries_info)
Expand Down Expand Up @@ -541,16 +595,8 @@ def run_single(

analyze_ctx.dynamic_data["validate_malware"] = validate_malware

if parsed_purl and parsed_purl.type in self.local_artifact_repo_mapper:
local_artifact_repo_path = self.local_artifact_repo_mapper[parsed_purl.type]
try:
local_artifact_dirs = get_local_artifact_dirs(
purl=parsed_purl,
local_artifact_repo_path=local_artifact_repo_path,
)
analyze_ctx.dynamic_data["local_artifact_paths"].extend(local_artifact_dirs)
except LocalArtifactFinderError as error:
logger.debug(error)
if local_artifact_dirs:
analyze_ctx.dynamic_data["local_artifact_paths"].extend(local_artifact_dirs)

analyze_ctx.check_results = registry.scan(analyze_ctx)

Expand Down Expand Up @@ -940,6 +986,99 @@ def create_analyze_ctx(self, component: Component) -> AnalyzeContext:

return analyze_ctx

def get_artifact_hash(
    self,
    purl: PackageURL,
    cached_artifacts: list[str] | None,
    hash_algorithm: Any,
    package_registries_info: list[PackageRegistryInfo],
) -> str | None:
    """Get the hash of the artifact found from the passed PURL using local or remote files.

    Local files are tried first. If none yields a hash, the package registry matching the PURL type
    (Maven or PyPI) is asked for the hash instead. Other PURL types are not supported.

    Parameters
    ----------
    purl: PackageURL
        The PURL of the artifact.
    cached_artifacts: list[str] | None
        The list of local files that match the PURL.
    hash_algorithm: Any
        The hash algorithm to use. Its ``name`` attribute selects the algorithm for local files.
    package_registries_info: list[PackageRegistryInfo]
        The list of package registry information.

    Returns
    -------
    str | None
        The hash of the artifact, or None if not found.
    """
    if cached_artifacts:
        # Try to get the hash from a local file.
        artifact_hash = get_local_artifact_hash(purl, cached_artifacts, hash_algorithm.name)

        if artifact_hash:
            return artifact_hash

    # Fall back to the remote package registry.
    if purl.type == "maven":
        maven_registry = next(
            (
                package_registry
                for package_registry in PACKAGE_REGISTRIES
                if isinstance(package_registry, MavenCentralRegistry)
            ),
            None,
        )
        if not maven_registry:
            logger.debug("Missing registry for Maven")
            return None

        return maven_registry.get_artifact_hash(purl, hash_algorithm)

    if purl.type == "pypi":
        pypi_registry = next(
            (
                package_registry
                for package_registry in PACKAGE_REGISTRIES
                if isinstance(package_registry, PyPIRegistry)
            ),
            None,
        )
        if not pypi_registry:
            logger.debug("Missing registry for PyPI")
            return None

        registry_info = next(
            (
                info
                for info in package_registries_info
                if info.package_registry == pypi_registry and info.build_tool_name in {"pip", "poetry"}
            ),
            None,
        )
        if not registry_info:
            logger.debug("Missing registry information for PyPI")
            return None

        pypi_asset = find_or_create_pypi_asset(purl.name, purl.version, registry_info)
        if not pypi_asset:
            return None

        pypi_asset.has_repository = True
        if not pypi_asset.download(""):
            return None

        # NOTE(review): get_sha256() presumably returns a SHA-256 digest - confirm. It is therefore
        # only used when SHA-256 was requested; any other algorithm falls through to the registry
        # hash of the wheel below.
        if str(getattr(hash_algorithm, "name", "")).lower() == "sha256":
            artifact_hash = pypi_asset.get_sha256()
            if artifact_hash:
                return artifact_hash

        source_url = pypi_asset.get_sourcecode_url("bdist_wheel")
        if not source_url:
            return None

        return pypi_registry.get_artifact_hash(source_url, hash_algorithm)

    logger.debug("Purl type '%s' not yet supported for GitHub attestation discovery.", purl.type)
    return None

def _determine_git_service(self, analyze_ctx: AnalyzeContext) -> BaseGitService:
"""Determine the Git service used by the software component."""
remote_path = analyze_ctx.component.repository.remote_path if analyze_ctx.component.repository else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
from macaron.slsa_analyzer.checks.check_result import CheckResultData, CheckResultType, Confidence, JustificationType
from macaron.slsa_analyzer.package_registry.deps_dev import APIAccessError, DepsDevService
from macaron.slsa_analyzer.package_registry.osv_dev import OSVDevService
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, PyPIRegistry
from macaron.slsa_analyzer.package_registry.pypi_registry import (
PyPIPackageJsonAsset,
PyPIRegistry,
find_or_create_pypi_asset,
)
from macaron.slsa_analyzer.registry import registry
from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo

Expand Down Expand Up @@ -258,28 +262,16 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
case PackageRegistryInfo(
build_tool_name="pip" | "poetry",
build_tool_purl_type="pypi",
package_registry=PyPIRegistry() as pypi_registry,
package_registry=PyPIRegistry(),
) as pypi_registry_info:
# Retrieve the pre-existing AssetLocator object for the PyPI package JSON object, if it exists.
pypi_package_json = next(
(
asset
for asset in pypi_registry_info.metadata
if isinstance(asset, PyPIPackageJsonAsset)
and asset.component_name == ctx.component.name
and asset.component_version == ctx.component.version
),
None,
# Retrieve the pre-existing asset, or create a new one.
pypi_package_json = find_or_create_pypi_asset(
ctx.component.name, ctx.component.version, pypi_registry_info
)
if not pypi_package_json:
# Create an AssetLocator object for the PyPI package JSON object.
pypi_package_json = PyPIPackageJsonAsset(
component_name=ctx.component.name,
component_version=ctx.component.version,
has_repository=ctx.component.repository is not None,
pypi_registry=pypi_registry,
package_json={},
)
if pypi_package_json is None:
return CheckResultData(result_tables=[], result_type=CheckResultType.UNKNOWN)

pypi_package_json.has_repository = ctx.component.repository is not None

pypi_registry_info.metadata.append(pypi_package_json)

Expand Down
21 changes: 20 additions & 1 deletion src/macaron/slsa_analyzer/git_service/api_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022 - 2024, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2022 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""The module provides API clients for VCS services, such as GitHub."""
Expand Down Expand Up @@ -659,6 +659,25 @@ def download_asset(self, url: str, download_path: str) -> bool:

return True

def get_attestation(self, full_name: str, artifact_hash: str) -> dict:
    """Fetch the attestation data for an artifact hash from a repository's attestations endpoint.

    Parameters
    ----------
    full_name : str
        The full name of the repo.
    artifact_hash: str
        The SHA256 hash of an artifact.

    Returns
    -------
    dict
        The attestation data, or an empty dict if the request returned nothing.
    """
    # The endpoint is keyed by the digest with a "sha256:" prefix.
    attestation_url = f"{GhAPIClient._REPO_END_POINT}/{full_name}/attestations/sha256:{artifact_hash}"
    attestation_data = send_get_http(attestation_url, self.headers)
    if not attestation_data:
        return {}
    return attestation_data


def get_default_gh_client(access_token: str) -> GhAPIClient:
"""Return a GhAPIClient instance with default values.
Expand Down
Loading
Loading