Skip to content

Commit 0eada47

Browse files
allen-k1mxrmx
authored andcommitted
Fix to allow topic to be passed via kwargs (open-telemetry#2901)
* Fix to allow topic to be imported from kwargs * add changelog * lint * separate assert function
1 parent 8b81f6a commit 0eada47

File tree

4 files changed

+22
-5
lines changed

4 files changed

+22
-5
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2424

2525
- `opentelemetry-instrumentation-aiokafka` Wrap `AIOKafkaConsumer.getone()` instead of `AIOKafkaConsumer.__anext__`
2626
([#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874))
27+
- `opentelemetry-instrumentation-confluent-kafka` Fix to allow `topic` to be extracted from `kwargs` in `produce()`
28+
([#2901])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2901)
2729

2830
### Breaking changes
2931

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,9 @@ def wrap_produce(func, instance, tracer, args, kwargs):
363363
headers = []
364364
kwargs["headers"] = headers
365365

366-
topic = KafkaPropertiesExtractor.extract_produce_topic(args)
366+
topic = KafkaPropertiesExtractor.extract_produce_topic(
367+
args, kwargs
368+
)
367369
_enrich_span(
368370
span,
369371
topic,

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,9 @@ def _extract_argument(key, position, default_value, args, kwargs):
2525
return kwargs.get(key, default_value)
2626

2727
@staticmethod
28-
def extract_produce_topic(args):
28+
def extract_produce_topic(args, kwargs):
2929
"""extract topic from `produce` method arguments in Producer class"""
30-
if len(args) > 0:
31-
return args[0]
32-
return "unknown"
30+
return kwargs.get("topic") or (args[0] if args else "unknown")
3331

3432
@staticmethod
3533
def extract_produce_headers(args, kwargs):

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

+15
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,15 @@ def _compare_spans(self, spans, expected_spans):
284284
expected_attribute_value, span.attributes[attribute_key]
285285
)
286286

287+
def _assert_topic(self, span, expected_topic: str) -> None:
288+
self.assertEqual(
289+
span.attributes[SpanAttributes.MESSAGING_DESTINATION],
290+
expected_topic,
291+
)
292+
293+
def _assert_span_count(self, span_list, expected_count: int) -> None:
294+
self.assertEqual(len(span_list), expected_count)
295+
287296
def test_producer_poll(self) -> None:
288297
instrumentation = ConfluentKafkaInstrumentor()
289298
message_queue = []
@@ -299,6 +308,9 @@ def test_producer_poll(self) -> None:
299308
producer.produce(topic="topic-1", key="key-1", value="value-1")
300309
msg = producer.poll()
301310
self.assertIsNotNone(msg)
311+
span_list = self.memory_exporter.get_finished_spans()
312+
self._assert_span_count(span_list, 1)
313+
self._assert_topic(span_list[0], "topic-1")
302314

303315
def test_producer_flush(self) -> None:
304316
instrumentation = ConfluentKafkaInstrumentor()
@@ -315,3 +327,6 @@ def test_producer_flush(self) -> None:
315327
producer.produce(topic="topic-1", key="key-1", value="value-1")
316328
msg = producer.flush()
317329
self.assertIsNotNone(msg)
330+
span_list = self.memory_exporter.get_finished_spans()
331+
self._assert_span_count(span_list, 1)
332+
self._assert_topic(span_list[0], "topic-1")

0 commit comments

Comments
 (0)