Skip to content

Commit 2a809cf

Browse files
committed
support topic as kwarg
1 parent 2ab6641 commit 2a809cf

File tree

3 files changed

+53
-8
lines changed

3 files changed

+53
-8
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2222
([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903))
2323
- `opentelemetry-instrumentation-falcon` Safer patching mechanism
2424
([#895](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/895))
25+
- `opentelemetry-instrumentation-kafka-python` Fix topic extraction
26+
([#949](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/949))
2527

2628
## [1.9.1-0.28b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.9.1-0.28b1) - 2022-01-29
2729

instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ def _extract_argument(key, position, default_value, args, kwargs):
2525
return kwargs.get(key, default_value)
2626

2727
@staticmethod
28-
def extract_send_topic(args):
28+
def extract_send_topic(args, kwargs):
2929
"""extract topic from `send` method arguments in KafkaProducer class"""
30-
if len(args) > 0:
31-
return args[0]
32-
return "unknown"
30+
return KafkaPropertiesExtractor._extract_argument(
31+
"topic", 0, "unknown", args, kwargs
32+
)
3333

3434
@staticmethod
3535
def extract_send_value(args, kwargs):
@@ -56,7 +56,7 @@ def extract_send_headers(args, kwargs):
5656
def extract_send_partition(instance, args, kwargs):
5757
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
5858
try:
59-
topic = KafkaPropertiesExtractor.extract_send_topic(args)
59+
topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
6060
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
6161
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
6262
partition = KafkaPropertiesExtractor._extract_argument(
@@ -145,7 +145,7 @@ def _traced_send(func, instance, args, kwargs):
145145
headers = []
146146
kwargs["headers"] = headers
147147

148-
topic = KafkaPropertiesExtractor.extract_send_topic(args)
148+
topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
149149
bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(
150150
instance
151151
)

instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py

+45-2
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,57 @@ def setUp(self) -> None:
2828
@mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span")
2929
@mock.patch("opentelemetry.trace.set_span_in_context")
3030
@mock.patch("opentelemetry.propagate.inject")
31-
def test_wrap_send(
31+
def test_wrap_send_with_topic_as_arg(
32+
self,
33+
inject: mock.MagicMock,
34+
set_span_in_context: mock.MagicMock,
35+
enrich_span: mock.MagicMock,
36+
extract_send_partition: mock.MagicMock,
37+
extract_bootstrap_servers: mock.MagicMock,
38+
) -> None:
39+
self.wrap_send_helper(
40+
inject,
41+
set_span_in_context,
42+
enrich_span,
43+
extract_send_partition,
44+
extract_bootstrap_servers,
45+
)
46+
47+
@mock.patch(
48+
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_bootstrap_servers"
49+
)
50+
@mock.patch(
51+
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_send_partition"
52+
)
53+
@mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span")
54+
@mock.patch("opentelemetry.trace.set_span_in_context")
55+
@mock.patch("opentelemetry.propagate.inject")
56+
def test_wrap_send_with_topic_as_kwarg(
57+
self,
58+
inject: mock.MagicMock,
59+
set_span_in_context: mock.MagicMock,
60+
enrich_span: mock.MagicMock,
61+
extract_send_partition: mock.MagicMock,
62+
extract_bootstrap_servers: mock.MagicMock,
63+
) -> None:
64+
self.args = []
65+
self.kwargs["topic"] = self.topic_name
66+
self.wrap_send_helper(
67+
inject,
68+
set_span_in_context,
69+
enrich_span,
70+
extract_send_partition,
71+
extract_bootstrap_servers
72+
)
73+
74+
def wrap_send_helper(
3275
self,
3376
inject: mock.MagicMock,
3477
set_span_in_context: mock.MagicMock,
3578
enrich_span: mock.MagicMock,
3679
extract_send_partition: mock.MagicMock,
3780
extract_bootstrap_servers: mock.MagicMock,
38-
):
81+
) -> None:
3982
tracer = mock.MagicMock()
4083
produce_hook = mock.MagicMock()
4184
original_send_callback = mock.MagicMock()

0 commit comments

Comments
 (0)