@@ -97,7 +97,7 @@ def func():
97
97
from opentelemetry .trace import get_tracer
98
98
from opentelemetry .trace .status import Status , StatusCode
99
99
100
- ASYNCIO_PREFIX = "asyncio. "
100
+ ASYNCIO_PREFIX = "asyncio"
101
101
102
102
103
103
class AsyncioInstrumentor (BaseInstrumentor ):
@@ -118,8 +118,8 @@ class AsyncioInstrumentor(BaseInstrumentor):
118
118
119
119
def __init__ (self ):
120
120
super ().__init__ ()
121
- self .process_duration_metric = None
122
- self .process_counts_metric = None
121
+ self .process_duration_histogram = None
122
+ self .process_counts_counter = None
123
123
124
124
self ._tracer = None
125
125
self ._meter = None
@@ -131,8 +131,9 @@ def instrumentation_dependencies(self) -> Collection[str]:
131
131
return _instruments
132
132
133
133
def _instrument (self , ** kwargs ):
134
- tracer_provider = kwargs .get ("tracer_provider" )
135
- self ._tracer = get_tracer (__name__ , __version__ , tracer_provider )
134
+ self ._tracer = get_tracer (
135
+ __name__ , __version__ , kwargs .get ("tracer_provider" )
136
+ )
136
137
self ._meter = get_meter (
137
138
__name__ , __version__ , kwargs .get ("meter_provider" )
138
139
)
@@ -141,12 +142,12 @@ def _instrument(self, **kwargs):
141
142
self ._future_active_enabled = get_future_trace_enabled ()
142
143
self ._to_thread_name_to_trace = get_to_thread_to_trace ()
143
144
144
- self .process_duration_metric = self ._meter .create_histogram (
145
+ self .process_duration_histogram = self ._meter .create_histogram (
145
146
name = "asyncio.process.duration" ,
146
147
description = "Duration of asyncio process" ,
147
148
unit = "seconds" ,
148
149
)
149
- self .process_counts_metric = self ._meter .create_up_down_counter (
150
+ self .process_counts_counter = self ._meter .create_counter (
150
151
name = "asyncio.process.count" ,
151
152
description = "Number of asyncio process" ,
152
153
unit = "1" ,
@@ -166,7 +167,7 @@ def _uninstrument(self, **kwargs):
166
167
uninstrument_to_thread ()
167
168
uninstrument_taskgroup_create_task ()
168
169
169
- def instrument_method_with_coroutine (self , method_name ):
170
+ def instrument_method_with_coroutine (self , method_name : str ):
170
171
"""
171
172
Instruments specified asyncio method.
172
173
"""
@@ -201,12 +202,12 @@ def wrap_coros_or_futures(method, instance, args, kwargs):
201
202
202
203
_wrap (asyncio , "gather" , wrap_coros_or_futures )
203
204
204
- def instrument_to_thread (self ):
205
+ def instrument_to_thread (self ) -> None :
205
206
# to_thread was added in Python 3.9
206
207
if sys .version_info < (3 , 9 ):
207
208
return
208
209
209
- def wrap_to_thread (method , instance , args , kwargs ):
210
+ def wrap_to_thread (method , instance , args , kwargs ) -> None :
210
211
if args :
211
212
first_arg = args [0 ]
212
213
# Wrap the first argument
@@ -218,12 +219,12 @@ def wrap_to_thread(method, instance, args, kwargs):
218
219
219
220
_wrap (asyncio , "to_thread" , wrap_to_thread )
220
221
221
- def instrument_taskgroup_create_task (self ):
222
+ def instrument_taskgroup_create_task (self ) -> None :
222
223
# TaskGroup.create_task was added in Python 3.11
223
224
if sys .version_info < (3 , 11 ):
224
225
return
225
226
226
- def wrap_taskgroup_create_task (method , instance , args , kwargs ):
227
+ def wrap_taskgroup_create_task (method , instance , args , kwargs ) -> None :
227
228
if args :
228
229
coro = args [0 ]
229
230
wrapped_coro = self .trace_coroutine (coro )
@@ -237,18 +238,17 @@ def wrap_taskgroup_create_task(method, instance, args, kwargs):
237
238
wrap_taskgroup_create_task ,
238
239
)
239
240
240
- def trace_to_thread (self , func ):
241
+ def trace_to_thread (self , func : callable ):
241
242
"""Trace a function."""
242
243
start = default_timer ()
243
244
span = (
244
245
self ._tracer .start_span (
245
- f"{ ASYNCIO_PREFIX } to_thread_func -" + func .__name__
246
+ f"{ ASYNCIO_PREFIX } to_thread -" + func .__name__
246
247
)
247
248
if func .__name__ in self ._to_thread_name_to_trace
248
249
else None
249
250
)
250
251
attr = {"type" : "to_thread" , "name" : func .__name__ }
251
- duration_attr = attr .copy ()
252
252
exception = None
253
253
try :
254
254
attr ["state" ] = "finished"
@@ -257,14 +257,7 @@ def trace_to_thread(self, func):
257
257
attr ["state" ] = "exception"
258
258
raise
259
259
finally :
260
- duration = max (round ((default_timer () - start ) * 1000 ), 0 )
261
- self .process_duration_metric .record (duration , duration_attr )
262
- self .process_counts_metric .add (1 , attr )
263
- if span :
264
- if span .is_recording () and exception :
265
- span .set_status (Status (StatusCode .ERROR ))
266
- span .record_exception (exception )
267
- span .end ()
260
+ self .record_process (start , attr , span , exception )
268
261
269
262
def trace_item (self , coro_or_future ):
270
263
"""Trace a coroutine or future item."""
@@ -283,9 +276,8 @@ async def trace_coroutine(self, coro):
283
276
"type" : "coroutine" ,
284
277
"name" : coro .__name__ ,
285
278
}
286
- duration_attr = attr .copy ()
287
279
span = (
288
- self ._tracer .start_span (f"{ ASYNCIO_PREFIX } coro-" + coro .__name__ )
280
+ self ._tracer .start_span (f"{ ASYNCIO_PREFIX } coro-" + coro .__name__ )
289
281
if coro .__name__ in self ._coros_name_to_trace
290
282
else None
291
283
)
@@ -304,46 +296,51 @@ async def trace_coroutine(self, coro):
304
296
attr ["state" ] = state
305
297
raise
306
298
finally :
307
- duration = max (round (default_timer () - start ), 0 )
308
- self .process_duration_metric .record (duration , duration_attr )
309
- self .process_counts_metric .add (1 , attr )
310
-
311
- if span :
312
- if span .is_recording () and exception :
313
- span .set_status (Status (StatusCode .ERROR ))
314
- span .record_exception (exception )
315
- span .end ()
299
+ self .record_process (start , attr , span , exception )
316
300
317
- def trace_future (self , future ):
301
+ def trace_future (self , future ) -> futures . Future :
318
302
start = default_timer ()
319
303
span = (
320
- self ._tracer .start_span (f"{ ASYNCIO_PREFIX } future" )
304
+ self ._tracer .start_span (f"{ ASYNCIO_PREFIX } future" )
321
305
if self ._future_active_enabled
322
306
else None
323
307
)
324
308
325
309
def callback (f ):
326
- duration = max (round (default_timer () - start ), 0 )
327
310
exception = f .exception ()
328
311
attr = {
329
312
"type" : "future" ,
330
313
}
331
- duration_attr = attr .copy ()
332
314
state = determine_state (exception )
333
315
attr ["state" ] = state
334
- self .process_counts_metric .add (1 , attr )
335
- self .process_duration_metric .record (duration , duration_attr )
336
- if span :
337
- if span .is_recording () and exception :
338
- span .set_status (Status (StatusCode .ERROR ))
339
- span .record_exception (exception )
340
- span .end ()
316
+ self .record_process (start , attr , span , exception )
341
317
342
318
future .add_done_callback (callback )
343
319
return future
344
320
321
+ def record_process (
322
+ self , start : float , attr : dict , span = None , exception = None
323
+ ) -> None :
324
+ """
325
+ Record the processing time, update histogram and counter, and handle span.
326
+
327
+ :param start: Start time of the process.
328
+ :param attr: Attributes for the histogram and counter.
329
+ :param span: Optional span for tracing.
330
+ :param exception: Optional exception occurred during the process.
331
+ """
332
+ duration = max (default_timer () - start , 0 )
333
+ self .process_duration_histogram .record (duration , attr )
334
+ self .process_counts_counter .add (1 , attr )
335
+
336
+ if span :
337
+ if span .is_recording () and exception :
338
+ span .set_status (Status (StatusCode .ERROR ))
339
+ span .record_exception (exception )
340
+ span .end ()
341
+
345
342
346
- def determine_state (exception ) :
343
+ def determine_state (exception : Exception ) -> str :
347
344
if isinstance (exception , asyncio .CancelledError ):
348
345
return "cancelled"
349
346
if isinstance (exception , asyncio .TimeoutError ):
@@ -353,25 +350,25 @@ def determine_state(exception):
353
350
return "finished"
354
351
355
352
356
- def uninstrument_taskgroup_create_task ():
353
+ def uninstrument_taskgroup_create_task () -> None :
357
354
# TaskGroup.create_task was added in Python 3.11
358
355
if sys .version_info < (3 , 11 ):
359
356
return
360
357
unwrap (asyncio .TaskGroup , "create_task" ) # pylint: disable=no-member
361
358
362
359
363
- def uninstrument_to_thread ():
360
+ def uninstrument_to_thread () -> None :
364
361
# to_thread was added in Python 3.9
365
362
if sys .version_info < (3 , 9 ):
366
363
return
367
364
unwrap (asyncio , "to_thread" )
368
365
369
366
370
- def uninstrument_gather ():
367
+ def uninstrument_gather () -> None :
371
368
unwrap (asyncio , "gather" )
372
369
373
370
374
- def uninstrument_method_with_coroutine (method_name ) :
371
+ def uninstrument_method_with_coroutine (method_name : str ) -> None :
375
372
"""
376
373
Uninstrument specified asyncio method.
377
374
"""
0 commit comments