|
18 | 18 | from opentelemetry.instrumentation.celery import CeleryInstrumentor
|
19 | 19 | from opentelemetry.semconv.trace import SpanAttributes
|
20 | 20 | from opentelemetry.test.test_base import TestBase
|
21 |
| -from opentelemetry.trace import SpanKind |
| 21 | +from opentelemetry.trace import SpanKind, StatusCode |
22 | 22 |
|
23 |
| -from .celery_test_tasks import app, task_add |
| 23 | +from .celery_test_tasks import app, task_add, task_error |
24 | 24 |
|
25 | 25 |
|
26 | 26 | class TestCeleryInstrumentation(TestBase):
|
@@ -83,6 +83,54 @@ def test_task(self):
|
83 | 83 | self.assertEqual(consumer.parent.span_id, producer.context.span_id)
|
84 | 84 | self.assertEqual(consumer.context.trace_id, producer.context.trace_id)
|
85 | 85 |
|
| 86 | + def test_task_with_error(self): |
| 87 | + CeleryInstrumentor().instrument() |
| 88 | + |
| 89 | + result = task_error.delay() |
| 90 | + |
| 91 | + timeout = time.time() + 60 * 1 # 1 minutes from now |
| 92 | + while not result.ready(): |
| 93 | + if time.time() > timeout: |
| 94 | + break |
| 95 | + time.sleep(0.05) |
| 96 | + |
| 97 | + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) |
| 98 | + self.assertEqual(len(spans), 2) |
| 99 | + |
| 100 | + consumer, producer = spans |
| 101 | + |
| 102 | + self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_error") |
| 103 | + self.assertEqual(consumer.kind, SpanKind.CONSUMER) |
| 104 | + self.assertSpanHasAttributes( |
| 105 | + consumer, |
| 106 | + { |
| 107 | + "celery.action": "run", |
| 108 | + "celery.state": "FAILURE", |
| 109 | + SpanAttributes.MESSAGING_DESTINATION: "celery", |
| 110 | + "celery.task_name": "tests.celery_test_tasks.task_error", |
| 111 | + }, |
| 112 | + ) |
| 113 | + self.assertEqual(consumer.status.status_code, StatusCode.ERROR) |
| 114 | + |
| 115 | + self.assertEqual( |
| 116 | + producer.name, "apply_async/tests.celery_test_tasks.task_error" |
| 117 | + ) |
| 118 | + self.assertEqual(producer.kind, SpanKind.PRODUCER) |
| 119 | + self.assertSpanHasAttributes( |
| 120 | + producer, |
| 121 | + { |
| 122 | + "celery.action": "apply_async", |
| 123 | + "celery.task_name": "tests.celery_test_tasks.task_error", |
| 124 | + SpanAttributes.MESSAGING_DESTINATION_KIND: "queue", |
| 125 | + SpanAttributes.MESSAGING_DESTINATION: "celery", |
| 126 | + }, |
| 127 | + ) |
| 128 | + self.assertEqual(producer.status.status_code, StatusCode.UNSET) |
| 129 | + |
| 130 | + self.assertNotEqual(consumer.parent, producer.context) |
| 131 | + self.assertEqual(consumer.parent.span_id, producer.context.span_id) |
| 132 | + self.assertEqual(consumer.context.trace_id, producer.context.trace_id) |
| 133 | + |
86 | 134 | def test_uninstrument(self):
|
87 | 135 | CeleryInstrumentor().instrument()
|
88 | 136 | CeleryInstrumentor().uninstrument()
|
|
0 commit comments