|
1 |
| -from __future__ import annotations |
| 1 | +from sentry.users.api.bases.user import RegionSiloUserEndpoint, UserAndStaffPermission, UserEndpoint |
2 | 2 |
|
3 |
| -from django.contrib.auth.models import AnonymousUser |
4 |
| -from rest_framework.permissions import BasePermission |
5 |
| -from rest_framework.request import Request |
6 |
| -from typing_extensions import override |
7 |
| - |
8 |
| -from sentry.api.base import Endpoint |
9 |
| -from sentry.api.exceptions import ResourceDoesNotExist |
10 |
| -from sentry.api.permissions import SentryPermission, StaffPermissionMixin |
11 |
| -from sentry.auth.services.access.service import access_service |
12 |
| -from sentry.auth.superuser import is_active_superuser, superuser_has_permission |
13 |
| -from sentry.auth.system import is_system_auth |
14 |
| -from sentry.models.organization import OrganizationStatus |
15 |
| -from sentry.models.organizationmapping import OrganizationMapping |
16 |
| -from sentry.models.organizationmembermapping import OrganizationMemberMapping |
17 |
| -from sentry.organizations.services.organization import organization_service |
18 |
| -from sentry.users.models.user import User |
19 |
| -from sentry.users.services.user import RpcUser |
20 |
| -from sentry.users.services.user.service import user_service |
21 |
| - |
22 |
| - |
23 |
| -class UserPermission(SentryPermission): |
24 |
| - def has_object_permission(self, request: Request, view, user: User | RpcUser | None = None): |
25 |
| - if user is None or request.user.id == user.id: |
26 |
| - return True |
27 |
| - if is_system_auth(request.auth): |
28 |
| - return True |
29 |
| - if request.auth: |
30 |
| - return False |
31 |
| - |
32 |
| - if is_active_superuser(request): |
33 |
| - # collect admin level permissions (only used when a user is active superuser) |
34 |
| - permissions = access_service.get_permissions_for_user(request.user.id) |
35 |
| - |
36 |
| - if superuser_has_permission(request, permissions): |
37 |
| - return True |
38 |
| - |
39 |
| - return False |
40 |
| - |
41 |
| - |
42 |
| -class UserAndStaffPermission(StaffPermissionMixin, UserPermission): |
43 |
| - """ |
44 |
| - Allows staff to access any endpoints this permission is used on. Note that |
45 |
| - UserPermission already includes a check for Superuser |
46 |
| - """ |
47 |
| - |
48 |
| - |
49 |
| -class OrganizationUserPermission(UserAndStaffPermission): |
50 |
| - scope_map = {"DELETE": ["member:admin"]} |
51 |
| - |
52 |
| - def has_org_permission(self, request: Request, user): |
53 |
| - """ |
54 |
| - Org can act on a user account, if the user is a member of only one org |
55 |
| - e.g. reset org member's 2FA |
56 |
| - """ |
57 |
| - |
58 |
| - organization_id = self._get_single_organization_id(user) |
59 |
| - if organization_id is None: |
60 |
| - return False |
61 |
| - organization = organization_service.get_organization_by_id( |
62 |
| - id=organization_id, user_id=request.user.id |
63 |
| - ) |
64 |
| - if not organization: |
65 |
| - return False |
66 |
| - |
67 |
| - self.determine_access(request, organization) |
68 |
| - assert request.method is not None |
69 |
| - allowed_scopes = set(self.scope_map.get(request.method, [])) |
70 |
| - return any(request.access.has_scope(s) for s in allowed_scopes) |
71 |
| - |
72 |
| - @staticmethod |
73 |
| - def _get_single_organization_id(user) -> int | None: |
74 |
| - """If the user is a member of only one active org, return its ID.""" |
75 |
| - |
76 |
| - # Multiple OrganizationMemberMappings are okay if only one |
77 |
| - # of them points to an *active* organization |
78 |
| - membership_ids = OrganizationMemberMapping.objects.filter(user_id=user.id).values_list( |
79 |
| - "organization_id", flat=True |
80 |
| - ) |
81 |
| - |
82 |
| - try: |
83 |
| - org_mapping = OrganizationMapping.objects.get( |
84 |
| - status=OrganizationStatus.ACTIVE, organization_id__in=membership_ids |
85 |
| - ) |
86 |
| - except (OrganizationMapping.DoesNotExist, OrganizationMapping.MultipleObjectsReturned): |
87 |
| - return None |
88 |
| - return org_mapping.organization_id |
89 |
| - |
90 |
| - def has_object_permission(self, request: Request, view, user=None): |
91 |
| - if super().has_object_permission(request, view, user): |
92 |
| - return True |
93 |
| - return self.has_org_permission(request, user) |
94 |
| - |
95 |
| - |
96 |
| -class UserEndpoint(Endpoint): |
97 |
| - """ |
98 |
| - The base endpoint for APIs that deal with Users. Inherit from this class to |
99 |
| - get permission checks and to automatically convert user ID "me" to the |
100 |
| - currently logged in user's ID. |
101 |
| - """ |
102 |
| - |
103 |
| - permission_classes: tuple[type[BasePermission], ...] = (UserPermission,) |
104 |
| - |
105 |
| - @override |
106 |
| - def convert_args(self, request: Request, user_id: int | str | None = None, *args, **kwargs): |
107 |
| - if user_id == "me": |
108 |
| - if not request.user.is_authenticated: |
109 |
| - raise ResourceDoesNotExist |
110 |
| - user_id = request.user.id |
111 |
| - |
112 |
| - if user_id is None: |
113 |
| - raise ResourceDoesNotExist |
114 |
| - |
115 |
| - try: |
116 |
| - user = User.objects.get(id=user_id) |
117 |
| - except (User.DoesNotExist, ValueError): |
118 |
| - raise ResourceDoesNotExist |
119 |
| - |
120 |
| - self.check_object_permissions(request, user) |
121 |
| - |
122 |
| - kwargs["user"] = user |
123 |
| - return args, kwargs |
124 |
| - |
125 |
| - |
126 |
| -class RegionSiloUserEndpoint(Endpoint): |
127 |
| - """ |
128 |
| - The base endpoint for APIs that deal with Users but live in the region silo. |
129 |
| - Inherit from this class to get permission checks and to automatically |
130 |
| - convert user ID "me" to the currently logged in user's ID. |
131 |
| - """ |
132 |
| - |
133 |
| - permission_classes = (UserPermission,) |
134 |
| - |
135 |
| - @override |
136 |
| - def convert_args(self, request: Request, user_id: str | None = None, *args, **kwargs): |
137 |
| - user: RpcUser | User | None = None |
138 |
| - |
139 |
| - if user_id == "me": |
140 |
| - if isinstance(request.user, AnonymousUser) or not request.user.is_authenticated: |
141 |
| - raise ResourceDoesNotExist |
142 |
| - user = request.user |
143 |
| - elif user_id is not None: |
144 |
| - user = user_service.get_user(user_id=int(user_id)) |
145 |
| - |
146 |
| - if not user: |
147 |
| - raise ResourceDoesNotExist |
148 |
| - |
149 |
| - self.check_object_permissions(request, user) |
150 |
| - |
151 |
| - kwargs["user"] = user |
152 |
| - return args, kwargs |
| 3 | +__all__ = ("UserEndpoint", "RegionSiloUserEndpoint", "UserAndStaffPermission") |
0 commit comments