Skip to content

Commit f3afcd8

Browse files
committed
Allow Kafka producer headers to be dict or list
1 parent b8d7448 commit f3afcd8

File tree

3 files changed

+22
-1
lines changed

3 files changed

+22
-1
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4646
([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435))
4747
- mongo db - fix db statement capturing
4848
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
49+
- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list
50+
([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655))
4951

5052
## Version 1.15.0/0.36b0 (2022-12-10)
5153

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
6060

6161
if value:
6262
value = value.encode()
63-
carrier.append((key, value))
63+
64+
if isinstance(carrier, list):
65+
carrier.append((key, value))
66+
67+
if isinstance(carrier, dict):
68+
carrier[key] = value
6469

6570

6671
_kafka_getter = KafkaContextGetter()

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

+14
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
ProxiedConsumer,
2424
ProxiedProducer,
2525
)
26+
from opentelemetry.instrumentation.confluent_kafka.utils import (
27+
KafkaContextSetter,
28+
)
2629

2730

2831
class TestConfluentKafka(TestCase):
@@ -58,3 +61,14 @@ def test_instrument_api(self) -> None:
5861

5962
consumer = instrumentation.uninstrument_consumer(consumer)
6063
self.assertEqual(consumer.__class__, Consumer)
64+
65+
def test_context_setter(self) -> None:
66+
context_setter = KafkaContextSetter()
67+
68+
carrier_dict = {"key1": "val1"}
69+
context_setter.set(carrier_dict, "key2", "val2")
70+
self.assertGreaterEqual(carrier_dict.items(), {"key2": "val2".encode()}.items())
71+
72+
carrier_list = [("key1", "val1")]
73+
context_setter.set(carrier_list, "key2", "val2")
74+
self.assertTrue(("key2", "val2".encode()) in carrier_list)

0 commit comments

Comments
 (0)