18
18
from opentelemetry .instrumentation .celery import CeleryInstrumentor
19
19
from opentelemetry .semconv .trace import SpanAttributes
20
20
from opentelemetry .test .test_base import TestBase
21
- from opentelemetry .trace import SpanKind
21
+ from opentelemetry .trace import SpanKind , StatusCode
22
22
23
- from .celery_test_tasks import app , task_add
23
+ from .celery_test_tasks import app , task_add , task_raises
24
24
25
25
26
26
class TestCeleryInstrumentation (TestBase ):
@@ -66,6 +66,10 @@ def test_task(self):
66
66
},
67
67
)
68
68
69
+ self .assertEqual (consumer .status .status_code , StatusCode .UNSET )
70
+
71
+ self .assertEqual (0 , len (consumer .events ))
72
+
69
73
self .assertEqual (
70
74
producer .name , "apply_async/tests.celery_test_tasks.task_add"
71
75
)
@@ -84,6 +88,70 @@ def test_task(self):
84
88
self .assertEqual (consumer .parent .span_id , producer .context .span_id )
85
89
self .assertEqual (consumer .context .trace_id , producer .context .trace_id )
86
90
91
+ def test_task_raises (self ):
92
+ CeleryInstrumentor ().instrument ()
93
+
94
+ result = task_raises .delay ()
95
+
96
+ timeout = time .time () + 60 * 1 # 1 minutes from now
97
+ while not result .ready ():
98
+ if time .time () > timeout :
99
+ break
100
+ time .sleep (0.05 )
101
+
102
+ spans = self .sorted_spans (self .memory_exporter .get_finished_spans ())
103
+ self .assertEqual (len (spans ), 2 )
104
+
105
+ consumer , producer = spans
106
+
107
+ self .assertEqual (
108
+ consumer .name , "run/tests.celery_test_tasks.task_raises"
109
+ )
110
+ self .assertEqual (consumer .kind , SpanKind .CONSUMER )
111
+ self .assertSpanHasAttributes (
112
+ consumer ,
113
+ {
114
+ "celery.action" : "run" ,
115
+ "celery.state" : "FAILURE" ,
116
+ SpanAttributes .MESSAGING_DESTINATION : "celery" ,
117
+ "celery.task_name" : "tests.celery_test_tasks.task_raises" ,
118
+ },
119
+ )
120
+
121
+ self .assertEqual (consumer .status .status_code , StatusCode .ERROR )
122
+
123
+ self .assertEqual (1 , len (consumer .events ))
124
+ event = consumer .events [0 ]
125
+
126
+ self .assertIn (SpanAttributes .EXCEPTION_STACKTRACE , event .attributes )
127
+
128
+ self .assertEqual (
129
+ event .attributes [SpanAttributes .EXCEPTION_TYPE ], "CustomError"
130
+ )
131
+
132
+ self .assertEqual (
133
+ event .attributes [SpanAttributes .EXCEPTION_MESSAGE ],
134
+ "The task failed!" ,
135
+ )
136
+
137
+ self .assertEqual (
138
+ producer .name , "apply_async/tests.celery_test_tasks.task_raises"
139
+ )
140
+ self .assertEqual (producer .kind , SpanKind .PRODUCER )
141
+ self .assertSpanHasAttributes (
142
+ producer ,
143
+ {
144
+ "celery.action" : "apply_async" ,
145
+ "celery.task_name" : "tests.celery_test_tasks.task_raises" ,
146
+ SpanAttributes .MESSAGING_DESTINATION_KIND : "queue" ,
147
+ SpanAttributes .MESSAGING_DESTINATION : "celery" ,
148
+ },
149
+ )
150
+
151
+ self .assertNotEqual (consumer .parent , producer .context )
152
+ self .assertEqual (consumer .parent .span_id , producer .context .span_id )
153
+ self .assertEqual (consumer .context .trace_id , producer .context .trace_id )
154
+
87
155
def test_uninstrument (self ):
88
156
CeleryInstrumentor ().instrument ()
89
157
CeleryInstrumentor ().uninstrument ()
0 commit comments