Files
2020-04-08 10:39:44 -07:00

155 lines
5.6 KiB
Python

from ddtrace import Pin, config
from celery import registry
from ...ext import SpanTypes
from ...internal.logger import get_logger
from . import constants as c
from .utils import tags_from_context, retrieve_task_id, attach_span, detach_span, retrieve_span
# Module-level logger used by the signal handlers defined in this module.
log = get_logger(__name__)
def trace_prerun(*args, **kwargs):
    """Start a worker span when the prerun signal is received for a task.

    The task is read from the ``sender`` keyword argument and its id from
    ``task_id``. A ``c.WORKER_ROOT_SPAN`` span is started on the tracer of
    the ``Pin`` found on the task (or, failing that, on ``task.app``) and
    then handed to ``attach_span`` together with the task and its id.

    Nothing is traced when ``sender`` or ``task_id`` is missing, or when no
    ``Pin`` can be found. Positional arguments are ignored.
    """
    sender = kwargs.get('sender')
    ident = kwargs.get('task_id')
    log.debug('prerun signal start task_id=%s', ident)

    # Defensive guard: the signal payload may change between Celery
    # versions, so bail out quietly instead of crashing.
    if sender is None or ident is None:
        log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
        return

    # Prefer the Pin set on the task itself; otherwise use the one on its app.
    active_pin = Pin.get_from(sender) or Pin.get_from(sender.app)
    if active_pin is None:
        log.debug('no pin found on task or task.app task_id=%s', ident)
        return

    # Open the span and make it retrievable by the later signal handlers.
    service_name = config.celery['worker_service_name']
    worker_span = active_pin.tracer.trace(
        c.WORKER_ROOT_SPAN,
        service=service_name,
        resource=sender.name,
        span_type=SpanTypes.WORKER,
    )
    attach_span(sender, ident, worker_span)
def trace_postrun(*args, **kwargs):
    """Tag, finish and detach the span of a task when postrun is received.

    The task is read from the ``sender`` keyword argument and its id from
    ``task_id``. The span previously stored for that pair is fetched with
    ``retrieve_span``, tagged with ``c.TASK_TAG_KEY`` = ``c.TASK_RUN`` plus
    the tags extracted from the signal keyword arguments and from
    ``task.request``, then finished and passed to ``detach_span``.

    Nothing happens when ``sender`` or ``task_id`` is missing; a warning is
    logged when no span is stored for the task. Positional arguments are
    ignored.
    """
    # safe-guard to avoid crashes in case the signals API
    # changes in Celery
    task = kwargs.get('sender')
    task_id = kwargs.get('task_id')
    log.debug('postrun signal task_id=%s', task_id)
    if task is None or task_id is None:
        log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
        return

    # retrieve and finish the Span
    span = retrieve_span(task, task_id)
    if span is None:
        log.warning('no existing span found for task_id=%s', task_id)
        return

    try:
        # request context tags
        span.set_tag(c.TASK_TAG_KEY, c.TASK_RUN)
        span.set_tags(tags_from_context(kwargs))
        span.set_tags(tags_from_context(task.request))
    finally:
        # Always close and release the span, even when tagging raised;
        # otherwise it would stay open and attached to the task. Any
        # exception from the tagging step still propagates to the caller.
        span.finish()
        detach_span(task, task_id)
def trace_before_publish(*args, **kwargs):
    """Start a producer span before a task message is published.

    ``sender`` carries the task *name*; the task object is looked up in the
    Celery registry and the task id is extracted from the signal keyword
    arguments with ``retrieve_task_id``. A ``c.PRODUCER_ROOT_SPAN`` span is
    started, tagged, and handed to ``attach_span`` with ``is_publish=True``.

    Nothing is traced when the task or its id cannot be resolved, or when no
    ``Pin`` is found on the task or on ``task.app``. Positional arguments
    are ignored.
    """
    # `before_task_publish` signal doesn't propagate the task instance so
    # we need to retrieve it from the Celery Registry to access the `Pin`. The
    # `Task` instance **does not** include any information about the current
    # execution, so it **must not** be used to retrieve `request` data.
    task_name = kwargs.get('sender')
    task = registry.tasks.get(task_name)
    task_id = retrieve_task_id(kwargs)
    # safe-guard to avoid crashes in case the signals API
    # changes in Celery
    if task is None or task_id is None:
        log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
        return

    # propagate the `Span` in the current task Context
    pin = Pin.get_from(task) or Pin.get_from(task.app)
    if pin is None:
        # Log the skip the same way `trace_prerun` does, so a missing Pin
        # is visible when debugging the producer side too.
        log.debug('no pin found on task or task.app task_id=%s', task_id)
        return

    # apply some tags here because most of the data is not available
    # in the task_after_publish signal
    service = config.celery['producer_service_name']
    span = pin.tracer.trace(c.PRODUCER_ROOT_SPAN, service=service, resource=task_name)
    span.set_tag(c.TASK_TAG_KEY, c.TASK_APPLY_ASYNC)
    span.set_tag('celery.id', task_id)
    span.set_tags(tags_from_context(kwargs))
    # Note: adding tags from `traceback` or `state` calls will make an
    # API call to the backend for the properties so we should rely
    # only on the given `Context`
    attach_span(task, task_id, span, is_publish=True)
def trace_after_publish(*args, **kwargs):
    """Finish and detach the producer span once a task has been published.

    ``sender`` carries the task name, which is resolved to a task through
    the Celery registry; the task id comes from ``retrieve_task_id``. The
    span stored for that pair with ``is_publish=True`` is finished and then
    passed to ``detach_span``.

    Nothing happens when the task or its id cannot be resolved, or when no
    publish span is stored. Positional arguments are ignored.
    """
    name = kwargs.get('sender')
    registered_task = registry.tasks.get(name)
    ident = retrieve_task_id(kwargs)

    # Defensive guard: the signal payload may change between Celery
    # versions, so bail out quietly instead of crashing.
    if registered_task is None or ident is None:
        log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
        return

    # Close the publish span only if one was stored for this task.
    publish_span = retrieve_span(registered_task, ident, is_publish=True)
    if publish_span is not None:
        publish_span.finish()
        detach_span(registered_task, ident, is_publish=True)
def trace_failure(*args, **kwargs):
    """Record exception details on the span of a task that failed.

    The task is read from the ``sender`` keyword argument, its id from
    ``task_id`` and the exception info from ``einfo``. The exception type,
    value and traceback are set on the span stored for the task, unless the
    exception is an instance of the classes in ``task.throws``.

    The span is neither finished nor detached here. Nothing happens when
    the task, its id, the span or ``einfo`` is missing. Positional
    arguments are ignored.
    """
    sender = kwargs.get('sender')
    ident = kwargs.get('task_id')

    # Defensive guard: the signal payload may change between Celery
    # versions, so bail out quietly instead of crashing.
    if sender is None or ident is None:
        log.debug('unable to extract the Task and the task_id. This version of Celery may not be supported.')
        return

    active_span = retrieve_span(sender, ident)
    if active_span is None:
        return

    # Only exception data is added here: post signals are still called,
    # so the remaining tags do not need to be attached by this handler.
    einfo = kwargs.get('einfo')
    if einfo is None:
        return

    # Exceptions matching `task.throws` are not recorded on the span.
    is_expected = hasattr(sender, 'throws') and isinstance(einfo.exception, sender.throws)
    if not is_expected:
        active_span.set_exc_info(einfo.type, einfo.exception, einfo.tb)
def trace_retry(*args, **kwargs):
    """Tag the span of a task with the reason it is being retried.

    The task is read from the ``sender`` keyword argument, the request
    context from ``request`` and the retry reason from ``reason``. The span
    stored for the task under ``request.id`` receives the tag
    ``c.TASK_RETRY_REASON_KEY`` set to ``str(reason)``.

    Nothing happens when the task or the context is missing, when the
    reason is missing or falsy, or when no span is stored. Positional
    arguments are ignored.
    """
    sender = kwargs.get('sender')
    request = kwargs.get('request')

    # Defensive guard: the signal payload may change between Celery
    # versions, so bail out quietly instead of crashing.
    if sender is None or request is None:
        log.debug('unable to extract the Task or the Context. This version of Celery may not be supported.')
        return

    retry_reason = kwargs.get('reason')
    if not retry_reason:
        log.debug('unable to extract the retry reason. This version of Celery may not be supported.')
        return

    active_span = retrieve_span(sender, request.id)
    if active_span is not None:
        # DEV: `str(reason)` is used rather than `reason.message` because
        # the reason is not guaranteed to be an `Exception`.
        active_span.set_tag(c.TASK_RETRY_REASON_KEY, str(retry_reason))