Support older pika versions (#837)

* feat: support older pika versions

* update tox.ini

* update changelog

* take version from pika

* avoid exception when property name changes

* add callback attr name test
This commit is contained in:
Ran Nozik
2021-12-25 23:33:22 +02:00
committed by GitHub
parent c962da908c
commit 26aa17f8e9
7 changed files with 53 additions and 14 deletions

View File

@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `opentelemetry-instrumentation-aws-lambda` Adds support for configurable flush timeout via `OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT` property. ([#825](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/825))
- `opentelemetry-instrumentation-pika` Adds support for versions between `0.12.0` to `1.0.0`. ([#837](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/837))
### Fixed

View File

@ -20,7 +20,7 @@
| [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2 >= 2.7, < 4.0 |
| [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging |
| [opentelemetry-instrumentation-mysql](./opentelemetry-instrumentation-mysql) | mysql-connector-python ~= 8.0 |
| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 1.1.0 |
| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 0.12.0 |
| [opentelemetry-instrumentation-psycopg2](./opentelemetry-instrumentation-psycopg2) | psycopg2 >= 2.7.3.1 |
| [opentelemetry-instrumentation-pymemcache](./opentelemetry-instrumentation-pymemcache) | pymemcache ~= 1.3 |
| [opentelemetry-instrumentation-pymongo](./opentelemetry-instrumentation-pymongo) | pymongo ~= 3.1 |

View File

@ -13,4 +13,4 @@
# limitations under the License.
from typing import Collection
_instruments: Collection[str] = ("pika >= 1.1.0",)
_instruments: Collection[str] = ("pika >= 0.12.0",)

View File

@ -14,7 +14,9 @@
from logging import getLogger
from typing import Any, Collection, Dict, Optional
import pika
import wrapt
from packaging import version
from pika.adapters import BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
@ -32,7 +34,18 @@ _CTX_KEY = "__otel_task_span"
_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"]
def _consumer_callback_attribute_name() -> str:
pika_version = version.parse(pika.__version__)
return (
"on_message_callback"
if pika_version >= version.parse("1.0.0")
else "consumer_cb"
)
class PikaInstrumentor(BaseInstrumentor): # type: ignore
CONSUMER_CALLBACK_ATTR = _consumer_callback_attribute_name()
# pylint: disable=attribute-defined-outside-init
@staticmethod
def _instrument_blocking_channel_consumers(
@ -41,8 +54,12 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
consume_hook: utils.HookT = utils.dummy_callback,
) -> Any:
for consumer_tag, consumer_info in channel._consumer_infos.items():
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
consumer_callback = getattr(consumer_info, callback_attr, None)
if consumer_callback is None:
continue
decorated_callback = utils._decorate_callback(
consumer_info.on_message_callback,
consumer_callback,
tracer,
consumer_tag,
consume_hook,
@ -51,9 +68,9 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
setattr(
decorated_callback,
"_original_callback",
consumer_info.on_message_callback,
consumer_callback,
)
consumer_info.on_message_callback = decorated_callback
setattr(consumer_info, callback_attr, decorated_callback)
@staticmethod
def _instrument_basic_publish(
@ -126,10 +143,12 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
return
for consumers_tag, client_info in channel._consumer_infos.items():
if hasattr(client_info.on_message_callback, "_original_callback"):
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
consumer_callback = getattr(client_info, callback_attr, None)
if hasattr(consumer_callback, "_original_callback"):
channel._consumer_infos[
consumers_tag
] = client_info.on_message_callback._original_callback
] = consumer_callback._original_callback
PikaInstrumentor._uninstrument_channel_functions(channel)
def _decorate_channel_function(

View File

@ -18,6 +18,9 @@ from pika.channel import Channel
from wrapt import BoundFunctionWrapper
from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.instrumentation.pika.pika_instrumentor import (
_consumer_callback_attribute_name,
)
from opentelemetry.instrumentation.pika.utils import dummy_callback
from opentelemetry.trace import Tracer
@ -26,7 +29,8 @@ class TestPika(TestCase):
def setUp(self) -> None:
self.channel = mock.MagicMock(spec=Channel)
consumer_info = mock.MagicMock()
consumer_info.on_message_callback = mock.MagicMock()
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
setattr(consumer_info, callback_attr, mock.MagicMock())
self.channel._consumer_infos = {"consumer-tag": consumer_info}
self.mock_callback = mock.MagicMock()
@ -72,8 +76,11 @@ class TestPika(TestCase):
self, decorate_callback: mock.MagicMock
) -> None:
tracer = mock.MagicMock(spec=Tracer)
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
expected_decoration_calls = [
mock.call(value.on_message_callback, tracer, key, dummy_callback)
mock.call(
getattr(value, callback_attr), tracer, key, dummy_callback
)
for key, value in self.channel._consumer_infos.items()
]
PikaInstrumentor._instrument_blocking_channel_consumers(
@ -109,3 +116,13 @@ class TestPika(TestCase):
self.channel.basic_publish._original_function = original_function
PikaInstrumentor._uninstrument_channel_functions(self.channel)
self.assertEqual(self.channel.basic_publish, original_function)
def test_consumer_callback_attribute_name(self) -> None:
with mock.patch("pika.__version__", "1.0.0"):
self.assertEqual(
_consumer_callback_attribute_name(), "on_message_callback"
)
with mock.patch("pika.__version__", "0.12.0"):
self.assertEqual(
_consumer_callback_attribute_name(), "consumer_cb"
)

View File

@ -81,7 +81,7 @@ libraries = {
"instrumentation": "opentelemetry-instrumentation-mysql==0.27b0",
},
"pika": {
"library": "pika >= 1.1.0",
"library": "pika >= 0.12.0",
"instrumentation": "opentelemetry-instrumentation-pika==0.27b0",
},
"psycopg2": {

10
tox.ini
View File

@ -182,8 +182,8 @@ envlist =
pypy3-test-propagator-ot-trace
; opentelemetry-instrumentation-pika
py3{6,7,8,9,10}-test-instrumentation-pika
pypy3-test-instrumentation-pika
py3{6,7,8,9,10}-test-instrumentation-pika{0,1}
pypy3-test-instrumentation-pika{0,1}
lint
docker-tests
@ -216,6 +216,8 @@ deps =
sqlalchemy11: sqlalchemy>=1.1,<1.2
sqlalchemy14: aiosqlite
sqlalchemy14: sqlalchemy~=1.4
pika0: pika>=0.12.0,<1.0.0
pika1: pika>=1.0.0
; FIXME: add coverage testing
; FIXME: add mypy testing
@ -249,7 +251,7 @@ changedir =
test-instrumentation-jinja2: instrumentation/opentelemetry-instrumentation-jinja2/tests
test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests
test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests
test-instrumentation-pika: instrumentation/opentelemetry-instrumentation-pika/tests
test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests
test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests
test-instrumentation-pymemcache: instrumentation/opentelemetry-instrumentation-pymemcache/tests
test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests
@ -286,7 +288,7 @@ commands_pre =
celery: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test]
pika: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]
pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]
grpc: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test]