Skip to content

Commit fb6bd3e

Browse files
create_consumer_span function
1 parent 79991f7 commit fb6bd3e

File tree

2 files changed

+87
-31
lines changed
  • instrumentation/opentelemetry-instrumentation-kafka-python

2 files changed

+87
-31
lines changed

instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py

+35-19
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,32 @@ def _traced_send(func, instance, args, kwargs):
164164
return _traced_send
165165

166166

167+
def _create_consumer_span(
168+
tracer,
169+
consume_hook,
170+
record,
171+
extracted_context,
172+
bootstrap_servers,
173+
args,
174+
kwargs,
175+
):
176+
span_name = _get_span_name("receive", record.topic)
177+
with tracer.start_as_current_span(
178+
span_name,
179+
context=extracted_context,
180+
kind=trace.SpanKind.CONSUMER,
181+
) as span:
182+
new_context = trace.set_span_in_context(span, extracted_context)
183+
token = context.attach(new_context)
184+
_enrich_span(span, bootstrap_servers, record.topic, record.partition)
185+
try:
186+
if callable(consume_hook):
187+
consume_hook(span, record, args, kwargs)
188+
except Exception as hook_exception: # pylint: disable=W0703
189+
_LOG.exception(hook_exception)
190+
context.detach(token)
191+
192+
167193
def _wrap_next(
168194
tracer: Tracer,
169195
consume_hook: ConsumeHookT,
@@ -180,25 +206,15 @@ def _traced_next(func, instance, args, kwargs):
180206
extracted_context = propagate.extract(
181207
record.headers, getter=_kafka_getter
182208
)
183-
span_name = _get_span_name("receive", record.topic)
184-
with tracer.start_as_current_span(
185-
span_name,
186-
context=extracted_context,
187-
kind=trace.SpanKind.CONSUMER,
188-
) as span:
189-
new_context = trace.set_span_in_context(
190-
span, extracted_context
191-
)
192-
token = context.attach(new_context)
193-
_enrich_span(
194-
span, bootstrap_servers, record.topic, record.partition
195-
)
196-
try:
197-
if callable(consume_hook):
198-
consume_hook(span, record, args, kwargs)
199-
except Exception as hook_exception: # pylint: disable=W0703
200-
_LOG.exception(hook_exception)
201-
context.detach(token)
209+
_create_consumer_span(
210+
tracer,
211+
consume_hook,
212+
record,
213+
extracted_context,
214+
bootstrap_servers,
215+
args,
216+
kwargs,
217+
)
202218
return record
203219

204220
return _traced_next

instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py

+52-12
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from unittest import TestCase, mock
22

33
from opentelemetry.instrumentation.kafka.utils import (
4+
_create_consumer_span,
45
_get_span_name,
56
_kafka_getter,
67
_kafka_setter,
@@ -75,22 +76,18 @@ def test_wrap_send(
7576
)
7677
self.assertEqual(retval, original_send_callback.return_value)
7778

78-
@mock.patch("opentelemetry.trace.set_span_in_context")
7979
@mock.patch("opentelemetry.propagate.extract")
80-
@mock.patch("opentelemetry.context.attach")
81-
@mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span")
80+
@mock.patch(
81+
"opentelemetry.instrumentation.kafka.utils._create_consumer_span"
82+
)
8283
@mock.patch(
8384
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_bootstrap_servers"
8485
)
85-
@mock.patch("opentelemetry.context.detach")
8686
def test_wrap_next(
8787
self,
88-
detach: mock.MagicMock,
8988
extract_bootstrap_servers: mock.MagicMock,
90-
enrich_span: mock.MagicMock,
91-
attach: mock.MagicMock,
89+
_create_consumer_span: mock.MagicMock,
9290
extract: mock.MagicMock,
93-
set_span_in_context: mock.MagicMock,
9491
) -> None:
9592
tracer = mock.MagicMock()
9693
consume_hook = mock.MagicMock()
@@ -109,19 +106,62 @@ def test_wrap_next(
109106
*self.args, **self.kwargs
110107
)
111108
self.assertEqual(record, original_next_callback.return_value)
112-
expected_span_name = _get_span_name("receive", record.topic)
113109

114110
extract.assert_called_once_with(record.headers, getter=_kafka_getter)
115111
context = extract.return_value
112+
113+
_create_consumer_span.assert_called_once_with(
114+
tracer,
115+
consume_hook,
116+
record,
117+
context,
118+
bootstrap_servers,
119+
self.args,
120+
self.kwargs,
121+
)
122+
123+
@mock.patch("opentelemetry.trace.set_span_in_context")
124+
@mock.patch("opentelemetry.context.attach")
125+
@mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span")
126+
@mock.patch("opentelemetry.context.detach")
127+
def test_create_consumer_span(
128+
self,
129+
detach: mock.MagicMock,
130+
enrich_span: mock.MagicMock,
131+
attach: mock.MagicMock,
132+
set_span_in_context: mock.MagicMock,
133+
) -> None:
134+
tracer = mock.MagicMock()
135+
consume_hook = mock.MagicMock()
136+
bootstrap_servers = mock.MagicMock()
137+
extracted_context = mock.MagicMock()
138+
record = mock.MagicMock()
139+
140+
_create_consumer_span(
141+
tracer,
142+
consume_hook,
143+
record,
144+
extracted_context,
145+
bootstrap_servers,
146+
self.args,
147+
self.kwargs,
148+
)
149+
150+
expected_span_name = _get_span_name("receive", record.topic)
151+
116152
tracer.start_as_current_span.assert_called_once_with(
117-
expected_span_name, context=context, kind=SpanKind.CONSUMER
153+
expected_span_name,
154+
context=extracted_context,
155+
kind=SpanKind.CONSUMER,
118156
)
119157
span = tracer.start_as_current_span.return_value.__enter__()
120-
set_span_in_context.assert_called_once_with(span, context)
158+
set_span_in_context.assert_called_once_with(span, extracted_context)
121159
attach.assert_called_once_with(set_span_in_context.return_value)
122160

123161
enrich_span.assert_called_once_with(
124162
span, bootstrap_servers, record.topic, record.partition
125163
)
126-
consume_hook.assert_called_once_with(span, self.args, self.kwargs)
164+
consume_hook.assert_called_once_with(
165+
span, record, self.args, self.kwargs
166+
)
127167
detach.assert_called_once_with(attach.return_value)

0 commit comments

Comments
 (0)