diff --git a/elasticapm/__init__.py b/elasticapm/__init__.py index 763ad88ae..2fbf6dac5 100644 --- a/elasticapm/__init__.py +++ b/elasticapm/__init__.py @@ -8,6 +8,7 @@ :copyright: (c) 2010 by the Sentry Team, see AUTHORS for more details. :license: BSD, see LICENSE for more details. """ +import sys __all__ = ("VERSION", "Client") @@ -22,3 +23,6 @@ from elasticapm.traces import capture_span, set_context, set_custom_context # noqa: F401 from elasticapm.traces import set_transaction_name, set_user_context, tag # noqa: F401 from elasticapm.traces import set_transaction_result # noqa: F401 + +if sys.version_info >= (3, 5): + from elasticapm.contrib.asyncio.traces import async_capture_span # noqa: F401 diff --git a/elasticapm/base.py b/elasticapm/base.py index 04d794cf2..4ed915a8d 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -12,6 +12,7 @@ from __future__ import absolute_import import datetime +import inspect import logging import os import platform @@ -136,9 +137,12 @@ def __init__(self, config=None, **inline): else: skip_modules = ("elasticapm.",) - def frames_collector_func(): - return self._get_stack_info_for_trace( - stacks.iter_stack_frames(skip_top_modules=skip_modules), + self.transaction_store = TransactionsStore( + frames_collector_func=lambda: list( + stacks.iter_stack_frames(start_frame=inspect.currentframe(), skip_top_modules=skip_modules) + ), + frames_processing_func=lambda frames: self._get_stack_info_for_trace( + frames, library_frame_context_lines=self.config.source_lines_span_library_frames, in_app_frame_context_lines=self.config.source_lines_span_app_frames, with_locals=self.config.collect_local_variables in ("all", "transactions"), @@ -150,10 +154,7 @@ def frames_collector_func(): ), local_var, ), - ) - - self.transaction_store = TransactionsStore( - frames_collector_func=frames_collector_func, + ), collect_frequency=self.config.flush_interval, sample_rate=self.config.transaction_sample_rate, max_spans=self.config.transaction_max_spans, diff --git a/elasticapm/context/__init__.py b/elasticapm/context/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/elasticapm/context/contextvars.py b/elasticapm/context/contextvars.py new file mode 100644 index 000000000..f1a13928c --- /dev/null +++ b/elasticapm/context/contextvars.py @@ -0,0 +1,31 @@ +from __future__ import absolute_import + +import contextvars + +elasticapm_transaction_var = contextvars.ContextVar("elasticapm_transaction_var") +elasticapm_span_var = contextvars.ContextVar("elasticapm_span_var") + + +def get_transaction(clear=False): + try: + transaction = elasticapm_transaction_var.get() + if clear: + set_transaction(None) + return transaction + except LookupError: + return None + + +def set_transaction(transaction): + elasticapm_transaction_var.set(transaction) + + +def get_span(): + try: + return elasticapm_span_var.get() + except LookupError: + return None + + +def set_span(span): + elasticapm_span_var.set(span) diff --git a/elasticapm/context/threadlocal.py b/elasticapm/context/threadlocal.py new file mode 100644 index 000000000..41b926c2a --- /dev/null +++ b/elasticapm/context/threadlocal.py @@ -0,0 +1,30 @@ +import threading + +thread_local = threading.local() +thread_local.transaction = None +elasticapm_span_var = None + + +def get_transaction(clear=False): + """ + Get the transaction registered for the current thread. + + :return: + :rtype: Transaction + """ + transaction = getattr(thread_local, "transaction", None) + if clear: + thread_local.transaction = None + return transaction + + +def set_transaction(transaction): + thread_local.transaction = transaction + + +def get_span(): + return getattr(thread_local, "span", None) + + +def set_span(span): + thread_local.span = span diff --git a/elasticapm/contrib/asyncio/traces.py b/elasticapm/contrib/asyncio/traces.py new file mode 100644 index 000000000..c8951c1b3 --- /dev/null +++ b/elasticapm/contrib/asyncio/traces.py @@ -0,0 +1,29 @@ +import functools + +from elasticapm.traces import capture_span, error_logger, get_transaction +from elasticapm.utils import get_name_from_func + + +class async_capture_span(capture_span): + def __call__(self, func): + self.name = self.name or get_name_from_func(func) + + @functools.wraps(func) + async def decorated(*args, **kwds): + async with self: + return await func(*args, **kwds) + + return decorated + + async def __aenter__(self): + transaction = get_transaction() + if transaction and transaction.is_sampled: + transaction.begin_span(self.name, self.type, context=self.extra, leaf=self.leaf) + + async def __aexit__(self, exc_type, exc_val, exc_tb): + transaction = get_transaction() + if transaction and transaction.is_sampled: + try: + transaction.end_span(self.skip_frames) + except LookupError: + error_logger.info("ended non-existing span %s of type %s", self.name, self.type) diff --git a/elasticapm/instrumentation/packages/aiohttp.py b/elasticapm/instrumentation/packages/aiohttp.py new file mode 100644 index 000000000..90c358199 --- /dev/null +++ b/elasticapm/instrumentation/packages/aiohttp.py @@ -0,0 +1,29 @@ +from elasticapm import async_capture_span +from elasticapm.instrumentation.packages.asyncio_base import AsyncAbstractInstrumentedModule +from elasticapm.utils import default_ports +from elasticapm.utils.compat import urlparse + + +def get_host_from_url(url): + parsed_url = urlparse.urlparse(url) + host = parsed_url.hostname or " " + + if parsed_url.port and default_ports.get(parsed_url.scheme) != parsed_url.port: + host += ":" + str(parsed_url.port) + + return host + + +class AioHttpClientInstrumentation(AsyncAbstractInstrumentedModule): + name = "aiohttp_client" + + instrument_list = [("aiohttp.client", "ClientSession._request")] + + async def call(self, module, method, wrapped, instance, args, kwargs): + method = kwargs["method"] if "method" in kwargs else args[0] + url = kwargs["method"] if "method" in kwargs else args[1] + + signature = " ".join([method.upper(), get_host_from_url(url)]) + + async with async_capture_span(signature, "ext.http.aiohttp", {"url": url}, leaf=True): + return await wrapped(*args, **kwargs) diff --git a/elasticapm/instrumentation/packages/asyncio_base.py b/elasticapm/instrumentation/packages/asyncio_base.py new file mode 100644 index 000000000..9ff865808 --- /dev/null +++ b/elasticapm/instrumentation/packages/asyncio_base.py @@ -0,0 +1,6 @@ +from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule + + +class AsyncAbstractInstrumentedModule(AbstractInstrumentedModule): + async def call(self, module, method, wrapped, instance, args, kwargs): + raise NotImplementedError() diff --git a/elasticapm/instrumentation/register.py b/elasticapm/instrumentation/register.py index dd5037b0b..32e9f2fc9 100644 --- a/elasticapm/instrumentation/register.py +++ b/elasticapm/instrumentation/register.py @@ -1,3 +1,5 @@ +import sys + from elasticapm.utils.module_import import import_string _cls_register = { @@ -25,6 +27,9 @@ "elasticapm.instrumentation.packages.django.template.DjangoTemplateSourceInstrumentation", } +if sys.version_info >= (3, 5): + _cls_register.update(["elasticapm.instrumentation.packages.aiohttp.AioHttpClientInstrumentation"]) + def register(cls): _cls_register.add(cls) diff --git a/elasticapm/traces.py b/elasticapm/traces.py index e39c1dbe8..bf33727d8 100644 --- a/elasticapm/traces.py +++ b/elasticapm/traces.py @@ -14,42 +14,20 @@ error_logger = logging.getLogger("elasticapm.errors") -thread_local = threading.local() -thread_local.transaction = None - - _time_func = timeit.default_timer TAG_RE = re.compile('^[^.*"]+$') -DROPPED_SPAN = object() -IGNORED_SPAN = object() - - -def get_transaction(clear=False): - """ - Get the transaction registered for the current thread. - - :return: - :rtype: Transaction - """ - transaction = getattr(thread_local, "transaction", None) - if clear: - thread_local.transaction = None - return transaction +try: + from elasticapm.context.contextvars import get_transaction, set_transaction, get_span, set_span +except ImportError: + from elasticapm.context.threadlocal import get_transaction, set_transaction, get_span, set_span class Transaction(object): - def __init__( - self, - frames_collector_func, - transaction_type="custom", - is_sampled=True, - max_spans=None, - span_frames_min_duration=None, - ): + def __init__(self, store, transaction_type="custom", is_sampled=True): self.id = str(uuid.uuid4()) self.timestamp = datetime.datetime.utcnow() self.start_time = _time_func() @@ -57,14 +35,10 @@ def __init__( self.duration = None self.result = None self.transaction_type = transaction_type - self._frames_collector_func = frames_collector_func + self._store = store self.spans = [] - self.span_stack = [] - self.max_spans = max_spans - self.span_frames_min_duration = span_frames_min_duration self.dropped_spans = 0 - self.ignore_subtree = False self.context = {} self.tags = {} @@ -75,46 +49,39 @@ def end_transaction(self, skip_frames=8): self.duration = _time_func() - self.start_time def begin_span(self, name, span_type, context=None, leaf=False): - # If we were already called with `leaf=True`, we'll just push - # a placeholder on the stack. - if self.ignore_subtree: - self.span_stack.append(IGNORED_SPAN) - return None - - if leaf: - self.ignore_subtree = True - - self._span_counter += 1 - - if self.max_spans and self._span_counter > self.max_spans: + parent_span = get_span() + store = self._store + if parent_span and parent_span.leaf: + span = DroppedSpan(parent_span, leaf=True) + elif store.max_spans and self._span_counter > store.max_spans - 1: self.dropped_spans += 1 - self.span_stack.append(DROPPED_SPAN) - return None - - start = _time_func() - self.start_time - span = Span(self._span_counter - 1, name, span_type, start, context) - self.span_stack.append(span) + span = DroppedSpan(parent_span) + self._span_counter += 1 + else: + start = _time_func() - self.start_time + span = Span(self._span_counter, name, span_type, start, context, leaf) + span.frames = store.frames_collector_func() + span.parent = parent_span + self._span_counter += 1 + set_span(span) return span def end_span(self, skip_frames): - span = self.span_stack.pop() - if span is IGNORED_SPAN: - return None - - self.ignore_subtree = False - - if span is DROPPED_SPAN: + span = get_span() + if span is None: + raise LookupError() + if isinstance(span, DroppedSpan): + set_span(span.parent) return span.duration = _time_func() - span.start_time - self.start_time - if self.span_stack: - span.parent = self.span_stack[-1].idx - - if not self.span_frames_min_duration or span.duration >= self.span_frames_min_duration: - span.frames = self._frames_collector_func()[skip_frames:] + if not self._store.span_frames_min_duration or span.duration >= self._store.span_frames_min_duration: + span.frames = self._store.frames_processing_func(span.frames)[skip_frames:] + else: + span.frames = None self.spans.append(span) - + set_span(span.parent) return span def to_dict(self): @@ -168,7 +135,7 @@ def to_dict(self): "type": encoding.keyword_field(self.type), "start": self.start_time * 1000, # milliseconds "duration": self.duration * 1000, # milliseconds - "parent": self.parent, + "parent": self.parent.idx if self.parent else None, "context": self.context, } if self.frames: @@ -176,10 +143,19 @@ def to_dict(self): return result +class DroppedSpan(object): + __slots__ = ("leaf", "parent") + + def __init__(self, parent, leaf=False): + self.parent = parent + self.leaf = leaf + + class TransactionsStore(object): def __init__( self, frames_collector_func, + frames_processing_func, collect_frequency, sample_rate=1.0, max_spans=0, @@ -191,7 +167,8 @@ def __init__( self.collect_frequency = collect_frequency self.max_queue_size = max_queue_size self.max_spans = max_spans - self._frames_collector_func = frames_collector_func + self.frames_processing_func = frames_processing_func + self.frames_collector_func = frames_collector_func self._transactions = [] self._last_collect = _time_func() self._ignore_patterns = [re.compile(p) for p in ignore_patterns or []] @@ -232,14 +209,8 @@ def begin_transaction(self, transaction_type): :returns the Transaction object """ is_sampled = self._sample_rate == 1.0 or self._sample_rate > random.random() - transaction = Transaction( - self._frames_collector_func, - transaction_type, - max_spans=self.max_spans, - span_frames_min_duration=self.span_frames_min_duration, - is_sampled=is_sampled, - ) - thread_local.transaction = transaction + transaction = Transaction(self, transaction_type, is_sampled=is_sampled) + set_transaction(transaction) return transaction def _should_ignore(self, transaction_name): @@ -290,7 +261,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): if transaction and transaction.is_sampled: try: transaction.end_span(self.skip_frames) - except IndexError: + except LookupError: error_logger.info("ended non-existing span %s of type %s", self.name, self.type) diff --git a/elasticapm/utils/stacks.py b/elasticapm/utils/stacks.py index 352301e11..1e54c9543 100644 --- a/elasticapm/utils/stacks.py +++ b/elasticapm/utils/stacks.py @@ -158,7 +158,7 @@ def iter_traceback_frames(tb): tb = tb.tb_next -def iter_stack_frames(frames=None, skip=0, skip_top_modules=()): +def iter_stack_frames(frames=None, start_frame=None, skip=0, skip_top_modules=()): """ Given an optional list of frames (defaults to current stack), iterates over all frames that do not contain the ``__traceback_hide__`` @@ -173,12 +173,13 @@ def iter_stack_frames(frames=None, skip=0, skip_top_modules=()): itself. :param frames: a list of frames, or None + :param start_frame: a Frame object or None :param skip: number of frames to skip from the beginning :param skip_top_modules: tuple of strings """ if not frames: - frame = inspect.currentframe().f_back + frame = start_frame if start_frame is not None else inspect.currentframe().f_back frames = _walk_stack(frame) stop_ignoring = False for i, frame in enumerate(frames): diff --git a/tests/client/client_tests.py b/tests/client/client_tests.py index 7ca8b19ce..b497af0e8 100644 --- a/tests/client/client_tests.py +++ b/tests/client/client_tests.py @@ -693,7 +693,7 @@ def test_transaction_max_spans(should_collect, elasticapm_client): transaction = elasticapm_client.transaction_store.get_all()[0] - assert transaction_obj.max_spans == 5 + assert transaction_obj._store.max_spans == 5 assert transaction_obj.dropped_spans == 10 assert len(transaction["spans"]) == 5 for span in transaction["spans"]: diff --git a/tests/instrumentation/transactions_store_tests.py b/tests/instrumentation/transactions_store_tests.py index 7b4a883de..cc708e66d 100644 --- a/tests/instrumentation/transactions_store_tests.py +++ b/tests/instrumentation/transactions_store_tests.py @@ -10,8 +10,6 @@ @pytest.fixture() def transaction_store(): - mock_get_frames = Mock() - frames = [ { "function": "something_expensive", @@ -138,8 +136,7 @@ def transaction_store(): }, ] - mock_get_frames.return_value = frames - return TransactionsStore(mock_get_frames, 99999) + return TransactionsStore(lambda: frames, lambda frames: frames, 99999) def test_leaf_tracing(transaction_store): @@ -167,34 +164,34 @@ def test_leaf_tracing(transaction_store): def test_get_transaction(): - requests_store = TransactionsStore(lambda: [], 99999) + requests_store = TransactionsStore(lambda: [], lambda: [], 99999) t = requests_store.begin_transaction("test") assert t == get_transaction() def test_get_transaction_clear(): - requests_store = TransactionsStore(lambda: [], 99999) + requests_store = TransactionsStore(lambda: [], lambda: [], 99999) t = requests_store.begin_transaction("test") assert t == get_transaction(clear=True) assert get_transaction() is None def test_should_collect_time(): - requests_store = TransactionsStore(lambda: [], collect_frequency=5) + requests_store = TransactionsStore(lambda: [], lambda: [], collect_frequency=5) requests_store._last_collect -= 6 assert requests_store.should_collect() def test_should_not_collect_time(): - requests_store = TransactionsStore(lambda: [], collect_frequency=5) + requests_store = TransactionsStore(lambda: [], lambda: [], collect_frequency=5) requests_store._last_collect -= 3 assert not requests_store.should_collect() def test_should_collect_count(): - requests_store = TransactionsStore(lambda: [], collect_frequency=5, max_queue_size=5) + requests_store = TransactionsStore(lambda: [], lambda: [], collect_frequency=5, max_queue_size=5) requests_store._transactions = 6 * [1] requests_store._last_collect -= 3 @@ -202,14 +199,14 @@ def test_should_collect_count(): def test_should_not_collect_count(): - requests_store = TransactionsStore(lambda: [], collect_frequency=5, max_queue_size=5) + requests_store = TransactionsStore(lambda: [], lambda: [], collect_frequency=5, max_queue_size=5) requests_store._transactions = 4 * [1] assert not requests_store.should_collect() def test_tag_transaction(): - requests_store = TransactionsStore(lambda: [], 99999) + requests_store = TransactionsStore(lambda: [], lambda: [], 99999) t = requests_store.begin_transaction("test") elasticapm.tag(foo="bar") requests_store.end_transaction(200, "test") @@ -227,7 +224,7 @@ def test_tag_while_no_transaction(caplog): def test_tag_with_non_string_value(): - requests_store = TransactionsStore(lambda: [], 99999) + requests_store = TransactionsStore(lambda: [], lambda: [], 99999) t = requests_store.begin_transaction("test") elasticapm.tag(foo=1) requests_store.end_transaction(200, "test")