8
8
def client ():
9
9
app = FastAPI ()
10
10
11
+ @app .get ('/all-token' )
12
+ def all_token (Authorize : AuthJWT = Depends ()):
13
+ access_token = Authorize .create_access_token (subject = 1 ,fresh = True )
14
+ refresh_token = Authorize .create_refresh_token (subject = 1 )
15
+ Authorize .set_access_cookies (access_token )
16
+ Authorize .set_refresh_cookies (refresh_token )
17
+ return {"msg" :"all token" }
18
+
19
+ @app .get ('/unset-all-token' )
20
+ def unset_all_token (Authorize : AuthJWT = Depends ()):
21
+ Authorize .unset_jwt_cookies ()
22
+ return {"msg" :"unset all token" }
23
+
11
24
@app .websocket ('/jwt-required' )
12
25
async def websocket_jwt_required (
13
26
websocket : WebSocket ,
@@ -22,6 +35,20 @@ async def websocket_jwt_required(
22
35
await websocket .send_text (err .message )
23
36
await websocket .close ()
24
37
38
+ @app .websocket ('/jwt-required-cookies' )
39
+ async def websocket_jwt_required_cookies (
40
+ websocket : WebSocket ,
41
+ csrf_token : str = Query (...),
42
+ Authorize : AuthJWT = Depends ()
43
+ ):
44
+ await websocket .accept ()
45
+ try :
46
+ Authorize .jwt_required ("websocket" ,websocket = websocket ,csrf_token = csrf_token )
47
+ await websocket .send_text ("Successfully Login!" )
48
+ except AuthJWTException as err :
49
+ await websocket .send_text (err .message )
50
+ await websocket .close ()
51
+
25
52
@app .websocket ('/jwt-optional' )
26
53
async def websocket_jwt_optional (
27
54
websocket : WebSocket ,
@@ -39,6 +66,23 @@ async def websocket_jwt_optional(
39
66
await websocket .send_text (err .message )
40
67
await websocket .close ()
41
68
69
+ @app .websocket ('/jwt-optional-cookies' )
70
+ async def websocket_jwt_optional_cookies (
71
+ websocket : WebSocket ,
72
+ csrf_token : str = Query (...),
73
+ Authorize : AuthJWT = Depends ()
74
+ ):
75
+ await websocket .accept ()
76
+ try :
77
+ Authorize .jwt_optional ("websocket" ,websocket = websocket ,csrf_token = csrf_token )
78
+ decoded_token = Authorize .get_raw_jwt ()
79
+ if decoded_token :
80
+ await websocket .send_text ("hello world" )
81
+ await websocket .send_text ("hello anonym" )
82
+ except AuthJWTException as err :
83
+ await websocket .send_text (err .message )
84
+ await websocket .close ()
85
+
42
86
@app .websocket ('/jwt-refresh-required' )
43
87
async def websocket_jwt_refresh_required (
44
88
websocket : WebSocket ,
@@ -53,6 +97,20 @@ async def websocket_jwt_refresh_required(
53
97
await websocket .send_text (err .message )
54
98
await websocket .close ()
55
99
100
+ @app .websocket ('/jwt-refresh-required-cookies' )
101
+ async def websocket_jwt_refresh_required_cookies (
102
+ websocket : WebSocket ,
103
+ csrf_token : str = Query (...),
104
+ Authorize : AuthJWT = Depends ()
105
+ ):
106
+ await websocket .accept ()
107
+ try :
108
+ Authorize .jwt_refresh_token_required ("websocket" ,websocket = websocket ,csrf_token = csrf_token )
109
+ await websocket .send_text ("Successfully Login!" )
110
+ except AuthJWTException as err :
111
+ await websocket .send_text (err .message )
112
+ await websocket .close ()
113
+
56
114
@app .websocket ('/fresh-jwt-required' )
57
115
async def websocket_fresh_jwt_required (
58
116
websocket : WebSocket ,
@@ -67,6 +125,20 @@ async def websocket_fresh_jwt_required(
67
125
await websocket .send_text (err .message )
68
126
await websocket .close ()
69
127
128
+ @app .websocket ('/fresh-jwt-required-cookies' )
129
+ async def websocket_fresh_jwt_required_cookies (
130
+ websocket : WebSocket ,
131
+ csrf_token : str = Query (...),
132
+ Authorize : AuthJWT = Depends ()
133
+ ):
134
+ await websocket .accept ()
135
+ try :
136
+ Authorize .fresh_jwt_required ("websocket" ,websocket = websocket ,csrf_token = csrf_token )
137
+ await websocket .send_text ("Successfully Login!" )
138
+ except AuthJWTException as err :
139
+ await websocket .send_text (err .message )
140
+ await websocket .close ()
141
+
70
142
client = TestClient (app )
71
143
return client
72
144
@@ -128,3 +200,156 @@ def test_fresh_jwt_required_websocket(client,Authorize):
128
200
with client .websocket_connect (url + f"?token={ token } " ) as websocket :
129
201
data = websocket .receive_text ()
130
202
assert data == "Successfully Login!"
203
+
204
+ # ========= COOKIES ========
205
+
206
+ def test_invalid_instance_websocket (Authorize ):
207
+ with pytest .raises (TypeError ,match = r"request" ):
208
+ Authorize .jwt_required ("websocket" ,websocket = "test" )
209
+ with pytest .raises (TypeError ,match = r"request" ):
210
+ Authorize .jwt_optional ("websocket" ,websocket = "test" )
211
+ with pytest .raises (TypeError ,match = r"request" ):
212
+ Authorize .jwt_refresh_token_required ("websocket" ,websocket = "test" )
213
+ with pytest .raises (TypeError ,match = r"request" ):
214
+ Authorize .fresh_jwt_required ("websocket" ,websocket = "test" )
215
+
216
+ @pytest .mark .parametrize ("url" ,["/jwt-required-cookies" ,"/jwt-refresh-required-cookies" ,"/fresh-jwt-required-cookies" ])
217
+ def test_missing_cookie (url ,client ):
218
+ cookie_key = "access_token_cookie" if url != "/jwt-refresh-required-cookies" else "refresh_token_cookie"
219
+ with client .websocket_connect (url + "?csrf_token=" ) as websocket :
220
+ data = websocket .receive_text ()
221
+ assert data == f"Missing cookie { cookie_key } "
222
+
223
+ @pytest .mark .parametrize ("url" ,[
224
+ "/jwt-required-cookies" ,
225
+ "/jwt-refresh-required-cookies" ,
226
+ "/fresh-jwt-required-cookies" ,
227
+ "/jwt-optional-cookies"
228
+ ])
229
+ def test_missing_csrf_token (url ,client ):
230
+ @AuthJWT .load_config
231
+ def get_cookie_location ():
232
+ return [("authjwt_token_location" ,{'cookies' }),("authjwt_secret_key" ,"secret" )]
233
+
234
+ # required and optional
235
+ client .get ('/all-token' )
236
+
237
+ with client .websocket_connect (url + "?csrf_token=" ) as websocket :
238
+ data = websocket .receive_text ()
239
+ assert data == "Missing CSRF Token"
240
+
241
+ client .get ('/unset-all-token' )
242
+
243
+ # disable csrf protection
244
+ @AuthJWT .load_config
245
+ def change_request_csrf_protect_to_false ():
246
+ return [
247
+ ("authjwt_token_location" ,{'cookies' }),
248
+ ("authjwt_secret_key" ,"secret" ),
249
+ ("authjwt_cookie_csrf_protect" ,False )
250
+ ]
251
+
252
+ client .get ('/all-token' )
253
+
254
+ msg = "hello world" if url == "/jwt-optional-cookies" else "Successfully Login!"
255
+ with client .websocket_connect (url + "?csrf_token=" ) as websocket :
256
+ data = websocket .receive_text ()
257
+ assert data == msg
258
+
259
+ @pytest .mark .parametrize ("url" ,[
260
+ "/jwt-required-cookies" ,
261
+ "/jwt-refresh-required-cookies" ,
262
+ "/fresh-jwt-required-cookies" ,
263
+ "/jwt-optional-cookies"
264
+ ])
265
+ def test_missing_claim_csrf_in_token (url ,client ):
266
+ # required and optional
267
+ @AuthJWT .load_config
268
+ def change_request_csrf_protect_to_false ():
269
+ return [
270
+ ("authjwt_token_location" ,{'cookies' }),
271
+ ("authjwt_secret_key" ,"secret" ),
272
+ ("authjwt_cookie_csrf_protect" ,False )
273
+ ]
274
+
275
+ client .get ('/all-token' )
276
+
277
+ @AuthJWT .load_config
278
+ def change_request_csrf_protect_to_true ():
279
+ return [("authjwt_token_location" ,{'cookies' }),("authjwt_secret_key" ,"secret" )]
280
+
281
+ with client .websocket_connect (url + "?csrf_token=test" ) as websocket :
282
+ data = websocket .receive_text ()
283
+ assert data == "Missing claim: csrf"
284
+
285
+ # disable csrf protection
286
+ @AuthJWT .load_config
287
+ def change_request_csrf_protect_to_false_again ():
288
+ return [
289
+ ("authjwt_token_location" ,{'cookies' }),
290
+ ("authjwt_secret_key" ,"secret" ),
291
+ ("authjwt_cookie_csrf_protect" ,False )
292
+ ]
293
+
294
+ msg = "hello world" if url == "/jwt-optional-cookies" else "Successfully Login!"
295
+ with client .websocket_connect (url + "?csrf_token=test" ) as websocket :
296
+ data = websocket .receive_text ()
297
+ assert data == msg
298
+
299
+ @pytest .mark .parametrize ("url" ,[
300
+ "/jwt-required-cookies" ,
301
+ "/jwt-refresh-required-cookies" ,
302
+ "/fresh-jwt-required-cookies" ,
303
+ "/jwt-optional-cookies"
304
+ ])
305
+ def test_invalid_csrf_double_submit (url ,client ):
306
+ # required and optional
307
+ @AuthJWT .load_config
308
+ def get_cookie_location ():
309
+ return [("authjwt_token_location" ,{'cookies' }),("authjwt_secret_key" ,"secret" )]
310
+
311
+ client .get ('/all-token' )
312
+
313
+ with client .websocket_connect (url + "?csrf_token=test" ) as websocket :
314
+ data = websocket .receive_text ()
315
+ assert data == "CSRF double submit tokens do not match"
316
+
317
+ # disable csrf protection
318
+ @AuthJWT .load_config
319
+ def change_request_csrf_protect_to_false ():
320
+ return [
321
+ ("authjwt_token_location" ,{'cookies' }),
322
+ ("authjwt_secret_key" ,"secret" ),
323
+ ("authjwt_cookie_csrf_protect" ,False )
324
+ ]
325
+
326
+ msg = "hello world" if url == "/jwt-optional-cookies" else "Successfully Login!"
327
+ with client .websocket_connect (url + "?csrf_token=test" ) as websocket :
328
+ data = websocket .receive_text ()
329
+ assert data == msg
330
+
331
+ @pytest .mark .parametrize ("url" ,[
332
+ "/jwt-required-cookies" ,
333
+ "/jwt-refresh-required-cookies" ,
334
+ "/fresh-jwt-required-cookies" ,
335
+ "/jwt-optional-cookies"
336
+ ])
337
+ def test_valid_access_endpoint_with_csrf (url ,client ):
338
+ # required and optional
339
+ @AuthJWT .load_config
340
+ def get_cookie_location ():
341
+ return [("authjwt_token_location" ,{'cookies' }),("authjwt_secret_key" ,"secret" )]
342
+
343
+ res = client .get ('/all-token' )
344
+ csrf_access = res .cookies .get ("csrf_access_token" )
345
+ csrf_refresh = res .cookies .get ("csrf_refresh_token" )
346
+
347
+ if url == "/jwt-refresh-required-cookies" :
348
+ with client .websocket_connect (url + f"?csrf_token={ csrf_refresh } " ) as websocket :
349
+ data = websocket .receive_text ()
350
+ assert data == "Successfully Login!"
351
+ else :
352
+ msg = "hello world" if url == "/jwt-optional-cookies" else "Successfully Login!"
353
+ with client .websocket_connect (url + f"?csrf_token={ csrf_access } " ) as websocket :
354
+ data = websocket .receive_text ()
355
+ assert data == msg
0 commit comments