commit 1b9d22dbf77aa9f1e202fe4b01296aebe0d9516d Author: Leighton Chen Date: Wed Jul 29 10:03:46 2020 -0700 Rename exporter packages from "ext" to "exporter" (#953) diff --git a/exporter/opentelemetry-exporter-datadog/CHANGELOG.md b/exporter/opentelemetry-exporter-datadog/CHANGELOG.md new file mode 100644 index 000000000..d15d5a4b5 --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/CHANGELOG.md @@ -0,0 +1,14 @@ +# Changelog + +## Unreleased + +- Change package name to opentelemetry-exporter-datadog + ([#953](https://github.com/open-telemetry/opentelemetry-python/pull/953)) + +## 0.8b0 + +Released 2020-05-27 + +- Add exporter to Datadog + ([#572](https://github.com/open-telemetry/opentelemetry-python/pull/572)) + diff --git a/exporter/opentelemetry-exporter-datadog/README.rst b/exporter/opentelemetry-exporter-datadog/README.rst new file mode 100644 index 000000000..cb97e5997 --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/README.rst @@ -0,0 +1,29 @@ +OpenTelemetry Datadog Exporter +============================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-exporter-datadog.svg + :target: https://pypi.org/project/opentelemetry-exporter-datadog/ + +This library allows to export tracing data to `Datadog +`_. OpenTelemetry span event and links are not +supported. + +Installation +------------ + +:: + + pip install opentelemetry-exporter-datadog + + +.. _Datadog: https://www.datadoghq.com/ +.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ + + +References +---------- + +* `Datadog `_ +* `OpenTelemetry Project `_ diff --git a/exporter/opentelemetry-exporter-datadog/setup.cfg b/exporter/opentelemetry-exporter-datadog/setup.cfg new file mode 100644 index 000000000..266abe9e0 --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/setup.cfg @@ -0,0 +1,50 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-exporter-datadog +description = Datadog Span Exporter for OpenTelemetry +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/exporter/opentelemetry-exporter-datadog +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.5 +package_dir= + =src +packages=find_namespace: +install_requires = + ddtrace>=0.34.0 + opentelemetry-api == 0.12.dev0 + opentelemetry-sdk == 0.12.dev0 + +[options.packages.find] +where = src + +[options.extras_require] +test = diff --git a/exporter/opentelemetry-exporter-datadog/setup.py b/exporter/opentelemetry-exporter-datadog/setup.py new file mode 100644 index 000000000..0c3bdf453 --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/setup.py @@ -0,0 +1,27 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "exporter", "datadog", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/__init__.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/__init__.py new file mode 100644 index 000000000..7adde5df5 --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/__init__.py @@ -0,0 +1,76 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +The **OpenTelemetry Datadog Exporter** provides a span exporter from +`OpenTelemetry`_ traces to `Datadog`_ by using the Datadog Agent. + +Installation +------------ + +:: + + pip install opentelemetry-ext-datadog + + +Usage +----- + +The Datadog exporter provides a span processor that must be added along with the +exporter. In addition, a formatter is provided to handle propagation of trace +context between OpenTelemetry-instrumented and Datadog-instrumented services in +a distributed trace. + +.. code:: python + + from opentelemetry import propagators, trace + from opentelemetry.exporter.datadog import DatadogExportSpanProcessor, DatadogSpanExporter + from opentelemetry.exporter.datadog.propagator import DatadogFormat + from opentelemetry.sdk.trace import TracerProvider + + trace.set_tracer_provider(TracerProvider()) + tracer = trace.get_tracer(__name__) + + exporter = DatadogSpanExporter( + agent_url="http://agent:8126", service="my-helloworld-service" + ) + + span_processor = DatadogExportSpanProcessor(exporter) + trace.get_tracer_provider().add_span_processor(span_processor) + + # Optional: use Datadog format for propagation in distributed traces + propagators.set_global_httptextformat(DatadogFormat()) + + with tracer.start_as_current_span("foo"): + print("Hello world!") + + +Examples +-------- + +The `docs/examples/datadog_exporter`_ includes examples for using the Datadog +exporter with OpenTelemetry instrumented applications. + +API +--- +.. _Datadog: https://www.datadoghq.com/ +.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ +.. _docs/examples/datadog_exporter: https://github.com/open-telemetry/opentelemetry-python/tree/master/docs/examples/datadog_exporter +""" +# pylint: disable=import-error + +from .exporter import DatadogSpanExporter +from .spanprocessor import DatadogExportSpanProcessor + +__all__ = ["DatadogExportSpanProcessor", "DatadogSpanExporter"] diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/constants.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/constants.py new file mode 100644 index 000000000..92d736c91 --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/constants.py @@ -0,0 +1,8 @@ +DD_ORIGIN = "_dd_origin" +AUTO_REJECT = 0 +AUTO_KEEP = 1 +USER_KEEP = 2 +SAMPLE_RATE_METRIC_KEY = "_sample_rate" +SAMPLING_PRIORITY_KEY = "_sampling_priority_v1" +ENV_KEY = "env" +VERSION_KEY = "version" diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py new file mode 100644 index 000000000..e11772d0d --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py @@ -0,0 +1,283 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +from urllib.parse import urlparse + +from ddtrace.ext import SpanTypes as DatadogSpanTypes +from ddtrace.internal.writer import AgentWriter +from ddtrace.span import Span as DatadogSpan + +import opentelemetry.trace as trace_api +from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult +from opentelemetry.trace.status import StatusCanonicalCode + +# pylint:disable=relative-beyond-top-level +from .constants import DD_ORIGIN, ENV_KEY, SAMPLE_RATE_METRIC_KEY, VERSION_KEY + +logger = logging.getLogger(__name__) + + +DEFAULT_AGENT_URL = "http://localhost:8126" +_INSTRUMENTATION_SPAN_TYPES = { + "opentelemetry.ext.aiohttp-client": DatadogSpanTypes.HTTP, + "opentelemetry.ext.asgi": DatadogSpanTypes.WEB, + "opentelemetry.ext.dbapi": DatadogSpanTypes.SQL, + "opentelemetry.ext.django": DatadogSpanTypes.WEB, + "opentelemetry.ext.flask": DatadogSpanTypes.WEB, + "opentelemetry.ext.grpc": DatadogSpanTypes.GRPC, + "opentelemetry.ext.jinja2": DatadogSpanTypes.TEMPLATE, + "opentelemetry.ext.mysql": DatadogSpanTypes.SQL, + "opentelemetry.ext.psycopg2": DatadogSpanTypes.SQL, + "opentelemetry.ext.pymemcache": DatadogSpanTypes.CACHE, + "opentelemetry.ext.pymongo": DatadogSpanTypes.MONGODB, + "opentelemetry.ext.pymysql": DatadogSpanTypes.SQL, + "opentelemetry.ext.redis": DatadogSpanTypes.REDIS, + "opentelemetry.ext.requests": DatadogSpanTypes.HTTP, + "opentelemetry.ext.sqlalchemy": DatadogSpanTypes.SQL, + "opentelemetry.ext.wsgi": DatadogSpanTypes.WEB, +} + + +class DatadogSpanExporter(SpanExporter): + """Datadog span exporter for OpenTelemetry. + + Args: + agent_url: The url of the Datadog Agent or use ``DD_TRACE_AGENT_URL`` environment variable + service: The service name to be used for the application or use ``DD_SERVICE`` environment variable + env: Set the application’s environment or use ``DD_ENV`` environment variable + version: Set the application’s version or use ``DD_VERSION`` environment variable + tags: A list of default tags to be added to every span or use ``DD_TAGS`` environment variable + """ + + def __init__( + self, agent_url=None, service=None, env=None, version=None, tags=None + ): + self.agent_url = ( + agent_url + if agent_url + else os.environ.get("DD_TRACE_AGENT_URL", DEFAULT_AGENT_URL) + ) + self.service = service or os.environ.get("DD_SERVICE") + self.env = env or os.environ.get("DD_ENV") + self.version = version or os.environ.get("DD_VERSION") + self.tags = _parse_tags_str(tags or os.environ.get("DD_TAGS")) + self._agent_writer = None + + @property + def agent_writer(self): + if self._agent_writer is None: + url_parsed = urlparse(self.agent_url) + if url_parsed.scheme in ("http", "https"): + self._agent_writer = AgentWriter( + hostname=url_parsed.hostname, + port=url_parsed.port, + https=url_parsed.scheme == "https", + ) + elif url_parsed.scheme == "unix": + self._agent_writer = AgentWriter(uds_path=url_parsed.path) + else: + raise ValueError( + "Unknown scheme `%s` for agent URL" % url_parsed.scheme + ) + return self._agent_writer + + def export(self, spans): + datadog_spans = self._translate_to_datadog(spans) + + self.agent_writer.write(spans=datadog_spans) + + return SpanExportResult.SUCCESS + + def shutdown(self): + if self.agent_writer.started: + self.agent_writer.stop() + self.agent_writer.join(self.agent_writer.exit_timeout) + + def _translate_to_datadog(self, spans): + datadog_spans = [] + + for span in spans: + trace_id, parent_id, span_id = _get_trace_ids(span) + + # datadog Span is initialized with a reference to the tracer which is + # used to record the span when it is finished. We can skip ignore this + # because we are not calling the finish method and explictly set the + # duration. + tracer = None + + datadog_span = DatadogSpan( + tracer, + _get_span_name(span), + service=self.service, + resource=_get_resource(span), + span_type=_get_span_type(span), + trace_id=trace_id, + span_id=span_id, + parent_id=parent_id, + ) + datadog_span.start_ns = span.start_time + datadog_span.duration_ns = span.end_time - span.start_time + + if span.status.canonical_code is not StatusCanonicalCode.OK: + datadog_span.error = 1 + if span.status.description: + exc_type, exc_val = _get_exc_info(span) + # no mapping for error.stack since traceback not recorded + datadog_span.set_tag("error.msg", exc_val) + datadog_span.set_tag("error.type", exc_type) + + datadog_span.set_tags(span.attributes) + + # add configured env tag + if self.env is not None: + datadog_span.set_tag(ENV_KEY, self.env) + + # add configured application version tag to only root span + if self.version is not None and parent_id == 0: + datadog_span.set_tag(VERSION_KEY, self.version) + + # add configured global tags + datadog_span.set_tags(self.tags) + + # add origin to root span + origin = _get_origin(span) + if origin and parent_id == 0: + datadog_span.set_tag(DD_ORIGIN, origin) + + sampling_rate = _get_sampling_rate(span) + if sampling_rate is not None: + datadog_span.set_metric(SAMPLE_RATE_METRIC_KEY, sampling_rate) + + # span events and span links are not supported + + datadog_spans.append(datadog_span) + + return datadog_spans + + +def _get_trace_ids(span): + """Extract tracer ids from span""" + ctx = span.get_context() + trace_id = ctx.trace_id + span_id = ctx.span_id + + if isinstance(span.parent, trace_api.Span): + parent_id = span.parent.get_context().span_id + elif isinstance(span.parent, trace_api.SpanContext): + parent_id = span.parent.span_id + else: + parent_id = 0 + + trace_id = _convert_trace_id_uint64(trace_id) + + return trace_id, parent_id, span_id + + +def _convert_trace_id_uint64(otel_id): + """Convert 128-bit int used for trace_id to 64-bit unsigned int""" + return otel_id & 0xFFFFFFFFFFFFFFFF + + +def _get_span_name(span): + """Get span name by using instrumentation and kind while backing off to + span.name + """ + instrumentation_name = ( + span.instrumentation_info.name if span.instrumentation_info else None + ) + span_kind_name = span.kind.name if span.kind else None + name = ( + "{}.{}".format(instrumentation_name, span_kind_name) + if instrumentation_name and span_kind_name + else span.name + ) + return name + + +def _get_resource(span): + """Get resource name for span""" + if "http.method" in span.attributes: + route = span.attributes.get("http.route") + return ( + span.attributes["http.method"] + " " + route + if route + else span.attributes["http.method"] + ) + + return span.name + + +def _get_span_type(span): + """Get Datadog span type""" + instrumentation_name = ( + span.instrumentation_info.name if span.instrumentation_info else None + ) + span_type = _INSTRUMENTATION_SPAN_TYPES.get(instrumentation_name) + return span_type + + +def _get_exc_info(span): + """Parse span status description for exception type and value""" + exc_type, exc_val = span.status.description.split(":", 1) + return exc_type, exc_val.strip() + + +def _get_origin(span): + ctx = span.get_context() + origin = ctx.trace_state.get(DD_ORIGIN) + return origin + + +def _get_sampling_rate(span): + ctx = span.get_context() + return ( + span.sampler.rate + if ctx.trace_flags.sampled + and isinstance(span.sampler, trace_api.sampling.ProbabilitySampler) + else None + ) + + +def _parse_tags_str(tags_str): + """Parse a string of tags typically provided via environment variables. + + The expected string is of the form:: + "key1:value1,key2:value2" + + :param tags_str: A string of the above form to parse tags from. + :return: A dict containing the tags that were parsed. + """ + parsed_tags = {} + if not tags_str: + return parsed_tags + + for tag in tags_str.split(","): + try: + key, value = tag.split(":", 1) + + # Validate the tag + if key == "" or value == "" or value.endswith(":"): + raise ValueError + except ValueError: + logger.error( + "Malformed tag in tag pair '%s' from tag string '%s'.", + tag, + tags_str, + ) + else: + parsed_tags[key] = value + + return parsed_tags diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/propagator.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/propagator.py new file mode 100644 index 000000000..5935557de --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/propagator.py @@ -0,0 +1,127 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +from opentelemetry import trace +from opentelemetry.context import Context +from opentelemetry.trace import get_current_span, set_span_in_context +from opentelemetry.trace.propagation.httptextformat import ( + Getter, + HTTPTextFormat, + HTTPTextFormatT, + Setter, +) + +# pylint:disable=relative-beyond-top-level +from . import constants + + +class DatadogFormat(HTTPTextFormat): + """Propagator for the Datadog HTTP header format. + """ + + TRACE_ID_KEY = "x-datadog-trace-id" + PARENT_ID_KEY = "x-datadog-parent-id" + SAMPLING_PRIORITY_KEY = "x-datadog-sampling-priority" + ORIGIN_KEY = "x-datadog-origin" + + def extract( + self, + get_from_carrier: Getter[HTTPTextFormatT], + carrier: HTTPTextFormatT, + context: typing.Optional[Context] = None, + ) -> Context: + trace_id = extract_first_element( + get_from_carrier(carrier, self.TRACE_ID_KEY) + ) + + span_id = extract_first_element( + get_from_carrier(carrier, self.PARENT_ID_KEY) + ) + + sampled = extract_first_element( + get_from_carrier(carrier, self.SAMPLING_PRIORITY_KEY) + ) + + origin = extract_first_element( + get_from_carrier(carrier, self.ORIGIN_KEY) + ) + + trace_flags = trace.TraceFlags() + if sampled and int(sampled) in ( + constants.AUTO_KEEP, + constants.USER_KEEP, + ): + trace_flags |= trace.TraceFlags.SAMPLED + + if trace_id is None or span_id is None: + return set_span_in_context(trace.INVALID_SPAN, context) + + span_context = trace.SpanContext( + trace_id=int(trace_id), + span_id=int(span_id), + is_remote=True, + trace_flags=trace_flags, + trace_state=trace.TraceState({constants.DD_ORIGIN: origin}), + ) + + return set_span_in_context(trace.DefaultSpan(span_context), context) + + def inject( + self, + set_in_carrier: Setter[HTTPTextFormatT], + carrier: HTTPTextFormatT, + context: typing.Optional[Context] = None, + ) -> None: + span = get_current_span(context) + span_context = span.get_context() + if span_context == trace.INVALID_SPAN_CONTEXT: + return + sampled = (trace.TraceFlags.SAMPLED & span.context.trace_flags) != 0 + set_in_carrier( + carrier, self.TRACE_ID_KEY, format_trace_id(span.context.trace_id), + ) + set_in_carrier( + carrier, self.PARENT_ID_KEY, format_span_id(span.context.span_id) + ) + set_in_carrier( + carrier, + self.SAMPLING_PRIORITY_KEY, + str(constants.AUTO_KEEP if sampled else constants.AUTO_REJECT), + ) + if constants.DD_ORIGIN in span.context.trace_state: + set_in_carrier( + carrier, + self.ORIGIN_KEY, + span.context.trace_state[constants.DD_ORIGIN], + ) + + +def format_trace_id(trace_id: int) -> str: + """Format the trace id for Datadog.""" + return str(trace_id & 0xFFFFFFFFFFFFFFFF) + + +def format_span_id(span_id: int) -> str: + """Format the span id for Datadog.""" + return str(span_id) + + +def extract_first_element( + items: typing.Iterable[HTTPTextFormatT], +) -> typing.Optional[HTTPTextFormatT]: + if items is None: + return None + return next(iter(items), None) diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/spanprocessor.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/spanprocessor.py new file mode 100644 index 000000000..600778c88 --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/spanprocessor.py @@ -0,0 +1,222 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import logging +import threading +import typing + +from opentelemetry.context import attach, detach, set_value +from opentelemetry.sdk.trace import Span, SpanProcessor +from opentelemetry.sdk.trace.export import SpanExporter +from opentelemetry.trace import INVALID_TRACE_ID +from opentelemetry.util import time_ns + +logger = logging.getLogger(__name__) + + +class DatadogExportSpanProcessor(SpanProcessor): + """Datadog exporter span processor + + DatadogExportSpanProcessor is an implementation of `SpanProcessor` that + batches all opened spans into a list per trace. When all spans for a trace + are ended, the trace is queues up for export. This is required for exporting + to the Datadog Agent which expects to received list of spans for each trace. + """ + + _FLUSH_TOKEN = INVALID_TRACE_ID + + def __init__( + self, + span_exporter: SpanExporter, + schedule_delay_millis: float = 5000, + max_trace_size: int = 4096, + ): + if max_trace_size <= 0: + raise ValueError("max_queue_size must be a positive integer.") + + if schedule_delay_millis <= 0: + raise ValueError("schedule_delay_millis must be positive.") + + self.span_exporter = span_exporter + + # queue trace_ids for traces with recently ended spans for worker thread to check + # for exporting + self.check_traces_queue = ( + collections.deque() + ) # type: typing.Deque[int] + + self.traces_lock = threading.Lock() + # dictionary of trace_ids to a list of spans where the first span is the + # first opened span for the trace + self.traces = collections.defaultdict(list) + # counter to keep track of the number of spans and ended spans for a + # trace_id + self.traces_spans_count = collections.Counter() + self.traces_spans_ended_count = collections.Counter() + + self.worker_thread = threading.Thread(target=self.worker, daemon=True) + + # threading conditions used for flushing and shutdown + self.condition = threading.Condition(threading.Lock()) + self.flush_condition = threading.Condition(threading.Lock()) + + # flag to indicate that there is a flush operation on progress + self._flushing = False + + self.max_trace_size = max_trace_size + self._spans_dropped = False + self.schedule_delay_millis = schedule_delay_millis + self.done = False + self.worker_thread.start() + + def on_start(self, span: Span) -> None: + ctx = span.get_context() + trace_id = ctx.trace_id + + with self.traces_lock: + # check upper bound on number of spans for trace before adding new + # span + if self.traces_spans_count[trace_id] == self.max_trace_size: + logger.warning("Max spans for trace, spans will be dropped.") + self._spans_dropped = True + return + + # add span to end of list for a trace and update the counter + self.traces[trace_id].append(span) + self.traces_spans_count[trace_id] += 1 + + def on_end(self, span: Span) -> None: + if self.done: + logger.warning("Already shutdown, dropping span.") + return + + ctx = span.get_context() + trace_id = ctx.trace_id + + with self.traces_lock: + self.traces_spans_ended_count[trace_id] += 1 + if self.is_trace_exportable(trace_id): + self.check_traces_queue.appendleft(trace_id) + + def worker(self): + timeout = self.schedule_delay_millis / 1e3 + while not self.done: + if not self._flushing: + with self.condition: + self.condition.wait(timeout) + if not self.check_traces_queue: + # spurious notification, let's wait again + continue + if self.done: + # missing spans will be sent when calling flush + break + + # substract the duration of this export call to the next timeout + start = time_ns() + self.export() + end = time_ns() + duration = (end - start) / 1e9 + timeout = self.schedule_delay_millis / 1e3 - duration + + # be sure that all spans are sent + self._drain_queue() + + def is_trace_exportable(self, trace_id): + return ( + self.traces_spans_count[trace_id] + - self.traces_spans_ended_count[trace_id] + <= 0 + ) + + def export(self) -> None: + """Exports traces with finished spans.""" + notify_flush = False + export_trace_ids = [] + + while self.check_traces_queue: + trace_id = self.check_traces_queue.pop() + if trace_id is self._FLUSH_TOKEN: + notify_flush = True + else: + with self.traces_lock: + # check whether trace is exportable again in case that new + # spans were started since we last concluded trace was + # exportable + if self.is_trace_exportable(trace_id): + export_trace_ids.append(trace_id) + del self.traces_spans_count[trace_id] + del self.traces_spans_ended_count[trace_id] + + if len(export_trace_ids) > 0: + token = attach(set_value("suppress_instrumentation", True)) + + for trace_id in export_trace_ids: + with self.traces_lock: + try: + # Ignore type b/c the Optional[None]+slicing is too "clever" + # for mypy + self.span_exporter.export(self.traces[trace_id]) # type: ignore + # pylint: disable=broad-except + except Exception: + logger.exception( + "Exception while exporting Span batch." + ) + finally: + del self.traces[trace_id] + + detach(token) + + if notify_flush: + with self.flush_condition: + self.flush_condition.notify() + + def _drain_queue(self): + """"Export all elements until queue is empty. + + Can only be called from the worker thread context because it invokes + `export` that is not thread safe. + """ + while self.check_traces_queue: + self.export() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + if self.done: + logger.warning("Already shutdown, ignoring call to force_flush().") + return True + + self._flushing = True + self.check_traces_queue.appendleft(self._FLUSH_TOKEN) + + # wake up worker thread + with self.condition: + self.condition.notify_all() + + # wait for token to be processed + with self.flush_condition: + ret = self.flush_condition.wait(timeout_millis / 1e3) + + self._flushing = False + + if not ret: + logger.warning("Timeout was exceeded in force_flush().") + return ret + + def shutdown(self) -> None: + # signal the worker thread to finish and then wait for it + self.done = True + with self.condition: + self.condition.notify_all() + self.worker_thread.join() + self.span_exporter.shutdown() diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/version.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/version.py new file mode 100644 index 000000000..780a92b6a --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.12.dev0" diff --git a/exporter/opentelemetry-exporter-datadog/tests/__init__.py b/exporter/opentelemetry-exporter-datadog/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/exporter/opentelemetry-exporter-datadog/tests/test_datadog_exporter.py b/exporter/opentelemetry-exporter-datadog/tests/test_datadog_exporter.py new file mode 100644 index 000000000..a3e67790d --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/tests/test_datadog_exporter.py @@ -0,0 +1,523 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import logging +import time +import unittest +from unittest import mock + +from ddtrace.internal.writer import AgentWriter + +from opentelemetry import trace as trace_api +from opentelemetry.exporter import datadog +from opentelemetry.sdk import trace +from opentelemetry.sdk.util.instrumentation import InstrumentationInfo + + +class MockDatadogSpanExporter(datadog.DatadogSpanExporter): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + agent_writer_mock = mock.Mock(spec=AgentWriter) + agent_writer_mock.started = True + agent_writer_mock.exit_timeout = 1 + self._agent_writer = agent_writer_mock + + +def get_spans(tracer, exporter, shutdown=True): + if shutdown: + tracer.source.shutdown() + + spans = [ + call_args[-1]["spans"] + for call_args in exporter.agent_writer.write.call_args_list + ] + + return [span.to_dict() for span in itertools.chain.from_iterable(spans)] + + +class TestDatadogSpanExporter(unittest.TestCase): + def setUp(self): + self.exporter = MockDatadogSpanExporter() + self.span_processor = datadog.DatadogExportSpanProcessor(self.exporter) + tracer_provider = trace.TracerProvider() + tracer_provider.add_span_processor(self.span_processor) + self.tracer_provider = tracer_provider + self.tracer = tracer_provider.get_tracer(__name__) + + def tearDown(self): + self.tracer_provider.shutdown() + + def test_constructor_default(self): + """Test the default values assigned by constructor.""" + exporter = datadog.DatadogSpanExporter() + + self.assertEqual(exporter.agent_url, "http://localhost:8126") + self.assertIsNone(exporter.service) + self.assertIsNotNone(exporter.agent_writer) + + def test_constructor_explicit(self): + """Test the constructor passing all the options.""" + agent_url = "http://localhost:8126" + exporter = datadog.DatadogSpanExporter( + agent_url=agent_url, service="explicit", + ) + + self.assertEqual(exporter.agent_url, agent_url) + self.assertEqual(exporter.service, "explicit") + self.assertIsNone(exporter.env) + self.assertIsNone(exporter.version) + self.assertEqual(exporter.tags, {}) + + exporter = datadog.DatadogSpanExporter( + agent_url=agent_url, + service="explicit", + env="test", + version="0.0.1", + tags="", + ) + + self.assertEqual(exporter.agent_url, agent_url) + self.assertEqual(exporter.service, "explicit") + self.assertEqual(exporter.env, "test") + self.assertEqual(exporter.version, "0.0.1") + self.assertEqual(exporter.tags, {}) + + exporter = datadog.DatadogSpanExporter( + agent_url=agent_url, + service="explicit", + env="test", + version="0.0.1", + tags="team:testers,layer:app", + ) + + self.assertEqual(exporter.agent_url, agent_url) + self.assertEqual(exporter.service, "explicit") + self.assertEqual(exporter.env, "test") + self.assertEqual(exporter.version, "0.0.1") + self.assertEqual(exporter.tags, {"team": "testers", "layer": "app"}) + + @mock.patch.dict( + "os.environ", + { + "DD_TRACE_AGENT_URL": "http://agent:8126", + "DD_SERVICE": "test-service", + "DD_ENV": "test", + "DD_VERSION": "0.0.1", + "DD_TAGS": "team:testers", + }, + ) + def test_constructor_environ(self): + exporter = datadog.DatadogSpanExporter() + + self.assertEqual(exporter.agent_url, "http://agent:8126") + self.assertEqual(exporter.service, "test-service") + self.assertEqual(exporter.env, "test") + self.assertEqual(exporter.version, "0.0.1") + self.assertEqual(exporter.tags, {"team": "testers"}) + self.assertIsNotNone(exporter.agent_writer) + + # pylint: disable=too-many-locals + @mock.patch.dict( + "os.environ", + { + "DD_SERVICE": "test-service", + "DD_ENV": "test", + "DD_VERSION": "0.0.1", + "DD_TAGS": "team:testers", + }, + ) + def test_translate_to_datadog(self): + # pylint: disable=invalid-name + self.maxDiff = None + + span_names = ("test1", "test2", "test3") + trace_id = 0x6E0C63257DE34C926F9EFCD03927272E + trace_id_low = 0x6F9EFCD03927272E + span_id = 0x34BF92DEEFC58C92 + parent_id = 0x1111111111111111 + other_id = 0x2222222222222222 + + base_time = 683647322 * 10 ** 9 # in ns + start_times = ( + base_time, + base_time + 150 * 10 ** 6, + base_time + 300 * 10 ** 6, + ) + durations = (50 * 10 ** 6, 100 * 10 ** 6, 200 * 10 ** 6) + end_times = ( + start_times[0] + durations[0], + start_times[1] + durations[1], + start_times[2] + durations[2], + ) + + span_context = trace_api.SpanContext( + trace_id, span_id, is_remote=False + ) + parent_context = trace_api.SpanContext( + trace_id, parent_id, is_remote=False + ) + other_context = trace_api.SpanContext( + trace_id, other_id, is_remote=False + ) + + instrumentation_info = InstrumentationInfo(__name__, "0") + + otel_spans = [ + trace.Span( + name=span_names[0], + context=span_context, + parent=parent_context, + kind=trace_api.SpanKind.CLIENT, + instrumentation_info=instrumentation_info, + ), + trace.Span( + name=span_names[1], + context=parent_context, + parent=None, + instrumentation_info=instrumentation_info, + ), + trace.Span( + name=span_names[2], context=other_context, parent=None, + ), + ] + + otel_spans[0].start(start_time=start_times[0]) + otel_spans[0].end(end_time=end_times[0]) + + otel_spans[1].start(start_time=start_times[1]) + otel_spans[1].end(end_time=end_times[1]) + + otel_spans[2].start(start_time=start_times[2]) + otel_spans[2].end(end_time=end_times[2]) + + # pylint: disable=protected-access + exporter = datadog.DatadogSpanExporter() + datadog_spans = [ + span.to_dict() + for span in exporter._translate_to_datadog(otel_spans) + ] + + expected_spans = [ + dict( + trace_id=trace_id_low, + parent_id=parent_id, + span_id=span_id, + name="tests.test_datadog_exporter.CLIENT", + resource=span_names[0], + start=start_times[0], + duration=durations[0], + error=0, + service="test-service", + meta={"env": "test", "team": "testers"}, + ), + dict( + trace_id=trace_id_low, + parent_id=0, + span_id=parent_id, + name="tests.test_datadog_exporter.INTERNAL", + resource=span_names[1], + start=start_times[1], + duration=durations[1], + error=0, + service="test-service", + meta={"env": "test", "team": "testers", "version": "0.0.1"}, + ), + dict( + trace_id=trace_id_low, + parent_id=0, + span_id=other_id, + name=span_names[2], + resource=span_names[2], + start=start_times[2], + duration=durations[2], + error=0, + service="test-service", + meta={"env": "test", "team": "testers", "version": "0.0.1"}, + ), + ] + + self.assertEqual(datadog_spans, expected_spans) + + def test_export(self): + """Test that agent and/or collector are invoked""" + # create and save span to be used in tests + context = trace_api.SpanContext( + trace_id=0x000000000000000000000000DEADBEEF, + span_id=0x00000000DEADBEF0, + is_remote=False, + ) + + test_span = trace.Span("test_span", context=context) + test_span.start() + test_span.end() + + self.exporter.export((test_span,)) + + self.assertEqual(self.exporter.agent_writer.write.call_count, 1) + + def test_resources(self): + test_attributes = [ + {}, + {"http.method": "GET", "http.route": "/foo/"}, + {"http.method": "GET", "http.target": "/foo/200"}, + ] + + for index, test in enumerate(test_attributes): + with self.tracer.start_span(str(index), attributes=test): + pass + + datadog_spans = get_spans(self.tracer, self.exporter) + + self.assertEqual(len(datadog_spans), 3) + + actual = [span["resource"] for span in datadog_spans] + expected = ["0", "GET /foo/", "GET"] + + self.assertEqual(actual, expected) + + def test_span_types(self): + test_instrumentations = [ + "opentelemetry.ext.aiohttp-client", + "opentelemetry.ext.dbapi", + "opentelemetry.ext.django", + "opentelemetry.ext.flask", + "opentelemetry.ext.grpc", + "opentelemetry.ext.jinja2", + "opentelemetry.ext.mysql", + "opentelemetry.ext.psycopg2", + "opentelemetry.ext.pymongo", + "opentelemetry.ext.pymysql", + "opentelemetry.ext.redis", + "opentelemetry.ext.requests", + "opentelemetry.ext.sqlalchemy", + "opentelemetry.ext.wsgi", + ] + + for index, instrumentation in enumerate(test_instrumentations): + # change tracer's instrumentation info before starting span + self.tracer.instrumentation_info = InstrumentationInfo( + instrumentation, "0" + ) + with self.tracer.start_span(str(index)): + pass + + datadog_spans = get_spans(self.tracer, self.exporter) + + self.assertEqual(len(datadog_spans), 14) + + actual = [span.get("type") for span in datadog_spans] + expected = [ + "http", + "sql", + "web", + "web", + "grpc", + "template", + "sql", + "sql", + "mongodb", + "sql", + "redis", + "http", + "sql", + "web", + ] + self.assertEqual(actual, expected) + + def test_errors(self): + with self.assertRaises(ValueError): + with self.tracer.start_span("foo"): + raise ValueError("bar") + + datadog_spans = get_spans(self.tracer, self.exporter) + + self.assertEqual(len(datadog_spans), 1) + + span = datadog_spans[0] + self.assertEqual(span["error"], 1) + self.assertEqual(span["meta"]["error.msg"], "bar") + self.assertEqual(span["meta"]["error.type"], "ValueError") + + def test_shutdown(self): + span_names = ["xxx", "bar", "foo"] + + for name in span_names: + with self.tracer.start_span(name): + pass + + self.span_processor.shutdown() + + # check that spans are exported without an explicitly call to + # force_flush() + datadog_spans = get_spans(self.tracer, self.exporter) + actual = [span.get("resource") for span in datadog_spans] + self.assertListEqual(span_names, actual) + + def test_flush(self): + span_names0 = ["xxx", "bar", "foo"] + span_names1 = ["yyy", "baz", "fox"] + + for name in span_names0: + with self.tracer.start_span(name): + pass + + self.assertTrue(self.span_processor.force_flush()) + datadog_spans = get_spans(self.tracer, self.exporter, shutdown=False) + actual0 = [span.get("resource") for span in datadog_spans] + self.assertListEqual(span_names0, actual0) + + # create some more spans to check that span processor still works + for name in span_names1: + with self.tracer.start_span(name): + pass + + self.assertTrue(self.span_processor.force_flush()) + datadog_spans = get_spans(self.tracer, self.exporter) + actual1 = [span.get("resource") for span in datadog_spans] + self.assertListEqual(span_names0 + span_names1, actual1) + + def test_span_processor_lossless(self): + """Test that no spans are lost when sending max_trace_size spans""" + span_processor = datadog.DatadogExportSpanProcessor( + self.exporter, max_trace_size=128 + ) + tracer_provider = trace.TracerProvider() + tracer_provider.add_span_processor(span_processor) + tracer = tracer_provider.get_tracer(__name__) + + with tracer.start_as_current_span("root"): + for _ in range(127): + with tracer.start_span("foo"): + pass + + self.assertTrue(span_processor.force_flush()) + datadog_spans = get_spans(tracer, self.exporter) + self.assertEqual(len(datadog_spans), 128) + tracer_provider.shutdown() + + def test_span_processor_dropped_spans(self): + """Test that spans are lost when exceeding max_trace_size spans""" + span_processor = datadog.DatadogExportSpanProcessor( + self.exporter, max_trace_size=128 + ) + tracer_provider = trace.TracerProvider() + tracer_provider.add_span_processor(span_processor) + tracer = tracer_provider.get_tracer(__name__) + + with tracer.start_as_current_span("root"): + for _ in range(127): + with tracer.start_span("foo"): + pass + with self.assertLogs(level=logging.WARNING): + with tracer.start_span("one-too-many"): + pass + + self.assertTrue(span_processor.force_flush()) + datadog_spans = get_spans(tracer, self.exporter) + self.assertEqual(len(datadog_spans), 128) + tracer_provider.shutdown() + + def test_span_processor_scheduled_delay(self): + """Test that spans are exported each schedule_delay_millis""" + delay = 300 + span_processor = datadog.DatadogExportSpanProcessor( + self.exporter, schedule_delay_millis=delay + ) + tracer_provider = trace.TracerProvider() + tracer_provider.add_span_processor(span_processor) + tracer = tracer_provider.get_tracer(__name__) + + with tracer.start_span("foo"): + pass + + time.sleep(delay / (1e3 * 2)) + datadog_spans = get_spans(tracer, self.exporter, shutdown=False) + self.assertEqual(len(datadog_spans), 0) + + time.sleep(delay / (1e3 * 2) + 0.01) + datadog_spans = get_spans(tracer, self.exporter, shutdown=False) + self.assertEqual(len(datadog_spans), 1) + + tracer_provider.shutdown() + + def test_origin(self): + context = trace_api.SpanContext( + trace_id=0x000000000000000000000000DEADBEEF, + span_id=trace_api.INVALID_SPAN, + is_remote=True, + trace_state=trace_api.TraceState( + {datadog.constants.DD_ORIGIN: "origin-service"} + ), + ) + + root_span = trace.Span(name="root", context=context, parent=None) + child_span = trace.Span( + name="child", context=context, parent=root_span + ) + root_span.start() + child_span.start() + child_span.end() + root_span.end() + + # pylint: disable=protected-access + exporter = datadog.DatadogSpanExporter() + datadog_spans = [ + span.to_dict() + for span in exporter._translate_to_datadog([root_span, child_span]) + ] + + self.assertEqual(len(datadog_spans), 2) + + actual = [ + span["meta"].get(datadog.constants.DD_ORIGIN) + if "meta" in span + else None + for span in datadog_spans + ] + expected = ["origin-service", None] + self.assertListEqual(actual, expected) + + def test_sampling_rate(self): + context = trace_api.SpanContext( + trace_id=0x000000000000000000000000DEADBEEF, + span_id=0x34BF92DEEFC58C92, + is_remote=False, + trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED), + ) + sampler = trace_api.sampling.ProbabilitySampler(0.5) + + span = trace.Span( + name="sampled", context=context, parent=None, sampler=sampler + ) + span.start() + span.end() + + # pylint: disable=protected-access + exporter = datadog.DatadogSpanExporter() + datadog_spans = [ + span.to_dict() for span in exporter._translate_to_datadog([span]) + ] + + self.assertEqual(len(datadog_spans), 1) + + actual = [ + span["metrics"].get(datadog.constants.SAMPLE_RATE_METRIC_KEY) + if "metrics" in span + else None + for span in datadog_spans + ] + expected = [0.5] + self.assertListEqual(actual, expected) diff --git a/exporter/opentelemetry-exporter-datadog/tests/test_datadog_format.py b/exporter/opentelemetry-exporter-datadog/tests/test_datadog_format.py new file mode 100644 index 000000000..1a398745b --- /dev/null +++ b/exporter/opentelemetry-exporter-datadog/tests/test_datadog_format.py @@ -0,0 +1,170 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from opentelemetry import trace as trace_api +from opentelemetry.exporter.datadog import constants, propagator +from opentelemetry.sdk import trace +from opentelemetry.trace import get_current_span, set_span_in_context + +FORMAT = propagator.DatadogFormat() + + +def get_as_list(dict_object, key): + value = dict_object.get(key) + return [value] if value is not None else [] + + +class TestDatadogFormat(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.serialized_trace_id = propagator.format_trace_id( + trace.generate_trace_id() + ) + cls.serialized_parent_id = propagator.format_span_id( + trace.generate_span_id() + ) + cls.serialized_origin = "origin-service" + + def test_malformed_headers(self): + """Test with no Datadog headers""" + malformed_trace_id_key = FORMAT.TRACE_ID_KEY + "-x" + malformed_parent_id_key = FORMAT.PARENT_ID_KEY + "-x" + context = get_current_span( + FORMAT.extract( + get_as_list, + { + malformed_trace_id_key: self.serialized_trace_id, + malformed_parent_id_key: self.serialized_parent_id, + }, + ) + ).get_context() + + self.assertNotEqual(context.trace_id, int(self.serialized_trace_id)) + self.assertNotEqual(context.span_id, int(self.serialized_parent_id)) + self.assertFalse(context.is_remote) + + def test_missing_trace_id(self): + """If a trace id is missing, populate an invalid trace id.""" + carrier = { + FORMAT.PARENT_ID_KEY: self.serialized_parent_id, + } + + ctx = FORMAT.extract(get_as_list, carrier) + span_context = get_current_span(ctx).get_context() + self.assertEqual(span_context.trace_id, trace_api.INVALID_TRACE_ID) + + def test_missing_parent_id(self): + """If a parent id is missing, populate an invalid trace id.""" + carrier = { + FORMAT.TRACE_ID_KEY: self.serialized_trace_id, + } + + ctx = FORMAT.extract(get_as_list, carrier) + span_context = get_current_span(ctx).get_context() + self.assertEqual(span_context.span_id, trace_api.INVALID_SPAN_ID) + + def test_context_propagation(self): + """Test the propagation of Datadog headers.""" + parent_context = get_current_span( + FORMAT.extract( + get_as_list, + { + FORMAT.TRACE_ID_KEY: self.serialized_trace_id, + FORMAT.PARENT_ID_KEY: self.serialized_parent_id, + FORMAT.SAMPLING_PRIORITY_KEY: str(constants.AUTO_KEEP), + FORMAT.ORIGIN_KEY: self.serialized_origin, + }, + ) + ).get_context() + + self.assertEqual( + parent_context.trace_id, int(self.serialized_trace_id) + ) + self.assertEqual( + parent_context.span_id, int(self.serialized_parent_id) + ) + self.assertEqual(parent_context.trace_flags, constants.AUTO_KEEP) + self.assertEqual( + parent_context.trace_state.get(constants.DD_ORIGIN), + self.serialized_origin, + ) + self.assertTrue(parent_context.is_remote) + + child = trace.Span( + "child", + trace_api.SpanContext( + parent_context.trace_id, + trace.generate_span_id(), + is_remote=False, + trace_flags=parent_context.trace_flags, + trace_state=parent_context.trace_state, + ), + parent=parent_context, + ) + + child_carrier = {} + child_context = set_span_in_context(child) + FORMAT.inject(dict.__setitem__, child_carrier, context=child_context) + + self.assertEqual( + child_carrier[FORMAT.TRACE_ID_KEY], self.serialized_trace_id + ) + self.assertEqual( + child_carrier[FORMAT.PARENT_ID_KEY], str(child.context.span_id) + ) + self.assertEqual( + child_carrier[FORMAT.SAMPLING_PRIORITY_KEY], + str(constants.AUTO_KEEP), + ) + self.assertEqual( + child_carrier.get(FORMAT.ORIGIN_KEY), self.serialized_origin + ) + + def test_sampling_priority_auto_reject(self): + """Test sampling priority rejected.""" + parent_context = get_current_span( + FORMAT.extract( + get_as_list, + { + FORMAT.TRACE_ID_KEY: self.serialized_trace_id, + FORMAT.PARENT_ID_KEY: self.serialized_parent_id, + FORMAT.SAMPLING_PRIORITY_KEY: str(constants.AUTO_REJECT), + }, + ) + ).get_context() + + self.assertEqual(parent_context.trace_flags, constants.AUTO_REJECT) + + child = trace.Span( + "child", + trace_api.SpanContext( + parent_context.trace_id, + trace.generate_span_id(), + is_remote=False, + trace_flags=parent_context.trace_flags, + trace_state=parent_context.trace_state, + ), + parent=parent_context, + ) + + child_carrier = {} + child_context = set_span_in_context(child) + FORMAT.inject(dict.__setitem__, child_carrier, context=child_context) + + self.assertEqual( + child_carrier[FORMAT.SAMPLING_PRIORITY_KEY], + str(constants.AUTO_REJECT), + )