34
34
TokenServiceFactory ,
35
35
database_login_factory ,
36
36
)
37
- from warehouse .errors import BasicAuthBreachedPassword , BasicAuthFailedPassword
37
+ from warehouse .errors import (
38
+ BasicAuthBreachedPassword ,
39
+ BasicAuthFailedPassword ,
40
+ BasicAuthTwoFactorEnabled ,
41
+ )
38
42
from warehouse .events .tags import EventTag
39
43
from warehouse .oidc .models import OIDCPublisher
40
44
from warehouse .rate_limiting import IRateLimiter , RateLimit
@@ -76,6 +80,7 @@ def test_with_invalid_password(self, pyramid_request, pyramid_services):
76
80
lambda userid , password , tags = None : False
77
81
),
78
82
is_disabled = pretend .call_recorder (lambda user_id : (False , None )),
83
+ has_two_factor = pretend .call_recorder (lambda uid : False ),
79
84
)
80
85
pyramid_services .register_service (service , IUserService , None )
81
86
pyramid_services .register_service (
@@ -212,6 +217,7 @@ def test_with_valid_password(self, monkeypatch, pyramid_request, pyramid_service
212
217
),
213
218
update_user = pretend .call_recorder (lambda userid , last_login : None ),
214
219
is_disabled = pretend .call_recorder (lambda user_id : (False , None )),
220
+ has_two_factor = pretend .call_recorder (lambda uid : False ),
215
221
)
216
222
breach_service = pretend .stub (
217
223
check_password = pretend .call_recorder (lambda pw , tags = None : False )
@@ -232,6 +238,7 @@ def test_with_valid_password(self, monkeypatch, pyramid_request, pyramid_service
232
238
assert service .find_userid .calls == [pretend .call ("myuser" )]
233
239
assert service .get_user .calls == [pretend .call (2 )]
234
240
assert service .is_disabled .calls == [pretend .call (2 )]
241
+ assert service .has_two_factor .calls == [pretend .call (2 )]
235
242
assert service .check_password .calls == [
236
243
pretend .call (
237
244
2 ,
@@ -252,6 +259,52 @@ def test_with_valid_password(self, monkeypatch, pyramid_request, pyramid_service
252
259
)
253
260
]
254
261
262
+ def test_via_basic_auth_2fa_enforced (
263
+ self , monkeypatch , pyramid_request , pyramid_services
264
+ ):
265
+ send_email = pretend .call_recorder (lambda * a , ** kw : None )
266
+
267
+ user = pretend .stub (id = 2 , username = "multiFactor" )
268
+ service = pretend .stub (
269
+ get_user = pretend .call_recorder (lambda user_id : user ),
270
+ find_userid = pretend .call_recorder (lambda username : 2 ),
271
+ check_password = pretend .call_recorder (
272
+ lambda userid , password , tags = None : True
273
+ ),
274
+ is_disabled = pretend .call_recorder (lambda user_id : (False , None )),
275
+ disable_password = pretend .call_recorder (
276
+ lambda user_id , request , reason = None : None
277
+ ),
278
+ has_two_factor = pretend .call_recorder (lambda uid : True ),
279
+ )
280
+ breach_service = pretend .stub (
281
+ check_password = pretend .call_recorder (lambda pw , tags = None : False ),
282
+ )
283
+
284
+ pyramid_services .register_service (service , IUserService , None )
285
+ pyramid_services .register_service (
286
+ breach_service , IPasswordBreachedService , None
287
+ )
288
+
289
+ pyramid_request .matched_route = pretend .stub (name = "forklift.legacy.file_upload" )
290
+
291
+ with pytest .raises (BasicAuthTwoFactorEnabled ) as excinfo :
292
+ _basic_auth_check ("multiFactor" , "mypass" , pyramid_request )
293
+
294
+ assert excinfo .value .status == (
295
+ "401 User multiFactor has two factor auth enabled, "
296
+ "an API Token or Trusted Publisher must be used "
297
+ "to upload in place of password."
298
+ )
299
+ assert service .find_userid .calls == [pretend .call ("multiFactor" )]
300
+ assert service .get_user .calls == [pretend .call (2 )]
301
+ assert service .is_disabled .calls == [pretend .call (2 )]
302
+ assert service .has_two_factor .calls == [pretend .call (2 )]
303
+ assert service .check_password .calls == []
304
+ assert breach_service .check_password .calls == []
305
+ assert service .disable_password .calls == []
306
+ assert send_email .calls == []
307
+
255
308
def test_via_basic_auth_compromised (
256
309
self , monkeypatch , pyramid_request , pyramid_services
257
310
):
@@ -271,6 +324,7 @@ def test_via_basic_auth_compromised(
271
324
disable_password = pretend .call_recorder (
272
325
lambda user_id , request , reason = None : None
273
326
),
327
+ has_two_factor = pretend .call_recorder (lambda uid : False ),
274
328
)
275
329
breach_service = pretend .stub (
276
330
check_password = pretend .call_recorder (lambda pw , tags = None : True ),
0 commit comments