mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-27 12:14:32 +08:00
140 lines
4.8 KiB
Python
140 lines
4.8 KiB
Python
# Copyright The OpenTelemetry Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from importlib.metadata import PackageNotFoundError
|
|
from unittest import TestCase
|
|
from unittest.mock import call, patch
|
|
|
|
from kafka import KafkaConsumer, KafkaProducer
|
|
from wrapt import BoundFunctionWrapper
|
|
|
|
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
|
|
from opentelemetry.instrumentation.kafka.package import (
|
|
_instruments_kafka_python,
|
|
_instruments_kafka_python_ng,
|
|
)
|
|
|
|
|
|
class TestKafka(TestCase):
    """Tests for ``KafkaInstrumentor`` patching and dependency lookup.

    The dependency tests replace ``distribution`` as referenced from
    ``opentelemetry.instrumentation.kafka`` with a mock, so each test
    controls which package names appear to be installed.
    """

    def test_instrument_api(self) -> None:
        """Check that instrumenting wraps, and uninstrumenting unwraps."""
        instrumentation = KafkaInstrumentor()

        instrumentation.instrument()
        try:
            self.assertIsInstance(KafkaProducer.send, BoundFunctionWrapper)
            self.assertIsInstance(
                KafkaConsumer.__next__, BoundFunctionWrapper
            )
        finally:
            # Undo the patching even when an assertion above fails, so the
            # wrapped methods do not leak into tests that run afterwards.
            instrumentation.uninstrument()

        self.assertNotIsInstance(KafkaProducer.send, BoundFunctionWrapper)
        self.assertNotIsInstance(
            KafkaConsumer.__next__, BoundFunctionWrapper
        )

    @patch("opentelemetry.instrumentation.kafka.distribution")
    def test_instrumentation_dependencies_kafka_python_installed(
        self, mock_distribution
    ) -> None:
        """Only ``kafka-python`` resolves: it is found on the second lookup."""
        instrumentation = KafkaInstrumentor()

        def _distribution(name):
            # Simulate an environment where only "kafka-python" exists.
            if name == "kafka-python":
                return None
            raise PackageNotFoundError

        mock_distribution.side_effect = _distribution
        package_to_instrument = instrumentation.instrumentation_dependencies()

        self.assertEqual(mock_distribution.call_count, 2)
        self.assertEqual(
            mock_distribution.mock_calls,
            [
                call("kafka-python-ng"),
                call("kafka-python"),
            ],
        )
        self.assertEqual(package_to_instrument, (_instruments_kafka_python,))

    @patch("opentelemetry.instrumentation.kafka.distribution")
    def test_instrumentation_dependencies_kafka_python_ng_installed(
        self, mock_distribution
    ) -> None:
        """Only ``kafka-python-ng`` resolves: one lookup is enough."""
        instrumentation = KafkaInstrumentor()

        def _distribution(name):
            # Simulate an environment where only "kafka-python-ng" exists.
            if name == "kafka-python-ng":
                return None
            raise PackageNotFoundError

        mock_distribution.side_effect = _distribution
        package_to_instrument = instrumentation.instrumentation_dependencies()

        self.assertEqual(mock_distribution.call_count, 1)
        self.assertEqual(
            mock_distribution.mock_calls, [call("kafka-python-ng")]
        )
        self.assertEqual(
            package_to_instrument, (_instruments_kafka_python_ng,)
        )

    @patch("opentelemetry.instrumentation.kafka.distribution")
    def test_instrumentation_dependencies_both_installed(
        self, mock_distribution
    ) -> None:
        """Both names resolve: ``kafka-python-ng`` is the one reported."""
        instrumentation = KafkaInstrumentor()

        def _distribution(name):
            # Never raise, so every package name looks installed.
            return None

        mock_distribution.side_effect = _distribution
        package_to_instrument = instrumentation.instrumentation_dependencies()

        self.assertEqual(mock_distribution.call_count, 1)
        self.assertEqual(
            mock_distribution.mock_calls, [call("kafka-python-ng")]
        )
        self.assertEqual(
            package_to_instrument, (_instruments_kafka_python_ng,)
        )

    @patch("opentelemetry.instrumentation.kafka.distribution")
    def test_instrumentation_dependencies_none_installed(
        self, mock_distribution
    ) -> None:
        """Neither name resolves: both requirement specs are reported."""
        instrumentation = KafkaInstrumentor()

        def _distribution(name):
            # Raise for every package name, so neither "kafka-python-ng"
            # nor "kafka-python" looks installed.
            raise PackageNotFoundError

        mock_distribution.side_effect = _distribution
        package_to_instrument = instrumentation.instrumentation_dependencies()

        self.assertEqual(mock_distribution.call_count, 2)
        self.assertEqual(
            mock_distribution.mock_calls,
            [
                call("kafka-python-ng"),
                call("kafka-python"),
            ],
        )
        self.assertEqual(
            package_to_instrument,
            (_instruments_kafka_python, _instruments_kafka_python_ng),
        )
|