Skip to content

Commit d0d2422

Browse files
Add traceresponse headers for asgi apps (FastAPI, Starlette)
This asgi version is modeled after the original wsgi version in open-telemetry#436 and corresponds to the SERVER span. Also cleans up some of the existing ASGI functionality to reduce complexity and make future contributions more straightforward.
1 parent c962da9 commit d0d2422

File tree

3 files changed

+184
-42
lines changed

3 files changed

+184
-42
lines changed

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.8.0-0.27b0...HEAD)
99

10+
### Added
11+
12+
- `opentelemetry-instrumentation-asgi` now returns a `traceresponse` response header.
13+
([#817](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/817))
14+
1015
### Fixed
1116

1217
- `opentelemetry-instrumentation-flask` Flask: Conditionally create SERVER spans
@@ -28,6 +33,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2833

2934
## [1.7.1-0.26b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.7.0-0.26b0) - 2021-11-11
3035

36+
### Added
37+
3138
- `opentelemetry-instrumentation-aws-lambda` Add instrumentation for AWS Lambda Service - pkg metadata files (Part 1/2)
3239
([#739](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/739))
3340
- Add support for Python 3.10

instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py

+99-42
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,14 @@ def client_response_hook(span: Span, message: dict):
103103

104104
from opentelemetry import context, trace
105105
from opentelemetry.instrumentation.asgi.version import __version__ # noqa
106+
from opentelemetry.instrumentation.propagators import (
107+
get_global_response_propagator,
108+
)
106109
from opentelemetry.instrumentation.utils import http_status_to_status_code
107110
from opentelemetry.propagate import extract
108-
from opentelemetry.propagators.textmap import Getter
111+
from opentelemetry.propagators.textmap import Getter, Setter
109112
from opentelemetry.semconv.trace import SpanAttributes
110-
from opentelemetry.trace import Span
113+
from opentelemetry.trace import Span, set_span_in_context
111114
from opentelemetry.trace.status import Status, StatusCode
112115
from opentelemetry.util.http import remove_url_credentials
113116

@@ -152,6 +155,30 @@ def keys(self, carrier: dict) -> typing.List[str]:
152155
asgi_getter = ASGIGetter()
153156

154157

158+
class ASGISetter(Setter):
159+
def set(
160+
self, carrier: dict, key: str, value: str
161+
) -> None: # pylint: disable=no-self-use
162+
"""Sets response header values on an ASGI scope according to `the spec <https://asgi.readthedocs.io/en/latest/specs/www.html#response-start-send-event>`_.
163+
164+
Args:
165+
carrier: ASGI scope object
166+
key: response header name to set
167+
value: response header value
168+
Returns:
169+
None
170+
"""
171+
headers = carrier.get("headers")
172+
if not headers:
173+
headers = []
174+
carrier["headers"] = headers
175+
176+
headers.append([key.lower().encode(), value.encode()])
177+
178+
179+
asgi_setter = ASGISetter()
180+
181+
155182
def collect_request_attributes(scope):
156183
"""Collects HTTP request attributes from the ASGI scope and returns a
157184
dictionary to be used as span creation attributes."""
@@ -295,54 +322,84 @@ async def __call__(self, scope, receive, send):
295322
return await self.app(scope, receive, send)
296323

297324
token = context.attach(extract(scope, getter=asgi_getter))
298-
span_name, additional_attributes = self.default_span_details(scope)
325+
server_span_name, additional_attributes = self.default_span_details(
326+
scope
327+
)
299328

300329
try:
301330
with self.tracer.start_as_current_span(
302-
span_name,
331+
server_span_name,
303332
kind=trace.SpanKind.SERVER,
304-
) as span:
305-
if span.is_recording():
333+
) as server_span:
334+
if server_span.is_recording():
306335
attributes = collect_request_attributes(scope)
307336
attributes.update(additional_attributes)
308337
for key, value in attributes.items():
309-
span.set_attribute(key, value)
338+
server_span.set_attribute(key, value)
310339

311340
if callable(self.server_request_hook):
312-
self.server_request_hook(span, scope)
313-
314-
@wraps(receive)
315-
async def wrapped_receive():
316-
with self.tracer.start_as_current_span(
317-
" ".join((span_name, scope["type"], "receive"))
318-
) as receive_span:
319-
if callable(self.client_request_hook):
320-
self.client_request_hook(receive_span, scope)
321-
message = await receive()
322-
if receive_span.is_recording():
323-
if message["type"] == "websocket.receive":
324-
set_status_code(receive_span, 200)
325-
receive_span.set_attribute("type", message["type"])
326-
return message
327-
328-
@wraps(send)
329-
async def wrapped_send(message):
330-
with self.tracer.start_as_current_span(
331-
" ".join((span_name, scope["type"], "send"))
332-
) as send_span:
333-
if callable(self.client_response_hook):
334-
self.client_response_hook(send_span, message)
335-
if send_span.is_recording():
336-
if message["type"] == "http.response.start":
337-
status_code = message["status"]
338-
set_status_code(span, status_code)
339-
set_status_code(send_span, status_code)
340-
elif message["type"] == "websocket.send":
341-
set_status_code(span, 200)
342-
set_status_code(send_span, 200)
343-
send_span.set_attribute("type", message["type"])
344-
await send(message)
345-
346-
await self.app(scope, wrapped_receive, wrapped_send)
341+
self.server_request_hook(server_span, scope)
342+
343+
otel_receive = self._get_otel_receive(
344+
server_span_name, scope, receive
345+
)
346+
347+
otel_send = self._get_otel_send(
348+
server_span,
349+
server_span_name,
350+
scope,
351+
send,
352+
)
353+
354+
await self.app(scope, otel_receive, otel_send)
347355
finally:
348356
context.detach(token)
357+
358+
def _get_otel_receive(self, server_span_name, scope, receive):
359+
@wraps(receive)
360+
async def otel_receive():
361+
with self.tracer.start_as_current_span(
362+
" ".join((server_span_name, scope["type"], "receive"))
363+
) as receive_span:
364+
if callable(self.client_request_hook):
365+
self.client_request_hook(receive_span, scope)
366+
message = await receive()
367+
if receive_span.is_recording():
368+
if message["type"] == "websocket.receive":
369+
set_status_code(receive_span, 200)
370+
receive_span.set_attribute("type", message["type"])
371+
return message
372+
373+
return otel_receive
374+
375+
def _get_otel_send(self, server_span, server_span_name, scope, send):
376+
@wraps(send)
377+
async def otel_send(message):
378+
with self.tracer.start_as_current_span(
379+
" ".join((server_span_name, scope["type"], "send"))
380+
) as send_span:
381+
if callable(self.client_response_hook):
382+
self.client_response_hook(send_span, message)
383+
if send_span.is_recording():
384+
if message["type"] == "http.response.start":
385+
status_code = message["status"]
386+
set_status_code(server_span, status_code)
387+
set_status_code(send_span, status_code)
388+
elif message["type"] == "websocket.send":
389+
set_status_code(server_span, 200)
390+
set_status_code(send_span, 200)
391+
send_span.set_attribute("type", message["type"])
392+
393+
propagator = get_global_response_propagator()
394+
if propagator:
395+
propagator.inject(
396+
message,
397+
context=set_span_in_context(
398+
server_span, trace.context_api.Context()
399+
),
400+
setter=asgi_setter,
401+
)
402+
403+
await send(message)
404+
405+
return otel_send

instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py

+78
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@
1818

1919
import opentelemetry.instrumentation.asgi as otel_asgi
2020
from opentelemetry import trace as trace_api
21+
from opentelemetry.instrumentation.propagators import (
22+
TraceResponsePropagator,
23+
get_global_response_propagator,
24+
set_global_response_propagator,
25+
)
2126
from opentelemetry.sdk import resources
2227
from opentelemetry.semconv.trace import SpanAttributes
2328
from opentelemetry.test.asgitestutil import (
2429
AsgiTestBase,
2530
setup_testing_defaults,
2631
)
2732
from opentelemetry.test.test_base import TestBase
33+
from opentelemetry.trace import format_span_id, format_trace_id
2834

2935

3036
async def http_app(scope, receive, send):
@@ -287,6 +293,38 @@ def update_expected_user_agent(expected):
287293
outputs = self.get_all_output()
288294
self.validate_outputs(outputs, modifiers=[update_expected_user_agent])
289295

296+
def test_traceresponse_header(self):
297+
"""Test a traceresponse header is sent when a global propagator is set."""
298+
299+
orig = get_global_response_propagator()
300+
set_global_response_propagator(TraceResponsePropagator())
301+
302+
app = otel_asgi.OpenTelemetryMiddleware(simple_asgi)
303+
self.seed_app(app)
304+
self.send_default_request()
305+
306+
span = self.memory_exporter.get_finished_spans()[-1]
307+
self.assertEqual(trace_api.SpanKind.SERVER, span.kind)
308+
309+
response_start, response_body, *_ = self.get_all_output()
310+
self.assertEqual(response_body["body"], b"*")
311+
self.assertEqual(response_start["status"], 200)
312+
313+
traceresponse = "00-{0}-{1}-01".format(
314+
format_trace_id(span.get_span_context().trace_id),
315+
format_span_id(span.get_span_context().span_id),
316+
)
317+
self.assertListEqual(
318+
response_start["headers"],
319+
[
320+
[b"Content-Type", b"text/plain"],
321+
[b"traceresponse", f"{traceresponse}".encode()],
322+
[b"access-control-expose-headers", b"traceresponse"],
323+
],
324+
)
325+
326+
set_global_response_propagator(orig)
327+
290328
def test_websocket(self):
291329
self.scope = {
292330
"type": "websocket",
@@ -359,6 +397,46 @@ def test_websocket(self):
359397
self.assertEqual(span.kind, expected["kind"])
360398
self.assertDictEqual(dict(span.attributes), expected["attributes"])
361399

400+
def test_websocket_traceresponse_header(self):
401+
"""Test a traceresponse header is set for websocket messages"""
402+
403+
orig = get_global_response_propagator()
404+
set_global_response_propagator(TraceResponsePropagator())
405+
406+
self.scope = {
407+
"type": "websocket",
408+
"http_version": "1.1",
409+
"scheme": "ws",
410+
"path": "/",
411+
"query_string": b"",
412+
"headers": [],
413+
"client": ("127.0.0.1", 32767),
414+
"server": ("127.0.0.1", 80),
415+
}
416+
app = otel_asgi.OpenTelemetryMiddleware(simple_asgi)
417+
self.seed_app(app)
418+
self.send_input({"type": "websocket.connect"})
419+
self.send_input({"type": "websocket.receive", "text": "ping"})
420+
self.send_input({"type": "websocket.disconnect"})
421+
_, socket_send, *_ = self.get_all_output()
422+
423+
span = self.memory_exporter.get_finished_spans()[-1]
424+
self.assertEqual(trace_api.SpanKind.SERVER, span.kind)
425+
426+
traceresponse = "00-{0}-{1}-01".format(
427+
format_trace_id(span.get_span_context().trace_id),
428+
format_span_id(span.get_span_context().span_id),
429+
)
430+
self.assertListEqual(
431+
socket_send["headers"],
432+
[
433+
[b"traceresponse", f"{traceresponse}".encode()],
434+
[b"access-control-expose-headers", b"traceresponse"],
435+
],
436+
)
437+
438+
set_global_response_propagator(orig)
439+
362440
def test_lifespan(self):
363441
self.scope["type"] = "lifespan"
364442
app = otel_asgi.OpenTelemetryMiddleware(simple_asgi)

0 commit comments

Comments
 (0)