Skip to content

Commit 88783f9

Browse files
mrajashreeshalevrsrikanthccv
authored
Allow Kafka producer headers to be dict or list (#1655)
* Allow Kafka producer headers to be dict or list * modify kafka context getter helper methods to work on dict and list --------- Co-authored-by: Shalev Roda <[email protected]> Co-authored-by: Srikanth Chekuri <[email protected]>
1 parent 4199751 commit 88783f9

File tree

3 files changed

+57
-12
lines changed

3 files changed

+57
-12
lines changed

CHANGELOG.md

+8-9
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10-
- Add metrics instrumentation for sqlalchemy
11-
([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645))
12-
13-
- Fix exception in Urllib3 when dealing with filelike body.
14-
([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399))
15-
16-
- Fix httpx resource warnings
17-
([#1695](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1695))
18-
1910
### Added
2011

2112
- Add connection attributes to sqlalchemy connect span
2213
([#1608](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1608))
2314
- Add support for enabling Redis sanitization from environment variable
2415
([#1690](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1690))
16+
- Add metrics instrumentation for sqlalchemy
17+
([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645))
2518

2619
### Fixed
2720

2821
- Fix Flask instrumentation to only close the span if it was created by the same thread.
2922
([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654))
23+
- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list
24+
([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655))
3025
- `opentelemetry-instrumentation-system-metrics` Fix initialization of the instrumentation class when configuration is provided
3126
([#1438](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1439))
27+
- Fix exception in Urllib3 when dealing with filelike body.
28+
([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399))
29+
- Fix httpx resource warnings
30+
([#1695](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1695))
3231

3332
## Version 1.16.0/0.37b0 (2023-02-17)
3433

instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/utils.py

+18-3
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,26 @@ class KafkaContextGetter(textmap.Getter):
4141
def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]:
4242
if carrier is None:
4343
return None
44-
for item_key, value in carrier:
44+
45+
carrier_items = carrier
46+
if isinstance(carrier, dict):
47+
carrier_items = carrier.items()
48+
49+
for item_key, value in carrier_items:
4550
if item_key == key:
4651
if value is not None:
4752
return [value.decode()]
53+
4854
return None
4955

5056
def keys(self, carrier: textmap.CarrierT) -> List[str]:
5157
if carrier is None:
5258
return []
53-
return [key for (key, value) in carrier]
59+
60+
carrier_items = carrier
61+
if isinstance(carrier, dict):
62+
carrier_items = carrier.items()
63+
return [key for (key, value) in carrier_items]
5464

5565

5666
class KafkaContextSetter(textmap.Setter):
@@ -60,7 +70,12 @@ def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None:
6070

6171
if value:
6272
value = value.encode()
63-
carrier.append((key, value))
73+
74+
if isinstance(carrier, list):
75+
carrier.append((key, value))
76+
77+
if isinstance(carrier, dict):
78+
carrier[key] = value
6479

6580

6681
_kafka_getter = KafkaContextGetter()

instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py

+31
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
ProxiedConsumer,
2424
ProxiedProducer,
2525
)
26+
from opentelemetry.instrumentation.confluent_kafka.utils import (
27+
KafkaContextGetter,
28+
KafkaContextSetter,
29+
)
2630

2731

2832
class TestConfluentKafka(TestCase):
@@ -73,3 +77,30 @@ def test_consumer_commit_method_exists(self) -> None:
7377
consumer = instrumentation.instrument_consumer(consumer)
7478
self.assertEqual(consumer.__class__, ProxiedConsumer)
7579
self.assertTrue(hasattr(consumer, "commit"))
80+
81+
def test_context_setter(self) -> None:
82+
context_setter = KafkaContextSetter()
83+
84+
carrier_dict = {"key1": "val1"}
85+
context_setter.set(carrier_dict, "key2", "val2")
86+
self.assertGreaterEqual(
87+
carrier_dict.items(), {"key2": "val2".encode()}.items()
88+
)
89+
90+
carrier_list = [("key1", "val1")]
91+
context_setter.set(carrier_list, "key2", "val2")
92+
self.assertTrue(("key2", "val2".encode()) in carrier_list)
93+
94+
def test_context_getter(self) -> None:
95+
context_setter = KafkaContextSetter()
96+
context_getter = KafkaContextGetter()
97+
98+
carrier_dict = {}
99+
context_setter.set(carrier_dict, "key1", "val1")
100+
self.assertEqual(context_getter.get(carrier_dict, "key1"), ["val1"])
101+
self.assertEqual(["key1"], context_getter.keys(carrier_dict))
102+
103+
carrier_list = []
104+
context_setter.set(carrier_list, "key1", "val1")
105+
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
106+
self.assertEqual(["key1"], context_getter.keys(carrier_list))

0 commit comments

Comments
 (0)