1
+ from datetime import datetime
1
2
from inspect import signature
2
3
from logging import (
3
4
debug ,
4
5
warning ,
5
6
)
6
7
from re import search
7
- from time import time
8
8
from typing import (
9
9
Dict ,
10
10
Optional ,
11
+ Type ,
12
+ cast ,
11
13
)
12
14
13
15
from django .contrib .auth import get_user_model
23
25
JWTError ,
24
26
jwt ,
25
27
)
28
+ from pytz import utc
26
29
from requests import get as request_get
27
30
from requests import post as request_post
28
31
35
38
36
39
37
40
class AuthenticationMixin :
38
- def __init__ (self , * args , ** kwargs ):
39
- self .UserModel = get_user_model ()
41
+ UserModel : Type [AbstractUser ] = cast (Type [AbstractUser ], get_user_model ())
40
42
41
43
def validate_and_decode_id_token (self , id_token : str , nonce : Optional [str ], jwks : Dict ) -> Dict :
42
44
header = jwt .get_unverified_header (id_token )
@@ -56,14 +58,10 @@ def validate_and_decode_id_token(self, id_token: str, nonce: Optional[str], jwks
56
58
key = settings .OIDC_RP_CLIENT_SECRET
57
59
else :
58
60
raise NotImplementedError (f"Algo { algo } cannot be handled by this authentication backend" )
59
- if isinstance (key , dict ):
60
- secret = key
61
- else : # RSA public key (bytes)
62
- secret = key
63
61
try :
64
62
claims = jwt .decode (
65
63
id_token ,
66
- secret ,
64
+ key ,
67
65
algorithms = settings .OIDC_RP_SIGN_ALGOS_ALLOWED ,
68
66
audience = settings .OIDC_RP_CLIENT_ID ,
69
67
options = {
@@ -80,7 +78,7 @@ def validate_and_decode_id_token(self, id_token: str, nonce: Optional[str], jwks
80
78
raise SuspiciousOperation ("JWT Nonce verification failed" )
81
79
return claims
82
80
83
- def validate_claims (self , claims ) :
81
+ def validate_claims (self , claims : Dict ) -> None :
84
82
expected_list = [settings .OIDC_OP_EXPECTED_EMAIL_CLAIM ] + list (settings .OIDC_OP_EXPECTED_CLAIMS )
85
83
debug (f"Validate claims={ claims } against expected { expected_list } " )
86
84
for expected in expected_list :
@@ -163,17 +161,10 @@ def authenticate_oauth2(self,
163
161
return None
164
162
try :
165
163
if use_pkce :
166
- if any ((
167
- not code ,
168
- not code_verifier ,
169
- )):
164
+ if not code or not code_verifier :
170
165
raise SuspiciousOperation ('code and code_verifier values are required' )
171
166
else :
172
- if any ((
173
- not code ,
174
- not state ,
175
- not nonce ,
176
- )):
167
+ if not code or not state or not nonce :
177
168
raise SuspiciousOperation ('code, state and nonce values are required' )
178
169
params = {
179
170
'grant_type' : 'authorization_code' ,
@@ -190,53 +181,52 @@ def authenticate_oauth2(self,
190
181
result = resp .json ()
191
182
id_token = result ['id_token' ]
192
183
access_token = result ['access_token' ]
193
- expires_in = result .get ('expires_in' ) # in secs, could be missing
184
+ access_expires_in = result .get ('expires_in' ) # in secs, could be missing
194
185
refresh_token = result .get ('refresh_token' ) if 'offline_access' in settings .OIDC_RP_SCOPES else None
195
- id_claims = self .validate_and_decode_id_token (id_token , nonce , request .session [ constants .SESSION_OP_JWKS ] )
186
+ id_claims = self .validate_and_decode_id_token (id_token , nonce , request .session . get ( constants .SESSION_OP_JWKS , {}) )
196
187
self .validate_claims (id_claims )
197
- now = time ()
198
- if expires_in :
199
- expires_at = now + expires_in
188
+ now_ts = int (datetime .now (tz = utc ).timestamp ())
189
+ session_expires_at = now_ts + settings .OIDC_MIDDLEWARE_SESSION_TIMEOUT_SECONDS
190
+ if access_expires_in :
191
+ access_expires_at = now_ts + access_expires_in
200
192
else :
201
- expires_at = id_claims .get ('exp' , now + settings . OIDC_MIDDLEWARE_SESSION_TIMEOUT_SECONDS )
193
+ access_expires_at = id_claims .get ('exp' , session_expires_at )
202
194
request .session [constants .SESSION_ID_TOKEN ] = id_token
203
195
request .session [constants .SESSION_ACCESS_TOKEN ] = access_token
204
- request .session [constants .SESSION_ACCESS_EXPIRES_AT ] = expires_at
205
- request .session [constants .SESSION_EXPIRES_AT ] = now + settings . OIDC_MIDDLEWARE_SESSION_TIMEOUT_SECONDS
196
+ request .session [constants .SESSION_ACCESS_EXPIRES_AT ] = access_expires_at
197
+ request .session [constants .SESSION_EXPIRES_AT ] = session_expires_at
206
198
if refresh_token :
207
199
request .session [constants .SESSION_REFRESH_TOKEN ] = refresh_token
208
200
user = self .get_or_create_user (request , id_claims , access_token )
209
201
return user
210
202
except Exception as e :
211
- warning (e , str (e ))
203
+ warning (str (e ), exc_info = True )
212
204
return None
213
205
finally :
214
206
# be sure the session is in sync
215
207
request .session .save ()
216
208
217
209
218
210
class BearerAuthenticationBackend (ModelBackend , AuthenticationMixin , OIDCUrlsMixin ):
219
- def __init__ (self , * args , ** kwargs ):
211
+ def __init__ (self , * args , ** kwargs ) -> None :
220
212
super ().__init__ (* args , ** kwargs )
221
213
self .authorization_prefix = settings .OIDC_AUTHORIZATION_HEADER_PREFIX
222
214
self .oidc_urls = self .get_oidc_urls ({})
223
215
224
216
def authenticate (self , request : HttpRequest , username : Optional [str ] = None , password : Optional [str ] = None , ** kwargs ) -> Optional [AbstractBaseUser ]:
225
217
"""Authenticates users using the Authorization header and previous OIDC Id Token."""
226
- try :
227
- prefix , id_token = request .headers .get ('Authorization' , ' ' ).split (' ' , 1 )
228
- except ValueError :
229
- prefix = id_token = ''
218
+ auth_header = request .headers .get ('Authorization' , '' )
219
+ prefix , id_token = auth_header .split (' ' , 1 ) if ' ' in auth_header else ('' , '' )
230
220
if not prefix or not id_token :
231
221
return None
232
222
try :
233
223
if prefix != self .authorization_prefix :
234
224
raise SuspiciousOperation (f"Authorization should start with a { self .authorization_prefix } prefix" )
235
225
if BlacklistedToken .is_blacklisted (id_token ):
236
226
raise SuspiciousOperation (f"token { id_token } is blacklisted" )
237
- id_claims = self .validate_and_decode_id_token (id_token , nonce = None , jwks = self .oidc_urls .get (constants .SESSION_OP_JWKS , [] ))
227
+ id_claims = self .validate_and_decode_id_token (id_token , nonce = None , jwks = self .oidc_urls .get (constants .SESSION_OP_JWKS , {} ))
238
228
user = self .get_or_create_user (request , id_claims , '' )
239
229
return user
240
230
except Exception as e :
241
- warning (e . args [ 0 ] )
231
+ warning (str ( e ), exc_info = True )
242
232
return None
0 commit comments