Skip to content

implement support for asyncio [double-plus-WIP] #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions elasticapm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
:copyright: (c) 2010 by the Sentry Team, see AUTHORS for more details.
:license: BSD, see LICENSE for more details.
"""
import sys

__all__ = ("VERSION", "Client")

Expand All @@ -22,3 +23,6 @@
from elasticapm.traces import capture_span, set_context, set_custom_context # noqa: F401
from elasticapm.traces import set_transaction_name, set_user_context, tag # noqa: F401
from elasticapm.traces import set_transaction_result # noqa: F401

if sys.version_info >= (3, 5):
from elasticapm.contrib.asyncio.traces import async_capture_span # noqa: F401
15 changes: 8 additions & 7 deletions elasticapm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from __future__ import absolute_import

import datetime
import inspect
import logging
import os
import platform
Expand Down Expand Up @@ -136,9 +137,12 @@ def __init__(self, config=None, **inline):
else:
skip_modules = ("elasticapm.",)

def frames_collector_func():
return self._get_stack_info_for_trace(
stacks.iter_stack_frames(skip_top_modules=skip_modules),
self.transaction_store = TransactionsStore(
frames_collector_func=lambda: list(
stacks.iter_stack_frames(start_frame=inspect.currentframe(), skip_top_modules=skip_modules)
),
frames_processing_func=lambda frames: self._get_stack_info_for_trace(
frames,
library_frame_context_lines=self.config.source_lines_span_library_frames,
in_app_frame_context_lines=self.config.source_lines_span_app_frames,
with_locals=self.config.collect_local_variables in ("all", "transactions"),
Expand All @@ -150,10 +154,7 @@ def frames_collector_func():
),
local_var,
),
)

self.transaction_store = TransactionsStore(
frames_collector_func=frames_collector_func,
),
collect_frequency=self.config.flush_interval,
sample_rate=self.config.transaction_sample_rate,
max_spans=self.config.transaction_max_spans,
Expand Down
Empty file added elasticapm/context/__init__.py
Empty file.
31 changes: 31 additions & 0 deletions elasticapm/context/contextvars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import absolute_import

import contextvars

elasticapm_transaction_var = contextvars.ContextVar("elasticapm_transaction_var")
elasticapm_span_var = contextvars.ContextVar("elasticapm_span_var")


def get_transaction(clear=False):
try:
transaction = elasticapm_transaction_var.get()
if clear:
set_transaction(None)
return transaction
except LookupError:
return None


def set_transaction(transaction):
elasticapm_transaction_var.set(transaction)


def get_span():
try:
return elasticapm_span_var.get()
except LookupError:
return None


def set_span(span):
elasticapm_span_var.set(span)
30 changes: 30 additions & 0 deletions elasticapm/context/threadlocal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import threading

thread_local = threading.local()
thread_local.transaction = None
elasticapm_span_var = None


def get_transaction(clear=False):
"""
Get the transaction registered for the current thread.

:return:
:rtype: Transaction
"""
transaction = getattr(thread_local, "transaction", None)
if clear:
thread_local.transaction = None
return transaction


def set_transaction(transaction):
thread_local.transaction = transaction


def get_span():
return getattr(thread_local, "span", None)


def set_span(span):
thread_local.span = span
29 changes: 29 additions & 0 deletions elasticapm/contrib/asyncio/traces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import functools

from elasticapm.traces import capture_span, error_logger, get_transaction
from elasticapm.utils import get_name_from_func


class async_capture_span(capture_span):
def __call__(self, func):
self.name = self.name or get_name_from_func(func)

@functools.wraps(func)
async def decorated(*args, **kwds):
async with self:
return await func(*args, **kwds)

return decorated

async def __aenter__(self):
transaction = get_transaction()
if transaction and transaction.is_sampled:
transaction.begin_span(self.name, self.type, context=self.extra, leaf=self.leaf)

async def __aexit__(self, exc_type, exc_val, exc_tb):
transaction = get_transaction()
if transaction and transaction.is_sampled:
try:
transaction.end_span(self.skip_frames)
except LookupError:
error_logger.info("ended non-existing span %s of type %s", self.name, self.type)
29 changes: 29 additions & 0 deletions elasticapm/instrumentation/packages/aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from elasticapm import async_capture_span
from elasticapm.instrumentation.packages.asyncio_base import AsyncAbstractInstrumentedModule
from elasticapm.utils import default_ports
from elasticapm.utils.compat import urlparse


def get_host_from_url(url):
parsed_url = urlparse.urlparse(url)
host = parsed_url.hostname or " "

if parsed_url.port and default_ports.get(parsed_url.scheme) != parsed_url.port:
host += ":" + str(parsed_url.port)

return host


class AioHttpClientInstrumentation(AsyncAbstractInstrumentedModule):
name = "aiohttp_client"

instrument_list = [("aiohttp.client", "ClientSession._request")]

async def call(self, module, method, wrapped, instance, args, kwargs):
method = kwargs["method"] if "method" in kwargs else args[0]
url = kwargs["method"] if "method" in kwargs else args[1]

signature = " ".join([method.upper(), get_host_from_url(url)])

async with async_capture_span(signature, "ext.http.aiohttp", {"url": url}, leaf=True):
return await wrapped(*args, **kwargs)
6 changes: 6 additions & 0 deletions elasticapm/instrumentation/packages/asyncio_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule


class AsyncAbstractInstrumentedModule(AbstractInstrumentedModule):
async def call(self, module, method, wrapped, instance, args, kwargs):
raise NotImplementedError()
5 changes: 5 additions & 0 deletions elasticapm/instrumentation/register.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sys

from elasticapm.utils.module_import import import_string

_cls_register = {
Expand Down Expand Up @@ -25,6 +27,9 @@
"elasticapm.instrumentation.packages.django.template.DjangoTemplateSourceInstrumentation",
}

if sys.version_info >= (3, 5):
_cls_register.update(["elasticapm.instrumentation.packages.aiohttp.AioHttpClientInstrumentation"])


def register(cls):
_cls_register.add(cls)
Expand Down
119 changes: 45 additions & 74 deletions elasticapm/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,31 @@

error_logger = logging.getLogger("elasticapm.errors")

thread_local = threading.local()
thread_local.transaction = None


_time_func = timeit.default_timer


TAG_RE = re.compile('^[^.*"]+$')


DROPPED_SPAN = object()
IGNORED_SPAN = object()


def get_transaction(clear=False):
"""
Get the transaction registered for the current thread.

:return:
:rtype: Transaction
"""
transaction = getattr(thread_local, "transaction", None)
if clear:
thread_local.transaction = None
return transaction
try:
from elasticapm.context.contextvars import get_transaction, set_transaction, get_span, set_span
except ImportError:
from elasticapm.context.threadlocal import get_transaction, set_transaction, get_span, set_span


class Transaction(object):
def __init__(
self,
frames_collector_func,
transaction_type="custom",
is_sampled=True,
max_spans=None,
span_frames_min_duration=None,
):
def __init__(self, store, transaction_type="custom", is_sampled=True):
self.id = str(uuid.uuid4())
self.timestamp = datetime.datetime.utcnow()
self.start_time = _time_func()
self.name = None
self.duration = None
self.result = None
self.transaction_type = transaction_type
self._frames_collector_func = frames_collector_func
self._store = store

self.spans = []
self.span_stack = []
self.max_spans = max_spans
self.span_frames_min_duration = span_frames_min_duration
self.dropped_spans = 0
self.ignore_subtree = False
self.context = {}
self.tags = {}

Expand All @@ -75,46 +49,39 @@ def end_transaction(self, skip_frames=8):
self.duration = _time_func() - self.start_time

def begin_span(self, name, span_type, context=None, leaf=False):
# If we were already called with `leaf=True`, we'll just push
# a placeholder on the stack.
if self.ignore_subtree:
self.span_stack.append(IGNORED_SPAN)
return None

if leaf:
self.ignore_subtree = True

self._span_counter += 1

if self.max_spans and self._span_counter > self.max_spans:
parent_span = get_span()
store = self._store
if parent_span and parent_span.leaf:
span = DroppedSpan(parent_span, leaf=True)
elif store.max_spans and self._span_counter > store.max_spans - 1:
self.dropped_spans += 1
self.span_stack.append(DROPPED_SPAN)
return None

start = _time_func() - self.start_time
span = Span(self._span_counter - 1, name, span_type, start, context)
self.span_stack.append(span)
span = DroppedSpan(parent_span)
self._span_counter += 1
else:
start = _time_func() - self.start_time
span = Span(self._span_counter, name, span_type, start, context, leaf)
span.frames = store.frames_collector_func()
span.parent = parent_span
self._span_counter += 1
set_span(span)
return span

def end_span(self, skip_frames):
span = self.span_stack.pop()
if span is IGNORED_SPAN:
return None

self.ignore_subtree = False

if span is DROPPED_SPAN:
span = get_span()
if span is None:
raise LookupError()
if isinstance(span, DroppedSpan):
set_span(span.parent)
return

span.duration = _time_func() - span.start_time - self.start_time

if self.span_stack:
span.parent = self.span_stack[-1].idx

if not self.span_frames_min_duration or span.duration >= self.span_frames_min_duration:
span.frames = self._frames_collector_func()[skip_frames:]
if not self._store.span_frames_min_duration or span.duration >= self._store.span_frames_min_duration:
span.frames = self._store.frames_processing_func(span.frames)[skip_frames:]
else:
span.frames = None
self.spans.append(span)

set_span(span.parent)
return span

def to_dict(self):
Expand Down Expand Up @@ -168,18 +135,27 @@ def to_dict(self):
"type": encoding.keyword_field(self.type),
"start": self.start_time * 1000, # milliseconds
"duration": self.duration * 1000, # milliseconds
"parent": self.parent,
"parent": self.parent.idx if self.parent else None,
"context": self.context,
}
if self.frames:
result["stacktrace"] = self.frames
return result


class DroppedSpan(object):
__slots__ = ("leaf", "parent")

def __init__(self, parent, leaf=False):
self.parent = parent
self.leaf = leaf


class TransactionsStore(object):
def __init__(
self,
frames_collector_func,
frames_processing_func,
collect_frequency,
sample_rate=1.0,
max_spans=0,
Expand All @@ -191,7 +167,8 @@ def __init__(
self.collect_frequency = collect_frequency
self.max_queue_size = max_queue_size
self.max_spans = max_spans
self._frames_collector_func = frames_collector_func
self.frames_processing_func = frames_processing_func
self.frames_collector_func = frames_collector_func
self._transactions = []
self._last_collect = _time_func()
self._ignore_patterns = [re.compile(p) for p in ignore_patterns or []]
Expand Down Expand Up @@ -232,14 +209,8 @@ def begin_transaction(self, transaction_type):
:returns the Transaction object
"""
is_sampled = self._sample_rate == 1.0 or self._sample_rate > random.random()
transaction = Transaction(
self._frames_collector_func,
transaction_type,
max_spans=self.max_spans,
span_frames_min_duration=self.span_frames_min_duration,
is_sampled=is_sampled,
)
thread_local.transaction = transaction
transaction = Transaction(self, transaction_type, is_sampled=is_sampled)
set_transaction(transaction)
return transaction

def _should_ignore(self, transaction_name):
Expand Down Expand Up @@ -290,7 +261,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
if transaction and transaction.is_sampled:
try:
transaction.end_span(self.skip_frames)
except IndexError:
except LookupError:
error_logger.info("ended non-existing span %s of type %s", self.name, self.type)


Expand Down
Loading