forked from open-telemetry/opentelemetry-python-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
267 lines (219 loc) · 8.71 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urllib
from timeit import default_timer
from typing import Dict, List, Tuple, Union
from aiohttp import web
from multidict import CIMultiDictProxy
from opentelemetry import metrics, trace
from opentelemetry.instrumentation.aiohttp_server.package import _instruments
from opentelemetry.instrumentation.aiohttp_server.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
http_status_to_status_code,
is_http_instrumentation_enabled,
)
from opentelemetry.propagate import extract
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import get_excluded_urls, remove_url_credentials
# Attribute keys that _parse_duration_attrs() copies from the collected
# request attributes into the attributes recorded with the duration metric.
_duration_attrs = [
    SpanAttributes.HTTP_METHOD,
    SpanAttributes.HTTP_HOST,
    SpanAttributes.HTTP_SCHEME,
    SpanAttributes.HTTP_STATUS_CODE,
    SpanAttributes.HTTP_FLAVOR,
    SpanAttributes.HTTP_SERVER_NAME,
    SpanAttributes.NET_HOST_NAME,
    SpanAttributes.NET_HOST_PORT,
    SpanAttributes.HTTP_ROUTE,
]
# Attribute keys that _parse_active_request_count_attrs() copies from the
# collected request attributes into the attributes of the active-requests
# counter.
_active_requests_count_attrs = [
    SpanAttributes.HTTP_METHOD,
    SpanAttributes.HTTP_HOST,
    SpanAttributes.HTTP_SCHEME,
    SpanAttributes.HTTP_FLAVOR,
    SpanAttributes.HTTP_SERVER_NAME,
]
# Module-level tracer and meter shared by every request handled by the
# middleware defined in this module.
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__, __version__)
# URL filter obtained for the "AIOHTTP_SERVER" key; the middleware skips
# instrumentation for request paths this object reports as disabled.
# NOTE(review): presumably configured through an environment variable -
# confirm against opentelemetry.util.http.get_excluded_urls.
_excluded_urls = get_excluded_urls("AIOHTTP_SERVER")
def _parse_duration_attrs(req_attrs):
    """Pick the request attributes that accompany the duration metric.

    Args:
        req_attrs: mapping of attribute key to value collected for a request.

    Returns:
        A new dict containing only the keys listed in ``_duration_attrs``
        whose value in ``req_attrs`` is not ``None``.
    """
    return {
        key: req_attrs[key]
        for key in _duration_attrs
        if req_attrs.get(key) is not None
    }
def _parse_active_request_count_attrs(req_attrs):
    """Pick the request attributes that accompany the active-requests counter.

    Args:
        req_attrs: mapping of attribute key to value collected for a request.

    Returns:
        A new dict containing only the keys listed in
        ``_active_requests_count_attrs`` whose value in ``req_attrs`` is not
        ``None``.
    """
    return {
        key: req_attrs[key]
        for key in _active_requests_count_attrs
        if req_attrs.get(key) is not None
    }
def get_default_span_details(request: web.Request) -> Tuple[str, dict]:
    """Derive the default span name and extra span attributes for a request.

    Args:
        request: the request object itself.

    Returns:
        A tuple of the span name and a dict of additional span attributes.
        The name is the request path with surrounding whitespace stripped,
        or ``"HTTP <method>"`` when the stripped path is empty. The dict is
        always empty.
    """
    stripped_path = request.path.strip()
    if stripped_path:
        return stripped_path, {}
    return f"HTTP {request.method}", {}
def _get_view_func(request: web.Request) -> str:
    """Look up the name of the handler the request was routed to.

    Args:
        request: the request object itself.

    Returns:
        The ``__name__`` of ``request.match_info.handler``, or ``"unknown"``
        when any attribute along that chain is missing.
    """
    try:
        route_handler = request.match_info.handler
        view_name = route_handler.__name__
    except AttributeError:
        # No match info, no handler, or a handler without a __name__.
        view_name = "unknown"
    return view_name
def collect_request_attributes(request: web.Request) -> Dict:
    """Collect HTTP attributes of an aiohttp request for span creation.

    Args:
        request: the incoming aiohttp request.

    Returns:
        A dictionary mapping span attribute keys to values. Entries whose
        value is ``None`` are left out.
    """
    # Imported locally so the ``urllib.parse`` submodule is guaranteed to be
    # loaded; a bare ``import urllib`` does not import it by itself.
    from urllib.parse import unquote  # pylint: disable=import-outside-toplevel

    server_host = request.url.host
    port = request.url.port
    http_url = str(request.url)

    query_string = request.query_string
    # For aiohttp requests ``str(request.url)`` already includes the query
    # string, so appending it unconditionally produced "...?a=1?a=1".
    # Only append when the URL does not carry a query part yet.
    if query_string and http_url and "?" not in http_url:
        if isinstance(query_string, bytes):
            query_string = query_string.decode("utf8")
        http_url += "?" + unquote(query_string)

    result = {
        SpanAttributes.HTTP_SCHEME: request.scheme,
        SpanAttributes.HTTP_HOST: server_host,
        SpanAttributes.NET_HOST_PORT: port,
        SpanAttributes.HTTP_ROUTE: _get_view_func(request),
        SpanAttributes.HTTP_FLAVOR: f"{request.version.major}.{request.version.minor}",
        SpanAttributes.HTTP_TARGET: request.path,
        SpanAttributes.HTTP_URL: remove_url_credentials(http_url),
    }

    http_method = request.method
    if http_method:
        result[SpanAttributes.HTTP_METHOD] = http_method

    # ``request.host`` is normalised to a list so that one or several host
    # values are joined into a single comma-separated attribute.
    http_host_value_list = (
        [request.host] if not isinstance(request.host, list) else request.host
    )
    if http_host_value_list:
        result[SpanAttributes.HTTP_SERVER_NAME] = ",".join(
            http_host_value_list
        )

    http_user_agent = request.headers.get("user-agent")
    if http_user_agent:
        result[SpanAttributes.HTTP_USER_AGENT] = http_user_agent

    # Drop attributes without a value (e.g. an unknown host or port).
    return {key: value for key, value in result.items() if value is not None}
def set_status_code(span, status_code: int) -> None:
    """Record the HTTP response status on ``span``.

    Args:
        span: the span to annotate.
        status_code: the HTTP status; anything ``int()`` accepts.

    A value that ``int()`` rejects with ``ValueError`` marks the span as an
    error and sets no status-code attribute. Otherwise the integer is stored
    as the HTTP status code attribute and mapped to a span status.
    """
    try:
        numeric_status = int(status_code)
    except ValueError:
        error_status = Status(
            StatusCode.ERROR,
            f"Non-integer HTTP status: {status_code!r}",
        )
        span.set_status(error_status)
        return

    span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, numeric_status)
    span_status_code = http_status_to_status_code(
        numeric_status, server_span=True
    )
    span.set_status(Status(span_status_code))
class AiohttpGetter(Getter):
    """Propagation getter that reads values from an aiohttp request's headers."""

    def get(self, carrier, key: str) -> Union[List, None]:
        """Retrieve the values of an HTTP header from the request.

        Args:
            carrier: object exposing the request headers as ``.headers``
                (the middleware passes the aiohttp request).
            key: header name to look up.

        Returns:
            A list of all header values matching the key, or None if there
            are no headers or the key does not match any header.
        """
        headers: CIMultiDictProxy = carrier.headers
        if not headers:
            return None
        return headers.getall(key, None)

    def keys(self, carrier) -> List:
        """List the header names available on the request.

        The names are read from ``carrier.headers`` so that they match what
        ``get`` can look up. Calling ``carrier.keys()`` on an aiohttp request
        would return the keys of its per-request storage mapping instead of
        the header names.

        Args:
            carrier: object exposing the request headers as ``.headers``.

        Returns:
            A list of header names; empty when there are no headers.
        """
        headers = carrier.headers
        if not headers:
            return []
        return list(headers.keys())


# Shared getter instance handed to ``extract`` by the middleware.
getter = AiohttpGetter()
@web.middleware
async def middleware(request, handler):
    """Middleware for aiohttp implementing tracing and metrics logic.

    Args:
        request: the incoming aiohttp request.
        handler: the next handler in the middleware chain.

    Returns:
        The response produced by ``handler``.

    Raises:
        Whatever ``handler`` raises. A ``web.HTTPException`` has its status
        code recorded on the span before it is re-raised.
    """
    # Pass straight through when instrumentation is switched off or the
    # request path is excluded.
    if not is_http_instrumentation_enabled() or _excluded_urls.url_disabled(
        request.url.path
    ):
        return await handler(request)

    span_name, additional_attributes = get_default_span_details(request)

    # Collected once and reused for the span and for both metrics.
    req_attrs = collect_request_attributes(request)
    duration_attrs = _parse_duration_attrs(req_attrs)
    active_requests_count_attrs = _parse_active_request_count_attrs(req_attrs)

    # NOTE(review): both instruments are requested from the meter on every
    # request; consider creating them once if this shows up in profiles.
    duration_histogram = meter.create_histogram(
        name=MetricInstruments.HTTP_SERVER_DURATION,
        unit="ms",
        description="Duration of HTTP server requests.",
    )
    active_requests_counter = meter.create_up_down_counter(
        name=MetricInstruments.HTTP_SERVER_ACTIVE_REQUESTS,
        unit="requests",
        description="measures the number of concurrent HTTP requests those are currently in flight",
    )

    with tracer.start_as_current_span(
        span_name,
        context=extract(request, getter=getter),
        kind=trace.SpanKind.SERVER,
    ) as span:
        # Merge into a fresh dict so ``req_attrs`` itself is left untouched.
        span.set_attributes({**req_attrs, **additional_attributes})
        start = default_timer()
        active_requests_counter.add(1, active_requests_count_attrs)
        try:
            resp = await handler(request)
            set_status_code(span, resp.status)
            # The status is only known after the handler ran; add it here so
            # the duration metric carries it, as ``_duration_attrs`` intends.
            duration_attrs[SpanAttributes.HTTP_STATUS_CODE] = resp.status
        except web.HTTPException as ex:
            set_status_code(span, ex.status_code)
            duration_attrs[SpanAttributes.HTTP_STATUS_CODE] = ex.status_code
            raise
        finally:
            # Elapsed time in milliseconds, clamped so it is never negative.
            duration = max((default_timer() - start) * 1000, 0)
            duration_histogram.record(duration, duration_attrs)
            active_requests_counter.add(-1, active_requests_count_attrs)
        return resp
class _InstrumentedApplication(web.Application):
    """``web.Application`` subclass that runs the tracing middleware first."""

    def __init__(self, *args, **kwargs):
        """Create the application with ``middleware`` placed ahead of the
        caller's middlewares.

        Args:
            *args: forwarded unchanged to ``web.Application``.
            **kwargs: forwarded to ``web.Application``; ``middlewares`` may be
                any iterable (or absent) and is not modified.
        """
        # Build a new list instead of calling ``insert`` on the caller's
        # object: that mutated the caller's list and failed for tuples or
        # other non-list iterables.
        user_middlewares = kwargs.pop("middlewares", None) or ()
        kwargs["middlewares"] = [middleware, *user_middlewares]
        super().__init__(*args, **kwargs)
class AioHttpServerInstrumentor(BaseInstrumentor):
    # pylint: disable=protected-access,attribute-defined-outside-init
    """Instrumentor that swaps ``aiohttp.web.Application`` for a traced subclass.

    See `BaseInstrumentor`
    """

    def _instrument(self, **kwargs):
        """Remember the current ``web.Application`` and install the
        instrumented subclass in its place."""
        self._original_app = web.Application
        web.Application = _InstrumentedApplication

    def _uninstrument(self, **kwargs):
        """Put back the ``web.Application`` saved by ``_instrument``."""
        web.Application = self._original_app

    def instrumentation_dependencies(self):
        """Return the package requirements declared in ``_instruments``."""
        return _instruments