diff --git a/CHANGELOG.md b/CHANGELOG.md index 28d3f007..a867b7db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,14 @@ All versions prior to 0.9.0 are untracked. `sigstore verify identity`, as it was during the 1.0 release series ([#642](https://github.com/sigstore/sigstore-python/pull/642)) +* API change: the `Signer` API has been broken up into `SigningContext` + and `Signer`, allowing a `SigningContext` to create individual `Signer` + instances that correspond to a single `IdentityToken`. This new API + also enables ephemeral key and certificate reuse across multiple inputs, + reducing the number of cryptographic operations and network roundtrips + required when signing more than one input + ([#645](https://github.com/sigstore/sigstore-python/pull/645)) + ### Fixed * Fixed a case where `sigstore verify` would fail to verify an otherwise valid diff --git a/sigstore/_cli.py b/sigstore/_cli.py index 5827ce4c..d1dc2e4d 100644 --- a/sigstore/_cli.py +++ b/sigstore/_cli.py @@ -28,7 +28,11 @@ from sigstore import __version__ from sigstore._internal.ctfe import CTKeyring -from sigstore._internal.fulcio.client import DEFAULT_FULCIO_URL, FulcioClient +from sigstore._internal.fulcio.client import ( + DEFAULT_FULCIO_URL, + ExpiredCertificate, + FulcioClient, +) from sigstore._internal.keyring import Keyring from sigstore._internal.rekor.client import ( DEFAULT_REKOR_URL, @@ -41,11 +45,12 @@ from sigstore.oidc import ( DEFAULT_OAUTH_ISSUER_URL, STAGING_OAUTH_ISSUER_URL, + ExpiredIdentity, IdentityToken, Issuer, detect_credential, ) -from sigstore.sign import Signer +from sigstore.sign import SigningContext from sigstore.transparency import LogEntry from sigstore.verify import ( CertificateVerificationFailure, @@ -620,13 +625,13 @@ def _sign(args: argparse.Namespace) -> None: "bundle": bundle, } - # Select the signer to use. + # Select the signing context to use. if args.staging: logger.debug("sign: staging instances requested") - signer = Signer.staging() + signing_ctx = SigningContext.staging() args.oidc_issuer = STAGING_OAUTH_ISSUER_URL elif args.fulcio_url == DEFAULT_FULCIO_URL and args.rekor_url == DEFAULT_REKOR_URL: - signer = Signer.production() + signing_ctx = SigningContext.production() else: # Assume "production" keys if none are given as arguments updater = TrustUpdater.production() @@ -642,7 +647,7 @@ def _sign(args: argparse.Namespace) -> None: ct_keyring = CTKeyring(Keyring(ctfe_keys)) rekor_keyring = RekorKeyring(Keyring(rekor_keys)) - signer = Signer( + signing_ctx = SigningContext( fulcio=FulcioClient(args.fulcio_url), rekor=RekorClient(args.rekor_url, rekor_keyring, ct_keyring), ) @@ -661,38 +666,46 @@ def _sign(args: argparse.Namespace) -> None: if not identity: _die(args, "No identity token supplied or detected!") - for file, outputs in output_map.items(): - logger.debug(f"signing for {file.name}") - with file.open(mode="rb", buffering=0) as io: - result = signer.sign( - input_=io, - identity=identity, - ) + with signing_ctx.signer(identity) as signer: + for file, outputs in output_map.items(): + logger.debug(f"signing for {file.name}") + with file.open(mode="rb", buffering=0) as io: + try: + result = signer.sign(input_=io) + except ExpiredIdentity as exp_identity: + print("Signature failed: identity token has expired") + raise exp_identity - print("Using ephemeral certificate:") - print(result.cert_pem) + except ExpiredCertificate as exp_certificate: + print("Signature failed: Fulcio signing certificate has expired") + raise exp_certificate - print(f"Transparency log entry created at index: {result.log_entry.log_index}") + print("Using ephemeral certificate:") + print(result.cert_pem) - sig_output: TextIO - if outputs["sig"] is not None: - sig_output = outputs["sig"].open("w") - else: - sig_output = sys.stdout + print( + f"Transparency log entry created at index: {result.log_entry.log_index}" + ) + + sig_output: TextIO + if outputs["sig"] is not None: + sig_output = outputs["sig"].open("w") + else: + sig_output = sys.stdout - print(result.b64_signature, file=sig_output) - if outputs["sig"] is not None: - print(f"Signature written to {outputs['sig']}") + print(result.b64_signature, file=sig_output) + if outputs["sig"] is not None: + print(f"Signature written to {outputs['sig']}") - if outputs["cert"] is not None: - with outputs["cert"].open(mode="w") as io: - print(result.cert_pem, file=io) - print(f"Certificate written to {outputs['cert']}") + if outputs["cert"] is not None: + with outputs["cert"].open(mode="w") as io: + print(result.cert_pem, file=io) + print(f"Certificate written to {outputs['cert']}") - if outputs["bundle"] is not None: - with outputs["bundle"].open(mode="w") as io: - print(result._to_bundle().to_json(), file=io) - print(f"Sigstore bundle written to {outputs['bundle']}") + if outputs["bundle"] is not None: + with outputs["bundle"].open(mode="w") as io: + print(result._to_bundle().to_json(), file=io) + print(f"Sigstore bundle written to {outputs['bundle']}") def _collect_verification_state( diff --git a/sigstore/_internal/fulcio/__init__.py b/sigstore/_internal/fulcio/__init__.py index c41c5c46..d33628a4 100644 --- a/sigstore/_internal/fulcio/__init__.py +++ b/sigstore/_internal/fulcio/__init__.py @@ -19,12 +19,14 @@ from .client import ( DetachedFulcioSCT, + ExpiredCertificate, FulcioCertificateSigningResponse, FulcioClient, ) __all__ = [ "DetachedFulcioSCT", + "ExpiredCertificate", "FulcioCertificateSigningResponse", "FulcioClient", ] diff --git a/sigstore/_internal/fulcio/client.py b/sigstore/_internal/fulcio/client.py index ef7054e4..b9aeb95b 100644 --- a/sigstore/_internal/fulcio/client.py +++ b/sigstore/_internal/fulcio/client.py @@ -164,6 +164,10 @@ def signature(self) -> bytes: SignedCertificateTimestamp.register(DetachedFulcioSCT) +class ExpiredCertificate(Exception): + """An error raised when the Certificate is expired.""" + + @dataclass(frozen=True) class FulcioCertificateSigningResponse: """Certificate response""" diff --git a/sigstore/oidc.py b/sigstore/oidc.py index 8a03a1c8..dd8964dc 100644 --- a/sigstore/oidc.py +++ b/sigstore/oidc.py @@ -24,6 +24,7 @@ import time import urllib.parse import webbrowser +from datetime import datetime, timezone from typing import NoReturn, Optional, cast import id @@ -58,6 +59,20 @@ class _OpenIDConfiguration(BaseModel): token_endpoint: StrictStr +# See: https://github.com/sigstore/fulcio/blob/b2186c0/pkg/config/config.go#L182-L201 +_KNOWN_OIDC_ISSUERS = { + "https://accounts.google.com": "email", + "https://oauth2.sigstore.dev/auth": "email", + "https://oauth2.sigstage.dev/auth": "email", + "https://token.actions.githubusercontent.com": "sub", +} +DEFAULT_AUDIENCE = "sigstore" + + +class ExpiredIdentity(Exception): + """An error raised when an identity token is expired.""" + + class IdentityToken: """ An OIDC "identity", corresponding to an underlying OIDC token with @@ -77,22 +92,28 @@ def __init__(self, raw_token: str) -> None: # certificate binding and issuance. try: self._unverified_claims = jwt.decode( - self._raw_token, options={"verify_signature": False} + raw_token, + options={ + "verify_signature": False, + "verify_aud": True, + "verify_iat": True, + "verify_exp": True, + "require": ["aud", "iat", "exp", "iss"], + }, + audience=DEFAULT_AUDIENCE, ) - except jwt.InvalidTokenError as exc: - raise IdentityError("invalid identity token") from exc + except Exception as exc: + raise IdentityError( + "Identity token is malformed or missing claims" + ) from exc - self._issuer: str = self._unverified_claims.get("iss") - if self._issuer is None: - raise IdentityError("Identity token missing the required `iss` claim") + self._iss: str = self._unverified_claims["iss"] + self._nbf: int | None = self._unverified_claims.get("nbf") + self._exp: int = self._unverified_claims["exp"] - aud = self._unverified_claims.get("aud") - if aud is None: - raise IdentityError("Identity token missing the required `aud` claim") - if aud != _DEFAULT_AUDIENCE: - raise IdentityError( - f"Audience should be {_DEFAULT_AUDIENCE!r}, not {aud!r}" - ) + # Fail early if this token isn't within its validity period. + if not self.in_validity_period(): + raise IdentityError("Identity token is not within its validity period") # When verifying the private key possession proof, Fulcio uses # different claims depending on the token's issuer. @@ -102,7 +123,7 @@ def __init__(self, raw_token: str) -> None: if identity_claim is not None: if identity_claim not in self._unverified_claims: raise IdentityError( - f"Identity token missing the required {identity_claim!r} claim" + f"Identity token is missing the required {identity_claim!r} claim" ) self._identity = str(self._unverified_claims.get(identity_claim)) @@ -110,7 +131,9 @@ def __init__(self, raw_token: str) -> None: try: self._identity = str(self._unverified_claims["sub"]) except KeyError: - raise IdentityError("Identity token missing the required 'sub' claim") + raise IdentityError( + "Identity token is missing the required 'sub' claim" + ) # This identity token might have been retrieved directly from # an identity provider, or it might be a "federated" identity token @@ -139,6 +162,22 @@ def __init__(self, raw_token: str) -> None: self._federated_issuer = federated_issuer + def in_validity_period(self) -> bool: + """ + Returns whether or not this `Identity` is currently within its self-stated validity period. + + NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper; + the check here only asserts whether the *unverified* identity's claims + are within their validity period. + """ + + now = datetime.now(timezone.utc).timestamp() + + if self._nbf is not None: + return self._nbf <= now < self._exp + else: + return now < self._exp + @property def identity(self) -> str: """ @@ -156,7 +195,7 @@ def issuer(self) -> str: """ Returns a URL identifying this `IdentityToken`'s issuer. """ - return self._issuer + return self._iss @property def expected_certificate_subject(self) -> str: @@ -178,7 +217,7 @@ def expected_certificate_subject(self) -> str: if self._federated_issuer is not None: return self._federated_issuer - return self._issuer + return self.issuer def __str__(self) -> str: """ diff --git a/sigstore/sign.py b/sigstore/sign.py index 2e867bbc..3750f0b7 100644 --- a/sigstore/sign.py +++ b/sigstore/sign.py @@ -16,6 +16,7 @@ API for signing artifacts. Example: + ```python from pathlib import Path @@ -29,9 +30,10 @@ artifact = Path("foo.txt") with artifact.open("rb") as a: - signer = Signer.production() - result = signer.sign(input_=a, identity=identity) - print(result) + signing_ctx = SigningContext.production() + with signing_ctx.signer(identity, cache=True) as signer: + result = signer.sign(input_=a, rekor=signing_ctx._rekor, fulcio=signing_ctx._fulcio) + print(result) ``` """ @@ -39,7 +41,9 @@ import base64 import logging -from typing import IO +from contextlib import contextmanager +from datetime import datetime, timezone +from typing import IO, Iterator, Optional import cryptography.x509 as x509 from cryptography.hazmat.primitives import hashes, serialization @@ -67,12 +71,16 @@ TransparencyLogEntry, ) -from sigstore._internal.fulcio import FulcioClient +from sigstore._internal.fulcio import ( + ExpiredCertificate, + FulcioCertificateSigningResponse, + FulcioClient, +) from sigstore._internal.rekor.client import RekorClient from sigstore._internal.sct import verify_sct from sigstore._internal.tuf import TrustUpdater from sigstore._utils import B64Str, HexStr, PEMCert, sha256_streaming -from sigstore.oidc import IdentityToken +from sigstore.oidc import ExpiredIdentity, IdentityToken from sigstore.transparency import LogEntry logger = logging.getLogger(__name__) @@ -83,73 +91,102 @@ class Signer: The primary API for signing operations. """ - def __init__(self, *, fulcio: FulcioClient, rekor: RekorClient): + def __init__( + self, + identity_token: IdentityToken, + signing_ctx: SigningContext, + cache: bool = True, + ) -> None: """ Create a new `Signer`. - `fulcio` is a `FulcioClient` capable of connecting to a Fulcio instance - and returning signing certificates. + `identity_token` is the identity token used to request a signing certificate + from Fulcio. - `rekor` is a `RekorClient` capable of connecting to a Rekor instance - and creating transparency log entries. - """ - self._fulcio = fulcio - self._rekor = rekor + `signing_ctx` is a `SigningContext` that keeps information about the signing + configuration. - @classmethod - def production(cls) -> Signer: + `cache` determines whether the signing certificate and ephemeral private key + should be reused (until the certificate expires) to sign different artifacts. + Default is `True`. """ - Return a `Signer` instance configured against Sigstore's production-level services. - """ - updater = TrustUpdater.production() - rekor = RekorClient.production(updater) - return cls(fulcio=FulcioClient.production(), rekor=rekor) + self._identity_token = identity_token + self._signing_ctx: SigningContext = signing_ctx + self.__cached_private_key: Optional[ec.EllipticCurvePrivateKey] = None + self.__cached_signing_certificate: Optional[ + FulcioCertificateSigningResponse + ] = None + if cache: + logger.debug("Generating ephemeral keys...") + self.__cached_private_key = ec.generate_private_key(ec.SECP384R1()) + logger.debug("Requesting ephemeral certificate...") + self.__cached_signing_certificate = self._signing_cert(self._private_key) + + @property + def _private_key(self) -> ec.EllipticCurvePrivateKey: + """Get or generate a signing key.""" + if self.__cached_private_key is None: + logger.debug("no cached key; generating ephemeral key") + return ec.generate_private_key(ec.SECP384R1()) + return self.__cached_private_key + + def _signing_cert( + self, + private_key: ec.EllipticCurvePrivateKey, + ) -> FulcioCertificateSigningResponse: + """Get or request a signing certificate from Fulcio.""" + # If it exists, verify if the current certificate is expired + if self.__cached_signing_certificate: + if ( + datetime.now(timezone.utc).timestamp() + > self.__cached_signing_certificate.cert.not_valid_after.timestamp() + ): + raise ExpiredCertificate + return self.__cached_signing_certificate + + else: + logger.debug("Retrieving signed certificate...") + + # Build an X.509 Certificiate Signing Request + builder = ( + x509.CertificateSigningRequestBuilder() + .subject_name( + x509.Name( + [ + x509.NameAttribute( + NameOID.EMAIL_ADDRESS, self._identity_token._identity + ), + ] + ) + ) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + ) + certificate_request = builder.sign(private_key, hashes.SHA256()) - @classmethod - def staging(cls) -> Signer: - """ - Return a `Signer` instance configured against Sigstore's staging-level services. - """ - updater = TrustUpdater.staging() - rekor = RekorClient.staging(updater) - return cls(fulcio=FulcioClient.staging(), rekor=rekor) + certificate_response = self._signing_ctx._fulcio.signing_cert.post( + certificate_request, self._identity_token + ) + + return certificate_response def sign( self, input_: IO[bytes], - identity: IdentityToken, ) -> SigningResult: """Public API for signing blobs""" input_digest = sha256_streaming(input_) + private_key = self._private_key - private_key = ec.generate_private_key(ec.SECP384R1()) + if not self._identity_token.in_validity_period(): + raise ExpiredIdentity - logger.debug( - f"Performing CSR: identity={identity.identity} " - f"issuer={identity.issuer} " - f"subject={identity.expected_certificate_subject}" - ) - - # Build an X.509 Certificate Signing Request - builder = ( - x509.CertificateSigningRequestBuilder() - .subject_name( - x509.Name( - [ - x509.NameAttribute(NameOID.EMAIL_ADDRESS, identity.identity), - ] - ) - ) - .add_extension( - x509.BasicConstraints(ca=False, path_length=None), - critical=True, - ) - ) - certificate_request = builder.sign(private_key, hashes.SHA256()) - - certificate_response = self._fulcio.signing_cert.post( - certificate_request, identity - ) + try: + certificate_response = self._signing_cert(private_key) + except ExpiredCertificate as e: + raise e # TODO(alex): Retrieve the public key via TUF # @@ -158,7 +195,7 @@ def sign( cert = certificate_response.cert # noqa chain = certificate_response.chain - verify_sct(sct, cert, chain, self._rekor._ct_keyring) + verify_sct(sct, cert, chain, self._signing_ctx._rekor._ct_keyring) logger.debug("Successfully verified SCT...") @@ -174,7 +211,7 @@ def sign( ) # Create the transparency log entry - entry = self._rekor.log.entries.post( + entry = self._signing_ctx._rekor.log.entries.post( b64_artifact_signature=B64Str(b64_artifact_signature), sha256_artifact_hash=input_digest.hex(), b64_cert=B64Str(b64_cert.decode()), @@ -192,6 +229,71 @@ def sign( ) +class SigningContext: + """ + Keep a context between signing operations. + """ + + def __init__( + self, + *, + fulcio: FulcioClient, + rekor: RekorClient, + ): + """ + Create a new `SigningContext`. + + `fulcio` is a `FulcioClient` capable of connecting to a Fulcio instance + and returning signing certificates. + + `rekor` is a `RekorClient` capable of connecting to a Rekor instance + and creating transparency log entries. + """ + self._fulcio = fulcio + self._rekor = rekor + + @classmethod + def production(cls) -> SigningContext: + """ + Return a `SigningContext` instance configured against Sigstore's production-level services. + """ + updater = TrustUpdater.production() + rekor = RekorClient.production(updater) + return cls( + fulcio=FulcioClient.production(), + rekor=rekor, + ) + + @classmethod + def staging(cls) -> SigningContext: + """ + Return a `SignerContext` instance configured against Sigstore's staging-level services. + """ + updater = TrustUpdater.staging() + rekor = RekorClient.staging(updater) + return cls( + fulcio=FulcioClient.staging(), + rekor=rekor, + ) + + @contextmanager + def signer( + self, identity_token: IdentityToken, *, cache: bool = True + ) -> Iterator[Signer]: + """ + A context manager for signing operations. + + `identity_token` is the identity token passed to the `Signer` instance + and used to request a signing certificate from Fulcio. + + `cache` determines whether the signing certificate and ephemeral private key + generated by the `Signer` instance should be reused (until the certificate expires) + to sign different artifacts. + Default is `True`. + """ + yield Signer(identity_token, self, cache) + + class SigningResult(BaseModel): """ Represents the artifacts of a signing operation. diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 8b926a6c..b8ec60db 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -21,6 +21,7 @@ from typing import Iterator from urllib.parse import urlparse +import jwt import pytest from cryptography.x509 import Certificate, load_pem_x509_certificate from id import ( @@ -34,7 +35,7 @@ from sigstore._internal import tuf from sigstore.oidc import _DEFAULT_AUDIENCE, IdentityToken -from sigstore.sign import Signer +from sigstore.sign import SigningContext from sigstore.verify import VerificationMaterials from sigstore.verify.policy import VerificationSuccess @@ -227,7 +228,10 @@ def tuf_dirs(monkeypatch, tmp_path): @pytest.fixture( - params=[("production", Signer.production), ("staging", Signer.staging)], + params=[ + ("production", SigningContext.production), + ("staging", SigningContext.staging), + ], ids=["production", "staging"], ) def id_config(request): @@ -239,3 +243,11 @@ def id_config(request): token = detect_credential(_DEFAULT_AUDIENCE) return signer, IdentityToken(token) + + +@pytest.fixture +def dummy_jwt(): + def _dummy_jwt(claims: dict): + return jwt.encode(claims, key="definitely not secure") + + return _dummy_jwt diff --git a/test/unit/test_oidc.py b/test/unit/test_oidc.py index 4f7aa13c..f3d9b4b1 100644 --- a/test/unit/test_oidc.py +++ b/test/unit/test_oidc.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime + import pytest from sigstore import oidc @@ -19,109 +21,236 @@ class TestIdentityToken: def test_invalid_jwt(self): - with pytest.raises(oidc.IdentityError, match="invalid identity token"): + with pytest.raises( + oidc.IdentityError, match="Identity token is malformed or missing claims" + ): oidc.IdentityToken("invalid jwt") - def test_missing_iss(self): - # HS256 for testing, empty claim set - jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.RX-vj8lcO2nwYZa_ALhrQkO55BGH-x4AOC0LzW7IFew" + def test_missing_iss(self, dummy_jwt): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": now, + "nbf": now, + "exp": now + 600, + } + ) + with pytest.raises( - oidc.IdentityError, match="Identity token missing the required `iss` claim" + oidc.IdentityError, match="Identity token is malformed or missing claims" ): oidc.IdentityToken(jwt) - def test_missing_aud(self): - # HS256 for testing, `{ "iss": "https://example.com" }` - jwt = ( - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2V4YW1wbGUuY29tIn0" - ".ajiTV42uC6T7M9AH-gS0DyzpJoGY4xLXCSrL0U6ELmE" + def test_missing_aud(self, dummy_jwt): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "iat": now, + "nbf": now, + "exp": now + 600, + "iss": "fake-issuer", + } ) + with pytest.raises( - oidc.IdentityError, match="Identity token missing the required `aud` claim" + oidc.IdentityError, match="Identity token is malformed or missing claims" ): oidc.IdentityToken(jwt) - def test_wrong_aud(self): - # HS256 for testing, `{ "iss": "https://example.com", "aud": "notsigstore" }` - jwt = ( - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2V4YW1wbGUuY29tI" - "iwiYXVkIjoibm90c2lnc3RvcmUifQ.vM6kUdGyaabfyYaQY3YfNhcR1Hy59rrdVKHFExWA0Bo" + @pytest.mark.parametrize("aud", (None, "not-sigstore")) + def test_invalid_aud(self, dummy_jwt, aud): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": aud, + "iat": now, + "nbf": now, + "exp": now + 600, + "iss": "fake-issuer", + } ) + with pytest.raises( - oidc.IdentityError, match="Audience should be 'sigstore', not 'notsigstore'" + oidc.IdentityError, match="Identity token is malformed or missing claims" ): oidc.IdentityToken(jwt) - def test_known_issuer_missing_identity_claim(self): - # HS256 for testing; no `email` claim - # - # { - # "iss": "https://accounts.google.com", - # "aud": "sigstore" - # } - jwt = ( - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2FjY291bnRzLmdvb2dsZ" - "S5jb20iLCJhdWQiOiJzaWdzdG9yZSJ9.qcgUH_e0s7lg6wZuzwBT5SdB0SlbsZM6gk8li2OVOmg" + def test_missing_iat(self, dummy_jwt): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "nbf": now, + "exp": now + 600, + "iss": "fake-issuer", + } ) + with pytest.raises( - oidc.IdentityError, - match="Identity token missing the required 'email' claim", + oidc.IdentityError, match="Identity token is malformed or missing claims" ): oidc.IdentityToken(jwt) - def test_known_issuer_ok(self): - jwt = ( - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2FjY291bnRzLmdvb2dsZS5" - "jb20iLCJhdWQiOiJzaWdzdG9yZSIsImVtYWlsIjoiZXhhbXBsZUBleGFtcGxlLmNvbSJ9.NDvzhMRf7O" - "ueWpesIyqBFDkL9mGmcOK0S3UC3tMx_Ws" + @pytest.mark.parametrize("iat", (None, "not-an-int")) + def test_invalid_iat(self, dummy_jwt, iat): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": iat, + "nbf": now, + "exp": now + 600, + "iss": "fake-issuer", + } ) - token = oidc.IdentityToken(jwt) - - assert str(token) == jwt == token._raw_token - assert token.identity == "example@example.com" - assert token.issuer == "https://accounts.google.com" - - def test_unknown_issuer_missing_sub(self): - # HS256 for testing; no `sub` claim - # - # { - # "iss": "https://example.com", - # "aud": "sigstore" - # } - jwt = ( - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2V4YW1wbGUuY29tIiwiY" - "XVkIjoic2lnc3RvcmUifQ.t3qwWcGfy5dj_NAFliPviVSmI3Us4mV9mEkDpKrgLn0" + + with pytest.raises( + oidc.IdentityError, match="Identity token is malformed or missing claims" + ): + oidc.IdentityToken(jwt) + + def test_missing_nbf_ok(self, dummy_jwt): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": now, + "exp": now + 600, + "iss": "fake-issuer", + "sub": "sigstore", + } + ) + + assert oidc.IdentityToken(jwt) is not None + + def test_invalid_nbf(self, dummy_jwt): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": now, + "nbf": now + 600, + "exp": now + 601, + "iss": "fake-issuer", + } ) + with pytest.raises( oidc.IdentityError, - match="Identity token missing the required 'sub' claim", + match="Identity token is not within its validity period", + ): + oidc.IdentityToken(jwt) + + def test_missing_exp(self, dummy_jwt): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": now, + "nbf": now, + "iss": "fake-issuer", + } + ) + + with pytest.raises( + oidc.IdentityError, match="Identity token is malformed or missing claims" ): oidc.IdentityToken(jwt) - def test_unknown_issuer_ok(self): - jwt = ( - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXV" - "kIjoic2lnc3RvcmUiLCJzdWIiOiJzb21lLWlkZW50aXR5In0.xdmbAw5jagKqsHCUmwLyA7JR1fWo8nk" - "8AHFVIJo-gfY" + def test_invalid_exp(self, dummy_jwt): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": now - 600, + "nbf": now - 300, + "exp": now - 1, + "iss": "fake-issuer", + } ) - token = oidc.IdentityToken(jwt) - - assert str(token) == jwt == token._raw_token - assert token.identity == "some-identity" - assert token.issuer == "https://example.com" - assert token.expected_certificate_subject == "https://example.com" - - def test_unknown_issuer_federated_ok(self): - jwt = ( - "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwczovL2V4YW1wbGUuY29tIiwiYXV" - "kIjoic2lnc3RvcmUiLCJzdWIiOiJzb21lLWlkZW50aXR5IiwiZmVkZXJhdGVkX2NsYWltcyI6eyJjb25" - "uZWN0b3JfaWQiOiJodHRwczovL290aGVyLmV4YW1wbGUuY29tIn19.EkpGq-4TZnHyxMaTd0AlEJrMtv" - "wxJ8TZH_0qZ-8CfuE" + + with pytest.raises( + oidc.IdentityError, match="Identity token is malformed or missing claims" + ): + oidc.IdentityToken(jwt) + + @pytest.mark.parametrize("iss", oidc._KNOWN_OIDC_ISSUERS.keys()) + def test_missing_identity_claim(self, dummy_jwt, iss): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": now, + "nbf": now, + "exp": now + 600, + "iss": iss, + } ) - token = oidc.IdentityToken(jwt) + with pytest.raises( + oidc.IdentityError, + match=r"Identity token is missing the required '.+' claim", + ): + oidc.IdentityToken(jwt) + + @pytest.mark.parametrize("fed", ("notadict", {"connector_id": 123})) + def test_invalid_federated_claims(self, dummy_jwt, fed): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": now, + "nbf": now, + "exp": now + 600, + "iss": "https://accounts.google.com", + "email": "example@example.com", + "federated_claims": fed, + } + ) + + with pytest.raises( + oidc.IdentityError, + match="unexpected claim type: federated_claims.*", + ): + oidc.IdentityToken(jwt) + + @pytest.mark.parametrize( + ("iss", "identity_claim", "identity_value", "fed_iss"), + [ + ("https://accounts.google.com", "email", "example@example.com", None), + ( + "https://oauth2.sigstore.dev/auth", + "email", + "example@example.com", + "https://accounts.google.com", + ), + ("https://oauth2.sigstore.dev/auth", "email", "example@example.com", None), + ( + "https://token.actions.githubusercontent.com", + "sub", + "some-subject", + None, + ), + ("hxxps://unknown.issuer.example.com/auth", "sub", "some-subject", None), + ], + ) + def test_ok(self, dummy_jwt, iss, identity_claim, identity_value, fed_iss): + now = int(datetime.datetime.now().timestamp()) + jwt = dummy_jwt( + { + "aud": "sigstore", + "iat": now, + "nbf": now, + "exp": now + 600, + "iss": iss, + identity_claim: identity_value, + "federated_claims": {"connector_id": fed_iss}, + } + ) - assert str(token) == jwt == token._raw_token - assert token.identity == "some-identity" - assert token.issuer == "https://example.com" - assert token.expected_certificate_subject == "https://other.example.com" + identity = oidc.IdentityToken(jwt) + assert identity.in_validity_period() + assert identity.identity == identity_value + assert identity.issuer == iss + assert identity.expected_certificate_subject == iss if not fed_iss else fed_iss diff --git a/test/unit/test_sign.py b/test/unit/test_sign.py index d2dbae02..a4bea319 100644 --- a/test/unit/test_sign.py +++ b/test/unit/test_sign.py @@ -21,33 +21,32 @@ import sigstore.oidc from sigstore._internal.keyring import KeyringError, KeyringLookupError from sigstore._internal.sct import InvalidSCTError, InvalidSCTKeyError -from sigstore.sign import Signer +from sigstore.sign import SigningContext -@pytest.mark.online -def test_signer_production(): - signer = Signer.production() - assert signer is not None - +class TestSigningContext: + @pytest.mark.online + def test_production(self): + assert SigningContext.production() is not None -def test_signer_staging(mock_staging_tuf): - signer = Signer.staging() - assert signer is not None + def test_staging(self, mock_staging_tuf): + assert SigningContext.staging() is not None @pytest.mark.online @pytest.mark.ambient_oidc def test_sign_rekor_entry_consistent(id_config): - signer, identity = id_config + ctx, identity = id_config # NOTE: The actual signer instance is produced lazily, so that parameter # expansion doesn't fail in offline tests. - signer = signer() + ctx: SigningContext = ctx() assert identity is not None payload = io.BytesIO(secrets.token_bytes(32)) - expected_entry = signer.sign(payload, identity).log_entry - actual_entry = signer._rekor.log.entries.get(log_index=expected_entry.log_index) + with ctx.signer(identity) as signer: + expected_entry = signer.sign(payload, identity).log_entry + actual_entry = signer._rekor.log.entries.get(log_index=expected_entry.log_index) assert expected_entry.uuid == actual_entry.uuid assert expected_entry.body == actual_entry.body @@ -59,11 +58,11 @@ def test_sign_rekor_entry_consistent(id_config): @pytest.mark.online @pytest.mark.ambient_oidc def test_sct_verify_keyring_lookup_error(id_config, monkeypatch): - signer, identity = id_config + ctx, identity = id_config # a signer whose keyring always fails to lookup a given key. - signer = signer() - signer._rekor._ct_keyring = pretend.stub(verify=pretend.raiser(KeyringLookupError)) + ctx: SigningContext = ctx() + ctx._rekor._ct_keyring = pretend.stub(verify=pretend.raiser(KeyringLookupError)) assert identity is not None payload = io.BytesIO(secrets.token_bytes(32)) @@ -71,7 +70,8 @@ def test_sct_verify_keyring_lookup_error(id_config, monkeypatch): with pytest.raises( InvalidSCTError, ) as excinfo: - signer.sign(payload, identity) + with ctx.signer(identity) as signer: + signer.sign(payload, identity) # The exception subclass is the one we expect. assert isinstance(excinfo.value, InvalidSCTKeyError) @@ -80,25 +80,26 @@ def test_sct_verify_keyring_lookup_error(id_config, monkeypatch): @pytest.mark.online @pytest.mark.ambient_oidc def test_sct_verify_keyring_error(id_config, monkeypatch): - signer, identity = id_config + ctx, identity = id_config # a signer whose keyring throws an internal error. - signer = signer() - signer._rekor._ct_keyring = pretend.stub(verify=pretend.raiser(KeyringError)) + ctx: SigningContext = ctx() + ctx._rekor._ct_keyring = pretend.stub(verify=pretend.raiser(KeyringError)) assert identity is not None payload = io.BytesIO(secrets.token_bytes(32)) with pytest.raises(InvalidSCTError): - signer.sign(payload, identity) + with ctx.signer(identity) as signer: + signer.sign(payload) @pytest.mark.online @pytest.mark.ambient_oidc def test_identity_proof_claim_lookup(id_config, monkeypatch): - signer, identity = id_config + ctx, identity = id_config - signer = signer() + ctx: SigningContext = ctx() assert identity is not None # clear out the known issuers, forcing the `Identity`'s `proof_claim` to be looked up. @@ -106,7 +107,8 @@ def test_identity_proof_claim_lookup(id_config, monkeypatch): payload = io.BytesIO(secrets.token_bytes(32)) - expected_entry = signer.sign(payload, identity).log_entry + with ctx.signer(identity) as signer: + expected_entry = signer.sign(payload).log_entry actual_entry = signer._rekor.log.entries.get(log_index=expected_entry.log_index) assert expected_entry.uuid == actual_entry.uuid