Skip to content

Commit 1e0b7b5

Browse files
committed
Implemented distributed tracing
This handles the incoming W3C traceparent header (currently sent under our own header name, because the W3C spec for the header isn't frozen yet) and propagates it to downstream HTTP services.
1 parent ee930e0 commit 1e0b7b5

23 files changed

+396
-76
lines changed

CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,14 @@ Further breaking changes:
1313
* Some settings now require a unit for duration or size. See documentation on
1414
configuration for more information.
1515

16-
1716
Other changes:
1817
* on Python 3.7, use [contextvars](https://docs.python.org/3/library/contextvars.html) instead of threadlocals for storing
1918
current transaction and span. This is a necessary precursor for full asyncio support. (#291)
2019

2120
## Unreleased
2221

2322
* fixed an issue with detecting names of wrapped functions that are partials (#294)
24-
23+
2524
## v3.0.1
2625

2726
[Check the diff](https://github.com/elastic/apm-agent-python/compare/v3.0.0...v3.0.1)

elasticapm/base.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,10 @@ def queue(self, event_type, data, flush=False):
196196
flush = False
197197
self._transport.queue(event_type, data, flush)
198198

199-
def begin_transaction(self, transaction_type):
199+
def begin_transaction(self, transaction_type, trace_parent=None):
200200
"""Register the start of a transaction on the client
201201
"""
202-
return self.transaction_store.begin_transaction(transaction_type)
202+
return self.transaction_store.begin_transaction(transaction_type, trace_parent=trace_parent)
203203

204204
def end_transaction(self, name=None, result=""):
205205
transaction = self.transaction_store.end_transaction(result, name)
@@ -351,7 +351,10 @@ def _build_msg_for_logging(
351351

352352
transaction = get_transaction()
353353
if transaction:
354-
event_data["transaction"] = {"id": transaction.id}
354+
if transaction.trace_parent:
355+
event_data["trace_id"] = transaction.trace_parent.trace_id
356+
event_data["parent_id"] = transaction.id
357+
event_data["transaction_id"] = transaction.id
355358

356359
return event_data
357360

elasticapm/conf/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ class Config(_ConfigBase):
220220
framework_version = _ConfigValue("FRAMEWORK_VERSION", default=None)
221221
disable_send = _BoolConfigValue("DISABLE_SEND", default=False)
222222
instrument = _BoolConfigValue("DISABLE_INSTRUMENTATION", default=True)
223+
enable_distributed_tracing = _BoolConfigValue("ENABLE_DISTRIBUTED_TRACING", default=True)
223224

224225

225226
def setup_logging(handler, exclude=["elasticapm", "gunicorn", "south", "elasticapm.errors"]):

elasticapm/conf/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
EVENTS_API_PATH = "/intake/v2/events"
22

3+
TRACE_CONTEXT_VERSION = 0
4+
TRACEPARENT_HEADER_NAME = "elastic-apm-traceparent"
5+
36
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
47

58
KEYWORD_MAX_LENGTH = 1024

elasticapm/contrib/django/apps.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33
from django.apps import AppConfig
44
from django.conf import settings as django_settings
55

6+
from elasticapm.conf import constants
67
from elasticapm.contrib.django.client import get_client
8+
from elasticapm.utils.disttracing import TraceParent
79

810
ERROR_DISPATCH_UID = "elasticapm-exceptions"
911
REQUEST_START_DISPATCH_UID = "elasticapm-request-start"
1012
REQUEST_FINISH_DISPATCH_UID = "elasticapm-request-stop"
1113

14+
TRACEPARENT_HEADER_NAME_WSGI = "HTTP_" + constants.TRACEPARENT_HEADER_NAME.upper().replace("-", "_")
15+
1216

1317
class ElasticAPMConfig(AppConfig):
1418
name = "elasticapm.contrib.django"
@@ -38,11 +42,7 @@ def register_handlers(client):
3842

3943
request_started.disconnect(dispatch_uid=REQUEST_START_DISPATCH_UID)
4044
request_started.connect(
41-
lambda sender, *args, **kwargs: client.begin_transaction("request")
42-
if _should_start_transaction(client)
43-
else None,
44-
dispatch_uid=REQUEST_START_DISPATCH_UID,
45-
weak=False,
45+
partial(_request_started_handler, client), dispatch_uid=REQUEST_START_DISPATCH_UID, weak=False
4646
)
4747

4848
request_finished.disconnect(dispatch_uid=REQUEST_FINISH_DISPATCH_UID)
@@ -65,6 +65,23 @@ def register_handlers(client):
6565
client.logger.debug("Not instrumenting Celery, couldn't import")
6666

6767

68+
def _request_started_handler(client, sender, *args, **kwargs):
    """
    Receiver for Django's ``request_started`` signal that begins a "request" transaction.

    Nothing happens when ``_should_start_transaction(client)`` is falsy. Otherwise the
    traceparent header is looked up in the WSGI ``environ`` passed with the signal (if any),
    parsed into a ``TraceParent`` and handed to ``client.begin_transaction``.

    :param client: the agent client to start the transaction on
    :param sender: the signal sender (unused)
    :param kwargs: signal keyword arguments; only ``environ`` is read
    :return: None
    """
    if not _should_start_transaction(client):
        return
    # Only the WSGI "environ" is inspected for the header.
    # TODO handle Django Channels (signalled by a "scope" keyword argument)
    header_value = kwargs["environ"].get(TRACEPARENT_HEADER_NAME_WSGI) if "environ" in kwargs else None
    parent = TraceParent.from_string(header_value) if header_value else None
    client.begin_transaction("request", trace_parent=parent)
83+
84+
6885
def instrument(client):
6986
"""
7087
Auto-instruments code to get nice spans

elasticapm/contrib/django/middleware/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,6 @@ def instrument_middlewares(self):
138138
except ImportError:
139139
client.logger.info("Can't instrument middleware %s", middleware_path)
140140

141-
def process_request(self, request):
142-
if not django_settings.DEBUG or self.client.config.debug:
143-
self.client.begin_transaction("request")
144-
145141
def process_view(self, request, view_func, view_args, view_kwargs):
146142
request._elasticapm_view_func = view_func
147143

elasticapm/contrib/flask/__init__.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
import elasticapm
2020
import elasticapm.instrumentation.control
2121
from elasticapm.base import Client
22-
from elasticapm.conf import setup_logging
22+
from elasticapm.conf import constants, setup_logging
2323
from elasticapm.contrib.flask.utils import get_data_from_request, get_data_from_response
2424
from elasticapm.handlers.logging import LoggingHandler
2525
from elasticapm.utils import build_name_with_http_method_prefix
26+
from elasticapm.utils.disttracing import TraceParent
2627

2728
logger = logging.getLogger("elasticapm.errors.client")
2829

@@ -132,7 +133,11 @@ def init_app(self, app, **defaults):
132133

133134
def request_started(self, app):
134135
if not self.app.debug or self.client.config.debug:
135-
self.client.begin_transaction("request")
136+
if constants.TRACEPARENT_HEADER_NAME in request.headers:
137+
trace_parent = TraceParent.from_string(request.headers[constants.TRACEPARENT_HEADER_NAME])
138+
else:
139+
trace_parent = None
140+
self.client.begin_transaction("request", trace_parent=trace_parent)
136141

137142
def request_finished(self, app, response):
138143
if not self.app.debug or self.client.config.debug:

elasticapm/instrumentation/packages/base.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
class AbstractInstrumentedModule(object):
1212
name = None
13+
mutates_unsampled_arguments = False
1314

1415
instrument_list = [
1516
# List of (module, method) pairs to instrument. E.g.:
@@ -101,10 +102,41 @@ def uninstrument(self):
101102

102103
def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
103104
transaction = get_transaction()
104-
if not transaction or not transaction.is_sampled:
105+
if not transaction:
106+
return wrapped(*args, **kwargs)
107+
elif not transaction.is_sampled:
108+
args, kwargs = self.mutate_unsampled_call_args(module, method, wrapped, instance, args, kwargs, transaction)
105109
return wrapped(*args, **kwargs)
106110
else:
107111
return self.call(module, method, wrapped, instance, args, kwargs)
108112

113+
def mutate_unsampled_call_args(self, module, method, wrapped, instance, args, kwargs, transaction):
    """
    Hook invoked for wrapped calls that happen inside an unsampled transaction.

    Instrumentations can override it to adjust the call arguments before the wrapped
    callable runs, e.g. to add traceparent headers to the underlying request in HTTP
    instrumentations. This base implementation changes nothing.

    :param module: the wrapped module
    :param method: the wrapped method/function
    :param wrapped: the wrapped method/function object
    :param instance: the wrapped instance
    :param args: positional arguments of the wrapped call
    :param kwargs: keyword arguments of the wrapped call
    :param transaction: the current (unsampled) transaction
    :return: an ``(args, kwargs)`` tuple to call ``wrapped`` with
    """
    return (args, kwargs)
128+
109129
def call(self, module, method, wrapped, instance, args, kwargs):
    """
    Wrapped call. This method should gather all necessary data, then call `wrapped` in a `capture_span` context
    manager.

    The base class provides no implementation; concrete instrumentations must override it.

    :param module: Name of the wrapped module
    :param method: Name of the wrapped method/function
    :param wrapped: the wrapped method/function object
    :param instance: the wrapped instance
    :param args: arguments to the wrapped method/function
    :param kwargs: keyword arguments to the wrapped method/function
    :return: the result of calling the wrapped method/function
    :raises NotImplementedError: always, unless overridden by a subclass
    """
    # `NotImplemented` is the binary-operator sentinel, not an exception; raising it
    # produces a confusing TypeError instead of signalling a missing override.
    raise NotImplementedError

elasticapm/instrumentation/packages/urllib3.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
from elasticapm.conf import constants
12
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
2-
from elasticapm.traces import capture_span
3+
from elasticapm.traces import DroppedSpan, capture_span, get_transaction
34
from elasticapm.utils import default_ports
5+
from elasticapm.utils.disttracing import TracingOptions
46

57

68
class Urllib3Instrumentation(AbstractInstrumentedModule):
@@ -14,6 +16,13 @@ def call(self, module, method, wrapped, instance, args, kwargs):
1416
else:
1517
method = args[0]
1618

19+
headers = None
20+
if "headers" in kwargs:
21+
headers = kwargs["headers"]
22+
if headers is None:
23+
headers = {}
24+
kwargs["headers"] = headers
25+
1726
host = instance.host
1827

1928
if instance.port != default_ports.get(instance.scheme):
@@ -27,6 +36,30 @@ def call(self, module, method, wrapped, instance, args, kwargs):
2736
signature = method.upper() + " " + host
2837

2938
url = instance.scheme + "://" + host + url
39+
transaction = get_transaction()
40+
41+
with capture_span(signature, "ext.http.urllib3", {"url": url}, leaf=True) as span:
42+
# if urllib3 has been called in a leaf span, this span might be a DroppedSpan.
43+
leaf_span = span
44+
while isinstance(leaf_span, DroppedSpan):
45+
leaf_span = leaf_span.parent
3046

31-
with capture_span(signature, "ext.http.urllib3", {"url": url}, leaf=True):
47+
if headers is not None:
48+
trace_parent = transaction.trace_parent.copy_from(
49+
span_id=leaf_span.idx, trace_options=TracingOptions(recorded=True)
50+
)
51+
headers[constants.TRACEPARENT_HEADER_NAME] = trace_parent.to_ascii()
3252
return wrapped(*args, **kwargs)
53+
54+
def mutate_unsampled_call_args(self, module, method, wrapped, instance, args, kwargs, transaction):
    """
    Add a traceparent header, flagged as not recorded, to an unsampled urllib3 call.

    The header is only written when the caller passed ``headers`` as a keyword argument;
    a ``None`` value is replaced by a fresh dict first.

    :param transaction: the current (unsampled) transaction, source of the trace parent
    :return: the ``(args, kwargs)`` tuple to call ``wrapped`` with
    """
    # An unsampled call has no span, so the transaction id stands in as the span id.
    unsampled_parent = transaction.trace_parent.copy_from(
        span_id=transaction.id, trace_options=TracingOptions(recorded=False)
    )
    if "headers" not in kwargs:
        return args, kwargs
    outgoing = kwargs["headers"]
    if outgoing is None:
        outgoing = kwargs["headers"] = {}
    outgoing[constants.TRACEPARENT_HEADER_NAME] = unsampled_parent.to_ascii()
    return args, kwargs

elasticapm/traces.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
import re
66
import threading
77
import timeit
8-
import uuid
98

109
from elasticapm.conf import constants
1110
from elasticapm.conf.constants import SPAN, TRANSACTION
1211
from elasticapm.utils import compat, encoding, get_name_from_func
12+
from elasticapm.utils.disttracing import TraceParent, TracingOptions
1313

1414
__all__ = ("capture_span", "tag", "set_transaction_name", "set_custom_context", "set_user_context")
1515

@@ -28,9 +28,9 @@
2828

2929

3030
class Transaction(object):
31-
def __init__(self, store, transaction_type="custom", is_sampled=True):
32-
self.id = str(uuid.uuid4())
33-
self.trace_id = None # for later use in distributed tracing
31+
def __init__(self, store, transaction_type="custom", trace_parent=None, is_sampled=True):
32+
self.id = "%016x" % random.getrandbits(64)
33+
self.trace_parent = trace_parent
3434
self.timestamp = datetime.datetime.utcnow()
3535
self.start_time = _time_func()
3636
self.name = None
@@ -61,7 +61,8 @@ def begin_span(self, name, span_type, context=None, leaf=False):
6161
self._span_counter += 1
6262
else:
6363
start = _time_func() - self.start_time
64-
span = Span(self._span_counter, self.id, self.trace_id, name, span_type, start, context, leaf)
64+
span_id = "%016x" % random.getrandbits(64) if self.trace_parent else self._span_counter - 1
65+
span = Span(span_id, self.id, self.trace_parent.trace_id, name, span_type, start, context, leaf=leaf)
6566
span.frames = store.frames_collector_func()
6667
span.parent = parent_span
6768
self._span_counter += 1
@@ -91,7 +92,7 @@ def to_dict(self):
9192
self.context["tags"] = self.tags
9293
result = {
9394
"id": self.id,
94-
"trace_id": self.trace_id,
95+
"trace_id": self.trace_parent.trace_id,
9596
"name": encoding.keyword_field(self.name or ""),
9697
"type": encoding.keyword_field(self.transaction_type),
9798
"duration": self.duration * 1000, # milliseconds
@@ -100,6 +101,10 @@ def to_dict(self):
100101
"sampled": self.is_sampled,
101102
"span_count": {"started": self._span_counter - self.dropped_spans, "dropped": self.dropped_spans},
102103
}
104+
if self.trace_parent:
105+
result["trace_id"] = self.trace_parent.trace_id
106+
if self.trace_parent.span_id:
107+
result["parent_id"] = self.trace_parent.span_id
103108
if self.is_sampled:
104109
result["context"] = self.context
105110
return result
@@ -142,17 +147,19 @@ def __init__(self, idx, transaction_id, trace_id, name, span_type, start_time, c
142147
self.duration = None
143148
self.parent = None
144149
self.frames = None
150+
self.trace_id = trace_id
151+
self.transaction_id = transaction_id
145152

146153
def to_dict(self):
147154
result = {
148-
"id": compat.text_type(self.idx),
155+
"id": self.idx,
149156
"transaction_id": self.transaction_id,
150157
"trace_id": self.trace_id,
158+
"parent_id": self.parent.idx if self.parent else self.transaction_id,
151159
"name": encoding.keyword_field(self.name),
152160
"type": encoding.keyword_field(self.type),
153161
"start": self.start_time * 1000, # milliseconds
154162
"duration": self.duration * 1000, # milliseconds
155-
"parent": self.parent.idx if self.parent else None,
156163
"context": self.context,
157164
}
158165
if self.frames:
@@ -212,14 +219,24 @@ def __len__(self):
212219
with self.cond:
213220
return len(self._transactions)
214221

215-
def begin_transaction(self, transaction_type):
222+
def begin_transaction(self, transaction_type, trace_parent=None):
216223
"""
217224
Start a new transactions and bind it in a thread-local variable
218225
219226
:returns the Transaction object
220227
"""
221-
is_sampled = self._sample_rate == 1.0 or self._sample_rate > random.random()
222-
transaction = Transaction(self, transaction_type, is_sampled=is_sampled)
228+
if trace_parent:
229+
is_sampled = bool(trace_parent.trace_options.recorded)
230+
else:
231+
is_sampled = self._sample_rate == 1.0 or self._sample_rate > random.random()
232+
transaction = Transaction(self, transaction_type, trace_parent=trace_parent, is_sampled=is_sampled)
233+
if trace_parent is None:
234+
transaction.trace_parent = TraceParent(
235+
constants.TRACE_CONTEXT_VERSION,
236+
"%032x" % random.getrandbits(128),
237+
transaction.id,
238+
TracingOptions(recorded=is_sampled),
239+
)
223240
set_transaction(transaction)
224241
return transaction
225242

@@ -264,7 +281,7 @@ def decorated(*args, **kwds):
264281
def __enter__(self):
265282
transaction = get_transaction()
266283
if transaction and transaction.is_sampled:
267-
transaction.begin_span(self.name, self.type, context=self.extra, leaf=self.leaf)
284+
return transaction.begin_span(self.name, self.type, context=self.extra, leaf=self.leaf)
268285

269286
def __exit__(self, exc_type, exc_val, exc_tb):
270287
transaction = get_transaction()

0 commit comments

Comments
 (0)