Skip to content

Commit 0a5a521

Browse files
authored
Merge pull request #140 from IdentityPython/is_compact
Tests for finding out if a token is a compact JWS, json JWS or a JWE.…
2 parents 7065213 + 0135e14 commit 0a5a521

File tree

4 files changed

+156
-4
lines changed

4 files changed

+156
-4
lines changed

src/cryptojwt/jws/utils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# import struct
2+
23
from cryptography.hazmat.primitives import hashes
34
from cryptography.hazmat.primitives.asymmetric import padding
45

src/cryptojwt/utils.py

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
DEFAULT_HTTPC_TIMEOUT = 10
1515

16+
1617
# ---------------------------------------------------------------------------
1718
# Helper functions
1819

@@ -193,7 +194,7 @@ def split_token(token):
193194

194195
def deser(val):
195196
"""
196-
Deserialize from a string representation of an long integer
197+
Deserialize from a string representation of a long integer
197198
to the python representation of a long integer.
198199
199200
:param val: The string representation of the long integer.
@@ -212,12 +213,12 @@ def modsplit(name):
212213
if ":" in name:
213214
_part = name.split(":")
214215
if len(_part) != 2:
215-
raise ValueError(f"Syntax error: {s}")
216+
raise ValueError(f"Syntax error: {name}")
216217
return _part[0], _part[1]
217218

218219
_part = name.split(".")
219220
if len(_part) < 2:
220-
raise ValueError(f"Syntax error: {s}")
221+
raise ValueError(f"Syntax error: {name}")
221222

222223
return ".".join(_part[:-1]), _part[-1]
223224

@@ -273,3 +274,94 @@ def check_content_type(content_type, mime_type):
273274
msg["content-type"] = content_type
274275
mt = msg.get_content_type()
275276
return mime_type == mt
277+
278+
279+
def is_compact_jws(token):
280+
token = as_bytes(token)
281+
282+
try:
283+
part = split_token(token)
284+
except BadSyntax:
285+
return False
286+
287+
# Should be three parts
288+
if len(part) != 3:
289+
return False
290+
291+
# All base64 encoded
292+
try:
293+
part = [b64d(p) for p in part]
294+
except Exception:
295+
return False
296+
297+
# header should be a JSON object, 'alg' most be one parameter
298+
try:
299+
_header = json.loads(part[0])
300+
except Exception:
301+
return False
302+
303+
if "alg" not in _header:
304+
return False
305+
306+
return True
307+
308+
309+
def is_jwe(token):
310+
token = as_bytes(token)
311+
312+
try:
313+
part = split_token(token)
314+
except BadSyntax:
315+
return False
316+
317+
# Should be five parts
318+
if len(part) != 5:
319+
return False
320+
321+
# All base64 encoded
322+
try:
323+
part = [b64d(p) for p in part]
324+
except Exception:
325+
return False
326+
327+
# header should be a JSON object, 'alg' most be one parameter
328+
try:
329+
_header = json.loads(part[0])
330+
except Exception:
331+
return False
332+
333+
if "alg" not in _header or "enc" not in _header:
334+
return False
335+
336+
return True
337+
338+
339+
def is_json_jws(token):
340+
if isinstance(token, str):
341+
try:
342+
token = json.loads(token)
343+
except Exception:
344+
return False
345+
346+
for arg in ["payload", "signatures"]:
347+
if arg not in token:
348+
return False
349+
350+
if not isinstance(token["signatures"], list):
351+
return False
352+
353+
for sign in token["signatures"]:
354+
if not isinstance(sign, dict):
355+
return False
356+
if "signature" not in sign:
357+
return False
358+
359+
return True
360+
361+
362+
def is_jws(token):
363+
if is_json_jws(token):
364+
return "json"
365+
elif is_compact_jws(token):
366+
return "compact"
367+
return False

tests/test_06_jws.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from cryptography.hazmat.backends import default_backend
88
from cryptography.hazmat.primitives.asymmetric import ec
99

10+
from cryptojwt import as_unicode
1011
from cryptojwt.exception import BadSignature
1112
from cryptojwt.exception import UnknownAlgorithm
1213
from cryptojwt.exception import WrongNumberOfParts
@@ -25,10 +26,13 @@
2526
from cryptojwt.jws.utils import left_hash
2627
from cryptojwt.jws.utils import parse_rsa_algorithm
2728
from cryptojwt.key_bundle import KeyBundle
29+
from cryptojwt.utils import as_bytes
2830
from cryptojwt.utils import b64d
2931
from cryptojwt.utils import b64d_enc_dec
3032
from cryptojwt.utils import b64e
3133
from cryptojwt.utils import intarr2bin
34+
from cryptojwt.utils import is_compact_jws
35+
from cryptojwt.utils import is_json_jws
3236

3337
BASEDIR = os.path.abspath(os.path.dirname(__file__))
3438

@@ -297,7 +301,6 @@ def full_path(local_file):
297301
]
298302
}
299303

300-
301304
SIGJWKS = KeyBundle(JWKS_b)
302305

303306

@@ -1020,3 +1023,50 @@ def test_verify_json_missing_key():
10201023

10211024
# With both
10221025
assert JWS().verify_json(_jwt, keys=[vkeys[0], sym_key])
1026+
1027+
1028+
def test_is_compact_jws():
1029+
_header = {"foo": "bar", "alg": "HS384"}
1030+
_payload = "hello world"
1031+
_sym_key = SYMKey(key=b"My hollow echo chamber", alg="HS384")
1032+
1033+
_jwt = JWS(msg=_payload, alg="HS384").sign_compact(keys=[_sym_key])
1034+
1035+
assert is_compact_jws(_jwt)
1036+
1037+
# Faulty examples
1038+
1039+
# to few parts
1040+
assert is_compact_jws("abc.def") is False
1041+
1042+
# right number of parts but not base64
1043+
1044+
assert is_compact_jws("abc.def.ghi") is False
1045+
1046+
# not base64 illegal characters
1047+
assert is_compact_jws("abc.::::.ghi") is False
1048+
1049+
# Faulty header
1050+
_faulty_header = {"foo": "bar"} # alg is a MUST
1051+
_jwt = ".".join([as_unicode(b64e(as_bytes(json.dumps(_faulty_header)))), "def", "ghi"])
1052+
assert is_compact_jws(_jwt) is False
1053+
1054+
1055+
def test_is_json_jws():
1056+
ec_key = ECKey().load_key(P256())
1057+
sym_key = SYMKey(key=b"My hollow echo chamber", alg="HS384")
1058+
1059+
protected_headers_1 = {"foo": "bar", "alg": "ES256"}
1060+
unprotected_headers_1 = {"abc": "xyz"}
1061+
protected_headers_2 = {"foo": "bar", "alg": "HS384"}
1062+
unprotected_headers_2 = {"abc": "zeb"}
1063+
payload = "hello world"
1064+
_jwt = JWS(msg=payload).sign_json(
1065+
headers=[
1066+
(protected_headers_1, unprotected_headers_1),
1067+
(protected_headers_2, unprotected_headers_2),
1068+
],
1069+
keys=[ec_key, sym_key],
1070+
)
1071+
1072+
assert is_json_jws(_jwt)

tests/test_07_jwe.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737

3838
__author__ = "rohe0002"
3939

40+
from cryptojwt.utils import is_jwe
41+
4042

4143
def rndstr(size=16):
4244
"""
@@ -717,3 +719,10 @@ def test_fernet_blake2s():
717719
decrypter = encrypter
718720
resp = decrypter.decrypt(_token)
719721
assert resp == plain
722+
723+
724+
def test_is_jwe():
725+
encryption_key = SYMKey(use="enc", key="DukeofHazardpass", kid="some-key-id")
726+
jwe = JWE(plain, alg="A128KW", enc="A128CBC-HS256")
727+
_jwe = jwe.encrypt(keys=[encryption_key], kid="some-key-id")
728+
assert is_jwe(_jwe)

0 commit comments

Comments
 (0)