-
Notifications
You must be signed in to change notification settings - Fork 145
/
Copy pathsubsegment.py
161 lines (125 loc) · 5.02 KB
/
subsegment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import copy
import traceback
import wrapt
from .entity import Entity
from ..exceptions.exceptions import SegmentNotFoundException
# Attribute starts with _self_ to prevent wrapt proxying to underlying function
SUBSEGMENT_RECORDING_ATTRIBUTE = '_self___SUBSEGMENT_RECORDING_ATTRIBUTE__'
def set_as_recording(decorated_func, wrapped):
# If the wrapped function has the attribute, then it has already been patched
setattr(decorated_func, SUBSEGMENT_RECORDING_ATTRIBUTE, hasattr(wrapped, SUBSEGMENT_RECORDING_ATTRIBUTE))
def is_already_recording(func):
# The function might have the attribute, but its value might still be false
# as it might be the first decorator
return getattr(func, SUBSEGMENT_RECORDING_ATTRIBUTE, False)
@wrapt.decorator
def subsegment_decorator(wrapped, instance, args, kwargs):
decorated_func = wrapt.decorator(wrapped)(*args, **kwargs)
set_as_recording(decorated_func, wrapped)
return decorated_func
class SubsegmentContextManager:
"""
Wrapper for segment and recorder to provide segment context manager.
"""
def __init__(self, recorder, name=None, **subsegment_kwargs):
self.name = name
self.subsegment_kwargs = subsegment_kwargs
self.recorder = recorder
self.subsegment = None
@subsegment_decorator
def __call__(self, wrapped, instance, args, kwargs):
if is_already_recording(wrapped):
# The wrapped function is already decorated, the subsegment will be created later,
# just return the result
return wrapped(*args, **kwargs)
func_name = self.name
if not func_name:
func_name = wrapped.__name__
return self.recorder.record_subsegment(
wrapped, instance, args, kwargs,
name=func_name,
namespace='local',
meta_processor=None,
)
def __enter__(self):
self.subsegment = self.recorder.begin_subsegment(
name=self.name, **self.subsegment_kwargs)
return self.subsegment
def __exit__(self, exc_type, exc_val, exc_tb):
if self.subsegment is None:
return
if exc_type is not None:
self.subsegment.add_exception(
exc_val,
traceback.extract_tb(
exc_tb,
limit=self.recorder.max_trace_back,
)
)
self.recorder.end_subsegment()
class Subsegment(Entity):
"""
The work done in a single segment can be broke down into subsegments.
Subsegments provide more granular timing information and details about
downstream calls that your application made to fulfill the original request.
A subsegment can contain additional details about a call to an AWS service,
an external HTTP API, or an SQL database.
"""
def __init__(self, name, namespace, segment):
"""
Create a new subsegment.
:param str name: Subsegment name is required.
:param str namespace: The namespace of the subsegment. Currently
support `aws`, `remote` and `local`.
:param Segment segment: The parent segment
"""
super().__init__(name)
if not segment:
raise SegmentNotFoundException("A parent segment is required for creating subsegments.")
self.parent_segment = segment
self.trace_id = segment.trace_id
self.type = 'subsegment'
self.namespace = namespace
self.sql = {}
def add_subsegment(self, subsegment):
"""
Add input subsegment as a child subsegment and increment
reference counter and total subsegments counter of the
parent segment.
"""
super().add_subsegment(subsegment)
self.parent_segment.increment()
def remove_subsegment(self, subsegment):
"""
Remove input subsegment from child subsegemnts and
decrement parent segment total subsegments count.
:param Subsegment: subsegment to remove.
"""
super().remove_subsegment(subsegment)
self.parent_segment.decrement_subsegments_size()
def close(self, end_time=None):
"""
Close the trace entity by setting `end_time`
and flip the in progress flag to False. Also decrement
parent segment's ref counter by 1.
:param int end_time: Epoch in seconds. If not specified
current time will be used.
"""
super().close(end_time)
self.parent_segment.decrement_ref_counter()
def set_sql(self, sql):
"""
Set sql related metadata. This function is used by patchers
for database connectors and is not recommended to
invoke manually.
:param dict sql: sql related metadata
"""
self.sql = sql
def to_dict(self):
"""
Convert Subsegment object to dict with required properties
that have non-empty values.
"""
subsegment_dict = super().to_dict()
del subsegment_dict['parent_segment']
return subsegment_dict