pika: added instrumentation for pika.connection.Connection and pika.c… (#3584)

* pika: added instrumentation for pika.connection.Connection and pika.channel.Channel, thus added instrumentation support to all SelectConnection adapters.

* updated changelog.

---------

Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
This commit is contained in:
warmagedon007
2025-07-07 11:17:31 +03:00
committed by GitHub
parent 3c4d18cc13
commit 80c357bb16
3 changed files with 127 additions and 23 deletions

View File

@ -11,6 +11,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
### Added
- `opentelemetry-instrumentation-pika` Added instrumentation for All `SelectConnection` adapters
([#3584](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3584))
### Fixed
- `opentelemetry-instrumentation-asgi`: fix excluded_urls in instrumentation-asgi

View File

@ -14,7 +14,7 @@
# pylint: disable=unnecessary-dunder-call
from logging import getLogger
from typing import Any, Collection, Dict, Optional
from typing import Any, Collection, Dict, Optional, Union
import pika
import wrapt
@ -24,6 +24,8 @@ from pika.adapters.blocking_connection import (
BlockingChannel,
_QueueConsumerGeneratorInfo,
)
from pika.channel import Channel
from pika.connection import Connection
from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
@ -53,12 +55,16 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
# pylint: disable=attribute-defined-outside-init
@staticmethod
def _instrument_blocking_channel_consumers(
channel: BlockingChannel,
def _instrument_channel_consumers(
channel: Union[BlockingChannel, Channel],
tracer: Tracer,
consume_hook: utils.HookT = utils.dummy_callback,
) -> Any:
for consumer_tag, consumer_info in channel._consumer_infos.items():
if isinstance(channel, BlockingChannel):
consumer_infos = channel._consumer_infos
elif isinstance(channel, Channel):
consumer_infos = channel._consumers
for consumer_tag, consumer_info in consumer_infos.items():
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
consumer_callback = getattr(consumer_info, callback_attr, None)
if consumer_callback is None:
@ -79,7 +85,7 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
@staticmethod
def _instrument_basic_publish(
channel: BlockingChannel,
channel: Union[BlockingChannel, Channel],
tracer: Tracer,
publish_hook: utils.HookT = utils.dummy_callback,
) -> None:
@ -93,7 +99,7 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
@staticmethod
def _instrument_channel_functions(
channel: BlockingChannel,
channel: Union[BlockingChannel, Channel],
tracer: Tracer,
publish_hook: utils.HookT = utils.dummy_callback,
) -> None:
@ -103,7 +109,9 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
)
@staticmethod
def _uninstrument_channel_functions(channel: BlockingChannel) -> None:
def _uninstrument_channel_functions(
channel: Union[BlockingChannel, Channel],
) -> None:
for function_name in _FUNCTIONS_TO_UNINSTRUMENT:
if not hasattr(channel, function_name):
continue
@ -115,7 +123,7 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
@staticmethod
# Make sure that the spans are created inside hash them set as parent and not as brothers
def instrument_channel(
channel: BlockingChannel,
channel: Union[BlockingChannel, Channel],
tracer_provider: Optional[TracerProvider] = None,
publish_hook: utils.HookT = utils.dummy_callback,
consume_hook: utils.HookT = utils.dummy_callback,
@ -133,7 +141,7 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
)
PikaInstrumentor._instrument_blocking_channel_consumers(
PikaInstrumentor._instrument_channel_consumers(
channel, tracer, consume_hook
)
PikaInstrumentor._decorate_basic_consume(channel, tracer, consume_hook)
@ -178,16 +186,17 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
return channel
wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)
wrapt.wrap_function_wrapper(Connection, "channel", wrapper)
@staticmethod
def _decorate_basic_consume(
channel: BlockingChannel,
channel: Union[BlockingChannel, Channel],
tracer: Optional[Tracer],
consume_hook: utils.HookT = utils.dummy_callback,
) -> None:
def wrapper(wrapped, instance, args, kwargs):
return_value = wrapped(*args, **kwargs)
PikaInstrumentor._instrument_blocking_channel_consumers(
PikaInstrumentor._instrument_channel_consumers(
channel, tracer, consume_hook
)
return return_value
@ -236,6 +245,7 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
if hasattr(self, "__opentelemetry_tracer_provider"):
delattr(self, "__opentelemetry_tracer_provider")
unwrap(BlockingConnection, "channel")
unwrap(Connection, "channel")
unwrap(_QueueConsumerGeneratorInfo, "__init__")
def instrumentation_dependencies(self) -> Collection[str]:

View File

@ -13,9 +13,13 @@
# limitations under the License.
from unittest import TestCase, mock
from pika.adapters import BlockingConnection
from pika.adapters.blocking_connection import _QueueConsumerGeneratorInfo
from pika.adapters import BaseConnection, BlockingConnection
from pika.adapters.blocking_connection import (
BlockingChannel,
_QueueConsumerGeneratorInfo,
)
from pika.channel import Channel
from pika.connection import Connection
from wrapt import BoundFunctionWrapper
from opentelemetry.instrumentation.pika import PikaInstrumentor
@ -31,11 +35,13 @@ from opentelemetry.trace import Tracer
class TestPika(TestCase):
def setUp(self) -> None:
self.blocking_channel = mock.MagicMock(spec=BlockingChannel)
self.channel = mock.MagicMock(spec=Channel)
consumer_info = mock.MagicMock()
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
setattr(consumer_info, callback_attr, mock.MagicMock())
self.channel._consumer_infos = {"consumer-tag": consumer_info}
self.blocking_channel._consumer_infos = {"consumer-tag": consumer_info}
self.channel._consumers = {"consumer-tag": consumer_info}
self.mock_callback = mock.MagicMock()
def test_instrument_api(self) -> None:
@ -44,6 +50,10 @@ class TestPika(TestCase):
self.assertTrue(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
self.assertTrue(isinstance(Connection.channel, BoundFunctionWrapper))
self.assertTrue(
isinstance(BaseConnection.channel, BoundFunctionWrapper)
)
self.assertTrue(
isinstance(
_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper
@ -56,6 +66,10 @@ class TestPika(TestCase):
self.assertFalse(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
self.assertFalse(isinstance(Connection.channel, BoundFunctionWrapper))
self.assertFalse(
isinstance(BaseConnection.channel, BoundFunctionWrapper)
)
self.assertFalse(
isinstance(
_QueueConsumerGeneratorInfo.__init__, BoundFunctionWrapper
@ -69,11 +83,34 @@ class TestPika(TestCase):
"opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_blocking_channel_consumers"
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_consumers"
)
def test_instrument_blocking_channel(
self,
instrument_channel_consumers: mock.MagicMock,
instrument_basic_consume: mock.MagicMock,
instrument_channel_functions: mock.MagicMock,
):
PikaInstrumentor.instrument_channel(channel=self.blocking_channel)
assert hasattr(
self.blocking_channel, "_is_instrumented_by_opentelemetry"
), "channel is not marked as instrumented!"
instrument_channel_consumers.assert_called_once()
instrument_basic_consume.assert_called_once()
instrument_channel_functions.assert_called_once()
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_consumers"
)
def test_instrument_channel(
self,
instrument_blocking_channel_consumers: mock.MagicMock,
instrument_channel_consumers: mock.MagicMock,
instrument_basic_consume: mock.MagicMock,
instrument_channel_functions: mock.MagicMock,
):
@ -81,12 +118,12 @@ class TestPika(TestCase):
assert hasattr(
self.channel, "_is_instrumented_by_opentelemetry"
), "channel is not marked as instrumented!"
instrument_blocking_channel_consumers.assert_called_once()
instrument_channel_consumers.assert_called_once()
instrument_basic_consume.assert_called_once()
instrument_channel_functions.assert_called_once()
@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")
def test_instrument_consumers(
def test_instrument_consumers_on_blocking_channel(
self, decorate_callback: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
@ -95,23 +132,63 @@ class TestPika(TestCase):
mock.call(
getattr(value, callback_attr), tracer, key, dummy_callback
)
for key, value in self.channel._consumer_infos.items()
for key, value in self.blocking_channel._consumer_infos.items()
]
PikaInstrumentor._instrument_blocking_channel_consumers(
self.channel, tracer
PikaInstrumentor._instrument_channel_consumers(
self.blocking_channel, tracer
)
decorate_callback.assert_has_calls(
calls=expected_decoration_calls, any_order=True
)
assert all(
hasattr(callback, "_original_callback")
for callback in self.channel._consumer_infos.values()
for callback in self.blocking_channel._consumer_infos.values()
)
@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")
def test_instrument_consumers_on_channel(
self, decorate_callback: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
expected_decoration_calls = [
mock.call(
getattr(value, callback_attr), tracer, key, dummy_callback
)
for key, value in self.channel._consumers.items()
]
PikaInstrumentor._instrument_channel_consumers(self.channel, tracer)
decorate_callback.assert_has_calls(
calls=expected_decoration_calls, any_order=True
)
assert all(
hasattr(callback, "_original_callback")
for callback in self.channel._consumers.values()
)
@mock.patch(
"opentelemetry.instrumentation.pika.utils._decorate_basic_publish"
)
def test_instrument_basic_publish(
def test_instrument_basic_publish_on_blocking_channel(
self, decorate_basic_publish: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
original_function = self.blocking_channel.basic_publish
PikaInstrumentor._instrument_basic_publish(
self.blocking_channel, tracer
)
decorate_basic_publish.assert_called_once_with(
original_function, self.blocking_channel, tracer, dummy_callback
)
self.assertEqual(
self.blocking_channel.basic_publish,
decorate_basic_publish.return_value,
)
@mock.patch(
"opentelemetry.instrumentation.pika.utils._decorate_basic_publish"
)
def test_instrument_basic_publish_on_channel(
self, decorate_basic_publish: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
@ -141,6 +218,17 @@ class TestPika(TestCase):
isinstance(generator_info.pending_events, ReadyMessagesDequeProxy)
)
def test_uninstrument_blocking_channel_functions(self) -> None:
original_function = self.blocking_channel.basic_publish
self.blocking_channel.basic_publish = mock.MagicMock()
self.blocking_channel.basic_publish._original_function = (
original_function
)
PikaInstrumentor._uninstrument_channel_functions(self.blocking_channel)
self.assertEqual(
self.blocking_channel.basic_publish, original_function
)
def test_uninstrument_channel_functions(self) -> None:
original_function = self.channel.basic_publish
self.channel.basic_publish = mock.MagicMock()