Skip to content

treewide: upgrade to cryptography 38 #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@ classifiers = [
"Topic :: Security :: Cryptography",
]
dependencies = [
"cryptography>=3.1",
"cryptography>=38",
"pydantic",
"pyjwt>=2.1",
"pyOpenSSL",
"requests",
"securesystemslib",
# HACK(#84): Remove these dependencies.
"pyasn1",
"pyasn1-modules",
]
requires-python = ">=3.7"

Expand All @@ -59,10 +56,10 @@ lint = [
"isort",
"interrogate",
"mypy",
"types-cryptography",
"types-requests",
"types-pyOpenSSL",
"types-pyjwt",
# TODO(ww): Re-enable once dependency on types-cryptography is dropped.
# See: https://github.com/python/typeshed/issues/8699
# "types-pyOpenSSL",
]
dev = [
"build",
Expand Down
48 changes: 5 additions & 43 deletions sigstore/_internal/fulcio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from abc import ABC
from dataclasses import dataclass
from enum import IntEnum
from typing import List, Optional
from typing import List
from urllib.parse import urljoin

import requests
Expand All @@ -39,10 +39,10 @@
)
from cryptography.x509.certificate_transparency import (
LogEntryType,
SignatureAlgorithm,
SignedCertificateTimestamp,
Version,
)
from pyasn1.codec.der.decoder import decode as asn1_decode
from pydantic import BaseModel, Field, validator

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -127,10 +127,8 @@ def signature_hash_algorithm(self) -> hashes.HashAlgorithm:
return hash_.to_cryptography()

@property
def signature_algorithm(self) -> int:
# TODO(ww): This method will need to return a SignatureAlgorithm
# variant instead, for consistency with cryptography's interface.
return self.digitally_signed[1]
def signature_algorithm(self) -> SignatureAlgorithm:
return SignatureAlgorithm(self.digitally_signed[1])

@property
def signature(self) -> bytes:
Expand All @@ -155,8 +153,6 @@ class FulcioCertificateSigningResponse:
cert: Certificate
chain: List[Certificate]
sct: SignedCertificateTimestamp
# HACK(#84): Remove entirely.
raw_sct: Optional[bytes]


@dataclass(frozen=True)
Expand Down Expand Up @@ -246,36 +242,6 @@ def post(
raise FulcioClientError(
f"Unexpected embedded SCT count in response: {len(precert_scts_extension)} != 1"
)

# HACK(#84): Remove entirely.
# HACK: Until cryptography is released, we don't have direct access
# to each SCT's internals (signature, extensions, etc.)
# Instead, we do something really nasty here: we decode the ASN.1,
# unwrap the underlying TLS structures, and stash the raw SCT
# for later use.
parsed_sct_extension = asn1_decode(precert_scts_extension.public_bytes())

def _opaque16(value: bytes) -> bytes:
# invariant: there have to be at least two bytes, for the length.
if len(value) < 2:
raise FulcioClientError(
"malformed TLS encoding in response (length)"
)

(length,) = struct.unpack("!H", value[0:2])

if length != len(value[2:]):
raise FulcioClientError(
"malformed TLS encoding in response (payload)"
)

return value[2:]

# This is a TLS-encoded `opaque<0..2^16-1>` for the list,
# which itself contains an `opaque<0..2^16-1>` for the SCT.
raw_sct_list_bytes = bytes(parsed_sct_extension[0])
raw_sct = _opaque16(_opaque16(raw_sct_list_bytes))

sct = precert_scts_extension[0]
else:
# If we don't have any embedded SCTs, then we might be dealing
Expand Down Expand Up @@ -304,11 +270,7 @@ def _opaque16(value: bytes) -> bytes:
# Ideally we'd catch something less generic here.
raise FulcioClientError from exc

# HACK(#84): Remove entirely.
# The terrible hack above doesn't apply to detached SCTs.
raw_sct = None

return FulcioCertificateSigningResponse(cert, chain, sct, raw_sct)
return FulcioCertificateSigningResponse(cert, chain, sct)


class FulcioTrustBundle(Endpoint):
Expand Down
23 changes: 21 additions & 2 deletions sigstore/_internal/rekor/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from pydantic import BaseModel, Field, validator

DEFAULT_REKOR_URL = "https://rekor.sigstore.dev"
Expand Down Expand Up @@ -199,8 +200,26 @@ def __init__(self, url: str, pubkey: bytes, ctfe_pubkey: bytes) -> None:
{"Content-Type": "application/json", "Accept": "application/json"}
)

self._pubkey = serialization.load_pem_public_key(pubkey)
self._ctfe_pubkey = serialization.load_pem_public_key(ctfe_pubkey)
pubkey = serialization.load_pem_public_key(pubkey)
if not isinstance(
pubkey,
(
ec.EllipticCurvePublicKey,
),
):
raise RekorClientError(f"Invalid public key type: {pubkey}")
self._pubkey = pubkey

ctfe_pubkey = serialization.load_pem_public_key(ctfe_pubkey)
if not isinstance(
ctfe_pubkey,
(
rsa.RSAPublicKey,
ec.EllipticCurvePublicKey,
),
):
raise RekorClientError(f"Invalid CTFE public key type: {ctfe_pubkey}")
self._ctfe_pubkey = ctfe_pubkey

@classmethod
def production(cls) -> RekorClient:
Expand Down
149 changes: 16 additions & 133 deletions sigstore/_internal/sct.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,129 +20,22 @@
import logging
import struct
from datetime import timezone
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Union

import cryptography.hazmat.primitives.asymmetric.padding as padding
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.x509 import Certificate, ExtendedKeyUsage, ObjectIdentifier
from cryptography.x509.certificate_transparency import ( # SignatureAlgorithm,
from cryptography.x509 import Certificate, ExtendedKeyUsage
from cryptography.x509.certificate_transparency import (
LogEntryType,
SignatureAlgorithm,
SignedCertificateTimestamp,
)
from cryptography.x509.oid import ExtensionOID
from pyasn1.codec.der.decoder import decode as asn1_decode
from pyasn1.codec.der.encoder import encode as asn1_encode
from pyasn1_modules import rfc5280
from cryptography.x509.oid import ExtendedKeyUsageOID

logger = logging.getLogger(__name__)

# HACK(#84): Replace with the import below.
# from cryptography.x509.oid import ExtendedKeyUsageOID
_CERTIFICATE_TRANSPARENCY = ObjectIdentifier("1.3.6.1.4.1.11129.2.4.4")

# HACK(#84): Remove entirely.
_HASH_ALGORITHM_SHA256 = 4
_SIG_ALGORITHM_RSA = 1
_SIG_ALGORITHM_ECDSA = 3


# HACK(#84): Remove entirely.
def _make_tbs_precertificate_bytes(cert: Certificate) -> bytes:
if hasattr(cert, "tbs_precertificate_bytes"):
# NOTE(ww): cryptography 38, which is unreleased, will contain this API.
return cert.tbs_precertificate_bytes # type: ignore[attr-defined, no-any-return]
else:
# Otherwise, we have to do things the hard way: we take the raw
# DER-encoded TBSCertificate, re-decode it, and manually strip
# out the SCT list extension.
tbs_cert = asn1_decode(
cert.tbs_certificate_bytes, asn1Spec=rfc5280.TBSCertificate()
)[0]

filtered_extensions = [
ext
for ext in tbs_cert["extensions"]
if str(ext["extnID"])
!= ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS.dotted_string
]
tbs_cert["extensions"].clear()
tbs_cert["extensions"].extend(filtered_extensions)

return asn1_encode(tbs_cert) # type: ignore[no-any-return]


# HACK(#84): Remove entirely.
def _sct_properties(
sct: SignedCertificateTimestamp, raw_sct: Optional[bytes]
) -> Tuple[hashes.HashAlgorithm, int, bytes]:
if hasattr(sct, "signature"):
return (
sct.hash_algorithm, # type: ignore[attr-defined]
sct.signature_algorithm, # type: ignore[attr-defined]
sct.signature, # type: ignore[attr-defined]
)

if not raw_sct:
raise InvalidSctError("API misuse: missing raw SCT")

return _raw_sct_properties(raw_sct)


# HACK(#84): Remove entirely.
def _raw_sct_properties(raw_sct: bytes) -> Tuple[hashes.HashAlgorithm, int, bytes]:
# YOLO: A raw SCT looks like this:
#
# u8 Version
# u8[32] LogID
# u64 Timestamp
# opaque CtExtensions<0..2^16-1>
# digitally-signed struct { ... }
#
# The last component contains the signature, in RFC5246's
# digitally-signed format, which looks like this:
#
# u8 Hash
# u8 Signature
# opaque signature<0..2^16-1>

def _opaque16(value: bytes) -> bytes:
# invariant: there have to be at least two bytes, for the length.
if len(value) < 2:
raise InvalidSctError("malformed TLS encoding in SCT (length)")

(length,) = struct.unpack("!H", value[0:2])

if length != len(value[2:]):
raise InvalidSctError("malformed TLS encoding in SCT (payload)")

return value[2:]

# 43 = sizeof(Version) + sizeof(LogID) + sizeof(Timestamp) + sizeof(opauque CtExtensions),
# the latter being assumed to be just two (length + zero payload).
digitally_signed_offset = 43
digitally_signed = raw_sct[digitally_signed_offset:]

hash_algorithm = digitally_signed[0]
signature_algorithm = digitally_signed[1]
signature = _opaque16(digitally_signed[2:])

if hash_algorithm != _HASH_ALGORITHM_SHA256:
raise InvalidSctError(
f"invalid hash algorithm ({hash_algorithm}, expected {_HASH_ALGORITHM_SHA256})"
)
return (hashes.SHA256(), signature_algorithm, signature)


# HACK(#84): Remove entirely.
def _sct_extension_bytes(sct: SignedCertificateTimestamp) -> bytes:
if hasattr(sct, "extension_bytes"):
return sct.extension_bytes # type: ignore[attr-defined, no-any-return]

# We don't actually expect any extension bytes anyways, so this is okay.
return b""


def _pack_signed_entry(
sct: SignedCertificateTimestamp, cert: Certificate, issuer_key_hash: Optional[bytes]
Expand All @@ -165,7 +58,7 @@ def _pack_signed_entry(
pack_format = "!32sBBB{cert_der_len}s"

# Precertificates must have their SCT list extension filtered out.
cert_der = _make_tbs_precertificate_bytes(cert)
cert_der = cert.tbs_precertificate_bytes
fields.append(issuer_key_hash)
else:
raise InvalidSctError(f"unknown SCT log entry type: {sct.entry_type!r}")
Expand Down Expand Up @@ -200,8 +93,7 @@ def _pack_digitally_signed(

# No extensions are currently specified, so we treat the presence
# of any extension bytes as suspicious.
# HACK(#84): Replace with `sct.extension_bytes`
if len(_sct_extension_bytes(sct)) != 0:
if len(sct.extension_bytes) != 0:
raise InvalidSctError("Unexpected trailing extension bytes")

# This constructs the "core" `signed_entry` field, which is either
Expand All @@ -221,8 +113,7 @@ def _pack_digitally_signed(
int(timestamp.timestamp() * 1000), # timestamp (milliseconds)
sct.entry_type.value, # entry_type (x509_entry(0) | precert_entry(1))
signed_entry, # select(entry_type) -> signed_entry (see above)
# HACK(#84): Replace with `sct.extension_bytes`
len(_sct_extension_bytes(sct)), # extensions (opaque CtExtensions<0..2^16-1>)
len(sct.extension_bytes), # extensions (opaque CtExtensions<0..2^16-1>)
)
# fmt: on

Expand All @@ -232,9 +123,7 @@ def _pack_digitally_signed(
def _is_preissuer(issuer: Certificate) -> bool:
ext_key_usage = issuer.extensions.get_extension_for_class(ExtendedKeyUsage)

# HACK(#84): Replace with the line below.
# return ExtendedKeyUsageOID.CERTIFICATE_TRANSPARENCY in ext_key_usage.value
return _CERTIFICATE_TRANSPARENCY in ext_key_usage.value
return ExtendedKeyUsageOID.CERTIFICATE_TRANSPARENCY in ext_key_usage.value


def _get_issuer_cert(chain: List[Certificate]) -> Certificate:
Expand Down Expand Up @@ -262,7 +151,6 @@ def verify_sct(
cert: Certificate,
chain: List[Certificate],
ctfe_key: Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey],
raw_sct: Optional[bytes],
) -> None:
"""Verify a signed certificate timestamp"""

Expand All @@ -276,39 +164,34 @@ def verify_sct(

digitally_signed = _pack_digitally_signed(sct, cert, issuer_key_hash)

hash_algorithm, signature_algorithm, signature = _sct_properties(sct, raw_sct)

# HACK(#84): Refactor.
if not isinstance(hash_algorithm, hashes.SHA256):
if not isinstance(sct.signature_hash_algorithm, hashes.SHA256):
raise InvalidSctError(
"Found unexpected hash algorithm in SCT: only SHA256 is supported "
f"(expected {_HASH_ALGORITHM_SHA256}, got {hash_algorithm})"
f"(expected {hashes.SHA256}, got {sct.signature_hash_algorithm})"
)

try:
# HACK(#84): Replace with `sct.signature_algorithm`
if signature_algorithm == _SIG_ALGORITHM_RSA and isinstance(
if sct.signature_algorithm == SignatureAlgorithm.RSA and isinstance(
ctfe_key, rsa.RSAPublicKey
):
ctfe_key.verify(
signature=signature,
signature=sct.signature,
data=digitally_signed,
padding=padding.PKCS1v15(),
algorithm=hashes.SHA256(),
)
# HACK(#84): Replace with `sct.signature_algorithm`
elif signature_algorithm == _SIG_ALGORITHM_ECDSA and isinstance(
elif sct.signature_algorithm == SignatureAlgorithm.ECDSA and isinstance(
ctfe_key, ec.EllipticCurvePublicKey
):
ctfe_key.verify(
signature=signature,
signature=sct.signature,
data=digitally_signed,
signature_algorithm=ec.ECDSA(hashes.SHA256()),
)
else:
raise InvalidSctError(
"Found unexpected signature type in SCT: signature type of"
f"{signature_algorithm} and CTFE key type of {type(ctfe_key)}"
f"{sct.signature_algorithm} and CTFE key type of {type(ctfe_key)}"
)
except InvalidSignature as inval_sig:
raise InvalidSctError from inval_sig
Loading