From ffb995d28b9c632b9fafe31867208dc585e24542 Mon Sep 17 00:00:00 2001 From: Robert Ayrapetyan Date: Tue, 15 Nov 2022 04:42:56 -0800 Subject: [PATCH] opentelemetry-instrumentation-kafka-python: wait for metadata (#1260) * fix kafka: wait for metadata Kafka's instance metadata could be unavailable (because it's being filled asynchronously). extract_send_partition() relies on that metadata, so it may return `None` for partition and later cause all kinds of warning messages (e.g. `Invalid type NoneType for attribute value. Expected one of ['bool', 'str', 'bytes', 'int', 'float'] or a sequence of those types`). The proposed fix makes sure metadata is pre-populated (based on https://github.com/dpkp/kafka-python/blob/4d598055dab7da99e41bfcceffa8462b32931cdd/kafka/producer/kafka.py#L579). I'm just not sure if we should wrap `_wait_on_metadata` in try/except; maybe just propagating the exception to the caller would be a better idea... * upd: changelog * fix: changelog * fix: import KafkaErrors * fix: tox -e lint errors * fix: refact and added unit test Co-authored-by: Srikanth Chekuri Co-authored-by: Leighton Chen Co-authored-by: Diego Hurtado --- CHANGELOG.md | 2 ++ .../instrumentation/kafka/utils.py | 6 ++--- .../tests/test_utils.py | 26 +++++++++++++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d23cf8ced..626c29bb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -106,6 +106,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- `opentelemetry-instrumentation-kafka-python`: wait for metadata + ([#1260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1260)) - `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies. 
([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234)) - `opentelemetry-instrumentation-pymongo` Change span names to not contain queries but only database name and command name diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py index 52344ceaf..97880970c 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/utils.py @@ -75,9 +75,9 @@ class KafkaPropertiesExtractor: ): return None - all_partitions = instance._metadata.partitions_for_topic(topic) - if all_partitions is None or len(all_partitions) == 0: - return None + instance._wait_on_metadata( + topic, instance.config["max_block_ms"] / 1000.0 + ) return instance._partition( topic, partition, key, value, key_bytes, value_bytes diff --git a/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py index 74d359bfd..7da1ed059 100644 --- a/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-kafka-python/tests/test_utils.py @@ -1,6 +1,7 @@ from unittest import TestCase, mock from opentelemetry.instrumentation.kafka.utils import ( + KafkaPropertiesExtractor, _create_consumer_span, _get_span_name, _kafka_getter, @@ -208,3 +209,28 @@ class TestUtils(TestCase): span, record, self.args, self.kwargs ) detach.assert_called_once_with(attach.return_value) + + @mock.patch( + "opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor" + ) + def test_kafka_properties_extractor( + self, + kafka_properties_extractor: mock.MagicMock, + ): + 
kafka_properties_extractor._serialize.return_value = None + kafka_properties_extractor._partition.return_value = "partition" + assert ( + KafkaPropertiesExtractor.extract_send_partition( + kafka_properties_extractor, self.args, self.kwargs + ) + == "partition" + ) + kafka_properties_extractor._wait_on_metadata.side_effect = Exception( + "mocked error" + ) + assert ( + KafkaPropertiesExtractor.extract_send_partition( + kafka_properties_extractor, self.args, self.kwargs + ) + is None + )