Skip to content

feat(auth): Return auth error if application is requesting a wrong org #81193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 9 additions & 24 deletions src/sentry/api/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,11 @@ def authenticate_token(self, request: Request, token_str: str) -> tuple[Any, Any
if application_is_inactive:
raise AuthenticationFailed("UserApplication inactive or deleted")

if token.organization_id:
if token.scoping_organization_id:
# We need to make sure the organization to which the token has access is the same as the one in the URL
organization = None
organization_context = organization_service.get_organization_by_id(
id=token.organization_id
id=token.organization_id, include_projects=False, include_teams=False
)
if organization_context:
organization = organization_context.organization
Expand All @@ -439,30 +439,15 @@ def authenticate_token(self, request: Request, token_str: str) -> tuple[Any, Any
organization.slug != target_org_id_or_slug
and organization.id != target_org_id_or_slug
):
# TODO (@athena): We want to raise auth excecption here but to be sure
# I soft launch this by only logging the error for now
# raise AuthenticationFailed("Unauthorized organization access")
logger.info(
"Token has access to organization %s but wants to get access to organization %s: %s",
organization.slug,
target_org_id_or_slug,
request.path_info,
)
else:
# TODO (@athena): We want to limit org level token's access to org level endpoints only
# so in the future this will be an auth exception but for now we soft launch by logging an error
logger.info(
"Token has only access to organization %s but is calling an endpoint for multiple organizations: %s",
organization.slug,
request.path_info,
raise AuthenticationFailed("Unauthorized organization access.")
# We want to limit org scoped tokens access to org level endpoints only
# Except some none-org level endpoints that we added special treatments for
elif resolved_url.url_name not in ["sentry-api-0-organizations"]:
raise AuthenticationFailed(
"This token access is limited to organization endpoints."
)
else:
# TODO (@athena): If there is an organization token we should be able to fetch organization context
# Otherwise we should raise an exception
# For now adding logging to investigate if this is a valid case we need to address
logger.info(
"Token has access to an unknown organization: %s", token.organization_id
)
raise AuthenticationFailed("Cannot resolve organization from token.")

return self.transform_auth(
user,
Expand Down
66 changes: 66 additions & 0 deletions tests/sentry/api/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,72 @@ def test_no_match(self):
self.auth.authenticate(request)


@control_silo_test
class TestOrgScopedAppTokenAuthentication(TestCase):
def setUp(self):
super().setUp()

self.auth = UserAuthTokenAuthentication()
self.org = self.create_organization(owner=self.user)
self.another_org = self.create_organization(owner=self.user)
self.api_token = ApiToken.objects.create(
token_type=AuthTokenType.USER,
user=self.user,
scoping_organization_id=self.org.id,
)
self.token = self.api_token.plaintext_token

def test_authenticate_correct_org(self):
request = HttpRequest()
request.META["HTTP_AUTHORIZATION"] = f"Bearer {self.token}"
request.path_info = f"/api/0/organizations/{self.org.slug}/projects/"

result = self.auth.authenticate(request)
assert result is not None

user, auth = result
assert user.is_anonymous is False
assert user.id == self.user.id
assert AuthenticatedToken.from_token(auth) == AuthenticatedToken.from_token(self.api_token)

def test_authenticate_incorrect_org(self):
request = HttpRequest()
request.META["HTTP_AUTHORIZATION"] = f"Bearer {self.token}"
request.path_info = f"/api/0/organizations/{self.another_org}/projects/"

with pytest.raises(AuthenticationFailed):
self.auth.authenticate(request)

def test_authenticate_user_level_endpoints(self):
request = HttpRequest()
request.META["HTTP_AUTHORIZATION"] = f"Bearer {self.token}"
request.path_info = "/api/0/projects/"

with pytest.raises(AuthenticationFailed):
self.auth.authenticate(request)

def test_authenticate_allowlist_endpoint(self):
request = HttpRequest()
request.META["HTTP_AUTHORIZATION"] = f"Bearer {self.token}"
request.path_info = "/api/0/organizations/"

result = self.auth.authenticate(request)
assert result is not None

user, auth = result
assert user.is_anonymous is False
assert user.id == self.user.id
assert AuthenticatedToken.from_token(auth) == AuthenticatedToken.from_token(self.api_token)

def test_no_match(self):
request = HttpRequest()
request.META["HTTP_AUTHORIZATION"] = "Bearer abc"
request.path_info = f"/api/0/organizations/{self.another_org}/projects/"

with pytest.raises(AuthenticationFailed):
self.auth.authenticate(request)


@django_db_all
@pytest.mark.parametrize("internal", [True, False])
def test_registered_relay(internal):
Expand Down
Loading