From c24c77dd3adcbfe9406b139acaa08d68155d6edb Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik <81902191+oxeye-nikolay@users.noreply.github.com> Date: Wed, 20 Oct 2021 00:02:24 +0300 Subject: [PATCH] Bugfix/instrument basic publish in pika (#759) --- CHANGELOG.md | 2 ++ .../instrumentation/pika/pika_instrumentor.py | 29 +++++++++++++++++++ .../instrumentation/pika/utils.py | 4 +++ .../tests/test_pika_instrumentation.py | 5 ++++ .../opentelemetry/instrumentation/utils.py | 1 + 5 files changed, 41 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d5aaca89e..9904b50a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-sdk-extension-aws` & `opentelemetry-propagator-aws` Release AWS Python SDK Extension as 2.0.1 and AWS Propagator as 1.0.1 ([#753](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/753)) +- `opentelemetry-instrumentation-pika` Add `_decorate_basic_consume` to ensure post instrumentation `basic_consume` calls are also instrumented. + ([#759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/759)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index a48e46034..05496f53d 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -70,6 +70,7 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore function = getattr(channel, function_name) if hasattr(function, "_original_function"): channel.__setattr__(function_name, function._original_function) + unwrap(channel, "basic_consume") @staticmethod def instrument_channel( @@ -90,6 +91,7 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore PikaInstrumentor._instrument_consumers( channel._impl._consumers, tracer ) + PikaInstrumentor._decorate_basic_consume(channel, tracer) PikaInstrumentor._instrument_channel_functions(channel, tracer) @staticmethod @@ -120,6 +122,33 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper) + @staticmethod + def _decorate_basic_consume(channel, tracer: Optional[Tracer]) -> None: + def wrapper(wrapped, instance, args, kwargs): + if not hasattr(channel, "_impl"): + _LOG.error( + "Could not find implementation for provided channel!" + ) + return wrapped(*args, **kwargs) + current_keys = set(channel._impl._consumers.keys()) + return_value = wrapped(*args, **kwargs) + new_key_list = list( + set(channel._impl._consumers.keys()) - current_keys + ) + if not new_key_list: + _LOG.error("Could not find added callback") + return return_value + new_key = new_key_list[0] + callback = channel._impl._consumers[new_key] + decorated_callback = utils._decorate_callback( + callback, tracer, new_key + ) + setattr(decorated_callback, "_original_callback", callback) + channel._impl._consumers[new_key] = decorated_callback + return return_value + + wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper) + def _instrument(self, **kwargs: Dict[str, Any]) -> None: tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) self.__setattr__("__opentelemetry_tracer_provider", tracer_provider) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index d1d85b299..12161d233 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -41,6 +41,8 @@ def _decorate_callback( ) -> Any: if not properties: properties = BasicProperties(headers={}) + if properties.headers is None: + properties.headers = {} ctx = propagate.extract(properties.headers, getter=_pika_getter) if not ctx: ctx = context.get_current() @@ -74,6 +76,8 @@ def _decorate_basic_publish( ) -> Any: if not properties: properties = BasicProperties(headers={}) + if properties.headers is None: + properties.headers = {} ctx = context.get_current() span = _get_span( tracer, diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 508d49c3b..da2a940b5 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -45,12 +45,16 @@ class TestPika(TestCase): @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions" ) + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume" + ) @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_consumers" ) def test_instrument( self, instrument_consumers: mock.MagicMock, + instrument_basic_consume: mock.MagicMock, instrument_channel_functions: mock.MagicMock, ): PikaInstrumentor.instrument_channel(channel=self.channel) @@ -58,6 +62,7 @@ class TestPika(TestCase): self.channel, "_is_instrumented_by_opentelemetry" ), "channel is not marked as instrumented!" instrument_consumers.assert_called_once() + instrument_basic_consume.assert_called_once() instrument_channel_functions.assert_called_once() @mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback") diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py index 5f63b4af5..45741f082 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py @@ -17,6 +17,7 @@ from typing import Dict, Sequence from wrapt import ObjectProxy # pylint: disable=unused-import +# pylint: disable=E0611 from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY # noqa: F401 from opentelemetry.trace import StatusCode