Skip to content

feat: add pypi attestation discovery #1067

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies = [
"cyclonedx-python-lib[validation] >=7.3.4,<8.0.0",
"beautifulsoup4 >= 4.12.0,<5.0.0",
"problog >= 2.2.6,<3.0.0",
"cryptography >=44.0.0,<45.0.0",
]
keywords = []
# https://pypi.org/classifiers/
Expand Down
3 changes: 2 additions & 1 deletion src/macaron/config/defaults.ini
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,8 @@ inspector_url_scheme = https
[deps_dev]
url_netloc = api.deps.dev
url_scheme = https
purl_endpoint = v3alpha/purl
api_endpoint = v3alpha
purl_endpoint = purl

[osv_dev]
url_netloc = api.osv.dev
Expand Down
71 changes: 68 additions & 3 deletions src/macaron/provenance/provenance_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ def extract_repo_and_commit_from_provenance(payload: InTotoPayload) -> tuple[str
If the extraction process fails for any reason.
"""
predicate_type = payload.statement.get("predicateType")
if isinstance(payload, InTotoV1Payload) and predicate_type == "https://slsa.dev/provenance/v1":
return _extract_from_slsa_v1(payload)
if isinstance(payload, InTotoV1Payload):
if predicate_type == "https://slsa.dev/provenance/v1":
return _extract_from_slsa_v1(payload)
if predicate_type == "https://docs.pypi.org/attestations/publish/v1":
return _extract_from_pypi_v1(payload)

if isinstance(payload, InTotoV01Payload):
if predicate_type == "https://slsa.dev/provenance/v0.2":
Expand Down Expand Up @@ -195,6 +198,32 @@ def _extract_from_slsa_v1(payload: InTotoV1Payload) -> tuple[str | None, str | N
return repo, commit or None


def _extract_from_pypi_v1(payload: InTotoV1Payload) -> tuple[str | None, str | None]:
"""Extract the repository and commit metadata from a PyPI attestation payload.

The predicate read here is a custom one, built by this project from the certificate of a PyPI v1 attestation file.
By design, these attestations come without a predicate, so the ``sourceUri`` and ``sourceDigest`` fields
used below do not reflect any predicate format defined by PyPI.

Parameters
----------
payload: InTotoV1Payload
The payload to extract from.

Returns
-------
tuple[str | None, str | None]
The repository URL and commit hash read from the predicate, or ``(None, None)`` if the payload has no predicate.
"""
predicate: dict[str, JsonType] | None = payload.statement.get("predicate")
if not predicate:
logger.debug("No predicate in payload statement.")
return None, None

repo = json_extract(predicate, ["sourceUri"], str)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the documentation for the predicate of https://docs.pypi.org/attestations/publish/v1/, currently its content is not defined. I wonder if the sourceUri and sourceDigest was copied from an existing implementation or it's purposely put here to catch these data if they are available in the future?
I have decoded the statement of https://pypi.org/integrity/ultralytics/8.3.119/ultralytics-8.3.119.tar.gz/provenance and got

{
  "_type": "https://in-toto.io/Statement/v1",
  "subject": [
    {
      "name": "ultralytics-8.3.119.tar.gz",
      "digest": {
        "sha256": "497bdcf3eb1beb082f451d42e5af2a6af944693a5991c78a9b9b0ce538593153"
      }
    }
  ],
  "predicateType": "https://docs.pypi.org/attestations/publish/v1",
  "predicate": null
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may have confused things by converting the certificate information into a predicate and attaching it to the pypi spec that has none. The type and the related function could be renamed to better distinguish them from the actual spec. The actual field names within the substitute predicate were chosen for simplicity, and loosely based on existing predicates I suppose.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh okay, I can see the big picture now. I'm okay with some extra documentation here to indicate that we are relying on a Predicate that we built ourselves, and that it's not reflecting the real predicate format from Pypi (as there is None).

digest = json_extract(predicate, ["sourceDigest"], str)
return repo, digest


def _extract_from_witness_provenance(payload: InTotoV01Payload) -> tuple[str | None, str | None]:
"""Extract the repository and commit metadata from the witness provenance file found at the passed path.

Expand Down Expand Up @@ -300,7 +329,7 @@ def check_if_input_purl_provenance_conflict(
provenance_repo_url: str | None,
purl: PackageURL,
) -> bool:
"""Test if the input repository type PURL's repo and commit match the contents of the provenance.
"""Test if the input repository type PURL's repo matches the contents of the provenance.

Parameters
----------
Expand Down Expand Up @@ -620,6 +649,41 @@ def get_build_invocation(self, statement: InTotoV01Statement | InTotoV1Statement
return gl_workflow, gl_job_url


class PyPICertificateDefinition(ProvenanceBuildDefinition):
    """Build definition for the predicate derived from a PyPI attestation certificate.

    This class implements the abstract methods from the `ProvenanceBuildDefinition`
    to read the build invocation details (the ``workflow`` and ``invocationUrl``
    fields) from that derived predicate.
    """

    #: Determines the expected ``buildType`` field in the provenance predicate.
    expected_build_type = "pypi_certificate"

    def get_build_invocation(self, statement: InTotoV01Statement | InTotoV1Statement) -> tuple[str | None, str | None]:
        """Retrieve the build invocation information from the given statement.

        Parameters
        ----------
        statement : InTotoV1Statement | InTotoV01Statement
            The provenance statement whose ``predicate`` entry holds the
            ``workflow`` and ``invocationUrl`` fields to read.

        Returns
        -------
        tuple[str | None, str | None]
            A tuple containing two elements:
            - The first element is the value extracted for ``workflow`` in the predicate.
            - The second element is the value extracted for ``invocationUrl`` in the predicate.
            Both elements are None when the statement's predicate is None.
        """
        # Look the predicate up once; a statement without one carries no invocation data.
        predicate = statement["predicate"]
        if predicate is None:
            return None, None

        return (
            json_extract(predicate, ["workflow"], str),
            json_extract(predicate, ["invocationUrl"], str),
        )


class ProvenancePredicate:
"""Class providing utility methods for handling provenance predicates.

Expand Down Expand Up @@ -685,6 +749,7 @@ def find_build_def(statement: InTotoV01Statement | InTotoV1Statement) -> Provena
SLSAGCBBuildDefinitionV1(),
SLSAOCIBuildDefinitionV1(),
WitnessGitLabBuildDefinitionV01(),
PyPICertificateDefinition(),
]

for build_def in build_defs:
Expand Down
37 changes: 37 additions & 0 deletions src/macaron/provenance/provenance_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This module contains methods for finding provenance files."""
import json
import logging
import os
import tempfile
Expand All @@ -12,6 +13,7 @@

from macaron.config.defaults import defaults
from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.checks.provenance_available_check import ProvenanceAvailableException
from macaron.slsa_analyzer.ci_service import GitHubActions
Expand Down Expand Up @@ -78,6 +80,10 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
discovery_functions = [partial(find_gav_provenance, purl, self.jfrog_registry)]
return self._find_provenance(discovery_functions)

if purl.type == "pypi":
discovery_functions = [partial(find_pypi_provenance, purl)]
return self._find_provenance(discovery_functions)

# TODO add other possible discovery functions.
logger.debug("Provenance finding not supported for PURL type: %s", purl.type)
return []
Expand Down Expand Up @@ -275,6 +281,37 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
return provenances[:1]


def find_pypi_provenance(purl: PackageURL) -> list[InTotoPayload]:
    """Find and download the PyPI based provenance for the passed PURL.

    The attestation is obtained through ``DepsDevRepoFinder.get_attestation``, written to a
    temporary file as JSON, and then loaded from that file with ``load_provenance_payload``.

    Parameters
    ----------
    purl: PackageURL
        The PURL of the analysis target.

    Returns
    -------
    list[InTotoPayload]
        A single-element list holding the provenance payload if one was found and loaded,
        or an empty list otherwise.
    """
    attestation, verified = DepsDevRepoFinder.get_attestation(purl)
    if not attestation:
        return []

    with tempfile.TemporaryDirectory() as work_dir:
        attestation_path = os.path.join(work_dir, f"{purl.name}")
        with open(attestation_path, "w", encoding="utf-8") as attestation_file:
            json.dump(attestation, attestation_file)

        # Only the load itself can raise the handled error, so keep the guarded region minimal.
        try:
            payload = load_provenance_payload(attestation_path)
        except LoadIntotoAttestationError as load_error:
            logger.error("Error while loading provenance: %s", load_error)
            return []

        # Carry over the verification result reported alongside the attestation.
        payload.verified = verified
        return [payload]


def find_provenance_from_ci(
analyze_ctx: AnalyzeContext, git_obj: Git | None, download_path: str
) -> InTotoPayload | None:
Expand Down
6 changes: 6 additions & 0 deletions src/macaron/provenance/provenance_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@ def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) ->

signed_subjects = provenance[1].statement.get("subject")
if not signed_subjects:
logger.debug("Missing signed subjects.")
return False

unsigned_subjects = provenance[0].statement.get("subject")
if not unsigned_subjects:
logger.debug("Missing unsigned subjects.")
return False

found_signed_subject = None
Expand All @@ -97,6 +99,7 @@ def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) ->
break

if not found_signed_subject:
logger.debug("Missing signed subject.")
return False

found_unsigned_subject = None
Expand All @@ -108,15 +111,18 @@ def verify_npm_provenance(purl: PackageURL, provenance: list[InTotoPayload]) ->
break

if not found_unsigned_subject:
logger.debug("Missing unsigned subject.")
return False

signed_digest = found_signed_subject.get("digest")
unsigned_digest = found_unsigned_subject.get("digest")
if not (signed_digest and unsigned_digest):
logger.debug("Missing %ssigned digest.", "un" if signed_digest else "")
return False

# For signed and unsigned to match, the digests must be identical.
if signed_digest != unsigned_digest:
logger.debug("Signed and unsigned digests do not match.")
return False

key = list(signed_digest.keys())[0]
Expand Down
Loading
Loading