Skip to content

Commit edb9ccc

Browse files
committed
feat(confluent-kafka): Add instrumentation to consume method
1 parent 890e5dd commit edb9ccc

File tree

3 files changed

+73
-15
lines changed

3 files changed

+73
-15
lines changed

Diff for: CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2525
([#1407](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1407))
2626
- `opentelemetry-instrumentation-logging` Add `otelTraceSampled` to instrumentation-logging
2727
([#1773](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1773))
28-
28+
- `opentelemetry-instrumentation-confluent-kafka` Add instrumentation to `consume` method
2929

3030
### Fixed
3131

Diff for: instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

+53-9
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ def instrument_consumer(consumer: Consumer, tracer_provider=None)
113113
from .utils import (
114114
KafkaPropertiesExtractor,
115115
_enrich_span,
116+
_get_links_from_records,
116117
_get_span_name,
117118
_kafka_getter,
118119
_kafka_setter,
@@ -136,6 +137,10 @@ def __init__(self, config):
136137
# This method is deliberately implemented in order to allow wrapt to wrap this function
137138
def poll(self, timeout=-1): # pylint: disable=useless-super-delegation
138139
return super().poll(timeout)
140+
141+
# This method is deliberately implemented in order to allow wrapt to wrap this function
142+
def consume(self, *args, **kwargs): # pylint: disable=useless-super-delegation
143+
return super().consume(*args, **kwargs)
139144

140145

141146
class ProxiedProducer(Producer):
@@ -178,9 +183,11 @@ def commit(self, *args, **kwargs):
178183
return self._consumer.commit(*args, **kwargs)
179184

180185
def consume(
181-
self, num_messages=1, *args, **kwargs
186+
self, *args, **kwargs
182187
): # pylint: disable=keyword-arg-before-vararg
183-
return self._consumer.consume(num_messages, *args, **kwargs)
188+
return ConfluentKafkaInstrumentor.wrap_consume(
189+
self._consumer.consume, self, self._tracer, args, kwargs,
190+
)
184191

185192
def get_watermark_offsets(
186193
self, partition, timeout=-1, *args, **kwargs
@@ -274,6 +281,11 @@ def _inner_wrap_poll(func, instance, args, kwargs):
274281
return ConfluentKafkaInstrumentor.wrap_poll(
275282
func, instance, self._tracer, args, kwargs
276283
)
284+
285+
def _inner_wrap_consume(func, instance, args, kwargs):
286+
return ConfluentKafkaInstrumentor.wrap_consume(
287+
func, instance, self._tracer, args, kwargs
288+
)
277289

278290
wrapt.wrap_function_wrapper(
279291
AutoInstrumentedProducer,
@@ -286,6 +298,12 @@ def _inner_wrap_poll(func, instance, args, kwargs):
286298
"poll",
287299
_inner_wrap_poll,
288300
)
301+
302+
wrapt.wrap_function_wrapper(
303+
AutoInstrumentedConsumer,
304+
"consume",
305+
_inner_wrap_consume,
306+
)
289307

290308
def _uninstrument(self, **kwargs):
291309
confluent_kafka.Producer = self._original_kafka_producer
@@ -336,13 +354,7 @@ def wrap_poll(func, instance, tracer, args, kwargs):
336354
):
337355
record = func(*args, **kwargs)
338356
if record:
339-
links = []
340-
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
341-
if ctx:
342-
for item in ctx.values():
343-
if hasattr(item, "get_span_context"):
344-
links.append(Link(context=item.get_span_context()))
345-
357+
links = _get_links_from_records([record])
346358
instance._current_consume_span = tracer.start_span(
347359
name=f"{record.topic()} process",
348360
links=links,
@@ -361,3 +373,35 @@ def wrap_poll(func, instance, tracer, args, kwargs):
361373
)
362374

363375
return record
376+
377+
@staticmethod
378+
def wrap_consume(func, instance, tracer, args, kwargs):
379+
if instance._current_consume_span:
380+
context.detach(instance._current_context_token)
381+
instance._current_context_token = None
382+
instance._current_consume_span.end()
383+
instance._current_consume_span = None
384+
385+
with tracer.start_as_current_span(
386+
"recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER
387+
):
388+
records = func(*args, **kwargs)
389+
if len(records) > 0:
390+
links = _get_links_from_records(records)
391+
instance._current_consume_span = tracer.start_span(
392+
name=f"{records[0].topic()} process",
393+
links=links,
394+
kind=SpanKind.CONSUMER,
395+
)
396+
397+
_enrich_span(
398+
instance._current_consume_span,
399+
records[0].topic(),
400+
operation=MessagingOperationValues.PROCESS,
401+
)
402+
403+
instance._current_context_token = context.attach(
404+
trace.set_span_in_context(instance._current_consume_span)
405+
)
406+
407+
return records

Diff for: instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

+19-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from logging import getLogger
22
from typing import List, Optional
33

4+
from opentelemetry import propagate
5+
from opentelemetry.trace import SpanKind, Link
46
from opentelemetry.propagators import textmap
57
from opentelemetry.semconv.trace import (
68
MessagingDestinationKindValues,
@@ -82,11 +84,11 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
8284

8385

8486
def _enrich_span(
85-
span,
86-
topic,
87-
partition: Optional[int] = None,
88-
offset: Optional[int] = None,
89-
operation: Optional[MessagingOperationValues] = None,
87+
span,
88+
topic,
89+
partition: Optional[int] = None,
90+
offset: Optional[int] = None,
91+
operation: Optional[MessagingOperationValues] = None,
9092
):
9193
if not span.is_recording():
9294
return
@@ -116,6 +118,18 @@ def _enrich_span(
116118
)
117119

118120

121+
def _get_links_from_records(records):
122+
links = []
123+
for record in records:
124+
ctx = propagate.extract(record.headers(), getter=_kafka_getter)
125+
if ctx:
126+
for item in ctx.values():
127+
if hasattr(item, "get_span_context"):
128+
links.append(Link(context=item.get_span_context()))
129+
130+
return links
131+
132+
119133
_kafka_setter = KafkaContextSetter()
120134

121135

0 commit comments

Comments
 (0)