@@ -26,10 +26,15 @@ def __init__(self, _tracer):
26
26
27
27
def before_process_message (self , _broker , message ):
28
28
trace_ctx = extract (message .options ["trace_ctx" ])
29
- operation_name = "remoulade/process"
29
+ retry_count = message .options .get ("retries" )
30
+
31
+ operation_name = "remoulade/process" if retry_count is None else f"remoulade/process(retry-{ retry_count } )"
30
32
31
33
span = self ._tracer .start_span (operation_name , kind = trace .SpanKind .CONSUMER , context = trace_ctx )
32
34
35
+ if retry_count is not None :
36
+ span .set_attribute ("retry_count" , retry_count )
37
+
33
38
activation = trace .use_span (span , end_on_exit = True )
34
39
activation .__enter__ ()
35
40
@@ -44,24 +49,26 @@ def after_process_message(self, _broker, message, *, result=None, exception=None
44
49
45
50
if span .is_recording ():
46
51
span .set_attribute (_MESSAGE_TAG_KEY , _MESSAGE_RUN )
47
- # utils.set_attributes_from_context(span, kwargs)
48
- # utils.set_attributes_from_context(span, task.request)
49
52
span .set_attribute (_MESSAGE_NAME_KEY , message .actor_name )
50
53
pass
51
54
52
55
activation .__exit__ (None , None , None )
53
56
utils .detach_span (self ._span_registry , message .message_id )
54
57
55
58
def before_enqueue (self , _broker , message , delay ):
56
- operation_name = "remoulade/send"
59
+ retry_count = message .options .get ("retries" )
60
+
61
+ operation_name = "remoulade/send" if retry_count is None else f"remoulade/send(retry-{ retry_count } )"
57
62
58
63
span = self ._tracer .start_span (operation_name , kind = trace .SpanKind .PRODUCER )
59
64
65
+ if retry_count is not None :
66
+ span .set_attribute ("retry_count" , retry_count )
67
+
60
68
if span .is_recording ():
61
69
span .set_attribute (_MESSAGE_TAG_KEY , _MESSAGE_SEND )
62
70
span .set_attribute (SpanAttributes .MESSAGING_MESSAGE_ID , message .message_id )
63
71
span .set_attribute (_MESSAGE_NAME_KEY , message .actor_name )
64
- # utils.set_attributes_from_context(span, kwargs)
65
72
pass
66
73
67
74
activation = trace .use_span (span , end_on_exit = True )
0 commit comments