opentelemetry-instrumentation-kafka-python: wait for metadata (#1260)

* fix kafka: wait for metadata

Kafka's instance metadata could be unavailable (because it's being filled asynchronously). extract_send_partition() is based on a metadata, so it may return `None` for partition and later cause all type of warning messages (e.g. `Invalid type NoneType for attribute value. Expected one of ['bool', 'str', 'bytes', 'int', 'float'] or a sequence of those types`).
The proposed fix makes sure metadata is pre-populated (based on 4d598055da/kafka/producer/kafka.py (L579)).
I'm just not sure if we should wrap `_wait_on_metadata` into try\except, maybe just passing Exception to the caller would be a better idea...

* upd: changelog

* fix: changelog

* fix: import KafkaErrors

* fix: tox -e lint errors

* fix: refact and added unit test

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
Co-authored-by: Leighton Chen <lechen@microsoft.com>
Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
This commit is contained in:
Robert Ayrapetyan
2022-11-15 04:42:56 -08:00
committed by GitHub
parent 868049ecd9
commit ffb995d28b
3 changed files with 31 additions and 3 deletions

View File

@ -106,6 +106,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### Fixed
- `opentelemetry-instrumentation-kafka-python`: wait for metadata
([#1260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1260))
- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies. - `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies.
([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234)) ([#1234](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1234))
- `opentelemetry-instrumentation-pymongo` Change span names to not contain queries but only database name and command name - `opentelemetry-instrumentation-pymongo` Change span names to not contain queries but only database name and command name

View File

@ -75,9 +75,9 @@ class KafkaPropertiesExtractor:
): ):
return None return None
all_partitions = instance._metadata.partitions_for_topic(topic) instance._wait_on_metadata(
if all_partitions is None or len(all_partitions) == 0: topic, instance.config["max_block_ms"] / 1000.0
return None )
return instance._partition( return instance._partition(
topic, partition, key, value, key_bytes, value_bytes topic, partition, key, value, key_bytes, value_bytes

View File

@ -1,6 +1,7 @@
from unittest import TestCase, mock from unittest import TestCase, mock
from opentelemetry.instrumentation.kafka.utils import ( from opentelemetry.instrumentation.kafka.utils import (
KafkaPropertiesExtractor,
_create_consumer_span, _create_consumer_span,
_get_span_name, _get_span_name,
_kafka_getter, _kafka_getter,
@ -208,3 +209,28 @@ class TestUtils(TestCase):
span, record, self.args, self.kwargs span, record, self.args, self.kwargs
) )
detach.assert_called_once_with(attach.return_value) detach.assert_called_once_with(attach.return_value)
@mock.patch(
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor"
)
def test_kafka_properties_extractor(
self,
kafka_properties_extractor: mock.MagicMock,
):
kafka_properties_extractor._serialize.return_value = None
kafka_properties_extractor._partition.return_value = "partition"
assert (
KafkaPropertiesExtractor.extract_send_partition(
kafka_properties_extractor, self.args, self.kwargs
)
== "partition"
)
kafka_properties_extractor._wait_on_metadata.side_effect = Exception(
"mocked error"
)
assert (
KafkaPropertiesExtractor.extract_send_partition(
kafka_properties_extractor, self.args, self.kwargs
)
is None
)