Feature/add pika instrumentation (#680)

* Added initial code

* Add all needed spans, and add support of instrumentation and uninstrumentation

* Added tests. Ready for PR

* Rename RequestsInstrumentation to RequestsInstrumentor to follow conventions

* Add suppress_instrumentation functionality

* Fix suppress_instrumentation functionality

* Fix CR comments and lint test failures

* Add usage of wrapt according to CR comments

* Fix according to CR Comments

* Move the tracer to be an attribute of the instrumentor instead of the channel

* Fix Tests

* Update Changelog and fix failing test

* update code using tox -e generate

* Update the name of the variable to store the tracer provider.

* Update the core repo hash in the workflow

* Update the core repo hash in the workflow

Co-authored-by: Leighton Chen <lechen@microsoft.com>
Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
Co-authored-by: Owais Lone <owais@users.noreply.github.com>
This commit is contained in:
Nikolay Sokolik
2021-10-06 21:22:39 +03:00
committed by GitHub
parent 196037125f
commit fb24599324
16 changed files with 930 additions and 1 deletions

View File

@ -6,7 +6,7 @@ on:
- 'release/*'
pull_request:
env:
CORE_REPO_SHA: 10208c1be1e720925a80a66f711b8afbe67537f4
CORE_REPO_SHA: adad94bfa69520cb4cbabca714827fd14503baf0
jobs:
build:

View File

@ -53,6 +53,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-aiohttp-client`, `openetelemetry-instrumentation-fastapi`,
`opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3` Added `request_hook` and `response_hook` callbacks
([#576](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/576))
- `opentelemetry-instrumentation-pika` added RabbitMQ's pika module instrumentation.
([#680](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/680))
### Changed

View File

@ -19,6 +19,7 @@
| [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2~=2.7 |
| [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging |
| [opentelemetry-instrumentation-mysql](./opentelemetry-instrumentation-mysql) | mysql-connector-python ~= 8.0 |
| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 1.1.0 |
| [opentelemetry-instrumentation-psycopg2](./opentelemetry-instrumentation-psycopg2) | psycopg2 >= 2.7.3.1 |
| [opentelemetry-instrumentation-pymemcache](./opentelemetry-instrumentation-pymemcache) | pymemcache ~= 1.3 |
| [opentelemetry-instrumentation-pymongo](./opentelemetry-instrumentation-pymongo) | pymongo ~= 3.1 |

View File

@ -0,0 +1,70 @@
OpenTelemetry pika Instrumentation
==================================
|pypi|
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-pika.svg
:target: https://pypi.org/project/opentelemetry-instrumentation-pika/
This library allows tracing requests made by the pika library.
Installation
------------
::
pip install opentelemetry-instrumentation-pika
Usage
-----
* Start broker backend
.. code-block:: python
docker run -p 5672:5672 rabbitmq
* Run instrumented task
.. code-block:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor
PikaInstrumentor().instrument()
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
* PikaInstrumentor also supports instrumentation of a single channel
.. code-block:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument_channel(channel=channel)
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
pika_instrumentation.uninstrument_channel(channel=channel)
* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider
.. code-block:: python
PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider)
References
----------
* `OpenTelemetry pika/ Tracing <https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/pika/pika.html>`_
* `OpenTelemetry Project <https://opentelemetry.io/>`_

View File

@ -0,0 +1,56 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[metadata]
name = opentelemetry-instrumentation-pika
description = OpenTelemetry pika instrumentation
long_description = file: README.rst
long_description_content_type = text/x-rst
author = OpenTelemetry Authors
author_email = cncf-opentelemetry-contributors@lists.cncf.io
url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-pika
platforms = any
license = Apache-2.0
classifiers =
Development Status :: 4 - Beta
Intended Audience :: Developers
License :: OSI Approved :: Apache Software License
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
[options]
python_requires = >=3.6
package_dir=
=src
packages=find_namespace:
install_requires =
opentelemetry-api ~= 1.5
wrapt >= 1.0.0, < 2.0.0
[options.extras_require]
test =
pytest
wrapt >= 1.0.0, < 2.0.0
opentelemetry-test == 0.24b0
[options.packages.find]
where = src
[options.entry_points]
opentelemetry_instrumentor =
pika = opentelemetry.instrumentation.pika:PikaInstrumentor

View File

@ -0,0 +1,89 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt.
# RUN `python scripts/generate_setup.py` TO REGENERATE.
import distutils.cmd
import json
import os
from configparser import ConfigParser
import setuptools
config = ConfigParser()
config.read("setup.cfg")
# We provide extras_require parameter to setuptools.setup later which
# overwrites the extra_require section from setup.cfg. To support extra_require
# secion in setup.cfg, we load it here and merge it with the extra_require param.
extras_require = {}
if "options.extras_require" in config:
for key, value in config["options.extras_require"].items():
extras_require[key] = [v for v in value.split("\n") if v.strip()]
BASE_DIR = os.path.dirname(__file__)
PACKAGE_INFO = {}
VERSION_FILENAME = os.path.join(
BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "version.py"
)
with open(VERSION_FILENAME, encoding="utf-8") as f:
exec(f.read(), PACKAGE_INFO)
PACKAGE_FILENAME = os.path.join(
BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "package.py"
)
with open(PACKAGE_FILENAME, encoding="utf-8") as f:
exec(f.read(), PACKAGE_INFO)
# Mark any instruments/runtime dependencies as test dependencies as well.
extras_require["instruments"] = PACKAGE_INFO["_instruments"]
test_deps = extras_require.get("test", [])
for dep in extras_require["instruments"]:
test_deps.append(dep)
extras_require["test"] = test_deps
class JSONMetadataCommand(distutils.cmd.Command):
description = (
"print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ",
"auto-generate code in other places",
)
user_options = []
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
metadata = {
"name": config["metadata"]["name"],
"version": PACKAGE_INFO["__version__"],
"instruments": PACKAGE_INFO["_instruments"],
}
print(json.dumps(metadata))
setuptools.setup(
cmdclass={"meta": JSONMetadataCommand},
version=PACKAGE_INFO["__version__"],
extras_require=extras_require,
)

View File

@ -0,0 +1,73 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument `pika` to trace RabbitMQ applications.
Usage
-----
* Start broker backend
.. code-block:: python
docker run -p 5672:5672 rabbitmq
* Run instrumented task
.. code-block:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor
PikaInstrumentor().instrument()
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
* PikaInstrumentor also supports instrumentation of a single channel
.. code-block:: python
import pika
from opentelemetry.instrumentation.pika import PikaInstrumentor
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
pika_instrumentation = PikaInstrumentor()
pika_instrumentation.instrument_channel(channel=channel)
channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!')
pika_instrumentation.uninstrument_channel(channel=channel)
* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider
.. code-block:: python
PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider)
API
---
"""
# pylint: disable=import-error
from .pika_instrumentor import PikaInstrumentor
from .version import __version__
__all__ = ["PikaInstrumentor", "__version__"]

View File

@ -0,0 +1,16 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Collection
_instruments: Collection[str] = ("pika >= 1.1.0",)

View File

@ -0,0 +1,134 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from logging import getLogger
from typing import Any, Callable, Collection, Dict, Optional
import wrapt
from pika.adapters import BlockingConnection
from pika.channel import Channel
from opentelemetry import trace
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.pika import utils
from opentelemetry.instrumentation.pika.package import _instruments
from opentelemetry.instrumentation.pika.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.trace import Tracer, TracerProvider
_LOG = getLogger(__name__)
_CTX_KEY = "__otel_task_span"
_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"]
class PikaInstrumentor(BaseInstrumentor): # type: ignore
# pylint: disable=attribute-defined-outside-init
@staticmethod
def _instrument_consumers(
consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer
) -> Any:
for key, callback in consumers_dict.items():
decorated_callback = utils._decorate_callback(
callback, tracer, key
)
setattr(decorated_callback, "_original_callback", callback)
consumers_dict[key] = decorated_callback
@staticmethod
def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None:
original_function = getattr(channel, "basic_publish")
decorated_function = utils._decorate_basic_publish(
original_function, channel, tracer
)
setattr(decorated_function, "_original_function", original_function)
channel.__setattr__("basic_publish", decorated_function)
channel.basic_publish = decorated_function
@staticmethod
def _instrument_channel_functions(
channel: Channel, tracer: Tracer
) -> None:
if hasattr(channel, "basic_publish"):
PikaInstrumentor._instrument_basic_publish(channel, tracer)
@staticmethod
def _uninstrument_channel_functions(channel: Channel) -> None:
for function_name in _FUNCTIONS_TO_UNINSTRUMENT:
if not hasattr(channel, function_name):
continue
function = getattr(channel, function_name)
if hasattr(function, "_original_function"):
channel.__setattr__(function_name, function._original_function)
@staticmethod
def instrument_channel(
channel: Channel, tracer_provider: Optional[TracerProvider] = None,
) -> None:
if not hasattr(channel, "_is_instrumented_by_opentelemetry"):
channel._is_instrumented_by_opentelemetry = False
if channel._is_instrumented_by_opentelemetry:
_LOG.warning(
"Attempting to instrument Pika channel while already instrumented!"
)
return
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
if not hasattr(channel, "_impl"):
_LOG.error("Could not find implementation for provided channel!")
return
if channel._impl._consumers:
PikaInstrumentor._instrument_consumers(
channel._impl._consumers, tracer
)
PikaInstrumentor._instrument_channel_functions(channel, tracer)
@staticmethod
def uninstrument_channel(channel: Channel) -> None:
if (
not hasattr(channel, "_is_instrumented_by_opentelemetry")
or not channel._is_instrumented_by_opentelemetry
):
_LOG.error(
"Attempting to uninstrument Pika channel while already uninstrumented!"
)
return
if not hasattr(channel, "_impl"):
_LOG.error("Could not find implementation for provided channel!")
return
for key, callback in channel._impl._consumers.items():
if hasattr(callback, "_original_callback"):
channel._impl._consumers[key] = callback._original_callback
PikaInstrumentor._uninstrument_channel_functions(channel)
def _decorate_channel_function(
self, tracer_provider: Optional[TracerProvider]
) -> None:
def wrapper(wrapped, instance, args, kwargs):
channel = wrapped(*args, **kwargs)
self.instrument_channel(channel, tracer_provider=tracer_provider)
return channel
wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)
def _instrument(self, **kwargs: Dict[str, Any]) -> None:
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
self.__setattr__("__opentelemetry_tracer_provider", tracer_provider)
self._decorate_channel_function(tracer_provider)
def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
if hasattr(self, "__opentelemetry_tracer_provider"):
delattr(self, "__opentelemetry_tracer_provider")
unwrap(BlockingConnection, "channel")
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

View File

@ -0,0 +1,160 @@
from typing import Any, Callable, List, Optional
from pika.channel import Channel
from pika.spec import Basic, BasicProperties
from opentelemetry import context, propagate, trace
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.propagators.textmap import CarrierT, Getter
from opentelemetry.semconv.trace import (
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Tracer
from opentelemetry.trace.span import Span
class _PikaGetter(Getter): # type: ignore
def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
value = carrier.get(key, None)
if value is None:
return None
return [value]
def keys(self, carrier: CarrierT) -> List[str]:
return []
_pika_getter = _PikaGetter()
def _decorate_callback(
callback: Callable[[Channel, Basic.Deliver, BasicProperties, bytes], Any],
tracer: Tracer,
task_name: str,
):
def decorated_callback(
channel: Channel,
method: Basic.Deliver,
properties: BasicProperties,
body: bytes,
) -> Any:
if not properties:
properties = BasicProperties()
span = _get_span(
tracer,
channel,
properties,
task_name=task_name,
operation=MessagingOperationValues.RECEIVE,
)
with trace.use_span(span, end_on_exit=True):
propagate.inject(properties.headers)
retval = callback(channel, method, properties, body)
return retval
return decorated_callback
def _decorate_basic_publish(
original_function: Callable[[str, str, bytes, BasicProperties, bool], Any],
channel: Channel,
tracer: Tracer,
):
def decorated_function(
exchange: str,
routing_key: str,
body: bytes,
properties: BasicProperties = None,
mandatory: bool = False,
) -> Any:
if not properties:
properties = BasicProperties()
span = _get_span(
tracer,
channel,
properties,
task_name="(temporary)",
operation=None,
)
if not span:
return original_function(
exchange, routing_key, body, properties, mandatory
)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(properties.headers)
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
return retval
return decorated_function
def _get_span(
tracer: Tracer,
channel: Channel,
properties: BasicProperties,
task_name: str,
operation: Optional[MessagingOperationValues] = None,
) -> Optional[Span]:
if properties.headers is None:
properties.headers = {}
ctx = propagate.extract(properties.headers, getter=_pika_getter)
if context.get_value("suppress_instrumentation") or context.get_value(
_SUPPRESS_INSTRUMENTATION_KEY
):
return None
task_name = properties.type if properties.type else task_name
span = tracer.start_span(
context=ctx, name=_generate_span_name(task_name, operation)
)
if span.is_recording():
_enrich_span(span, channel, properties, task_name, operation)
return span
def _generate_span_name(
task_name: str, operation: Optional[MessagingOperationValues]
) -> str:
if not operation:
return f"{task_name} send"
return f"{task_name} {operation.value}"
def _enrich_span(
span: Span,
channel: Channel,
properties: BasicProperties,
task_destination: str,
operation: Optional[MessagingOperationValues] = None,
) -> None:
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq")
if operation:
span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value)
else:
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, task_destination)
if properties.message_id:
span.set_attribute(
SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id
)
if properties.correlation_id:
span.set_attribute(
SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id
)
if not hasattr(channel.connection, "params"):
span.set_attribute(
SpanAttributes.NET_PEER_NAME, channel.connection._impl.params.host
)
span.set_attribute(
SpanAttributes.NET_PEER_PORT, channel.connection._impl.params.port
)
else:
span.set_attribute(
SpanAttributes.NET_PEER_NAME, channel.connection.params.host
)
span.set_attribute(
SpanAttributes.NET_PEER_PORT, channel.connection.params.port
)

View File

@ -0,0 +1,15 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "0.24b0"

View File

@ -0,0 +1,37 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase
from opentelemetry.instrumentation.pika.utils import _PikaGetter
class TestPikaGetter(TestCase):
def setUp(self) -> None:
self.getter = _PikaGetter()
def test_get_none(self) -> None:
carrier = {}
value = self.getter.get(carrier, "test")
self.assertIsNone(value)
def test_get_value(self) -> None:
key = "test"
value = "value"
carrier = {key: value}
val = self.getter.get(carrier, key)
self.assertEqual(val, [value])
def test_keys(self):
keys = self.getter.keys({})
self.assertEqual(keys, [])

View File

@ -0,0 +1,104 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase, mock
from pika.adapters import BaseConnection, BlockingConnection
from pika.channel import Channel
from wrapt import BoundFunctionWrapper
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.trace import Tracer
class TestPika(TestCase):
def setUp(self) -> None:
self.channel = mock.MagicMock(spec=Channel)
self.channel._impl = mock.MagicMock(spec=BaseConnection)
self.mock_callback = mock.MagicMock()
self.channel._impl._consumers = {"mock_key": self.mock_callback}
def test_instrument_api(self) -> None:
instrumentation = PikaInstrumentor()
instrumentation.instrument()
self.assertTrue(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
assert hasattr(
instrumentation, "__opentelemetry_tracer_provider"
), "Tracer not stored for the object!"
instrumentation.uninstrument(channel=self.channel)
self.assertFalse(
isinstance(BlockingConnection.channel, BoundFunctionWrapper)
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions"
)
@mock.patch(
"opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_consumers"
)
def test_instrument(
self,
instrument_consumers: mock.MagicMock,
instrument_channel_functions: mock.MagicMock,
):
PikaInstrumentor.instrument_channel(channel=self.channel)
assert hasattr(
self.channel, "_is_instrumented_by_opentelemetry"
), "channel is not marked as instrumented!"
instrument_consumers.assert_called_once()
instrument_channel_functions.assert_called_once()
@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")
def test_instrument_consumers(
self, decorate_callback: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
expected_decoration_calls = [
mock.call(value, tracer, key)
for key, value in self.channel._impl._consumers.items()
]
PikaInstrumentor._instrument_consumers(
self.channel._impl._consumers, tracer
)
decorate_callback.assert_has_calls(
calls=expected_decoration_calls, any_order=True
)
assert all(
hasattr(callback, "_original_callback")
for callback in self.channel._impl._consumers.values()
)
@mock.patch(
"opentelemetry.instrumentation.pika.utils._decorate_basic_publish"
)
def test_instrument_basic_publish(
self, decorate_basic_publish: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
original_function = self.channel.basic_publish
PikaInstrumentor._instrument_basic_publish(self.channel, tracer)
decorate_basic_publish.assert_called_once_with(
original_function, self.channel, tracer
)
self.assertEqual(
self.channel.basic_publish, decorate_basic_publish.return_value
)
def test_uninstrument_channel_functions(self) -> None:
original_function = self.channel.basic_publish
self.channel.basic_publish = mock.MagicMock()
self.channel.basic_publish._original_function = original_function
PikaInstrumentor._uninstrument_channel_functions(self.channel)
self.assertEqual(self.channel.basic_publish, original_function)

View File

@ -0,0 +1,163 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase, mock
from opentelemetry.instrumentation.pika import utils
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, Tracer
class TestUtils(TestCase):
@staticmethod
@mock.patch("opentelemetry.context.get_value")
@mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name")
@mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span")
@mock.patch("opentelemetry.propagate.extract")
def test_get_span(
extract: mock.MagicMock,
enrich_span: mock.MagicMock,
generate_span_name: mock.MagicMock,
get_value: mock.MagicMock,
) -> None:
tracer = mock.MagicMock(spec=Tracer)
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
get_value.return_value = None
_ = utils._get_span(tracer, channel, properties, task_name)
extract.assert_called_once()
generate_span_name.assert_called_once()
tracer.start_span.assert_called_once_with(
context=extract.return_value, name=generate_span_name.return_value
)
enrich_span.assert_called_once()
@mock.patch("opentelemetry.context.get_value")
@mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name")
@mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span")
@mock.patch("opentelemetry.propagate.extract")
def test_get_span_suppressed(
self,
extract: mock.MagicMock,
enrich_span: mock.MagicMock,
generate_span_name: mock.MagicMock,
get_value: mock.MagicMock,
) -> None:
tracer = mock.MagicMock(spec=Tracer)
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
get_value.return_value = True
span = utils._get_span(tracer, channel, properties, task_name)
self.assertEqual(span, None)
extract.assert_called_once()
generate_span_name.assert_not_called()
def test_generate_span_name_no_operation(self) -> None:
task_name = "test.test"
operation = None
span_name = utils._generate_span_name(task_name, operation)
self.assertEqual(span_name, f"{task_name} send")
def test_generate_span_name_with_operation(self) -> None:
task_name = "test.test"
operation = mock.MagicMock()
operation.value = "process"
span_name = utils._generate_span_name(task_name, operation)
self.assertEqual(span_name, f"{task_name} {operation.value}")
@staticmethod
def test_enrich_span_basic_values() -> None:
channel = mock.MagicMock()
properties = mock.MagicMock()
task_destination = "test.test"
span = mock.MagicMock(spec=Span)
utils._enrich_span(span, channel, properties, task_destination)
span.set_attribute.assert_has_calls(
any_order=True,
calls=[
mock.call(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq"),
mock.call(SpanAttributes.MESSAGING_TEMP_DESTINATION, True),
mock.call(
SpanAttributes.MESSAGING_DESTINATION, task_destination
),
mock.call(
SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id
),
mock.call(
SpanAttributes.MESSAGING_CONVERSATION_ID,
properties.correlation_id,
),
mock.call(
SpanAttributes.NET_PEER_NAME,
channel.connection.params.host,
),
mock.call(
SpanAttributes.NET_PEER_PORT,
channel.connection.params.port,
),
],
)
@staticmethod
def test_enrich_span_with_operation() -> None:
channel = mock.MagicMock()
properties = mock.MagicMock()
task_destination = "test.test"
operation = mock.MagicMock()
span = mock.MagicMock(spec=Span)
utils._enrich_span(
span, channel, properties, task_destination, operation
)
span.set_attribute.assert_has_calls(
any_order=True,
calls=[
mock.call(SpanAttributes.MESSAGING_OPERATION, operation.value)
],
)
@staticmethod
def test_enrich_span_without_operation() -> None:
channel = mock.MagicMock()
properties = mock.MagicMock()
task_destination = "test.test"
span = mock.MagicMock(spec=Span)
utils._enrich_span(span, channel, properties, task_destination)
span.set_attribute.assert_has_calls(
any_order=True,
calls=[mock.call(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)],
)
@staticmethod
def test_enrich_span_unique_connection() -> None:
channel = mock.MagicMock()
properties = mock.MagicMock()
task_destination = "test.test"
span = mock.MagicMock(spec=Span)
# We do this to create the behaviour of hasattr(channel.connection, "params") == False
del channel.connection.params
utils._enrich_span(span, channel, properties, task_destination)
span.set_attribute.assert_has_calls(
any_order=True,
calls=[
mock.call(
SpanAttributes.NET_PEER_NAME,
channel.connection._impl.params.host,
),
mock.call(
SpanAttributes.NET_PEER_PORT,
channel.connection._impl.params.port,
),
],
)

View File

@ -153,6 +153,10 @@ envlist =
py3{6,7,8,9}-test-propagator-ot-trace
pypy3-test-propagator-ot-trace
; opentelemetry-instrumentation-pika
py3{6,7,8,9}-test-instrumentation-pika
pypy3-test-instrumentation-pika
lint
docker-tests
docs
@ -210,6 +214,7 @@ changedir =
test-instrumentation-jinja2: instrumentation/opentelemetry-instrumentation-jinja2/tests
test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests
test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests
test-instrumentation-pika: instrumentation/opentelemetry-instrumentation-pika/tests
test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests
test-instrumentation-pymemcache: instrumentation/opentelemetry-instrumentation-pymemcache/tests
test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests
@ -242,6 +247,8 @@ commands_pre =
celery: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test]
pika: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]
grpc: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test]
falcon{2,3},flask,django,pyramid,tornado,starlette,fastapi,aiohttp,asgi,requests,urllib,urllib3,wsgi: pip install {toxinidir}/util/opentelemetry-util-http[test]
@ -372,6 +379,7 @@ commands_pre =
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-flask[test]
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sqlalchemy[test]
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test]
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sklearn[test]
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-redis[test]
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-fastapi[test]
@ -430,6 +438,7 @@ commands_pre =
"{env:CORE_REPO}#egg=opentelemetry-test&subdirectory=tests/util" \
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-asyncpg \
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery \
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika \
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-dbapi \
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-mysql \
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-psycopg2 \