From 6206fb7e902de617820273b92e4b143ebcfaac5b Mon Sep 17 00:00:00 2001 From: Mike Fiedler Date: Wed, 10 Jan 2024 17:09:16 -0500 Subject: [PATCH 1/2] chore: remove most of basic auth code Now that the active policies are Session (for web) and Macaroon (for upload), remove the bulk of the BasicAuth policy. Move the logic farther up the chain, so that there's no extra work put in before telling the user that they should migrate to API Tokens. Resolves #13770 Signed-off-by: Mike Fiedler --- tests/unit/accounts/test_core.py | 375 -------------------- tests/unit/accounts/test_security_policy.py | 91 +---- warehouse/accounts/security_policy.py | 139 ++------ warehouse/errors.py | 4 - 4 files changed, 42 insertions(+), 567 deletions(-) diff --git a/tests/unit/accounts/test_core.py b/tests/unit/accounts/test_core.py index 9b6760099a2e..94d34ecb1267 100644 --- a/tests/unit/accounts/test_core.py +++ b/tests/unit/accounts/test_core.py @@ -10,25 +10,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime - -import freezegun import pretend import pytest from celery.schedules import crontab -from pyramid.httpexceptions import HTTPForbidden, HTTPUnauthorized from warehouse import accounts -from warehouse.accounts import security_policy from warehouse.accounts.interfaces import ( IEmailBreachedService, IPasswordBreachedService, ITokenService, IUserService, ) -from warehouse.accounts.models import DisableReason -from warehouse.accounts.security_policy import _basic_auth_check from warehouse.accounts.services import ( HaveIBeenPwnedEmailBreachedService, HaveIBeenPwnedPasswordBreachedService, @@ -36,8 +29,6 @@ database_login_factory, ) from warehouse.accounts.tasks import compute_user_metrics -from warehouse.errors import BasicAuthBreachedPassword, BasicAuthFailedPassword -from warehouse.events.tags import EventTag from warehouse.oidc.interfaces import SignedClaims from warehouse.oidc.models import OIDCPublisher from warehouse.oidc.utils import OIDCContext @@ -47,372 +38,6 @@ from ...common.db.oidc import GitHubPublisherFactory -class TestLogin: - def test_invalid_route(self, pyramid_request, pyramid_services): - service = pretend.stub(find_userid=pretend.call_recorder(lambda username: None)) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(), IPasswordBreachedService, None - ) - pyramid_request.matched_route = pretend.stub(name="route_name") - assert _basic_auth_check("myuser", "mypass", pyramid_request) is False - assert service.find_userid.calls == [] - - def test_with_no_user(self, pyramid_request, pyramid_services): - service = pretend.stub(find_userid=pretend.call_recorder(lambda username: None)) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(), IPasswordBreachedService, None - ) - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - assert _basic_auth_check("myuser", "mypass", pyramid_request) is False - assert service.find_userid.calls == [pretend.call("myuser")] - - def test_with_invalid_password(self, pyramid_request, pyramid_services): - user = pretend.stub( - id=1, - record_event=pretend.call_recorder(lambda *a, **kw: None), - ) - service = pretend.stub( - get_user=pretend.call_recorder(lambda user_id: user), - find_userid=pretend.call_recorder(lambda username: 1), - check_password=pretend.call_recorder( - lambda userid, password, tags=None: False - ), - is_disabled=pretend.call_recorder(lambda user_id: (False, None)), - ) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(), IPasswordBreachedService, None - ) - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/") - - with pytest.raises(BasicAuthFailedPassword) as excinfo: - assert _basic_auth_check("myuser", "mypass", pyramid_request) is None - - assert excinfo.value.status == ( - "403 Invalid or non-existent authentication information. " - "See /the/help/url/ for more information." - ) - assert service.find_userid.calls == [pretend.call("myuser")] - assert service.get_user.calls == [pretend.call(1)] - assert service.is_disabled.calls == [] - assert service.check_password.calls == [ - pretend.call( - 1, - "mypass", - tags=["mechanism:basic_auth", "method:auth", "auth_method:basic"], - ) - ] - assert user.record_event.calls == [ - pretend.call( - tag=EventTag.Account.LoginFailure, - request=pyramid_request, - additional={"reason": "invalid_password", "auth_method": "basic"}, - ) - ] - - def test_with_disabled_user_no_reason(self, pyramid_request, pyramid_services): - user = pretend.stub( - id=1, - record_event=pretend.call_recorder(lambda *a, **kw: None), - ) - service = pretend.stub( - get_user=pretend.call_recorder(lambda user_id: user), - find_userid=pretend.call_recorder(lambda username: 1), - check_password=pretend.call_recorder( - lambda userid, password, tags=None: True - ), - is_disabled=pretend.call_recorder(lambda user_id: (True, None)), - ) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(), IPasswordBreachedService, None - ) - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/") - - with pytest.raises(HTTPUnauthorized) as excinfo: - assert _basic_auth_check("myuser", "mypass", pyramid_request) is None - - assert excinfo.value.status == "401 Account is disabled." - assert service.find_userid.calls == [pretend.call("myuser")] - assert service.get_user.calls == [pretend.call(1)] - assert service.is_disabled.calls == [pretend.call(1)] - - def test_with_disabled_user_badpass(self, pyramid_request, pyramid_services): - user = pretend.stub( - id=1, - record_event=pretend.call_recorder(lambda *a, **kw: None), - ) - service = pretend.stub( - get_user=pretend.call_recorder(lambda user_id: user), - find_userid=pretend.call_recorder(lambda username: 1), - check_password=pretend.call_recorder( - lambda userid, password, tags=None: False - ), - is_disabled=pretend.call_recorder(lambda user_id: (True, None)), - ) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(), IPasswordBreachedService, None - ) - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/") - - with pytest.raises(BasicAuthFailedPassword) as excinfo: - assert _basic_auth_check("myuser", "mypass", pyramid_request) is None - - assert excinfo.value.status == ( - "403 Invalid or non-existent authentication information. " - "See /the/help/url/ for more information." - ) - assert service.find_userid.calls == [pretend.call("myuser")] - assert service.get_user.calls == [pretend.call(1)] - assert service.is_disabled.calls == [] - assert service.check_password.calls == [ - pretend.call( - 1, - "mypass", - tags=["mechanism:basic_auth", "method:auth", "auth_method:basic"], - ) - ] - assert user.record_event.calls == [ - pretend.call( - tag=EventTag.Account.LoginFailure, - request=pyramid_request, - additional={"reason": "invalid_password", "auth_method": "basic"}, - ) - ] - - def test_with_disabled_user_compromised_pw(self, pyramid_request, pyramid_services): - user = pretend.stub(id=1) - service = pretend.stub( - get_user=pretend.call_recorder(lambda user_id: user), - find_userid=pretend.call_recorder(lambda username: 1), - check_password=pretend.call_recorder( - lambda userid, password, tags=None: True - ), - is_disabled=pretend.call_recorder( - lambda user_id: (True, DisableReason.CompromisedPassword) - ), - ) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(failure_message_plain="Bad Password!"), - IPasswordBreachedService, - None, - ) - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - - with pytest.raises(BasicAuthBreachedPassword) as excinfo: - assert _basic_auth_check("myuser", "mypass", pyramid_request) is None - - assert excinfo.value.status == "401 Bad Password!" - assert service.find_userid.calls == [pretend.call("myuser")] - assert service.get_user.calls == [pretend.call(1)] - assert service.is_disabled.calls == [pretend.call(1)] - assert service.check_password.calls == [ - pretend.call( - 1, - "mypass", - tags=["mechanism:basic_auth", "method:auth", "auth_method:basic"], - ) - ] - - def test_with_disabled_user_frozen(self, pyramid_request, pyramid_services): - user = pretend.stub( - id=1, - record_event=pretend.call_recorder(lambda *a, **kw: None), - is_frozen=True, - ) - service = pretend.stub( - get_user=pretend.call_recorder(lambda user_id: user), - find_userid=pretend.call_recorder(lambda username: 1), - check_password=pretend.call_recorder( - lambda userid, password, tags=None: True - ), - is_disabled=pretend.call_recorder( - lambda user_id: (True, DisableReason.AccountFrozen) - ), - ) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(), IPasswordBreachedService, None - ) - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/") - - with pytest.raises(HTTPUnauthorized) as excinfo: - assert _basic_auth_check("myuser", "mypass", pyramid_request) is None - - assert excinfo.value.status == "401 Account is frozen." - assert service.find_userid.calls == [pretend.call("myuser")] - assert service.get_user.calls == [pretend.call(1)] - assert service.is_disabled.calls == [pretend.call(1)] - - def test_with_disabled_user_frozen_badpass(self, pyramid_request, pyramid_services): - user = pretend.stub( - id=1, - record_event=pretend.call_recorder(lambda *a, **kw: None), - is_frozen=True, - ) - service = pretend.stub( - get_user=pretend.call_recorder(lambda user_id: user), - find_userid=pretend.call_recorder(lambda username: 1), - check_password=pretend.call_recorder( - lambda userid, password, tags=None: False - ), - is_disabled=pretend.call_recorder( - lambda user_id: (True, DisableReason.AccountFrozen) - ), - ) - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - pretend.stub(), IPasswordBreachedService, None - ) - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/") - - with pytest.raises(BasicAuthFailedPassword) as excinfo: - assert _basic_auth_check("myuser", "mypass", pyramid_request) is None - - assert excinfo.value.status == ( - "403 Invalid or non-existent authentication information. " - "See /the/help/url/ for more information." - ) - assert service.find_userid.calls == [pretend.call("myuser")] - assert service.get_user.calls == [pretend.call(1)] - assert service.is_disabled.calls == [] - assert service.check_password.calls == [ - pretend.call( - 1, - "mypass", - tags=["mechanism:basic_auth", "method:auth", "auth_method:basic"], - ) - ] - assert user.record_event.calls == [ - pretend.call( - tag=EventTag.Account.LoginFailure, - request=pyramid_request, - additional={"reason": "invalid_password", "auth_method": "basic"}, - ) - ] - - def test_with_valid_password(self, monkeypatch, pyramid_request, pyramid_services): - user = pretend.stub( - id=2, - has_two_factor=False, - record_event=pretend.call_recorder(lambda *a, **kw: None), - ) - service = pretend.stub( - get_user=pretend.call_recorder(lambda user_id: user), - find_userid=pretend.call_recorder(lambda username: 2), - check_password=pretend.call_recorder( - lambda userid, password, tags=None: True - ), - update_user=pretend.call_recorder(lambda userid, last_login: None), - is_disabled=pretend.call_recorder(lambda user_id: (False, None)), - ) - breach_service = pretend.stub( - check_password=pretend.call_recorder(lambda pw, tags=None: False) - ) - - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - breach_service, IPasswordBreachedService, None - ) - - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/") - - now = datetime.datetime.utcnow() - - with freezegun.freeze_time(now), pytest.raises(HTTPForbidden): - _basic_auth_check("myuser", "mypass", pyramid_request) - - assert service.find_userid.calls == [pretend.call("myuser")] - assert service.get_user.calls == [pretend.call(2)] - assert service.is_disabled.calls == [pretend.call(2)] - assert service.check_password.calls == [ - pretend.call( - 2, - "mypass", - tags=["mechanism:basic_auth", "method:auth", "auth_method:basic"], - ) - ] - assert breach_service.check_password.calls == [ - pretend.call("mypass", tags=["method:auth", "auth_method:basic"]) - ] - assert service.update_user.calls == [pretend.call(2, last_login=now)] - assert user.record_event.calls == [ - pretend.call( - request=pyramid_request, - tag=EventTag.Account.LoginSuccess, - additional={"auth_method": "basic"}, - ) - ] - - def test_via_basic_auth_compromised( - self, monkeypatch, pyramid_request, pyramid_services - ): - send_email = pretend.call_recorder(lambda *a, **kw: None) - monkeypatch.setattr( - security_policy, "send_password_compromised_email_hibp", send_email - ) - - user = pretend.stub(id=2) - service = pretend.stub( - get_user=pretend.call_recorder(lambda user_id: user), - find_userid=pretend.call_recorder(lambda username: 2), - check_password=pretend.call_recorder( - lambda userid, password, tags=None: True - ), - is_disabled=pretend.call_recorder(lambda user_id: (False, None)), - disable_password=pretend.call_recorder( - lambda user_id, request, reason=None: None - ), - ) - breach_service = pretend.stub( - check_password=pretend.call_recorder(lambda pw, tags=None: True), - failure_message_plain="Bad Password!", - ) - - pyramid_services.register_service(service, IUserService, None) - pyramid_services.register_service( - breach_service, IPasswordBreachedService, None - ) - - pyramid_request.matched_route = pretend.stub(name="forklift.legacy.file_upload") - - with pytest.raises(BasicAuthBreachedPassword) as excinfo: - _basic_auth_check("myuser", "mypass", pyramid_request) - - assert excinfo.value.status == "401 Bad Password!" - assert service.find_userid.calls == [pretend.call("myuser")] - assert service.get_user.calls == [pretend.call(2)] - assert service.is_disabled.calls == [pretend.call(2)] - assert service.check_password.calls == [ - pretend.call( - 2, - "mypass", - tags=["mechanism:basic_auth", "method:auth", "auth_method:basic"], - ) - ] - assert breach_service.check_password.calls == [ - pretend.call("mypass", tags=["method:auth", "auth_method:basic"]) - ] - assert service.disable_password.calls == [ - pretend.call( - 2, - pyramid_request, - reason=DisableReason.CompromisedPassword, - ) - ] - assert send_email.calls == [pretend.call(pyramid_request, user)] - - class TestUser: def test_with_user(self, db_request): user = UserFactory.create() diff --git a/tests/unit/accounts/test_security_policy.py b/tests/unit/accounts/test_security_policy.py index 5bbbdd314ebd..ec208fb2ea0f 100644 --- a/tests/unit/accounts/test_security_policy.py +++ b/tests/unit/accounts/test_security_policy.py @@ -16,6 +16,7 @@ import pytest from pyramid.authorization import Allow +from pyramid.exceptions import HTTPForbidden from pyramid.interfaces import ISecurityPolicy from zope.interface.verify import verifyClass @@ -32,17 +33,16 @@ def test_verify(self): ) def test_noops(self): + """Basically, anything that isn't `identity()` is a no-op.""" policy = security_policy.BasicAuthSecurityPolicy() with pytest.raises(NotImplementedError): policy.authenticated_userid(pretend.stub()) - - def test_forget_and_remember(self): - policy = security_policy.BasicAuthSecurityPolicy() - - assert policy.forget(pretend.stub()) == [] - assert policy.remember(pretend.stub(), pretend.stub()) == [ - ("WWW-Authenticate", 'Basic realm="Realm"') - ] + with pytest.raises(NotImplementedError): + policy.forget(pretend.stub()) + with pytest.raises(NotImplementedError): + policy.remember(pretend.stub(), pretend.stub()) + with pytest.raises(NotImplementedError): + policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) def test_identity_no_credentials(self, monkeypatch): extract_http_basic_credentials = pretend.call_recorder(lambda request: None) @@ -60,8 +60,7 @@ def test_identity_no_credentials(self, monkeypatch): request = pretend.stub( add_response_callback=pretend.call_recorder(lambda cb: None), - banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + matched_route=pretend.stub(name="forklift.legacy.file_upload"), ) assert policy.identity(request) is None @@ -78,9 +77,6 @@ def test_identity_credentials_fail(self, monkeypatch): extract_http_basic_credentials, ) - basic_auth_check = pretend.call_recorder(lambda u, p, r: False) - monkeypatch.setattr(security_policy, "_basic_auth_check", basic_auth_check) - policy = security_policy.BasicAuthSecurityPolicy() vary_cb = pretend.stub() @@ -89,13 +85,13 @@ def test_identity_credentials_fail(self, monkeypatch): request = pretend.stub( add_response_callback=pretend.call_recorder(lambda cb: None), - banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", + help_url=lambda _anchor=None: "/help", + matched_route=pretend.stub(name="forklift.legacy.file_upload"), ) - assert policy.identity(request) is None + with pytest.raises(HTTPForbidden): + policy.identity(request) assert extract_http_basic_credentials.calls == [pretend.call(request)] - assert basic_auth_check.calls == [pretend.call(creds[0], creds[1], request)] assert add_vary_cb.calls == [pretend.call("Authorization")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] @@ -128,46 +124,7 @@ def test_invalid_request_fail(self, monkeypatch, fake_request): assert policy.identity(fake_request) is None def test_identity(self, monkeypatch): - creds = (pretend.stub(), pretend.stub()) - extract_http_basic_credentials = pretend.call_recorder(lambda request: creds) - monkeypatch.setattr( - security_policy, - "extract_http_basic_credentials", - extract_http_basic_credentials, - ) - - basic_auth_check = pretend.call_recorder(lambda u, p, r: True) - monkeypatch.setattr(security_policy, "_basic_auth_check", basic_auth_check) - - policy = security_policy.BasicAuthSecurityPolicy() - - vary_cb = pretend.stub() - add_vary_cb = pretend.call_recorder(lambda *v: vary_cb) - monkeypatch.setattr(security_policy, "add_vary_callback", add_vary_cb) - - user = pretend.stub() - user_service = pretend.stub( - get_user_by_username=pretend.call_recorder(lambda u: user) - ) - request = pretend.stub( - add_response_callback=pretend.call_recorder(lambda cb: None), - find_service=pretend.call_recorder(lambda a, **kw: user_service), - banned=pretend.stub(by_ip=lambda ip_address: False), - remote_addr="1.2.3.4", - ) - - assert policy.identity(request) is user - assert request.authentication_method == AuthenticationMethod.BASIC_AUTH - assert extract_http_basic_credentials.calls == [pretend.call(request)] - assert basic_auth_check.calls == [pretend.call(creds[0], creds[1], request)] - assert request.find_service.calls == [pretend.call(IUserService, context=None)] - assert user_service.get_user_by_username.calls == [pretend.call(creds[0])] - - assert add_vary_cb.calls == [pretend.call("Authorization")] - assert request.add_response_callback.calls == [pretend.call(vary_cb)] - - def test_identityi_ip_banned(self, monkeypatch): - creds = (pretend.stub(), pretend.stub()) + creds = ("__token__", pretend.stub()) extract_http_basic_credentials = pretend.call_recorder(lambda request: creds) monkeypatch.setattr( security_policy, @@ -175,33 +132,21 @@ def test_identityi_ip_banned(self, monkeypatch): extract_http_basic_credentials, ) - basic_auth_check = pretend.call_recorder(lambda u, p, r: True) - monkeypatch.setattr(security_policy, "_basic_auth_check", basic_auth_check) - policy = security_policy.BasicAuthSecurityPolicy() vary_cb = pretend.stub() add_vary_cb = pretend.call_recorder(lambda *v: vary_cb) monkeypatch.setattr(security_policy, "add_vary_callback", add_vary_cb) - user = pretend.stub() - user_service = pretend.stub( - get_user_by_username=pretend.call_recorder(lambda u: user) - ) request = pretend.stub( add_response_callback=pretend.call_recorder(lambda cb: None), - find_service=pretend.call_recorder(lambda a, **kw: user_service), - banned=pretend.stub(by_ip=lambda ip_address: True), - remote_addr="1.2.3.4", + help_url=lambda _anchor=None: "/help", + matched_route=pretend.stub(name="forklift.legacy.file_upload"), ) assert policy.identity(request) is None assert request.authentication_method == AuthenticationMethod.BASIC_AUTH - assert extract_http_basic_credentials.calls == [] - assert basic_auth_check.calls == [] - assert request.find_service.calls == [] - assert user_service.get_user_by_username.calls == [] - + assert extract_http_basic_credentials.calls == [pretend.call(request)] assert add_vary_cb.calls == [pretend.call("Authorization")] assert request.add_response_callback.calls == [pretend.call(vary_cb)] @@ -561,7 +506,7 @@ def test_identity_ip_banned(self, monkeypatch): @pytest.mark.parametrize( "policy_class", - [security_policy.BasicAuthSecurityPolicy, security_policy.SessionSecurityPolicy], + [security_policy.SessionSecurityPolicy], ) class TestPermits: @pytest.mark.parametrize( diff --git a/warehouse/accounts/security_policy.py b/warehouse/accounts/security_policy.py index 144bede4a386..77442e0baf70 100644 --- a/warehouse/accounts/security_policy.py +++ b/warehouse/accounts/security_policy.py @@ -10,29 +10,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime - from pyramid.authentication import ( SessionAuthenticationHelper, extract_http_basic_credentials, ) from pyramid.authorization import ACLHelper -from pyramid.httpexceptions import HTTPForbidden, HTTPUnauthorized +from pyramid.httpexceptions import HTTPForbidden from pyramid.interfaces import ISecurityPolicy from pyramid.security import Allowed from zope.interface import implementer -from warehouse.accounts.interfaces import IPasswordBreachedService, IUserService -from warehouse.accounts.models import DisableReason, User +from warehouse.accounts.interfaces import IUserService +from warehouse.accounts.models import User from warehouse.cache.http import add_vary_callback -from warehouse.email import send_password_compromised_email_hibp -from warehouse.errors import ( - BasicAuthAccountFrozen, - BasicAuthBreachedPassword, - BasicAuthFailedPassword, - WarehouseDenied, -) -from warehouse.events.tags import EventTag +from warehouse.errors import WarehouseDenied from warehouse.utils.security_policy import AuthenticationMethod, principals_for @@ -41,94 +32,6 @@ def _format_exc_status(exc, message): return exc -def _basic_auth_check(username, password, request): - # A route must be matched - if not request.matched_route: - return False - - # Basic authentication can only be used for uploading - if request.matched_route.name != "forklift.legacy.file_upload": - return False - - login_service = request.find_service(IUserService, context=None) - breach_service = request.find_service(IPasswordBreachedService, context=None) - - userid = login_service.find_userid(username) - request._unauthenticated_userid = userid - if userid is not None: - user = login_service.get_user(userid) - if login_service.check_password( - user.id, - password, - tags=["mechanism:basic_auth", "method:auth", "auth_method:basic"], - ): - is_disabled, disabled_for = login_service.is_disabled(user.id) - if is_disabled: - # This technically violates the contract a little bit, this function is - # meant to return False if the user cannot log in. However we want to - # present a different error message than is normal when we're denying - # the log in because of a compromised password. So to do that, we'll - # need to raise a HTTPError that'll ultimately get returned to the - # client. This is OK to do here because we've already successfully - # authenticated the credentials, so it won't screw up the fall through - # to other authentication mechanisms (since we wouldn't have fell - # through to them anyways). - if disabled_for == DisableReason.CompromisedPassword: - raise _format_exc_status( - BasicAuthBreachedPassword(), - breach_service.failure_message_plain, - ) - elif disabled_for == DisableReason.AccountFrozen: - raise _format_exc_status( - BasicAuthAccountFrozen(), "Account is frozen." - ) - else: - raise _format_exc_status(HTTPUnauthorized(), "Account is disabled.") - if breach_service.check_password( - password, tags=["method:auth", "auth_method:basic"] - ): - send_password_compromised_email_hibp(request, user) - login_service.disable_password( - user.id, - request, - reason=DisableReason.CompromisedPassword, - ) - raise _format_exc_status( - BasicAuthBreachedPassword(), breach_service.failure_message_plain - ) - - login_service.update_user(user.id, last_login=datetime.datetime.utcnow()) - user.record_event( - tag=EventTag.Account.LoginSuccess, - request=request, - additional={"auth_method": "basic"}, - ) - - raise _format_exc_status( - HTTPForbidden(), - "Username/Password authentication is no longer supported. " - "Migrate to API Tokens or Trusted Publishers instead. " - f"See {request.help_url(_anchor='apitoken')} " - f"and {request.help_url(_anchor='trusted-publishers')}", - ) - else: - user.record_event( - tag=EventTag.Account.LoginFailure, - request=request, - additional={"reason": "invalid_password", "auth_method": "basic"}, - ) - raise _format_exc_status( - BasicAuthFailedPassword(), - "Invalid or non-existent authentication information. " - "See {projecthelp} for more information.".format( - projecthelp=request.help_url(_anchor="invalid-auth") - ), - ) - - # No user, no authentication. - return False - - @implementer(ISecurityPolicy) class SessionSecurityPolicy: def __init__(self): @@ -206,8 +109,8 @@ def permits(self, request, context, permission): @implementer(ISecurityPolicy) class BasicAuthSecurityPolicy: - def __init__(self): - self._acl = ACLHelper() + """The BasicAuthSecurityPolicy is no longer allowed + and raises a message when used for uploads when it's not an API Token""" def identity(self, request): # If we're calling into this API on a request, then we want to register @@ -216,35 +119,41 @@ def identity(self, request): request.add_response_callback(add_vary_callback("Authorization")) request.authentication_method = AuthenticationMethod.BASIC_AUTH - if request.banned.by_ip(request.remote_addr): + if not request.matched_route: + return None + if request.matched_route.name != "forklift.legacy.file_upload": return None credentials = extract_http_basic_credentials(request) if credentials is None: return None - username, password = credentials - if not _basic_auth_check(username, password, request): + username, _password = credentials + + # The API Token username is allowed to pass through to the + # MacaroonSecurityPolicy. + if username == "__token__": return None - # Like sessions; basic auth can only authenticate users. - login_service = request.find_service(IUserService, context=None) - return login_service.get_user_by_username(username) + raise _format_exc_status( + HTTPForbidden(), + "Username/Password authentication is no longer supported. " + "Migrate to API Tokens or Trusted Publishers instead. " + f"See {request.help_url(_anchor='apitoken')} " + f"and {request.help_url(_anchor='trusted-publishers')}", + ) def forget(self, request, **kw): - # No-op. - return [] + raise NotImplementedError def remember(self, request, userid, **kw): - # NOTE: We could make realm configurable here. - return [("WWW-Authenticate", 'Basic realm="Realm"')] + raise NotImplementedError def authenticated_userid(self, request): - # Handled by MultiSecurityPolicy raise NotImplementedError def permits(self, request, context, permission): - return _permits_for_user_policy(self._acl, request, context, permission) + raise NotImplementedError def _permits_for_user_policy(acl, request, context, permission): diff --git a/warehouse/errors.py b/warehouse/errors.py index c4283aab65d0..aa3cf0c75168 100644 --- a/warehouse/errors.py +++ b/warehouse/errors.py @@ -22,10 +22,6 @@ class BasicAuthBreachedPassword(HTTPUnauthorized): pass -class BasicAuthAccountFrozen(HTTPUnauthorized): - pass - - class BasicAuthTwoFactorEnabled(HTTPUnauthorized): pass From 78a80b681efb652c6497d7131c016f2b402ae36c Mon Sep 17 00:00:00 2001 From: Mike Fiedler Date: Thu, 11 Jan 2024 11:03:19 -0500 Subject: [PATCH 2/2] fix: allow MultiSecurityPolicy to iterate through Signed-off-by: Mike Fiedler --- tests/unit/accounts/test_security_policy.py | 8 ++++---- warehouse/accounts/security_policy.py | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/unit/accounts/test_security_policy.py b/tests/unit/accounts/test_security_policy.py index ec208fb2ea0f..0eefbc19ef4d 100644 --- a/tests/unit/accounts/test_security_policy.py +++ b/tests/unit/accounts/test_security_policy.py @@ -37,13 +37,13 @@ def test_noops(self): policy = security_policy.BasicAuthSecurityPolicy() with pytest.raises(NotImplementedError): policy.authenticated_userid(pretend.stub()) - with pytest.raises(NotImplementedError): - policy.forget(pretend.stub()) - with pytest.raises(NotImplementedError): - policy.remember(pretend.stub(), pretend.stub()) with pytest.raises(NotImplementedError): policy.permits(pretend.stub(), pretend.stub(), pretend.stub()) + # These are no-ops, but they don't raise, used in MultiSecurityPolicy + assert policy.forget(pretend.stub()) == [] + assert policy.remember(pretend.stub(), pretend.stub()) == [] + def test_identity_no_credentials(self, monkeypatch): extract_http_basic_credentials = pretend.call_recorder(lambda request: None) monkeypatch.setattr( diff --git a/warehouse/accounts/security_policy.py b/warehouse/accounts/security_policy.py index 77442e0baf70..10d9ab6f7d9a 100644 --- a/warehouse/accounts/security_policy.py +++ b/warehouse/accounts/security_policy.py @@ -144,10 +144,12 @@ def identity(self, request): ) def forget(self, request, **kw): - raise NotImplementedError + # No-op. + return [] def remember(self, request, userid, **kw): - raise NotImplementedError + # No-op. + return [] def authenticated_userid(self, request): raise NotImplementedError