diff --git a/src/cryptojwt/exception.py b/src/cryptojwt/exception.py index 74a9d8c4..83bb1d35 100644 --- a/src/cryptojwt/exception.py +++ b/src/cryptojwt/exception.py @@ -117,3 +117,7 @@ class HTTPException(JWKESTException): class UnsupportedECurve(Unsupported): pass + + +class UnsupportedOKPCurve(Unsupported): + pass diff --git a/src/cryptojwt/jwk/jwk.py b/src/cryptojwt/jwk/jwk.py index c9e32904..670c8da5 100644 --- a/src/cryptojwt/jwk/jwk.py +++ b/src/cryptojwt/jwk/jwk.py @@ -4,6 +4,8 @@ from cryptography.hazmat import backends from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import ed448 +from cryptography.hazmat.primitives.asymmetric import ed25519 from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric.rsa import rsa_crt_dmp1 from cryptography.hazmat.primitives.asymmetric.rsa import rsa_crt_dmq1 @@ -17,6 +19,7 @@ from .ec import NIST2SEC from .ec import ECKey from .hmac import SYMKey +from .okp import OKPKey from .rsa import RSAKey EC_PUBLIC_REQUIRED = frozenset(["crv", "x", "y"]) @@ -25,6 +28,12 @@ EC_PRIVATE_OPTIONAL = frozenset() EC_PRIVATE = EC_PRIVATE_REQUIRED | EC_PRIVATE_OPTIONAL +OKP_PUBLIC_REQUIRED = frozenset(["crv", "x"]) +OKP_PUBLIC = OKP_PUBLIC_REQUIRED +OKP_PRIVATE_REQUIRED = frozenset(["d"]) +OKP_PRIVATE_OPTIONAL = frozenset() +OKP_PRIVATE = OKP_PRIVATE_REQUIRED | OKP_PRIVATE_OPTIONAL + RSA_PUBLIC_REQUIRED = frozenset(["e", "n"]) RSA_PUBLIC = RSA_PUBLIC_REQUIRED RSA_PRIVATE_REQUIRED = frozenset(["p", "q", "d"]) @@ -42,6 +51,16 @@ def ensure_ec_params(jwk_dict, private): return ensure_params("EC", provided, required) +def ensure_okp_params(jwk_dict, private): + """Ensure all required OKP parameters are present in dictionary""" + provided = frozenset(jwk_dict.keys()) + if private is not None and private: + required = OKP_PUBLIC_REQUIRED | OKP_PRIVATE_REQUIRED + else: + required = OKP_PUBLIC_REQUIRED + return ensure_params("OKP", provided, required) + + def ensure_rsa_params(jwk_dict, private): """Ensure all required RSA parameters are present in dictionary""" provided = frozenset(jwk_dict.keys()) @@ -140,6 +159,15 @@ def key_from_jwk_dict(jwk_dict, private=None): if _jwk_dict["kty"] != "RSA": raise WrongKeyType('"{}" should have been "RSA"'.format(_jwk_dict["kty"])) return RSAKey(**_jwk_dict) + elif _jwk_dict["kty"] == "OKP": + ensure_okp_params(_jwk_dict, private) + + if private is not None and not private: + # remove private components + for v in OKP_PRIVATE: + _jwk_dict.pop(v, None) + + return OKPKey(**_jwk_dict) elif _jwk_dict["kty"] == "oct": if "key" not in _jwk_dict and "k" not in _jwk_dict: raise MissingValue('There has to be one of "k" or "key" in a symmetric key') @@ -164,6 +192,8 @@ def jwk_wrap(key, use="", kid=""): kspec = SYMKey(key=key, use=use, kid=kid) elif isinstance(key, ec.EllipticCurvePublicKey): kspec = ECKey(use=use, kid=kid).load_key(key) + elif isinstance(key, (ed25519.Ed25519PublicKey, ed448.Ed448PublicKey)): + kspec = OKPKey(use=use, kid=kid).load_key(key) else: raise Exception("Unknown key type:key=" + str(type(key))) diff --git a/src/cryptojwt/jwk/okp.py b/src/cryptojwt/jwk/okp.py new file mode 100644 index 00000000..7d165f74 --- /dev/null +++ b/src/cryptojwt/jwk/okp.py @@ -0,0 +1,381 @@ +from typing import Union + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed448 +from cryptography.hazmat.primitives.asymmetric import ed25519 +from cryptography.hazmat.primitives.asymmetric import x448 +from cryptography.hazmat.primitives.asymmetric import x25519 + +from cryptojwt.exception import KeyNotFound + +from ..exception import DeSerializationNotPossible +from ..exception import JWKESTException +from ..exception import UnsupportedOKPCurve +from ..utils import b64d +from ..utils import b64e +from .asym import AsymmetricKey +from .x509 import import_private_key_from_pem_file +from .x509 import import_public_key_from_pem_data +from .x509 import import_public_key_from_pem_file + +OKPPublicKey = Union[ + ed25519.Ed25519PublicKey, ed448.Ed448PublicKey, x25519.X25519PublicKey, x448.X448PublicKey +] +OKPPrivateKey = Union[ + ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey, x25519.X25519PrivateKey, x448.X448PrivateKey +] + +OKP_CRV2PUBLIC = { + "Ed25519": ed25519.Ed25519PublicKey, + "Ed448": ed448.Ed448PublicKey, + "X25519": x25519.X25519PublicKey, + "X448": x448.X448PublicKey, +} + +OKP_CRV2PRIVATE = { + "Ed25519": ed25519.Ed25519PrivateKey, + "Ed448": ed448.Ed448PrivateKey, + "X25519": x25519.X25519PrivateKey, + "X448": x448.X448PrivateKey, +} + +OKP_CRV_SIGN = ["Ed25519", "Ed448"] +OKP_CRV_ENCR = ["X25519", "X448"] + + +def is_private_key(key) -> bool: + if isinstance( + key, + ( + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, + x25519.X25519PrivateKey, + x448.X448PrivateKey, + ), + ): + return True + elif isinstance( + key, + ( + ed25519.Ed25519PublicKey, + ed448.Ed448PublicKey, + ed448.Ed448PublicKey, + x25519.X25519PublicKey, + x448.X448PublicKey, + ), + ): + return False + raise TypeError + + +def deser(val): + return b64d(val.encode()) if isinstance(val, str) else b64d(val) + + +class OKPKey(AsymmetricKey): + """ + JSON Web key representation of an Octet Key Pair key. + According to RFC 8037 a JWK representation of an OKP key can look like + this:: + + { + "kty":"OKP", + "crv":"Ed25519", + "x":"XWxGtApfcqmKI7p0OKnF5JSEWMVoLsytFXLEP7xZ_l8", + } + + Parameters according to https://tools.ietf.org/html/rfc8037 + """ + + members = AsymmetricKey.members[:] + # The elliptic curve specific attributes + members.extend(["crv", "x", "d"]) + longs = ["x", "d"] + public_members = AsymmetricKey.public_members[:] + public_members.extend(["kty", "alg", "use", "kid", "crv", "x"]) + # required attributes + required = ["kty", "crv", "x"] + + def __init__(self, kty="OKP", alg="", use="", kid="", crv="", x="", d="", **kwargs): + AsymmetricKey.__init__(self, kty, alg, use, kid, **kwargs) + self.crv = crv + self.x = x + self.d = d + + if not self.pub_key and not self.priv_key: + if self.x and self.crv: + self.verify() + self.deserialize() + elif any([self.x, self.crv]): + raise JWKESTException("Missing required parameter") + elif self.priv_key and not self.pub_key: + self.pub_key = self.priv_key.public_key() + self._serialize(self.priv_key) + + def deserialize(self): + """ + Starting with information gathered from the on-the-wire representation + of an OKP key (a JWK) initiate a OKPPublicKey or OKPPrivateKey instance. + So we have to get from having:: + + { + "kty":"OKP", + "crv":"Ed2559", + "x":"11qYAYKxCrfVS_7TyWQHOg7hcvPapiMlrwIaaPcHURo", + "d":"nWGxne_9WmC6hEr0kuwsxERJxWl7MmkZcDusAxyuf2A" + } + + to having a key that can be used for signing/verifying and/or + encrypting/decrypting. + If 'd' has value then we're dealing with a private key otherwise + a public key. 'x' MUST have a value. + If self.pub_key or self.priv_key has a value beforehand this will + be overwrite. + + x and d (if present) must be strings or bytes. + """ + + if isinstance(self.x, (str, bytes)): + _x = deser(self.x) + else: + raise ValueError('"x" MUST be a string') + + if self.d: + try: + if isinstance(self.d, (str, bytes)): + try: + self.priv_key = OKP_CRV2PRIVATE[self.crv].from_private_bytes(deser(self.d)) + except KeyError: + raise UnsupportedOKPCurve("Unsupported OKP curve: {}".format(self.crv)) + self.pub_key = self.priv_key.public_key() + except ValueError as err: + raise DeSerializationNotPossible(str(err)) + else: + try: + self.pub_key = OKP_CRV2PUBLIC[self.crv].from_public_bytes(_x) + except KeyError: + raise UnsupportedOKPCurve("Unsupported OKP curve: {}".format(self.crv)) + + def _serialize_public(self, key): + self.x = b64e( + key.public_bytes( + encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw + ) + ).decode("ascii") + + def _serialize_private(self, key): + self._serialize_public(key.public_key()) + self.d = b64e( + key.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) + ).decode("ascii") + + def _serialize(self, key): + if isinstance(key, ed25519.Ed25519PublicKey): + self._serialize_public(key) + self.crv = "Ed25519" + elif isinstance(key, ed25519.Ed25519PrivateKey): + self._serialize_private(key) + self.crv = "Ed25519" + elif isinstance(key, x25519.X25519PublicKey): + self._serialize_public(key) + self.crv = "X25519" + elif isinstance(key, x25519.X25519PrivateKey): + self._serialize_private(key) + self.crv = "X25519" + elif isinstance(key, ed448.Ed448PublicKey): + self._serialize_public(key) + self.crv = "Ed448" + elif isinstance(key, ed448.Ed448PrivateKey): + self._serialize_private(key) + self.crv = "Ed448" + elif isinstance(key, x448.X448PublicKey): + self._serialize_public(key) + self.crv = "X448" + elif isinstance(key, x448.X448PrivateKey): + self._serialize_private(key) + self.crv = "X448" + + def serialize(self, private=False): + """ + Go from a OKPPrivateKey or OKPPublicKey instance to a JWK representation. + + :param private: Whether we should include the private attributes or not. + :return: A JWK as a dictionary + """ + if self.priv_key: + self._serialize(self.priv_key) + else: + self._serialize(self.pub_key) + + res = self.common() + + res.update({"crv": self.crv, "x": self.x}) + + if private and self.d: + res["d"] = self.d + + return res + + def load_key(self, key): + """ + Load an Octet Key Pair key + + :param key: An octet key pair key instance, private or public. + :return: Reference to this instance + """ + self._serialize(key) + + if is_private_key(key): + self.priv_key = key + self.pub_key = key.public_key() + else: + self.pub_key = key + + return self + + def load(self, filename): + """ + Load an Octet Key Pair from a file. + + :param filename: File name + """ + return self.load_key(import_private_okp_key_from_file(filename)) + + def decryption_key(self): + """ + Get a key appropriate for decrypting a message. + + :return: An OKPPrivateKey instance + """ + return self.priv_key + + def encryption_key(self): + """ + Get a key appropriate for encrypting a message. + + :return: An OKPPublicKey instance + """ + return self.pub_key + + def __eq__(self, other): + """ + Verify that the other key has the same properties as myself. + + :param other: The other key + :return: True if the keys as the same otherwise False + """ + + if self.__class__ != other.__class__: + return False + + _public_cls = OKP_CRV2PUBLIC[self.crv] + _private_cls = OKP_CRV2PRIVATE[self.crv] + if cmp_keys(self.pub_key, other.pub_key, _public_cls): + if other.private_key(): + if cmp_keys(self.priv_key, other.priv_key, _private_cls): + return True + elif self.private_key(): + return False + else: + return True + + return False + + def key_len(self): + if self.priv_key: + return self.priv_key.key_size + elif self.pub_key: + return self.pub_key.key_size + else: + raise KeyNotFound + + +def cmp_keys(a, b, key_type): + if isinstance(a, key_type): + if isinstance(b, key_type): + if is_private_key(a): + if a.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) != b.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ): + return False + else: + if a.public_bytes( + encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw + ) != b.public_bytes( + encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw + ): + return False + return True + + return False + + +def new_okp_key(crv, kid="", **kwargs): + + _key = OKP_CRV2PRIVATE[crv].generate() + + _rk = OKPKey(priv_key=_key, kid=kid, **kwargs) + if not kid: + _rk.add_kid() + + return _rk + + +def import_public_okp_key_from_file(filename): + """ + Read a public Octet Key Pair key from a PEM file. + + :param filename: The name of the file + :param passphrase: A pass phrase to use to unpack the PEM file. + :return: A cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey instance + """ + public_key = import_public_key_from_pem_file(filename) + if not is_private_key(public_key): + return public_key + else: + return ValueError("Not a Octet Key Pair key") + + +def import_private_okp_key_from_file(filename, passphrase=None): + """ + Read a private Octet Key Pair key from a PEM file. + + :param filename: The name of the file + :param passphrase: A pass phrase to use to unpack the PEM file. + :return: A cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey + instance + """ + private_key = import_private_key_from_pem_file(filename, passphrase) + if is_private_key(private_key): + return private_key + else: + return ValueError("Not a private Octet Key Pair key") + + +def import_okp_key(pem_data): + """ + Extract an Octet Key Pair key from a PEM-encoded X.509 certificate + + :param pem_data: Elliptic Curve key encoded in standard form + :return: ec.EllipticCurvePublicKey + """ + public_key = import_public_key_from_pem_data(pem_data) + if not is_private_key(public_key): + return public_key + else: + return ValueError("Not a Octet Key Pair key") + + +def import_okp_key_from_cert_file(pem_file): + with open(pem_file, "r") as cert_file: + return import_okp_key(cert_file.read()) diff --git a/src/cryptojwt/jws/eddsa.py b/src/cryptojwt/jws/eddsa.py new file mode 100644 index 00000000..983d93d2 --- /dev/null +++ b/src/cryptojwt/jws/eddsa.py @@ -0,0 +1,50 @@ +import sys + +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives.asymmetric import ed448 +from cryptography.hazmat.primitives.asymmetric import ed25519 + +from ..exception import BadSignature +from ..exception import Unsupported +from . import Signer + + +class EDDSASigner(Signer): + def sign(self, msg, key): + """ + Create a signature over a message as defined in RFC7515 using an + Octet Key Pair key + + :param msg: The message + :param key: An Ed25519PrivateKey or Ed448PrivateKey instance + :return: + """ + + if not isinstance(key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)): + raise TypeError( + "The private key must be an instance of Ed25519PrivateKey or Ed448PrivateKey" + ) + + return key.sign(msg) + + def verify(self, msg, sig, key): + """ + Verify a message signature + + :param msg: The message + :param sig: A signature + :param key: A Ed25519PublicKey or Ed448PublicKey to use for the verification. + :raises: BadSignature if the signature can't be verified. + :return: True + """ + if not isinstance(key, (ed25519.Ed25519PublicKey, ed448.Ed448PublicKey)): + raise TypeError( + "The public key must be an instance of Ed25519PublicKey or Ed448PublicKey" + ) + + try: + key.verify(sig, msg) + except InvalidSignature as err: + raise BadSignature(err) + else: + return True diff --git a/src/cryptojwt/jws/jws.py b/src/cryptojwt/jws/jws.py index c9b334ee..1803be11 100644 --- a/src/cryptojwt/jws/jws.py +++ b/src/cryptojwt/jws/jws.py @@ -14,6 +14,7 @@ from ..utils import b64e_enc_dec from ..utils import b64encode_item from .dsa import ECDSASigner +from .eddsa import EDDSASigner from .exception import FormatError from .exception import NoSuitableSigningKeys from .exception import SignerAlgError @@ -45,6 +46,7 @@ "PS256": PSSSigner("SHA256"), "PS384": PSSSigner("SHA384"), "PS512": PSSSigner("SHA512"), + "EdDSA": EDDSASigner(), "none": None, } diff --git a/src/cryptojwt/jws/utils.py b/src/cryptojwt/jws/utils.py index 709c7853..9ccf16ff 100644 --- a/src/cryptojwt/jws/utils.py +++ b/src/cryptojwt/jws/utils.py @@ -49,6 +49,8 @@ def alg2keytype(alg): return "oct" elif alg.startswith("ES") or alg.startswith("ECDH-ES"): return "EC" + elif alg == "EdDSA": + return "OKP" else: return None diff --git a/src/cryptojwt/key_bundle.py b/src/cryptojwt/key_bundle.py index 999dd696..b07728e6 100755 --- a/src/cryptojwt/key_bundle.py +++ b/src/cryptojwt/key_bundle.py @@ -15,6 +15,7 @@ from cryptojwt.jwk.ec import NIST2SEC from cryptojwt.jwk.hmac import new_sym_key +from cryptojwt.jwk.okp import OKP_CRV2PUBLIC from cryptojwt.jwk.x509 import import_private_key_from_pem_file from .exception import JWKException @@ -27,6 +28,8 @@ from .jwk.hmac import SYMKey from .jwk.jwk import dump_jwk from .jwk.jwk import import_jwk +from .jwk.okp import OKPKey +from .jwk.okp import new_okp_key from .jwk.rsa import RSAKey from .jwk.rsa import new_rsa_key from .utils import as_unicode @@ -46,7 +49,7 @@ # raise excep(_err, 'application/json') # Make sure the keys are all uppercase -K2C = {"RSA": RSAKey, "EC": ECKey, "oct": SYMKey} +K2C = {"RSA": RSAKey, "EC": ECKey, "oct": SYMKey, "OKP": OKPKey} MAP = {"dec": "enc", "enc": "enc", "ver": "sig", "sig": "sig"} @@ -154,6 +157,29 @@ def ec_init(spec): return _kb +def okp_init(spec): + """ + Initiate a key bundle with an Octet Key Pair. + + :param spec: Key specifics of the form:: + {"type": "OKP", "crv": "Ed25519", "use": ["sig"]} + + :return: A KeyBundle instance + """ + curve = spec.get("crv", "Ed25519") + + _kb = KeyBundle(keytype="OKP") + if "use" in spec: + for use in spec["use"]: + eck = new_okp_key(crv=curve, use=use) + _kb.append(eck) + else: + eck = new_okp_key(crv=curve) + _kb.append(eck) + + return _kb + + def keys_writer(func): def wrapper(self, *args, **kwargs): with self._lock_writer: @@ -1003,6 +1029,17 @@ def build_key_bundle(key_conf, kid_template=""): ) else: _bundle = ec_init(spec) + elif typ == "OKP": + if "key" in spec and spec["key"]: + if os.path.isfile(spec["key"]): + _bundle = KeyBundle( + source="file://%s" % spec["key"], + fileformat="der", + keytype=typ, + keyusage=spec["use"], + ) + else: + _bundle = okp_init(spec) elif typ.lower() == "oct": _bundle = sym_init(spec) else: @@ -1047,7 +1084,7 @@ def type_order(kd1, kd2): if _l: return _l - if kd1["type"] == "EC": + if kd1["type"] in ["EC", "OKP"]: _l = _cmp(kd1["crv"], kd2["crv"]) if _l: return _l @@ -1155,8 +1192,8 @@ def key_diff(key_bundle, key_defs): if key.kty != key_def["type"]: continue - if key.kty == "EC": - # special test only for EC keys + if key.kty in ["EC", "OKP"]: + # special test only for EC and OKP keys if key.crv != key_def["crv"]: continue @@ -1230,7 +1267,7 @@ def key_rollover(bundle): key_spec = [] for key in bundle.get(): _spec = {"type": key.kty, "use": [key.use]} - if key.kty == "EC": + if key.kty in ["EC", "OKP"]: _spec["crv"] = key.crv key_spec.append(_spec) @@ -1264,6 +1301,7 @@ def unique_keys(keys): DEFAULT_RSA_KEYSIZE = 2048 DEFAULT_RSA_EXP = 65537 DEFAULT_EC_CURVE = "P-256" +DEFAULT_OKP_CURVE = "Ed25519" def key_gen(type, **kwargs): @@ -1290,6 +1328,12 @@ def key_gen(type, **kwargs): logging.error("Unknown curve: %s", crv) raise ValueError("Unknown curve: {}".format(crv)) _key = new_ec_key(crv=crv, **kargs) + elif type.upper() == "OKP": + crv = kwargs.get("crv", DEFAULT_OKP_CURVE) + if crv not in OKP_CRV2PUBLIC: + logging.error("Unknown curve: %s", crv) + raise ValueError("Unknown curve: {}".format(crv)) + _key = new_okp_key(crv=crv, **kargs) elif type.lower() in ["sym", "oct"]: keysize = kwargs.get("bytes", 24) randomkey = os.urandom(keysize) @@ -1324,6 +1368,8 @@ def key_by_alg(alg: str): return key_gen("EC", crv="P-384") elif alg == "ES512": return key_gen("EC", crv="P-521") + elif alg == "EdDSA": + return key_gen("OKP", crv=DEFAULT_OKP_CURVE) elif alg.startswith("HS"): return key_gen("sym") diff --git a/src/cryptojwt/tools/keyconv.py b/src/cryptojwt/tools/keyconv.py index a5b27112..969a2b01 100644 --- a/src/cryptojwt/tools/keyconv.py +++ b/src/cryptojwt/tools/keyconv.py @@ -14,6 +14,9 @@ from cryptojwt.jwk.ec import import_private_ec_key_from_file from cryptojwt.jwk.ec import import_public_ec_key_from_file from cryptojwt.jwk.hmac import SYMKey +from cryptojwt.jwk.okp import OKPKey +from cryptojwt.jwk.okp import import_private_okp_key_from_file +from cryptojwt.jwk.okp import import_public_okp_key_from_file from cryptojwt.jwk.rsa import RSAKey from cryptojwt.jwk.rsa import import_private_rsa_key_from_file from cryptojwt.jwk.rsa import import_public_rsa_key_from_file @@ -59,6 +62,22 @@ def pem2ec( return jwk +def pem2okp( + filename: str, + kid: Optional[str] = None, + private: bool = False, + passphrase: Optional[str] = None, +) -> JWK: + """Convert OKP key from PEM to JWK""" + if private: + key = import_private_okp_key_from_file(filename, passphrase) + else: + key = import_public_okp_key_from_file(filename) + jwk = OKPKey(kid=kid) + jwk.load_key(key) + return jwk + + def bin2jwk(filename: str, kid: str) -> JWK: """Read raw key from filename and return JWK""" with open(filename, "rb") as file: @@ -91,6 +110,8 @@ def pem2jwk( jwk = pem2ec(filename, kid, private=False) elif kty is not None and kty == "RSA": jwk = pem2rsa(filename, kid, private=False) + elif kty is not None and kty == "OKP": + jwk = pem2okp(filename, kid, private=False) else: raise ValueError("Unknown key type") elif "BEGIN PRIVATE KEY" in header: @@ -98,6 +119,8 @@ def pem2jwk( jwk = pem2ec(filename, kid, private=True, passphrase=passphrase) elif kty is not None and kty == "RSA": jwk = pem2rsa(filename, kid, private=True, passphrase=passphrase) + elif kty is not None and kty == "OKP": + jwk = pem2okp(filename, kid, private=True, passphrase=passphrase) else: raise ValueError("Unknown key type") elif "BEGIN EC PRIVATE KEY" in header: diff --git a/src/cryptojwt/tools/keygen.py b/src/cryptojwt/tools/keygen.py index 558a6673..3a028613 100644 --- a/src/cryptojwt/tools/keygen.py +++ b/src/cryptojwt/tools/keygen.py @@ -8,6 +8,8 @@ from cryptojwt.jwk.ec import NIST2SEC from cryptojwt.jwk.ec import new_ec_key from cryptojwt.jwk.hmac import new_sym_key +from cryptojwt.jwk.okp import OKP_CRV2PUBLIC +from cryptojwt.jwk.okp import new_okp_key from cryptojwt.jwk.rsa import new_rsa_key from cryptojwt.utils import b64e @@ -28,7 +30,7 @@ def main(): dest="crv", metavar="curve", help="EC curve", - choices=NIST2SEC.keys(), + choices=list(NIST2SEC.keys()) + list(OKP_CRV2PUBLIC.keys()), default=DEFAULT_EC_CURVE, ) parser.add_argument( @@ -51,6 +53,11 @@ def main(): print("Unknown curve: {0}".format(args.crv), file=sys.stderr) exit(1) jwk = new_ec_key(crv=args.crv, kid=args.kid) + elif args.kty.upper() == "OKP": + if args.crv not in OKP_CRV2PUBLIC: + print("Unknown curve: {0}".format(args.crv), file=sys.stderr) + exit(1) + jwk = new_okp_key(crv=args.crv, kid=args.kid) elif args.kty.upper() == "SYM" or args.kty.upper() == "OCT": if args.keysize is None: args.keysize = DEFAULT_SYM_KEYSIZE diff --git a/tests/ed25519.pem b/tests/ed25519.pem new file mode 100644 index 00000000..51033458 --- /dev/null +++ b/tests/ed25519.pem @@ -0,0 +1,3 @@ +-----BEGIN PRIVATE KEY----- +MC4CAQAwBQYDK2VwBCIEIFp/m1fdvi+8lyL11WjusBF566clBk556Rpx/ZLtOyE3 +-----END PRIVATE KEY----- diff --git a/tests/ed448.pem b/tests/ed448.pem new file mode 100644 index 00000000..35e85c50 --- /dev/null +++ b/tests/ed448.pem @@ -0,0 +1,4 @@ +-----BEGIN PRIVATE KEY----- +MEcCAQAwBQYDK2VxBDsEOXswwlU/yneCGw8vZZLRGYxk71AFyv8W4+rZcXpVV9i2 +8w6Cvd8wk1S9itC4VSqrnuEFpfHVaY47wA== +-----END PRIVATE KEY----- diff --git a/tests/test_02_jwk.py b/tests/test_02_jwk.py old mode 100644 new mode 100755 index 485bde45..c01e85ad --- a/tests/test_02_jwk.py +++ b/tests/test_02_jwk.py @@ -9,7 +9,11 @@ import pytest from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import ed448 +from cryptography.hazmat.primitives.asymmetric import ed25519 from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric import x448 +from cryptography.hazmat.primitives.asymmetric import x25519 from cryptojwt.exception import DeSerializationNotPossible from cryptojwt.exception import UnsupportedAlgorithm @@ -27,6 +31,8 @@ from cryptojwt.jwk.jwk import import_jwk from cryptojwt.jwk.jwk import jwk_wrap from cryptojwt.jwk.jwk import key_from_jwk_dict +from cryptojwt.jwk.okp import OKPKey +from cryptojwt.jwk.okp import new_okp_key from cryptojwt.jwk.rsa import RSAKey from cryptojwt.jwk.rsa import import_private_rsa_key_from_file from cryptojwt.jwk.rsa import import_public_rsa_key_from_file @@ -723,3 +729,79 @@ def test_import_public_key_from_pem_file(filename, key_type): _file = full_path(filename) pub_key = import_public_key_from_pem_file(_file) assert isinstance(pub_key, key_type) + + +OKPKEY = {"crv": "Ed25519", "kty": "OKP", "x": "11qYAYKxCrfVS_7TyWQHOg7hcvPapiMlrwIaaPcHURo"} +OKPKEY_SHA256 = "kPrK_qmxVWaYVA9wwBF6Iuo3vVzz7TxHCTwXBygrS4k" + + +def test_new_okp_thumbprint(): + okp_key = OKPKey(**OKPKEY) + assert okp_key.thumbprint("SHA-256").decode() == OKPKEY_SHA256 + + +def test_new_okp_key(): + okp_key = new_okp_key("Ed25519") + assert isinstance(okp_key, OKPKey) + assert okp_key.crv == "Ed25519" + + okp_key = new_okp_key("Ed448") + assert isinstance(okp_key, OKPKey) + assert okp_key.crv == "Ed448" + + okp_key = new_okp_key("X25519") + assert isinstance(okp_key, OKPKey) + assert okp_key.crv == "X25519" + + okp_key = new_okp_key("X448") + assert isinstance(okp_key, OKPKey) + assert okp_key.crv == "X448" + + +def test_create_okp_key(): + okp = new_okp_key("Ed25519") + exp_key = okp.serialize() + assert _eq(list(exp_key.keys()), ["x", "crv", "kty", "kid"]) + + +def test_create_okp_wrap(): + key = ed25519.Ed25519PrivateKey.generate() + okp_key = jwk_wrap(key.public_key()) + assert isinstance(okp_key, OKPKey) + assert okp_key.crv == "Ed25519" + + +def test_cmp_neq_okp(): + okp_key = new_okp_key("Ed25519") + _key1 = OKPKey(priv_key=okp_key.priv_key) + _key2 = OKPKey(**OKPKEY) + + assert _key1 != _key2 + + +def test_cmp_eq_okp(): + okp_key = new_okp_key("Ed25519") + _key1 = OKPKey(priv_key=okp_key.priv_key) + _key2 = OKPKey(priv_key=okp_key.priv_key) + + assert _key1 == _key2 + + +def test_key_from_jwk_dict_okp_ed25519(): + key = OKPKey().load(full_path("ed25519.pem")) + assert key.has_private_key() + jwk = key.serialize(private=True) + assert jwk["crv"] == "Ed25519" + _key = key_from_jwk_dict(jwk) + assert isinstance(_key, OKPKey) + assert _key.has_private_key() + + +def test_key_from_jwk_dict_okp_ed448(): + key = OKPKey().load(full_path("ed448.pem")) + assert key.has_private_key() + jwk = key.serialize(private=True) + assert jwk["crv"] == "Ed448" + _key = key_from_jwk_dict(jwk) + assert isinstance(_key, OKPKey) + assert _key.has_private_key() diff --git a/tests/test_03_key_bundle.py b/tests/test_03_key_bundle.py index 048ca958..bd7353ba 100755 --- a/tests/test_03_key_bundle.py +++ b/tests/test_03_key_bundle.py @@ -14,6 +14,8 @@ from cryptojwt.jwk.ec import ECKey from cryptojwt.jwk.ec import new_ec_key from cryptojwt.jwk.hmac import SYMKey +from cryptojwt.jwk.okp import OKPKey +from cryptojwt.jwk.okp import new_okp_key from cryptojwt.jwk.rsa import RSAKey from cryptojwt.jwk.rsa import import_rsa_key_from_cert_file from cryptojwt.jwk.rsa import new_rsa_key @@ -620,6 +622,13 @@ def test_dump_jwk(): "y": "GOd2jL_6wa0cfnyA0SmEhok9fkYEnAHFKLLM79BZ8_E", "crv": "P-256", }, + { + "kty": "OKP", + "kid": "xyzzy", + "use": "sig", + "x": "11qYAYKxCrfVS_7TyWQHOg7hcvPapiMlrwIaaPcHURo", + "crv": "Ed25519", + }, ] } @@ -627,17 +636,19 @@ def test_dump_jwk(): def test_keys(): kb = KeyBundle(JWKS_DICT) - assert len(kb) == 3 + assert len(kb) == 4 assert len(kb.get("rsa")) == 1 assert len(kb.get("oct")) == 1 assert len(kb.get("ec")) == 1 + assert len(kb.get("okp")) == 1 EXPECTED = [ b"iA7PvG_DfJIeeqQcuXFmvUGjqBkda8In_uMpZrcodVA", b"akXzyGlXg8yLhsCczKb_r8VERLx7-iZBUMIVgg2K7p4", b"kLsuyGef1kfw5-t-N9CJLIHx_dpZ79-KemwqjwdrvTI", + b"kPrK_qmxVWaYVA9wwBF6Iuo3vVzz7TxHCTwXBygrS4k", ] @@ -860,6 +871,15 @@ def test_key_gen_rsa(): assert isinstance(_jwk, RSAKey) +def test_key_gen_okp(): + _jwk = key_gen("OKP", kid="kid1") + assert _jwk + assert _jwk.kty == "OKP" + assert _jwk.kid == "kid1" + + assert isinstance(_jwk, OKPKey) + + def test_init_key(): spec = {"type": "RSA", "kid": "one"} @@ -932,10 +952,11 @@ def test_remote(): exp = kb.dump() kb2 = KeyBundle().load(exp) assert kb2.source == source - assert len(kb2.keys()) == 3 + assert len(kb2.keys()) == 4 assert len(kb2.get("rsa")) == 1 assert len(kb2.get("oct")) == 1 assert len(kb2.get("ec")) == 1 + assert len(kb2.get("okp")) == 1 assert kb2.httpc_params == {"timeout": (2, 2)} assert kb2.imp_jwks assert kb2.last_updated @@ -972,11 +993,12 @@ def test_remote_not_modified(): exp = kb.dump() kb2 = KeyBundle().load(exp) assert kb2.source == source - assert len(kb2.keys()) == 3 - assert len(kb2.active_keys()) == 3 + assert len(kb2.keys()) == 4 + assert len(kb2.active_keys()) == 4 assert len(kb2.get("rsa")) == 1 assert len(kb2.get("oct")) == 1 assert len(kb2.get("ec")) == 1 + assert len(kb2.get("okp")) == 1 assert kb2.httpc_params == {"timeout": (2, 2)} assert kb2.imp_jwks assert kb2.last_updated diff --git a/tests/test_06_jws.py b/tests/test_06_jws.py index c62d0e42..0fb7a5ee 100644 --- a/tests/test_06_jws.py +++ b/tests/test_06_jws.py @@ -6,6 +6,7 @@ import pytest from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import ed25519 from cryptojwt import as_unicode from cryptojwt.exception import BadSignature @@ -13,6 +14,7 @@ from cryptojwt.exception import WrongNumberOfParts from cryptojwt.jwk.ec import ECKey from cryptojwt.jwk.hmac import SYMKey +from cryptojwt.jwk.okp import OKPKey from cryptojwt.jwk.rsa import RSAKey from cryptojwt.jwk.rsa import import_private_rsa_key_from_file from cryptojwt.jws.exception import FormatError @@ -601,6 +603,39 @@ def test_signer_ps512(): assert _rj.verify_alg("PS512") +def test_signer_eddsa(): + payload = "Please take a moment to register today" + okp = ed25519.Ed25519PrivateKey.generate() + _key = OKPKey().load_key(okp) + keys = [_key] + _jws = JWS(payload, alg="EdDSA") + _jwt = _jws.sign_compact(keys) + + _pubkey = OKPKey().load_key(okp.public_key()) + _rj = JWS(alg="EdDSA") + info = _rj.verify_compact(_jwt, [_pubkey]) + assert info == payload + + +def test_signer_eddsa_fail(): + payload = "Please take a moment to register today" + okp = ed25519.Ed25519PrivateKey.generate() + _key = OKPKey().load_key(okp) + keys = [_key] + _jws = JWS(payload, alg="EdDSA") + _jwt = _jws.sign_compact(keys) + + okp2 = ed25519.Ed25519PrivateKey.generate() + _pubkey = OKPKey().load_key(okp2.public_key()) + _rj = JWS(alg="EdDSA") + try: + info = _rj.verify_compact(_jwt, [_pubkey]) + except BadSignature: + pass + else: + assert False + + def test_no_alg_and_alg_none_same(): payload = "Please take a moment to register today" _jws = JWS(payload, alg="none") diff --git a/tests/test_09_jwt.py b/tests/test_09_jwt.py index d016b211..b1fa2167 100755 --- a/tests/test_09_jwt.py +++ b/tests/test_09_jwt.py @@ -235,3 +235,26 @@ def test_pick_key(): _k = pick_key(keys, "enc", "ECDH-ES") assert len(_k) == 0 + + +def test_eddsa_jwt(): + JWKS_DICT = { + "keys": [ + { + "kty": "OKP", + "kid": "-1909572257", + "crv": "Ed25519", + "x": "XWxGtApfcqmKI7p0OKnF5JSEWMVoLsytFXLEP7xZ_l8", + } + ] + } + JWT_TEST = ( + "eyJraWQiOiItMTkwOTU3MjI1NyIsImFsZyI6IkVkRFNBIn0." + + "eyJqdGkiOiIyMjkxNmYzYy05MDkzLTQ4MTMtODM5Ny1mMTBlNmI3MDRiNjgiLCJkZWxlZ2F0aW9uSWQiOiJiNGFlNDdhNy02MjVhLTQ2MzAtOTcyNy00NTc2NGE3MTJjY2UiLCJleHAiOjE2NTUyNzkxMDksIm5iZiI6MTY1NTI3ODgwOSwic2NvcGUiOiJyZWFkIG9wZW5pZCIsImlzcyI6Imh0dHBzOi8vaWRzdnIuZXhhbXBsZS5jb20iLCJzdWIiOiJ1c2VybmFtZSIsImF1ZCI6ImFwaS5leGFtcGxlLmNvbSIsImlhdCI6MTY1NTI3ODgwOSwicHVycG9zZSI6ImFjY2Vzc190b2tlbiJ9." + + "rjeE8D_e4RYzgvpu-nOwwx7PWMiZyDZwkwO6RiHR5t8g4JqqVokUKQt-oST1s45wubacfeDSFogOrIhe3UHDAg" + ) + ISSUER = "https://idsvr.example.com" + kj = KeyJar() + kj.add_kb(ISSUER, KeyBundle(JWKS_DICT)) + jwt = JWT(key_jar=kj) + _ = jwt.unpack(JWT_TEST)