mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-08-01 09:13:23 +08:00
Remove DataDog exporter from main (#1366)
This commit is contained in:
@ -12,7 +12,7 @@ ignore=CVS,gen,Dockerfile,docker-compose.yml,README.md,requirements.txt,docs
|
||||
# Add files or directories matching the regex patterns to be excluded. The
|
||||
# regex matches against base names, not paths.
|
||||
ignore-patterns=
|
||||
ignore-paths=exporter/opentelemetry-exporter-datadog/.*$
|
||||
ignore-paths=
|
||||
|
||||
# Python code to execute, usually for sys.path manipulation such as
|
||||
# pygtk.require().
|
||||
|
@ -1,7 +0,0 @@
|
||||
OpenTelemetry Datadog Exporter
|
||||
==============================
|
||||
|
||||
.. automodule:: opentelemetry.exporter.datadog
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
@ -53,14 +53,6 @@ install <https://pip.pypa.io/en/stable/reference/pip_install/#editable-installs>
|
||||
pip install -e ./sdk-extension/opentelemetry-sdk-extension-aws
|
||||
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:caption: OpenTelemetry Exporters
|
||||
:name: exporters
|
||||
:glob:
|
||||
|
||||
exporter/**
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 2
|
||||
:caption: OpenTelemetry Instrumentations
|
||||
|
@ -9,7 +9,6 @@ class_references=
|
||||
opentelemetry.propagators.textmap.DefaultGetter
|
||||
; API
|
||||
opentelemetry.propagators.textmap.Getter
|
||||
; - DatadogFormat
|
||||
; - AWSXRayPropagator
|
||||
opentelemetry.sdk.trace.id_generator.IdGenerator
|
||||
; - AwsXRayIdGenerator
|
||||
@ -38,7 +37,6 @@ anys=
|
||||
; - AwsXRayIdGenerator
|
||||
; SDK
|
||||
SpanProcessor
|
||||
; - DatadogExportSpanProcessor
|
||||
TracerProvider
|
||||
; - AwsXRayIdGenerator
|
||||
; Instrumentation
|
||||
|
@ -46,7 +46,6 @@ packages=
|
||||
|
||||
[exclude_release]
|
||||
packages=
|
||||
opentelemetry-exporter-datadog
|
||||
opentelemetry-sdk-extension-aws
|
||||
opentelemetry-propagator-aws-xray
|
||||
|
||||
|
@ -1,32 +0,0 @@
|
||||
OpenTelemetry Datadog Span Exporter
|
||||
===================================
|
||||
|
||||
|pypi|
|
||||
|
||||
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-exporter-datadog.svg
|
||||
:target: https://pypi.org/project/opentelemetry-exporter-datadog/
|
||||
|
||||
This library allows to export tracing data to `Datadog
|
||||
<https://www.datadoghq.com/>`_. OpenTelemetry span event and links are not
|
||||
supported.
|
||||
|
||||
.. warning:: This exporter has been deprecated. To export your OTLP traces from OpenTelemetry SDK directly to Datadog Agent, please refer to `OTLP Ingest in Datadog Agent <https://docs.datadoghq.com/tracing/setup_overview/open_standards/#otlp-ingest-in-datadog-agent>`_ .
|
||||
|
||||
|
||||
Installation
|
||||
------------
|
||||
|
||||
::
|
||||
|
||||
pip install opentelemetry-exporter-datadog
|
||||
|
||||
|
||||
.. _Datadog: https://www.datadoghq.com/
|
||||
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
|
||||
|
||||
|
||||
References
|
||||
----------
|
||||
|
||||
* `Datadog <https://www.datadoghq.com/>`_
|
||||
* `OpenTelemetry Project <https://opentelemetry.io/>`_
|
@ -1,49 +0,0 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "opentelemetry-exporter-datadog"
|
||||
dynamic = ["version"]
|
||||
description = "Datadog Span Exporter for OpenTelemetry"
|
||||
readme = "README.rst"
|
||||
license = "Apache-2.0"
|
||||
requires-python = ">=3.7"
|
||||
authors = [
|
||||
{ name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" },
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Programming Language :: Python",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
]
|
||||
dependencies = [
|
||||
"ddtrace>=0.34.0,<0.47.0",
|
||||
"opentelemetry-api ~= 1.12",
|
||||
"opentelemetry-sdk ~= 1.12",
|
||||
"opentelemetry-semantic-conventions == 0.30b0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
test = []
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/exporter/opentelemetry-exporter-datadog"
|
||||
|
||||
[tool.hatch.version]
|
||||
path = "src/opentelemetry/exporter/datadog/version.py"
|
||||
|
||||
[tool.hatch.build.targets.sdist]
|
||||
include = [
|
||||
"/src",
|
||||
"/tests",
|
||||
]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/opentelemetry"]
|
@ -1,77 +0,0 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
The **OpenTelemetry Datadog Exporter** provides a span exporter from
|
||||
`OpenTelemetry`_ traces to `Datadog`_ by using the Datadog Agent.
|
||||
|
||||
Installation
|
||||
------------
|
||||
|
||||
::
|
||||
|
||||
pip install opentelemetry-exporter-datadog
|
||||
|
||||
|
||||
Usage
|
||||
-----
|
||||
|
||||
The Datadog exporter provides a span processor that must be added along with the
|
||||
exporter. In addition, a formatter is provided to handle propagation of trace
|
||||
context between OpenTelemetry-instrumented and Datadog-instrumented services in
|
||||
a distributed trace.
|
||||
|
||||
.. code:: python
|
||||
|
||||
from opentelemetry.propagate import set_global_textmap
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.exporter.datadog import DatadogExportSpanProcessor, DatadogSpanExporter
|
||||
from opentelemetry.exporter.datadog.propagator import DatadogFormat
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
|
||||
trace.set_tracer_provider(TracerProvider())
|
||||
tracer = trace.get_tracer(__name__)
|
||||
|
||||
exporter = DatadogSpanExporter(
|
||||
agent_url="http://agent:8126", service="my-helloworld-service"
|
||||
)
|
||||
|
||||
span_processor = DatadogExportSpanProcessor(exporter)
|
||||
trace.get_tracer_provider().add_span_processor(span_processor)
|
||||
|
||||
# Optional: use Datadog format for propagation in distributed traces
|
||||
set_global_textmap(DatadogFormat())
|
||||
|
||||
with tracer.start_as_current_span("foo"):
|
||||
print("Hello world!")
|
||||
|
||||
|
||||
Examples
|
||||
--------
|
||||
|
||||
The `docs/examples/datadog_exporter`_ includes examples for using the Datadog
|
||||
exporter with OpenTelemetry instrumented applications.
|
||||
|
||||
API
|
||||
---
|
||||
.. _Datadog: https://www.datadoghq.com/
|
||||
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
|
||||
.. _docs/examples/datadog_exporter: https://github.com/open-telemetry/opentelemetry-python/tree/main/docs/examples/datadog_exporter
|
||||
"""
|
||||
# pylint: disable=import-error
|
||||
|
||||
from .exporter import DatadogSpanExporter
|
||||
from .spanprocessor import DatadogExportSpanProcessor
|
||||
|
||||
__all__ = ["DatadogExportSpanProcessor", "DatadogSpanExporter"]
|
@ -1,16 +0,0 @@
|
||||
DD_ORIGIN = "dd_origin"
|
||||
AUTO_REJECT = 0
|
||||
AUTO_KEEP = 1
|
||||
USER_KEEP = 2
|
||||
SAMPLE_RATE_METRIC_KEY = "_sample_rate"
|
||||
SAMPLING_PRIORITY_KEY = "_sampling_priority_v1"
|
||||
ENV_KEY = "env"
|
||||
VERSION_KEY = "version"
|
||||
SERVICE_NAME_TAG = "service.name"
|
||||
EVENT_NAME_EXCEPTION = "exception"
|
||||
EXCEPTION_TYPE_ATTR_KEY = "exception.type"
|
||||
EXCEPTION_MSG_ATTR_KEY = "exception.message"
|
||||
EXCEPTION_STACK_ATTR_KEY = "exception.stacktrace"
|
||||
DD_ERROR_TYPE_TAG_KEY = "error.type"
|
||||
DD_ERROR_MSG_TAG_KEY = "error.msg"
|
||||
DD_ERROR_STACK_TAG_KEY = "error.stack"
|
@ -1,346 +0,0 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from ddtrace.ext import SpanTypes as DatadogSpanTypes
|
||||
from ddtrace.internal.writer import AgentWriter
|
||||
from ddtrace.span import Span as DatadogSpan
|
||||
|
||||
import opentelemetry.trace as trace_api
|
||||
from opentelemetry.exporter.datadog.constants import (
|
||||
DD_ERROR_MSG_TAG_KEY,
|
||||
DD_ERROR_STACK_TAG_KEY,
|
||||
DD_ERROR_TYPE_TAG_KEY,
|
||||
DD_ORIGIN,
|
||||
ENV_KEY,
|
||||
EVENT_NAME_EXCEPTION,
|
||||
EXCEPTION_MSG_ATTR_KEY,
|
||||
EXCEPTION_STACK_ATTR_KEY,
|
||||
EXCEPTION_TYPE_ATTR_KEY,
|
||||
SAMPLE_RATE_METRIC_KEY,
|
||||
SERVICE_NAME_TAG,
|
||||
VERSION_KEY,
|
||||
)
|
||||
from opentelemetry.sdk.trace import sampling
|
||||
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
|
||||
from opentelemetry.semconv.trace import SpanAttributes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
DEFAULT_AGENT_URL = "http://localhost:8126"
|
||||
_INSTRUMENTATION_SPAN_TYPES = {
|
||||
"opentelemetry.instrumentation.aiohttp-client": DatadogSpanTypes.HTTP,
|
||||
"opentelemetry.instrumentation.asgi": DatadogSpanTypes.WEB,
|
||||
"opentelemetry.instrumentation.dbapi": DatadogSpanTypes.SQL,
|
||||
"opentelemetry.instrumentation.django": DatadogSpanTypes.WEB,
|
||||
"opentelemetry.instrumentation.flask": DatadogSpanTypes.WEB,
|
||||
"opentelemetry.instrumentation.grpc": DatadogSpanTypes.GRPC,
|
||||
"opentelemetry.instrumentation.jinja2": DatadogSpanTypes.TEMPLATE,
|
||||
"opentelemetry.instrumentation.mysql": DatadogSpanTypes.SQL,
|
||||
"opentelemetry.instrumentation.psycopg2": DatadogSpanTypes.SQL,
|
||||
"opentelemetry.instrumentation.pymemcache": DatadogSpanTypes.CACHE,
|
||||
"opentelemetry.instrumentation.pymongo": DatadogSpanTypes.MONGODB,
|
||||
"opentelemetry.instrumentation.pymysql": DatadogSpanTypes.SQL,
|
||||
"opentelemetry.instrumentation.redis": DatadogSpanTypes.REDIS,
|
||||
"opentelemetry.instrumentation.requests": DatadogSpanTypes.HTTP,
|
||||
"opentelemetry.instrumentation.sqlalchemy": DatadogSpanTypes.SQL,
|
||||
"opentelemetry.instrumentation.wsgi": DatadogSpanTypes.WEB,
|
||||
}
|
||||
|
||||
|
||||
class DatadogSpanExporter(SpanExporter):
|
||||
"""Datadog span exporter for OpenTelemetry.
|
||||
|
||||
Args:
|
||||
agent_url: The url of the Datadog Agent or use ``DD_TRACE_AGENT_URL`` environment variable
|
||||
service: The service name to be used for the application or use ``DD_SERVICE`` environment variable
|
||||
env: Set the application’s environment or use ``DD_ENV`` environment variable
|
||||
version: Set the application’s version or use ``DD_VERSION`` environment variable
|
||||
tags: A list (formatted as a comma-separated string) of default tags to be added to every span or use ``DD_TAGS`` environment variable
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, agent_url=None, service=None, env=None, version=None, tags=None
|
||||
):
|
||||
self.agent_url = (
|
||||
agent_url
|
||||
if agent_url
|
||||
else os.environ.get("DD_TRACE_AGENT_URL", DEFAULT_AGENT_URL)
|
||||
)
|
||||
self.service = service or os.environ.get("DD_SERVICE")
|
||||
self.env = env or os.environ.get("DD_ENV")
|
||||
self.version = version or os.environ.get("DD_VERSION")
|
||||
self.tags = _parse_tags_str(tags or os.environ.get("DD_TAGS"))
|
||||
self._agent_writer = None
|
||||
|
||||
@property
|
||||
def agent_writer(self):
|
||||
if self._agent_writer is None:
|
||||
url_parsed = urlparse(self.agent_url)
|
||||
if url_parsed.scheme in ("http", "https"):
|
||||
self._agent_writer = AgentWriter(
|
||||
hostname=url_parsed.hostname,
|
||||
port=url_parsed.port,
|
||||
https=url_parsed.scheme == "https",
|
||||
)
|
||||
elif url_parsed.scheme == "unix":
|
||||
self._agent_writer = AgentWriter(uds_path=url_parsed.path)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Unknown scheme `{url_parsed.scheme}` for agent URL"
|
||||
)
|
||||
return self._agent_writer
|
||||
|
||||
def export(self, spans):
|
||||
datadog_spans = self._translate_to_datadog(spans)
|
||||
|
||||
self.agent_writer.write(spans=datadog_spans)
|
||||
|
||||
return SpanExportResult.SUCCESS
|
||||
|
||||
def shutdown(self):
|
||||
if self.agent_writer.started:
|
||||
self.agent_writer.stop()
|
||||
self.agent_writer.join(self.agent_writer.exit_timeout)
|
||||
|
||||
# pylint: disable=too-many-locals
|
||||
def _translate_to_datadog(self, spans):
|
||||
datadog_spans = []
|
||||
|
||||
for span in spans:
|
||||
trace_id, parent_id, span_id = _get_trace_ids(span)
|
||||
|
||||
# datadog Span is initialized with a reference to the tracer which is
|
||||
# used to record the span when it is finished. We can skip ignore this
|
||||
# because we are not calling the finish method and explicitly set the
|
||||
# duration.
|
||||
tracer = None
|
||||
|
||||
# extract resource attributes to be used as tags as well as potential service name
|
||||
[
|
||||
resource_tags,
|
||||
resource_service_name,
|
||||
] = _extract_tags_from_resource(span.resource, self.service)
|
||||
|
||||
datadog_span = DatadogSpan(
|
||||
tracer,
|
||||
_get_span_name(span),
|
||||
service=resource_service_name,
|
||||
resource=_get_resource(span),
|
||||
span_type=_get_span_type(span),
|
||||
trace_id=trace_id,
|
||||
span_id=span_id,
|
||||
parent_id=parent_id,
|
||||
)
|
||||
datadog_span.start_ns = span.start_time
|
||||
datadog_span.duration_ns = span.end_time - span.start_time
|
||||
|
||||
if not span.status.is_ok:
|
||||
datadog_span.error = 1
|
||||
# loop over events and look for exception events, extract info.
|
||||
# https://github.com/open-telemetry/opentelemetry-python/blob/71e3a7a192c0fc8a7503fac967ada36a74b79e58/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py#L810-L819
|
||||
if span.events:
|
||||
_extract_tags_from_exception_events(
|
||||
span.events, datadog_span
|
||||
)
|
||||
|
||||
# combine resource attributes and span attributes, don't modify existing span attributes
|
||||
combined_span_tags = {}
|
||||
combined_span_tags.update(resource_tags)
|
||||
combined_span_tags.update(span.attributes)
|
||||
|
||||
datadog_span.set_tags(combined_span_tags)
|
||||
|
||||
# add configured env tag
|
||||
if self.env is not None:
|
||||
datadog_span.set_tag(ENV_KEY, self.env)
|
||||
|
||||
# add configured application version tag to only root span
|
||||
if self.version is not None and parent_id == 0:
|
||||
datadog_span.set_tag(VERSION_KEY, self.version)
|
||||
|
||||
# add configured global tags
|
||||
datadog_span.set_tags(self.tags)
|
||||
|
||||
# add origin to root span
|
||||
origin = _get_origin(span)
|
||||
if origin and parent_id == 0:
|
||||
datadog_span.set_tag(DD_ORIGIN, origin)
|
||||
|
||||
sampling_rate = _get_sampling_rate(span)
|
||||
if sampling_rate is not None:
|
||||
datadog_span.set_metric(SAMPLE_RATE_METRIC_KEY, sampling_rate)
|
||||
|
||||
# span events and span links are not supported except for extracting exception event context
|
||||
|
||||
datadog_spans.append(datadog_span)
|
||||
|
||||
return datadog_spans
|
||||
|
||||
|
||||
def _get_trace_ids(span):
|
||||
"""Extract tracer ids from span"""
|
||||
ctx = span.get_span_context()
|
||||
trace_id = ctx.trace_id
|
||||
span_id = ctx.span_id
|
||||
|
||||
if isinstance(span.parent, trace_api.Span):
|
||||
parent_id = span.parent.get_span_context().span_id
|
||||
elif isinstance(span.parent, trace_api.SpanContext):
|
||||
parent_id = span.parent.span_id
|
||||
else:
|
||||
parent_id = 0
|
||||
|
||||
trace_id = _convert_trace_id_uint64(trace_id)
|
||||
|
||||
return trace_id, parent_id, span_id
|
||||
|
||||
|
||||
def _convert_trace_id_uint64(otel_id):
|
||||
"""Convert 128-bit int used for trace_id to 64-bit unsigned int"""
|
||||
return otel_id & 0xFFFFFFFFFFFFFFFF
|
||||
|
||||
|
||||
def _get_span_name(span):
|
||||
"""Get span name by using instrumentation and kind while backing off to
|
||||
span.name
|
||||
"""
|
||||
instrumentation_name = (
|
||||
span.instrumentation_info.name if span.instrumentation_info else None
|
||||
)
|
||||
span_kind_name = span.kind.name if span.kind else None
|
||||
name = (
|
||||
f"{instrumentation_name}.{span_kind_name}"
|
||||
if instrumentation_name and span_kind_name
|
||||
else span.name
|
||||
)
|
||||
return name
|
||||
|
||||
|
||||
def _get_resource(span):
|
||||
"""Get resource name for span"""
|
||||
if SpanAttributes.HTTP_METHOD in span.attributes:
|
||||
route = span.attributes.get(SpanAttributes.HTTP_ROUTE)
|
||||
return (
|
||||
span.attributes[SpanAttributes.HTTP_METHOD] + " " + route
|
||||
if route
|
||||
else span.attributes[SpanAttributes.HTTP_METHOD]
|
||||
)
|
||||
|
||||
return span.name
|
||||
|
||||
|
||||
def _get_span_type(span):
|
||||
"""Get Datadog span type"""
|
||||
instrumentation_name = (
|
||||
span.instrumentation_info.name if span.instrumentation_info else None
|
||||
)
|
||||
span_type = _INSTRUMENTATION_SPAN_TYPES.get(instrumentation_name)
|
||||
return span_type
|
||||
|
||||
|
||||
def _get_exc_info(span):
|
||||
"""Parse span status description for exception type and value"""
|
||||
exc_type, exc_val = span.status.description.split(":", 1)
|
||||
return exc_type, exc_val.strip()
|
||||
|
||||
|
||||
def _get_origin(span):
|
||||
ctx = span.get_span_context()
|
||||
origin = ctx.trace_state.get(DD_ORIGIN)
|
||||
return origin
|
||||
|
||||
|
||||
def _get_sampling_rate(span):
|
||||
ctx = span.get_span_context()
|
||||
tracer_provider = trace_api.get_tracer_provider()
|
||||
if not hasattr(tracer_provider, "sampler"):
|
||||
return None
|
||||
sampler = tracer_provider.sampler
|
||||
return (
|
||||
sampler.rate
|
||||
if ctx.trace_flags.sampled
|
||||
and isinstance(sampler, sampling.TraceIdRatioBased)
|
||||
else None
|
||||
)
|
||||
|
||||
|
||||
def _parse_tags_str(tags_str):
|
||||
"""Parse a string of tags typically provided via environment variables.
|
||||
|
||||
The expected string is of the form::
|
||||
"key1:value1,key2:value2"
|
||||
|
||||
:param tags_str: A string of the above form to parse tags from.
|
||||
:return: A dict containing the tags that were parsed.
|
||||
"""
|
||||
parsed_tags = {}
|
||||
if not tags_str:
|
||||
return parsed_tags
|
||||
|
||||
for tag in tags_str.split(","):
|
||||
try:
|
||||
key, value = tag.split(":", 1)
|
||||
|
||||
# Validate the tag
|
||||
if key == "" or value == "" or value.endswith(":"):
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
logger.error(
|
||||
"Malformed tag in tag pair '%s' from tag string '%s'.",
|
||||
tag,
|
||||
tags_str,
|
||||
)
|
||||
else:
|
||||
parsed_tags[key] = value
|
||||
|
||||
return parsed_tags
|
||||
|
||||
|
||||
def _extract_tags_from_resource(resource, fallback_service_name):
|
||||
"""Parse tags from resource.attributes, except service.name which
|
||||
has special significance within datadog"""
|
||||
tags = {}
|
||||
if not (resource and getattr(resource, "attributes", None)):
|
||||
return [tags, fallback_service_name]
|
||||
|
||||
service_name = None
|
||||
for attribute_key, attribute_value in resource.attributes.items():
|
||||
if attribute_key == SERVICE_NAME_TAG:
|
||||
service_name = attribute_value
|
||||
else:
|
||||
tags[attribute_key] = attribute_value
|
||||
|
||||
if service_name is None or service_name == "unknown_service":
|
||||
service_name = fallback_service_name
|
||||
|
||||
return [tags, service_name]
|
||||
|
||||
|
||||
def _extract_tags_from_exception_events(events, datadog_span):
|
||||
"""Parse error tags from exception events, error.msg error.type
|
||||
and error.stack have special significance within datadog"""
|
||||
for event in events:
|
||||
if event.name is not None and event.name == EVENT_NAME_EXCEPTION:
|
||||
for key, value in event.attributes.items():
|
||||
if key == EXCEPTION_TYPE_ATTR_KEY:
|
||||
datadog_span.set_tag(DD_ERROR_TYPE_TAG_KEY, value)
|
||||
elif key == EXCEPTION_MSG_ATTR_KEY:
|
||||
datadog_span.set_tag(DD_ERROR_MSG_TAG_KEY, value)
|
||||
elif key == EXCEPTION_STACK_ATTR_KEY:
|
||||
datadog_span.set_tag(DD_ERROR_STACK_TAG_KEY, value)
|
@ -1,148 +0,0 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import typing
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.context import Context
|
||||
from opentelemetry.exporter.datadog import constants
|
||||
from opentelemetry.propagators.textmap import (
|
||||
CarrierT,
|
||||
Getter,
|
||||
Setter,
|
||||
TextMapPropagator,
|
||||
default_getter,
|
||||
default_setter,
|
||||
)
|
||||
from opentelemetry.trace import get_current_span, set_span_in_context
|
||||
|
||||
|
||||
class DatadogFormat(TextMapPropagator):
|
||||
"""Propagator for the Datadog HTTP header format."""
|
||||
|
||||
TRACE_ID_KEY = "x-datadog-trace-id"
|
||||
PARENT_ID_KEY = "x-datadog-parent-id"
|
||||
SAMPLING_PRIORITY_KEY = "x-datadog-sampling-priority"
|
||||
ORIGIN_KEY = "x-datadog-origin"
|
||||
|
||||
def extract(
|
||||
self,
|
||||
carrier: CarrierT,
|
||||
context: typing.Optional[Context] = None,
|
||||
getter: Getter[CarrierT] = default_getter,
|
||||
) -> Context:
|
||||
if context is None:
|
||||
context = Context()
|
||||
|
||||
trace_id = extract_first_element(
|
||||
getter.get(carrier, self.TRACE_ID_KEY)
|
||||
)
|
||||
|
||||
span_id = extract_first_element(
|
||||
getter.get(carrier, self.PARENT_ID_KEY)
|
||||
)
|
||||
|
||||
sampled = extract_first_element(
|
||||
getter.get(carrier, self.SAMPLING_PRIORITY_KEY)
|
||||
)
|
||||
|
||||
origin = extract_first_element(getter.get(carrier, self.ORIGIN_KEY))
|
||||
|
||||
trace_flags = trace.TraceFlags()
|
||||
if sampled and int(sampled) in (
|
||||
constants.AUTO_KEEP,
|
||||
constants.USER_KEEP,
|
||||
):
|
||||
trace_flags = trace.TraceFlags(trace.TraceFlags.SAMPLED)
|
||||
|
||||
if trace_id is None or span_id is None:
|
||||
return context
|
||||
|
||||
trace_state = []
|
||||
if origin is not None:
|
||||
trace_state.append((constants.DD_ORIGIN, origin))
|
||||
span_context = trace.SpanContext(
|
||||
trace_id=int(trace_id),
|
||||
span_id=int(span_id),
|
||||
is_remote=True,
|
||||
trace_flags=trace_flags,
|
||||
trace_state=trace.TraceState(trace_state),
|
||||
)
|
||||
|
||||
return set_span_in_context(
|
||||
trace.NonRecordingSpan(span_context), context
|
||||
)
|
||||
|
||||
def inject(
|
||||
self,
|
||||
carrier: CarrierT,
|
||||
context: typing.Optional[Context] = None,
|
||||
setter: Setter[CarrierT] = default_setter,
|
||||
) -> None:
|
||||
span = get_current_span(context)
|
||||
span_context = span.get_span_context()
|
||||
if span_context == trace.INVALID_SPAN_CONTEXT:
|
||||
return
|
||||
sampled = (trace.TraceFlags.SAMPLED & span.context.trace_flags) != 0
|
||||
setter.set(
|
||||
carrier,
|
||||
self.TRACE_ID_KEY,
|
||||
format_trace_id(span.context.trace_id),
|
||||
)
|
||||
setter.set(
|
||||
carrier, self.PARENT_ID_KEY, format_span_id(span.context.span_id)
|
||||
)
|
||||
setter.set(
|
||||
carrier,
|
||||
self.SAMPLING_PRIORITY_KEY,
|
||||
str(constants.AUTO_KEEP if sampled else constants.AUTO_REJECT),
|
||||
)
|
||||
if constants.DD_ORIGIN in span.context.trace_state:
|
||||
setter.set(
|
||||
carrier,
|
||||
self.ORIGIN_KEY,
|
||||
span.context.trace_state[constants.DD_ORIGIN],
|
||||
)
|
||||
|
||||
@property
|
||||
def fields(self):
|
||||
"""Returns a set with the fields set in `inject`.
|
||||
|
||||
See
|
||||
`opentelemetry.propagators.textmap.TextMapPropagator.fields`
|
||||
"""
|
||||
return {
|
||||
self.TRACE_ID_KEY,
|
||||
self.PARENT_ID_KEY,
|
||||
self.SAMPLING_PRIORITY_KEY,
|
||||
self.ORIGIN_KEY,
|
||||
}
|
||||
|
||||
|
||||
def format_trace_id(trace_id: int) -> str:
|
||||
"""Format the trace id for Datadog."""
|
||||
return str(trace_id & 0xFFFFFFFFFFFFFFFF)
|
||||
|
||||
|
||||
def format_span_id(span_id: int) -> str:
|
||||
"""Format the span id for Datadog."""
|
||||
return str(span_id)
|
||||
|
||||
|
||||
def extract_first_element(
|
||||
items: typing.Iterable[CarrierT],
|
||||
) -> typing.Optional[CarrierT]:
|
||||
if items is None:
|
||||
return None
|
||||
return next(iter(items), None)
|
@ -1,226 +0,0 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import logging
|
||||
import threading
|
||||
import typing
|
||||
from time import time_ns
|
||||
|
||||
from opentelemetry.context import Context, attach, detach, set_value
|
||||
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
|
||||
from opentelemetry.sdk.trace import Span, SpanProcessor
|
||||
from opentelemetry.sdk.trace.export import SpanExporter
|
||||
from opentelemetry.trace import INVALID_TRACE_ID
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DatadogExportSpanProcessor(SpanProcessor):
|
||||
"""Datadog exporter span processor
|
||||
|
||||
DatadogExportSpanProcessor is an implementation of `SpanProcessor` that
|
||||
batches all opened spans into a list per trace. When all spans for a trace
|
||||
are ended, the trace is queues up for export. This is required for exporting
|
||||
to the Datadog Agent which expects to received list of spans for each trace.
|
||||
"""
|
||||
|
||||
_FLUSH_TOKEN = INVALID_TRACE_ID
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
span_exporter: SpanExporter,
|
||||
schedule_delay_millis: float = 5000,
|
||||
max_trace_size: int = 4096,
|
||||
):
|
||||
if max_trace_size <= 0:
|
||||
raise ValueError("max_queue_size must be a positive integer.")
|
||||
|
||||
if schedule_delay_millis <= 0:
|
||||
raise ValueError("schedule_delay_millis must be positive.")
|
||||
|
||||
self.span_exporter = span_exporter
|
||||
|
||||
# queue trace_ids for traces with recently ended spans for worker thread to check
|
||||
# for exporting
|
||||
self.check_traces_queue = (
|
||||
collections.deque()
|
||||
) # type: typing.Deque[int]
|
||||
|
||||
self.traces_lock = threading.Lock()
|
||||
# dictionary of trace_ids to a list of spans where the first span is the
|
||||
# first opened span for the trace
|
||||
self.traces = collections.defaultdict(list)
|
||||
# counter to keep track of the number of spans and ended spans for a
|
||||
# trace_id
|
||||
self.traces_spans_count = collections.Counter()
|
||||
self.traces_spans_ended_count = collections.Counter()
|
||||
|
||||
self.worker_thread = threading.Thread(target=self.worker, daemon=True)
|
||||
|
||||
# threading conditions used for flushing and shutdown
|
||||
self.condition = threading.Condition(threading.Lock())
|
||||
self.flush_condition = threading.Condition(threading.Lock())
|
||||
|
||||
# flag to indicate that there is a flush operation on progress
|
||||
self._flushing = False
|
||||
|
||||
self.max_trace_size = max_trace_size
|
||||
self._spans_dropped = False
|
||||
self.schedule_delay_millis = schedule_delay_millis
|
||||
self.done = False
|
||||
self.worker_thread.start()
|
||||
|
||||
def on_start(
|
||||
self, span: Span, parent_context: typing.Optional[Context] = None
|
||||
) -> None:
|
||||
ctx = span.get_span_context()
|
||||
trace_id = ctx.trace_id
|
||||
|
||||
with self.traces_lock:
|
||||
# check upper bound on number of spans for trace before adding new
|
||||
# span
|
||||
if self.traces_spans_count[trace_id] == self.max_trace_size:
|
||||
logger.warning("Max spans for trace, spans will be dropped.")
|
||||
self._spans_dropped = True
|
||||
return
|
||||
|
||||
# add span to end of list for a trace and update the counter
|
||||
self.traces[trace_id].append(span)
|
||||
self.traces_spans_count[trace_id] += 1
|
||||
|
||||
def on_end(self, span: Span) -> None:
|
||||
if self.done:
|
||||
logger.warning("Already shutdown, dropping span.")
|
||||
return
|
||||
|
||||
ctx = span.get_span_context()
|
||||
trace_id = ctx.trace_id
|
||||
|
||||
with self.traces_lock:
|
||||
self.traces_spans_ended_count[trace_id] += 1
|
||||
if self.is_trace_exportable(trace_id):
|
||||
self.check_traces_queue.appendleft(trace_id)
|
||||
|
||||
def worker(self):
|
||||
timeout = self.schedule_delay_millis / 1e3
|
||||
while not self.done:
|
||||
if not self._flushing:
|
||||
with self.condition:
|
||||
self.condition.wait(timeout)
|
||||
if not self.check_traces_queue:
|
||||
# spurious notification, let's wait again, reset timeout
|
||||
timeout = self.schedule_delay_millis / 1e3
|
||||
continue
|
||||
if self.done:
|
||||
# missing spans will be sent when calling flush
|
||||
break
|
||||
|
||||
# subtract the duration of this export call to the next timeout
|
||||
start = time_ns()
|
||||
self.export()
|
||||
end = time_ns()
|
||||
duration = (end - start) / 1e9
|
||||
timeout = self.schedule_delay_millis / 1e3 - duration
|
||||
|
||||
# be sure that all spans are sent
|
||||
self._drain_queue()
|
||||
|
||||
def is_trace_exportable(self, trace_id):
|
||||
return (
|
||||
self.traces_spans_count[trace_id]
|
||||
- self.traces_spans_ended_count[trace_id]
|
||||
<= 0
|
||||
)
|
||||
|
||||
def export(self) -> None:
|
||||
"""Exports traces with finished spans."""
|
||||
notify_flush = False
|
||||
export_trace_ids = []
|
||||
|
||||
while self.check_traces_queue:
|
||||
trace_id = self.check_traces_queue.pop()
|
||||
if trace_id is self._FLUSH_TOKEN:
|
||||
notify_flush = True
|
||||
else:
|
||||
with self.traces_lock:
|
||||
# check whether trace is exportable again in case that new
|
||||
# spans were started since we last concluded trace was
|
||||
# exportable
|
||||
if self.is_trace_exportable(trace_id):
|
||||
export_trace_ids.append(trace_id)
|
||||
del self.traces_spans_count[trace_id]
|
||||
del self.traces_spans_ended_count[trace_id]
|
||||
|
||||
if len(export_trace_ids) > 0:
|
||||
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
|
||||
|
||||
for trace_id in export_trace_ids:
|
||||
with self.traces_lock:
|
||||
try:
|
||||
# Ignore type b/c the Optional[None]+slicing is too "clever"
|
||||
# for mypy
|
||||
self.span_exporter.export(self.traces[trace_id]) # type: ignore
|
||||
# pylint: disable=broad-except
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Exception while exporting Span batch."
|
||||
)
|
||||
finally:
|
||||
del self.traces[trace_id]
|
||||
|
||||
detach(token)
|
||||
|
||||
if notify_flush:
|
||||
with self.flush_condition:
|
||||
self.flush_condition.notify()
|
||||
|
||||
def _drain_queue(self):
|
||||
"""Export all elements until queue is empty.
|
||||
|
||||
Can only be called from the worker thread context because it invokes
|
||||
`export` that is not thread safe.
|
||||
"""
|
||||
while self.check_traces_queue:
|
||||
self.export()
|
||||
|
||||
def force_flush(self, timeout_millis: int = 30000) -> bool:
|
||||
if self.done:
|
||||
logger.warning("Already shutdown, ignoring call to force_flush().")
|
||||
return True
|
||||
|
||||
self._flushing = True
|
||||
self.check_traces_queue.appendleft(self._FLUSH_TOKEN)
|
||||
|
||||
# wake up worker thread
|
||||
with self.condition:
|
||||
self.condition.notify_all()
|
||||
|
||||
# wait for token to be processed
|
||||
with self.flush_condition:
|
||||
ret = self.flush_condition.wait(timeout_millis / 1e3)
|
||||
|
||||
self._flushing = False
|
||||
|
||||
if not ret:
|
||||
logger.warning("Timeout was exceeded in force_flush().")
|
||||
return ret
|
||||
|
||||
def shutdown(self) -> None:
|
||||
# signal the worker thread to finish and then wait for it
|
||||
self.done = True
|
||||
with self.condition:
|
||||
self.condition.notify_all()
|
||||
self.worker_thread.join()
|
||||
self.span_exporter.shutdown()
|
@ -1,15 +0,0 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
__version__ = "0.30b0"
|
@ -1,669 +0,0 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
from unittest import mock
|
||||
|
||||
from ddtrace.internal.writer import AgentWriter
|
||||
from flaky import flaky
|
||||
from pytest import mark
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.context import Context
|
||||
from opentelemetry.exporter import datadog
|
||||
from opentelemetry.sdk import trace
|
||||
from opentelemetry.sdk.trace import Resource, sampling
|
||||
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
|
||||
from opentelemetry.semconv.trace import SpanAttributes
|
||||
|
||||
|
||||
class MockDatadogSpanExporter(datadog.DatadogSpanExporter):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
agent_writer_mock = mock.Mock(spec=AgentWriter)
|
||||
agent_writer_mock.started = True
|
||||
agent_writer_mock.exit_timeout = 1
|
||||
self._agent_writer = agent_writer_mock
|
||||
|
||||
|
||||
def get_spans(tracer, exporter, shutdown=True):
|
||||
if shutdown:
|
||||
tracer.span_processor.shutdown()
|
||||
|
||||
spans = [
|
||||
call_args[-1]["spans"]
|
||||
for call_args in exporter.agent_writer.write.call_args_list
|
||||
]
|
||||
|
||||
return [span.to_dict() for span in itertools.chain.from_iterable(spans)]
|
||||
|
||||
|
||||
class TestDatadogSpanExporter(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.exporter = MockDatadogSpanExporter()
|
||||
self.span_processor = datadog.DatadogExportSpanProcessor(self.exporter)
|
||||
tracer_provider = trace.TracerProvider()
|
||||
tracer_provider.add_span_processor(self.span_processor)
|
||||
self.tracer_provider = tracer_provider
|
||||
self.tracer = tracer_provider.get_tracer(__name__)
|
||||
|
||||
def tearDown(self):
|
||||
self.tracer_provider.shutdown()
|
||||
|
||||
def test_constructor_default(self):
|
||||
"""Test the default values assigned by constructor."""
|
||||
exporter = datadog.DatadogSpanExporter()
|
||||
|
||||
self.assertEqual(exporter.agent_url, "http://localhost:8126")
|
||||
self.assertIsNone(exporter.service)
|
||||
self.assertIsNotNone(exporter.agent_writer)
|
||||
|
||||
def test_constructor_explicit(self):
|
||||
"""Test the constructor passing all the options."""
|
||||
agent_url = "http://localhost:8126"
|
||||
exporter = datadog.DatadogSpanExporter(
|
||||
agent_url=agent_url,
|
||||
service="explicit",
|
||||
)
|
||||
|
||||
self.assertEqual(exporter.agent_url, agent_url)
|
||||
self.assertEqual(exporter.service, "explicit")
|
||||
self.assertIsNone(exporter.env)
|
||||
self.assertIsNone(exporter.version)
|
||||
self.assertEqual(exporter.tags, {})
|
||||
|
||||
exporter = datadog.DatadogSpanExporter(
|
||||
agent_url=agent_url,
|
||||
service="explicit",
|
||||
env="test",
|
||||
version="0.0.1",
|
||||
tags="",
|
||||
)
|
||||
|
||||
self.assertEqual(exporter.agent_url, agent_url)
|
||||
self.assertEqual(exporter.service, "explicit")
|
||||
self.assertEqual(exporter.env, "test")
|
||||
self.assertEqual(exporter.version, "0.0.1")
|
||||
self.assertEqual(exporter.tags, {})
|
||||
|
||||
exporter = datadog.DatadogSpanExporter(
|
||||
agent_url=agent_url,
|
||||
service="explicit",
|
||||
env="test",
|
||||
version="0.0.1",
|
||||
tags="team:testers,layer:app",
|
||||
)
|
||||
|
||||
self.assertEqual(exporter.agent_url, agent_url)
|
||||
self.assertEqual(exporter.service, "explicit")
|
||||
self.assertEqual(exporter.env, "test")
|
||||
self.assertEqual(exporter.version, "0.0.1")
|
||||
self.assertEqual(exporter.tags, {"team": "testers", "layer": "app"})
|
||||
|
||||
@mock.patch.dict(
|
||||
"os.environ",
|
||||
{
|
||||
"DD_TRACE_AGENT_URL": "http://agent:8126",
|
||||
"DD_SERVICE": "test-service",
|
||||
"DD_ENV": "test",
|
||||
"DD_VERSION": "0.0.1",
|
||||
"DD_TAGS": "team:testers",
|
||||
},
|
||||
)
|
||||
def test_constructor_environ(self):
|
||||
exporter = datadog.DatadogSpanExporter()
|
||||
|
||||
self.assertEqual(exporter.agent_url, "http://agent:8126")
|
||||
self.assertEqual(exporter.service, "test-service")
|
||||
self.assertEqual(exporter.env, "test")
|
||||
self.assertEqual(exporter.version, "0.0.1")
|
||||
self.assertEqual(exporter.tags, {"team": "testers"})
|
||||
self.assertIsNotNone(exporter.agent_writer)
|
||||
|
||||
# pylint: disable=too-many-locals
|
||||
@mock.patch.dict(
|
||||
"os.environ",
|
||||
{
|
||||
"DD_SERVICE": "test-service",
|
||||
"DD_ENV": "test",
|
||||
"DD_VERSION": "0.0.1",
|
||||
"DD_TAGS": "team:testers",
|
||||
},
|
||||
)
|
||||
def test_translate_to_datadog(self):
|
||||
# pylint: disable=invalid-name
|
||||
self.maxDiff = None
|
||||
|
||||
resource = Resource(
|
||||
attributes={
|
||||
"key_resource": "some_resource",
|
||||
"service.name": "resource_service_name",
|
||||
}
|
||||
)
|
||||
|
||||
resource_without_service = Resource(
|
||||
attributes={"conflicting_key": "conflicting_value"}
|
||||
)
|
||||
|
||||
span_names = ("test1", "test2", "test3")
|
||||
trace_id = 0x6E0C63257DE34C926F9EFCD03927272E
|
||||
trace_id_low = 0x6F9EFCD03927272E
|
||||
span_id = 0x34BF92DEEFC58C92
|
||||
parent_id = 0x1111111111111111
|
||||
other_id = 0x2222222222222222
|
||||
|
||||
base_time = 683647322 * 10**9 # in ns
|
||||
start_times = (
|
||||
base_time,
|
||||
base_time + 150 * 10**6,
|
||||
base_time + 300 * 10**6,
|
||||
)
|
||||
durations = (50 * 10**6, 100 * 10**6, 200 * 10**6)
|
||||
end_times = (
|
||||
start_times[0] + durations[0],
|
||||
start_times[1] + durations[1],
|
||||
start_times[2] + durations[2],
|
||||
)
|
||||
|
||||
span_context = trace_api.SpanContext(
|
||||
trace_id, span_id, is_remote=False
|
||||
)
|
||||
parent_span_context = trace_api.SpanContext(
|
||||
trace_id, parent_id, is_remote=False
|
||||
)
|
||||
other_context = trace_api.SpanContext(
|
||||
trace_id, other_id, is_remote=False
|
||||
)
|
||||
|
||||
instrumentation_info = InstrumentationInfo(__name__, "0")
|
||||
|
||||
otel_spans = [
|
||||
trace._Span(
|
||||
name=span_names[0],
|
||||
context=span_context,
|
||||
parent=parent_span_context,
|
||||
kind=trace_api.SpanKind.CLIENT,
|
||||
instrumentation_info=instrumentation_info,
|
||||
resource=Resource({}),
|
||||
),
|
||||
trace._Span(
|
||||
name=span_names[1],
|
||||
context=parent_span_context,
|
||||
parent=None,
|
||||
instrumentation_info=instrumentation_info,
|
||||
resource=resource_without_service,
|
||||
),
|
||||
trace._Span(
|
||||
name=span_names[2],
|
||||
context=other_context,
|
||||
parent=None,
|
||||
resource=resource,
|
||||
),
|
||||
]
|
||||
|
||||
otel_spans[1].set_attribute("conflicting_key", "original_value")
|
||||
|
||||
otel_spans[0].start(start_time=start_times[0])
|
||||
otel_spans[0].end(end_time=end_times[0])
|
||||
|
||||
otel_spans[1].start(start_time=start_times[1])
|
||||
otel_spans[1].end(end_time=end_times[1])
|
||||
|
||||
otel_spans[2].start(start_time=start_times[2])
|
||||
otel_spans[2].end(end_time=end_times[2])
|
||||
|
||||
# pylint: disable=protected-access
|
||||
exporter = datadog.DatadogSpanExporter()
|
||||
datadog_spans = [
|
||||
span.to_dict()
|
||||
for span in exporter._translate_to_datadog(otel_spans)
|
||||
]
|
||||
|
||||
expected_spans = [
|
||||
dict(
|
||||
trace_id=trace_id_low,
|
||||
parent_id=parent_id,
|
||||
span_id=span_id,
|
||||
name="tests.test_datadog_exporter.CLIENT",
|
||||
resource=span_names[0],
|
||||
start=start_times[0],
|
||||
duration=durations[0],
|
||||
error=0,
|
||||
service="test-service",
|
||||
meta={"env": "test", "team": "testers"},
|
||||
),
|
||||
dict(
|
||||
trace_id=trace_id_low,
|
||||
parent_id=0,
|
||||
span_id=parent_id,
|
||||
name="tests.test_datadog_exporter.INTERNAL",
|
||||
resource=span_names[1],
|
||||
start=start_times[1],
|
||||
duration=durations[1],
|
||||
error=0,
|
||||
service="test-service",
|
||||
meta={
|
||||
"env": "test",
|
||||
"team": "testers",
|
||||
"version": "0.0.1",
|
||||
"conflicting_key": "original_value",
|
||||
},
|
||||
),
|
||||
dict(
|
||||
trace_id=trace_id_low,
|
||||
parent_id=0,
|
||||
span_id=other_id,
|
||||
name=span_names[2],
|
||||
resource=span_names[2],
|
||||
start=start_times[2],
|
||||
duration=durations[2],
|
||||
error=0,
|
||||
service="resource_service_name",
|
||||
meta={
|
||||
"env": "test",
|
||||
"team": "testers",
|
||||
"version": "0.0.1",
|
||||
"key_resource": "some_resource",
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
self.assertEqual(datadog_spans, expected_spans)
|
||||
|
||||
def test_export(self):
|
||||
"""Test that agent and/or collector are invoked"""
|
||||
# create and save span to be used in tests
|
||||
context = trace_api.SpanContext(
|
||||
trace_id=0x000000000000000000000000DEADBEEF,
|
||||
span_id=0x00000000DEADBEF0,
|
||||
is_remote=False,
|
||||
)
|
||||
|
||||
test_span = trace._Span("test_span", context=context)
|
||||
test_span.start()
|
||||
test_span.end()
|
||||
|
||||
self.exporter.export((test_span,))
|
||||
|
||||
self.assertEqual(self.exporter.agent_writer.write.call_count, 1)
|
||||
|
||||
def test_resources(self):
|
||||
test_attributes = [
|
||||
{},
|
||||
{
|
||||
SpanAttributes.HTTP_METHOD: "GET",
|
||||
SpanAttributes.HTTP_ROUTE: "/foo/<int:id>",
|
||||
},
|
||||
{
|
||||
SpanAttributes.HTTP_METHOD: "GET",
|
||||
SpanAttributes.HTTP_TARGET: "/foo/200",
|
||||
},
|
||||
]
|
||||
|
||||
for index, test in enumerate(test_attributes):
|
||||
with self.tracer.start_span(str(index), attributes=test):
|
||||
pass
|
||||
|
||||
datadog_spans = get_spans(self.tracer, self.exporter)
|
||||
|
||||
self.assertEqual(len(datadog_spans), 3)
|
||||
|
||||
actual = [span["resource"] for span in datadog_spans]
|
||||
expected = ["0", "GET /foo/<int:id>", "GET"]
|
||||
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_span_types(self):
|
||||
test_instrumentations = [
|
||||
"opentelemetry.instrumentation.aiohttp-client",
|
||||
"opentelemetry.instrumentation.dbapi",
|
||||
"opentelemetry.instrumentation.django",
|
||||
"opentelemetry.instrumentation.flask",
|
||||
"opentelemetry.instrumentation.grpc",
|
||||
"opentelemetry.instrumentation.jinja2",
|
||||
"opentelemetry.instrumentation.mysql",
|
||||
"opentelemetry.instrumentation.psycopg2",
|
||||
"opentelemetry.instrumentation.pymongo",
|
||||
"opentelemetry.instrumentation.pymysql",
|
||||
"opentelemetry.instrumentation.redis",
|
||||
"opentelemetry.instrumentation.requests",
|
||||
"opentelemetry.instrumentation.sqlalchemy",
|
||||
"opentelemetry.instrumentation.wsgi",
|
||||
]
|
||||
|
||||
for index, instrumentation in enumerate(test_instrumentations):
|
||||
# change tracer's instrumentation info before starting span
|
||||
self.tracer.instrumentation_info = InstrumentationInfo(
|
||||
instrumentation, "0"
|
||||
)
|
||||
with self.tracer.start_span(str(index)):
|
||||
pass
|
||||
|
||||
datadog_spans = get_spans(self.tracer, self.exporter)
|
||||
|
||||
self.assertEqual(len(datadog_spans), 14)
|
||||
|
||||
actual = [span.get("type") for span in datadog_spans]
|
||||
expected = [
|
||||
"http",
|
||||
"sql",
|
||||
"web",
|
||||
"web",
|
||||
"grpc",
|
||||
"template",
|
||||
"sql",
|
||||
"sql",
|
||||
"mongodb",
|
||||
"sql",
|
||||
"redis",
|
||||
"http",
|
||||
"sql",
|
||||
"web",
|
||||
]
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_errors(self):
|
||||
with self.assertRaises(ValueError):
|
||||
with self.tracer.start_span("foo"):
|
||||
raise ValueError("bar")
|
||||
|
||||
datadog_spans = get_spans(self.tracer, self.exporter)
|
||||
|
||||
self.assertEqual(len(datadog_spans), 1)
|
||||
|
||||
span = datadog_spans[0]
|
||||
self.assertEqual(span["error"], 1)
|
||||
self.assertEqual(span["meta"]["error.msg"], "bar")
|
||||
self.assertEqual(span["meta"]["error.type"], "ValueError")
|
||||
self.assertTrue(span["meta"]["error.stack"] is not None)
|
||||
|
||||
def test_shutdown(self):
|
||||
span_names = ["xxx", "bar", "foo"]
|
||||
|
||||
for name in span_names:
|
||||
with self.tracer.start_span(name):
|
||||
pass
|
||||
|
||||
self.span_processor.shutdown()
|
||||
|
||||
# check that spans are exported without an explicitly call to
|
||||
# force_flush()
|
||||
datadog_spans = get_spans(self.tracer, self.exporter)
|
||||
actual = [span.get("resource") for span in datadog_spans]
|
||||
self.assertListEqual(span_names, actual)
|
||||
|
||||
def test_flush(self):
|
||||
span_names0 = ["xxx", "bar", "foo"]
|
||||
span_names1 = ["yyy", "baz", "fox"]
|
||||
|
||||
for name in span_names0:
|
||||
with self.tracer.start_span(name):
|
||||
pass
|
||||
|
||||
self.assertTrue(self.span_processor.force_flush())
|
||||
datadog_spans = get_spans(self.tracer, self.exporter, shutdown=False)
|
||||
actual0 = [span.get("resource") for span in datadog_spans]
|
||||
self.assertListEqual(span_names0, actual0)
|
||||
|
||||
# create some more spans to check that span processor still works
|
||||
for name in span_names1:
|
||||
with self.tracer.start_span(name):
|
||||
pass
|
||||
|
||||
self.assertTrue(self.span_processor.force_flush())
|
||||
datadog_spans = get_spans(self.tracer, self.exporter)
|
||||
actual1 = [span.get("resource") for span in datadog_spans]
|
||||
self.assertListEqual(span_names0 + span_names1, actual1)
|
||||
|
||||
def test_span_processor_lossless(self):
|
||||
"""Test that no spans are lost when sending max_trace_size spans"""
|
||||
span_processor = datadog.DatadogExportSpanProcessor(
|
||||
self.exporter, max_trace_size=128
|
||||
)
|
||||
tracer_provider = trace.TracerProvider()
|
||||
tracer_provider.add_span_processor(span_processor)
|
||||
tracer = tracer_provider.get_tracer(__name__)
|
||||
|
||||
with tracer.start_as_current_span("root"):
|
||||
for _ in range(127):
|
||||
with tracer.start_span("foo"):
|
||||
pass
|
||||
|
||||
self.assertTrue(span_processor.force_flush())
|
||||
datadog_spans = get_spans(tracer, self.exporter)
|
||||
self.assertEqual(len(datadog_spans), 128)
|
||||
tracer_provider.shutdown()
|
||||
|
||||
def test_span_processor_dropped_spans(self):
|
||||
"""Test that spans are lost when exceeding max_trace_size spans"""
|
||||
span_processor = datadog.DatadogExportSpanProcessor(
|
||||
self.exporter, max_trace_size=128
|
||||
)
|
||||
tracer_provider = trace.TracerProvider()
|
||||
tracer_provider.add_span_processor(span_processor)
|
||||
tracer = tracer_provider.get_tracer(__name__)
|
||||
|
||||
with tracer.start_as_current_span("root"):
|
||||
for _ in range(127):
|
||||
with tracer.start_span("foo"):
|
||||
pass
|
||||
with self.assertLogs(level=logging.WARNING):
|
||||
with tracer.start_span("one-too-many"):
|
||||
pass
|
||||
|
||||
self.assertTrue(span_processor.force_flush())
|
||||
datadog_spans = get_spans(tracer, self.exporter)
|
||||
self.assertEqual(len(datadog_spans), 128)
|
||||
tracer_provider.shutdown()
|
||||
|
||||
@mark.skipif(
|
||||
sys.platform == "win32",
|
||||
reason="unreliable test on windows",
|
||||
)
|
||||
def test_span_processor_scheduled_delay(self):
|
||||
"""Test that spans are exported each schedule_delay_millis"""
|
||||
delay = 300
|
||||
span_processor = datadog.DatadogExportSpanProcessor(
|
||||
self.exporter, schedule_delay_millis=delay
|
||||
)
|
||||
tracer_provider = trace.TracerProvider()
|
||||
tracer_provider.add_span_processor(span_processor)
|
||||
tracer = tracer_provider.get_tracer(__name__)
|
||||
|
||||
with tracer.start_span("foo"):
|
||||
pass
|
||||
|
||||
time.sleep(delay / (1e3 * 2))
|
||||
datadog_spans = get_spans(tracer, self.exporter, shutdown=False)
|
||||
self.assertEqual(len(datadog_spans), 0)
|
||||
|
||||
time.sleep(delay / (1e3 * 2) + 0.01)
|
||||
datadog_spans = get_spans(tracer, self.exporter, shutdown=False)
|
||||
self.assertEqual(len(datadog_spans), 1)
|
||||
|
||||
tracer_provider.shutdown()
|
||||
|
||||
@flaky(max_runs=3, min_passes=1)
|
||||
def test_batch_span_processor_reset_timeout(self):
|
||||
"""Test that the scheduled timeout is reset on cycles without spans"""
|
||||
delay = 50
|
||||
# pylint: disable=protected-access
|
||||
exporter = MockDatadogSpanExporter()
|
||||
exporter._agent_writer.write.side_effect = lambda spans: time.sleep(
|
||||
0.05
|
||||
)
|
||||
span_processor = datadog.DatadogExportSpanProcessor(
|
||||
exporter, schedule_delay_millis=delay
|
||||
)
|
||||
tracer_provider = trace.TracerProvider()
|
||||
tracer_provider.add_span_processor(span_processor)
|
||||
tracer = tracer_provider.get_tracer(__name__)
|
||||
with mock.patch.object(span_processor.condition, "wait") as mock_wait:
|
||||
with tracer.start_span("foo"):
|
||||
pass
|
||||
|
||||
# give some time for exporter to loop
|
||||
# since wait is mocked it should return immediately
|
||||
time.sleep(0.1)
|
||||
mock_wait_calls = list(mock_wait.mock_calls)
|
||||
|
||||
# find the index of the call that processed the singular span
|
||||
for idx, wait_call in enumerate(mock_wait_calls):
|
||||
_, args, __ = wait_call
|
||||
if args[0] <= 0:
|
||||
after_calls = mock_wait_calls[idx + 1 :]
|
||||
break
|
||||
|
||||
self.assertTrue(
|
||||
all(args[0] >= 0.05 for _, args, __ in after_calls)
|
||||
)
|
||||
|
||||
span_processor.shutdown()
|
||||
|
||||
def test_span_processor_accepts_parent_context(self):
|
||||
span_processor = mock.Mock(
|
||||
wraps=datadog.DatadogExportSpanProcessor(self.exporter)
|
||||
)
|
||||
tracer_provider = trace.TracerProvider()
|
||||
tracer_provider.add_span_processor(span_processor)
|
||||
tracer = tracer_provider.get_tracer(__name__)
|
||||
|
||||
context = Context()
|
||||
span = tracer.start_span("foo", context=context)
|
||||
|
||||
span_processor.on_start.assert_called_once_with(
|
||||
span, parent_context=context
|
||||
)
|
||||
|
||||
def test_origin(self):
|
||||
trace_id = 0x000000000000000000000000DEADBEEF
|
||||
trace_state = trace_api.TraceState(
|
||||
[(datadog.constants.DD_ORIGIN, "origin-service")]
|
||||
)
|
||||
parent_span_ctx = trace_api.SpanContext(
|
||||
trace_id=trace_id,
|
||||
span_id=0x1,
|
||||
is_remote=False,
|
||||
trace_state=trace_state,
|
||||
)
|
||||
child_span_ctx = trace_api.SpanContext(
|
||||
trace_id=trace_id,
|
||||
span_id=0x2,
|
||||
is_remote=False,
|
||||
trace_state=trace_state,
|
||||
)
|
||||
|
||||
root_span = trace._Span(
|
||||
name="root", context=parent_span_ctx, parent=None
|
||||
)
|
||||
child_span = trace._Span(
|
||||
name="child", context=child_span_ctx, parent=parent_span_ctx
|
||||
)
|
||||
root_span.start()
|
||||
child_span.start()
|
||||
child_span.end()
|
||||
root_span.end()
|
||||
|
||||
# pylint: disable=protected-access
|
||||
exporter = datadog.DatadogSpanExporter()
|
||||
datadog_spans = [
|
||||
span.to_dict()
|
||||
for span in exporter._translate_to_datadog([root_span, child_span])
|
||||
]
|
||||
|
||||
self.assertEqual(len(datadog_spans), 2)
|
||||
|
||||
actual = [
|
||||
span["meta"].get(datadog.constants.DD_ORIGIN)
|
||||
if "meta" in span
|
||||
else None
|
||||
for span in datadog_spans
|
||||
]
|
||||
expected = ["origin-service", None]
|
||||
self.assertListEqual(actual, expected)
|
||||
|
||||
def test_sampling_rate(self):
|
||||
context = trace_api.SpanContext(
|
||||
trace_id=0x000000000000000000000000DEADBEEF,
|
||||
span_id=0x34BF92DEEFC58C92,
|
||||
is_remote=False,
|
||||
trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED),
|
||||
)
|
||||
trace_api.get_tracer_provider().sampler = sampling.TraceIdRatioBased(
|
||||
0.5
|
||||
)
|
||||
|
||||
span = trace._Span(name="sampled", context=context, parent=None)
|
||||
span.start()
|
||||
span.end()
|
||||
|
||||
# pylint: disable=protected-access
|
||||
exporter = datadog.DatadogSpanExporter()
|
||||
datadog_spans = [
|
||||
span.to_dict() for span in exporter._translate_to_datadog([span])
|
||||
]
|
||||
|
||||
self.assertEqual(len(datadog_spans), 1)
|
||||
|
||||
actual = [
|
||||
span["metrics"].get(datadog.constants.SAMPLE_RATE_METRIC_KEY)
|
||||
if "metrics" in span
|
||||
else None
|
||||
for span in datadog_spans
|
||||
]
|
||||
expected = [0.5]
|
||||
self.assertListEqual(actual, expected)
|
||||
|
||||
def test_service_name_fallback(self):
|
||||
context = trace_api.SpanContext(
|
||||
trace_id=0x000000000000000000000000DEADBEEF,
|
||||
span_id=0x34BF92DEEFC58C92,
|
||||
is_remote=False,
|
||||
trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED),
|
||||
)
|
||||
trace_api.get_tracer_provider().sampler = sampling.TraceIdRatioBased(
|
||||
0.5
|
||||
)
|
||||
|
||||
resource_with_default_name = Resource(
|
||||
attributes={
|
||||
"key_resource": "some_resource",
|
||||
"service.name": "unknown_service",
|
||||
}
|
||||
)
|
||||
|
||||
span = trace._Span(
|
||||
name="sampled",
|
||||
context=context,
|
||||
parent=None,
|
||||
resource=resource_with_default_name,
|
||||
)
|
||||
span.start()
|
||||
span.end()
|
||||
|
||||
# pylint: disable=protected-access
|
||||
exporter = datadog.DatadogSpanExporter(service="fallback_service_name")
|
||||
datadog_spans = [
|
||||
span.to_dict() for span in exporter._translate_to_datadog([span])
|
||||
]
|
||||
|
||||
self.assertEqual(len(datadog_spans), 1)
|
||||
|
||||
span = datadog_spans[0]
|
||||
self.assertEqual(span["service"], "fallback_service_name")
|
@ -1,216 +0,0 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.context import Context
|
||||
from opentelemetry.exporter.datadog import constants, propagator
|
||||
from opentelemetry.sdk import trace
|
||||
from opentelemetry.sdk.trace.id_generator import RandomIdGenerator
|
||||
from opentelemetry.trace import get_current_span, set_span_in_context
|
||||
|
||||
FORMAT = propagator.DatadogFormat()
|
||||
|
||||
|
||||
class TestDatadogFormat(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
id_generator = RandomIdGenerator()
|
||||
cls.serialized_trace_id = propagator.format_trace_id(
|
||||
id_generator.generate_trace_id()
|
||||
)
|
||||
cls.serialized_parent_id = propagator.format_span_id(
|
||||
id_generator.generate_span_id()
|
||||
)
|
||||
cls.serialized_origin = "origin-service"
|
||||
|
||||
def test_extract_malformed_headers_to_explicit_ctx(self):
|
||||
"""Test with no Datadog headers"""
|
||||
orig_ctx = Context({"k1": "v1"})
|
||||
malformed_trace_id_key = FORMAT.TRACE_ID_KEY + "-x"
|
||||
malformed_parent_id_key = FORMAT.PARENT_ID_KEY + "-x"
|
||||
context = FORMAT.extract(
|
||||
{
|
||||
malformed_trace_id_key: self.serialized_trace_id,
|
||||
malformed_parent_id_key: self.serialized_parent_id,
|
||||
},
|
||||
orig_ctx,
|
||||
)
|
||||
self.assertDictEqual(orig_ctx, context)
|
||||
|
||||
def test_extract_malformed_headers_to_implicit_ctx(self):
|
||||
malformed_trace_id_key = FORMAT.TRACE_ID_KEY + "-x"
|
||||
malformed_parent_id_key = FORMAT.PARENT_ID_KEY + "-x"
|
||||
context = FORMAT.extract(
|
||||
{
|
||||
malformed_trace_id_key: self.serialized_trace_id,
|
||||
malformed_parent_id_key: self.serialized_parent_id,
|
||||
}
|
||||
)
|
||||
self.assertDictEqual(Context(), context)
|
||||
|
||||
def test_extract_missing_trace_id_to_explicit_ctx(self):
|
||||
"""If a trace id is missing, populate an invalid trace id."""
|
||||
orig_ctx = Context({"k1": "v1"})
|
||||
carrier = {FORMAT.PARENT_ID_KEY: self.serialized_parent_id}
|
||||
|
||||
ctx = FORMAT.extract(carrier, orig_ctx)
|
||||
self.assertDictEqual(orig_ctx, ctx)
|
||||
|
||||
def test_extract_missing_trace_id_to_implicit_ctx(self):
|
||||
carrier = {FORMAT.PARENT_ID_KEY: self.serialized_parent_id}
|
||||
|
||||
ctx = FORMAT.extract(carrier)
|
||||
self.assertDictEqual(Context(), ctx)
|
||||
|
||||
def test_extract_missing_parent_id_to_explicit_ctx(self):
|
||||
"""If a parent id is missing, populate an invalid trace id."""
|
||||
orig_ctx = Context({"k1": "v1"})
|
||||
carrier = {FORMAT.TRACE_ID_KEY: self.serialized_trace_id}
|
||||
|
||||
ctx = FORMAT.extract(carrier, orig_ctx)
|
||||
self.assertDictEqual(orig_ctx, ctx)
|
||||
|
||||
def test_extract_missing_parent_id_to_implicit_ctx(self):
|
||||
carrier = {FORMAT.TRACE_ID_KEY: self.serialized_trace_id}
|
||||
|
||||
ctx = FORMAT.extract(carrier)
|
||||
self.assertDictEqual(Context(), ctx)
|
||||
|
||||
def test_context_propagation(self):
|
||||
"""Test the propagation of Datadog headers."""
|
||||
parent_span_context = get_current_span(
|
||||
FORMAT.extract(
|
||||
{
|
||||
FORMAT.TRACE_ID_KEY: self.serialized_trace_id,
|
||||
FORMAT.PARENT_ID_KEY: self.serialized_parent_id,
|
||||
FORMAT.SAMPLING_PRIORITY_KEY: str(constants.AUTO_KEEP),
|
||||
FORMAT.ORIGIN_KEY: self.serialized_origin,
|
||||
},
|
||||
)
|
||||
).get_span_context()
|
||||
|
||||
self.assertEqual(
|
||||
parent_span_context.trace_id, int(self.serialized_trace_id)
|
||||
)
|
||||
self.assertEqual(
|
||||
parent_span_context.span_id, int(self.serialized_parent_id)
|
||||
)
|
||||
self.assertEqual(parent_span_context.trace_flags, constants.AUTO_KEEP)
|
||||
self.assertEqual(
|
||||
parent_span_context.trace_state.get(constants.DD_ORIGIN),
|
||||
self.serialized_origin,
|
||||
)
|
||||
self.assertTrue(parent_span_context.is_remote)
|
||||
|
||||
child = trace._Span(
|
||||
"child",
|
||||
trace_api.SpanContext(
|
||||
parent_span_context.trace_id,
|
||||
RandomIdGenerator().generate_span_id(),
|
||||
is_remote=False,
|
||||
trace_flags=parent_span_context.trace_flags,
|
||||
trace_state=parent_span_context.trace_state,
|
||||
),
|
||||
parent=parent_span_context,
|
||||
)
|
||||
|
||||
child_carrier = {}
|
||||
child_context = set_span_in_context(child)
|
||||
FORMAT.inject(child_carrier, context=child_context)
|
||||
|
||||
self.assertEqual(
|
||||
child_carrier[FORMAT.TRACE_ID_KEY], self.serialized_trace_id
|
||||
)
|
||||
self.assertEqual(
|
||||
child_carrier[FORMAT.PARENT_ID_KEY], str(child.context.span_id)
|
||||
)
|
||||
self.assertEqual(
|
||||
child_carrier[FORMAT.SAMPLING_PRIORITY_KEY],
|
||||
str(constants.AUTO_KEEP),
|
||||
)
|
||||
self.assertEqual(
|
||||
child_carrier.get(FORMAT.ORIGIN_KEY), self.serialized_origin
|
||||
)
|
||||
|
||||
def test_sampling_priority_auto_reject(self):
|
||||
"""Test sampling priority rejected."""
|
||||
parent_span_context = get_current_span(
|
||||
FORMAT.extract(
|
||||
{
|
||||
FORMAT.TRACE_ID_KEY: self.serialized_trace_id,
|
||||
FORMAT.PARENT_ID_KEY: self.serialized_parent_id,
|
||||
FORMAT.SAMPLING_PRIORITY_KEY: str(constants.AUTO_REJECT),
|
||||
},
|
||||
)
|
||||
).get_span_context()
|
||||
|
||||
self.assertEqual(
|
||||
parent_span_context.trace_flags, constants.AUTO_REJECT
|
||||
)
|
||||
|
||||
child = trace._Span(
|
||||
"child",
|
||||
trace_api.SpanContext(
|
||||
parent_span_context.trace_id,
|
||||
RandomIdGenerator().generate_span_id(),
|
||||
is_remote=False,
|
||||
trace_flags=parent_span_context.trace_flags,
|
||||
trace_state=parent_span_context.trace_state,
|
||||
),
|
||||
parent=parent_span_context,
|
||||
)
|
||||
|
||||
child_carrier = {}
|
||||
child_context = set_span_in_context(child)
|
||||
FORMAT.inject(child_carrier, context=child_context)
|
||||
|
||||
self.assertEqual(
|
||||
child_carrier[FORMAT.SAMPLING_PRIORITY_KEY],
|
||||
str(constants.AUTO_REJECT),
|
||||
)
|
||||
|
||||
@patch("opentelemetry.exporter.datadog.propagator.get_current_span")
|
||||
def test_fields(self, mock_get_current_span):
|
||||
"""Make sure the fields attribute returns the fields used in inject"""
|
||||
|
||||
tracer = trace.TracerProvider().get_tracer("sdk_tracer_provider")
|
||||
|
||||
mock_setter = Mock()
|
||||
|
||||
mock_get_current_span.configure_mock(
|
||||
**{
|
||||
"return_value": Mock(
|
||||
**{
|
||||
"get_span_context.return_value": None,
|
||||
"context.trace_flags": 0,
|
||||
"context.trace_id": 1,
|
||||
"context.trace_state": {constants.DD_ORIGIN: 0},
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
with tracer.start_as_current_span("parent"):
|
||||
with tracer.start_as_current_span("child"):
|
||||
FORMAT.inject({}, setter=mock_setter)
|
||||
|
||||
inject_fields = set()
|
||||
|
||||
for call in mock_setter.mock_calls:
|
||||
inject_fields.add(call[1][1])
|
||||
|
||||
self.assertEqual(FORMAT.fields, inject_fields)
|
@ -41,6 +41,3 @@ DISTDIR=dist
|
||||
done
|
||||
)
|
||||
)
|
||||
|
||||
# FIXME: This is a temporary workaround, see #1357.
|
||||
rm -rf $DISTDIR/opentelemetry_exporter_datadog-0.30b0.tar.gz
|
||||
|
@ -16,7 +16,6 @@ PYTHON_VERSION_INFO=(${PYTHON_VERSION//./ })
|
||||
|
||||
coverage erase
|
||||
|
||||
cov exporter/opentelemetry-exporter-datadog
|
||||
cov instrumentation/opentelemetry-instrumentation-flask
|
||||
cov instrumentation/opentelemetry-instrumentation-requests
|
||||
cov instrumentation/opentelemetry-instrumentation-wsgi
|
||||
|
Reference in New Issue
Block a user