Merge pull request #123 from NathanielRN/move-exporter-datadog

Move exporter datadog
This commit is contained in:
alrex
2020-11-02 13:31:45 -08:00
committed by GitHub
8 changed files with 59 additions and 23 deletions

View File

@ -2,6 +2,10 @@
## Unreleased ## Unreleased
## Version 0.15b0
Released 2020-11-02
- Make `SpanProcessor.on_start` accept parent Context - Make `SpanProcessor.on_start` accept parent Context
([#1251](https://github.com/open-telemetry/opentelemetry-python/pull/1251)) ([#1251](https://github.com/open-telemetry/opentelemetry-python/pull/1251))

View File

@ -40,8 +40,8 @@ package_dir=
packages=find_namespace: packages=find_namespace:
install_requires = install_requires =
ddtrace>=0.34.0 ddtrace>=0.34.0
opentelemetry-api == 0.15.dev0 opentelemetry-api == 0.15b0
opentelemetry-sdk == 0.15.dev0 opentelemetry-sdk == 0.15b0
[options.packages.find] [options.packages.find]
where = src where = src

View File

@ -23,7 +23,6 @@ from ddtrace.span import Span as DatadogSpan
import opentelemetry.trace as trace_api import opentelemetry.trace as trace_api
from opentelemetry.sdk.trace import sampling from opentelemetry.sdk.trace import sampling
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace.status import StatusCanonicalCode
# pylint:disable=relative-beyond-top-level # pylint:disable=relative-beyond-top-level
from .constants import ( from .constants import (
@ -145,7 +144,7 @@ class DatadogSpanExporter(SpanExporter):
datadog_span.start_ns = span.start_time datadog_span.start_ns = span.start_time
datadog_span.duration_ns = span.end_time - span.start_time datadog_span.duration_ns = span.end_time - span.start_time
if span.status.canonical_code is not StatusCanonicalCode.OK: if not span.status.is_ok:
datadog_span.error = 1 datadog_span.error = 1
if span.status.description: if span.status.description:
exc_type, exc_val = _get_exc_info(span) exc_type, exc_val = _get_exc_info(span)

View File

@ -39,25 +39,23 @@ class DatadogFormat(TextMapPropagator):
def extract( def extract(
self, self,
get_from_carrier: Getter[TextMapPropagatorT], getter: Getter[TextMapPropagatorT],
carrier: TextMapPropagatorT, carrier: TextMapPropagatorT,
context: typing.Optional[Context] = None, context: typing.Optional[Context] = None,
) -> Context: ) -> Context:
trace_id = extract_first_element( trace_id = extract_first_element(
get_from_carrier(carrier, self.TRACE_ID_KEY) getter.get(carrier, self.TRACE_ID_KEY)
) )
span_id = extract_first_element( span_id = extract_first_element(
get_from_carrier(carrier, self.PARENT_ID_KEY) getter.get(carrier, self.PARENT_ID_KEY)
) )
sampled = extract_first_element( sampled = extract_first_element(
get_from_carrier(carrier, self.SAMPLING_PRIORITY_KEY) getter.get(carrier, self.SAMPLING_PRIORITY_KEY)
) )
origin = extract_first_element( origin = extract_first_element(getter.get(carrier, self.ORIGIN_KEY))
get_from_carrier(carrier, self.ORIGIN_KEY)
)
trace_flags = trace.TraceFlags() trace_flags = trace.TraceFlags()
if sampled and int(sampled) in ( if sampled and int(sampled) in (

View File

@ -119,7 +119,8 @@ class DatadogExportSpanProcessor(SpanProcessor):
with self.condition: with self.condition:
self.condition.wait(timeout) self.condition.wait(timeout)
if not self.check_traces_queue: if not self.check_traces_queue:
# spurious notification, let's wait again # spurious notification, let's wait again, reset timeout
timeout = self.schedule_delay_millis / 1e3
continue continue
if self.done: if self.done:
# missing spans will be sent when calling flush # missing spans will be sent when calling flush

View File

@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
__version__ = "0.15.dev0" __version__ = "0.15b0"

View File

@ -40,7 +40,7 @@ class MockDatadogSpanExporter(datadog.DatadogSpanExporter):
def get_spans(tracer, exporter, shutdown=True): def get_spans(tracer, exporter, shutdown=True):
if shutdown: if shutdown:
tracer.source.shutdown() tracer.span_processor.shutdown()
spans = [ spans = [
call_args[-1]["spans"] call_args[-1]["spans"]
@ -483,6 +483,42 @@ class TestDatadogSpanExporter(unittest.TestCase):
tracer_provider.shutdown() tracer_provider.shutdown()
def test_batch_span_processor_reset_timeout(self):
"""Test that the scheduled timeout is reset on cycles without spans"""
delay = 50
# pylint: disable=protected-access
exporter = MockDatadogSpanExporter()
exporter._agent_writer.write.side_effect = lambda spans: time.sleep(
0.05
)
span_processor = datadog.DatadogExportSpanProcessor(
exporter, schedule_delay_millis=delay
)
tracer_provider = trace.TracerProvider()
tracer_provider.add_span_processor(span_processor)
tracer = tracer_provider.get_tracer(__name__)
with mock.patch.object(span_processor.condition, "wait") as mock_wait:
with tracer.start_span("foo"):
pass
# give some time for exporter to loop
# since wait is mocked it should return immediately
time.sleep(0.1)
mock_wait_calls = list(mock_wait.mock_calls)
# find the index of the call that processed the singular span
for idx, wait_call in enumerate(mock_wait_calls):
_, args, __ = wait_call
if args[0] <= 0:
after_calls = mock_wait_calls[idx + 1 :]
break
self.assertTrue(
all(args[0] >= 0.05 for _, args, __ in after_calls)
)
span_processor.shutdown()
def test_span_processor_accepts_parent_context(self): def test_span_processor_accepts_parent_context(self):
span_processor = mock.Mock( span_processor = mock.Mock(
wraps=datadog.DatadogExportSpanProcessor(self.exporter) wraps=datadog.DatadogExportSpanProcessor(self.exporter)

View File

@ -18,13 +18,11 @@ from opentelemetry import trace as trace_api
from opentelemetry.exporter.datadog import constants, propagator from opentelemetry.exporter.datadog import constants, propagator
from opentelemetry.sdk import trace from opentelemetry.sdk import trace
from opentelemetry.trace import get_current_span, set_span_in_context from opentelemetry.trace import get_current_span, set_span_in_context
from opentelemetry.trace.propagation.textmap import DictGetter
FORMAT = propagator.DatadogFormat() FORMAT = propagator.DatadogFormat()
carrier_getter = DictGetter()
def get_as_list(dict_object, key):
value = dict_object.get(key)
return [value] if value is not None else []
class TestDatadogFormat(unittest.TestCase): class TestDatadogFormat(unittest.TestCase):
@ -45,7 +43,7 @@ class TestDatadogFormat(unittest.TestCase):
malformed_parent_id_key = FORMAT.PARENT_ID_KEY + "-x" malformed_parent_id_key = FORMAT.PARENT_ID_KEY + "-x"
context = get_current_span( context = get_current_span(
FORMAT.extract( FORMAT.extract(
get_as_list, carrier_getter,
{ {
malformed_trace_id_key: self.serialized_trace_id, malformed_trace_id_key: self.serialized_trace_id,
malformed_parent_id_key: self.serialized_parent_id, malformed_parent_id_key: self.serialized_parent_id,
@ -63,7 +61,7 @@ class TestDatadogFormat(unittest.TestCase):
FORMAT.PARENT_ID_KEY: self.serialized_parent_id, FORMAT.PARENT_ID_KEY: self.serialized_parent_id,
} }
ctx = FORMAT.extract(get_as_list, carrier) ctx = FORMAT.extract(carrier_getter, carrier)
span_context = get_current_span(ctx).get_span_context() span_context = get_current_span(ctx).get_span_context()
self.assertEqual(span_context.trace_id, trace_api.INVALID_TRACE_ID) self.assertEqual(span_context.trace_id, trace_api.INVALID_TRACE_ID)
@ -73,7 +71,7 @@ class TestDatadogFormat(unittest.TestCase):
FORMAT.TRACE_ID_KEY: self.serialized_trace_id, FORMAT.TRACE_ID_KEY: self.serialized_trace_id,
} }
ctx = FORMAT.extract(get_as_list, carrier) ctx = FORMAT.extract(carrier_getter, carrier)
span_context = get_current_span(ctx).get_span_context() span_context = get_current_span(ctx).get_span_context()
self.assertEqual(span_context.span_id, trace_api.INVALID_SPAN_ID) self.assertEqual(span_context.span_id, trace_api.INVALID_SPAN_ID)
@ -81,7 +79,7 @@ class TestDatadogFormat(unittest.TestCase):
"""Test the propagation of Datadog headers.""" """Test the propagation of Datadog headers."""
parent_span_context = get_current_span( parent_span_context = get_current_span(
FORMAT.extract( FORMAT.extract(
get_as_list, carrier_getter,
{ {
FORMAT.TRACE_ID_KEY: self.serialized_trace_id, FORMAT.TRACE_ID_KEY: self.serialized_trace_id,
FORMAT.PARENT_ID_KEY: self.serialized_parent_id, FORMAT.PARENT_ID_KEY: self.serialized_parent_id,
@ -138,7 +136,7 @@ class TestDatadogFormat(unittest.TestCase):
"""Test sampling priority rejected.""" """Test sampling priority rejected."""
parent_span_context = get_current_span( parent_span_context = get_current_span(
FORMAT.extract( FORMAT.extract(
get_as_list, carrier_getter,
{ {
FORMAT.TRACE_ID_KEY: self.serialized_trace_id, FORMAT.TRACE_ID_KEY: self.serialized_trace_id,
FORMAT.PARENT_ID_KEY: self.serialized_parent_id, FORMAT.PARENT_ID_KEY: self.serialized_parent_id,