Files

209 lines
7.5 KiB
Python

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from unittest import mock
from celery import Celery
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.celery import utils
from opentelemetry.sdk import trace
class TestUtils(unittest.TestCase):
def setUp(self):
self.app = Celery("celery.test_app")
def test_set_attributes_from_context(self):
# it should extract only relevant keys
context = {
"correlation_id": "44b7f305",
"delivery_info": {"eager": True},
"eta": "soon",
"expires": "later",
"hostname": "localhost",
"id": "44b7f305",
"reply_to": "44b7f305",
"retries": 4,
"timelimit": ("now", "later"),
"custom_meta": "custom_value",
"routing_key": "celery",
}
span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext))
utils.set_attributes_from_context(span, context)
self.assertEqual(
span.attributes.get("messaging.message_id"), "44b7f305"
)
self.assertEqual(
span.attributes.get("messaging.conversation_id"), "44b7f305"
)
self.assertEqual(
span.attributes.get("messaging.destination"), "celery"
)
self.assertEqual(
span.attributes["celery.delivery_info"], str({"eager": True})
)
self.assertEqual(span.attributes.get("celery.eta"), "soon")
self.assertEqual(span.attributes.get("celery.expires"), "later")
self.assertEqual(span.attributes.get("celery.hostname"), "localhost")
self.assertEqual(span.attributes.get("celery.reply_to"), "44b7f305")
self.assertEqual(span.attributes.get("celery.retries"), 4)
self.assertEqual(
span.attributes.get("celery.timelimit"), ("now", "later")
)
self.assertNotIn("custom_meta", span.attributes)
def test_set_attributes_from_context_empty_keys(self):
# it should not extract empty keys
context = {
"correlation_id": None,
"exchange": "",
"timelimit": (None, None),
"retries": 0,
}
span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext))
utils.set_attributes_from_context(span, context)
self.assertEqual(len(span.attributes), 0)
# edge case: `timelimit` can also be a list of None values
context = {
"timelimit": [None, None],
}
utils.set_attributes_from_context(span, context)
self.assertEqual(len(span.attributes), 0)
def test_span_propagation(self):
# ensure spans getter and setter works properly
@self.app.task
def fn_task():
return 42
# propagate and retrieve a Span
task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f"
span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext))
utils.attach_span(fn_task, task_id, span)
span_after = utils.retrieve_span(fn_task, task_id)
self.assertIs(span, span_after)
def test_span_delete(self):
# ensure the helper removes properly a propagated Span
@self.app.task
def fn_task():
return 42
# propagate a Span
task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f"
span = trace.Span("name", mock.Mock(spec=trace_api.SpanContext))
utils.attach_span(fn_task, task_id, span)
# delete the Span
utils.detach_span(fn_task, task_id)
self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None))
def test_span_delete_empty(self):
# ensure detach_span doesn't raise an exception if span is not present
@self.app.task
def fn_task():
return 42
# delete the Span
task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f"
try:
utils.detach_span(fn_task, task_id)
self.assertEqual(
utils.retrieve_span(fn_task, task_id), (None, None)
)
except Exception as ex: # pylint: disable=broad-except
self.fail("Exception was raised: %s" % ex)
def test_task_id_from_protocol_v1(self):
# ensures a `task_id` is properly returned when Protocol v1 is used.
# `context` is an example of an emitted Signal with Protocol v1
context = {
"body": {
"expires": None,
"utc": True,
"args": ["user"],
"chord": None,
"callbacks": None,
"errbacks": None,
"taskset": None,
"id": "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7",
"retries": 0,
"task": "tests.contrib.celery.test_integration.fn_task_parameters",
"timelimit": (None, None),
"eta": None,
"kwargs": {"force_logout": True},
},
"sender": "tests.contrib.celery.test_integration.fn_task_parameters",
"exchange": "celery",
"routing_key": "celery",
"retry_policy": None,
"headers": {},
"properties": {},
}
task_id = utils.retrieve_task_id_from_message(context)
self.assertEqual(task_id, "dffcaec1-dd92-4a1a-b3ab-d6512f4beeb7")
def test_task_id_from_protocol_v2(self):
# ensures a `task_id` is properly returned when Protocol v2 is used.
# `context` is an example of an emitted Signal with Protocol v2
context = {
"body": (
["user"],
{"force_logout": True},
{
u"chord": None,
u"callbacks": None,
u"errbacks": None,
u"chain": None,
},
),
"sender": u"tests.contrib.celery.test_integration.fn_task_parameters",
"exchange": u"",
"routing_key": u"celery",
"retry_policy": None,
"headers": {
u"origin": u"gen83744@hostname",
u"root_id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
u"expires": None,
u"shadow": None,
u"id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
u"kwargsrepr": u"{'force_logout': True}",
u"lang": u"py",
u"retries": 0,
u"task": u"tests.contrib.celery.test_integration.fn_task_parameters",
u"group": None,
u"timelimit": [None, None],
u"parent_id": None,
u"argsrepr": u"['user']",
u"eta": None,
},
"properties": {
u"reply_to": "c3054a07-5b28-3855-b18c-1623a24aaeca",
u"correlation_id": "7e917b83-4018-431d-9832-73a28e1fb6c0",
},
}
task_id = utils.retrieve_task_id_from_message(context)
self.assertEqual(task_id, "7e917b83-4018-431d-9832-73a28e1fb6c0")