diff --git a/dev/environment b/dev/environment index 0a7398deb9af..66bd218fd5e5 100644 --- a/dev/environment +++ b/dev/environment @@ -71,5 +71,13 @@ OIDC_AUDIENCE=pypi RECAPTCHA_SITE_KEY=6LeIxAcTAAAAAJcZVRqyHh71UMIEGNQ_MXjiZKhI RECAPTCHA_SECRET_KEY=6LeIxAcTAAAAAGG-vFI1TnRWxMZNFuojJ4WifJWe +# Testing hCaptcha keys from https://docs.hcaptcha.com/#integration-testing-test-keys +HCAPTCHA_SITE_KEY=10000000-ffff-ffff-ffff-000000000001 +HCAPTCHA_SECRET_KEY=0x0000000000000000000000000000000000000000 +# Test Key Set: Enterprise Account (Safe End User) +# HCAPTCHA_SITE_KEY=20000000-ffff-ffff-ffff-000000000002 +# Test Key Set: Enterprise Account (Bot Detected) +# HCAPTCHA_SITE_KEY=30000000-ffff-ffff-ffff-000000000003 + # Example of Captcha backend configuration -# CAPTCHA_BACKEND=warehouse.captcha.recaptcha.Service +# CAPTCHA_BACKEND=warehouse.captcha.hcaptcha.Service diff --git a/tests/unit/captcha/test_hcaptcha.py b/tests/unit/captcha/test_hcaptcha.py new file mode 100644 index 000000000000..07797eb22aa2 --- /dev/null +++ b/tests/unit/captcha/test_hcaptcha.py @@ -0,0 +1,209 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pretend +import pytest +import requests +import responses + +from warehouse.captcha import hcaptcha, interfaces + +_REQUEST = pretend.stub( + # returning a real requests.Session object because responses is responsible + # for mocking that out + http=requests.Session(), + registry=pretend.stub( + settings={ + "hcaptcha.site_key": "site_key_value", + "hcaptcha.secret_key": "secret_key_value", + }, + ), +) + + +def test_create_captcha_service(): + service = hcaptcha.Service.create_service( + context=None, + request=_REQUEST, + ) + assert isinstance(service, hcaptcha.Service) + + +def test_csp_policy(): + csp_hostnames = ["https://hcaptcha.com", "https://*.hcaptcha.com"] + service = hcaptcha.Service.create_service( + context=None, + request=_REQUEST, + ) + assert service.csp_policy == { + "script-src": csp_hostnames, + "frame-src": csp_hostnames, + "style-src": csp_hostnames, + "connect-src": csp_hostnames, + } + + +def test_enabled(): + service = hcaptcha.Service.create_service( + context=None, + request=_REQUEST, + ) + assert service.enabled + + +class TestVerifyResponse: + @responses.activate + def test_verify_service_disabled(self): + responses.add( + responses.POST, + hcaptcha.VERIFY_URL, + body="", + ) + + service = hcaptcha.Service.create_service( + context=None, + request=pretend.stub( + registry=pretend.stub( + settings={}, + ), + ), + ) + assert service.verify_response("") is None + + @responses.activate + def test_verify_response_success(self): + responses.add( + responses.POST, + hcaptcha.VERIFY_URL, + json={ + "success": True, + "hostname": "hostname_value", + "challenge_ts": 0, + }, + ) + + service = hcaptcha.Service.create_service( + context=None, + request=_REQUEST, + ) + assert service.verify_response("meaningless") == interfaces.ChallengeResponse( + challenge_ts=0, + hostname="hostname_value", + ) + + @responses.activate + def test_remote_ip_added(self): + responses.add( + responses.POST, + hcaptcha.VERIFY_URL, + json={"success": True}, + ) + + service = hcaptcha.Service.create_service( + context=None, + request=_REQUEST, + ) + assert service.verify_response( + "meaningless", remote_ip="someip" + ) == interfaces.ChallengeResponse( + challenge_ts=None, + hostname=None, + ) + + def test_unexpected_error(self, monkeypatch): + service = hcaptcha.Service.create_service( + context=None, + request=_REQUEST, + ) + monkeypatch.setattr( + service.request.http, "post", pretend.raiser(Exception("unexpected error")) + ) + + with pytest.raises(hcaptcha.UnexpectedError) as err: + service.verify_response("meaningless") + + assert err.value.args == ("unexpected error",) + + @responses.activate + def test_unexpected_data_error(self): + responses.add( + responses.POST, + hcaptcha.VERIFY_URL, + body="something awful", + ) + serv = hcaptcha.Service.create_service(context=None, request=_REQUEST) + + with pytest.raises(hcaptcha.UnexpectedError) as err: + serv.verify_response("meaningless") + + expected = "Unexpected data in response body: something awful" + assert str(err.value) == expected + + @responses.activate + def test_missing_success_key(self): + responses.add( + responses.POST, + hcaptcha.VERIFY_URL, + json={}, + ) + serv = hcaptcha.Service.create_service(context=None, request=_REQUEST) + + with pytest.raises(hcaptcha.UnexpectedError) as err: + serv.verify_response("meaningless") + + expected = "Missing 'success' key in response: {}" + assert str(err.value) == expected + + @responses.activate + def test_missing_error_codes_key(self): + responses.add( + responses.POST, + hcaptcha.VERIFY_URL, + json={"success": False}, + ) + serv = hcaptcha.Service.create_service(context=None, request=_REQUEST) + + with pytest.raises(hcaptcha.UnexpectedError) as err: + serv.verify_response("meaningless") + + expected = "Response missing 'error-codes' key: {'success': False}" + assert str(err.value) == expected + + @responses.activate + def test_invalid_error_code(self): + responses.add( + responses.POST, + hcaptcha.VERIFY_URL, + json={"success": False, "error_codes": ["foo"]}, + ) + serv = hcaptcha.Service.create_service(context=None, request=_REQUEST) + + with pytest.raises(hcaptcha.UnexpectedError) as err: + serv.verify_response("meaningless") + + expected = "Unexpected error code: foo" + assert str(err.value) == expected + + @responses.activate + def test_valid_error_code(self): + responses.add( + responses.POST, + hcaptcha.VERIFY_URL, + json={ + "success": False, + "error_codes": ["invalid-or-already-seen-response"], + }, + ) + serv = hcaptcha.Service.create_service(context=None, request=_REQUEST) + + with pytest.raises(hcaptcha.InvalidOrAlreadySeenResponseError): + serv.verify_response("meaningless") diff --git a/warehouse/captcha/hcaptcha.py b/warehouse/captcha/hcaptcha.py new file mode 100644 index 000000000000..f417b8a6909a --- /dev/null +++ b/warehouse/captcha/hcaptcha.py @@ -0,0 +1,173 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import http + +from urllib.parse import urlencode + +from zope.interface import implementer + +from .interfaces import ChallengeResponse, ICaptchaService + +VERIFY_URL = "https://api.hcaptcha.com/siteverify" + + +class HCaptchaError(ValueError): + pass + + +class MissingInputSecretError(HCaptchaError): + pass + + +class InvalidInputSecretError(HCaptchaError): + pass + + +class MissingInputResponseError(HCaptchaError): + pass + + +class InvalidInputResponseError(HCaptchaError): + pass + + +class BadRequestError(HCaptchaError): + pass + + +class InvalidOrAlreadySeenResponseError(HCaptchaError): + pass + + +class NotUsingDummyPasscodeError(HCaptchaError): + pass + + +class SitekeySecretMismatchError(HCaptchaError): + pass + + +class UnexpectedError(HCaptchaError): + pass + + +# https://docs.hcaptcha.com/#siteverify-error-codes-table +ERROR_CODE_MAP = { + "missing-input-secret": MissingInputSecretError, + "invalid-input-secret": InvalidInputSecretError, + "missing-input-response": MissingInputResponseError, + "invalid-input-response": InvalidInputResponseError, + "invalid-or-already-seen-response": InvalidOrAlreadySeenResponseError, + "not-using-dummy-passcode": NotUsingDummyPasscodeError, + "sitekey-secret-mismatch": SitekeySecretMismatchError, + "bad-request": BadRequestError, +} + + +_CSP_ENTRIES = [ + "https://hcaptcha.com", + "https://*.hcaptcha.com", +] + + +@implementer(ICaptchaService) +class Service: + def __init__(self, *, request, script_src_url, site_key, secret_key): + self.request = request + self.script_src_url = script_src_url + self.site_key = site_key + self.secret_key = secret_key + self.class_name = "h-captcha" + + @classmethod + def create_service(cls, context, request) -> "Service": + return cls( + request=request, + script_src_url="https://js.hcaptcha.com/1/api.js", + site_key=request.registry.settings.get("hcaptcha.site_key"), + secret_key=request.registry.settings.get("hcaptcha.secret_key"), + ) + + @property + def csp_policy(self) -> dict[str, list[str]]: + return { + "script-src": _CSP_ENTRIES, + "frame-src": _CSP_ENTRIES, + "style-src": _CSP_ENTRIES, + "connect-src": _CSP_ENTRIES, + } + + @property + def enabled(self) -> bool: + return bool(self.site_key and self.secret_key) + + def verify_response(self, response, remote_ip=None) -> ChallengeResponse | None: + if not self.enabled: + # TODO: debug logging + return None + + payload = { + "secret": self.secret_key, + "response": response, + } + if remote_ip is not None: + payload["remoteip"] = remote_ip + + try: + # TODO: the timeout is hardcoded for now. it would be nice to do + # something a little more generalized in the future. + resp = self.request.http.post( + VERIFY_URL, + urlencode(payload), + headers={ + "Content-Type": "application/x-www-form-urlencoded; charset=utf-8" + }, + timeout=10, + ) + except Exception as err: + raise UnexpectedError(str(err)) from err + + try: + data = resp.json() + except ValueError as e: + raise UnexpectedError( + f'Unexpected data in response body: {str(resp.content, "utf-8")}' + ) from e + + if "success" not in data: + raise UnexpectedError(f"Missing 'success' key in response: {data}") + + if resp.status_code != http.HTTPStatus.OK or not data["success"]: + try: + error_codes = data["error_codes"] + except KeyError as e: + raise UnexpectedError( + f"Response missing 'error-codes' key: {data}" + ) from e + try: + exc_tp = ERROR_CODE_MAP[error_codes[0]] + except KeyError as exc: + raise UnexpectedError( + f"Unexpected error code: {error_codes[0]}" + ) from exc + raise exc_tp + + # challenge_ts = timestamp of the challenge load + # (ISO format yyyy-MM-dd'T'HH:mm:ssZZ) + # TODO: maybe run some validation against the hostname and timestamp? + # TODO: log if either field is empty.. it shouldn't cause a failure, + # but it likely means that google has changed their response structure + return ChallengeResponse( + data.get("challenge_ts"), + data.get("hostname"), + ) diff --git a/warehouse/captcha/interfaces.py b/warehouse/captcha/interfaces.py index ada1897792a7..24ed73120ad0 100644 --- a/warehouse/captcha/interfaces.py +++ b/warehouse/captcha/interfaces.py @@ -10,8 +10,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections + from zope.interface import Interface +ChallengeResponse = collections.namedtuple( + "ChallengeResponse", ("challenge_ts", "hostname") +) + class ICaptchaService(Interface): def create_service(context, request): @@ -25,12 +31,12 @@ def enabled() -> bool: Return whether the Captcha service is enabled. """ - def csp_policy() -> str: + def csp_policy() -> dict[str, list[str]]: """ Return the CSP policy appropriate for the Captcha service. """ - def verify_response(response) -> bool: + def verify_response(response) -> ChallengeResponse | None: """ Verify the response from the Captcha service. """ diff --git a/warehouse/captcha/recaptcha.py b/warehouse/captcha/recaptcha.py index 014df2a59a46..25bd2ba1b989 100644 --- a/warehouse/captcha/recaptcha.py +++ b/warehouse/captcha/recaptcha.py @@ -10,14 +10,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections import http from urllib.parse import urlencode from zope.interface import implementer -from .interfaces import ICaptchaService +from .interfaces import ChallengeResponse, ICaptchaService VERIFY_URL = "https://www.google.com/recaptcha/api/siteverify" @@ -53,10 +52,6 @@ class UnexpectedError(RecaptchaError): "invalid-input-response": InvalidInputResponseError, } -ChallengeResponse = collections.namedtuple( - "ChallengeResponse", ("challenge_ts", "hostname") -) - @implementer(ICaptchaService) class Service: @@ -65,6 +60,7 @@ def __init__(self, *, request, script_src_url, site_key, secret_key): self.script_src_url = script_src_url self.site_key = site_key self.secret_key = secret_key + self.class_name = "g-recaptcha" @classmethod def create_service(cls, context, request): diff --git a/warehouse/config.py b/warehouse/config.py index e9aabba8cfb7..230bfa42be8b 100644 --- a/warehouse/config.py +++ b/warehouse/config.py @@ -206,6 +206,8 @@ def configure(settings=None): maybe_set(settings, "captcha.backend", "CAPTCHA_BACKEND") maybe_set(settings, "recaptcha.site_key", "RECAPTCHA_SITE_KEY") maybe_set(settings, "recaptcha.secret_key", "RECAPTCHA_SECRET_KEY") + maybe_set(settings, "hcaptcha.site_key", "HCAPTCHA_SITE_KEY") + maybe_set(settings, "hcaptcha.secret_key", "HCAPTCHA_SECRET_KEY") maybe_set(settings, "sessions.secret", "SESSION_SECRET") maybe_set(settings, "camo.url", "CAMO_URL") maybe_set(settings, "camo.key", "CAMO_KEY") diff --git a/warehouse/templates/includes/input-captcha.html b/warehouse/templates/includes/input-captcha.html index 010870a59c50..64b0e26d22f7 100644 --- a/warehouse/templates/includes/input-captcha.html +++ b/warehouse/templates/includes/input-captcha.html @@ -12,8 +12,9 @@ # limitations under the License. -#} {% macro captcha_html(request, form) -%} - {% if request.find_service(name="captcha").enabled %} -
+ {% set captcha_svc = request.find_service(name="captcha") %} + {% if captcha_svc.enabled %} +
{% if form.g_recaptcha_response.errors %}