Files

140 lines
4.8 KiB
Python

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from importlib.metadata import PackageNotFoundError
from unittest import TestCase
from unittest.mock import call, patch
from kafka import KafkaConsumer, KafkaProducer
from wrapt import BoundFunctionWrapper
from opentelemetry.instrumentation.kafka import KafkaInstrumentor
from opentelemetry.instrumentation.kafka.package import (
_instruments_kafka_python,
_instruments_kafka_python_ng,
)
class TestKafka(TestCase):
def test_instrument_api(self) -> None:
instrumentation = KafkaInstrumentor()
instrumentation.instrument()
self.assertTrue(isinstance(KafkaProducer.send, BoundFunctionWrapper))
self.assertTrue(
isinstance(KafkaConsumer.__next__, BoundFunctionWrapper)
)
instrumentation.uninstrument()
self.assertFalse(isinstance(KafkaProducer.send, BoundFunctionWrapper))
self.assertFalse(
isinstance(KafkaConsumer.__next__, BoundFunctionWrapper)
)
@patch("opentelemetry.instrumentation.kafka.distribution")
def test_instrumentation_dependencies_kafka_python_installed(
self, mock_distribution
) -> None:
instrumentation = KafkaInstrumentor()
def _distribution(name):
if name == "kafka-python":
return None
raise PackageNotFoundError
mock_distribution.side_effect = _distribution
package_to_instrument = instrumentation.instrumentation_dependencies()
self.assertEqual(mock_distribution.call_count, 2)
self.assertEqual(
mock_distribution.mock_calls,
[
call("kafka-python-ng"),
call("kafka-python"),
],
)
self.assertEqual(package_to_instrument, (_instruments_kafka_python,))
@patch("opentelemetry.instrumentation.kafka.distribution")
def test_instrumentation_dependencies_kafka_python_ng_installed(
self, mock_distribution
) -> None:
instrumentation = KafkaInstrumentor()
def _distribution(name):
if name == "kafka-python-ng":
return None
raise PackageNotFoundError
mock_distribution.side_effect = _distribution
package_to_instrument = instrumentation.instrumentation_dependencies()
self.assertEqual(mock_distribution.call_count, 1)
self.assertEqual(
mock_distribution.mock_calls, [call("kafka-python-ng")]
)
self.assertEqual(
package_to_instrument, (_instruments_kafka_python_ng,)
)
@patch("opentelemetry.instrumentation.kafka.distribution")
def test_instrumentation_dependencies_both_installed(
self, mock_distribution
) -> None:
instrumentation = KafkaInstrumentor()
def _distribution(name):
# The function returns None here for all names
# to simulate both packages being installed
return None
mock_distribution.side_effect = _distribution
package_to_instrument = instrumentation.instrumentation_dependencies()
self.assertEqual(mock_distribution.call_count, 1)
self.assertEqual(
mock_distribution.mock_calls, [call("kafka-python-ng")]
)
self.assertEqual(
package_to_instrument, (_instruments_kafka_python_ng,)
)
@patch("opentelemetry.instrumentation.kafka.distribution")
def test_instrumentation_dependencies_none_installed(
self, mock_distribution
) -> None:
instrumentation = KafkaInstrumentor()
def _distribution(name):
# Function raises PackageNotFoundError
# if name is not in the list. We will
# raise it for both names to simulate
# neither being installed
raise PackageNotFoundError
mock_distribution.side_effect = _distribution
package_to_instrument = instrumentation.instrumentation_dependencies()
self.assertEqual(mock_distribution.call_count, 2)
self.assertEqual(
mock_distribution.mock_calls,
[
call("kafka-python-ng"),
call("kafka-python"),
],
)
self.assertEqual(
package_to_instrument,
(_instruments_kafka_python, _instruments_kafka_python_ng),
)