mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-29 21:23:55 +08:00
fix: safe kafka partition extraction (#872)
* safe partition extraction * update changelog
This commit is contained in:
@ -45,6 +45,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
- `opentelemetry-instrumentation-sqlite3` Instrumentation now works with `dbapi2.connect`
|
- `opentelemetry-instrumentation-sqlite3` Instrumentation now works with `dbapi2.connect`
|
||||||
|
|
||||||
|
- `opentelemetry-instrumentation-kafka` Kafka: safe kafka partition extraction
|
||||||
|
([#872](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/872))
|
||||||
|
|
||||||
## [1.8.0-0.27b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.8.0-0.27b0) - 2021-12-17
|
## [1.8.0-0.27b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.8.0-0.27b0) - 2021-12-17
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@ -55,6 +55,7 @@ class KafkaPropertiesExtractor:
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def extract_send_partition(instance, args, kwargs):
|
def extract_send_partition(instance, args, kwargs):
|
||||||
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
|
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
|
||||||
|
try:
|
||||||
topic = KafkaPropertiesExtractor.extract_send_topic(args)
|
topic = KafkaPropertiesExtractor.extract_send_topic(args)
|
||||||
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
|
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
|
||||||
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
|
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
|
||||||
@ -73,9 +74,17 @@ class KafkaPropertiesExtractor:
|
|||||||
or type(value_bytes) not in valid_types
|
or type(value_bytes) not in valid_types
|
||||||
):
|
):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
all_partitions = instance._metadata.partitions_for_topic(topic)
|
||||||
|
if all_partitions is None or len(all_partitions) == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
return instance._partition(
|
return instance._partition(
|
||||||
topic, partition, key, value, key_bytes, value_bytes
|
topic, partition, key, value, key_bytes, value_bytes
|
||||||
)
|
)
|
||||||
|
except Exception as exception: # pylint: disable=W0703
|
||||||
|
_LOG.debug("Unable to extract partition: %s", exception)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
ProduceHookT = Optional[Callable[[Span, List, Dict], None]]
|
ProduceHookT = Optional[Callable[[Span, List, Dict], None]]
|
||||||
|
Reference in New Issue
Block a user