Skip to content

Commit ffb995d

Browse files
rayrapetyansrikanthccvlzchenocelotl
authored
opentelemetry-instrumentation-kafka-python: wait for metadata (#1260)
* fix kafka: wait for metadata Kafka's instance metadata could be unavailable (because it's being filled asynchronously). extract_send_partition() is based on a metadata, so it may return `None` for partition and later cause all type of warning messages (e.g. `Invalid type NoneType for attribute value. Expected one of ['bool', 'str', 'bytes', 'int', 'float'] or a sequence of those types`). The proposed fix makes sure metadata is pre-populated (based on https://github.com/dpkp/kafka-python/blob/4d598055dab7da99e41bfcceffa8462b32931cdd/kafka/producer/kafka.py#L579). I'm just not sure if we should wrap `_wait_on_metadata` into try\except, maybe just passing Exception to the caller would be a better idea... * upd: changelog * fix: changelog * fix: import KafkaErrors * fix: tox -e lint errors * fix: refact and added unit test Co-authored-by: Srikanth Chekuri <[email protected]> Co-authored-by: Leighton Chen <[email protected]> Co-authored-by: Diego Hurtado <[email protected]>
1 parent 868049e commit ffb995d

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
106106

107107
### Fixed
108108

109+
- `opentelemetry-instrumentation-kafka-python`: wait for metadata
110+
([#1260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1260))
109111
- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies.
110112
([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234))
111113
- `opentelemetry-instrumentation-pymongo` Change span names to not contain queries but only database name and command name

instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ def extract_send_partition(instance, args, kwargs):
7575
):
7676
return None
7777

78-
all_partitions = instance._metadata.partitions_for_topic(topic)
79-
if all_partitions is None or len(all_partitions) == 0:
80-
return None
78+
instance._wait_on_metadata(
79+
topic, instance.config["max_block_ms"] / 1000.0
80+
)
8181

8282
return instance._partition(
8383
topic, partition, key, value, key_bytes, value_bytes

instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py

+26
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from unittest import TestCase, mock
22

33
from opentelemetry.instrumentation.kafka.utils import (
4+
KafkaPropertiesExtractor,
45
_create_consumer_span,
56
_get_span_name,
67
_kafka_getter,
@@ -208,3 +209,28 @@ def test_create_consumer_span(
208209
span, record, self.args, self.kwargs
209210
)
210211
detach.assert_called_once_with(attach.return_value)
212+
213+
@mock.patch(
214+
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor"
215+
)
216+
def test_kafka_properties_extractor(
217+
self,
218+
kafka_properties_extractor: mock.MagicMock,
219+
):
220+
kafka_properties_extractor._serialize.return_value = None
221+
kafka_properties_extractor._partition.return_value = "partition"
222+
assert (
223+
KafkaPropertiesExtractor.extract_send_partition(
224+
kafka_properties_extractor, self.args, self.kwargs
225+
)
226+
== "partition"
227+
)
228+
kafka_properties_extractor._wait_on_metadata.side_effect = Exception(
229+
"mocked error"
230+
)
231+
assert (
232+
KafkaPropertiesExtractor.extract_send_partition(
233+
kafka_properties_extractor, self.args, self.kwargs
234+
)
235+
is None
236+
)

0 commit comments

Comments
 (0)