diff --git a/instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md index e164a8913..da615bcc7 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-celery/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Span operation names now include the task type. ([#1135](https://github.com/open-telemetry/opentelemetry-python/pull/1135)) +- Added automatic context propagation. ([#1135](https://github.com/open-telemetry/opentelemetry-python/pull/1135)) + ## Version 0.12b0 Released 2020-08-14 diff --git a/instrumentation/opentelemetry-instrumentation-celery/README.rst b/instrumentation/opentelemetry-instrumentation-celery/README.rst index 42fe6646d..307fd352b 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/README.rst +++ b/instrumentation/opentelemetry-instrumentation-celery/README.rst @@ -29,11 +29,20 @@ Usage .. code-block:: python + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor, ConsoleSpanExporter from opentelemetry.instrumentation.celery import CeleryInstrumentor - CeleryInstrumentor().instrument() - from celery import Celery + from celery.signals import worker_process_init + + @worker_process_init.connect(weak=False) + def init_celery_tracing(*args, **kwargs): + trace.set_tracer_provider(TracerProvider()) + span_processor = BatchExportSpanProcessor(ConsoleSpanExporter()) + trace.get_tracer_provider().add_span_processor(span_processor) + CeleryInstrumentor().instrument() app = Celery("tasks", broker="amqp://localhost") @@ -43,6 +52,15 @@ Usage add.delay(42, 50) + +Setting up tracing +-------------------- + +When tracing a Celery worker process, both tracing and instrumentation must be initialized after the Celery worker +process is initialized. 
This is required so that any tracing components that rely on threading, +such as the BatchExportSpanProcessor, work correctly. Celery provides a signal called ``worker_process_init`` that can be used to +accomplish this, as shown in the example above. + References ---------- * `OpenTelemetry Celery Instrumentation `_ diff --git a/instrumentation/opentelemetry-instrumentation-celery/setup.cfg b/instrumentation/opentelemetry-instrumentation-celery/setup.cfg index 79b63d928..b5a039de1 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-celery/setup.cfg @@ -46,6 +46,7 @@ install_requires = [options.extras_require] test = pytest + celery ~= 4.0 opentelemetry-test == 0.14.dev0 [options.packages.find] diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index 7e2551142..4768e93d1 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -30,11 +30,20 @@ Usage .. 
code:: python + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchExportSpanProcessor, ConsoleSpanExporter from opentelemetry.instrumentation.celery import CeleryInstrumentor - CeleryInstrumentor().instrument() - from celery import Celery + from celery.signals import worker_process_init + + @worker_process_init.connect(weak=False) + def init_celery_tracing(*args, **kwargs): + trace.set_tracer_provider(TracerProvider()) + span_processor = BatchExportSpanProcessor(ConsoleSpanExporter()) + trace.get_tracer_provider().add_span_processor(span_processor) + CeleryInstrumentor().instrument() app = Celery("tasks", broker="amqp://localhost") @@ -50,13 +59,15 @@ API import logging import signal +from collections.abc import Iterable from celery import signals # pylint: disable=no-name-in-module -from opentelemetry import trace +from opentelemetry import propagators, trace from opentelemetry.instrumentation.celery import utils from opentelemetry.instrumentation.celery.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.trace.propagation import get_current_span from opentelemetry.trace.status import Status, StatusCanonicalCode logger = logging.getLogger(__name__) @@ -106,9 +117,16 @@ class CeleryInstrumentor(BaseInstrumentor): if task is None or task_id is None: return + request = task.request + tracectx = propagators.extract(carrier_extractor, request) or {} + parent = get_current_span(tracectx) + logger.debug("prerun signal start task_id=%s", task_id) - span = self._tracer.start_span(task.name, kind=trace.SpanKind.CONSUMER) + operation_name = "{0}/{1}".format(_TASK_RUN, task.name) + span = self._tracer.start_span( + operation_name, parent=parent, kind=trace.SpanKind.CONSUMER + ) activation = self._tracer.use_span(span, end_on_exit=True) activation.__enter__() @@ -146,7 +164,10 @@ class CeleryInstrumentor(BaseInstrumentor): if task is None or 
task_id is None: return - span = self._tracer.start_span(task.name, kind=trace.SpanKind.PRODUCER) + operation_name = "{0}/{1}".format(_TASK_APPLY_ASYNC, task.name) + span = self._tracer.start_span( + operation_name, kind=trace.SpanKind.PRODUCER + ) # apply some attributes here because most of the data is not available span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC) @@ -158,6 +179,10 @@ class CeleryInstrumentor(BaseInstrumentor): activation.__enter__() utils.attach_span(task, task_id, (span, activation), is_publish=True) + headers = kwargs.get("headers") + if headers: + propagators.inject(type(headers).__setitem__, headers) + @staticmethod def _trace_after_publish(*args, **kwargs): task = utils.retrieve_task_from_sender(kwargs) @@ -221,3 +246,10 @@ class CeleryInstrumentor(BaseInstrumentor): # Use `str(reason)` instead of `reason.message` in case we get # something that isn't an `Exception` span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason)) + + +def carrier_extractor(carrier, key): + value = getattr(carrier, key, []) + if isinstance(value, str) or not isinstance(value, Iterable): + value = (value,) + return value diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py new file mode 100644 index 000000000..d9660412f --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py @@ -0,0 +1,29 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from celery import Celery + + +class Config: + result_backend = "rpc" + broker_backend = "memory" + + +app = Celery(broker="memory:///") +app.config_from_object(Config) + + +@app.task +def task_add(num_a, num_b): + return num_a + num_b diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py new file mode 100644 index 000000000..3a05ebf33 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -0,0 +1,78 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import threading +import time + +from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import SpanKind + +from .celery_test_tasks import app, task_add + + +class TestCeleryInstrumentation(TestBase): + def setUp(self): + super().setUp() + self._worker = app.Worker(app=app, pool="solo", concurrency=1) + self._thread = threading.Thread(target=self._worker.start) + self._thread.daemon = True + self._thread.start() + + def tearDown(self): + super().tearDown() + self._worker.stop() + self._thread.join() + + def test_task(self): + CeleryInstrumentor().instrument() + + result = task_add.delay(1, 2) + while not result.ready(): + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual(consumer.name, "run/tests.celery_test_tasks.task_add") + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assert_span_has_attributes( + consumer, + { + "celery.action": "run", + "celery.state": "SUCCESS", + "messaging.destination": "celery", + "celery.task_name": "tests.celery_test_tasks.task_add", + }, + ) + + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_add" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assert_span_has_attributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_add", + "messaging.destination_kind": "queue", + "messaging.destination": "celery", + }, + ) + + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual(consumer.context.trace_id, producer.context.trace_id)