Skip to content

Commit 362d36c

Browse files
committed
implement support for asyncio [double-plus-WIP]
Signed-off-by: Benjamin Wohlwend <[email protected]>
1 parent b5848aa commit 362d36c

File tree

6 files changed

+118
-54
lines changed

6 files changed

+118
-54
lines changed

elasticapm/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
:copyright: (c) 2010 by the Sentry Team, see AUTHORS for more details.
99
:license: BSD, see LICENSE for more details.
1010
"""
11+
import sys
1112

1213
__all__ = ('VERSION', 'Client')
1314

@@ -20,6 +21,11 @@
2021
from elasticapm.base import Client
2122
from elasticapm.conf import setup_logging # noqa: F401
2223
from elasticapm.instrumentation.control import instrument, uninstrument # noqa: F401
23-
from elasticapm.traces import capture_span, set_context, set_custom_context # noqa: F401
24+
from elasticapm.traces import set_context, set_custom_context # noqa: F401
2425
from elasticapm.traces import set_transaction_name, set_user_context, tag # noqa: F401
2526
from elasticapm.traces import set_transaction_result # noqa: F401
27+
28+
if sys.version_info >= (3, 5):
29+
from elasticapm.contrib.asyncio.traces import async_capture_span as capture_span # noqa: F401
30+
else:
31+
from elasticapm.traces import capture_span # noqa: F401

elasticapm/context/__init__.py

Whitespace-only changes.

elasticapm/context/contextvars.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from __future__ import absolute_import
2+
3+
import contextvars
4+
5+
elasticapm_transaction_var = contextvars.ContextVar('elasticapm_transaction_var')
6+
elasticapm_span_var = contextvars.ContextVar('elasticapm_span_var')
7+
8+
9+
def get_transaction(clear=False):
10+
try:
11+
transaction = elasticapm_transaction_var.get()
12+
if clear:
13+
set_transaction(None)
14+
return transaction
15+
except LookupError:
16+
return None
17+
18+
19+
def set_transaction(transaction):
20+
elasticapm_transaction_var.set(transaction)
21+
22+
23+
def get_span():
24+
try:
25+
return elasticapm_span_var.get()
26+
except LookupError:
27+
return None
28+
29+
30+
def set_span(span):
31+
elasticapm_span_var.set(span)

elasticapm/context/threadlocal.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import threading
2+
3+
thread_local = threading.local()
4+
thread_local.transaction = None
5+
elasticapm_span_var = None
6+
7+
8+
def get_transaction(clear=False):
9+
"""
10+
Get the transaction registered for the current thread.
11+
12+
:return:
13+
:rtype: Transaction
14+
"""
15+
transaction = getattr(thread_local, "transaction", None)
16+
if clear:
17+
thread_local.transaction = None
18+
return transaction
19+
20+
21+
def set_transaction(transaction):
22+
thread_local.transaction = transaction
23+
24+
25+
def get_span():
26+
return getattr(thread_local, "span", None)
27+
28+
29+
def set_span(span):
30+
thread_local.span = span

elasticapm/contrib/asyncio/traces.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from elasticapm.traces import capture_span, error_logger, get_transaction
2+
3+
4+
class async_capture_span(capture_span):
5+
async def __aenter__(self):
6+
transaction = get_transaction()
7+
if transaction and transaction.is_sampled:
8+
transaction.begin_span(self.name, self.type, context=self.extra, leaf=self.leaf)
9+
10+
async def __aexit__(self, exc_type, exc_val, exc_tb):
11+
transaction = get_transaction()
12+
if transaction and transaction.is_sampled:
13+
try:
14+
transaction.end_span(self.skip_frames)
15+
except LookupError:
16+
error_logger.info('ended non-existing span %s of type %s', self.name, self.type)

elasticapm/traces.py

Lines changed: 34 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,16 @@
1414

1515
error_logger = logging.getLogger('elasticapm.errors')
1616

17-
thread_local = threading.local()
18-
thread_local.transaction = None
19-
20-
2117
_time_func = timeit.default_timer
2218

2319

2420
TAG_RE = re.compile('^[^.*\"]+$')
2521

2622

27-
DROPPED_SPAN = object()
28-
IGNORED_SPAN = object()
29-
30-
31-
def get_transaction(clear=False):
32-
"""
33-
Get the transaction registered for the current thread.
34-
35-
:return:
36-
:rtype: Transaction
37-
"""
38-
transaction = getattr(thread_local, "transaction", None)
39-
if clear:
40-
thread_local.transaction = None
41-
return transaction
23+
try:
24+
from elasticapm.context.contextvars import get_transaction, set_transaction, get_span, set_span
25+
except ImportError:
26+
from elasticapm.context.threadlocal import get_transaction, set_transaction, get_span, set_span
4227

4328

4429
class Transaction(object):
@@ -54,11 +39,9 @@ def __init__(self, frames_collector_func, transaction_type="custom", is_sampled=
5439
self._frames_collector_func = frames_collector_func
5540

5641
self.spans = []
57-
self.span_stack = []
5842
self.max_spans = max_spans
5943
self.span_frames_min_duration = span_frames_min_duration
6044
self.dropped_spans = 0
61-
self.ignore_subtree = False
6245
self.context = {}
6346
self.tags = {}
6447

@@ -69,46 +52,35 @@ def end_transaction(self, skip_frames=8):
6952
self.duration = _time_func() - self.start_time
7053

7154
def begin_span(self, name, span_type, context=None, leaf=False):
72-
# If we were already called with `leaf=True`, we'll just push
73-
# a placeholder on the stack.
74-
if self.ignore_subtree:
75-
self.span_stack.append(IGNORED_SPAN)
76-
return None
77-
78-
if leaf:
79-
self.ignore_subtree = True
80-
81-
self._span_counter += 1
82-
83-
if self.max_spans and self._span_counter > self.max_spans:
55+
parent_span = get_span()
56+
if parent_span and parent_span.leaf:
57+
span = DroppedSpan(parent_span, leaf=True)
58+
elif self.max_spans and self._span_counter > self.max_spans - 1:
8459
self.dropped_spans += 1
85-
self.span_stack.append(DROPPED_SPAN)
86-
return None
87-
88-
start = _time_func() - self.start_time
89-
span = Span(self._span_counter - 1, name, span_type, start, context)
90-
self.span_stack.append(span)
60+
span = DroppedSpan(parent_span)
61+
self._span_counter += 1
62+
else:
63+
start = _time_func() - self.start_time
64+
span = Span(self._span_counter, name, span_type, start, context, leaf)
65+
span.parent = parent_span
66+
self._span_counter += 1
67+
set_span(span)
9168
return span
9269

9370
def end_span(self, skip_frames):
94-
span = self.span_stack.pop()
95-
if span is IGNORED_SPAN:
96-
return None
97-
98-
self.ignore_subtree = False
99-
100-
if span is DROPPED_SPAN:
71+
span = get_span()
72+
if span is None:
73+
raise LookupError()
74+
if isinstance(span, DroppedSpan):
75+
set_span(span.parent)
10176
return
10277

10378
span.duration = _time_func() - span.start_time - self.start_time
10479

105-
if self.span_stack:
106-
span.parent = self.span_stack[-1].idx
107-
10880
if not self.span_frames_min_duration or span.duration >= self.span_frames_min_duration:
10981
span.frames = self._frames_collector_func()[skip_frames:]
11082
self.spans.append(span)
111-
83+
set_span(span.parent)
11284
return span
11385

11486
def to_dict(self):
@@ -162,14 +134,22 @@ def to_dict(self):
162134
'type': encoding.keyword_field(self.type),
163135
'start': self.start_time * 1000, # milliseconds
164136
'duration': self.duration * 1000, # milliseconds
165-
'parent': self.parent,
137+
'parent': self.parent.idx if self.parent else None,
166138
'context': self.context
167139
}
168140
if self.frames:
169141
result['stacktrace'] = self.frames
170142
return result
171143

172144

145+
class DroppedSpan(object):
146+
__slots__ = ('leaf', 'parent')
147+
148+
def __init__(self, parent, leaf=False):
149+
self.parent = parent
150+
self.leaf = leaf
151+
152+
173153
class TransactionsStore(object):
174154
def __init__(self, frames_collector_func, collect_frequency, sample_rate=1.0, max_spans=0, max_queue_size=None,
175155
span_frames_min_duration=None, ignore_patterns=None):
@@ -219,7 +199,8 @@ def begin_transaction(self, transaction_type):
219199
is_sampled = self._sample_rate == 1.0 or self._sample_rate > random.random()
220200
transaction = Transaction(self._frames_collector_func, transaction_type, max_spans=self.max_spans,
221201
span_frames_min_duration=self.span_frames_min_duration, is_sampled=is_sampled)
222-
thread_local.transaction = transaction
202+
203+
set_transaction(transaction)
223204
return transaction
224205

225206
def _should_ignore(self, transaction_name):
@@ -270,7 +251,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
270251
if transaction and transaction.is_sampled:
271252
try:
272253
transaction.end_span(self.skip_frames)
273-
except IndexError:
254+
except LookupError:
274255
error_logger.info('ended non-existing span %s of type %s', self.name, self.type)
275256

276257

0 commit comments

Comments
 (0)