4
4
from remoulade import Middleware , broker
5
5
6
6
from opentelemetry import trace
7
+ from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
7
8
from opentelemetry .instrumentation .remoulade import utils
8
- from opentelemetry .instrumentation .remoulade .version import __version__
9
9
from opentelemetry .instrumentation .remoulade .package import _instruments
10
- from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
10
+ from opentelemetry .instrumentation .remoulade . version import __version__
11
11
from opentelemetry .propagate import extract , inject
12
12
from opentelemetry .semconv .trace import SpanAttributes
13
13
14
-
15
14
_MESSAGE_TAG_KEY = "remoulade.action"
16
15
_MESSAGE_SEND = "send"
17
16
_MESSAGE_RUN = "run"
@@ -28,20 +27,32 @@ def before_process_message(self, _broker, message):
28
27
trace_ctx = extract (message .options ["trace_ctx" ])
29
28
retry_count = message .options .get ("retries" )
30
29
31
- operation_name = "remoulade/process" if retry_count is None else f"remoulade/process(retry-{ retry_count } )"
30
+ operation_name = (
31
+ "remoulade/process"
32
+ if retry_count is None
33
+ else f"remoulade/process(retry-{ retry_count } )"
34
+ )
32
35
33
- span = self ._tracer .start_span (operation_name , kind = trace .SpanKind .CONSUMER , context = trace_ctx )
36
+ span = self ._tracer .start_span (
37
+ operation_name , kind = trace .SpanKind .CONSUMER , context = trace_ctx
38
+ )
34
39
35
40
if retry_count is not None :
36
41
span .set_attribute ("retry_count" , retry_count )
37
42
38
43
activation = trace .use_span (span , end_on_exit = True )
39
44
activation .__enter__ ()
40
45
41
- utils .attach_span (self ._span_registry , message .message_id , (span , activation ))
46
+ utils .attach_span (
47
+ self ._span_registry , message .message_id , (span , activation )
48
+ )
42
49
43
- def after_process_message (self , _broker , message , * , result = None , exception = None ):
44
- span , activation = utils .retrieve_span (self ._span_registry , message .message_id )
50
+ def after_process_message (
51
+ self , _broker , message , * , result = None , exception = None
52
+ ):
53
+ span , activation = utils .retrieve_span (
54
+ self ._span_registry , message .message_id
55
+ )
45
56
46
57
if span is None :
47
58
# no existing span found for message_id
@@ -58,37 +69,54 @@ def after_process_message(self, _broker, message, *, result=None, exception=None
58
69
def before_enqueue (self , _broker , message , delay ):
59
70
retry_count = message .options .get ("retries" )
60
71
61
- operation_name = "remoulade/send" if retry_count is None else f"remoulade/send(retry-{ retry_count } )"
72
+ operation_name = (
73
+ "remoulade/send"
74
+ if retry_count is None
75
+ else f"remoulade/send(retry-{ retry_count } )"
76
+ )
62
77
63
- span = self ._tracer .start_span (operation_name , kind = trace .SpanKind .PRODUCER )
78
+ span = self ._tracer .start_span (
79
+ operation_name , kind = trace .SpanKind .PRODUCER
80
+ )
64
81
65
82
if retry_count is not None :
66
83
span .set_attribute ("retry_count" , retry_count )
67
84
68
85
if span .is_recording ():
69
86
span .set_attribute (_MESSAGE_TAG_KEY , _MESSAGE_SEND )
70
- span .set_attribute (SpanAttributes .MESSAGING_MESSAGE_ID , message .message_id )
87
+ span .set_attribute (
88
+ SpanAttributes .MESSAGING_MESSAGE_ID , message .message_id
89
+ )
71
90
span .set_attribute (_MESSAGE_NAME_KEY , message .actor_name )
72
91
pass
73
92
74
93
activation = trace .use_span (span , end_on_exit = True )
75
94
activation .__enter__ ()
76
95
77
- utils .attach_span (self ._span_registry , message .message_id , (span , activation ), is_publish = True )
96
+ utils .attach_span (
97
+ self ._span_registry ,
98
+ message .message_id ,
99
+ (span , activation ),
100
+ is_publish = True ,
101
+ )
78
102
79
103
if "trace_ctx" not in message .options :
80
104
message .options ["trace_ctx" ] = {}
81
105
inject (message .options ["trace_ctx" ])
82
106
83
107
def after_enqueue (self , _broker , message , delay , exception = None ):
84
- _ , activation = utils .retrieve_span (self ._span_registry , message .message_id , is_publish = True )
108
+ _ , activation = utils .retrieve_span (
109
+ self ._span_registry , message .message_id , is_publish = True
110
+ )
85
111
86
112
if activation is None :
87
113
# no existing span found for message_id
88
114
return
89
115
90
116
activation .__exit__ (None , None , None )
91
- utils .detach_span (self ._span_registry , message .message_id , is_publish = True )
117
+ utils .detach_span (
118
+ self ._span_registry , message .message_id , is_publish = True
119
+ )
92
120
93
121
94
122
class RemouladeInstrumentor (BaseInstrumentor ):
0 commit comments