mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-29 21:23:55 +08:00
Allow Kafka producer headers to be dict or list (#1655)
* Allow Kafka producer headers to be dict or list * modify kafka context getter helper methods to work on dict and list --------- Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com> Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:

committed by
GitHub

parent
419975138b
commit
88783f9632
17
CHANGELOG.md
17
CHANGELOG.md
@ -7,28 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
- Add metrics instrumentation for sqlalchemy
|
|
||||||
([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645))
|
|
||||||
|
|
||||||
- Fix exception in Urllib3 when dealing with filelike body.
|
|
||||||
([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399))
|
|
||||||
|
|
||||||
- Fix httpx resource warnings
|
|
||||||
([#1695](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1695))
|
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
- Add connection attributes to sqlalchemy connect span
|
- Add connection attributes to sqlalchemy connect span
|
||||||
([#1608](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1608))
|
([#1608](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1608))
|
||||||
- Add support for enabling Redis sanitization from environment variable
|
- Add support for enabling Redis sanitization from environment variable
|
||||||
([#1690](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1690))
|
([#1690](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1690))
|
||||||
|
- Add metrics instrumentation for sqlalchemy
|
||||||
|
([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645))
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
- Fix Flask instrumentation to only close the span if it was created by the same thread.
|
- Fix Flask instrumentation to only close the span if it was created by the same thread.
|
||||||
([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654))
|
([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654))
|
||||||
|
- Fix confluent-kafka instrumentation by allowing Producer headers to be dict or list
|
||||||
|
([#1655](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1655))
|
||||||
- `opentelemetry-instrumentation-system-metrics` Fix initialization of the instrumentation class when configuration is provided
|
- `opentelemetry-instrumentation-system-metrics` Fix initialization of the instrumentation class when configuration is provided
|
||||||
([#1438](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1439))
|
([#1438](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1439))
|
||||||
|
- Fix exception in Urllib3 when dealing with filelike body.
|
||||||
|
([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399))
|
||||||
|
- Fix httpx resource warnings
|
||||||
|
([#1695](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1695))
|
||||||
|
|
||||||
## Version 1.16.0/0.37b0 (2023-02-17)
|
## Version 1.16.0/0.37b0 (2023-02-17)
|
||||||
|
|
||||||
|
@ -41,16 +41,26 @@ class KafkaContextGetter(textmap.Getter):
|
|||||||
def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]:
|
def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]:
|
||||||
if carrier is None:
|
if carrier is None:
|
||||||
return None
|
return None
|
||||||
for item_key, value in carrier:
|
|
||||||
|
carrier_items = carrier
|
||||||
|
if isinstance(carrier, dict):
|
||||||
|
carrier_items = carrier.items()
|
||||||
|
|
||||||
|
for item_key, value in carrier_items:
|
||||||
if item_key == key:
|
if item_key == key:
|
||||||
if value is not None:
|
if value is not None:
|
||||||
return [value.decode()]
|
return [value.decode()]
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def keys(self, carrier: textmap.CarrierT) -> List[str]:
|
def keys(self, carrier: textmap.CarrierT) -> List[str]:
|
||||||
if carrier is None:
|
if carrier is None:
|
||||||
return []
|
return []
|
||||||
return [key for (key, value) in carrier]
|
|
||||||
|
carrier_items = carrier
|
||||||
|
if isinstance(carrier, dict):
|
||||||
|
carrier_items = carrier.items()
|
||||||
|
return [key for (key, value) in carrier_items]
|
||||||
|
|
||||||
|
|
||||||
class KafkaContextSetter(textmap.Setter):
|
class KafkaContextSetter(textmap.Setter):
|
||||||
@ -60,7 +70,12 @@ class KafkaContextSetter(textmap.Setter):
|
|||||||
|
|
||||||
if value:
|
if value:
|
||||||
value = value.encode()
|
value = value.encode()
|
||||||
carrier.append((key, value))
|
|
||||||
|
if isinstance(carrier, list):
|
||||||
|
carrier.append((key, value))
|
||||||
|
|
||||||
|
if isinstance(carrier, dict):
|
||||||
|
carrier[key] = value
|
||||||
|
|
||||||
|
|
||||||
_kafka_getter = KafkaContextGetter()
|
_kafka_getter = KafkaContextGetter()
|
||||||
|
@ -23,6 +23,10 @@ from opentelemetry.instrumentation.confluent_kafka import (
|
|||||||
ProxiedConsumer,
|
ProxiedConsumer,
|
||||||
ProxiedProducer,
|
ProxiedProducer,
|
||||||
)
|
)
|
||||||
|
from opentelemetry.instrumentation.confluent_kafka.utils import (
|
||||||
|
KafkaContextGetter,
|
||||||
|
KafkaContextSetter,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestConfluentKafka(TestCase):
|
class TestConfluentKafka(TestCase):
|
||||||
@ -73,3 +77,30 @@ class TestConfluentKafka(TestCase):
|
|||||||
consumer = instrumentation.instrument_consumer(consumer)
|
consumer = instrumentation.instrument_consumer(consumer)
|
||||||
self.assertEqual(consumer.__class__, ProxiedConsumer)
|
self.assertEqual(consumer.__class__, ProxiedConsumer)
|
||||||
self.assertTrue(hasattr(consumer, "commit"))
|
self.assertTrue(hasattr(consumer, "commit"))
|
||||||
|
|
||||||
|
def test_context_setter(self) -> None:
|
||||||
|
context_setter = KafkaContextSetter()
|
||||||
|
|
||||||
|
carrier_dict = {"key1": "val1"}
|
||||||
|
context_setter.set(carrier_dict, "key2", "val2")
|
||||||
|
self.assertGreaterEqual(
|
||||||
|
carrier_dict.items(), {"key2": "val2".encode()}.items()
|
||||||
|
)
|
||||||
|
|
||||||
|
carrier_list = [("key1", "val1")]
|
||||||
|
context_setter.set(carrier_list, "key2", "val2")
|
||||||
|
self.assertTrue(("key2", "val2".encode()) in carrier_list)
|
||||||
|
|
||||||
|
def test_context_getter(self) -> None:
|
||||||
|
context_setter = KafkaContextSetter()
|
||||||
|
context_getter = KafkaContextGetter()
|
||||||
|
|
||||||
|
carrier_dict = {}
|
||||||
|
context_setter.set(carrier_dict, "key1", "val1")
|
||||||
|
self.assertEqual(context_getter.get(carrier_dict, "key1"), ["val1"])
|
||||||
|
self.assertEqual(["key1"], context_getter.keys(carrier_dict))
|
||||||
|
|
||||||
|
carrier_list = []
|
||||||
|
context_setter.set(carrier_list, "key1", "val1")
|
||||||
|
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
|
||||||
|
self.assertEqual(["key1"], context_getter.keys(carrier_list))
|
||||||
|
Reference in New Issue
Block a user