-
Notifications
You must be signed in to change notification settings - Fork 537
/
Copy path_wsgi_common.py
172 lines (133 loc) · 4.39 KB
/
_wsgi_common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import json
from sentry_sdk.hub import Hub, _should_send_default_pii
from sentry_sdk.utils import AnnotatedValue
from sentry_sdk._compat import text_type, iteritems
from sentry_sdk._types import TYPE_CHECKING
if TYPE_CHECKING:
import sentry_sdk
from typing import Any
from typing import Dict
from typing import Optional
from typing import Union
# WSGI environ keys whose values are treated as sensitive.
# Each key appears exactly once; membership is all that matters to consumers.
SENSITIVE_ENV_KEYS = (
    "REMOTE_ADDR",
    "HTTP_X_FORWARDED_FOR",
    "HTTP_SET_COOKIE",
    "HTTP_COOKIE",
    "HTTP_AUTHORIZATION",
    "HTTP_X_API_KEY",
    "HTTP_X_REAL_IP",
)

# Header-name form of the keys above: only the entries that start with
# "HTTP_" are kept, with that prefix removed (e.g. "HTTP_COOKIE" -> "COOKIE").
SENSITIVE_HEADERS = tuple(
    x[len("HTTP_") :] for x in SENSITIVE_ENV_KEYS if x.startswith("HTTP_")
)
def request_body_within_bounds(client, content_length):
    # type: (Optional[sentry_sdk.Client], int) -> bool
    """Return whether a request body of ``content_length`` may be captured.

    The decision is driven by the client's ``request_bodies`` option:

    * no client at all -> ``False``
    * ``"never"``      -> ``False``
    * ``"small"``      -> ``True`` only up to 10**3
    * ``"medium"``     -> ``True`` only up to 10**4
    * any other value  -> ``True``
    """
    if client is None:
        return False

    setting = client.options["request_bodies"]
    if setting == "never":
        return False
    if setting == "small":
        return content_length <= 10**3
    if setting == "medium":
        return content_length <= 10**4

    # Every other setting imposes no limit here.
    return True
class RequestExtractor(object):
    """Copy details of a wrapped request object into an event dict.

    The accessors ``cookies``, ``raw_data``, ``form``, ``files``,
    ``size_of_file`` and ``env`` raise ``NotImplementedError`` in this class;
    the remaining methods are written in terms of them, so a concrete
    extractor has to provide them.
    """

    def __init__(self, request):
        # type: (Any) -> None
        self.request = request

    def extract_into_event(self, event):
        # type: (Dict[str, Any]) -> None
        """Fill ``event["request"]`` with cookies and body data.

        Does nothing when the current hub has no client. Cookies are added
        only when ``_should_send_default_pii()`` is true. The body is stored
        under ``"data"`` as the parsed body when one is available, or as an
        ``AnnotatedValue`` marker when the body is outside the configured
        bounds or only raw data is available.
        """
        client = Hub.current.client
        if client is None:
            return

        data = None  # type: Optional[Union[AnnotatedValue, Dict[str, Any]]]

        content_length = self.content_length()
        # Extend an existing "request" entry rather than replacing it.
        request_info = event.get("request", {})

        if _should_send_default_pii():
            request_info["cookies"] = dict(self.cookies())

        if not request_body_within_bounds(client, content_length):
            data = AnnotatedValue.removed_because_over_size_limit()
        else:
            parsed_body = self.parsed_body()
            if parsed_body is not None:
                data = parsed_body
            elif self.raw_data():
                data = AnnotatedValue.removed_because_raw_data()

        if data is not None:
            request_info["data"] = data

        event["request"] = request_info

    def content_length(self):
        # type: () -> int
        """Return ``CONTENT_LENGTH`` from ``env()`` as an int, or 0.

        0 is returned when the key is missing or its value cannot be
        converted to an integer (including a ``None`` value).
        """
        try:
            return int(self.env().get("CONTENT_LENGTH", 0))
        except (TypeError, ValueError):
            # TypeError covers non-string/non-number values such as None.
            return 0

    def cookies(self):
        # type: () -> Dict[str, Any]
        raise NotImplementedError()

    def raw_data(self):
        # type: () -> Optional[Union[str, bytes]]
        raise NotImplementedError()

    def form(self):
        # type: () -> Optional[Dict[str, Any]]
        raise NotImplementedError()

    def parsed_body(self):
        # type: () -> Optional[Dict[str, Any]]
        """Return the structured request body, if any.

        When form fields or files are present, the result is a dict of the
        form fields plus one ``AnnotatedValue`` marker per uploaded file key
        (file contents are never included). Otherwise the JSON body, as
        returned by ``json()``, is used.
        """
        form = self.form()
        files = self.files()
        if form or files:
            # form() and files() are both Optional: only iterate the one(s)
            # that are actually present.
            data = dict(iteritems(form)) if form else {}
            if files:
                for key, _ in iteritems(files):
                    data[key] = AnnotatedValue.removed_because_raw_data()
            return data

        return self.json()

    def is_json(self):
        # type: () -> bool
        """Return whether ``CONTENT_TYPE`` from ``env()`` is a JSON type."""
        return _is_json_content_type(self.env().get("CONTENT_TYPE"))

    def json(self):
        # type: () -> Optional[Any]
        """Return the decoded JSON body, or ``None``.

        ``None`` is returned when the request is not JSON, when there is no
        raw data, or when reading/decoding raises ``ValueError`` (which
        includes invalid JSON and invalid UTF-8).
        """
        try:
            if not self.is_json():
                return None

            raw_data = self.raw_data()
            if raw_data is None:
                return None

            if isinstance(raw_data, text_type):
                return json.loads(raw_data)
            else:
                return json.loads(raw_data.decode("utf-8"))
        except ValueError:
            # Best effort: a malformed body is treated as "no JSON body".
            pass

        return None

    def files(self):
        # type: () -> Optional[Dict[str, Any]]
        raise NotImplementedError()

    def size_of_file(self, file):
        # type: (Any) -> int
        raise NotImplementedError()

    def env(self):
        # type: () -> Dict[str, Any]
        raise NotImplementedError()
def _is_json_content_type(ct):
    # type: (Optional[str]) -> bool
    """Return whether the content type ``ct`` denotes JSON.

    Parameters after the first ``;`` are ignored. The remaining media type
    is trimmed and compared case-insensitively, and matches when it is
    ``application/json`` or any ``application/*+json`` type. ``None`` and
    the empty string yield ``False``.
    """
    mt = (ct or "").split(";", 1)[0].strip().lower()
    return mt == "application/json" or (
        mt.startswith("application/") and mt.endswith("+json")
    )
def _filter_headers(headers):
    # type: (Dict[str, str]) -> Dict[str, Union[AnnotatedValue, str]]
    """Return ``headers`` with the values of sensitive headers replaced.

    When ``_should_send_default_pii()`` is true, the mapping is returned
    unchanged (the same object). Otherwise a new dict is built in which
    every header whose name is listed in ``SENSITIVE_HEADERS`` has its value
    replaced by an ``AnnotatedValue`` marker.
    """
    if _should_send_default_pii():
        return headers

    return {
        k: (
            v
            # Normalise "X-Api-Key" style names to the "X_API_KEY" form
            # used by SENSITIVE_HEADERS before the membership check.
            if k.upper().replace("-", "_") not in SENSITIVE_HEADERS
            # NOTE(review): the marker used is the "over size limit" one,
            # although the value is dropped for being sensitive -- confirm
            # whether a more specific marker is available.
            else AnnotatedValue.removed_because_over_size_limit()
        )
        for k, v in iteritems(headers)
    }