diff --git a/.pylintrc b/.pylintrc index 42443e5b5..30d60bc2d 100644 --- a/.pylintrc +++ b/.pylintrc @@ -12,7 +12,7 @@ ignore=CVS,gen,Dockerfile,docker-compose.yml,README.md,requirements.txt,docs # Add files or directories matching the regex patterns to be excluded. The # regex matches against base names, not paths. ignore-patterns= -ignore-paths=exporter/opentelemetry-exporter-datadog/.*$ +ignore-paths= # Python code to execute, usually for sys.path manipulation such as # pygtk.require(). diff --git a/docs/exporter/datadog/datadog.rst b/docs/exporter/datadog/datadog.rst deleted file mode 100644 index 3b43c2bdf..000000000 --- a/docs/exporter/datadog/datadog.rst +++ /dev/null @@ -1,7 +0,0 @@ -OpenTelemetry Datadog Exporter -============================== - -.. automodule:: opentelemetry.exporter.datadog - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/index.rst b/docs/index.rst index 44bff86ce..44fbfc118 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -53,14 +53,6 @@ install pip install -e ./sdk-extension/opentelemetry-sdk-extension-aws -.. toctree:: - :maxdepth: 2 - :caption: OpenTelemetry Exporters - :name: exporters - :glob: - - exporter/** - .. toctree:: :maxdepth: 2 :caption: OpenTelemetry Instrumentations diff --git a/docs/nitpick-exceptions.ini b/docs/nitpick-exceptions.ini index 6e43ee8c6..fa8de3251 100644 --- a/docs/nitpick-exceptions.ini +++ b/docs/nitpick-exceptions.ini @@ -9,7 +9,6 @@ class_references= opentelemetry.propagators.textmap.DefaultGetter ; API opentelemetry.propagators.textmap.Getter - ; - DatadogFormat ; - AWSXRayPropagator opentelemetry.sdk.trace.id_generator.IdGenerator ; - AwsXRayIdGenerator @@ -38,7 +37,6 @@ anys= ; - AwsXRayIdGenerator ; SDK SpanProcessor - ; - DatadogExportSpanProcessor TracerProvider ; - AwsXRayIdGenerator ; Instrumentation diff --git a/eachdist.ini b/eachdist.ini index bf3f91fb3..4a7eec76d 100644 --- a/eachdist.ini +++ b/eachdist.ini @@ -46,7 +46,6 @@ packages= [exclude_release] packages= - opentelemetry-exporter-datadog opentelemetry-sdk-extension-aws opentelemetry-propagator-aws-xray diff --git a/exporter/opentelemetry-exporter-datadog/README.rst b/exporter/opentelemetry-exporter-datadog/README.rst deleted file mode 100644 index 08dc5e800..000000000 --- a/exporter/opentelemetry-exporter-datadog/README.rst +++ /dev/null @@ -1,32 +0,0 @@ -OpenTelemetry Datadog Span Exporter -=================================== - -|pypi| - -.. |pypi| image:: https://badge.fury.io/py/opentelemetry-exporter-datadog.svg - :target: https://pypi.org/project/opentelemetry-exporter-datadog/ - -This library allows to export tracing data to `Datadog -`_. OpenTelemetry span event and links are not -supported. - -.. warning:: This exporter has been deprecated. To export your OTLP traces from OpenTelemetry SDK directly to Datadog Agent, please refer to `OTLP Ingest in Datadog Agent `_ . - - -Installation ------------- - -:: - - pip install opentelemetry-exporter-datadog - - -.. _Datadog: https://www.datadoghq.com/ -.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ - - -References ----------- - -* `Datadog `_ -* `OpenTelemetry Project `_ diff --git a/exporter/opentelemetry-exporter-datadog/pyproject.toml b/exporter/opentelemetry-exporter-datadog/pyproject.toml deleted file mode 100644 index a926b6f70..000000000 --- a/exporter/opentelemetry-exporter-datadog/pyproject.toml +++ /dev/null @@ -1,49 +0,0 @@ -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[project] -name = "opentelemetry-exporter-datadog" -dynamic = ["version"] -description = "Datadog Span Exporter for OpenTelemetry" -readme = "README.rst" -license = "Apache-2.0" -requires-python = ">=3.7" -authors = [ - { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" }, -] -classifiers = [ - "Development Status :: 4 - Beta", - "Intended Audience :: Developers", - "License :: OSI Approved :: Apache Software License", - "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", -] -dependencies = [ - "ddtrace>=0.34.0,<0.47.0", - "opentelemetry-api ~= 1.12", - "opentelemetry-sdk ~= 1.12", - "opentelemetry-semantic-conventions == 0.30b0", -] - -[project.optional-dependencies] -test = [] - -[project.urls] -Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/exporter/opentelemetry-exporter-datadog" - -[tool.hatch.version] -path = "src/opentelemetry/exporter/datadog/version.py" - -[tool.hatch.build.targets.sdist] -include = [ - "/src", - "/tests", -] - -[tool.hatch.build.targets.wheel] -packages = ["src/opentelemetry"] diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/__init__.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/__init__.py deleted file mode 100644 index 57953d57e..000000000 --- a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/__init__.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -The **OpenTelemetry Datadog Exporter** provides a span exporter from -`OpenTelemetry`_ traces to `Datadog`_ by using the Datadog Agent. - -Installation ------------- - -:: - - pip install opentelemetry-exporter-datadog - - -Usage ------ - -The Datadog exporter provides a span processor that must be added along with the -exporter. In addition, a formatter is provided to handle propagation of trace -context between OpenTelemetry-instrumented and Datadog-instrumented services in -a distributed trace. - -.. code:: python - - from opentelemetry.propagate import set_global_textmap - from opentelemetry import trace - from opentelemetry.exporter.datadog import DatadogExportSpanProcessor, DatadogSpanExporter - from opentelemetry.exporter.datadog.propagator import DatadogFormat - from opentelemetry.sdk.trace import TracerProvider - - trace.set_tracer_provider(TracerProvider()) - tracer = trace.get_tracer(__name__) - - exporter = DatadogSpanExporter( - agent_url="http://agent:8126", service="my-helloworld-service" - ) - - span_processor = DatadogExportSpanProcessor(exporter) - trace.get_tracer_provider().add_span_processor(span_processor) - - # Optional: use Datadog format for propagation in distributed traces - set_global_textmap(DatadogFormat()) - - with tracer.start_as_current_span("foo"): - print("Hello world!") - - -Examples --------- - -The `docs/examples/datadog_exporter`_ includes examples for using the Datadog -exporter with OpenTelemetry instrumented applications. - -API ---- -.. _Datadog: https://www.datadoghq.com/ -.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/ -.. _docs/examples/datadog_exporter: https://github.com/open-telemetry/opentelemetry-python/tree/main/docs/examples/datadog_exporter -""" -# pylint: disable=import-error - -from .exporter import DatadogSpanExporter -from .spanprocessor import DatadogExportSpanProcessor - -__all__ = ["DatadogExportSpanProcessor", "DatadogSpanExporter"] diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/constants.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/constants.py deleted file mode 100644 index 6f86c12cc..000000000 --- a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/constants.py +++ /dev/null @@ -1,16 +0,0 @@ -DD_ORIGIN = "dd_origin" -AUTO_REJECT = 0 -AUTO_KEEP = 1 -USER_KEEP = 2 -SAMPLE_RATE_METRIC_KEY = "_sample_rate" -SAMPLING_PRIORITY_KEY = "_sampling_priority_v1" -ENV_KEY = "env" -VERSION_KEY = "version" -SERVICE_NAME_TAG = "service.name" -EVENT_NAME_EXCEPTION = "exception" -EXCEPTION_TYPE_ATTR_KEY = "exception.type" -EXCEPTION_MSG_ATTR_KEY = "exception.message" -EXCEPTION_STACK_ATTR_KEY = "exception.stacktrace" -DD_ERROR_TYPE_TAG_KEY = "error.type" -DD_ERROR_MSG_TAG_KEY = "error.msg" -DD_ERROR_STACK_TAG_KEY = "error.stack" diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py deleted file mode 100644 index 2913885cb..000000000 --- a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py +++ /dev/null @@ -1,346 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import os -from urllib.parse import urlparse - -from ddtrace.ext import SpanTypes as DatadogSpanTypes -from ddtrace.internal.writer import AgentWriter -from ddtrace.span import Span as DatadogSpan - -import opentelemetry.trace as trace_api -from opentelemetry.exporter.datadog.constants import ( - DD_ERROR_MSG_TAG_KEY, - DD_ERROR_STACK_TAG_KEY, - DD_ERROR_TYPE_TAG_KEY, - DD_ORIGIN, - ENV_KEY, - EVENT_NAME_EXCEPTION, - EXCEPTION_MSG_ATTR_KEY, - EXCEPTION_STACK_ATTR_KEY, - EXCEPTION_TYPE_ATTR_KEY, - SAMPLE_RATE_METRIC_KEY, - SERVICE_NAME_TAG, - VERSION_KEY, -) -from opentelemetry.sdk.trace import sampling -from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult -from opentelemetry.semconv.trace import SpanAttributes - -logger = logging.getLogger(__name__) - - -DEFAULT_AGENT_URL = "http://localhost:8126" -_INSTRUMENTATION_SPAN_TYPES = { - "opentelemetry.instrumentation.aiohttp-client": DatadogSpanTypes.HTTP, - "opentelemetry.instrumentation.asgi": DatadogSpanTypes.WEB, - "opentelemetry.instrumentation.dbapi": DatadogSpanTypes.SQL, - "opentelemetry.instrumentation.django": DatadogSpanTypes.WEB, - "opentelemetry.instrumentation.flask": DatadogSpanTypes.WEB, - "opentelemetry.instrumentation.grpc": DatadogSpanTypes.GRPC, - "opentelemetry.instrumentation.jinja2": DatadogSpanTypes.TEMPLATE, - "opentelemetry.instrumentation.mysql": DatadogSpanTypes.SQL, - "opentelemetry.instrumentation.psycopg2": DatadogSpanTypes.SQL, - "opentelemetry.instrumentation.pymemcache": DatadogSpanTypes.CACHE, - "opentelemetry.instrumentation.pymongo": DatadogSpanTypes.MONGODB, - "opentelemetry.instrumentation.pymysql": DatadogSpanTypes.SQL, - "opentelemetry.instrumentation.redis": DatadogSpanTypes.REDIS, - "opentelemetry.instrumentation.requests": DatadogSpanTypes.HTTP, - "opentelemetry.instrumentation.sqlalchemy": DatadogSpanTypes.SQL, - "opentelemetry.instrumentation.wsgi": DatadogSpanTypes.WEB, -} - - -class DatadogSpanExporter(SpanExporter): - """Datadog span exporter for OpenTelemetry. - - Args: - agent_url: The url of the Datadog Agent or use ``DD_TRACE_AGENT_URL`` environment variable - service: The service name to be used for the application or use ``DD_SERVICE`` environment variable - env: Set the application’s environment or use ``DD_ENV`` environment variable - version: Set the application’s version or use ``DD_VERSION`` environment variable - tags: A list (formatted as a comma-separated string) of default tags to be added to every span or use ``DD_TAGS`` environment variable - """ - - def __init__( - self, agent_url=None, service=None, env=None, version=None, tags=None - ): - self.agent_url = ( - agent_url - if agent_url - else os.environ.get("DD_TRACE_AGENT_URL", DEFAULT_AGENT_URL) - ) - self.service = service or os.environ.get("DD_SERVICE") - self.env = env or os.environ.get("DD_ENV") - self.version = version or os.environ.get("DD_VERSION") - self.tags = _parse_tags_str(tags or os.environ.get("DD_TAGS")) - self._agent_writer = None - - @property - def agent_writer(self): - if self._agent_writer is None: - url_parsed = urlparse(self.agent_url) - if url_parsed.scheme in ("http", "https"): - self._agent_writer = AgentWriter( - hostname=url_parsed.hostname, - port=url_parsed.port, - https=url_parsed.scheme == "https", - ) - elif url_parsed.scheme == "unix": - self._agent_writer = AgentWriter(uds_path=url_parsed.path) - else: - raise ValueError( - f"Unknown scheme `{url_parsed.scheme}` for agent URL" - ) - return self._agent_writer - - def export(self, spans): - datadog_spans = self._translate_to_datadog(spans) - - self.agent_writer.write(spans=datadog_spans) - - return SpanExportResult.SUCCESS - - def shutdown(self): - if self.agent_writer.started: - self.agent_writer.stop() - self.agent_writer.join(self.agent_writer.exit_timeout) - - # pylint: disable=too-many-locals - def _translate_to_datadog(self, spans): - datadog_spans = [] - - for span in spans: - trace_id, parent_id, span_id = _get_trace_ids(span) - - # datadog Span is initialized with a reference to the tracer which is - # used to record the span when it is finished. We can skip ignore this - # because we are not calling the finish method and explicitly set the - # duration. - tracer = None - - # extract resource attributes to be used as tags as well as potential service name - [ - resource_tags, - resource_service_name, - ] = _extract_tags_from_resource(span.resource, self.service) - - datadog_span = DatadogSpan( - tracer, - _get_span_name(span), - service=resource_service_name, - resource=_get_resource(span), - span_type=_get_span_type(span), - trace_id=trace_id, - span_id=span_id, - parent_id=parent_id, - ) - datadog_span.start_ns = span.start_time - datadog_span.duration_ns = span.end_time - span.start_time - - if not span.status.is_ok: - datadog_span.error = 1 - # loop over events and look for exception events, extract info. - # https://github.com/open-telemetry/opentelemetry-python/blob/71e3a7a192c0fc8a7503fac967ada36a74b79e58/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py#L810-L819 - if span.events: - _extract_tags_from_exception_events( - span.events, datadog_span - ) - - # combine resource attributes and span attributes, don't modify existing span attributes - combined_span_tags = {} - combined_span_tags.update(resource_tags) - combined_span_tags.update(span.attributes) - - datadog_span.set_tags(combined_span_tags) - - # add configured env tag - if self.env is not None: - datadog_span.set_tag(ENV_KEY, self.env) - - # add configured application version tag to only root span - if self.version is not None and parent_id == 0: - datadog_span.set_tag(VERSION_KEY, self.version) - - # add configured global tags - datadog_span.set_tags(self.tags) - - # add origin to root span - origin = _get_origin(span) - if origin and parent_id == 0: - datadog_span.set_tag(DD_ORIGIN, origin) - - sampling_rate = _get_sampling_rate(span) - if sampling_rate is not None: - datadog_span.set_metric(SAMPLE_RATE_METRIC_KEY, sampling_rate) - - # span events and span links are not supported except for extracting exception event context - - datadog_spans.append(datadog_span) - - return datadog_spans - - -def _get_trace_ids(span): - """Extract tracer ids from span""" - ctx = span.get_span_context() - trace_id = ctx.trace_id - span_id = ctx.span_id - - if isinstance(span.parent, trace_api.Span): - parent_id = span.parent.get_span_context().span_id - elif isinstance(span.parent, trace_api.SpanContext): - parent_id = span.parent.span_id - else: - parent_id = 0 - - trace_id = _convert_trace_id_uint64(trace_id) - - return trace_id, parent_id, span_id - - -def _convert_trace_id_uint64(otel_id): - """Convert 128-bit int used for trace_id to 64-bit unsigned int""" - return otel_id & 0xFFFFFFFFFFFFFFFF - - -def _get_span_name(span): - """Get span name by using instrumentation and kind while backing off to - span.name - """ - instrumentation_name = ( - span.instrumentation_info.name if span.instrumentation_info else None - ) - span_kind_name = span.kind.name if span.kind else None - name = ( - f"{instrumentation_name}.{span_kind_name}" - if instrumentation_name and span_kind_name - else span.name - ) - return name - - -def _get_resource(span): - """Get resource name for span""" - if SpanAttributes.HTTP_METHOD in span.attributes: - route = span.attributes.get(SpanAttributes.HTTP_ROUTE) - return ( - span.attributes[SpanAttributes.HTTP_METHOD] + " " + route - if route - else span.attributes[SpanAttributes.HTTP_METHOD] - ) - - return span.name - - -def _get_span_type(span): - """Get Datadog span type""" - instrumentation_name = ( - span.instrumentation_info.name if span.instrumentation_info else None - ) - span_type = _INSTRUMENTATION_SPAN_TYPES.get(instrumentation_name) - return span_type - - -def _get_exc_info(span): - """Parse span status description for exception type and value""" - exc_type, exc_val = span.status.description.split(":", 1) - return exc_type, exc_val.strip() - - -def _get_origin(span): - ctx = span.get_span_context() - origin = ctx.trace_state.get(DD_ORIGIN) - return origin - - -def _get_sampling_rate(span): - ctx = span.get_span_context() - tracer_provider = trace_api.get_tracer_provider() - if not hasattr(tracer_provider, "sampler"): - return None - sampler = tracer_provider.sampler - return ( - sampler.rate - if ctx.trace_flags.sampled - and isinstance(sampler, sampling.TraceIdRatioBased) - else None - ) - - -def _parse_tags_str(tags_str): - """Parse a string of tags typically provided via environment variables. - - The expected string is of the form:: - "key1:value1,key2:value2" - - :param tags_str: A string of the above form to parse tags from. - :return: A dict containing the tags that were parsed. - """ - parsed_tags = {} - if not tags_str: - return parsed_tags - - for tag in tags_str.split(","): - try: - key, value = tag.split(":", 1) - - # Validate the tag - if key == "" or value == "" or value.endswith(":"): - raise ValueError - except ValueError: - logger.error( - "Malformed tag in tag pair '%s' from tag string '%s'.", - tag, - tags_str, - ) - else: - parsed_tags[key] = value - - return parsed_tags - - -def _extract_tags_from_resource(resource, fallback_service_name): - """Parse tags from resource.attributes, except service.name which - has special significance within datadog""" - tags = {} - if not (resource and getattr(resource, "attributes", None)): - return [tags, fallback_service_name] - - service_name = None - for attribute_key, attribute_value in resource.attributes.items(): - if attribute_key == SERVICE_NAME_TAG: - service_name = attribute_value - else: - tags[attribute_key] = attribute_value - - if service_name is None or service_name == "unknown_service": - service_name = fallback_service_name - - return [tags, service_name] - - -def _extract_tags_from_exception_events(events, datadog_span): - """Parse error tags from exception events, error.msg error.type - and error.stack have special significance within datadog""" - for event in events: - if event.name is not None and event.name == EVENT_NAME_EXCEPTION: - for key, value in event.attributes.items(): - if key == EXCEPTION_TYPE_ATTR_KEY: - datadog_span.set_tag(DD_ERROR_TYPE_TAG_KEY, value) - elif key == EXCEPTION_MSG_ATTR_KEY: - datadog_span.set_tag(DD_ERROR_MSG_TAG_KEY, value) - elif key == EXCEPTION_STACK_ATTR_KEY: - datadog_span.set_tag(DD_ERROR_STACK_TAG_KEY, value) diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/propagator.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/propagator.py deleted file mode 100644 index a6b19d15f..000000000 --- a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/propagator.py +++ /dev/null @@ -1,148 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import typing - -from opentelemetry import trace -from opentelemetry.context import Context -from opentelemetry.exporter.datadog import constants -from opentelemetry.propagators.textmap import ( - CarrierT, - Getter, - Setter, - TextMapPropagator, - default_getter, - default_setter, -) -from opentelemetry.trace import get_current_span, set_span_in_context - - -class DatadogFormat(TextMapPropagator): - """Propagator for the Datadog HTTP header format.""" - - TRACE_ID_KEY = "x-datadog-trace-id" - PARENT_ID_KEY = "x-datadog-parent-id" - SAMPLING_PRIORITY_KEY = "x-datadog-sampling-priority" - ORIGIN_KEY = "x-datadog-origin" - - def extract( - self, - carrier: CarrierT, - context: typing.Optional[Context] = None, - getter: Getter[CarrierT] = default_getter, - ) -> Context: - if context is None: - context = Context() - - trace_id = extract_first_element( - getter.get(carrier, self.TRACE_ID_KEY) - ) - - span_id = extract_first_element( - getter.get(carrier, self.PARENT_ID_KEY) - ) - - sampled = extract_first_element( - getter.get(carrier, self.SAMPLING_PRIORITY_KEY) - ) - - origin = extract_first_element(getter.get(carrier, self.ORIGIN_KEY)) - - trace_flags = trace.TraceFlags() - if sampled and int(sampled) in ( - constants.AUTO_KEEP, - constants.USER_KEEP, - ): - trace_flags = trace.TraceFlags(trace.TraceFlags.SAMPLED) - - if trace_id is None or span_id is None: - return context - - trace_state = [] - if origin is not None: - trace_state.append((constants.DD_ORIGIN, origin)) - span_context = trace.SpanContext( - trace_id=int(trace_id), - span_id=int(span_id), - is_remote=True, - trace_flags=trace_flags, - trace_state=trace.TraceState(trace_state), - ) - - return set_span_in_context( - trace.NonRecordingSpan(span_context), context - ) - - def inject( - self, - carrier: CarrierT, - context: typing.Optional[Context] = None, - setter: Setter[CarrierT] = default_setter, - ) -> None: - span = get_current_span(context) - span_context = span.get_span_context() - if span_context == trace.INVALID_SPAN_CONTEXT: - return - sampled = (trace.TraceFlags.SAMPLED & span.context.trace_flags) != 0 - setter.set( - carrier, - self.TRACE_ID_KEY, - format_trace_id(span.context.trace_id), - ) - setter.set( - carrier, self.PARENT_ID_KEY, format_span_id(span.context.span_id) - ) - setter.set( - carrier, - self.SAMPLING_PRIORITY_KEY, - str(constants.AUTO_KEEP if sampled else constants.AUTO_REJECT), - ) - if constants.DD_ORIGIN in span.context.trace_state: - setter.set( - carrier, - self.ORIGIN_KEY, - span.context.trace_state[constants.DD_ORIGIN], - ) - - @property - def fields(self): - """Returns a set with the fields set in `inject`. - - See - `opentelemetry.propagators.textmap.TextMapPropagator.fields` - """ - return { - self.TRACE_ID_KEY, - self.PARENT_ID_KEY, - self.SAMPLING_PRIORITY_KEY, - self.ORIGIN_KEY, - } - - -def format_trace_id(trace_id: int) -> str: - """Format the trace id for Datadog.""" - return str(trace_id & 0xFFFFFFFFFFFFFFFF) - - -def format_span_id(span_id: int) -> str: - """Format the span id for Datadog.""" - return str(span_id) - - -def extract_first_element( - items: typing.Iterable[CarrierT], -) -> typing.Optional[CarrierT]: - if items is None: - return None - return next(iter(items), None) diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/spanprocessor.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/spanprocessor.py deleted file mode 100644 index f8f9b5007..000000000 --- a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/spanprocessor.py +++ /dev/null @@ -1,226 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import collections -import logging -import threading -import typing -from time import time_ns - -from opentelemetry.context import Context, attach, detach, set_value -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY -from opentelemetry.sdk.trace import Span, SpanProcessor -from opentelemetry.sdk.trace.export import SpanExporter -from opentelemetry.trace import INVALID_TRACE_ID - -logger = logging.getLogger(__name__) - - -class DatadogExportSpanProcessor(SpanProcessor): - """Datadog exporter span processor - - DatadogExportSpanProcessor is an implementation of `SpanProcessor` that - batches all opened spans into a list per trace. When all spans for a trace - are ended, the trace is queues up for export. This is required for exporting - to the Datadog Agent which expects to received list of spans for each trace. - """ - - _FLUSH_TOKEN = INVALID_TRACE_ID - - def __init__( - self, - span_exporter: SpanExporter, - schedule_delay_millis: float = 5000, - max_trace_size: int = 4096, - ): - if max_trace_size <= 0: - raise ValueError("max_queue_size must be a positive integer.") - - if schedule_delay_millis <= 0: - raise ValueError("schedule_delay_millis must be positive.") - - self.span_exporter = span_exporter - - # queue trace_ids for traces with recently ended spans for worker thread to check - # for exporting - self.check_traces_queue = ( - collections.deque() - ) # type: typing.Deque[int] - - self.traces_lock = threading.Lock() - # dictionary of trace_ids to a list of spans where the first span is the - # first opened span for the trace - self.traces = collections.defaultdict(list) - # counter to keep track of the number of spans and ended spans for a - # trace_id - self.traces_spans_count = collections.Counter() - self.traces_spans_ended_count = collections.Counter() - - self.worker_thread = threading.Thread(target=self.worker, daemon=True) - - # threading conditions used for flushing and shutdown - self.condition = threading.Condition(threading.Lock()) - self.flush_condition = threading.Condition(threading.Lock()) - - # flag to indicate that there is a flush operation on progress - self._flushing = False - - self.max_trace_size = max_trace_size - self._spans_dropped = False - self.schedule_delay_millis = schedule_delay_millis - self.done = False - self.worker_thread.start() - - def on_start( - self, span: Span, parent_context: typing.Optional[Context] = None - ) -> None: - ctx = span.get_span_context() - trace_id = ctx.trace_id - - with self.traces_lock: - # check upper bound on number of spans for trace before adding new - # span - if self.traces_spans_count[trace_id] == self.max_trace_size: - logger.warning("Max spans for trace, spans will be dropped.") - self._spans_dropped = True - return - - # add span to end of list for a trace and update the counter - self.traces[trace_id].append(span) - self.traces_spans_count[trace_id] += 1 - - def on_end(self, span: Span) -> None: - if self.done: - logger.warning("Already shutdown, dropping span.") - return - - ctx = span.get_span_context() - trace_id = ctx.trace_id - - with self.traces_lock: - self.traces_spans_ended_count[trace_id] += 1 - if self.is_trace_exportable(trace_id): - self.check_traces_queue.appendleft(trace_id) - - def worker(self): - timeout = self.schedule_delay_millis / 1e3 - while not self.done: - if not self._flushing: - with self.condition: - self.condition.wait(timeout) - if not self.check_traces_queue: - # spurious notification, let's wait again, reset timeout - timeout = self.schedule_delay_millis / 1e3 - continue - if self.done: - # missing spans will be sent when calling flush - break - - # subtract the duration of this export call to the next timeout - start = time_ns() - self.export() - end = time_ns() - duration = (end - start) / 1e9 - timeout = self.schedule_delay_millis / 1e3 - duration - - # be sure that all spans are sent - self._drain_queue() - - def is_trace_exportable(self, trace_id): - return ( - self.traces_spans_count[trace_id] - - self.traces_spans_ended_count[trace_id] - <= 0 - ) - - def export(self) -> None: - """Exports traces with finished spans.""" - notify_flush = False - export_trace_ids = [] - - while self.check_traces_queue: - trace_id = self.check_traces_queue.pop() - if trace_id is self._FLUSH_TOKEN: - notify_flush = True - else: - with self.traces_lock: - # check whether trace is exportable again in case that new - # spans were started since we last concluded trace was - # exportable - if self.is_trace_exportable(trace_id): - export_trace_ids.append(trace_id) - del self.traces_spans_count[trace_id] - del self.traces_spans_ended_count[trace_id] - - if len(export_trace_ids) > 0: - token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) - - for trace_id in export_trace_ids: - with self.traces_lock: - try: - # Ignore type b/c the Optional[None]+slicing is too "clever" - # for mypy - self.span_exporter.export(self.traces[trace_id]) # type: ignore - # pylint: disable=broad-except - except Exception: - logger.exception( - "Exception while exporting Span batch." - ) - finally: - del self.traces[trace_id] - - detach(token) - - if notify_flush: - with self.flush_condition: - self.flush_condition.notify() - - def _drain_queue(self): - """Export all elements until queue is empty. - - Can only be called from the worker thread context because it invokes - `export` that is not thread safe. - """ - while self.check_traces_queue: - self.export() - - def force_flush(self, timeout_millis: int = 30000) -> bool: - if self.done: - logger.warning("Already shutdown, ignoring call to force_flush().") - return True - - self._flushing = True - self.check_traces_queue.appendleft(self._FLUSH_TOKEN) - - # wake up worker thread - with self.condition: - self.condition.notify_all() - - # wait for token to be processed - with self.flush_condition: - ret = self.flush_condition.wait(timeout_millis / 1e3) - - self._flushing = False - - if not ret: - logger.warning("Timeout was exceeded in force_flush().") - return ret - - def shutdown(self) -> None: - # signal the worker thread to finish and then wait for it - self.done = True - with self.condition: - self.condition.notify_all() - self.worker_thread.join() - self.span_exporter.shutdown() diff --git a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/version.py b/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/version.py deleted file mode 100644 index 75e49aa58..000000000 --- a/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/version.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -__version__ = "0.30b0" diff --git a/exporter/opentelemetry-exporter-datadog/tests/__init__.py b/exporter/opentelemetry-exporter-datadog/tests/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/exporter/opentelemetry-exporter-datadog/tests/test_datadog_exporter.py b/exporter/opentelemetry-exporter-datadog/tests/test_datadog_exporter.py deleted file mode 100644 index f86dd31af..000000000 --- a/exporter/opentelemetry-exporter-datadog/tests/test_datadog_exporter.py +++ /dev/null @@ -1,669 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import itertools -import logging -import sys -import time -import unittest -from unittest import mock - -from ddtrace.internal.writer import AgentWriter -from flaky import flaky -from pytest import mark - -from opentelemetry import trace as trace_api -from opentelemetry.context import Context -from opentelemetry.exporter import datadog -from opentelemetry.sdk import trace -from opentelemetry.sdk.trace import Resource, sampling -from opentelemetry.sdk.util.instrumentation import InstrumentationInfo -from opentelemetry.semconv.trace import SpanAttributes - - -class MockDatadogSpanExporter(datadog.DatadogSpanExporter): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - agent_writer_mock = mock.Mock(spec=AgentWriter) - agent_writer_mock.started = True - agent_writer_mock.exit_timeout = 1 - self._agent_writer = agent_writer_mock - - -def get_spans(tracer, exporter, shutdown=True): - if shutdown: - tracer.span_processor.shutdown() - - spans = [ - call_args[-1]["spans"] - for call_args in exporter.agent_writer.write.call_args_list - ] - - return [span.to_dict() for span in itertools.chain.from_iterable(spans)] - - -class TestDatadogSpanExporter(unittest.TestCase): - def setUp(self): - self.exporter = MockDatadogSpanExporter() - self.span_processor = datadog.DatadogExportSpanProcessor(self.exporter) - tracer_provider = trace.TracerProvider() - tracer_provider.add_span_processor(self.span_processor) - self.tracer_provider = tracer_provider - self.tracer = tracer_provider.get_tracer(__name__) - - def tearDown(self): - self.tracer_provider.shutdown() - - def test_constructor_default(self): - """Test the default values assigned by constructor.""" - exporter = datadog.DatadogSpanExporter() - - self.assertEqual(exporter.agent_url, "http://localhost:8126") - self.assertIsNone(exporter.service) - self.assertIsNotNone(exporter.agent_writer) - - def test_constructor_explicit(self): - """Test the constructor passing all the options.""" - agent_url = "http://localhost:8126" - exporter = datadog.DatadogSpanExporter( - agent_url=agent_url, - service="explicit", - ) - - self.assertEqual(exporter.agent_url, agent_url) - self.assertEqual(exporter.service, "explicit") - self.assertIsNone(exporter.env) - self.assertIsNone(exporter.version) - self.assertEqual(exporter.tags, {}) - - exporter = datadog.DatadogSpanExporter( - agent_url=agent_url, - service="explicit", - env="test", - version="0.0.1", - tags="", - ) - - self.assertEqual(exporter.agent_url, agent_url) - self.assertEqual(exporter.service, "explicit") - self.assertEqual(exporter.env, "test") - self.assertEqual(exporter.version, "0.0.1") - self.assertEqual(exporter.tags, {}) - - exporter = datadog.DatadogSpanExporter( - agent_url=agent_url, - service="explicit", - env="test", - version="0.0.1", - tags="team:testers,layer:app", - ) - - self.assertEqual(exporter.agent_url, agent_url) - self.assertEqual(exporter.service, "explicit") - self.assertEqual(exporter.env, "test") - self.assertEqual(exporter.version, "0.0.1") - self.assertEqual(exporter.tags, {"team": "testers", "layer": "app"}) - - @mock.patch.dict( - "os.environ", - { - "DD_TRACE_AGENT_URL": "http://agent:8126", - "DD_SERVICE": "test-service", - "DD_ENV": "test", - "DD_VERSION": "0.0.1", - "DD_TAGS": "team:testers", - }, - ) - def test_constructor_environ(self): - exporter = datadog.DatadogSpanExporter() - - self.assertEqual(exporter.agent_url, "http://agent:8126") - self.assertEqual(exporter.service, "test-service") - self.assertEqual(exporter.env, "test") - self.assertEqual(exporter.version, "0.0.1") - self.assertEqual(exporter.tags, {"team": "testers"}) - self.assertIsNotNone(exporter.agent_writer) - - # pylint: disable=too-many-locals - @mock.patch.dict( - "os.environ", - { - "DD_SERVICE": "test-service", - "DD_ENV": "test", - "DD_VERSION": "0.0.1", - "DD_TAGS": "team:testers", - }, - ) - def test_translate_to_datadog(self): - # pylint: disable=invalid-name - self.maxDiff = None - - resource = Resource( - attributes={ - "key_resource": "some_resource", - "service.name": "resource_service_name", - } - ) - - resource_without_service = Resource( - attributes={"conflicting_key": "conflicting_value"} - ) - - span_names = ("test1", "test2", "test3") - trace_id = 0x6E0C63257DE34C926F9EFCD03927272E - trace_id_low = 0x6F9EFCD03927272E - span_id = 0x34BF92DEEFC58C92 - parent_id = 0x1111111111111111 - other_id = 0x2222222222222222 - - base_time = 683647322 * 10**9 # in ns - start_times = ( - base_time, - base_time + 150 * 10**6, - base_time + 300 * 10**6, - ) - durations = (50 * 10**6, 100 * 10**6, 200 * 10**6) - end_times = ( - start_times[0] + durations[0], - start_times[1] + durations[1], - start_times[2] + durations[2], - ) - - span_context = trace_api.SpanContext( - trace_id, span_id, is_remote=False - ) - parent_span_context = trace_api.SpanContext( - trace_id, parent_id, is_remote=False - ) - other_context = trace_api.SpanContext( - trace_id, other_id, is_remote=False - ) - - instrumentation_info = InstrumentationInfo(__name__, "0") - - otel_spans = [ - trace._Span( - name=span_names[0], - context=span_context, - parent=parent_span_context, - kind=trace_api.SpanKind.CLIENT, - instrumentation_info=instrumentation_info, - resource=Resource({}), - ), - trace._Span( - name=span_names[1], - context=parent_span_context, - parent=None, - instrumentation_info=instrumentation_info, - resource=resource_without_service, - ), - trace._Span( - name=span_names[2], - context=other_context, - parent=None, - resource=resource, - ), - ] - - otel_spans[1].set_attribute("conflicting_key", "original_value") - - otel_spans[0].start(start_time=start_times[0]) - otel_spans[0].end(end_time=end_times[0]) - - otel_spans[1].start(start_time=start_times[1]) - otel_spans[1].end(end_time=end_times[1]) - - otel_spans[2].start(start_time=start_times[2]) - otel_spans[2].end(end_time=end_times[2]) - - # pylint: disable=protected-access - exporter = datadog.DatadogSpanExporter() - datadog_spans = [ - span.to_dict() - for span in exporter._translate_to_datadog(otel_spans) - ] - - expected_spans = [ - dict( - trace_id=trace_id_low, - parent_id=parent_id, - span_id=span_id, - name="tests.test_datadog_exporter.CLIENT", - resource=span_names[0], - start=start_times[0], - duration=durations[0], - error=0, - service="test-service", - meta={"env": "test", "team": "testers"}, - ), - dict( - trace_id=trace_id_low, - parent_id=0, - span_id=parent_id, - name="tests.test_datadog_exporter.INTERNAL", - resource=span_names[1], - start=start_times[1], - duration=durations[1], - error=0, - service="test-service", - meta={ - "env": "test", - "team": "testers", - "version": "0.0.1", - "conflicting_key": "original_value", - }, - ), - dict( - trace_id=trace_id_low, - parent_id=0, - span_id=other_id, - name=span_names[2], - resource=span_names[2], - start=start_times[2], - duration=durations[2], - error=0, - service="resource_service_name", - meta={ - "env": "test", - "team": "testers", - "version": "0.0.1", - "key_resource": "some_resource", - }, - ), - ] - - self.assertEqual(datadog_spans, expected_spans) - - def test_export(self): - """Test that agent and/or collector are invoked""" - # create and save span to be used in tests - context = trace_api.SpanContext( - trace_id=0x000000000000000000000000DEADBEEF, - span_id=0x00000000DEADBEF0, - is_remote=False, - ) - - test_span = trace._Span("test_span", context=context) - test_span.start() - test_span.end() - - self.exporter.export((test_span,)) - - self.assertEqual(self.exporter.agent_writer.write.call_count, 1) - - def test_resources(self): - test_attributes = [ - {}, - { - SpanAttributes.HTTP_METHOD: "GET", - SpanAttributes.HTTP_ROUTE: "/foo/", - }, - { - SpanAttributes.HTTP_METHOD: "GET", - SpanAttributes.HTTP_TARGET: "/foo/200", - }, - ] - - for index, test in enumerate(test_attributes): - with self.tracer.start_span(str(index), attributes=test): - pass - - datadog_spans = get_spans(self.tracer, self.exporter) - - self.assertEqual(len(datadog_spans), 3) - - actual = [span["resource"] for span in datadog_spans] - expected = ["0", "GET /foo/", "GET"] - - self.assertEqual(actual, expected) - - def test_span_types(self): - test_instrumentations = [ - "opentelemetry.instrumentation.aiohttp-client", - "opentelemetry.instrumentation.dbapi", - "opentelemetry.instrumentation.django", - "opentelemetry.instrumentation.flask", - "opentelemetry.instrumentation.grpc", - "opentelemetry.instrumentation.jinja2", - "opentelemetry.instrumentation.mysql", - "opentelemetry.instrumentation.psycopg2", - "opentelemetry.instrumentation.pymongo", - "opentelemetry.instrumentation.pymysql", - "opentelemetry.instrumentation.redis", - "opentelemetry.instrumentation.requests", - "opentelemetry.instrumentation.sqlalchemy", - "opentelemetry.instrumentation.wsgi", - ] - - for index, instrumentation in enumerate(test_instrumentations): - # change tracer's instrumentation info before starting span - self.tracer.instrumentation_info = InstrumentationInfo( - instrumentation, "0" - ) - with self.tracer.start_span(str(index)): - pass - - datadog_spans = get_spans(self.tracer, self.exporter) - - self.assertEqual(len(datadog_spans), 14) - - actual = [span.get("type") for span in datadog_spans] - expected = [ - "http", - "sql", - "web", - "web", - "grpc", - "template", - "sql", - "sql", - "mongodb", - "sql", - "redis", - "http", - "sql", - "web", - ] - self.assertEqual(actual, expected) - - def test_errors(self): - with self.assertRaises(ValueError): - with self.tracer.start_span("foo"): - raise ValueError("bar") - - datadog_spans = get_spans(self.tracer, self.exporter) - - self.assertEqual(len(datadog_spans), 1) - - span = datadog_spans[0] - self.assertEqual(span["error"], 1) - self.assertEqual(span["meta"]["error.msg"], "bar") - self.assertEqual(span["meta"]["error.type"], "ValueError") - self.assertTrue(span["meta"]["error.stack"] is not None) - - def test_shutdown(self): - span_names = ["xxx", "bar", "foo"] - - for name in span_names: - with self.tracer.start_span(name): - pass - - self.span_processor.shutdown() - - # check that spans are exported without an explicitly call to - # force_flush() - datadog_spans = get_spans(self.tracer, self.exporter) - actual = [span.get("resource") for span in datadog_spans] - self.assertListEqual(span_names, actual) - - def test_flush(self): - span_names0 = ["xxx", "bar", "foo"] - span_names1 = ["yyy", "baz", "fox"] - - for name in span_names0: - with self.tracer.start_span(name): - pass - - self.assertTrue(self.span_processor.force_flush()) - datadog_spans = get_spans(self.tracer, self.exporter, shutdown=False) - actual0 = [span.get("resource") for span in datadog_spans] - self.assertListEqual(span_names0, actual0) - - # create some more spans to check that span processor still works - for name in span_names1: - with self.tracer.start_span(name): - pass - - self.assertTrue(self.span_processor.force_flush()) - datadog_spans = get_spans(self.tracer, self.exporter) - actual1 = [span.get("resource") for span in datadog_spans] - self.assertListEqual(span_names0 + span_names1, actual1) - - def test_span_processor_lossless(self): - """Test that no spans are lost when sending max_trace_size spans""" - span_processor = datadog.DatadogExportSpanProcessor( - self.exporter, max_trace_size=128 - ) - tracer_provider = trace.TracerProvider() - tracer_provider.add_span_processor(span_processor) - tracer = tracer_provider.get_tracer(__name__) - - with tracer.start_as_current_span("root"): - for _ in range(127): - with tracer.start_span("foo"): - pass - - self.assertTrue(span_processor.force_flush()) - datadog_spans = get_spans(tracer, self.exporter) - self.assertEqual(len(datadog_spans), 128) - tracer_provider.shutdown() - - def test_span_processor_dropped_spans(self): - """Test that spans are lost when exceeding max_trace_size spans""" - span_processor = datadog.DatadogExportSpanProcessor( - self.exporter, max_trace_size=128 - ) - tracer_provider = trace.TracerProvider() - tracer_provider.add_span_processor(span_processor) - tracer = tracer_provider.get_tracer(__name__) - - with tracer.start_as_current_span("root"): - for _ in range(127): - with tracer.start_span("foo"): - pass - with self.assertLogs(level=logging.WARNING): - with tracer.start_span("one-too-many"): - pass - - self.assertTrue(span_processor.force_flush()) - datadog_spans = get_spans(tracer, self.exporter) - self.assertEqual(len(datadog_spans), 128) - tracer_provider.shutdown() - - @mark.skipif( - sys.platform == "win32", - reason="unreliable test on windows", - ) - def test_span_processor_scheduled_delay(self): - """Test that spans are exported each schedule_delay_millis""" - delay = 300 - span_processor = datadog.DatadogExportSpanProcessor( - self.exporter, schedule_delay_millis=delay - ) - tracer_provider = trace.TracerProvider() - tracer_provider.add_span_processor(span_processor) - tracer = tracer_provider.get_tracer(__name__) - - with tracer.start_span("foo"): - pass - - time.sleep(delay / (1e3 * 2)) - datadog_spans = get_spans(tracer, self.exporter, shutdown=False) - self.assertEqual(len(datadog_spans), 0) - - time.sleep(delay / (1e3 * 2) + 0.01) - datadog_spans = get_spans(tracer, self.exporter, shutdown=False) - self.assertEqual(len(datadog_spans), 1) - - tracer_provider.shutdown() - - @flaky(max_runs=3, min_passes=1) - def test_batch_span_processor_reset_timeout(self): - """Test that the scheduled timeout is reset on cycles without spans""" - delay = 50 - # pylint: disable=protected-access - exporter = MockDatadogSpanExporter() - exporter._agent_writer.write.side_effect = lambda spans: time.sleep( - 0.05 - ) - span_processor = datadog.DatadogExportSpanProcessor( - exporter, schedule_delay_millis=delay - ) - tracer_provider = trace.TracerProvider() - tracer_provider.add_span_processor(span_processor) - tracer = tracer_provider.get_tracer(__name__) - with mock.patch.object(span_processor.condition, "wait") as mock_wait: - with tracer.start_span("foo"): - pass - - # give some time for exporter to loop - # since wait is mocked it should return immediately - time.sleep(0.1) - mock_wait_calls = list(mock_wait.mock_calls) - - # find the index of the call that processed the singular span - for idx, wait_call in enumerate(mock_wait_calls): - _, args, __ = wait_call - if args[0] <= 0: - after_calls = mock_wait_calls[idx + 1 :] - break - - self.assertTrue( - all(args[0] >= 0.05 for _, args, __ in after_calls) - ) - - span_processor.shutdown() - - def test_span_processor_accepts_parent_context(self): - span_processor = mock.Mock( - wraps=datadog.DatadogExportSpanProcessor(self.exporter) - ) - tracer_provider = trace.TracerProvider() - tracer_provider.add_span_processor(span_processor) - tracer = tracer_provider.get_tracer(__name__) - - context = Context() - span = tracer.start_span("foo", context=context) - - span_processor.on_start.assert_called_once_with( - span, parent_context=context - ) - - def test_origin(self): - trace_id = 0x000000000000000000000000DEADBEEF - trace_state = trace_api.TraceState( - [(datadog.constants.DD_ORIGIN, "origin-service")] - ) - parent_span_ctx = trace_api.SpanContext( - trace_id=trace_id, - span_id=0x1, - is_remote=False, - trace_state=trace_state, - ) - child_span_ctx = trace_api.SpanContext( - trace_id=trace_id, - span_id=0x2, - is_remote=False, - trace_state=trace_state, - ) - - root_span = trace._Span( - name="root", context=parent_span_ctx, parent=None - ) - child_span = trace._Span( - name="child", context=child_span_ctx, parent=parent_span_ctx - ) - root_span.start() - child_span.start() - child_span.end() - root_span.end() - - # pylint: disable=protected-access - exporter = datadog.DatadogSpanExporter() - datadog_spans = [ - span.to_dict() - for span in exporter._translate_to_datadog([root_span, child_span]) - ] - - self.assertEqual(len(datadog_spans), 2) - - actual = [ - span["meta"].get(datadog.constants.DD_ORIGIN) - if "meta" in span - else None - for span in datadog_spans - ] - expected = ["origin-service", None] - self.assertListEqual(actual, expected) - - def test_sampling_rate(self): - context = trace_api.SpanContext( - trace_id=0x000000000000000000000000DEADBEEF, - span_id=0x34BF92DEEFC58C92, - is_remote=False, - trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED), - ) - trace_api.get_tracer_provider().sampler = sampling.TraceIdRatioBased( - 0.5 - ) - - span = trace._Span(name="sampled", context=context, parent=None) - span.start() - span.end() - - # pylint: disable=protected-access - exporter = datadog.DatadogSpanExporter() - datadog_spans = [ - span.to_dict() for span in exporter._translate_to_datadog([span]) - ] - - self.assertEqual(len(datadog_spans), 1) - - actual = [ - span["metrics"].get(datadog.constants.SAMPLE_RATE_METRIC_KEY) - if "metrics" in span - else None - for span in datadog_spans - ] - expected = [0.5] - self.assertListEqual(actual, expected) - - def test_service_name_fallback(self): - context = trace_api.SpanContext( - trace_id=0x000000000000000000000000DEADBEEF, - span_id=0x34BF92DEEFC58C92, - is_remote=False, - trace_flags=trace_api.TraceFlags(trace_api.TraceFlags.SAMPLED), - ) - trace_api.get_tracer_provider().sampler = sampling.TraceIdRatioBased( - 0.5 - ) - - resource_with_default_name = Resource( - attributes={ - "key_resource": "some_resource", - "service.name": "unknown_service", - } - ) - - span = trace._Span( - name="sampled", - context=context, - parent=None, - resource=resource_with_default_name, - ) - span.start() - span.end() - - # pylint: disable=protected-access - exporter = datadog.DatadogSpanExporter(service="fallback_service_name") - datadog_spans = [ - span.to_dict() for span in exporter._translate_to_datadog([span]) - ] - - self.assertEqual(len(datadog_spans), 1) - - span = datadog_spans[0] - self.assertEqual(span["service"], "fallback_service_name") diff --git a/exporter/opentelemetry-exporter-datadog/tests/test_datadog_format.py b/exporter/opentelemetry-exporter-datadog/tests/test_datadog_format.py deleted file mode 100644 index e7205d650..000000000 --- a/exporter/opentelemetry-exporter-datadog/tests/test_datadog_format.py +++ /dev/null @@ -1,216 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -from unittest.mock import Mock, patch - -from opentelemetry import trace as trace_api -from opentelemetry.context import Context -from opentelemetry.exporter.datadog import constants, propagator -from opentelemetry.sdk import trace -from opentelemetry.sdk.trace.id_generator import RandomIdGenerator -from opentelemetry.trace import get_current_span, set_span_in_context - -FORMAT = propagator.DatadogFormat() - - -class TestDatadogFormat(unittest.TestCase): - @classmethod - def setUpClass(cls): - id_generator = RandomIdGenerator() - cls.serialized_trace_id = propagator.format_trace_id( - id_generator.generate_trace_id() - ) - cls.serialized_parent_id = propagator.format_span_id( - id_generator.generate_span_id() - ) - cls.serialized_origin = "origin-service" - - def test_extract_malformed_headers_to_explicit_ctx(self): - """Test with no Datadog headers""" - orig_ctx = Context({"k1": "v1"}) - malformed_trace_id_key = FORMAT.TRACE_ID_KEY + "-x" - malformed_parent_id_key = FORMAT.PARENT_ID_KEY + "-x" - context = FORMAT.extract( - { - malformed_trace_id_key: self.serialized_trace_id, - malformed_parent_id_key: self.serialized_parent_id, - }, - orig_ctx, - ) - self.assertDictEqual(orig_ctx, context) - - def test_extract_malformed_headers_to_implicit_ctx(self): - malformed_trace_id_key = FORMAT.TRACE_ID_KEY + "-x" - malformed_parent_id_key = FORMAT.PARENT_ID_KEY + "-x" - context = FORMAT.extract( - { - malformed_trace_id_key: self.serialized_trace_id, - malformed_parent_id_key: self.serialized_parent_id, - } - ) - self.assertDictEqual(Context(), context) - - def test_extract_missing_trace_id_to_explicit_ctx(self): - """If a trace id is missing, populate an invalid trace id.""" - orig_ctx = Context({"k1": "v1"}) - carrier = {FORMAT.PARENT_ID_KEY: self.serialized_parent_id} - - ctx = FORMAT.extract(carrier, orig_ctx) - self.assertDictEqual(orig_ctx, ctx) - - def test_extract_missing_trace_id_to_implicit_ctx(self): - carrier = {FORMAT.PARENT_ID_KEY: self.serialized_parent_id} - - ctx = FORMAT.extract(carrier) - self.assertDictEqual(Context(), ctx) - - def test_extract_missing_parent_id_to_explicit_ctx(self): - """If a parent id is missing, populate an invalid trace id.""" - orig_ctx = Context({"k1": "v1"}) - carrier = {FORMAT.TRACE_ID_KEY: self.serialized_trace_id} - - ctx = FORMAT.extract(carrier, orig_ctx) - self.assertDictEqual(orig_ctx, ctx) - - def test_extract_missing_parent_id_to_implicit_ctx(self): - carrier = {FORMAT.TRACE_ID_KEY: self.serialized_trace_id} - - ctx = FORMAT.extract(carrier) - self.assertDictEqual(Context(), ctx) - - def test_context_propagation(self): - """Test the propagation of Datadog headers.""" - parent_span_context = get_current_span( - FORMAT.extract( - { - FORMAT.TRACE_ID_KEY: self.serialized_trace_id, - FORMAT.PARENT_ID_KEY: self.serialized_parent_id, - FORMAT.SAMPLING_PRIORITY_KEY: str(constants.AUTO_KEEP), - FORMAT.ORIGIN_KEY: self.serialized_origin, - }, - ) - ).get_span_context() - - self.assertEqual( - parent_span_context.trace_id, int(self.serialized_trace_id) - ) - self.assertEqual( - parent_span_context.span_id, int(self.serialized_parent_id) - ) - self.assertEqual(parent_span_context.trace_flags, constants.AUTO_KEEP) - self.assertEqual( - parent_span_context.trace_state.get(constants.DD_ORIGIN), - self.serialized_origin, - ) - self.assertTrue(parent_span_context.is_remote) - - child = trace._Span( - "child", - trace_api.SpanContext( - parent_span_context.trace_id, - RandomIdGenerator().generate_span_id(), - is_remote=False, - trace_flags=parent_span_context.trace_flags, - trace_state=parent_span_context.trace_state, - ), - parent=parent_span_context, - ) - - child_carrier = {} - child_context = set_span_in_context(child) - FORMAT.inject(child_carrier, context=child_context) - - self.assertEqual( - child_carrier[FORMAT.TRACE_ID_KEY], self.serialized_trace_id - ) - self.assertEqual( - child_carrier[FORMAT.PARENT_ID_KEY], str(child.context.span_id) - ) - self.assertEqual( - child_carrier[FORMAT.SAMPLING_PRIORITY_KEY], - str(constants.AUTO_KEEP), - ) - self.assertEqual( - child_carrier.get(FORMAT.ORIGIN_KEY), self.serialized_origin - ) - - def test_sampling_priority_auto_reject(self): - """Test sampling priority rejected.""" - parent_span_context = get_current_span( - FORMAT.extract( - { - FORMAT.TRACE_ID_KEY: self.serialized_trace_id, - FORMAT.PARENT_ID_KEY: self.serialized_parent_id, - FORMAT.SAMPLING_PRIORITY_KEY: str(constants.AUTO_REJECT), - }, - ) - ).get_span_context() - - self.assertEqual( - parent_span_context.trace_flags, constants.AUTO_REJECT - ) - - child = trace._Span( - "child", - trace_api.SpanContext( - parent_span_context.trace_id, - RandomIdGenerator().generate_span_id(), - is_remote=False, - trace_flags=parent_span_context.trace_flags, - trace_state=parent_span_context.trace_state, - ), - parent=parent_span_context, - ) - - child_carrier = {} - child_context = set_span_in_context(child) - FORMAT.inject(child_carrier, context=child_context) - - self.assertEqual( - child_carrier[FORMAT.SAMPLING_PRIORITY_KEY], - str(constants.AUTO_REJECT), - ) - - @patch("opentelemetry.exporter.datadog.propagator.get_current_span") - def test_fields(self, mock_get_current_span): - """Make sure the fields attribute returns the fields used in inject""" - - tracer = trace.TracerProvider().get_tracer("sdk_tracer_provider") - - mock_setter = Mock() - - mock_get_current_span.configure_mock( - **{ - "return_value": Mock( - **{ - "get_span_context.return_value": None, - "context.trace_flags": 0, - "context.trace_id": 1, - "context.trace_state": {constants.DD_ORIGIN: 0}, - } - ) - } - ) - - with tracer.start_as_current_span("parent"): - with tracer.start_as_current_span("child"): - FORMAT.inject({}, setter=mock_setter) - - inject_fields = set() - - for call in mock_setter.mock_calls: - inject_fields.add(call[1][1]) - - self.assertEqual(FORMAT.fields, inject_fields) diff --git a/scripts/build.sh b/scripts/build.sh index b351c9022..56b350257 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -41,6 +41,3 @@ DISTDIR=dist done ) ) - -# FIXME: This is a temporary workaround, see #1357. -rm -rf $DISTDIR/opentelemetry_exporter_datadog-0.30b0.tar.gz diff --git a/scripts/coverage.sh b/scripts/coverage.sh index b0c465e69..4015c6884 100755 --- a/scripts/coverage.sh +++ b/scripts/coverage.sh @@ -16,7 +16,6 @@ PYTHON_VERSION_INFO=(${PYTHON_VERSION//./ }) coverage erase -cov exporter/opentelemetry-exporter-datadog cov instrumentation/opentelemetry-instrumentation-flask cov instrumentation/opentelemetry-instrumentation-requests cov instrumentation/opentelemetry-instrumentation-wsgi