mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-30 21:56:07 +08:00
Support aio_pika 8.x (#1481)
* Support aio_pika 8 - Fix tests for new shape of the AbstractConnection class - Run tests against aio_pika 7 and 8 * Update CHANGELOG.md --------- Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:

committed by
GitHub

parent
2519223a5c
commit
66ceef5fe1
@ -16,9 +16,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553))
|
||||
- `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available
|
||||
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
|
||||
- `opentelemetry-instrumentation-aio-pika` Support `aio_pika` 8.x
|
||||
([#1481](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1481))
|
||||
- `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation.
|
||||
([#1613](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1613))
|
||||
- Fix aiohttp bug with unset `trace_configs` ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))
|
||||
- Fix aiohttp bug with unset `trace_configs`
|
||||
([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))
|
||||
- `opentelemetry-instrumentation-django` Allow explicit `excluded_urls` configuration through `instrument()`
|
||||
([#1618](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1618))
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
|
||||
| Instrumentation | Supported Packages | Metrics support |
|
||||
| --------------- | ------------------ | --------------- |
|
||||
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No
|
||||
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0, < 9.0.0 | No
|
||||
| [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No
|
||||
| [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No
|
||||
| [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No
|
||||
|
@ -31,7 +31,7 @@ dependencies = [
|
||||
|
||||
[project.optional-dependencies]
|
||||
instruments = [
|
||||
"aio_pika ~= 7.2.0",
|
||||
"aio_pika >= 7.2.0, < 9.0.0",
|
||||
]
|
||||
test = [
|
||||
"opentelemetry-instrumentation-aio-pika[instruments]",
|
||||
|
@ -13,4 +13,4 @@
|
||||
# limitations under the License.
|
||||
from typing import Collection
|
||||
|
||||
_instruments: Collection[str] = ("aio_pika ~= 7.2.0",)
|
||||
_instruments: Collection[str] = ("aio_pika >= 7.2.0, < 9.0.0",)
|
||||
|
@ -24,7 +24,7 @@ from opentelemetry.semconv.trace import (
|
||||
)
|
||||
from opentelemetry.trace import Span, SpanKind, Tracer
|
||||
|
||||
_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'}
|
||||
_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: "rabbitmq"}
|
||||
|
||||
|
||||
class SpanBuilder:
|
||||
@ -49,18 +49,30 @@ class SpanBuilder:
|
||||
self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination
|
||||
|
||||
def set_channel(self, channel: AbstractChannel):
|
||||
url = channel.connection.connection.url
|
||||
self._attributes.update({
|
||||
SpanAttributes.NET_PEER_NAME: url.host,
|
||||
SpanAttributes.NET_PEER_PORT: url.port
|
||||
})
|
||||
connection = channel.connection
|
||||
if getattr(connection, "connection", None):
|
||||
# aio_rmq 7
|
||||
url = connection.connection.url
|
||||
else:
|
||||
# aio_rmq 8
|
||||
url = connection.url
|
||||
self._attributes.update(
|
||||
{
|
||||
SpanAttributes.NET_PEER_NAME: url.host,
|
||||
SpanAttributes.NET_PEER_PORT: url.port,
|
||||
}
|
||||
)
|
||||
|
||||
def set_message(self, message: AbstractMessage):
|
||||
properties = message.properties
|
||||
if properties.message_id:
|
||||
self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = properties.message_id
|
||||
self._attributes[
|
||||
SpanAttributes.MESSAGING_MESSAGE_ID
|
||||
] = properties.message_id
|
||||
if properties.correlation_id:
|
||||
self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = properties.correlation_id
|
||||
self._attributes[
|
||||
SpanAttributes.MESSAGING_CONVERSATION_ID
|
||||
] = properties.correlation_id
|
||||
|
||||
def build(self) -> Optional[Span]:
|
||||
if not is_instrumentation_enabled():
|
||||
@ -69,9 +81,11 @@ class SpanBuilder:
|
||||
self._attributes[SpanAttributes.MESSAGING_OPERATION] = self._operation.value
|
||||
else:
|
||||
self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True
|
||||
span = self._tracer.start_span(self._generate_span_name(), kind=self._kind, attributes=self._attributes)
|
||||
span = self._tracer.start_span(
|
||||
self._generate_span_name(), kind=self._kind, attributes=self._attributes
|
||||
)
|
||||
return span
|
||||
|
||||
def _generate_span_name(self) -> str:
|
||||
operation_value = self._operation.value if self._operation else 'send'
|
||||
return f'{self._destination} {operation_value}'
|
||||
operation_value = self._operation.value if self._operation else "send"
|
||||
return f"{self._destination} {operation_value}"
|
||||
|
@ -15,8 +15,10 @@ SERVER_PASS = "guest"
|
||||
SERVER_URL = URL(
|
||||
f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/"
|
||||
)
|
||||
CONNECTION = Namespace(connection=Namespace(url=SERVER_URL))
|
||||
CHANNEL = Namespace(connection=CONNECTION, loop=None)
|
||||
CONNECTION_7 = Namespace(connection=Namespace(url=SERVER_URL))
|
||||
CONNECTION_8 = Namespace(url=SERVER_URL)
|
||||
CHANNEL_7 = Namespace(connection=CONNECTION_7, loop=None)
|
||||
CHANNEL_8 = Namespace(connection=CONNECTION_8, loop=None)
|
||||
MESSAGE = Namespace(
|
||||
properties=Namespace(
|
||||
message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={}
|
||||
|
@ -12,9 +12,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import asyncio
|
||||
from unittest import TestCase, mock
|
||||
from unittest import TestCase, mock, skipIf
|
||||
|
||||
from aio_pika import Queue
|
||||
from aio_pika import Queue, version_info
|
||||
|
||||
from opentelemetry.instrumentation.aio_pika.callback_decorator import (
|
||||
CallbackDecorator,
|
||||
@ -23,7 +23,8 @@ from opentelemetry.semconv.trace import SpanAttributes
|
||||
from opentelemetry.trace import SpanKind, get_tracer
|
||||
|
||||
from .consts import (
|
||||
CHANNEL,
|
||||
CHANNEL_7,
|
||||
CHANNEL_8,
|
||||
CORRELATION_ID,
|
||||
EXCHANGE_NAME,
|
||||
MESSAGE,
|
||||
@ -35,7 +36,8 @@ from .consts import (
|
||||
)
|
||||
|
||||
|
||||
class TestInstrumentedQueue(TestCase):
|
||||
@skipIf(version_info >= (8, 0), "Only for aio_pika 7")
|
||||
class TestInstrumentedQueueAioRmq7(TestCase):
|
||||
EXPECTED_ATTRIBUTES = {
|
||||
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
|
||||
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
|
||||
@ -52,7 +54,7 @@ class TestInstrumentedQueue(TestCase):
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
def test_get_callback_span(self):
|
||||
queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None)
|
||||
queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None)
|
||||
tracer = mock.MagicMock()
|
||||
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
|
||||
tracer.start_span.assert_called_once_with(
|
||||
@ -62,7 +64,47 @@ class TestInstrumentedQueue(TestCase):
|
||||
)
|
||||
|
||||
def test_decorate_callback(self):
|
||||
queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None)
|
||||
queue = Queue(CHANNEL_7, QUEUE_NAME, False, False, False, None)
|
||||
callback = mock.MagicMock(return_value=asyncio.sleep(0))
|
||||
with mock.patch.object(
|
||||
CallbackDecorator, "_get_span"
|
||||
) as mocked_get_callback_span:
|
||||
callback_decorator = CallbackDecorator(self.tracer, queue)
|
||||
decorated_callback = callback_decorator.decorate(callback)
|
||||
self.loop.run_until_complete(decorated_callback(MESSAGE))
|
||||
mocked_get_callback_span.assert_called_once()
|
||||
callback.assert_called_once_with(MESSAGE)
|
||||
|
||||
|
||||
@skipIf(version_info <= (8, 0), "Only for aio_pika 8")
|
||||
class TestInstrumentedQueueAioRmq8(TestCase):
|
||||
EXPECTED_ATTRIBUTES = {
|
||||
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
|
||||
SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME,
|
||||
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
|
||||
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
|
||||
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
|
||||
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
|
||||
SpanAttributes.MESSAGING_OPERATION: "receive",
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
self.tracer = get_tracer(__name__)
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
def test_get_callback_span(self):
|
||||
queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None)
|
||||
tracer = mock.MagicMock()
|
||||
CallbackDecorator(tracer, queue)._get_span(MESSAGE)
|
||||
tracer.start_span.assert_called_once_with(
|
||||
f"{EXCHANGE_NAME} receive",
|
||||
kind=SpanKind.CONSUMER,
|
||||
attributes=self.EXPECTED_ATTRIBUTES,
|
||||
)
|
||||
|
||||
def test_decorate_callback(self):
|
||||
queue = Queue(CHANNEL_8, QUEUE_NAME, False, False, False, None)
|
||||
callback = mock.MagicMock(return_value=asyncio.sleep(0))
|
||||
with mock.patch.object(
|
||||
CallbackDecorator, "_get_span"
|
||||
|
@ -13,9 +13,9 @@
|
||||
# limitations under the License.
|
||||
import asyncio
|
||||
from typing import Type
|
||||
from unittest import TestCase, mock
|
||||
from unittest import TestCase, mock, skipIf
|
||||
|
||||
from aio_pika import Exchange, RobustExchange
|
||||
from aio_pika import Exchange, RobustExchange, version_info
|
||||
|
||||
from opentelemetry.instrumentation.aio_pika.publish_decorator import (
|
||||
PublishDecorator,
|
||||
@ -24,8 +24,10 @@ from opentelemetry.semconv.trace import SpanAttributes
|
||||
from opentelemetry.trace import SpanKind, get_tracer
|
||||
|
||||
from .consts import (
|
||||
CHANNEL,
|
||||
CONNECTION,
|
||||
CHANNEL_7,
|
||||
CHANNEL_8,
|
||||
CONNECTION_7,
|
||||
CONNECTION_8,
|
||||
CORRELATION_ID,
|
||||
EXCHANGE_NAME,
|
||||
MESSAGE,
|
||||
@ -37,7 +39,8 @@ from .consts import (
|
||||
)
|
||||
|
||||
|
||||
class TestInstrumentedExchange(TestCase):
|
||||
@skipIf(version_info >= (8, 0), "Only for aio_pika 7")
|
||||
class TestInstrumentedExchangeAioRmq7(TestCase):
|
||||
EXPECTED_ATTRIBUTES = {
|
||||
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
|
||||
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
|
||||
@ -54,7 +57,7 @@ class TestInstrumentedExchange(TestCase):
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
def test_get_publish_span(self):
|
||||
exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME)
|
||||
exchange = Exchange(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
|
||||
tracer = mock.MagicMock()
|
||||
PublishDecorator(tracer, exchange)._get_publish_span(
|
||||
MESSAGE, ROUTING_KEY
|
||||
@ -66,7 +69,60 @@ class TestInstrumentedExchange(TestCase):
|
||||
)
|
||||
|
||||
def _test_publish(self, exchange_type: Type[Exchange]):
|
||||
exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME)
|
||||
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
|
||||
with mock.patch.object(
|
||||
PublishDecorator, "_get_publish_span"
|
||||
) as mock_get_publish_span:
|
||||
with mock.patch.object(
|
||||
Exchange, "publish", return_value=asyncio.sleep(0)
|
||||
) as mock_publish:
|
||||
decorated_publish = PublishDecorator(
|
||||
self.tracer, exchange
|
||||
).decorate(mock_publish)
|
||||
self.loop.run_until_complete(
|
||||
decorated_publish(MESSAGE, ROUTING_KEY)
|
||||
)
|
||||
mock_publish.assert_called_once()
|
||||
mock_get_publish_span.assert_called_once()
|
||||
|
||||
def test_publish(self):
|
||||
self._test_publish(Exchange)
|
||||
|
||||
def test_robust_publish(self):
|
||||
self._test_publish(RobustExchange)
|
||||
|
||||
|
||||
@skipIf(version_info <= (8, 0), "Only for aio_pika 8")
|
||||
class TestInstrumentedExchangeAioRmq8(TestCase):
|
||||
EXPECTED_ATTRIBUTES = {
|
||||
SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM,
|
||||
SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}",
|
||||
SpanAttributes.NET_PEER_NAME: SERVER_HOST,
|
||||
SpanAttributes.NET_PEER_PORT: SERVER_PORT,
|
||||
SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID,
|
||||
SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID,
|
||||
SpanAttributes.MESSAGING_TEMP_DESTINATION: True,
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
self.tracer = get_tracer(__name__)
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
|
||||
def test_get_publish_span(self):
|
||||
exchange = Exchange(CHANNEL_8, EXCHANGE_NAME)
|
||||
tracer = mock.MagicMock()
|
||||
PublishDecorator(tracer, exchange)._get_publish_span(
|
||||
MESSAGE, ROUTING_KEY
|
||||
)
|
||||
tracer.start_span.assert_called_once_with(
|
||||
f"{EXCHANGE_NAME},{ROUTING_KEY} send",
|
||||
kind=SpanKind.PRODUCER,
|
||||
attributes=self.EXPECTED_ATTRIBUTES,
|
||||
)
|
||||
|
||||
def _test_publish(self, exchange_type: Type[Exchange]):
|
||||
exchange = exchange_type(CONNECTION_8, CHANNEL_8, EXCHANGE_NAME)
|
||||
with mock.patch.object(
|
||||
PublishDecorator, "_get_publish_span"
|
||||
) as mock_get_publish_span:
|
||||
|
@ -21,6 +21,6 @@ class TestBuilder(TestCase):
|
||||
def test_build(self):
|
||||
builder = SpanBuilder(get_tracer(__name__))
|
||||
builder.set_as_consumer()
|
||||
builder.set_destination('destination')
|
||||
builder.set_destination("destination")
|
||||
span = builder.build()
|
||||
self.assertTrue(isinstance(span, Span))
|
||||
|
@ -17,7 +17,7 @@
|
||||
|
||||
libraries = {
|
||||
"aio_pika": {
|
||||
"library": "aio_pika ~= 7.2.0",
|
||||
"library": "aio_pika >= 7.2.0, < 9.0.0",
|
||||
"instrumentation": "opentelemetry-instrumentation-aio-pika==0.37b0.dev",
|
||||
},
|
||||
"aiohttp": {
|
||||
|
9
tox.ini
9
tox.ini
@ -206,6 +206,10 @@ envlist =
|
||||
py3{7,8,9,10,11}-test-instrumentation-pika{0,1}
|
||||
pypy3-test-instrumentation-pika{0,1}
|
||||
|
||||
; opentelemetry-instrumentation-aio-pika
|
||||
py3{7,8,9,10,11}-test-instrumentation-aio-pika{7,8}
|
||||
pypy3-test-instrumentation-aio-pika{7,8}
|
||||
|
||||
; opentelemetry-instrumentation-kafka-python
|
||||
py3{7,8,9,10,11}-test-instrumentation-kafka-python
|
||||
pypy3-test-instrumentation-kafka-python
|
||||
@ -250,6 +254,8 @@ deps =
|
||||
sqlalchemy14: sqlalchemy~=1.4
|
||||
pika0: pika>=0.12.0,<1.0.0
|
||||
pika1: pika>=1.0.0
|
||||
aio-pika7: aio_pika~=7.2.0
|
||||
aio-pika8: aio_pika>=8.0.0,<9.0.0
|
||||
pymemcache135: pymemcache ==1.3.5
|
||||
pymemcache200: pymemcache >2.0.0,<3.0.0
|
||||
pymemcache300: pymemcache >3.0.0,<3.4.2
|
||||
@ -296,6 +302,7 @@ changedir =
|
||||
test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests
|
||||
test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests
|
||||
test-instrumentation-pika{0,1}: instrumentation/opentelemetry-instrumentation-pika/tests
|
||||
test-instrumentation-aio-pika{7,8}: instrumentation/opentelemetry-instrumentation-aio-pika/tests
|
||||
test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests
|
||||
test-instrumentation-pymemcache{135,200,300,342}: instrumentation/opentelemetry-instrumentation-pymemcache/tests
|
||||
test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests
|
||||
@ -337,6 +344,8 @@ commands_pre =
|
||||
|
||||
pika{0,1}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test]
|
||||
|
||||
aio-pika{7,8}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika[test]
|
||||
|
||||
kafka-python: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python[test]
|
||||
|
||||
confluent-kafka: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-confluent-kafka[test]
|
||||
|
Reference in New Issue
Block a user