diff --git a/tests/unit/accounts/test_core.py b/tests/unit/accounts/test_core.py index 18bec5f54009..9cafdaba4261 100644 --- a/tests/unit/accounts/test_core.py +++ b/tests/unit/accounts/test_core.py @@ -46,16 +46,6 @@ class TestLogin: - def test_invalid_route(self, pyramid_request, pyramid_services): - service = pretend.stub(find_userid=pretend.call_recorder(lambda username: None)) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(), IPasswordBreachedService, None - ) - pyramid_request.matched_route = pretend.stub(name="route_name") - assert _basic_auth_check("myuser", "mypass", pyramid_request) is False - assert service.find_userid.calls == [] - def test_with_no_user(self, pyramid_request, pyramid_services): service = pretend.stub(find_userid=pretend.call_recorder(lambda username: None)) pyramid_services.register_service(service, IUserService, None) @@ -390,14 +380,8 @@ def test_unauthenticated_userid(self): def test_includeme(monkeypatch): - authz_obj = pretend.stub() - authz_cls = pretend.call_recorder(lambda *a, **kw: authz_obj) - monkeypatch.setattr(accounts, "ACLAuthorizationPolicy", authz_cls) - monkeypatch.setattr(accounts, "MacaroonAuthorizationPolicy", authz_cls) - monkeypatch.setattr(accounts, "TwoFactorAuthorizationPolicy", authz_cls) - multi_policy_obj = pretend.stub() - multi_policy_cls = pretend.call_recorder(lambda ps, authz: multi_policy_obj) + multi_policy_cls = pretend.call_recorder(lambda ps: multi_policy_obj) monkeypatch.setattr(accounts, "MultiSecurityPolicy", multi_policy_cls) session_policy_obj = pretend.stub() @@ -474,6 +458,10 @@ def test_includeme(monkeypatch): assert config.set_security_policy.calls == [pretend.call(multi_policy_obj)] assert multi_policy_cls.calls == [ pretend.call( - [session_policy_obj, basic_policy_obj, macaroon_policy_obj], authz_obj + [ + session_policy_obj, + basic_policy_obj, + macaroon_policy_obj, + ] ) ] diff --git a/tests/unit/accounts/test_models.py b/tests/unit/accounts/test_models.py index 5223818539a8..515a49048ed2 100644 --- a/tests/unit/accounts/test_models.py +++ b/tests/unit/accounts/test_models.py @@ -11,10 +11,14 @@ # limitations under the License. import datetime +import uuid import pytest +from pyramid.authorization import Authenticated + from warehouse.accounts.models import Email, RecoveryCode, User, UserFactory +from warehouse.utils.security_policy import principals_for from ...common.db.accounts import ( EmailFactory as DBEmailFactory, @@ -162,3 +166,62 @@ def test_acl(self, db_session): ("Allow", "group:admins", "admin"), ("Allow", "group:moderators", "moderator"), ] + + @pytest.mark.parametrize( + ( + "is_superuser", + "is_moderator", + "is_psf_staff", + "expected", + ), + [ + (False, False, False, []), + ( + True, + False, + False, + ["group:admins", "group:moderators", "group:psf_staff"], + ), + ( + False, + True, + False, + ["group:moderators"], + ), + ( + True, + True, + False, + ["group:admins", "group:moderators", "group:psf_staff"], + ), + ( + False, + False, + True, + ["group:psf_staff"], + ), + ( + False, + True, + True, + ["group:moderators", "group:psf_staff"], + ), + ], + ) + def test_principals( + self, + is_superuser, + is_moderator, + is_psf_staff, + expected, + ): + user = User( + id=uuid.uuid4(), + is_superuser=is_superuser, + is_moderator=is_moderator, + is_psf_staff=is_psf_staff, + ) + + expected = expected[:] + [f"user:{user.id}", Authenticated] + + assert set(principals_for(user)) == set(expected) diff --git a/tests/unit/accounts/test_security_policy.py b/tests/unit/accounts/test_security_policy.py index 969a33c726e8..a280ac2c5dcf 100644 --- a/tests/unit/accounts/test_security_policy.py +++ b/tests/unit/accounts/test_security_policy.py @@ -14,17 +14,14 @@ import pretend import pytest -from pyramid.interfaces import IAuthorizationPolicy, ISecurityPolicy -from pyramid.security import Allowed, Denied +from pyramid.authorization import Allow +from pyramid.interfaces import ISecurityPolicy from zope.interface.verify import verifyClass from warehouse.accounts import security_policy from warehouse.accounts.interfaces import IUserService -from warehouse.errors import WarehouseDenied from warehouse.utils.security_policy import AuthenticationMethod -from ...common.db.packaging import ProjectFactory - class TestBasicAuthSecurityPolicy: def test_verify(self): @@ -37,8 +34,6 @@ def test_noops(self): policy = security_policy.BasicAuthSecurityPolicy() with pytest.raises(NotImplementedError): policy.authenticated_userid(pretend.stub()) - with pytest.raises(NotImplementedError): - policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) def test_forget_and_remember(self): policy = security_policy.BasicAuthSecurityPolicy() @@ -63,6 +58,7 @@ def test_identity_no_credentials(self, monkeypatch): monkeypatch.setattr(security_policy, "add_vary_callback", add_vary_cb) request = pretend.stub( + is_api=True, add_response_callback=pretend.call_recorder(lambda cb: None), banned=pretend.stub(by_ip=lambda ip_address: False), remote_addr="1.2.3.4", @@ -92,6 +88,7 @@ def test_identity_credentials_fail(self, monkeypatch): monkeypatch.setattr(security_policy, "add_vary_callback", add_vary_cb) request = pretend.stub( + is_api=True, add_response_callback=pretend.call_recorder(lambda cb: None), banned=pretend.stub(by_ip=lambda ip_address: False), remote_addr="1.2.3.4", @@ -103,33 +100,11 @@ def test_identity_credentials_fail(self, monkeypatch): assert add_vary_cb.calls == [pretend.call("Authorization")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - @pytest.mark.parametrize( - "fake_request", - [ - pretend.stub( - matched_route=None, - banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", - ), - pretend.stub( - matched_route=pretend.stub(name="an.invalid.route"), - banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", - ), - ], - ) - def test_invalid_request_fail(self, monkeypatch, fake_request): - creds = (pretend.stub(), pretend.stub()) - extract_http_basic_credentials = pretend.call_recorder(lambda request: creds) - monkeypatch.setattr( - security_policy, - "extract_http_basic_credentials", - extract_http_basic_credentials, - ) + def test_not_api_request_fail(self): + request = pretend.stub(is_api=False) policy = security_policy.BasicAuthSecurityPolicy() - fake_request.add_response_callback = pretend.call_recorder(lambda cb: None) - assert policy.identity(fake_request) is None + assert policy.identity(request) is None def test_identity(self, monkeypatch): creds = (pretend.stub(), pretend.stub()) @@ -154,6 +129,7 @@ def test_identity(self, monkeypatch): get_user_by_username=pretend.call_recorder(lambda u: user) ) request = pretend.stub( + is_api=True, add_response_callback=pretend.call_recorder(lambda cb: None), find_service=pretend.call_recorder(lambda a, **kw: user_service), banned=pretend.stub(by_ip=lambda ip_address: False), @@ -193,6 +169,7 @@ def test_identityi_ip_banned(self, monkeypatch): get_user_by_username=pretend.call_recorder(lambda u: user) ) request = pretend.stub( + is_api=True, add_response_callback=pretend.call_recorder(lambda cb: None), find_service=pretend.call_recorder(lambda a, **kw: user_service), banned=pretend.stub(by_ip=lambda ip_address: True), @@ -221,8 +198,6 @@ def test_noops(self): policy = security_policy.SessionSecurityPolicy() with pytest.raises(NotImplementedError): policy.authenticated_userid(pretend.stub()) - with pytest.raises(NotImplementedError): - policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) def test_forget_and_remember(self, monkeypatch): request = pretend.stub() @@ -249,59 +224,11 @@ def test_forget_and_remember(self, monkeypatch): pretend.call(request, userid, foo=None) ] - def test_identity_missing_route(self, monkeypatch): - session_helper_obj = pretend.stub() - session_helper_cls = pretend.call_recorder(lambda: session_helper_obj) - monkeypatch.setattr( - security_policy, "SessionAuthenticationHelper", session_helper_cls - ) - - policy = security_policy.SessionSecurityPolicy() - - vary_cb = pretend.stub() - add_vary_cb = pretend.call_recorder(lambda *v: vary_cb) - monkeypatch.setattr(security_policy, "add_vary_callback", add_vary_cb) - - request = pretend.stub( - add_response_callback=pretend.call_recorder(lambda cb: None), - matched_route=None, - banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", - ) - - assert policy.identity(request) is None - assert request.authentication_method == AuthenticationMethod.SESSION - assert session_helper_cls.calls == [pretend.call()] - - assert add_vary_cb.calls == [pretend.call("Cookie")] - assert request.add_response_callback.calls == [pretend.call(vary_cb)] - - def test_identity_invalid_route(self, monkeypatch): - session_helper_obj = pretend.stub() - session_helper_cls = pretend.call_recorder(lambda: session_helper_obj) - monkeypatch.setattr( - security_policy, "SessionAuthenticationHelper", session_helper_cls - ) - + def test_identity_api_route_fail(self): policy = security_policy.SessionSecurityPolicy() - - vary_cb = pretend.stub() - add_vary_cb = pretend.call_recorder(lambda *v: vary_cb) - monkeypatch.setattr(security_policy, "add_vary_callback", add_vary_cb) - - request = pretend.stub( - add_response_callback=pretend.call_recorder(lambda cb: None), - matched_route=pretend.stub(name="forklift.legacy.file_upload"), - banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", - ) + request = pretend.stub(is_api=True) assert policy.identity(request) is None - assert request.authentication_method == AuthenticationMethod.SESSION - assert session_helper_cls.calls == [pretend.call()] - - assert add_vary_cb.calls == [pretend.call("Cookie")] - assert request.add_response_callback.calls == [pretend.call(vary_cb)] def test_identity_no_userid(self, monkeypatch): session_helper_obj = pretend.stub( @@ -319,6 +246,7 @@ def test_identity_no_userid(self, monkeypatch): monkeypatch.setattr(security_policy, "add_vary_callback", add_vary_cb) request = pretend.stub( + is_api=False, add_response_callback=pretend.call_recorder(lambda cb: None), matched_route=pretend.stub(name="a.permitted.route"), banned=pretend.stub(by_ip=lambda ip_address: False), @@ -351,6 +279,7 @@ def test_identity_no_user(self, monkeypatch): user_service = pretend.stub(get_user=pretend.call_recorder(lambda uid: None)) request = pretend.stub( + is_api=False, add_response_callback=pretend.call_recorder(lambda cb: None), matched_route=pretend.stub(name="a.permitted.route"), find_service=pretend.call_recorder(lambda i, **kw: user_service), @@ -391,6 +320,7 @@ def test_identity_password_outdated(self, monkeypatch): get_password_timestamp=pretend.call_recorder(lambda uid: timestamp), ) request = pretend.stub( + is_api=False, add_response_callback=pretend.call_recorder(lambda cb: None), matched_route=pretend.stub(name="a.permitted.route"), find_service=pretend.call_recorder(lambda i, **kw: user_service), @@ -442,6 +372,7 @@ def test_identity(self, monkeypatch): get_password_timestamp=pretend.call_recorder(lambda uid: timestamp), ) request = pretend.stub( + is_api=False, add_response_callback=pretend.call_recorder(lambda cb: None), matched_route=pretend.stub(name="a.permitted.route"), find_service=pretend.call_recorder(lambda i, **kw: user_service), @@ -487,6 +418,7 @@ def test_identity_ip_banned(self, monkeypatch): get_password_timestamp=pretend.call_recorder(lambda uid: timestamp), ) request = pretend.stub( + is_api=False, add_response_callback=pretend.call_recorder(lambda cb: None), matched_route=pretend.stub(name="a.permitted.route"), find_service=pretend.call_recorder(lambda i, **kw: user_service), @@ -510,205 +442,134 @@ def test_identity_ip_banned(self, monkeypatch): assert request.add_response_callback.calls == [pretend.call(vary_cb)] -class TestTwoFactorAuthorizationPolicy: - def test_verify(self): - assert verifyClass( - IAuthorizationPolicy, security_policy.TwoFactorAuthorizationPolicy - ) - - def test_permits_no_active_request(self, monkeypatch): - get_current_request = pretend.call_recorder(lambda: None) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: pretend.stub()) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) - - assert result == WarehouseDenied("") - assert result.s == "There was no active request." - - def test_permits_if_context_is_not_permitted_by_backing_policy(self, monkeypatch): - request = pretend.stub() - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - permits_result = Denied("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) - - assert result == permits_result - - def test_permits_if_non_2fa_requireable_context(self, monkeypatch): - request = pretend.stub() - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) +@pytest.mark.parametrize( + "policy_class", + [security_policy.BasicAuthSecurityPolicy, security_policy.SessionSecurityPolicy], +) +class TestPermits: + @pytest.mark.parametrize( + "principals,expected", [("user:5", True), ("user:1", False)] + ) + def test_acl(self, monkeypatch, policy_class, principals, expected): + monkeypatch.setattr(security_policy, "User", pretend.stub) - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) + request = pretend.stub(identity=pretend.stub(__principals__=lambda: principals)) + context = pretend.stub(__acl__=[(Allow, "user:5", "myperm")]) - assert result == permits_result + policy = policy_class() + assert bool(policy.permits(request, context, "myperm")) == expected - def test_permits_if_context_does_not_require_2fa(self, monkeypatch, db_request): - db_request.user = pretend.stub() - db_request.registry.settings = { - "warehouse.two_factor_mandate.enabled": True, - "warehouse.two_factor_mandate.available": True, - "warehouse.two_factor_requirement.enabled": True, - } - get_current_request = pretend.call_recorder(lambda: db_request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) + @pytest.mark.parametrize( + "mfa_required,has_mfa,expected", + [ + (True, True, True), + (False, True, True), + (True, False, False), + (False, False, True), + ], + ) + def test_2fa_owner_requires( + self, monkeypatch, policy_class, mfa_required, has_mfa, expected + ): + monkeypatch.setattr(security_policy, "User", pretend.stub) + monkeypatch.setattr(security_policy, "TwoFactorRequireable", pretend.stub) - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) + request = pretend.stub( + identity=pretend.stub( + __principals__=lambda: ["user:5"], has_two_factor=has_mfa + ), + registry=pretend.stub( + settings={ + "warehouse.two_factor_requirement.enabled": True, + "warehouse.two_factor_mandate.enabled": False, + "warehouse.two_factor_mandate.available": False, + } + ), ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - context = ProjectFactory.create( - owners_require_2fa=False, - pypi_mandates_2fa=False, + context = pretend.stub( + __acl__=[(Allow, "user:5", "myperm")], owners_require_2fa=mfa_required ) - result = policy.permits(context, pretend.stub(), pretend.stub()) - - assert result == permits_result - def test_flashes_if_context_requires_2fa_but_not_enabled( - self, monkeypatch, db_request - ): - db_request.registry.settings = { - "warehouse.two_factor_mandate.enabled": False, - "warehouse.two_factor_mandate.available": True, - "warehouse.two_factor_requirement.enabled": True, - } - db_request.session.flash = pretend.call_recorder(lambda m, queue: None) - db_request.user = pretend.stub(has_two_factor=False) - get_current_request = pretend.call_recorder(lambda: db_request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - context = ProjectFactory.create( - owners_require_2fa=False, - pypi_mandates_2fa=True, - ) - result = policy.permits(context, pretend.stub(), pretend.stub()) - - assert result == permits_result - assert db_request.session.flash.calls == [ - pretend.call( - "This project is included in PyPI's two-factor mandate " - "for critical projects. In the future, you will be unable to " - "perform this action without enabling 2FA for your account", - queue="warning", - ), - ] + policy = policy_class() + assert bool(policy.permits(request, context, "myperm")) == expected - @pytest.mark.parametrize("owners_require_2fa", [True, False]) - @pytest.mark.parametrize("pypi_mandates_2fa", [True, False]) - @pytest.mark.parametrize("two_factor_requirement_enabled", [True, False]) - @pytest.mark.parametrize("two_factor_mandate_available", [True, False]) - @pytest.mark.parametrize("two_factor_mandate_enabled", [True, False]) - def test_permits_if_user_has_2fa( - self, - monkeypatch, - owners_require_2fa, - pypi_mandates_2fa, - two_factor_requirement_enabled, - two_factor_mandate_available, - two_factor_mandate_enabled, - db_request, + @pytest.mark.parametrize( + "mfa_required,has_mfa,expected", + [ + (True, True, True), + (False, True, True), + (True, False, False), + (False, False, True), + ], + ) + def test_2fa_pypi_mandates_2fa( + self, monkeypatch, policy_class, mfa_required, has_mfa, expected ): - db_request.registry.settings = { - "warehouse.two_factor_requirement.enabled": two_factor_requirement_enabled, - "warehouse.two_factor_mandate.available": two_factor_mandate_available, - "warehouse.two_factor_mandate.enabled": two_factor_mandate_enabled, - } - user = pretend.stub(has_two_factor=True) - db_request.user = user - get_current_request = pretend.call_recorder(lambda: db_request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) + monkeypatch.setattr(security_policy, "User", pretend.stub) + monkeypatch.setattr(security_policy, "TwoFactorRequireable", pretend.stub) - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) + request = pretend.stub( + identity=pretend.stub( + __principals__=lambda: ["user:5"], has_two_factor=has_mfa + ), + registry=pretend.stub( + settings={ + "warehouse.two_factor_requirement.enabled": False, + "warehouse.two_factor_mandate.enabled": True, + "warehouse.two_factor_mandate.available": False, + } + ), ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - context = ProjectFactory.create( - owners_require_2fa=owners_require_2fa, pypi_mandates_2fa=pypi_mandates_2fa + context = pretend.stub( + __acl__=[(Allow, "user:5", "myperm")], pypi_mandates_2fa=mfa_required ) - result = policy.permits(context, pretend.stub(), pretend.stub()) - assert result == permits_result + policy = policy_class() + assert bool(policy.permits(request, context, "myperm")) == expected @pytest.mark.parametrize( - "owners_require_2fa, pypi_mandates_2fa, reason", + "mfa_required,has_mfa,expected", [ - (True, False, "owners_require_2fa"), - (False, True, "pypi_mandates_2fa"), - (True, True, "pypi_mandates_2fa"), + (True, True, True), + (False, True, True), + (True, False, False), + (False, False, True), ], ) - def test_denies_if_2fa_is_required_but_user_doesnt_have_2fa( - self, - monkeypatch, - owners_require_2fa, - pypi_mandates_2fa, - reason, - db_request, + def test_2fa_pypi_mandates_2fa_with_warning( + self, monkeypatch, policy_class, mfa_required, has_mfa, expected ): - db_request.registry.settings = { - "warehouse.two_factor_requirement.enabled": owners_require_2fa, - "warehouse.two_factor_mandate.enabled": pypi_mandates_2fa, - } - user = pretend.stub(has_two_factor=False) - db_request.user = user - get_current_request = pretend.call_recorder(lambda: db_request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - permits_result = Allowed("Because") - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits_result) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - context = ProjectFactory.create( - owners_require_2fa=owners_require_2fa, pypi_mandates_2fa=pypi_mandates_2fa - ) - result = policy.permits(context, pretend.stub(), pretend.stub()) - - summary = { - "owners_require_2fa": ( - "This project requires two factor authentication to be enabled " - "for all contributors.", + monkeypatch.setattr(security_policy, "User", pretend.stub) + monkeypatch.setattr(security_policy, "TwoFactorRequireable", pretend.stub) + + request = pretend.stub( + identity=pretend.stub( + __principals__=lambda: ["user:5"], has_two_factor=has_mfa ), - "pypi_mandates_2fa": ( - "PyPI requires two factor authentication to be enabled " - "for all contributors to this project.", + registry=pretend.stub( + settings={ + "warehouse.two_factor_requirement.enabled": False, + "warehouse.two_factor_mandate.enabled": False, + "warehouse.two_factor_mandate.available": True, + } ), - }[reason] - - assert result == WarehouseDenied(summary, reason="two_factor_required") - - def test_principals_allowed_by_permission(self): - principals = pretend.stub() - backing_policy = pretend.stub( - principals_allowed_by_permission=pretend.call_recorder( - lambda *a: principals - ) - ) - policy = security_policy.TwoFactorAuthorizationPolicy(policy=backing_policy) - - assert ( - policy.principals_allowed_by_permission(pretend.stub(), pretend.stub()) - is principals - ) + session=pretend.stub(flash=pretend.call_recorder(lambda msg, queue: None)), + ) + context = pretend.stub( + __acl__=[(Allow, "user:5", "myperm")], pypi_mandates_2fa=mfa_required + ) + + policy = policy_class() + assert bool(policy.permits(request, context, "myperm")) + + if not expected: + assert request.session.flash.calls == [ + pretend.call( + "This project is included in PyPI's two-factor mandate " + "for critical projects. In the future, you will be unable to " + "perform this action without enabling 2FA for your account", + queue="warning", + ) + ] + else: + assert request.session.flash.calls == [] diff --git a/tests/unit/accounts/test_views.py b/tests/unit/accounts/test_views.py index bac26aafba9e..69d5982f8c89 100644 --- a/tests/unit/accounts/test_views.py +++ b/tests/unit/accounts/test_views.py @@ -401,6 +401,13 @@ def test_post_validate_no_redirects( ) pyramid_request.session.record_password_timestamp = lambda timestamp: None + security_policy = pretend.stub( + authenticated_userid=lambda r: None, + remember=lambda r, u, **kw: [], + reset=pretend.call_recorder(lambda r: None), + ) + pyramid_request.registry.queryUtility = lambda iface: security_policy + form_obj = pretend.stub( validate=pretend.call_recorder(lambda: True), username=pretend.stub(data="theuser"), @@ -421,6 +428,7 @@ def test_post_validate_no_redirects( ) ] assert pyramid_request.session.record_auth_timestamp.calls == [pretend.call()] + assert security_policy.reset.calls == [pretend.call(pyramid_request)] def test_redirect_authenticated_user(self): pyramid_request = pretend.stub(authenticated_userid=1) @@ -1330,6 +1338,11 @@ def test_post_forgets_user(self, monkeypatch, pyramid_request): invalidate=pretend.call_recorder(lambda: None) ) + security_policy = pretend.stub( + reset=pretend.call_recorder(lambda r: None), + ) + pyramid_request.registry.queryUtility = lambda iface: security_policy + result = views.logout(pyramid_request) assert isinstance(result, HTTPSeeOther) @@ -1337,6 +1350,7 @@ def test_post_forgets_user(self, monkeypatch, pyramid_request): assert result.headers["foo"] == "bar" assert forget.calls == [pretend.call(pyramid_request)] assert pyramid_request.session.invalidate.calls == [pretend.call()] + assert security_policy.reset.calls == [pretend.call(pyramid_request)] @pytest.mark.parametrize( # The set of all possible next URLs. Since this set is infinite, we diff --git a/tests/unit/admin/test_routes.py b/tests/unit/admin/test_routes.py index a2d00612e2de..323fe48b00c3 100644 --- a/tests/unit/admin/test_routes.py +++ b/tests/unit/admin/test_routes.py @@ -83,6 +83,13 @@ def test_includeme(): factory="warehouse.accounts.models:UserFactory", traverse="/{username}", ), + pretend.call( + "admin.user.wipe_factors", + "/admin/users/{username}/wipe_factors/", + domain=warehouse, + factory="warehouse.accounts.models:UserFactory", + traverse="/{username}", + ), pretend.call( "admin.prohibited_user_names.bulk_add", "/admin/prohibited_user_names/bulk/", diff --git a/tests/unit/admin/views/test_users.py b/tests/unit/admin/views/test_users.py index 41315cc74c68..f35bc00cecde 100644 --- a/tests/unit/admin/views/test_users.py +++ b/tests/unit/admin/views/test_users.py @@ -18,7 +18,12 @@ from webob.multidict import MultiDict, NoVars from warehouse.accounts.interfaces import IEmailBreachedService, IUserService -from warehouse.accounts.models import DisableReason, ProhibitedUserName +from warehouse.accounts.models import ( + DisableReason, + ProhibitedUserName, + RecoveryCode, + WebAuthn, +) from warehouse.admin.views import users as views from warehouse.packaging.models import JournalEntry, Project @@ -456,6 +461,100 @@ def test_user_reset_password_redirects_actual_name(self, db_request): ] +class TestUserWipeFactors: + def test_wipes_factors(self, db_request, monkeypatch): + user = UserFactory.create( + totp_secret=b"aaaaabbbbbcccccddddd", + webauthn=[ + WebAuthn( + label="fake", credential_id="fake", public_key="extremely fake" + ) + ], + recovery_codes=[ + RecoveryCode(code="fake"), + ], + ) + + assert user.totp_secret is not None + assert len(user.webauthn) == 1 + assert len(user.recovery_codes.all()) == 1 + + db_request.matchdict["username"] = str(user.username) + db_request.params = {"username": user.username} + db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/foobar") + db_request.user = user + service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: user.username), + disable_password=pretend.call_recorder( + lambda userid, request, reason: None + ), + ) + db_request.find_service = pretend.call_recorder(lambda iface, context: service) + + send_email = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "send_password_compromised_email", send_email) + + result = views.user_wipe_factors(user, db_request) + + assert user.totp_secret is None + assert len(user.webauthn) == 0 + assert len(user.recovery_codes.all()) == 0 + assert db_request.find_service.calls == [ + pretend.call(IUserService, context=None) + ] + assert send_email.calls == [pretend.call(db_request, user)] + assert service.disable_password.calls == [ + pretend.call(user.id, db_request, reason=DisableReason.CompromisedPassword) + ] + assert db_request.route_path.calls == [ + pretend.call("admin.user.detail", username=user.username) + ] + assert result.status_code == 303 + assert result.location == "/foobar" + + def test_wipes_factors_bad_confirm(self, db_request, monkeypatch): + user = UserFactory.create() + + db_request.matchdict["username"] = str(user.username) + db_request.params = {"username": "wrong"} + db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/foobar") + db_request.user = UserFactory.create() + service = pretend.stub( + find_userid=pretend.call_recorder(lambda username: user.username), + disable_password=pretend.call_recorder(lambda userid, reason: None), + ) + db_request.find_service = pretend.call_recorder(lambda iface, context: service) + + send_email = pretend.call_recorder(lambda *a, **kw: None) + monkeypatch.setattr(views, "send_password_compromised_email", send_email) + + result = views.user_wipe_factors(user, db_request) + + assert db_request.find_service.calls == [] + assert send_email.calls == [] + assert service.disable_password.calls == [] + assert db_request.route_path.calls == [ + pretend.call("admin.user.detail", username=user.username) + ] + assert result.status_code == 303 + assert result.location == "/foobar" + + def test_user_wipe_factors_redirects_actual_name(self, db_request): + user = UserFactory.create(username="wu-tang") + db_request.matchdict["username"] = "Wu-Tang" + db_request.current_route_path = pretend.call_recorder( + lambda username: "/user/the-redirect/" + ) + + result = views.user_wipe_factors(user, db_request) + + assert isinstance(result, HTTPMovedPermanently) + assert result.headers["Location"] == "/user/the-redirect/" + assert db_request.current_route_path.calls == [ + pretend.call(username=user.username) + ] + + class TestBulkAddProhibitedUserName: def test_get(self): request = pretend.stub(method="GET") diff --git a/tests/unit/forklift/test_init.py b/tests/unit/forklift/test_init.py index 280a7d65baae..d472021deacd 100644 --- a/tests/unit/forklift/test_init.py +++ b/tests/unit/forklift/test_init.py @@ -39,7 +39,10 @@ def test_includeme(forklift_domain, monkeypatch): assert config.include.calls == [pretend.call(".action_routing")] assert config.add_legacy_action_route.calls == [ pretend.call( - "forklift.legacy.file_upload", "file_upload", domain=forklift_domain + "forklift.legacy.file_upload", + "file_upload", + domain=forklift_domain, + is_api=True, ), pretend.call("forklift.legacy.submit", "submit", domain=forklift_domain), pretend.call( diff --git a/tests/unit/macaroons/test_security_policy.py b/tests/unit/macaroons/test_security_policy.py index eb4e0b08e815..a1eb34d0fbb5 100644 --- a/tests/unit/macaroons/test_security_policy.py +++ b/tests/unit/macaroons/test_security_policy.py @@ -14,7 +14,8 @@ import pretend import pytest -from pyramid.interfaces import IAuthorizationPolicy, ISecurityPolicy +from pyramid.authorization import Allow +from pyramid.interfaces import ISecurityPolicy from pyramid.security import Denied from zope.interface.verify import verifyClass @@ -68,8 +69,6 @@ def test_noops(self): policy = security_policy.MacaroonSecurityPolicy() with pytest.raises(NotImplementedError): policy.authenticated_userid(pretend.stub()) - with pytest.raises(NotImplementedError): - policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) def test_forget_and_remember(self): policy = security_policy.MacaroonSecurityPolicy() @@ -77,6 +76,12 @@ def test_forget_and_remember(self): assert policy.forget(pretend.stub()) == [] assert policy.remember(pretend.stub(), pretend.stub()) == [] + def test_identity_no_api_fail(self): + request = pretend.stub(is_api=False) + policy = security_policy.MacaroonSecurityPolicy() + + assert policy.identity(request) is None + def test_identity_no_http_macaroon(self, monkeypatch): policy = security_policy.MacaroonSecurityPolicy() @@ -90,7 +95,8 @@ def test_identity_no_http_macaroon(self, monkeypatch): ) request = pretend.stub( - add_response_callback=pretend.call_recorder(lambda cb: None) + is_api=True, + add_response_callback=pretend.call_recorder(lambda cb: None), ) assert policy.identity(request) is None @@ -117,6 +123,7 @@ def test_identity_no_db_macaroon(self, monkeypatch): ) request = pretend.stub( + is_api=True, add_response_callback=pretend.call_recorder(lambda cb: None), find_service=pretend.call_recorder(lambda iface, **kw: macaroon_service), ) @@ -151,6 +158,7 @@ def test_identity_user(self, monkeypatch): ) request = pretend.stub( + is_api=True, add_response_callback=pretend.call_recorder(lambda cb: None), find_service=pretend.call_recorder(lambda iface, **kw: macaroon_service), ) @@ -188,6 +196,7 @@ def test_identity_oidc_publisher(self, monkeypatch): ) request = pretend.stub( + is_api=True, add_response_callback=pretend.call_recorder(lambda cb: None), find_service=pretend.call_recorder(lambda iface, **kw: macaroon_service), ) @@ -208,45 +217,6 @@ def test_identity_oidc_publisher(self, monkeypatch): assert add_vary_cb.calls == [pretend.call("Authorization")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] - -class TestMacaroonAuthorizationPolicy: - def test_verify(self): - assert verifyClass( - IAuthorizationPolicy, security_policy.MacaroonAuthorizationPolicy - ) - - def test_permits_no_active_request(self, monkeypatch): - get_current_request = pretend.call_recorder(lambda: None) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: pretend.stub()) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) - - assert result == Denied("") - assert result.s == "There was no active request." - - def test_permits_no_macaroon(self, monkeypatch): - request = pretend.stub() - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - _extract_http_macaroon = pretend.call_recorder(lambda r: None) - monkeypatch.setattr( - security_policy, "_extract_http_macaroon", _extract_http_macaroon - ) - - permits = pretend.stub() - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) - - assert result == permits - def test_permits_invalid_macaroon(self, monkeypatch): macaroon_service = pretend.stub( verify=pretend.raiser(InvalidMacaroonError("foo")) @@ -254,91 +224,58 @@ def test_permits_invalid_macaroon(self, monkeypatch): request = pretend.stub( find_service=pretend.call_recorder(lambda interface, **kw: macaroon_service) ) - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - _extract_http_macaroon = pretend.call_recorder(lambda r: b"not a real macaroon") + _extract_http_macaroon = pretend.call_recorder(lambda r: "not a real macaroon") monkeypatch.setattr( security_policy, "_extract_http_macaroon", _extract_http_macaroon ) - permits = pretend.stub() - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) + policy = security_policy.MacaroonSecurityPolicy() + result = policy.permits(request, pretend.stub(), "upload") assert result == Denied("") assert result.s == "Invalid API Token: foo" - def test_permits_valid_macaroon(self, monkeypatch): + @pytest.mark.parametrize( + "principals,expected", [(["user:5"], True), (["user:1"], False)] + ) + def test_permits_valid_macaroon(self, monkeypatch, principals, expected): macaroon_service = pretend.stub( verify=pretend.call_recorder(lambda *a: pretend.stub()) ) request = pretend.stub( - find_service=pretend.call_recorder(lambda interface, **kw: macaroon_service) + identity=pretend.stub(__principals__=lambda: principals), + find_service=pretend.call_recorder( + lambda interface, **kw: macaroon_service + ), ) - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - _extract_http_macaroon = pretend.call_recorder(lambda r: b"not a real macaroon") + _extract_http_macaroon = pretend.call_recorder(lambda r: "not a real macaroon") monkeypatch.setattr( security_policy, "_extract_http_macaroon", _extract_http_macaroon ) - permits = pretend.stub() - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - result = policy.permits(pretend.stub(), pretend.stub(), "upload") + context = pretend.stub(__acl__=[(Allow, "user:5", ["upload"])]) + + policy = security_policy.MacaroonSecurityPolicy() + result = policy.permits(request, context, "upload") - assert result == permits + assert bool(result) == expected @pytest.mark.parametrize( "invalid_permission", - ["admin", "moderator", "manage:user", "manage:project", "nonexistant"], + ["admin", "moderator", "manage:user", "manage:project", "nonexistent"], ) def test_denies_valid_macaroon_for_incorrect_permission( self, monkeypatch, invalid_permission ): - macaroon_service = pretend.stub( - verify=pretend.call_recorder(lambda *a: pretend.stub()) - ) - request = pretend.stub( - find_service=pretend.call_recorder(lambda interface, **kw: macaroon_service) - ) - get_current_request = pretend.call_recorder(lambda: request) - monkeypatch.setattr(security_policy, "get_current_request", get_current_request) - - _extract_http_macaroon = pretend.call_recorder(lambda r: b"not a real macaroon") + _extract_http_macaroon = pretend.call_recorder(lambda r: "not a real macaroon") monkeypatch.setattr( security_policy, "_extract_http_macaroon", _extract_http_macaroon ) - permits = pretend.stub() - backing_policy = pretend.stub( - permits=pretend.call_recorder(lambda *a, **kw: permits) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) + policy = security_policy.MacaroonSecurityPolicy() result = policy.permits(pretend.stub(), pretend.stub(), invalid_permission) assert result == Denied("") assert result.s == ( f"API tokens are not valid for permission: {invalid_permission}!" ) - - def test_principals_allowed_by_permission(self): - principals = pretend.stub() - backing_policy = pretend.stub( - principals_allowed_by_permission=pretend.call_recorder( - lambda *a: principals - ) - ) - policy = security_policy.MacaroonAuthorizationPolicy(policy=backing_policy) - - assert ( - policy.principals_allowed_by_permission(pretend.stub(), pretend.stub()) - is principals - ) diff --git a/tests/unit/oidc/test_utils.py b/tests/unit/oidc/test_utils.py index 769ee4ae11bc..496806c68c43 100644 --- a/tests/unit/oidc/test_utils.py +++ b/tests/unit/oidc/test_utils.py @@ -15,8 +15,11 @@ import pretend import pytest +from pyramid.authorization import Authenticated + from tests.common.db.oidc import GitHubPublisherFactory from warehouse.oidc import utils +from warehouse.utils.security_policy import principals_for def test_find_publisher_by_issuer_bad_issuer_url(): @@ -71,3 +74,12 @@ def test_find_publisher_by_issuer_github(db_request, environment, expected_id): ).id == expected_id ) + + +def test_oidc_context_principals(): + assert principals_for( + utils.OIDCContext(publisher=pretend.stub(id=17), claims=None) + ) == [ + Authenticated, + "oidc:17", + ] diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index c65ddbf6b3a1..a0f01434396b 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -470,13 +470,12 @@ def __init__(self): def test_root_factory_access_control_list(): acl = config.RootFactory.__acl__ - assert len(acl) == 5 - assert acl[0] == (Allow, "group:admins", "admin") - assert acl[1] == (Allow, "group:moderators", "moderator") - assert acl[2] == (Allow, "group:psf_staff", "psf_staff") - assert acl[3] == ( - Allow, - "group:with_admin_dashboard_access", - "admin_dashboard_access", - ) - assert acl[4] == (Allow, Authenticated, "manage:user") + assert acl == [ + (Allow, "group:admins", "admin"), + (Allow, "group:admins", "admin_dashboard_access"), + (Allow, "group:moderators", "moderator"), + (Allow, "group:moderators", "admin_dashboard_access"), + (Allow, "group:psf_staff", "psf_staff"), + (Allow, "group:psf_staff", "admin_dashboard_access"), + (Allow, Authenticated, "manage:user"), + ] diff --git a/tests/unit/test_predicates.py b/tests/unit/test_predicates.py index 56764feb1832..70ee40d5dfa9 100644 --- a/tests/unit/test_predicates.py +++ b/tests/unit/test_predicates.py @@ -19,8 +19,10 @@ from warehouse.organizations.models import OrganizationType from warehouse.predicates import ( ActiveOrganizationPredicate, + APIPredicate, DomainPredicate, HeadersPredicate, + _is_api_route, includeme, ) from warehouse.subscriptions.models import StripeSubscriptionStatus @@ -56,6 +58,39 @@ def test_invalid_value(self): assert not predicate(None, pretend.stub(domain="pypi.io")) +class TestAPIPredicate: + @pytest.mark.parametrize( + ("value", "expected"), [(True, "is_api = True"), (False, "is_api = False")] + ) + def test_text(self, value, expected): + pred = APIPredicate(value, None) + assert pred.text() == expected + assert pred.phash() == expected + + @pytest.mark.parametrize("value", [True, False, None]) + def test_always_allows(self, value): + pred = APIPredicate(value, None) + assert pred(None, None) + + def test_request_no_matched_route(self): + assert not _is_api_route(pretend.stub(matched_route=None)) + + def test_request_matched_route_no_pred(self): + request = pretend.stub(matched_route=pretend.stub(predicates=[])) + assert not _is_api_route(request) + + def test_request_matched_route_no_api_pred(self): + request = pretend.stub(matched_route=pretend.stub(predicates=[pretend.stub()])) + assert not _is_api_route(request) + + @pytest.mark.parametrize(("value", "expected"), [(True, True), (False, False)]) + def test_request_matched_route_with_api_pred(self, value, expected): + request = pretend.stub( + matched_route=pretend.stub(predicates=[APIPredicate(value, None)]) + ) + assert _is_api_route(request) == expected + + class TestHeadersPredicate: @pytest.mark.parametrize( ("value", "expected"), @@ -189,12 +224,20 @@ def test_active_subscription( def test_includeme(): config = pretend.stub( + add_request_method=pretend.call_recorder(lambda fn, name, reify: None), add_route_predicate=pretend.call_recorder(lambda name, pred: None), add_view_predicate=pretend.call_recorder(lambda name, pred: None), ) includeme(config) - assert config.add_route_predicate.calls == [pretend.call("domain", DomainPredicate)] + assert config.add_request_method.calls == [ + pretend.call(_is_api_route, name="is_api", reify=True) + ] + + assert config.add_route_predicate.calls == [ + pretend.call("domain", DomainPredicate), + pretend.call("is_api", APIPredicate), + ] assert config.add_view_predicate.calls == [ pretend.call("require_headers", HeadersPredicate), diff --git a/tests/unit/utils/test_security_policy.py b/tests/unit/utils/test_security_policy.py index 38e7d287c899..0c1c1b0683af 100644 --- a/tests/unit/utils/test_security_policy.py +++ b/tests/unit/utils/test_security_policy.py @@ -11,256 +11,144 @@ # limitations under the License. import pretend -import pytest -from pyramid.authorization import Authenticated -from pyramid.security import Denied +from pyramid.interfaces import ISecurityPolicy +from zope.interface.verify import verifyClass -from warehouse.oidc.utils import OIDCContext from warehouse.utils import security_policy -from ...common.db.accounts import UserFactory -from ...common.db.oidc import GitHubPublisherFactory - - -@pytest.mark.parametrize( - ( - "is_superuser", - "is_moderator", - "is_psf_staff", - "expected", - ), - [ - (False, False, False, []), - ( - True, - False, - False, - [ - "group:admins", - "group:moderators", - "group:psf_staff", - "group:with_admin_dashboard_access", - ], - ), - ( - False, - True, - False, - ["group:moderators", "group:with_admin_dashboard_access"], - ), - ( - True, - True, - False, - [ - "group:admins", - "group:moderators", - "group:psf_staff", - "group:with_admin_dashboard_access", - ], - ), - ( - False, - False, - True, - ["group:psf_staff", "group:with_admin_dashboard_access"], - ), - ( - False, - True, - True, - [ - "group:moderators", - "group:psf_staff", - "group:with_admin_dashboard_access", - ], - ), - ], -) -def test_principals_for_authenticated_user( - is_superuser, - is_moderator, - is_psf_staff, - expected, -): - user = pretend.stub( - id=1, - is_superuser=is_superuser, - is_moderator=is_moderator, - is_psf_staff=is_psf_staff, - ) - assert security_policy._principals_for_authenticated_user(user) == expected + +def test_principals_for(): + identity = pretend.stub(__principals__=lambda: ["a", "b", "z"]) + assert security_policy.principals_for(identity) == ["a", "b", "z"] + + +def test_principals_for_with_none(): + assert security_policy.principals_for(pretend.stub()) == [] class TestMultiSecurityPolicy: - def test_initializes(self): - subpolicies = pretend.stub() - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + def test_verify(self): + assert verifyClass( + ISecurityPolicy, + security_policy.MultiSecurityPolicy, + ) + + def test_reset(self): + identity1 = pretend.stub() + identity2 = pretend.stub() + identities = iter([identity1, identity2]) + + subpolicies = [pretend.stub(identity=lambda r: next(identities))] + policy = security_policy.MultiSecurityPolicy(subpolicies) + + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) + + assert policy.identity(request) is identity1 + assert policy.identity(request) is identity1 + + policy.reset(request) - assert policy._policies is subpolicies - assert policy._authz is authz + assert policy.identity(request) is identity2 def test_identity_none(self): - subpolicies = [pretend.stub(identity=pretend.call_recorder(lambda r: None))] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + subpolicies = [pretend.stub(identity=lambda r: None)] + policy = security_policy.MultiSecurityPolicy(subpolicies) - request = pretend.stub() + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) assert policy.identity(request) is None - for p in subpolicies: - assert p.identity.calls == [pretend.call(request)] def test_identity_first_come_first_serve(self): identity1 = pretend.stub() identity2 = pretend.stub() subpolicies = [ - pretend.stub(identity=pretend.call_recorder(lambda r: None)), - pretend.stub(identity=pretend.call_recorder(lambda r: identity1)), - pretend.stub(identity=pretend.call_recorder(lambda r: identity2)), + pretend.stub(identity=lambda r: None), + pretend.stub(identity=lambda r: identity1), + pretend.stub(identity=lambda r: identity2), ] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + policy = security_policy.MultiSecurityPolicy(subpolicies) - request = pretend.stub() + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) assert policy.identity(request) is identity1 - assert subpolicies[0].identity.calls == [pretend.call(request)] - assert subpolicies[1].identity.calls == [pretend.call(request)] - assert subpolicies[2].identity.calls == [] def test_authenticated_userid_no_identity(self): - request = pretend.stub() - subpolicies = [pretend.stub(identity=pretend.call_recorder(lambda r: None))] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) + subpolicies = [pretend.stub(identity=lambda r: None)] + policy = security_policy.MultiSecurityPolicy(subpolicies) assert policy.authenticated_userid(request) is None - assert subpolicies[0].identity.calls == [pretend.call(request)] def test_authenticated_userid_nonuser_identity(self, db_request): - request = pretend.stub() + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) nonuser = pretend.stub(id="not-a-user-instance") - subpolicies = [pretend.stub(identity=pretend.call_recorder(lambda r: nonuser))] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + subpolicies = [pretend.stub(identity=lambda r: nonuser)] + policy = security_policy.MultiSecurityPolicy(subpolicies) assert policy.authenticated_userid(request) is None - assert subpolicies[0].identity.calls == [pretend.call(request)] - def test_authenticated_userid(self, db_request): - request = pretend.stub() - user = UserFactory.create() - subpolicies = [pretend.stub(identity=pretend.call_recorder(lambda r: user))] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + def test_authenticated_userid(self, monkeypatch): + monkeypatch.setattr(security_policy, "User", pretend.stub) + + request = pretend.stub(add_finished_callback=lambda *a, **kw: None) + user = pretend.stub(id="a fake user") + subpolicies = [pretend.stub(identity=lambda r: user)] + policy = security_policy.MultiSecurityPolicy(subpolicies) assert policy.authenticated_userid(request) == str(user.id) - assert subpolicies[0].identity.calls == [pretend.call(request)] def test_forget(self): - header = pretend.stub() - subpolicies = [ - pretend.stub(forget=pretend.call_recorder(lambda r, **kw: [header])) - ] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + subpolicies = [pretend.stub(forget=lambda r, **kw: [("ForgetMe", "1")])] + policy = security_policy.MultiSecurityPolicy(subpolicies) request = pretend.stub() - assert policy.forget(request, foo=None) == [header] - assert subpolicies[0].forget.calls == [pretend.call(request, foo=None)] + assert policy.forget(request, foo=None) == [("ForgetMe", "1")] def test_remember(self): - header = pretend.stub() subpolicies = [ - pretend.stub(remember=pretend.call_recorder(lambda r, uid, **kw: [header])) + pretend.stub(remember=lambda r, uid, foo, **kw: [("RememberMe", foo)]) ] - authz = pretend.stub() - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + policy = security_policy.MultiSecurityPolicy(subpolicies) request = pretend.stub() userid = pretend.stub() - assert policy.remember(request, userid, foo=None) == [header] - assert subpolicies[0].remember.calls == [ - pretend.call(request, userid, foo=None) - ] - - def test_permits_user(self, db_request, monkeypatch): - subpolicies = pretend.stub() - status = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: status)) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - - principals_for_authenticated_user = pretend.call_recorder( - lambda *a: ["some:principal"] - ) - monkeypatch.setattr( - security_policy, - "_principals_for_authenticated_user", - principals_for_authenticated_user, - ) + assert policy.remember(request, userid, foo="bob") == [("RememberMe", "bob")] - user = UserFactory.create() - request = pretend.stub(identity=user) + def test_permits(self): + identity1 = pretend.stub() + identity2 = pretend.stub() context = pretend.stub() - permission = pretend.stub() - assert policy.permits(request, context, permission) is status - assert authz.permits.calls == [ - pretend.call( - context, - [Authenticated, f"user:{user.id}", "some:principal"], - permission, - ) - ] - - def test_permits_oidc_publisher(self, db_request): - subpolicies = pretend.stub() - status = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: status)) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - publisher = GitHubPublisherFactory.create() - request = pretend.stub(identity=OIDCContext(publisher, None)) - context = pretend.stub() - permission = pretend.stub() - assert policy.permits(request, context, permission) is status - assert authz.permits.calls == [ - pretend.call( - context, - [Authenticated, f"oidc:{publisher.id}"], - permission, - ) + subpolicies = [ + pretend.stub(identity=lambda r: None), + pretend.stub( + identity=lambda r: identity1, + permits=( + lambda r, c, p: r.identity == identity1 + and c == context + and p == "myperm" + ), + ), + pretend.stub(identity=lambda r: identity2), ] + policy = security_policy.MultiSecurityPolicy(subpolicies) - def test_permits_nonuser_denied(self): - subpolicies = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: pretend.stub())) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - - # Anything that doesn't pass an isinstance check for User - fakeuser = pretend.stub() - request = pretend.stub(identity=fakeuser) - context = pretend.stub() - permission = pretend.stub() - assert policy.permits(request, context, permission) == Denied("unimplemented") - assert authz.permits.calls == [] + request = pretend.stub( + identity=identity1, + add_finished_callback=lambda *a, **kw: None, + ) - def test_permits_no_identity(self): - subpolicies = pretend.stub() - status = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: status)) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) + assert policy.permits(request, context, "myperm") - request = pretend.stub(identity=None) + def test_permits_no_policy(self): + subpolicies = [ + pretend.stub(identity=lambda r: None), + pretend.stub(identity=lambda r: None), + pretend.stub(identity=lambda r: None), + ] + policy = security_policy.MultiSecurityPolicy(subpolicies) + request = pretend.stub( + identity=None, add_finished_callback=lambda *a, **kw: None + ) context = pretend.stub() - permission = pretend.stub() - assert policy.permits(request, context, permission) is status - assert authz.permits.calls == [pretend.call(context, [], permission)] - - def test_cant_use_unauthenticated_userid(self): - subpolicies = pretend.stub() - authz = pretend.stub(permits=pretend.call_recorder(lambda *a: pretend.stub())) - policy = security_policy.MultiSecurityPolicy(subpolicies, authz) - with pytest.raises(NotImplementedError): - policy.unauthenticated_userid(pretend.stub()) + assert not policy.permits(request, context, "myperm") diff --git a/warehouse/accounts/__init__.py b/warehouse/accounts/__init__.py index 05cc2e97518c..9667b3a064c9 100644 --- a/warehouse/accounts/__init__.py +++ b/warehouse/accounts/__init__.py @@ -10,8 +10,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pyramid.authorization import ACLAuthorizationPolicy - from warehouse.accounts.interfaces import ( IEmailBreachedService, IPasswordBreachedService, @@ -22,7 +20,6 @@ from warehouse.accounts.security_policy import ( BasicAuthSecurityPolicy, SessionSecurityPolicy, - TwoFactorAuthorizationPolicy, ) from warehouse.accounts.services import ( HaveIBeenPwnedEmailBreachedService, @@ -33,10 +30,7 @@ database_login_factory, ) from warehouse.admin.flags import AdminFlagValue -from warehouse.macaroons.security_policy import ( - MacaroonAuthorizationPolicy, - MacaroonSecurityPolicy, -) +from warehouse.macaroons.security_policy import MacaroonSecurityPolicy from warehouse.oidc.utils import OIDCContext from warehouse.organizations.services import IOrganizationService from warehouse.rate_limiting import IRateLimiter, RateLimit @@ -127,10 +121,7 @@ def includeme(config): breached_email_class.create_service, IEmailBreachedService ) - # Register our security policies (AuthN + AuthZ) - authz_policy = TwoFactorAuthorizationPolicy( - policy=MacaroonAuthorizationPolicy(policy=ACLAuthorizationPolicy()) - ) + # Register our security policies. config.set_security_policy( MultiSecurityPolicy( [ @@ -138,7 +129,6 @@ def includeme(config): BasicAuthSecurityPolicy(), MacaroonSecurityPolicy(), ], - authz_policy, ) ) diff --git a/warehouse/accounts/models.py b/warehouse/accounts/models.py index 08c464bd1950..5aad274d43f5 100644 --- a/warehouse/accounts/models.py +++ b/warehouse/accounts/models.py @@ -14,7 +14,7 @@ import enum from citext import CIText -from pyramid.authorization import Allow +from pyramid.authorization import Allow, Authenticated from sqlalchemy import ( Boolean, CheckConstraint, @@ -192,6 +192,18 @@ def can_reset_password(self): ] ) + def __principals__(self) -> list[str]: + principals = [Authenticated, f"user:{self.id}"] + + if self.is_superuser: + principals.append("group:admins") + if self.is_moderator or self.is_superuser: + principals.append("group:moderators") + if self.is_psf_staff or self.is_superuser: + principals.append("group:psf_staff") + + return principals + def __acl__(self): return [ (Allow, "group:admins", "admin"), diff --git a/warehouse/accounts/security_policy.py b/warehouse/accounts/security_policy.py index 16dc2d6775e9..56b95b3f04da 100644 --- a/warehouse/accounts/security_policy.py +++ b/warehouse/accounts/security_policy.py @@ -16,13 +16,14 @@ SessionAuthenticationHelper, extract_http_basic_credentials, ) +from pyramid.authorization import ACLHelper from pyramid.httpexceptions import HTTPUnauthorized -from pyramid.interfaces import IAuthorizationPolicy, ISecurityPolicy -from pyramid.threadlocal import get_current_request +from pyramid.interfaces import ISecurityPolicy +from pyramid.security import Allowed from zope.interface import implementer from warehouse.accounts.interfaces import IPasswordBreachedService, IUserService -from warehouse.accounts.models import DisableReason +from warehouse.accounts.models import DisableReason, User from warehouse.cache.http import add_vary_callback from warehouse.email import send_password_compromised_email_hibp from warehouse.errors import ( @@ -33,7 +34,7 @@ ) from warehouse.events.tags import EventTag from warehouse.packaging.models import TwoFactorRequireable -from warehouse.utils.security_policy import AuthenticationMethod +from warehouse.utils.security_policy import AuthenticationMethod, principals_for def _format_exc_status(exc, message): @@ -42,14 +43,6 @@ def _format_exc_status(exc, message): def _basic_auth_check(username, password, request): - # A route must be matched - if not request.matched_route: - return False - - # Basic authentication can only be used for uploading - if request.matched_route.name != "forklift.legacy.file_upload": - return False - login_service = request.find_service(IUserService, context=None) breach_service = request.find_service(IPasswordBreachedService, context=None) @@ -124,8 +117,15 @@ def _basic_auth_check(username, password, request): class SessionSecurityPolicy: def __init__(self): self._session_helper = SessionAuthenticationHelper() + self._acl = ACLHelper() def identity(self, request): + # If our current request *is* any API request, then we will disallow + # authenticating with sessions, as an API request should only use API + # authentication methods. + if request.is_api: + return None + # If we're calling into this API on a request, then we want to register # a callback which will ensure that the response varies based on the # Cookie header. @@ -135,14 +135,6 @@ def identity(self, request): if request.banned.by_ip(request.remote_addr): return None - # A route must be matched - if not request.matched_route: - return None - - # Session authentication cannot be used for uploading - if request.matched_route.name == "forklift.legacy.file_upload": - return None - userid = self._session_helper.authenticated_userid(request) request._unauthenticated_userid = userid @@ -184,13 +176,24 @@ def authenticated_userid(self, request): raise NotImplementedError def permits(self, request, context, permission): - # Handled by MultiSecurityPolicy - raise NotImplementedError + return _permits_for_user_policy(self._acl, request, context, permission) @implementer(ISecurityPolicy) class BasicAuthSecurityPolicy: + def __init__(self): + self._acl = ACLHelper() + def identity(self, request): + # If our current request isn't an API request, then we'll just quickly skip + # trying to do anything since we only support basic auth on API routes. + # NOTE: Technically we only support it on upload, which is at the moment our + # only API route. We'll end up removing this entirely once we go to 2FA + # only for uploading, so a short term window if we add another API route + # seems fine. + if not request.is_api: + return None + # If we're calling into this API on a request, then we want to register # a callback which will ensure that the response varies based on the # Authorization header. @@ -225,82 +228,77 @@ def authenticated_userid(self, request): raise NotImplementedError def permits(self, request, context, permission): - # Handled by MultiSecurityPolicy - raise NotImplementedError + return _permits_for_user_policy(self._acl, request, context, permission) + +def _permits_for_user_policy(acl, request, context, permission): + # It should only be possible for request.identity to be a User object + # at this point, and we only a User in these policies. + assert isinstance(request.identity, User) -@implementer(IAuthorizationPolicy) -class TwoFactorAuthorizationPolicy: - def __init__(self, policy): - self.policy = policy + # Dispatch to our ACL + # NOTE: These parameters are in a different order than the signature of this method. + res = acl.permits(context, principals_for(request.identity), permission) - def permits(self, context, principals, permission): - # The Pyramid API doesn't let us access the request here, so we have to pull it - # out of the thread local instead. - # TODO: Work with Pyramid devs to figure out if there is a better way to support - # the worklow we are using here or not. - request = get_current_request() + # If our underlying permits allowed this, we will check our 2FA status, + # that might possibly return a reason to deny the request anyways, and if + # it does we'll return that. + if isinstance(res, Allowed): + mfa = _check_for_mfa(request, context) + if mfa is not None: + return mfa - # Our request could possibly be a None, if there isn't an active request, in - # that case we're going to always deny, because without a request, we can't - # determine if this request is authorized or not. - if request is None: + return res + + +def _check_for_mfa(request, context) -> WarehouseDenied | None: + # It should only be possible for request.identity to be a User object + # at this point, and we only a User in these policies. + assert isinstance(request.identity, User) + + # Check if the context is 2FA requireable, if 2FA is indeed required, and if + # the user has 2FA enabled. + if isinstance(context, TwoFactorRequireable): + # Check if we allow owners to require 2FA, and if so does our context owner + # require 2FA? And if so does our user have 2FA? + if ( + request.registry.settings["warehouse.two_factor_requirement.enabled"] + and context.owners_require_2fa + and not request.identity.has_two_factor + ): return WarehouseDenied( - "There was no active request.", reason="no_active_request" + "This project requires two factor authentication to be enabled " + "for all contributors.", + reason="owners_require_2fa", ) - # Check if the subpolicy permits authorization - subpolicy_permits = self.policy.permits(context, principals, permission) - - # If the request is permitted by the subpolicy, check if the context is - # 2FA requireable, if 2FA is indeed required, and if the user has 2FA - # enabled. - # - # We check for `request.user` explicitly because we don't perform - # this check for non-user identities: the only way a non-user - # identity can be created is after a 2FA check on a 2FA-mandated - # project. + # Check if PyPI is enforcing the 2FA mandate on "critical" projects, and if it + # is does our current context require it, and if it does, does our user have + # 2FA? if ( - subpolicy_permits - and isinstance(context, TwoFactorRequireable) - and request.user + request.registry.settings["warehouse.two_factor_mandate.enabled"] + and context.pypi_mandates_2fa + and not request.identity.has_two_factor ): - if ( - request.registry.settings["warehouse.two_factor_requirement.enabled"] - and context.owners_require_2fa - and not request.user.has_two_factor - ): - return WarehouseDenied( - "This project requires two factor authentication to be enabled " - "for all contributors.", - reason="owners_require_2fa", - ) - if ( - request.registry.settings["warehouse.two_factor_mandate.enabled"] - and context.pypi_mandates_2fa - and not request.user.has_two_factor - ): - return WarehouseDenied( - "PyPI requires two factor authentication to be enabled " - "for all contributors to this project.", - reason="pypi_mandates_2fa", - ) - if ( - request.registry.settings["warehouse.two_factor_mandate.available"] - and context.pypi_mandates_2fa - and not request.user.has_two_factor - ): - request.session.flash( - "This project is included in PyPI's two-factor mandate " - "for critical projects. In the future, you will be unable to " - "perform this action without enabling 2FA for your account", - queue="warning", - ) + return WarehouseDenied( + "PyPI requires two factor authentication to be enabled " + "for all contributors to this project.", + reason="pypi_mandates_2fa", + ) - return subpolicy_permits + # Check if PyPI's 2FA mandate is available, but not enforcing, and if it is and + # the current context would require 2FA, and if our user doesn't have have 2FA + # then we'll flash a warning. + if ( + request.registry.settings["warehouse.two_factor_mandate.available"] + and context.pypi_mandates_2fa + and not request.identity.has_two_factor + ): + request.session.flash( + "This project is included in PyPI's two-factor mandate " + "for critical projects. In the future, you will be unable to " + "perform this action without enabling 2FA for your account", + queue="warning", + ) - def principals_allowed_by_permission(self, context, permission): - # We just dispatch this, because this policy doesn't restrict what - # principals are allowed by a particular permission, it just restricts - # specific requests to not have that permission. - return self.policy.principals_allowed_by_permission(context, permission) + return None diff --git a/warehouse/accounts/views.py b/warehouse/accounts/views.py index 207d11608569..d0c35c68fd39 100644 --- a/warehouse/accounts/views.py +++ b/warehouse/accounts/views.py @@ -27,6 +27,7 @@ HTTPTooManyRequests, HTTPUnauthorized, ) +from pyramid.interfaces import ISecurityPolicy from pyramid.security import forget, remember from pyramid.view import view_config, view_defaults from sqlalchemy.exc import NoResultFound @@ -550,6 +551,12 @@ def logout(request, redirect_field_name=REDIRECT_FIELD_NAME): # the session for the original user. request.session.invalidate() + # We've logged the user out, so we want to ensure that we invalid the cache + # for our security policy, if we're using one that can be reset during login + security_policy = request.registry.queryUtility(ISecurityPolicy) + if hasattr(security_policy, "reset"): + security_policy.reset(request) + # Now that we're logged out we'll want to redirect the user to either # where they were originally, or to the default view. resp = HTTPSeeOther(redirect_to, headers=dict(headers)) @@ -1260,6 +1267,12 @@ def _login_user(request, userid, two_factor_method=None, two_factor_label=None): request.session.invalidate() request.session.update(data) + # We've logged the user in, so we want to ensure that we invalid the cache + # for our security policy, if we're using one that can be reset during login + security_policy = request.registry.queryUtility(ISecurityPolicy) + if hasattr(security_policy, "reset"): + security_policy.reset(request) + # Remember the userid using the authentication policy. headers = remember(request, str(userid)) diff --git a/warehouse/admin/routes.py b/warehouse/admin/routes.py index 74d2ee3a83c2..be95d490ea2d 100644 --- a/warehouse/admin/routes.py +++ b/warehouse/admin/routes.py @@ -81,6 +81,13 @@ def includeme(config): factory="warehouse.accounts.models:UserFactory", traverse="/{username}", ) + config.add_route( + "admin.user.wipe_factors", + "/admin/users/{username}/wipe_factors/", + domain=warehouse, + factory="warehouse.accounts.models:UserFactory", + traverse="/{username}", + ) config.add_route( "admin.prohibited_user_names.bulk_add", "/admin/prohibited_user_names/bulk/", diff --git a/warehouse/admin/templates/admin/users/detail.html b/warehouse/admin/templates/admin/users/detail.html index daec0ed68b26..2020480b2ede 100644 --- a/warehouse/admin/templates/admin/users/detail.html +++ b/warehouse/admin/templates/admin/users/detail.html @@ -101,12 +101,15 @@

Actions

- - +
-
+ + diff --git a/warehouse/admin/views/users.py b/warehouse/admin/views/users.py index 82c2577860ec..8721996f6a00 100644 --- a/warehouse/admin/views/users.py +++ b/warehouse/admin/views/users.py @@ -266,6 +266,14 @@ def user_delete(user, request): return HTTPSeeOther(request.route_path("admin.user.list")) +def _user_reset_password(user, request): + login_service = request.find_service(IUserService, context=None) + send_password_compromised_email(request, user) + login_service.disable_password( + user.id, request, reason=DisableReason.CompromisedPassword + ) + + @view_config( route_name="admin.user.reset_password", require_methods=["POST"], @@ -285,16 +293,42 @@ def user_reset_password(user, request): request.route_path("admin.user.detail", username=user.username) ) - login_service = request.find_service(IUserService, context=None) - send_password_compromised_email(request, user) - login_service.disable_password( - user.id, request, reason=DisableReason.CompromisedPassword - ) + _user_reset_password(user, request) request.session.flash(f"Reset password for {user.username!r}", queue="success") return HTTPSeeOther(request.route_path("admin.user.detail", username=user.username)) +@view_config( + route_name="admin.user.wipe_factors", + require_methods=["POST"], + permission="admin", + has_translations=True, + uses_session=True, + require_csrf=True, + context=User, +) +def user_wipe_factors(user, request): + if user.username != request.matchdict.get("username", user.username): + return HTTPMovedPermanently(request.current_route_path(username=user.username)) + + if user.username != request.params.get("username"): + request.session.flash("Wrong confirmation input", queue="error") + return HTTPSeeOther( + request.route_path("admin.user.detail", username=user.username) + ) + + user.totp_secret = None + user.webauthn = [] + user.recovery_codes = [] + _user_reset_password(user, request) + + request.session.flash( + f"Wiped factors and reset password for {user.username!r}", queue="success" + ) + return HTTPSeeOther(request.route_path("admin.user.detail", username=user.username)) + + @view_config( route_name="admin.prohibited_user_names.bulk_add", renderer="admin/prohibited_user_names/bulk.html", diff --git a/warehouse/config.py b/warehouse/config.py index b89a5bf04828..57b734c31462 100644 --- a/warehouse/config.py +++ b/warehouse/config.py @@ -58,9 +58,11 @@ class RootFactory: __acl__ = [ (Allow, "group:admins", "admin"), + (Allow, "group:admins", "admin_dashboard_access"), (Allow, "group:moderators", "moderator"), + (Allow, "group:moderators", "admin_dashboard_access"), (Allow, "group:psf_staff", "psf_staff"), - (Allow, "group:with_admin_dashboard_access", "admin_dashboard_access"), + (Allow, "group:psf_staff", "admin_dashboard_access"), (Allow, Authenticated, "manage:user"), ] diff --git a/warehouse/forklift/__init__.py b/warehouse/forklift/__init__.py index b1b73a653302..abc62988b995 100644 --- a/warehouse/forklift/__init__.py +++ b/warehouse/forklift/__init__.py @@ -31,7 +31,10 @@ def includeme(config): # Add the routes that we'll be using in Forklift. config.add_legacy_action_route( - "forklift.legacy.file_upload", "file_upload", domain=forklift + "forklift.legacy.file_upload", + "file_upload", + domain=forklift, + is_api=True, ) config.add_legacy_action_route("forklift.legacy.submit", "submit", domain=forklift) config.add_legacy_action_route( diff --git a/warehouse/locale/messages.pot b/warehouse/locale/messages.pot index 26c70728cdf1..d8610d6036d2 100644 --- a/warehouse/locale/messages.pot +++ b/warehouse/locale/messages.pot @@ -100,216 +100,216 @@ msgstr "" msgid "No user found with that username or email" msgstr "" -#: warehouse/accounts/views.py:101 +#: warehouse/accounts/views.py:102 msgid "" "There have been too many unsuccessful login attempts. You have been " "locked out for {}. Please try again later." msgstr "" -#: warehouse/accounts/views.py:118 +#: warehouse/accounts/views.py:119 msgid "" "Too many emails have been added to this account without verifying them. " "Check your inbox and follow the verification links. (IP: ${ip})" msgstr "" -#: warehouse/accounts/views.py:130 +#: warehouse/accounts/views.py:131 msgid "" "Too many password resets have been requested for this account without " "completing them. Check your inbox and follow the verification links. (IP:" " ${ip})" msgstr "" -#: warehouse/accounts/views.py:305 warehouse/accounts/views.py:369 -#: warehouse/accounts/views.py:371 warehouse/accounts/views.py:398 -#: warehouse/accounts/views.py:400 warehouse/accounts/views.py:466 +#: warehouse/accounts/views.py:306 warehouse/accounts/views.py:370 +#: warehouse/accounts/views.py:372 warehouse/accounts/views.py:399 +#: warehouse/accounts/views.py:401 warehouse/accounts/views.py:467 msgid "Invalid or expired two factor login." msgstr "" -#: warehouse/accounts/views.py:363 +#: warehouse/accounts/views.py:364 msgid "Already authenticated" msgstr "" -#: warehouse/accounts/views.py:442 +#: warehouse/accounts/views.py:443 msgid "Successful WebAuthn assertion" msgstr "" -#: warehouse/accounts/views.py:498 warehouse/manage/views/__init__.py:816 +#: warehouse/accounts/views.py:499 warehouse/manage/views/__init__.py:816 msgid "Recovery code accepted. The supplied code cannot be used again." msgstr "" -#: warehouse/accounts/views.py:584 +#: warehouse/accounts/views.py:591 msgid "" "New user registration temporarily disabled. See https://pypi.org/help" "#admin-intervention for details." msgstr "" -#: warehouse/accounts/views.py:716 +#: warehouse/accounts/views.py:723 msgid "Expired token: request a new password reset link" msgstr "" -#: warehouse/accounts/views.py:718 +#: warehouse/accounts/views.py:725 msgid "Invalid token: request a new password reset link" msgstr "" -#: warehouse/accounts/views.py:720 warehouse/accounts/views.py:823 -#: warehouse/accounts/views.py:923 warehouse/accounts/views.py:1096 +#: warehouse/accounts/views.py:727 warehouse/accounts/views.py:830 +#: warehouse/accounts/views.py:930 warehouse/accounts/views.py:1103 msgid "Invalid token: no token supplied" msgstr "" -#: warehouse/accounts/views.py:724 +#: warehouse/accounts/views.py:731 msgid "Invalid token: not a password reset token" msgstr "" -#: warehouse/accounts/views.py:729 +#: warehouse/accounts/views.py:736 msgid "Invalid token: user not found" msgstr "" -#: warehouse/accounts/views.py:740 +#: warehouse/accounts/views.py:747 msgid "Invalid token: user has logged in since this token was requested" msgstr "" -#: warehouse/accounts/views.py:758 +#: warehouse/accounts/views.py:765 msgid "" "Invalid token: password has already been changed since this token was " "requested" msgstr "" -#: warehouse/accounts/views.py:791 +#: warehouse/accounts/views.py:798 msgid "You have reset your password" msgstr "" -#: warehouse/accounts/views.py:819 +#: warehouse/accounts/views.py:826 msgid "Expired token: request a new email verification link" msgstr "" -#: warehouse/accounts/views.py:821 +#: warehouse/accounts/views.py:828 msgid "Invalid token: request a new email verification link" msgstr "" -#: warehouse/accounts/views.py:827 +#: warehouse/accounts/views.py:834 msgid "Invalid token: not an email verification token" msgstr "" -#: warehouse/accounts/views.py:836 +#: warehouse/accounts/views.py:843 msgid "Email not found" msgstr "" -#: warehouse/accounts/views.py:839 +#: warehouse/accounts/views.py:846 msgid "Email already verified" msgstr "" -#: warehouse/accounts/views.py:857 +#: warehouse/accounts/views.py:864 msgid "You can now set this email as your primary address" msgstr "" -#: warehouse/accounts/views.py:861 +#: warehouse/accounts/views.py:868 msgid "This is your primary address" msgstr "" -#: warehouse/accounts/views.py:866 +#: warehouse/accounts/views.py:873 msgid "Email address ${email_address} verified. ${confirm_message}." msgstr "" -#: warehouse/accounts/views.py:919 +#: warehouse/accounts/views.py:926 msgid "Expired token: request a new organization invitation" msgstr "" -#: warehouse/accounts/views.py:921 +#: warehouse/accounts/views.py:928 msgid "Invalid token: request a new organization invitation" msgstr "" -#: warehouse/accounts/views.py:927 +#: warehouse/accounts/views.py:934 msgid "Invalid token: not an organization invitation token" msgstr "" -#: warehouse/accounts/views.py:931 +#: warehouse/accounts/views.py:938 msgid "Organization invitation is not valid." msgstr "" -#: warehouse/accounts/views.py:940 +#: warehouse/accounts/views.py:947 msgid "Organization invitation no longer exists." msgstr "" -#: warehouse/accounts/views.py:993 +#: warehouse/accounts/views.py:1000 msgid "Invitation for '${organization_name}' is declined." msgstr "" -#: warehouse/accounts/views.py:1058 +#: warehouse/accounts/views.py:1065 msgid "You are now ${role} of the '${organization_name}' organization." msgstr "" -#: warehouse/accounts/views.py:1092 +#: warehouse/accounts/views.py:1099 msgid "Expired token: request a new project role invitation" msgstr "" -#: warehouse/accounts/views.py:1094 +#: warehouse/accounts/views.py:1101 msgid "Invalid token: request a new project role invitation" msgstr "" -#: warehouse/accounts/views.py:1100 +#: warehouse/accounts/views.py:1107 msgid "Invalid token: not a collaboration invitation token" msgstr "" -#: warehouse/accounts/views.py:1104 +#: warehouse/accounts/views.py:1111 msgid "Role invitation is not valid." msgstr "" -#: warehouse/accounts/views.py:1119 +#: warehouse/accounts/views.py:1126 msgid "Role invitation no longer exists." msgstr "" -#: warehouse/accounts/views.py:1152 +#: warehouse/accounts/views.py:1159 msgid "Invitation for '${project_name}' is declined." msgstr "" -#: warehouse/accounts/views.py:1220 +#: warehouse/accounts/views.py:1227 msgid "You are now ${role} of the '${project_name}' project." msgstr "" -#: warehouse/accounts/views.py:1438 warehouse/accounts/views.py:1458 -#: warehouse/accounts/views.py:1593 warehouse/manage/views/__init__.py:1225 +#: warehouse/accounts/views.py:1451 warehouse/accounts/views.py:1471 +#: warehouse/accounts/views.py:1606 warehouse/manage/views/__init__.py:1225 #: warehouse/manage/views/__init__.py:1244 msgid "" "Trusted publishers are temporarily disabled. See https://pypi.org/help" "#admin-intervention for details." msgstr "" -#: warehouse/accounts/views.py:1472 +#: warehouse/accounts/views.py:1485 msgid "" "You must have a verified email in order to register a pending trusted " "publisher. See https://pypi.org/help#openid-connect for details." msgstr "" -#: warehouse/accounts/views.py:1485 +#: warehouse/accounts/views.py:1498 msgid "You can't register more than 3 pending trusted publishers at once." msgstr "" -#: warehouse/accounts/views.py:1501 warehouse/manage/views/__init__.py:1263 +#: warehouse/accounts/views.py:1514 warehouse/manage/views/__init__.py:1263 msgid "" "There have been too many attempted trusted publisher registrations. Try " "again later." msgstr "" -#: warehouse/accounts/views.py:1515 warehouse/manage/views/__init__.py:1277 +#: warehouse/accounts/views.py:1528 warehouse/manage/views/__init__.py:1277 msgid "The trusted publisher could not be registered" msgstr "" -#: warehouse/accounts/views.py:1534 +#: warehouse/accounts/views.py:1547 msgid "" "This trusted publisher has already been registered. Please contact PyPI's" " admins if this wasn't intentional." msgstr "" -#: warehouse/accounts/views.py:1570 +#: warehouse/accounts/views.py:1583 msgid "Registered a new publishing publisher to create " msgstr "" -#: warehouse/accounts/views.py:1607 warehouse/accounts/views.py:1620 -#: warehouse/accounts/views.py:1627 +#: warehouse/accounts/views.py:1620 warehouse/accounts/views.py:1633 +#: warehouse/accounts/views.py:1640 msgid "Invalid publisher ID" msgstr "" -#: warehouse/accounts/views.py:1633 +#: warehouse/accounts/views.py:1646 msgid "Removed trusted publisher for project " msgstr "" diff --git a/warehouse/macaroons/security_policy.py b/warehouse/macaroons/security_policy.py index 0ec3872cf0aa..9210b9ba757c 100644 --- a/warehouse/macaroons/security_policy.py +++ b/warehouse/macaroons/security_policy.py @@ -12,8 +12,8 @@ import base64 -from pyramid.interfaces import IAuthorizationPolicy, ISecurityPolicy -from pyramid.threadlocal import get_current_request +from pyramid.authorization import ACLHelper +from pyramid.interfaces import ISecurityPolicy from zope.interface import implementer from warehouse.cache.http import add_vary_callback @@ -21,7 +21,7 @@ from warehouse.macaroons import InvalidMacaroonError from warehouse.macaroons.interfaces import IMacaroonService from warehouse.oidc.utils import OIDCContext -from warehouse.utils.security_policy import AuthenticationMethod +from warehouse.utils.security_policy import AuthenticationMethod, principals_for def _extract_basic_macaroon(auth): @@ -73,7 +73,15 @@ def _extract_http_macaroon(request): @implementer(ISecurityPolicy) class MacaroonSecurityPolicy: + def __init__(self): + self._acl = ACLHelper() + def identity(self, request): + # If our current request isn't an API request, then we'll just quickly skip + # trying to do anything since we only support basic auth on API routes. + if not request.is_api: + return None + # If we're calling into this API on a request, then we want to register # a callback which will ensure that the response varies based on the # Authorization header. @@ -122,65 +130,41 @@ def authenticated_userid(self, request): raise NotImplementedError def permits(self, request, context, permission): - # Handled by MultiSecurityPolicy - raise NotImplementedError - - -@implementer(IAuthorizationPolicy) -class MacaroonAuthorizationPolicy: - def __init__(self, policy): - self.policy = policy - - def permits(self, context, principals, permission): - # The Pyramid API doesn't let us access the request here, so we have to pull it - # out of the thread local instead. - # TODO: Work with Pyramid devs to figure out if there is a better way to support - # the worklow we are using here or not. - request = get_current_request() - - # Our request could possibly be a None, if there isn't an active request, in - # that case we're going to always deny, because without a request, we can't - # determine if this request is authorized or not. - if request is None: - return WarehouseDenied( - "There was no active request.", reason="no_active_request" - ) - # Re-extract our Macaroon from the request, it sucks to have to do this work # twice, but I believe it is inevitable unless we pass the Macaroon back as # a principal-- which doesn't seem to be the right fit for it. macaroon = _extract_http_macaroon(request) - # This logic will only happen on requests that are being authenticated with - # Macaroons. Any other request will just fall back to the standard Authorization - # policy. - if macaroon is not None: - valid_permissions = ["upload"] - macaroon_service = request.find_service(IMacaroonService, context=None) - - try: - macaroon_service.verify(macaroon, request, context, permission) - except InvalidMacaroonError as exc: - return WarehouseDenied( - f"Invalid API Token: {exc}", reason="invalid_api_token" - ) - - # If our Macaroon is verified, and for a valid permission then we'll pass - # this request to our underlying Authorization policy, so it can handle its - # own authorization logic on the principal. - if permission in valid_permissions: - return self.policy.permits(context, principals, permission) - else: - return WarehouseDenied( - f"API tokens are not valid for permission: {permission}!", - reason="invalid_permission", - ) - - else: - return self.policy.permits(context, principals, permission) - - def principals_allowed_by_permission(self, context, permission): - # We just dispatch this, because Macaroons don't restrict what principals are - # allowed by a particular permission, they just restrict specific requests - # to not have that permission. - return self.policy.principals_allowed_by_permission(context, permission) + # It should not be possible to *not* have a macaroon at this point, because we + # can't call this function without an identity that came from a macaroon + assert isinstance(macaroon, str), "no valid macaroon" + + # Check to make sure that the permission we're attempting to permit is one that + # is allowed to be used for macaroons. + # TODO: This should be moved out of there and into the macaroons themselves, it + # doesn't really make a lot of sense here and it makes things more + # complicated if we want to allow the use of macaroons for actions other + # than uploading. + if permission not in ["upload"]: + return WarehouseDenied( + f"API tokens are not valid for permission: {permission}!", + reason="invalid_permission", + ) + + # Check if our macaroon itself is valid. This does not actually check if the + # identity bound to that macaroon has permission to do what it's trying to do + # but rather that the caveats embedded into the macaroon are valid for the given + # request, context, and permission. + macaroon_service = request.find_service(IMacaroonService, context=None) + try: + macaroon_service.verify(macaroon, request, context, permission) + except InvalidMacaroonError as exc: + return WarehouseDenied( + f"Invalid API Token: {exc}", reason="invalid_api_token" + ) + + # The macaroon is valid, so we can actually see if request.identity is + # authorized now or not. + # NOTE: These parameters are in a different order than the signature of this + # method. + return self._acl.permits(context, principals_for(request.identity), permission) diff --git a/warehouse/oidc/utils.py b/warehouse/oidc/utils.py index c5e2f3d701aa..d692adb3867d 100644 --- a/warehouse/oidc/utils.py +++ b/warehouse/oidc/utils.py @@ -14,6 +14,7 @@ from dataclasses import dataclass +from pyramid.authorization import Authenticated from sqlalchemy.sql.expression import func, literal from warehouse.oidc.interfaces import SignedClaims @@ -117,3 +118,6 @@ class OIDCContext: """ Pertinent OIDC claims from the token, if they exist. """ + + def __principals__(self) -> list[str]: + return [Authenticated, f"oidc:{self.publisher.id}"] diff --git a/warehouse/predicates.py b/warehouse/predicates.py index b3f4aec751de..a64257d788ad 100644 --- a/warehouse/predicates.py +++ b/warehouse/predicates.py @@ -37,6 +37,32 @@ def __call__(self, info, request): return is_same_domain(request.domain, self.val) +# NOTE: APIPredicate does not actually influence the match, instead we use it +# as a way to mark which routes are considered "API", and should support +# API authentication methods. +class APIPredicate: + def __init__(self, val, config): + self.val = val + + def text(self): + return f"is_api = {self.val}" + + phash = text + + def __call__(self, info, request): + # Since we're not actually using this for dispatching, we always match + return True + + +def _is_api_route(request): + if route := request.matched_route: + for pred in route.predicates: + if isinstance(pred, APIPredicate) and pred.val: + return True + + return False + + class HeadersPredicate: def __init__(self, val: list[str], config): if not val: @@ -101,7 +127,10 @@ def __call__(self, context: Organization | Team, request): def includeme(config): + config.add_request_method(_is_api_route, name="is_api", reify=True) + config.add_route_predicate("domain", DomainPredicate) + config.add_route_predicate("is_api", APIPredicate) config.add_view_predicate("require_headers", HeadersPredicate) config.add_view_predicate( "require_active_organization", ActiveOrganizationPredicate diff --git a/warehouse/utils/security_policy.py b/warehouse/utils/security_policy.py index 4bb3c7c6150d..25a81860a152 100644 --- a/warehouse/utils/security_policy.py +++ b/warehouse/utils/security_policy.py @@ -12,13 +12,20 @@ import enum -from pyramid.authorization import Authenticated from pyramid.interfaces import ISecurityPolicy +from pyramid.request import RequestLocalCache from pyramid.security import Denied from zope.interface import implementer from warehouse.accounts.models import User -from warehouse.oidc.utils import OIDCContext + + +# NOTE: Is there a better place for this to live? It may not even need to exist +# since it's so small, it may be easier to just inline it. +def principals_for(identity) -> list[str]: + if hasattr(identity, "__principals__"): + return identity.__principals__() + return [] class AuthenticationMethod(enum.Enum): @@ -27,23 +34,6 @@ class AuthenticationMethod(enum.Enum): MACAROON = "macaroon" -def _principals_for_authenticated_user(user): - """Apply the necessary principals to the authenticated user""" - principals = [] - if user.is_superuser: - principals.append("group:admins") - if user.is_moderator or user.is_superuser: - principals.append("group:moderators") - if user.is_psf_staff or user.is_superuser: - principals.append("group:psf_staff") - - # user must have base admin access if any admin permission - if principals: - principals.append("group:with_admin_dashboard_access") - - return principals - - @implementer(ISecurityPolicy) class MultiSecurityPolicy: """ @@ -54,35 +44,48 @@ class MultiSecurityPolicy: with the following semantics: * `identity`: Selected from the first policy to return non-`None` - * `authenticated_userid`: Selected from the first policy to return a user + * `authenticated_userid`: Selected from the first policy to return an identity * `forget`: Combined from all policies * `remember`: Combined from all policies - * `permits`: Uses the AuthZ policy passed during initialization + * `permits`: Uses the the policy that returned the identity. These semantics mostly mirror those of `pyramid-multiauth`. """ - def __init__(self, policies, authz): + def __init__(self, policies): self._policies = policies - self._authz = authz + self._identity_cache = RequestLocalCache(self._get_identity_with_policy) - def identity(self, request): + def _get_identity_with_policy(self, request): + # This will be cached per request, which means that we'll have a stable + # result for both the identity AND the policy that produced it. Further + # uses can then make sure to use the same policy throughout, at least + # where it makes sense to. for policy in self._policies: if ident := policy.identity(request): - return ident + return ident, policy - return None + return None, None + + def reset(self, request): + self._identity_cache.clear(request) + + def identity(self, request): + identity, _policy = self._identity_cache.get_or_create(request) + return identity def authenticated_userid(self, request): if ident := self.identity(request): + # TODO: Note, this logic breaks the contract of a SecurityPolicy, the + # authenticated_userid is intended to be used to fetch the unique + # identifier that represents the current identity. We're leaving + # it here for now, because there are a number of views directly + # using this to detect user vs not, which we'll need to move to a + # more correct pattern before fixing this. if isinstance(ident, User): return str(ident.id) return None - def unauthenticated_userid(self, request): - # This is deprecated and we shouldn't use it - raise NotImplementedError - def forget(self, request, **kw): headers = [] for policy in self._policies: @@ -96,22 +99,16 @@ def remember(self, request, userid, **kw): return headers def permits(self, request, context, permission): - identity = request.identity - principals = [] - if identity is not None: - principals.append(Authenticated) - - if isinstance(identity, User): - principals.append(f"user:{identity.id}") - principals.extend(_principals_for_authenticated_user(identity)) - elif isinstance(identity, OIDCContext): - principals.append(f"oidc:{identity.publisher.id}") - else: - return Denied("unknown identity") - - # NOTE: Observe that the parameters passed into the underlying AuthZ - # policy here are not the same (or in the same order) as the ones - # passed into `permits` above. This is because the underlying AuthZ - # policy is a "legacy" Pyramid 1.0 style one that implements the - # `IAuthorizationPolicy` interface rather than `ISecurityPolicy`. - return self._authz.permits(context, principals, permission) + identity, policy = self._identity_cache.get_or_create(request) + # Sanity check that somehow our cached identity + policy didn't end up + # different than what the request.identity is. This shouldn't be possible + # but we'll assert it because if we let it pass silently it may mean that + # some kind of confused-deputy attack is possible. + assert request.identity == identity, "request has a different identity" + + # Dispatch to the underlying policy for the given identity, if there was one + # for this request. + if policy is not None: + return policy.permits(request, context, permission) + else: + return Denied("unknown identity")