Skip to content

Commit 85cdb58

Browse files
committed
calculate span timestamp based of transaction timestamp
1 parent 24d5b8b commit 85cdb58

File tree

5 files changed

+57
-81
lines changed

5 files changed

+57
-81
lines changed

elasticapm/base.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import elasticapm
2424
from elasticapm.conf import Config, constants
2525
from elasticapm.conf.constants import ERROR
26-
from elasticapm.traces import TransactionsStore, get_transaction
26+
from elasticapm.traces import Tracer, get_transaction
2727
from elasticapm.utils import compat, is_master_process, stacks, varmap
2828
from elasticapm.utils.encoding import keyword_field, shorten, transform
2929
from elasticapm.utils.module_import import import_string
@@ -68,7 +68,7 @@ def __init__(self, config=None, **inline):
6868
self.logger = logging.getLogger("%s.%s" % (cls.__module__, cls.__name__))
6969
self.error_logger = logging.getLogger("elasticapm.errors")
7070

71-
self.transaction_store = None
71+
self.tracer = None
7272
self.processors = []
7373
self.filter_exception_types_dict = {}
7474
self._service_info = None
@@ -113,7 +113,7 @@ def __init__(self, config=None, **inline):
113113
else:
114114
skip_modules = ("elasticapm.",)
115115

116-
self.transaction_store = TransactionsStore(
116+
self.tracer = Tracer(
117117
frames_collector_func=lambda: list(
118118
stacks.iter_stack_frames(start_frame=inspect.currentframe(), skip_top_modules=skip_modules)
119119
),
@@ -199,10 +199,10 @@ def queue(self, event_type, data, flush=False):
199199
def begin_transaction(self, transaction_type, trace_parent=None):
200200
"""Register the start of a transaction on the client
201201
"""
202-
return self.transaction_store.begin_transaction(transaction_type, trace_parent=trace_parent)
202+
return self.tracer.begin_transaction(transaction_type, trace_parent=trace_parent)
203203

204204
def end_transaction(self, name=None, result=""):
205-
transaction = self.transaction_store.end_transaction(result, name)
205+
transaction = self.tracer.end_transaction(result, name)
206206
return transaction
207207

208208
def close(self):

elasticapm/instrumentation/packages/urllib3.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def call(self, module, method, wrapped, instance, args, kwargs):
4646

4747
if headers is not None:
4848
trace_parent = transaction.trace_parent.copy_from(
49-
span_id=leaf_span.idx, trace_options=TracingOptions(recorded=True)
49+
span_id=leaf_span.id, trace_options=TracingOptions(recorded=True)
5050
)
5151
headers[constants.TRACEPARENT_HEADER_NAME] = trace_parent.to_ascii()
5252
return wrapped(*args, **kwargs)

elasticapm/traces.py

Lines changed: 32 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
import datetime
21
import functools
32
import logging
43
import random
54
import re
6-
import threading
5+
import time
76
import timeit
87

98
from elasticapm.conf import constants
@@ -28,16 +27,15 @@
2827

2928

3029
class Transaction(object):
31-
def __init__(self, store, transaction_type="custom", trace_parent=None, is_sampled=True):
30+
def __init__(self, tracer, transaction_type="custom", trace_parent=None, is_sampled=True):
3231
self.id = "%016x" % random.getrandbits(64)
3332
self.trace_parent = trace_parent
34-
self.timestamp = datetime.datetime.utcnow()
35-
self.start_time = _time_func()
33+
self.timestamp, self.start_time = time.time(), _time_func()
3634
self.name = None
3735
self.duration = None
3836
self.result = None
3937
self.transaction_type = transaction_type
40-
self._store = store
38+
self._tracer = tracer
4139

4240
self.spans = []
4341
self.dropped_spans = 0
@@ -47,22 +45,20 @@ def __init__(self, store, transaction_type="custom", trace_parent=None, is_sampl
4745
self.is_sampled = is_sampled
4846
self._span_counter = 0
4947

50-
def end_transaction(self, skip_frames=8):
48+
def end_transaction(self):
5149
self.duration = _time_func() - self.start_time
5250

5351
def begin_span(self, name, span_type, context=None, leaf=False):
5452
parent_span = get_span()
55-
store = self._store
53+
store = self._tracer
5654
if parent_span and parent_span.leaf:
5755
span = DroppedSpan(parent_span, leaf=True)
5856
elif store.max_spans and self._span_counter > store.max_spans - 1:
5957
self.dropped_spans += 1
6058
span = DroppedSpan(parent_span)
6159
self._span_counter += 1
6260
else:
63-
start = _time_func() - self.start_time
64-
span_id = "%016x" % random.getrandbits(64) if self.trace_parent else self._span_counter - 1
65-
span = Span(span_id, self.id, self.trace_parent.trace_id, name, span_type, start, context, leaf=leaf)
61+
span = Span(transaction=self, name=name, span_type=span_type, context=context, leaf=leaf)
6662
span.frames = store.frames_collector_func()
6763
span.parent = parent_span
6864
self._span_counter += 1
@@ -77,15 +73,15 @@ def end_span(self, skip_frames):
7773
set_span(span.parent)
7874
return
7975

80-
span.duration = _time_func() - span.start_time - self.start_time
76+
span.duration = _time_func() - span.start_time
8177

82-
if not self._store.span_frames_min_duration or span.duration >= self._store.span_frames_min_duration:
83-
span.frames = self._store.frames_processing_func(span.frames)[skip_frames:]
78+
if not self._tracer.span_frames_min_duration or span.duration >= self._tracer.span_frames_min_duration:
79+
span.frames = self._tracer.frames_processing_func(span.frames)[skip_frames:]
8480
else:
8581
span.frames = None
8682
self.spans.append(span)
8783
set_span(span.parent)
88-
self._store.queue_func(SPAN, span.to_dict())
84+
self._tracer.queue_func(SPAN, span.to_dict())
8985
return span
9086

9187
def to_dict(self):
@@ -97,7 +93,7 @@ def to_dict(self):
9793
"type": encoding.keyword_field(self.transaction_type),
9894
"duration": self.duration * 1000, # milliseconds
9995
"result": encoding.keyword_field(str(self.result)),
100-
"timestamp": self.timestamp.strftime(constants.TIMESTAMP_FORMAT),
96+
"timestamp": int(self.timestamp * 1000000), # microseconds
10197
"sampled": self.is_sampled,
10298
"span_count": {"started": self._span_counter - self.dropped_spans, "dropped": self.dropped_spans},
10399
}
@@ -112,53 +108,54 @@ def to_dict(self):
112108

113109
class Span(object):
114110
__slots__ = (
115-
"idx",
116-
"transaction_id",
117-
"trace_id",
111+
"id",
112+
"transaction",
118113
"name",
119114
"type",
120115
"context",
121116
"leaf",
117+
"timestamp",
122118
"start_time",
123119
"duration",
124120
"parent",
125121
"frames",
126122
)
127123

128-
def __init__(self, idx, transaction_id, trace_id, name, span_type, start_time, context=None, leaf=False):
124+
def __init__(self, transaction, name, span_type, context=None, leaf=False):
129125
"""
130126
Create a new Span
131127
132-
:param idx: Index of this span
128+
:param transaction: transaction object that this span relates to
133129
:param name: Generic name of the span
134130
:param span_type: type of the span
135-
:param start_time: start time relative to the transaction
136131
:param context: context dictionary
137-
:param leaf: is this transaction a leaf transaction?
132+
:param leaf: is this span a leaf span?
138133
"""
139-
self.idx = idx
140-
self.transaction_id = transaction_id
141-
self.trace_id = trace_id
134+
self.start_time = _time_func()
135+
self.id = "%016x" % random.getrandbits(64)
136+
self.transaction = transaction
142137
self.name = name
143138
self.type = span_type
144139
self.context = context
145140
self.leaf = leaf
146-
self.start_time = start_time
141+
# timestamp is bit of a mix of monotonic and non-monotonic time sources.
142+
# we take the (non-monotonic) transaction timestamp, and add the (monotonic) difference of span
143+
# start time and transaction start time. In this respect, the span timestamp is guaranteed to grow
144+
# monotonically with respect to the transaction timestamp
145+
self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time)
147146
self.duration = None
148147
self.parent = None
149148
self.frames = None
150-
self.trace_id = trace_id
151-
self.transaction_id = transaction_id
152149

153150
def to_dict(self):
154151
result = {
155-
"id": self.idx,
156-
"transaction_id": self.transaction_id,
157-
"trace_id": self.trace_id,
158-
"parent_id": self.parent.idx if self.parent else self.transaction_id,
152+
"id": self.id,
153+
"transaction_id": self.transaction.id,
154+
"trace_id": self.transaction.trace_parent.trace_id,
155+
"parent_id": self.parent.id if self.parent else self.transaction.id,
159156
"name": encoding.keyword_field(self.name),
160157
"type": encoding.keyword_field(self.type),
161-
"start": self.start_time * 1000, # milliseconds
158+
"timestamp": int(self.timestamp * 1000000), # microseconds
162159
"duration": self.duration * 1000, # milliseconds
163160
"context": self.context,
164161
}
@@ -175,7 +172,7 @@ def __init__(self, parent, leaf=False):
175172
self.leaf = leaf
176173

177174

178-
class TransactionsStore(object):
175+
class Tracer(object):
179176
def __init__(
180177
self,
181178
frames_collector_func,
@@ -186,13 +183,10 @@ def __init__(
186183
span_frames_min_duration=None,
187184
ignore_patterns=None,
188185
):
189-
self.cond = threading.Condition()
190186
self.max_spans = max_spans
191187
self.queue_func = queue_func
192188
self.frames_processing_func = frames_processing_func
193189
self.frames_collector_func = frames_collector_func
194-
self._transactions = []
195-
self._last_collect = _time_func()
196190
self._ignore_patterns = [re.compile(p) for p in ignore_patterns or []]
197191
self._sample_rate = sample_rate
198192
if span_frames_min_duration in (-1, None):
@@ -201,24 +195,6 @@ def __init__(
201195
else:
202196
self.span_frames_min_duration = span_frames_min_duration / 1000.0
203197

204-
def add_transaction(self, transaction):
205-
with self.cond:
206-
self._transactions.append(transaction)
207-
self.cond.notify()
208-
209-
def get_all(self, blocking=False):
210-
with self.cond:
211-
# If blocking is true, always return at least 1 item
212-
while blocking and len(self._transactions) == 0:
213-
self.cond.wait()
214-
transactions, self._transactions = self._transactions, []
215-
self._last_collect = _time_func()
216-
return transactions
217-
218-
def __len__(self):
219-
with self.cond:
220-
return len(self._transactions)
221-
222198
def begin_transaction(self, transaction_type, trace_parent=None):
223199
"""
224200
Start a new transactions and bind it in a thread-local variable

tests/client/client_tests.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,7 @@ def test_transaction_max_spans(elasticapm_client):
586586
spans = elasticapm_client.events[SPAN]
587587
assert all(span["transaction_id"] == transaction["id"] for span in spans)
588588

589-
assert transaction_obj._store.max_spans == 5
589+
assert transaction_obj._tracer.max_spans == 5
590590
assert transaction_obj.dropped_spans == 10
591591
assert len(spans) == 5
592592
for span in spans:
@@ -676,22 +676,22 @@ def test_transaction_context_is_used_in_errors(elasticapm_client):
676676
assert "foo" not in transaction.context["custom"]
677677

678678

679-
def test_transaction_keyword_truncation(sending_elasticapm_client):
679+
def test_transaction_keyword_truncation(elasticapm_client):
680680
too_long = "x" * (KEYWORD_MAX_LENGTH + 1)
681681
expected = encoding.keyword_field(too_long)
682682
assert too_long != expected
683683
assert len(expected) == KEYWORD_MAX_LENGTH
684684
assert expected[-1] != "x"
685-
sending_elasticapm_client.begin_transaction(too_long)
685+
elasticapm_client.begin_transaction(too_long)
686686
elasticapm.tag(val=too_long)
687687
elasticapm.set_user_context(username=too_long, email=too_long, user_id=too_long)
688688
with elasticapm.capture_span(name=too_long, span_type=too_long):
689689
pass
690-
sending_elasticapm_client.end_transaction(too_long, too_long)
691-
sending_elasticapm_client.close()
692-
assert sending_elasticapm_client.httpserver.responses[0]["code"] == 202
693-
span = sending_elasticapm_client.httpserver.payloads[0][1]["span"]
694-
transaction = sending_elasticapm_client.httpserver.payloads[0][2]["transaction"]
690+
elasticapm_client.end_transaction(too_long, too_long)
691+
elasticapm_client.close()
692+
693+
span = elasticapm_client.events["span"][0]
694+
transaction = elasticapm_client.events["transaction"][0]
695695

696696
assert transaction["name"] == expected
697697
assert transaction["type"] == expected

tests/instrumentation/transactions_store_tests.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77

88
import elasticapm
99
from elasticapm.conf.constants import SPAN, TRANSACTION
10-
from elasticapm.traces import TransactionsStore, capture_span, get_transaction
10+
from elasticapm.traces import Tracer, capture_span, get_transaction
1111

1212

1313
@pytest.fixture()
14-
def transaction_store():
14+
def tracer():
1515
frames = [
1616
{
1717
"function": "something_expensive",
@@ -143,13 +143,13 @@ def transaction_store():
143143
def queue(event_type, event, flush=False):
144144
events[event_type].append(event)
145145

146-
store = TransactionsStore(lambda: frames, lambda frames: frames, queue)
146+
store = Tracer(lambda: frames, lambda frames: frames, queue)
147147
store.events = events
148148
return store
149149

150150

151-
def test_leaf_tracing(transaction_store):
152-
transaction_store.begin_transaction("transaction.test")
151+
def test_leaf_tracing(tracer):
152+
tracer.begin_transaction("transaction.test")
153153

154154
with capture_span("root", "custom"):
155155
with capture_span("child1-leaf", "custom", leaf=True):
@@ -161,9 +161,9 @@ def test_leaf_tracing(transaction_store):
161161
with capture_span("ignored-child2", "custom", leaf=False):
162162
time.sleep(0.01)
163163

164-
transaction_store.end_transaction(None, "transaction")
164+
tracer.end_transaction(None, "transaction")
165165

166-
spans = transaction_store.events[SPAN]
166+
spans = tracer.events[SPAN]
167167

168168
assert len(spans) == 2
169169

@@ -172,20 +172,20 @@ def test_leaf_tracing(transaction_store):
172172

173173

174174
def test_get_transaction():
175-
requests_store = TransactionsStore(lambda: [], lambda: [], lambda *args: None)
175+
requests_store = Tracer(lambda: [], lambda: [], lambda *args: None)
176176
t = requests_store.begin_transaction("test")
177177
assert t == get_transaction()
178178

179179

180180
def test_get_transaction_clear():
181-
requests_store = TransactionsStore(lambda: [], lambda: [], lambda *args: None)
181+
requests_store = Tracer(lambda: [], lambda: [], lambda *args: None)
182182
t = requests_store.begin_transaction("test")
183183
assert t == get_transaction(clear=True)
184184
assert get_transaction() is None
185185

186186

187187
def test_tag_transaction():
188-
requests_store = TransactionsStore(lambda: [], lambda: [], lambda *args: None)
188+
requests_store = Tracer(lambda: [], lambda: [], lambda *args: None)
189189
t = requests_store.begin_transaction("test")
190190
elasticapm.tag(foo="bar")
191191
requests_store.end_transaction(200, "test")
@@ -203,7 +203,7 @@ def test_tag_while_no_transaction(caplog):
203203

204204

205205
def test_tag_with_non_string_value():
206-
requests_store = TransactionsStore(lambda: [], lambda: [], lambda *args: None)
206+
requests_store = Tracer(lambda: [], lambda: [], lambda *args: None)
207207
t = requests_store.begin_transaction("test")
208208
elasticapm.tag(foo=1)
209209
requests_store.end_transaction(200, "test")

0 commit comments

Comments
 (0)