29
29
Boto3SQSInstrumentor().instrument()
30
30
"""
31
31
import logging
32
- from typing import Any , Collection , Dict , Generator , List , Optional
32
+ from typing import Any , Collection , Dict , Generator , List , Mapping , Optional
33
33
34
34
import boto3
35
35
import botocore .client
53
53
from .version import __version__
54
54
55
55
_logger = logging .getLogger (__name__ )
56
- # We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming
57
- # existing filters
58
- _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER : str = "otel."
59
- _OTEL_IDENTIFIER_LENGTH = len (_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER )
56
+
57
+ _IS_SQS_INSTRUMENTED_ATTRIBUTE = "_otel_boto3sqs_instrumented"
60
58
61
59
62
60
class Boto3SQSGetter (Getter [CarrierT ]):
63
61
def get (self , carrier : CarrierT , key : str ) -> Optional [List [str ]]:
64
- value = carrier .get (f"{ _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER } { key } " , {})
65
- if not value :
62
+ msg_attr = carrier .get (key )
63
+ if not isinstance (msg_attr , Mapping ):
64
+ return None
65
+
66
+ value = msg_attr .get ("StringValue" )
67
+ if value is None :
66
68
return None
67
- return [value .get ("StringValue" )]
69
+
70
+ return [value ]
68
71
69
72
def keys (self , carrier : CarrierT ) -> List [str ]:
70
- return [
71
- key [_OTEL_IDENTIFIER_LENGTH :]
72
- if key .startswith (_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER )
73
- else key
74
- for key in carrier .keys ()
75
- ]
73
+ return list (carrier .keys ())
76
74
77
75
78
76
class Boto3SQSSetter (Setter [CarrierT ]):
79
77
def set (self , carrier : CarrierT , key : str , value : str ) -> None :
80
78
# This is a limitation defined by AWS for SQS MessageAttributes size
81
79
if len (carrier .items ()) < 10 :
82
- carrier [f" { _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER } { key } " ] = {
80
+ carrier [key ] = {
83
81
"StringValue" : value ,
84
82
"DataType" : "String" ,
85
83
}
@@ -145,6 +143,7 @@ def instrumentation_dependencies(self) -> Collection[str]:
145
143
def _enrich_span (
146
144
span : Span ,
147
145
queue_name : str ,
146
+ queue_url : str ,
148
147
conversation_id : Optional [str ] = None ,
149
148
operation : Optional [MessagingOperationValues ] = None ,
150
149
message_id : Optional [str ] = None ,
@@ -157,12 +156,12 @@ def _enrich_span(
157
156
SpanAttributes .MESSAGING_DESTINATION_KIND ,
158
157
MessagingDestinationKindValues .QUEUE .value ,
159
158
)
159
+ span .set_attribute (SpanAttributes .MESSAGING_URL , queue_url )
160
+
160
161
if operation :
161
162
span .set_attribute (
162
163
SpanAttributes .MESSAGING_OPERATION , operation .value
163
164
)
164
- else :
165
- span .set_attribute (SpanAttributes .MESSAGING_TEMP_DESTINATION , True )
166
165
if conversation_id :
167
166
span .set_attribute (
168
167
SpanAttributes .MESSAGING_CONVERSATION_ID , conversation_id
@@ -190,15 +189,19 @@ def _extract_queue_name_from_url(queue_url: str) -> str:
190
189
return queue_url .split ("/" )[- 1 ]
191
190
192
191
def _create_processing_span (
193
- self , queue_name : str , receipt_handle : str , message : Dict [str , Any ]
192
+ self ,
193
+ queue_name : str ,
194
+ queue_url : str ,
195
+ receipt_handle : str ,
196
+ message : Dict [str , Any ],
194
197
) -> None :
195
198
message_attributes = message .get ("MessageAttributes" , {})
196
199
links = []
197
200
ctx = propagate .extract (message_attributes , getter = boto3sqs_getter )
198
- if ctx :
199
- for item in ctx . values () :
200
- if hasattr ( item , "get_span_context" ):
201
- links . append ( Link ( context = item . get_span_context ()))
201
+ parent_span_ctx = trace . get_current_span ( ctx ). get_span_context ()
202
+ if parent_span_ctx . is_valid :
203
+ links . append ( Link ( context = parent_span_ctx ))
204
+
202
205
span = self ._tracer .start_span (
203
206
name = f"{ queue_name } process" , links = links , kind = SpanKind .CONSUMER
204
207
)
@@ -208,11 +211,12 @@ def _create_processing_span(
208
211
Boto3SQSInstrumentor ._enrich_span (
209
212
span ,
210
213
queue_name ,
214
+ queue_url ,
211
215
message_id = message_id ,
212
216
operation = MessagingOperationValues .PROCESS ,
213
217
)
214
218
215
- def _wrap_send_message (self ) -> None :
219
+ def _wrap_send_message (self , sqs_class : type ) -> None :
216
220
def send_wrapper (wrapped , instance , args , kwargs ):
217
221
if context .get_value (_SUPPRESS_INSTRUMENTATION_KEY ):
218
222
return wrapped (* args , ** kwargs )
@@ -227,7 +231,7 @@ def send_wrapper(wrapped, instance, args, kwargs):
227
231
kind = SpanKind .PRODUCER ,
228
232
end_on_exit = True ,
229
233
) as span :
230
- Boto3SQSInstrumentor ._enrich_span (span , queue_name )
234
+ Boto3SQSInstrumentor ._enrich_span (span , queue_name , queue_url )
231
235
attributes = kwargs .pop ("MessageAttributes" , {})
232
236
propagate .inject (attributes , setter = boto3sqs_setter )
233
237
retval = wrapped (* args , MessageAttributes = attributes , ** kwargs )
@@ -239,9 +243,9 @@ def send_wrapper(wrapped, instance, args, kwargs):
239
243
)
240
244
return retval
241
245
242
- wrap_function_wrapper (self . _sqs_class , "send_message" , send_wrapper )
246
+ wrap_function_wrapper (sqs_class , "send_message" , send_wrapper )
243
247
244
- def _wrap_send_message_batch (self ) -> None :
248
+ def _wrap_send_message_batch (self , sqs_class : type ) -> None :
245
249
def send_batch_wrapper (wrapped , instance , args , kwargs ):
246
250
queue_url = kwargs .get ("QueueUrl" )
247
251
entries = kwargs .get ("Entries" )
@@ -260,12 +264,11 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
260
264
for entry in entries :
261
265
entry_id = entry ["Id" ]
262
266
span = self ._tracer .start_span (
263
- name = f"{ queue_name } send" ,
264
- kind = SpanKind .PRODUCER ,
267
+ name = f"{ queue_name } send" , kind = SpanKind .PRODUCER
265
268
)
266
269
ids_to_spans [entry_id ] = span
267
270
Boto3SQSInstrumentor ._enrich_span (
268
- span , queue_name , conversation_id = entry_id
271
+ span , queue_name , queue_url , conversation_id = entry_id
269
272
)
270
273
with trace .use_span (span ):
271
274
if "MessageAttributes" not in entry :
@@ -288,15 +291,15 @@ def send_batch_wrapper(wrapped, instance, args, kwargs):
288
291
return retval
289
292
290
293
wrap_function_wrapper (
291
- self . _sqs_class , "send_message_batch" , send_batch_wrapper
294
+ sqs_class , "send_message_batch" , send_batch_wrapper
292
295
)
293
296
294
- def _wrap_receive_message (self ) -> None :
297
+ def _wrap_receive_message (self , sqs_class : type ) -> None :
295
298
def receive_message_wrapper (wrapped , instance , args , kwargs ):
296
299
queue_url = kwargs .get ("QueueUrl" )
297
300
message_attribute_names = kwargs .pop ("MessageAttributeNames" , [])
298
- message_attribute_names .append (
299
- f" { _OPENTELEMETRY_ATTRIBUTE_IDENTIFIER } *"
301
+ message_attribute_names .extend (
302
+ propagate . get_global_textmap (). fields
300
303
)
301
304
queue_name = Boto3SQSInstrumentor ._extract_queue_name_from_url (
302
305
queue_url
@@ -309,6 +312,7 @@ def receive_message_wrapper(wrapped, instance, args, kwargs):
309
312
Boto3SQSInstrumentor ._enrich_span (
310
313
span ,
311
314
queue_name ,
315
+ queue_url ,
312
316
operation = MessagingOperationValues .RECEIVE ,
313
317
)
314
318
retval = wrapped (
@@ -327,29 +331,31 @@ def receive_message_wrapper(wrapped, instance, args, kwargs):
327
331
receipt_handle
328
332
)
329
333
self ._create_processing_span (
330
- queue_name , receipt_handle , message
334
+ queue_name , queue_url , receipt_handle , message
331
335
)
332
336
retval ["Messages" ] = Boto3SQSInstrumentor .ContextableList (
333
337
messages
334
338
)
335
339
return retval
336
340
337
341
wrap_function_wrapper (
338
- self . _sqs_class , "receive_message" , receive_message_wrapper
342
+ sqs_class , "receive_message" , receive_message_wrapper
339
343
)
340
344
341
- def _wrap_delete_message (self ) -> None :
345
+ @staticmethod
346
+ def _wrap_delete_message (sqs_class : type ) -> None :
342
347
def delete_message_wrapper (wrapped , instance , args , kwargs ):
343
348
receipt_handle = kwargs .get ("ReceiptHandle" )
344
349
if receipt_handle :
345
350
Boto3SQSInstrumentor ._safe_end_processing_span (receipt_handle )
346
351
return wrapped (* args , ** kwargs )
347
352
348
353
wrap_function_wrapper (
349
- self . _sqs_class , "delete_message" , delete_message_wrapper
354
+ sqs_class , "delete_message" , delete_message_wrapper
350
355
)
351
356
352
- def _wrap_delete_message_batch (self ) -> None :
357
+ @staticmethod
358
+ def _wrap_delete_message_batch (sqs_class : type ) -> None :
353
359
def delete_message_wrapper_batch (wrapped , instance , args , kwargs ):
354
360
entries = kwargs .get ("Entries" )
355
361
for entry in entries :
@@ -361,9 +367,7 @@ def delete_message_wrapper_batch(wrapped, instance, args, kwargs):
361
367
return wrapped (* args , ** kwargs )
362
368
363
369
wrap_function_wrapper (
364
- self ._sqs_class ,
365
- "delete_message_batch" ,
366
- delete_message_wrapper_batch ,
370
+ sqs_class , "delete_message_batch" , delete_message_wrapper_batch
367
371
)
368
372
369
373
def _wrap_client_creation (self ) -> None :
@@ -375,52 +379,58 @@ def _wrap_client_creation(self) -> None:
375
379
376
380
def client_wrapper (wrapped , instance , args , kwargs ):
377
381
retval = wrapped (* args , ** kwargs )
378
- if not self ._did_decorate :
379
- self ._decorate_sqs ()
382
+ self ._decorate_sqs (type (retval ))
380
383
return retval
381
384
382
385
wrap_function_wrapper (boto3 , "client" , client_wrapper )
383
386
384
- def _decorate_sqs (self ) -> None :
387
+ def _decorate_sqs (self , sqs_class : type ) -> None :
385
388
"""
386
389
Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base
387
390
class and is SQS to wrap.
388
391
"""
389
392
# We define SQS client as the only client that implements send_message_batch
390
- sqs_class = [
391
- cls
392
- for cls in botocore .client .BaseClient .__subclasses__ ()
393
- if hasattr (cls , "send_message_batch" )
394
- ]
395
- if sqs_class :
396
- self ._sqs_class = sqs_class [0 ]
397
- self ._did_decorate = True
398
- self ._wrap_send_message ()
399
- self ._wrap_send_message_batch ()
400
- self ._wrap_receive_message ()
401
- self ._wrap_delete_message ()
402
- self ._wrap_delete_message_batch ()
403
-
404
- def _un_decorate_sqs (self ) -> None :
405
- if self ._did_decorate :
406
- unwrap (self ._sqs_class , "send_message" )
407
- unwrap (self ._sqs_class , "send_message_batch" )
408
- unwrap (self ._sqs_class , "receive_message" )
409
- unwrap (self ._sqs_class , "delete_message" )
410
- unwrap (self ._sqs_class , "delete_message_batch" )
411
- self ._did_decorate = False
393
+ if not hasattr (sqs_class , "send_message_batch" ):
394
+ return
395
+
396
+ if getattr (sqs_class , _IS_SQS_INSTRUMENTED_ATTRIBUTE , False ):
397
+ return
398
+
399
+ setattr (sqs_class , _IS_SQS_INSTRUMENTED_ATTRIBUTE , True )
400
+
401
+ self ._wrap_send_message (sqs_class )
402
+ self ._wrap_send_message_batch (sqs_class )
403
+ self ._wrap_receive_message (sqs_class )
404
+ self ._wrap_delete_message (sqs_class )
405
+ self ._wrap_delete_message_batch (sqs_class )
406
+
407
+ @staticmethod
408
+ def _un_decorate_sqs (sqs_class : type ) -> None :
409
+ if not getattr (sqs_class , _IS_SQS_INSTRUMENTED_ATTRIBUTE , False ):
410
+ return
411
+
412
+ unwrap (sqs_class , "send_message" )
413
+ unwrap (sqs_class , "send_message_batch" )
414
+ unwrap (sqs_class , "receive_message" )
415
+ unwrap (sqs_class , "delete_message" )
416
+ unwrap (sqs_class , "delete_message_batch" )
417
+
418
+ setattr (sqs_class , _IS_SQS_INSTRUMENTED_ATTRIBUTE , False )
412
419
413
420
def _instrument (self , ** kwargs : Dict [str , Any ]) -> None :
414
- self ._did_decorate : bool = False
415
421
self ._tracer_provider : Optional [TracerProvider ] = kwargs .get (
416
422
"tracer_provider"
417
423
)
418
424
self ._tracer : Tracer = trace .get_tracer (
419
425
__name__ , __version__ , self ._tracer_provider
420
426
)
421
427
self ._wrap_client_creation ()
422
- self ._decorate_sqs ()
428
+
429
+ for client_cls in botocore .client .BaseClient .__subclasses__ ():
430
+ self ._decorate_sqs (client_cls )
423
431
424
432
def _uninstrument (self , ** kwargs : Dict [str , Any ]) -> None :
425
433
unwrap (boto3 , "client" )
426
- self ._un_decorate_sqs ()
434
+
435
+ for client_cls in botocore .client .BaseClient .__subclasses__ ():
436
+ self ._un_decorate_sqs (client_cls )
0 commit comments