mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-30 21:56:07 +08:00
Merge branch 'core-instrumentation-celery-v0.15b0'
This commit is contained in:
@ -63,6 +63,6 @@ accomplish this as shown in the example above.
|
||||
|
||||
References
|
||||
----------
|
||||
* `OpenTelemetry Celery Instrumentation <https://opentelemetry-python.readthedocs.io/en/latest/ext/celery/celery.html>`_
|
||||
* `OpenTelemetry Celery Instrumentation <https://opentelemetry-python.readthedocs.io/en/latest/instrumentation/celery/celery.html>`_
|
||||
* `OpenTelemetry Project <https://opentelemetry.io/>`_
|
||||
|
||||
|
@ -39,15 +39,15 @@ package_dir=
|
||||
=src
|
||||
packages=find_namespace:
|
||||
install_requires =
|
||||
opentelemetry-api == 0.15.dev0
|
||||
opentelemetry-instrumentation == 0.15.dev0
|
||||
opentelemetry-api == 0.15b0
|
||||
opentelemetry-instrumentation == 0.15b0
|
||||
celery ~= 4.0
|
||||
|
||||
[options.extras_require]
|
||||
test =
|
||||
pytest
|
||||
celery ~= 4.0
|
||||
opentelemetry-test == 0.15.dev0
|
||||
opentelemetry-test == 0.15b0
|
||||
|
||||
[options.packages.find]
|
||||
where = src
|
||||
|
@ -67,8 +67,8 @@ from opentelemetry import propagators, trace
|
||||
from opentelemetry.instrumentation.celery import utils
|
||||
from opentelemetry.instrumentation.celery.version import __version__
|
||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
||||
from opentelemetry.trace.propagation import get_current_span
|
||||
from opentelemetry.trace.status import Status, StatusCanonicalCode
|
||||
from opentelemetry.trace.propagation.textmap import DictGetter
|
||||
from opentelemetry.trace.status import Status, StatusCode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -84,6 +84,20 @@ _TASK_NAME_KEY = "celery.task_name"
|
||||
_MESSAGE_ID_ATTRIBUTE_NAME = "messaging.message_id"
|
||||
|
||||
|
||||
class CarrierGetter(DictGetter):
|
||||
def get(self, carrier, key):
|
||||
value = getattr(carrier, key, [])
|
||||
if isinstance(value, str) or not isinstance(value, Iterable):
|
||||
value = (value,)
|
||||
return value
|
||||
|
||||
def keys(self, carrier):
|
||||
return []
|
||||
|
||||
|
||||
carrier_getter = CarrierGetter()
|
||||
|
||||
|
||||
class CeleryInstrumentor(BaseInstrumentor):
|
||||
def _instrument(self, **kwargs):
|
||||
tracer_provider = kwargs.get("tracer_provider")
|
||||
@ -118,7 +132,7 @@ class CeleryInstrumentor(BaseInstrumentor):
|
||||
return
|
||||
|
||||
request = task.request
|
||||
tracectx = propagators.extract(carrier_extractor, request) or None
|
||||
tracectx = propagators.extract(carrier_getter, request) or None
|
||||
|
||||
logger.debug("prerun signal start task_id=%s", task_id)
|
||||
|
||||
@ -214,7 +228,7 @@ class CeleryInstrumentor(BaseInstrumentor):
|
||||
if span is None or not span.is_recording():
|
||||
return
|
||||
|
||||
status_kwargs = {"canonical_code": StatusCanonicalCode.UNKNOWN}
|
||||
status_kwargs = {"status_code": StatusCode.ERROR}
|
||||
|
||||
ex = kwargs.get("einfo")
|
||||
|
||||
@ -227,7 +241,6 @@ class CeleryInstrumentor(BaseInstrumentor):
|
||||
|
||||
if ex is not None:
|
||||
status_kwargs["description"] = str(ex)
|
||||
|
||||
span.set_status(Status(**status_kwargs))
|
||||
|
||||
@staticmethod
|
||||
@ -247,10 +260,3 @@ class CeleryInstrumentor(BaseInstrumentor):
|
||||
# Use `str(reason)` instead of `reason.message` in case we get
|
||||
# something that isn't an `Exception`
|
||||
span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))
|
||||
|
||||
|
||||
def carrier_extractor(carrier, key):
|
||||
value = getattr(carrier, key, [])
|
||||
if isinstance(value, str) or not isinstance(value, Iterable):
|
||||
value = (value,)
|
||||
return value
|
||||
|
@ -12,4 +12,4 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
__version__ = "0.15.dev0"
|
||||
__version__ = "0.15b0"
|
||||
|
Reference in New Issue
Block a user