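"""Trace writer for the Datadog tracer.

Buffers finished traces in a bounded, thread-safe queue and periodically
flushes them to the Datadog agent from a background worker thread.
"""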
# stdlib
import itertools
import random
import time

from .. import api
from .. import _worker
from ..internal.logger import get_logger
from ..sampler import BasePrioritySampler
from ..settings import config
from ..vendor import monotonic
from ddtrace.vendor.six.moves.queue import Queue, Full, Empty


log = get_logger(__name__)
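

# Tuning knobs for the writer:
#   MAX_TRACES       - maximum number of traces buffered before Q.put() starts
#                      overwriting random entries (those traces are lost),
#   DEFAULT_TIMEOUT  - seconds to wait for the worker thread on shutdown,
#   LOG_ERR_INTERVAL - minimum seconds between ERROR-level send-failure logs.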
MAX_TRACES = 1000

DEFAULT_TIMEOUT = 5
LOG_ERR_INTERVAL = 60


class AgentWriter(_worker.PeriodicWorkerThread):
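    """Worker thread that buffers finished traces and flushes them to the agent.

    Traces handed to :meth:`write` are stored in a bounded :class:`Q` and are
    sent to the Datadog agent roughly every ``QUEUE_PROCESSING_INTERVAL``
    seconds by the periodic worker loop.

    Illustrative usage (the tracer normally creates and owns this object)::

        writer = AgentWriter(hostname="localhost", port=8126)
        writer.write(spans=finished_spans)
    """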

    QUEUE_PROCESSING_INTERVAL = 1

    def __init__(
        self,
        hostname="localhost",
        port=8126,
        uds_path=None,
        https=False,
        shutdown_timeout=DEFAULT_TIMEOUT,
        filters=None,
        sampler=None,
        priority_sampler=None,
        dogstatsd=None,
    ):
        super(AgentWriter, self).__init__(
            interval=self.QUEUE_PROCESSING_INTERVAL, exit_timeout=shutdown_timeout, name=self.__class__.__name__
        )
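        # Bounded buffer shared with application threads: once MAX_TRACES traces
        # are queued, Q.put() overwrites a random entry instead of blocking.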
        self._trace_queue = Q(maxsize=MAX_TRACES)
        self._filters = filters
        self._sampler = sampler
        self._priority_sampler = priority_sampler
        self._last_error_ts = 0
        self.dogstatsd = dogstatsd
        self.api = api.API(
            hostname, port, uds_path=uds_path, https=https, priority_sampling=priority_sampler is not None
        )
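        # time.thread_time() only exists on Python 3.7+ (and only where the
        # platform supports it), so guard its use; it is used in flush_queue()
        # to report the writer thread's CPU time.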
        if hasattr(time, "thread_time"):
            self._last_thread_time = time.thread_time()
        self.start()

    def recreate(self):
        """Create a new instance of :class:`AgentWriter` using the same settings from this instance

        :rtype: :class:`AgentWriter`
        :returns: A new :class:`AgentWriter` instance
        """
        writer = self.__class__(
            hostname=self.api.hostname,
            port=self.api.port,
            uds_path=self.api.uds_path,
            https=self.api.https,
            shutdown_timeout=self.exit_timeout,
            filters=self._filters,
            priority_sampler=self._priority_sampler,
            dogstatsd=self.dogstatsd,
        )
        return writer

    @property
    def _send_stats(self):
        """Determine if we're sending stats or not."""
        return bool(config.health_metrics_enabled and self.dogstatsd)

    def write(self, spans=None, services=None):
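        """Queue a finished trace (a list of spans) for the next flush.

        ``services`` is unused here; it is presumably kept for API compatibility.
        """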
        if spans:
            self._trace_queue.put(spans)

    def flush_queue(self):
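        """Drain the trace queue and send its contents to the agent.

        One flush: pop everything queued so far, run the traces through the
        configured filters, send them via the API client, feed any
        ``rate_by_service`` rates returned by the agent back into the priority
        sampler(s), and emit health metrics when enabled.
        """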
        try:
            traces = self._trace_queue.get(block=False)
        except Empty:
            return

        if self._send_stats:
            traces_queue_length = len(traces)
            traces_queue_spans = sum(map(len, traces))

        # Before sending the traces, make them go through the filters
        try:
            traces = self._apply_filters(traces)
        except Exception:
            log.error("error while filtering traces", exc_info=True)
            return

        if self._send_stats:
            # Count of traces removed by the filters
            traces_filtered = traces_queue_length - len(traces)

        # If we have data, let's try to send it.
        traces_responses = self.api.send_traces(traces)
        for response in traces_responses:
            if isinstance(response, Exception) or response.status >= 400:
                self._log_error_status(response)
            elif self._priority_sampler or isinstance(self._sampler, BasePrioritySampler):
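                # The agent response carries per-service sample rates
                # ("rate_by_service"); feed them back into the priority
                # sampler(s) so later sampling decisions follow the agent.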
                result_traces_json = response.get_json()
                if result_traces_json and "rate_by_service" in result_traces_json:
                    if self._priority_sampler:
                        self._priority_sampler.update_rate_by_service_sample_rates(
                            result_traces_json["rate_by_service"],
                        )
                    if isinstance(self._sampler, BasePrioritySampler):
                        self._sampler.update_rate_by_service_sample_rates(result_traces_json["rate_by_service"])

        # Dump statistics
        # NOTE: Do not use the buffering of dogstatsd as it's not thread-safe
        # https://github.com/DataDog/datadogpy/issues/439
        if self._send_stats:
            # Statistics about the queue length, size and number of spans
            self.dogstatsd.increment("datadog.tracer.flushes")
            self._histogram_with_total("datadog.tracer.flush.traces", traces_queue_length)
            self._histogram_with_total("datadog.tracer.flush.spans", traces_queue_spans)

            # Statistics about the filtering
            self._histogram_with_total("datadog.tracer.flush.traces_filtered", traces_filtered)

            # Statistics about API
            self._histogram_with_total("datadog.tracer.api.requests", len(traces_responses))

            self._histogram_with_total(
                "datadog.tracer.api.errors", len(list(t for t in traces_responses if isinstance(t, Exception)))
            )
            for status, grouped_responses in itertools.groupby(
                sorted((t for t in traces_responses if not isinstance(t, Exception)), key=lambda r: r.status),
                key=lambda r: r.status,
            ):
                self._histogram_with_total(
                    "datadog.tracer.api.responses", len(list(grouped_responses)), tags=["status:%d" % status]
                )

            # Statistics about the writer thread
            if hasattr(time, "thread_time"):
                new_thread_time = time.thread_time()
                diff = new_thread_time - self._last_thread_time
                self._last_thread_time = new_thread_time
                self.dogstatsd.histogram("datadog.tracer.writer.cpu_time", diff)

    def _histogram_with_total(self, name, value, tags=None):
        """Helper to add metric as a histogram and with a `.total` counter"""
        self.dogstatsd.histogram(name, value, tags=tags)
        self.dogstatsd.increment("%s.total" % (name,), value, tags=tags)

    def run_periodic(self):
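        """Flush the queue; called by the worker thread every ``QUEUE_PROCESSING_INTERVAL`` seconds."""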
        if self._send_stats:
            self.dogstatsd.gauge("datadog.tracer.heartbeat", 1)

        try:
            self.flush_queue()
        finally:
            if not self._send_stats:
                return

            # Statistics about the rate at which spans are inserted in the queue
            dropped, enqueued, enqueued_lengths = self._trace_queue.reset_stats()
            self.dogstatsd.gauge("datadog.tracer.queue.max_length", self._trace_queue.maxsize)
            self.dogstatsd.increment("datadog.tracer.queue.dropped.traces", dropped)
            self.dogstatsd.increment("datadog.tracer.queue.enqueued.traces", enqueued)
            self.dogstatsd.increment("datadog.tracer.queue.enqueued.spans", enqueued_lengths)

    def on_shutdown(self):
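        """Perform one last flush before the worker thread exits."""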
        try:
            self.run_periodic()
        finally:
            if not self._send_stats:
                return

            self.dogstatsd.increment("datadog.tracer.shutdown")

    def _log_error_status(self, response):
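        # Log send failures at ERROR at most once every LOG_ERR_INTERVAL seconds;
        # in between, failures are only logged at DEBUG.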
        log_level = log.debug
        now = monotonic.monotonic()
        if now > self._last_error_ts + LOG_ERR_INTERVAL:
            log_level = log.error
            self._last_error_ts = now
        prefix = "Failed to send traces to Datadog Agent at %s: "
        if isinstance(response, api.Response):
            log_level(
                prefix + "HTTP error status %s, reason %s, message %s",
                self.api,
                response.status,
                response.reason,
                response.msg,
            )
        else:
            log_level(
                prefix + "%s", self.api, response,
            )

    def _apply_filters(self, traces):
        """
        Here we make each trace go through the filters configured in the
        tracer. There is no need for a lock since the traces are owned by the
        AgentWriter at that point.
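
        Each filter is expected to expose ``process_trace(trace)`` and to return
        either the (possibly modified) trace or ``None`` to drop it, which is
        what the loop below assumes.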
        """
        if self._filters is not None:
            filtered_traces = []
            for trace in traces:
                for filtr in self._filters:
                    trace = filtr.process_trace(trace)
                    if trace is None:
                        break
                if trace is not None:
                    filtered_traces.append(trace)
            return filtered_traces
        return traces


class Q(Queue):
    """
    Q is a threadsafe queue that lets you pop everything at once and
    will randomly overwrite elements when it's over the max size.

    This queue also exposes some statistics about its length, the number of items dropped, etc.
    """

    def __init__(self, maxsize=0):
        # Cannot use super() here because Queue in Python2 is an old-style class
        Queue.__init__(self, maxsize)
        # Number of items dropped (queue full)
        self.dropped = 0
        # Number of items accepted
        self.accepted = 0
        # Cumulative length of accepted items
        self.accepted_lengths = 0

    def put(self, item):
        try:
            # Cannot use super() here because Queue in Python2 is an old-style class
            Queue.put(self, item, block=False)
        except Full:
            # If the queue is full, replace a random item. We need to make sure
            # the queue was not emptied in the meantime, so we check the qsize
            # value while holding the lock.
            with self.mutex:
                qsize = self._qsize()
                if qsize != 0:
                    idx = random.randrange(0, qsize)
                    self.queue[idx] = item
                    log.warning("Writer queue is full (more than %d traces); some traces will be lost", self.maxsize)
                    self.dropped += 1
                    self._update_stats(item)
                    return
                # The queue has been emptied in the meantime, simply retry putting the item
                return self.put(item)
        else:
            with self.mutex:
                self._update_stats(item)

    def _update_stats(self, item):
        # self.mutex needs to be locked to make sure we don't lose data when resetting
        self.accepted += 1
        if hasattr(item, "__len__"):
            item_length = len(item)
        else:
            item_length = 1
        self.accepted_lengths += item_length

    def reset_stats(self):
        """Reset the stats to 0.

        :return: The current value of dropped, accepted and accepted_lengths.
        """
        with self.mutex:
            dropped, accepted, accepted_lengths = (self.dropped, self.accepted, self.accepted_lengths)
            self.dropped, self.accepted, self.accepted_lengths = 0, 0, 0
        return dropped, accepted, accepted_lengths

    def _get(self):
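        # Queue.get() calls _get() while holding the queue's lock, so swapping
        # out the whole internal buffer here makes a single get() return every
        # queued trace at once and leaves the queue empty.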
        things = self.queue
        self._init(self.maxsize)
        return things