mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-28 12:43:39 +08:00
Update prom rw exporter (#1359)
This commit is contained in:
1
.flake8
1
.flake8
@ -16,6 +16,7 @@ exclude =
|
||||
target
|
||||
__pycache__
|
||||
exporter/opentelemetry-exporter-jaeger/src/opentelemetry/exporter/jaeger/gen/
|
||||
exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/gen/
|
||||
exporter/opentelemetry-exporter-jaeger/build/*
|
||||
docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/
|
||||
docs/examples/opentelemetry-example-app/build/*
|
||||
|
@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
([#1413](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1413))
|
||||
- `opentelemetry-instrumentation-pyramid` Add support for regular expression matching and sanitization of HTTP headers.
|
||||
([#1414](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1414))
|
||||
- Add metric exporter for Prometheus Remote Write
|
||||
([#1359](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1359))
|
||||
|
||||
### Fixed
|
||||
|
||||
@ -62,6 +64,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Add metric instrumentation in starlette
|
||||
([#1327](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1327))
|
||||
|
||||
|
||||
### Fixed
|
||||
|
||||
- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies.
|
||||
|
@ -0,0 +1,29 @@
|
||||
OpenTelemetry Prometheus Remote Write Exporter
|
||||
==============================================
|
||||
|
||||
|pypi|
|
||||
|
||||
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-exporter-prometheus-remote-write.svg
|
||||
:target: https://pypi.org/project/opentelemetry-exporter-prometheus-remote-write/
|
||||
|
||||
This package contains an exporter to send metrics from the OpenTelemetry Python SDK directly to a Prometheus Remote Write integrated backend
|
||||
(such as Cortex or Thanos) without having to run an instance of the Prometheus server.
|
||||
|
||||
|
||||
Installation
|
||||
------------
|
||||
|
||||
::
|
||||
|
||||
pip install opentelemetry-exporter-prometheus-remote-write
|
||||
|
||||
|
||||
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
|
||||
.. _Prometheus Remote Write integrated backend: https://prometheus.io/docs/operating/integrations/
|
||||
|
||||
|
||||
References
|
||||
----------
|
||||
|
||||
* `OpenTelemetry Project <https://opentelemetry.io/>`_
|
||||
* `Prometheus Remote Write Integration <https://prometheus.io/docs/operating/integrations/>`_
|
@ -0,0 +1,11 @@
|
||||
FROM python:3.8
|
||||
|
||||
RUN apt-get update -y && apt-get install libsnappy-dev -y
|
||||
|
||||
WORKDIR /code
|
||||
COPY . .
|
||||
|
||||
RUN pip install -e .
|
||||
RUN pip install -r ./examples/requirements.txt
|
||||
|
||||
CMD ["python", "./examples/sampleapp.py"]
|
@ -0,0 +1,42 @@
|
||||
# Prometheus Remote Write Exporter Example
|
||||
This example uses [Docker Compose](https://docs.docker.com/compose/) to set up:
|
||||
|
||||
1. A Python program that creates 5 instruments with 5 unique
|
||||
aggregators and a randomized load generator
|
||||
2. An instance of [Cortex](https://cortexmetrics.io/) to receive the metrics
|
||||
data
|
||||
3. An instance of [Grafana](https://grafana.com/) to visualizse the exported
|
||||
data
|
||||
|
||||
## Requirements
|
||||
* Have Docker Compose [installed](https://docs.docker.com/compose/install/)
|
||||
|
||||
*Users do not need to install Python as the app will be run in the Docker Container*
|
||||
|
||||
## Instructions
|
||||
1. Run `docker-compose up -d` in the the `examples/` directory
|
||||
|
||||
The `-d` flag causes all services to run in detached mode and frees up your
|
||||
terminal session. This also causes no logs to show up. Users can attach themselves to the service's logs manually using `docker logs ${CONTAINER_ID} --follow`
|
||||
|
||||
2. Log into the Grafana instance at [http://localhost:3000](http://localhost:3000)
|
||||
* login credentials are `username: admin` and `password: admin`
|
||||
* There may be an additional screen on setting a new password. This can be skipped and is optional
|
||||
|
||||
3. Navigate to the `Data Sources` page
|
||||
* Look for a gear icon on the left sidebar and select `Data Sources`
|
||||
|
||||
4. Add a new Prometheus Data Source
|
||||
* Use `http://cortex:9009/api/prom` as the URL
|
||||
* (OPTIONAl) set the scrape interval to `2s` to make updates appear quickly
|
||||
* click `Save & Test`
|
||||
|
||||
5. Go to `Metrics Explore` to query metrics
|
||||
* Look for a compass icon on the left sidebar
|
||||
* click `Metrics` for a dropdown list of all the available metrics
|
||||
* (OPTIONAL) Adjust time range by clicking the `Last 6 hours` button on the upper right side of the graph
|
||||
* (OPTIONAL) Set up auto-refresh by selecting an option under the dropdown next to the refresh button on the upper right side of the graph
|
||||
* Click the refresh button and data should show up on the graph
|
||||
|
||||
6. Shutdown the services when finished
|
||||
* Run `docker-compose down` in the examples directory
|
@ -0,0 +1,101 @@
|
||||
# This Cortex Config is copied from the Cortex Project documentation
|
||||
# Source: https://github.com/cortexproject/cortex/blob/master/docs/configuration/single-process-config.yaml
|
||||
|
||||
# Configuration for running Cortex in single-process mode.
|
||||
# This configuration should not be used in production.
|
||||
# It is only for getting started and development.
|
||||
|
||||
# Disable the requirement that every request to Cortex has a
|
||||
# X-Scope-OrgID header. `fake` will be substituted in instead.
|
||||
# pylint: skip-file
|
||||
auth_enabled: false
|
||||
|
||||
server:
|
||||
http_listen_port: 9009
|
||||
|
||||
# Configure the server to allow messages up to 100MB.
|
||||
grpc_server_max_recv_msg_size: 104857600
|
||||
grpc_server_max_send_msg_size: 104857600
|
||||
grpc_server_max_concurrent_streams: 1000
|
||||
|
||||
distributor:
|
||||
shard_by_all_labels: true
|
||||
pool:
|
||||
health_check_ingesters: true
|
||||
|
||||
ingester_client:
|
||||
grpc_client_config:
|
||||
# Configure the client to allow messages up to 100MB.
|
||||
max_recv_msg_size: 104857600
|
||||
max_send_msg_size: 104857600
|
||||
use_gzip_compression: true
|
||||
|
||||
ingester:
|
||||
# We want our ingesters to flush chunks at the same time to optimise
|
||||
# deduplication opportunities.
|
||||
spread_flushes: true
|
||||
chunk_age_jitter: 0
|
||||
|
||||
walconfig:
|
||||
wal_enabled: true
|
||||
recover_from_wal: true
|
||||
wal_dir: /tmp/cortex/wal
|
||||
|
||||
lifecycler:
|
||||
# The address to advertise for this ingester. Will be autodiscovered by
|
||||
# looking up address on eth0 or en0; can be specified if this fails.
|
||||
# address: 127.0.0.1
|
||||
|
||||
# We want to start immediately and flush on shutdown.
|
||||
join_after: 0
|
||||
min_ready_duration: 0s
|
||||
final_sleep: 0s
|
||||
num_tokens: 512
|
||||
tokens_file_path: /tmp/cortex/wal/tokens
|
||||
|
||||
# Use an in memory ring store, so we don't need to launch a Consul.
|
||||
ring:
|
||||
kvstore:
|
||||
store: inmemory
|
||||
replication_factor: 1
|
||||
|
||||
# Use local storage - BoltDB for the index, and the filesystem
|
||||
# for the chunks.
|
||||
schema:
|
||||
configs:
|
||||
- from: 2019-07-29
|
||||
store: boltdb
|
||||
object_store: filesystem
|
||||
schema: v10
|
||||
index:
|
||||
prefix: index_
|
||||
period: 1w
|
||||
|
||||
storage:
|
||||
boltdb:
|
||||
directory: /tmp/cortex/index
|
||||
|
||||
filesystem:
|
||||
directory: /tmp/cortex/chunks
|
||||
|
||||
delete_store:
|
||||
store: boltdb
|
||||
|
||||
purger:
|
||||
object_store_type: filesystem
|
||||
|
||||
frontend_worker:
|
||||
# Configure the frontend worker in the querier to match worker count
|
||||
# to max_concurrent on the queriers.
|
||||
match_max_concurrent: true
|
||||
|
||||
# Configure the ruler to scan the /tmp/cortex/rules directory for prometheus
|
||||
# rules: https://prometheus.io/docs/prometheus/latest/configuration/recording_rules/#recording-rules
|
||||
ruler:
|
||||
enable_api: true
|
||||
enable_sharding: false
|
||||
storage:
|
||||
type: local
|
||||
local:
|
||||
directory: /tmp/cortex/rules
|
||||
|
@ -0,0 +1,33 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
version: "3.8"
|
||||
|
||||
services:
|
||||
cortex:
|
||||
image: quay.io/cortexproject/cortex:v1.5.0
|
||||
command:
|
||||
- -config.file=./config/cortex-config.yml
|
||||
volumes:
|
||||
- ./cortex-config.yml:/config/cortex-config.yml:ro
|
||||
ports:
|
||||
- 9009:9009
|
||||
grafana:
|
||||
image: grafana/grafana:latest
|
||||
ports:
|
||||
- 3000:3000
|
||||
sample_app:
|
||||
build:
|
||||
context: ../
|
||||
dockerfile: ./examples/Dockerfile
|
@ -0,0 +1,7 @@
|
||||
psutil
|
||||
protobuf>=3.13.0
|
||||
requests>=2.25.0
|
||||
python-snappy>=0.5.4
|
||||
opentelemetry-api
|
||||
opentelemetry-sdk
|
||||
opentelemetry-proto
|
@ -0,0 +1,114 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
from logging import INFO
|
||||
|
||||
import psutil
|
||||
|
||||
from opentelemetry import metrics
|
||||
from opentelemetry.exporter.prometheus_remote_write import (
|
||||
PrometheusRemoteWriteMetricsExporter,
|
||||
)
|
||||
from opentelemetry.metrics import Observation
|
||||
from opentelemetry.sdk.metrics import MeterProvider
|
||||
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
|
||||
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
testing_labels = {"environment": "testing"}
|
||||
|
||||
exporter = PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="http://cortex:9009/api/prom/push",
|
||||
headers={"X-Scope-Org-ID": "5"},
|
||||
)
|
||||
reader = PeriodicExportingMetricReader(exporter, 1000)
|
||||
provider = MeterProvider(metric_readers=[reader])
|
||||
metrics.set_meter_provider(provider)
|
||||
meter = metrics.get_meter(__name__)
|
||||
|
||||
|
||||
# Callback to gather cpu usage
|
||||
def get_cpu_usage_callback(observer):
|
||||
for (number, percent) in enumerate(psutil.cpu_percent(percpu=True)):
|
||||
labels = {"cpu_number": str(number)}
|
||||
yield Observation(percent, labels)
|
||||
|
||||
|
||||
# Callback to gather RAM usage
|
||||
def get_ram_usage_callback(observer):
|
||||
ram_percent = psutil.virtual_memory().percent
|
||||
yield Observation(ram_percent, {})
|
||||
|
||||
|
||||
requests_counter = meter.create_counter(
|
||||
name="requests",
|
||||
description="number of requests",
|
||||
unit="1",
|
||||
)
|
||||
|
||||
request_min_max = meter.create_counter(
|
||||
name="requests_min_max",
|
||||
description="min max sum count of requests",
|
||||
unit="1",
|
||||
)
|
||||
|
||||
request_last_value = meter.create_counter(
|
||||
name="requests_last_value",
|
||||
description="last value number of requests",
|
||||
unit="1",
|
||||
)
|
||||
|
||||
requests_active = meter.create_up_down_counter(
|
||||
name="requests_active",
|
||||
description="number of active requests",
|
||||
unit="1",
|
||||
)
|
||||
|
||||
meter.create_observable_counter(
|
||||
callbacks=[get_ram_usage_callback],
|
||||
name="ram_usage",
|
||||
description="ram usage",
|
||||
unit="1",
|
||||
)
|
||||
|
||||
meter.create_observable_up_down_counter(
|
||||
callbacks=[get_cpu_usage_callback],
|
||||
name="cpu_percent",
|
||||
description="per-cpu usage",
|
||||
unit="1",
|
||||
)
|
||||
|
||||
request_latency = meter.create_histogram("request_latency")
|
||||
|
||||
# Load generator
|
||||
num = random.randint(0, 1000)
|
||||
while True:
|
||||
# counters
|
||||
requests_counter.add(num % 131 + 200, testing_labels)
|
||||
request_min_max.add(num % 181 + 200, testing_labels)
|
||||
request_last_value.add(num % 101 + 200, testing_labels)
|
||||
|
||||
# updown counter
|
||||
requests_active.add(num % 7231 + 200, testing_labels)
|
||||
|
||||
request_latency.record(num % 92, testing_labels)
|
||||
logger.log(level=INFO, msg="completed metrics collection cycle")
|
||||
time.sleep(1)
|
||||
num += 9791
|
1
exporter/opentelemetry-exporter-prometheus-remote-write/proto/.gitignore
vendored
Normal file
1
exporter/opentelemetry-exporter-prometheus-remote-write/proto/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
opentelemetry
|
@ -0,0 +1,3 @@
|
||||
## Instructions
|
||||
1. Install protobuf tools. Can use your package manager or download from [GitHub](https://github.com/protocolbuffers/protobuf/releases/tag/v21.7)
|
||||
2. Run `generate-proto-py.sh` from inside the `proto/` directory
|
@ -0,0 +1,57 @@
|
||||
#!/bin/bash
|
||||
|
||||
PROM_VERSION=v2.39.0
|
||||
PROTO_VERSION=v1.3.2
|
||||
|
||||
# SRC_DIR is from protoc perspective. ie its the destination for our checkouts/clones
|
||||
SRC_DIR=opentelemetry/exporter/prometheus_remote_write/gen/
|
||||
DST_DIR=../src/opentelemetry/exporter/prometheus_remote_write/gen/
|
||||
|
||||
#TODO:
|
||||
# Check that black & protoc are installed properly
|
||||
echo "Creating our destination directory"
|
||||
mkdir -p ${SRC_DIR}/gogoproto
|
||||
|
||||
# Clone prometheus
|
||||
echo "Grabbing Prometheus protobuf files"
|
||||
git clone --filter=blob:none --sparse https://github.com/prometheus/prometheus.git
|
||||
cd prometheus
|
||||
git checkout ${PROM_VERSION}
|
||||
git sparse-checkout set prompb
|
||||
cd ..
|
||||
|
||||
|
||||
# We also need gogo.proto which is in the protobuf Repo
|
||||
# Could also try to pull this locally from the install location of protobuf
|
||||
# but that will be harder in a platform agnostic way.
|
||||
echo "Grabbing gogo.proto"
|
||||
git clone --filter=blob:none --sparse https://github.com/gogo/protobuf.git
|
||||
cd protobuf
|
||||
git checkout ${PROTO_VERSION}
|
||||
git sparse-checkout set /gogoproto/gogo.proto
|
||||
cd ..
|
||||
|
||||
# Move the proto files into our structure
|
||||
echo "Moving proto files to ${SRC_DIR}"
|
||||
cp prometheus/prompb/remote.proto prometheus/prompb/types.proto ${SRC_DIR}
|
||||
cp protobuf/gogoproto/gogo.proto ${SRC_DIR}/gogoproto/
|
||||
|
||||
|
||||
# A bit of a hack, but we need to fix the imports to fit the python structure.
|
||||
# using sed to find the 3 files and point them at each other using OUR structure
|
||||
echo "Fixing imports"
|
||||
sed -i 's/import "types.proto";/import "opentelemetry\/exporter\/prometheus_remote_write\/gen\/types.proto";/' ${SRC_DIR}/remote.proto
|
||||
sed -i 's/import "gogoproto\/gogo.proto";/import "opentelemetry\/exporter\/prometheus_remote_write\/gen\/gogoproto\/gogo.proto";/' ${SRC_DIR}/remote.proto
|
||||
sed -i 's/import "gogoproto\/gogo.proto";/import "opentelemetry\/exporter\/prometheus_remote_write\/gen\/gogoproto\/gogo.proto";/' ${SRC_DIR}/types.proto
|
||||
|
||||
|
||||
# Cleanup the repos
|
||||
echo "Removing clones..."
|
||||
rm -rf protobuf prometheus
|
||||
|
||||
# Used libprotoc 3.21.1 & protoc 21.7
|
||||
echo "Compiling proto files to Python"
|
||||
protoc -I . --python_out=../src ${SRC_DIR}/gogoproto/gogo.proto ${SRC_DIR}/remote.proto ${SRC_DIR}/types.proto
|
||||
|
||||
echo "Running formatting on the generated files"
|
||||
../../../scripts/eachdist.py format --path $PWD/..
|
@ -0,0 +1,51 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
|
||||
[project]
|
||||
name = "opentelemetry-exporter-prometheus-remote-write"
|
||||
dynamic = ["version"]
|
||||
description = "Prometheus Remote Write Metrics Exporter for OpenTelemetry"
|
||||
readme = "README.rst"
|
||||
license = "Apache-2.0"
|
||||
requires-python = ">=3.7"
|
||||
authors = [
|
||||
{ name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" },
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Programming Language :: Python",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
]
|
||||
dependencies = [
|
||||
"protobuf ~= 4.21",
|
||||
"requests ~= 2.28",
|
||||
"opentelemetry-api ~= 1.12",
|
||||
"opentelemetry-sdk ~= 1.12",
|
||||
"python-snappy ~= 0.6",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
test = []
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/exporter/opentelemetry-exporter-prometheus-remote-write"
|
||||
|
||||
[tool.hatch.version]
|
||||
path = "src/opentelemetry/exporter/prometheus_remote_write/version.py"
|
||||
|
||||
[tool.hatch.build.targets.sdist]
|
||||
include = [
|
||||
"/src",
|
||||
"/tests",
|
||||
]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/opentelemetry"]
|
@ -0,0 +1,414 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from itertools import chain
|
||||
from typing import Dict, Sequence
|
||||
|
||||
import requests
|
||||
import snappy
|
||||
|
||||
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( # pylint: disable=no-name-in-module
|
||||
WriteRequest,
|
||||
)
|
||||
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import ( # pylint: disable=no-name-in-module
|
||||
Label,
|
||||
Sample,
|
||||
TimeSeries,
|
||||
)
|
||||
from opentelemetry.sdk.metrics import Counter
|
||||
from opentelemetry.sdk.metrics import Histogram as ClientHistogram
|
||||
from opentelemetry.sdk.metrics import (
|
||||
ObservableCounter,
|
||||
ObservableGauge,
|
||||
ObservableUpDownCounter,
|
||||
UpDownCounter,
|
||||
)
|
||||
from opentelemetry.sdk.metrics.export import (
|
||||
AggregationTemporality,
|
||||
Gauge,
|
||||
Histogram,
|
||||
Metric,
|
||||
MetricExporter,
|
||||
MetricExportResult,
|
||||
MetricsData,
|
||||
Sum,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PROMETHEUS_NAME_REGEX = re.compile(r"^\d|[^\w:]")
|
||||
PROMETHEUS_LABEL_REGEX = re.compile(r"^\d|[^\w]")
|
||||
UNDERSCORE_REGEX = re.compile(r"_+")
|
||||
|
||||
|
||||
class PrometheusRemoteWriteMetricsExporter(MetricExporter):
|
||||
"""
|
||||
Prometheus remote write metric exporter for OpenTelemetry.
|
||||
|
||||
Args:
|
||||
endpoint: url where data will be sent (Required)
|
||||
basic_auth: username and password for authentication (Optional)
|
||||
headers: additional headers for remote write request (Optional)
|
||||
timeout: timeout for remote write requests in seconds, defaults to 30 (Optional)
|
||||
proxies: dict mapping request proxy protocols to proxy urls (Optional)
|
||||
tls_config: configuration for remote write TLS settings (Optional)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
endpoint: str,
|
||||
basic_auth: Dict = None,
|
||||
headers: Dict = None,
|
||||
timeout: int = 30,
|
||||
tls_config: Dict = None,
|
||||
proxies: Dict = None,
|
||||
resources_as_labels: bool = True,
|
||||
preferred_temporality: Dict[type, AggregationTemporality] = None,
|
||||
preferred_aggregation: Dict = None,
|
||||
):
|
||||
self.endpoint = endpoint
|
||||
self.basic_auth = basic_auth
|
||||
self.headers = headers
|
||||
self.timeout = timeout
|
||||
self.tls_config = tls_config
|
||||
self.proxies = proxies
|
||||
self.resources_as_labels = resources_as_labels
|
||||
|
||||
if not preferred_temporality:
|
||||
preferred_temporality = {
|
||||
Counter: AggregationTemporality.CUMULATIVE,
|
||||
UpDownCounter: AggregationTemporality.CUMULATIVE,
|
||||
ClientHistogram: AggregationTemporality.CUMULATIVE,
|
||||
ObservableCounter: AggregationTemporality.CUMULATIVE,
|
||||
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
|
||||
ObservableGauge: AggregationTemporality.CUMULATIVE,
|
||||
}
|
||||
|
||||
super().__init__(preferred_temporality, preferred_aggregation)
|
||||
|
||||
@property
|
||||
def endpoint(self):
|
||||
return self._endpoint
|
||||
|
||||
@endpoint.setter
|
||||
def endpoint(self, endpoint: str):
|
||||
if endpoint == "":
|
||||
raise ValueError("endpoint required")
|
||||
self._endpoint = endpoint
|
||||
|
||||
@property
|
||||
def basic_auth(self):
|
||||
return self._basic_auth
|
||||
|
||||
@basic_auth.setter
|
||||
def basic_auth(self, basic_auth: Dict):
|
||||
if basic_auth:
|
||||
if "username" not in basic_auth:
|
||||
raise ValueError("username required in basic_auth")
|
||||
if "password_file" in basic_auth:
|
||||
if "password" in basic_auth:
|
||||
raise ValueError(
|
||||
"basic_auth cannot contain password and password_file"
|
||||
)
|
||||
with open( # pylint: disable=unspecified-encoding
|
||||
basic_auth["password_file"]
|
||||
) as file:
|
||||
basic_auth["password"] = file.readline().strip()
|
||||
elif "password" not in basic_auth:
|
||||
raise ValueError("password required in basic_auth")
|
||||
self._basic_auth = basic_auth
|
||||
|
||||
@property
|
||||
def timeout(self):
|
||||
return self._timeout
|
||||
|
||||
@timeout.setter
|
||||
def timeout(self, timeout: int):
|
||||
if timeout <= 0:
|
||||
raise ValueError("timeout must be greater than 0")
|
||||
self._timeout = timeout
|
||||
|
||||
@property
|
||||
def tls_config(self):
|
||||
return self._tls_config
|
||||
|
||||
@tls_config.setter
|
||||
def tls_config(self, tls_config: Dict):
|
||||
if tls_config:
|
||||
new_config = {}
|
||||
if "ca_file" in tls_config:
|
||||
new_config["ca_file"] = tls_config["ca_file"]
|
||||
if "cert_file" in tls_config and "key_file" in tls_config:
|
||||
new_config["cert_file"] = tls_config["cert_file"]
|
||||
new_config["key_file"] = tls_config["key_file"]
|
||||
elif "cert_file" in tls_config or "key_file" in tls_config:
|
||||
raise ValueError(
|
||||
"tls_config requires both cert_file and key_file"
|
||||
)
|
||||
if "insecure_skip_verify" in tls_config:
|
||||
new_config["insecure_skip_verify"] = tls_config[
|
||||
"insecure_skip_verify"
|
||||
]
|
||||
self._tls_config = tls_config
|
||||
|
||||
@property
|
||||
def proxies(self):
|
||||
return self._proxies
|
||||
|
||||
@proxies.setter
|
||||
def proxies(self, proxies: Dict):
|
||||
self._proxies = proxies
|
||||
|
||||
@property
|
||||
def headers(self):
|
||||
return self._headers
|
||||
|
||||
@headers.setter
|
||||
def headers(self, headers: Dict):
|
||||
self._headers = headers
|
||||
|
||||
def export(
|
||||
self,
|
||||
metrics_data: MetricsData,
|
||||
timeout_millis: float = 10_000,
|
||||
**kwargs,
|
||||
) -> MetricExportResult:
|
||||
if not metrics_data:
|
||||
return MetricExportResult.SUCCESS
|
||||
timeseries = self._translate_data(metrics_data)
|
||||
if not timeseries:
|
||||
logger.error(
|
||||
"All records contain unsupported aggregators, export aborted"
|
||||
)
|
||||
return MetricExportResult.FAILURE
|
||||
message = self._build_message(timeseries)
|
||||
headers = self._build_headers()
|
||||
return self._send_message(message, headers)
|
||||
|
||||
def _translate_data(self, data: MetricsData) -> Sequence[TimeSeries]:
|
||||
rw_timeseries = []
|
||||
|
||||
for resource_metrics in data.resource_metrics:
|
||||
resource = resource_metrics.resource
|
||||
# OTLP Data model suggests combining some attrs into job/instance
|
||||
# Should we do that here?
|
||||
if self.resources_as_labels:
|
||||
resource_labels = [
|
||||
(n, str(v)) for n, v in resource.attributes.items()
|
||||
]
|
||||
else:
|
||||
resource_labels = []
|
||||
# Scope name/version probably not too useful from a labeling perspective
|
||||
for scope_metrics in resource_metrics.scope_metrics:
|
||||
for metric in scope_metrics.metrics:
|
||||
rw_timeseries.extend(
|
||||
self._parse_metric(metric, resource_labels)
|
||||
)
|
||||
return rw_timeseries
|
||||
|
||||
def _parse_metric(
|
||||
self, metric: Metric, resource_labels: Sequence
|
||||
) -> Sequence[TimeSeries]:
|
||||
"""
|
||||
Parses the Metric & lower objects, then converts the output into
|
||||
OM TimeSeries. Returns a List of TimeSeries objects based on one Metric
|
||||
"""
|
||||
|
||||
# Create the metric name, will be a label later
|
||||
if metric.unit:
|
||||
# Prom. naming guidelines add unit to the name
|
||||
name = f"{metric.name}_{metric.unit}"
|
||||
else:
|
||||
name = metric.name
|
||||
|
||||
# datapoints have attributes associated with them. these would be sent
|
||||
# to RW as different metrics: name & labels is a unique time series
|
||||
sample_sets = defaultdict(list)
|
||||
if isinstance(metric.data, (Gauge, Sum)):
|
||||
for dp in metric.data.data_points:
|
||||
attrs, sample = self._parse_data_point(dp, name)
|
||||
sample_sets[attrs].append(sample)
|
||||
elif isinstance(metric.data, Histogram):
|
||||
for dp in metric.data.data_points:
|
||||
dp_result = self._parse_histogram_data_point(dp, name)
|
||||
for attrs, sample in dp_result:
|
||||
sample_sets[attrs].append(sample)
|
||||
else:
|
||||
logger.warning("Unsupported Metric Type: %s", type(metric.data))
|
||||
return []
|
||||
return self._convert_to_timeseries(sample_sets, resource_labels)
|
||||
|
||||
def _convert_to_timeseries(
|
||||
self, sample_sets: Sequence[tuple], resource_labels: Sequence
|
||||
) -> Sequence[TimeSeries]:
|
||||
timeseries = []
|
||||
for labels, samples in sample_sets.items():
|
||||
ts = TimeSeries()
|
||||
for label_name, label_value in chain(resource_labels, labels):
|
||||
# Previous implementation did not str() the names...
|
||||
ts.labels.append(self._label(label_name, str(label_value)))
|
||||
for value, timestamp in samples:
|
||||
ts.samples.append(self._sample(value, timestamp))
|
||||
timeseries.append(ts)
|
||||
return timeseries
|
||||
|
||||
@staticmethod
|
||||
def _sample(value: int, timestamp: int) -> Sample:
|
||||
sample = Sample()
|
||||
sample.value = value
|
||||
sample.timestamp = timestamp
|
||||
return sample
|
||||
|
||||
def _label(self, name: str, value: str) -> Label:
|
||||
label = Label()
|
||||
label.name = self._sanitize_string(name, "label")
|
||||
label.value = value
|
||||
return label
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_string(string: str, type_: str) -> str:
|
||||
# I Think Prometheus requires names to NOT start with a number this
|
||||
# would not catch that, but do cover the other cases. The naming rules
|
||||
# don't explicit say this, but the supplied regex implies it.
|
||||
# Got a little weird trying to do substitution with it, but can be
|
||||
# fixed if we allow numeric beginnings to metric names
|
||||
if type_ == "name":
|
||||
sanitized = PROMETHEUS_NAME_REGEX.sub("_", string)
|
||||
elif type_ == "label":
|
||||
sanitized = PROMETHEUS_LABEL_REGEX.sub("_", string)
|
||||
else:
|
||||
raise TypeError(f"Unsupported string type: {type_}")
|
||||
|
||||
# Remove consecutive underscores
|
||||
# TODO: Unfortunately this clobbbers __name__
|
||||
# sanitized = UNDERSCORE_REGEX.sub("_",sanitized)
|
||||
|
||||
return sanitized
|
||||
|
||||
def _parse_histogram_data_point(self, data_point, name):
|
||||
|
||||
sample_attr_pairs = []
|
||||
|
||||
base_attrs = list(data_point.attributes.items())
|
||||
timestamp = data_point.time_unix_nano // 1_000_000
|
||||
|
||||
def handle_bucket(value, bound=None, name_override=None):
|
||||
# Metric Level attributes + the bucket boundary attribute + name
|
||||
ts_attrs = base_attrs.copy()
|
||||
ts_attrs.append(
|
||||
(
|
||||
"__name__",
|
||||
self._sanitize_string(name_override or name, "name"),
|
||||
)
|
||||
)
|
||||
if bound:
|
||||
ts_attrs.append(("le", str(bound)))
|
||||
# Value is count of values in each bucket
|
||||
ts_sample = (value, timestamp)
|
||||
return tuple(ts_attrs), ts_sample
|
||||
|
||||
for bound_pos, bound in enumerate(data_point.explicit_bounds):
|
||||
sample_attr_pairs.append(
|
||||
handle_bucket(data_point.bucket_counts[bound_pos], bound)
|
||||
)
|
||||
|
||||
# Add the last label for implicit +inf bucket
|
||||
sample_attr_pairs.append(
|
||||
handle_bucket(data_point.bucket_counts[-1], bound="+Inf")
|
||||
)
|
||||
|
||||
# Lastly, add series for count & sum
|
||||
sample_attr_pairs.append(
|
||||
handle_bucket(data_point.sum, name_override=f"{name}_sum")
|
||||
)
|
||||
sample_attr_pairs.append(
|
||||
handle_bucket(data_point.count, name_override=f"{name}_count")
|
||||
)
|
||||
return sample_attr_pairs
|
||||
|
||||
def _parse_data_point(self, data_point, name=None):
|
||||
|
||||
attrs = tuple(data_point.attributes.items()) + (
|
||||
("__name__", self._sanitize_string(name, "name")),
|
||||
)
|
||||
sample = (data_point.value, (data_point.time_unix_nano // 1_000_000))
|
||||
return attrs, sample
|
||||
|
||||
@staticmethod
|
||||
def _build_message(timeseries: Sequence[TimeSeries]) -> bytes:
|
||||
write_request = WriteRequest()
|
||||
write_request.timeseries.extend(timeseries)
|
||||
serialized_message = write_request.SerializeToString()
|
||||
return snappy.compress(serialized_message)
|
||||
|
||||
def _build_headers(self) -> Dict:
|
||||
headers = {
|
||||
"Content-Encoding": "snappy",
|
||||
"Content-Type": "application/x-protobuf",
|
||||
"X-Prometheus-Remote-Write-Version": "0.1.0",
|
||||
}
|
||||
if self.headers:
|
||||
for header_name, header_value in self.headers.items():
|
||||
headers[header_name] = header_value
|
||||
return headers
|
||||
|
||||
def _send_message(
|
||||
self, message: bytes, headers: Dict
|
||||
) -> MetricExportResult:
|
||||
auth = None
|
||||
if self.basic_auth:
|
||||
auth = (self.basic_auth["username"], self.basic_auth["password"])
|
||||
|
||||
cert = None
|
||||
verify = True
|
||||
if self.tls_config:
|
||||
if "ca_file" in self.tls_config:
|
||||
verify = self.tls_config["ca_file"]
|
||||
elif "insecure_skip_verify" in self.tls_config:
|
||||
verify = self.tls_config["insecure_skip_verify"]
|
||||
|
||||
if (
|
||||
"cert_file" in self.tls_config
|
||||
and "key_file" in self.tls_config
|
||||
):
|
||||
cert = (
|
||||
self.tls_config["cert_file"],
|
||||
self.tls_config["key_file"],
|
||||
)
|
||||
try:
|
||||
response = requests.post(
|
||||
self.endpoint,
|
||||
data=message,
|
||||
headers=headers,
|
||||
auth=auth,
|
||||
timeout=self.timeout,
|
||||
proxies=self.proxies,
|
||||
cert=cert,
|
||||
verify=verify,
|
||||
)
|
||||
if not response.ok:
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.RequestException as err:
|
||||
logger.error("Export POST request failed with reason: %s", err)
|
||||
return MetricExportResult.FAILURE
|
||||
return MetricExportResult.SUCCESS
|
||||
|
||||
def force_flush(self, timeout_millis: float = 10_000) -> bool:
|
||||
return True
|
||||
|
||||
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
|
||||
pass
|
File diff suppressed because one or more lines are too long
@ -0,0 +1,59 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: opentelemetry/exporter/prometheus_remote_write/gen/remote.proto
|
||||
"""Generated protocol buffer code."""
|
||||
from google.protobuf.internal import builder as _builder
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||
from google.protobuf import symbol_database as _symbol_database
|
||||
|
||||
# @@protoc_insertion_point(imports)
|
||||
|
||||
_sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
from opentelemetry.exporter.prometheus_remote_write.gen import (
|
||||
types_pb2 as opentelemetry_dot_exporter_dot_prometheus__remote__write_dot_gen_dot_types__pb2,
|
||||
)
|
||||
from opentelemetry.exporter.prometheus_remote_write.gen.gogoproto import (
|
||||
gogo_pb2 as opentelemetry_dot_exporter_dot_prometheus__remote__write_dot_gen_dot_gogoproto_dot_gogo__pb2,
|
||||
)
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
|
||||
b'\n?opentelemetry/exporter/prometheus_remote_write/gen/remote.proto\x12\nprometheus\x1a>opentelemetry/exporter/prometheus_remote_write/gen/types.proto\x1aGopentelemetry/exporter/prometheus_remote_write/gen/gogoproto/gogo.proto"z\n\x0cWriteRequest\x12\x30\n\ntimeseries\x18\x01 \x03(\x0b\x32\x16.prometheus.TimeSeriesB\x04\xc8\xde\x1f\x00\x12\x32\n\x08metadata\x18\x03 \x03(\x0b\x32\x1a.prometheus.MetricMetadataB\x04\xc8\xde\x1f\x00J\x04\x08\x02\x10\x03"\xae\x01\n\x0bReadRequest\x12"\n\x07queries\x18\x01 \x03(\x0b\x32\x11.prometheus.Query\x12\x45\n\x17\x61\x63\x63\x65pted_response_types\x18\x02 \x03(\x0e\x32$.prometheus.ReadRequest.ResponseType"4\n\x0cResponseType\x12\x0b\n\x07SAMPLES\x10\x00\x12\x17\n\x13STREAMED_XOR_CHUNKS\x10\x01"8\n\x0cReadResponse\x12(\n\x07results\x18\x01 \x03(\x0b\x32\x17.prometheus.QueryResult"\x8f\x01\n\x05Query\x12\x1a\n\x12start_timestamp_ms\x18\x01 \x01(\x03\x12\x18\n\x10\x65nd_timestamp_ms\x18\x02 \x01(\x03\x12*\n\x08matchers\x18\x03 \x03(\x0b\x32\x18.prometheus.LabelMatcher\x12$\n\x05hints\x18\x04 \x01(\x0b\x32\x15.prometheus.ReadHints"9\n\x0bQueryResult\x12*\n\ntimeseries\x18\x01 \x03(\x0b\x32\x16.prometheus.TimeSeries"]\n\x13\x43hunkedReadResponse\x12\x31\n\x0e\x63hunked_series\x18\x01 \x03(\x0b\x32\x19.prometheus.ChunkedSeries\x12\x13\n\x0bquery_index\x18\x02 \x01(\x03\x42\x08Z\x06prompbb\x06proto3'
|
||||
)
|
||||
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
|
||||
_builder.BuildTopDescriptorsAndMessages(
|
||||
DESCRIPTOR,
|
||||
"opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2",
|
||||
globals(),
|
||||
)
|
||||
if _descriptor._USE_C_DESCRIPTORS == False:
|
||||
|
||||
DESCRIPTOR._options = None
|
||||
DESCRIPTOR._serialized_options = b"Z\006prompb"
|
||||
_WRITEREQUEST.fields_by_name["timeseries"]._options = None
|
||||
_WRITEREQUEST.fields_by_name[
|
||||
"timeseries"
|
||||
]._serialized_options = b"\310\336\037\000"
|
||||
_WRITEREQUEST.fields_by_name["metadata"]._options = None
|
||||
_WRITEREQUEST.fields_by_name[
|
||||
"metadata"
|
||||
]._serialized_options = b"\310\336\037\000"
|
||||
_WRITEREQUEST._serialized_start = 216
|
||||
_WRITEREQUEST._serialized_end = 338
|
||||
_READREQUEST._serialized_start = 341
|
||||
_READREQUEST._serialized_end = 515
|
||||
_READREQUEST_RESPONSETYPE._serialized_start = 463
|
||||
_READREQUEST_RESPONSETYPE._serialized_end = 515
|
||||
_READRESPONSE._serialized_start = 517
|
||||
_READRESPONSE._serialized_end = 573
|
||||
_QUERY._serialized_start = 576
|
||||
_QUERY._serialized_end = 719
|
||||
_QUERYRESULT._serialized_start = 721
|
||||
_QUERYRESULT._serialized_end = 778
|
||||
_CHUNKEDREADRESPONSE._serialized_start = 780
|
||||
_CHUNKEDREADRESPONSE._serialized_end = 873
|
||||
# @@protoc_insertion_point(module_scope)
|
@ -0,0 +1,86 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# source: opentelemetry/exporter/prometheus_remote_write/gen/types.proto
|
||||
"""Generated protocol buffer code."""
|
||||
from google.protobuf.internal import builder as _builder
|
||||
from google.protobuf import descriptor as _descriptor
|
||||
from google.protobuf import descriptor_pool as _descriptor_pool
|
||||
from google.protobuf import symbol_database as _symbol_database
|
||||
|
||||
# @@protoc_insertion_point(imports)
|
||||
|
||||
_sym_db = _symbol_database.Default()
|
||||
|
||||
|
||||
from opentelemetry.exporter.prometheus_remote_write.gen.gogoproto import (
|
||||
gogo_pb2 as opentelemetry_dot_exporter_dot_prometheus__remote__write_dot_gen_dot_gogoproto_dot_gogo__pb2,
|
||||
)
|
||||
|
||||
|
||||
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
|
||||
b'\n>opentelemetry/exporter/prometheus_remote_write/gen/types.proto\x12\nprometheus\x1aGopentelemetry/exporter/prometheus_remote_write/gen/gogoproto/gogo.proto"\xf8\x01\n\x0eMetricMetadata\x12\x33\n\x04type\x18\x01 \x01(\x0e\x32%.prometheus.MetricMetadata.MetricType\x12\x1a\n\x12metric_family_name\x18\x02 \x01(\t\x12\x0c\n\x04help\x18\x04 \x01(\t\x12\x0c\n\x04unit\x18\x05 \x01(\t"y\n\nMetricType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07\x43OUNTER\x10\x01\x12\t\n\x05GAUGE\x10\x02\x12\r\n\tHISTOGRAM\x10\x03\x12\x12\n\x0eGAUGEHISTOGRAM\x10\x04\x12\x0b\n\x07SUMMARY\x10\x05\x12\x08\n\x04INFO\x10\x06\x12\x0c\n\x08STATESET\x10\x07"*\n\x06Sample\x12\r\n\x05value\x18\x01 \x01(\x01\x12\x11\n\ttimestamp\x18\x02 \x01(\x03"U\n\x08\x45xemplar\x12\'\n\x06labels\x18\x01 \x03(\x0b\x32\x11.prometheus.LabelB\x04\xc8\xde\x1f\x00\x12\r\n\x05value\x18\x02 \x01(\x01\x12\x11\n\ttimestamp\x18\x03 \x01(\x03"\x8f\x01\n\nTimeSeries\x12\'\n\x06labels\x18\x01 \x03(\x0b\x32\x11.prometheus.LabelB\x04\xc8\xde\x1f\x00\x12)\n\x07samples\x18\x02 \x03(\x0b\x32\x12.prometheus.SampleB\x04\xc8\xde\x1f\x00\x12-\n\texemplars\x18\x03 \x03(\x0b\x32\x14.prometheus.ExemplarB\x04\xc8\xde\x1f\x00"$\n\x05Label\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t"1\n\x06Labels\x12\'\n\x06labels\x18\x01 \x03(\x0b\x32\x11.prometheus.LabelB\x04\xc8\xde\x1f\x00"\x82\x01\n\x0cLabelMatcher\x12+\n\x04type\x18\x01 \x01(\x0e\x32\x1d.prometheus.LabelMatcher.Type\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\t"(\n\x04Type\x12\x06\n\x02\x45Q\x10\x00\x12\x07\n\x03NEQ\x10\x01\x12\x06\n\x02RE\x10\x02\x12\x07\n\x03NRE\x10\x03"|\n\tReadHints\x12\x0f\n\x07step_ms\x18\x01 \x01(\x03\x12\x0c\n\x04\x66unc\x18\x02 \x01(\t\x12\x10\n\x08start_ms\x18\x03 \x01(\x03\x12\x0e\n\x06\x65nd_ms\x18\x04 \x01(\x03\x12\x10\n\x08grouping\x18\x05 \x03(\t\x12\n\n\x02\x62y\x18\x06 \x01(\x08\x12\x10\n\x08range_ms\x18\x07 \x01(\x03"\x8b\x01\n\x05\x43hunk\x12\x13\n\x0bmin_time_ms\x18\x01 \x01(\x03\x12\x13\n\x0bmax_time_ms\x18\x02 \x01(\x03\x12(\n\x04type\x18\x03 \x01(\x0e\x32\x1a.prometheus.Chunk.Encoding\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c" \n\x08\x45ncoding\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x07\n\x03XOR\x10\x01"a\n\rChunkedSeries\x12\'\n\x06labels\x18\x01 \x03(\x0b\x32\x11.prometheus.LabelB\x04\xc8\xde\x1f\x00\x12\'\n\x06\x63hunks\x18\x02 \x03(\x0b\x32\x11.prometheus.ChunkB\x04\xc8\xde\x1f\x00\x42\x08Z\x06prompbb\x06proto3'
|
||||
)
|
||||
|
||||
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
|
||||
_builder.BuildTopDescriptorsAndMessages(
|
||||
DESCRIPTOR,
|
||||
"opentelemetry.exporter.prometheus_remote_write.gen.types_pb2",
|
||||
globals(),
|
||||
)
|
||||
if _descriptor._USE_C_DESCRIPTORS == False:
|
||||
|
||||
DESCRIPTOR._options = None
|
||||
DESCRIPTOR._serialized_options = b"Z\006prompb"
|
||||
_EXEMPLAR.fields_by_name["labels"]._options = None
|
||||
_EXEMPLAR.fields_by_name[
|
||||
"labels"
|
||||
]._serialized_options = b"\310\336\037\000"
|
||||
_TIMESERIES.fields_by_name["labels"]._options = None
|
||||
_TIMESERIES.fields_by_name[
|
||||
"labels"
|
||||
]._serialized_options = b"\310\336\037\000"
|
||||
_TIMESERIES.fields_by_name["samples"]._options = None
|
||||
_TIMESERIES.fields_by_name[
|
||||
"samples"
|
||||
]._serialized_options = b"\310\336\037\000"
|
||||
_TIMESERIES.fields_by_name["exemplars"]._options = None
|
||||
_TIMESERIES.fields_by_name[
|
||||
"exemplars"
|
||||
]._serialized_options = b"\310\336\037\000"
|
||||
_LABELS.fields_by_name["labels"]._options = None
|
||||
_LABELS.fields_by_name["labels"]._serialized_options = b"\310\336\037\000"
|
||||
_CHUNKEDSERIES.fields_by_name["labels"]._options = None
|
||||
_CHUNKEDSERIES.fields_by_name[
|
||||
"labels"
|
||||
]._serialized_options = b"\310\336\037\000"
|
||||
_CHUNKEDSERIES.fields_by_name["chunks"]._options = None
|
||||
_CHUNKEDSERIES.fields_by_name[
|
||||
"chunks"
|
||||
]._serialized_options = b"\310\336\037\000"
|
||||
_METRICMETADATA._serialized_start = 152
|
||||
_METRICMETADATA._serialized_end = 400
|
||||
_METRICMETADATA_METRICTYPE._serialized_start = 279
|
||||
_METRICMETADATA_METRICTYPE._serialized_end = 400
|
||||
_SAMPLE._serialized_start = 402
|
||||
_SAMPLE._serialized_end = 444
|
||||
_EXEMPLAR._serialized_start = 446
|
||||
_EXEMPLAR._serialized_end = 531
|
||||
_TIMESERIES._serialized_start = 534
|
||||
_TIMESERIES._serialized_end = 677
|
||||
_LABEL._serialized_start = 679
|
||||
_LABEL._serialized_end = 715
|
||||
_LABELS._serialized_start = 717
|
||||
_LABELS._serialized_end = 766
|
||||
_LABELMATCHER._serialized_start = 769
|
||||
_LABELMATCHER._serialized_end = 899
|
||||
_LABELMATCHER_TYPE._serialized_start = 859
|
||||
_LABELMATCHER_TYPE._serialized_end = 899
|
||||
_READHINTS._serialized_start = 901
|
||||
_READHINTS._serialized_end = 1025
|
||||
_CHUNK._serialized_start = 1028
|
||||
_CHUNK._serialized_end = 1167
|
||||
_CHUNK_ENCODING._serialized_start = 1135
|
||||
_CHUNK_ENCODING._serialized_end = 1167
|
||||
_CHUNKEDSERIES._serialized_start = 1169
|
||||
_CHUNKEDSERIES._serialized_end = 1266
|
||||
# @@protoc_insertion_point(module_scope)
|
@ -0,0 +1,15 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
__version__ = "0.34b0"
|
@ -0,0 +1,13 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
@ -0,0 +1,66 @@
|
||||
import random
|
||||
|
||||
import pytest
|
||||
|
||||
import opentelemetry.test.metrictestutil as metric_util
|
||||
from opentelemetry.exporter.prometheus_remote_write import (
|
||||
PrometheusRemoteWriteMetricsExporter,
|
||||
)
|
||||
from opentelemetry.sdk.metrics.export import (
|
||||
AggregationTemporality,
|
||||
Histogram,
|
||||
HistogramDataPoint,
|
||||
Metric,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def prom_rw():
|
||||
return PrometheusRemoteWriteMetricsExporter(
|
||||
"http://victoria:8428/api/v1/write"
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def metric(request):
|
||||
if hasattr(request, "param"):
|
||||
type_ = request.param
|
||||
else:
|
||||
type_ = random.choice(["gauge", "sum"])
|
||||
|
||||
if type_ == "gauge":
|
||||
return metric_util._generate_gauge(
|
||||
"test.gauge", random.randint(0, 100)
|
||||
)
|
||||
if type_ == "sum":
|
||||
return metric_util._generate_sum(
|
||||
"test.sum", random.randint(0, 9_999_999_999)
|
||||
)
|
||||
if type_ == "histogram":
|
||||
return _generate_histogram("test_histogram")
|
||||
|
||||
raise ValueError(f"Unsupported metric type '{type_}'.")
|
||||
|
||||
|
||||
def _generate_histogram(name):
|
||||
dp = HistogramDataPoint(
|
||||
attributes={"foo": "bar", "baz": 42},
|
||||
start_time_unix_nano=1641946016139533244,
|
||||
time_unix_nano=1641946016139533244,
|
||||
count=5,
|
||||
sum=420,
|
||||
bucket_counts=[1, 4],
|
||||
explicit_bounds=[10.0],
|
||||
min=8,
|
||||
max=80,
|
||||
)
|
||||
data = Histogram(
|
||||
[dp],
|
||||
AggregationTemporality.CUMULATIVE,
|
||||
)
|
||||
return Metric(
|
||||
name,
|
||||
"foo",
|
||||
"tu",
|
||||
data=data,
|
||||
)
|
@ -0,0 +1,309 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from opentelemetry.exporter.prometheus_remote_write import (
|
||||
PrometheusRemoteWriteMetricsExporter,
|
||||
)
|
||||
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import ( # pylint: disable=E0611
|
||||
TimeSeries,
|
||||
)
|
||||
from opentelemetry.sdk.metrics.export import (
|
||||
Histogram,
|
||||
HistogramDataPoint,
|
||||
MetricExportResult,
|
||||
MetricsData,
|
||||
NumberDataPoint,
|
||||
ResourceMetrics,
|
||||
ScopeMetrics,
|
||||
)
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"name,result",
|
||||
[
|
||||
("abc.124", "abc_124"),
|
||||
(":abc", ":abc"),
|
||||
("abc.name.hi", "abc_name_hi"),
|
||||
("service.name...", "service_name___"),
|
||||
("4hellowor:ld5∂©∑", "_hellowor:ld5___"),
|
||||
],
|
||||
)
|
||||
def test_regex(name, result, prom_rw):
|
||||
assert prom_rw._sanitize_string(name, "name") == result
|
||||
|
||||
|
||||
def test_regex_invalid(prom_rw):
|
||||
with pytest.raises(TypeError):
|
||||
prom_rw("foo_bar", "A random type")
|
||||
|
||||
|
||||
def test_parse_data_point(prom_rw):
|
||||
|
||||
attrs = {"Foo": "Bar", "Baz": 42}
|
||||
timestamp = 1641946016139533244
|
||||
value = 242.42
|
||||
dp = NumberDataPoint(attrs, 0, timestamp, value)
|
||||
name = "abc.123_42"
|
||||
labels, sample = prom_rw._parse_data_point(dp, name)
|
||||
|
||||
name = "abc_123_42"
|
||||
assert labels == (("Foo", "Bar"), ("Baz", 42), ("__name__", name))
|
||||
assert sample == (value, timestamp // 1_000_000)
|
||||
|
||||
|
||||
def test_parse_histogram_dp(prom_rw):
|
||||
attrs = {"foo": "bar", "baz": 42}
|
||||
timestamp = 1641946016139533244
|
||||
bounds = [10.0, 20.0]
|
||||
dp = HistogramDataPoint(
|
||||
attributes=attrs,
|
||||
start_time_unix_nano=1641946016139533244,
|
||||
time_unix_nano=timestamp,
|
||||
count=9,
|
||||
sum=180,
|
||||
bucket_counts=[1, 4, 4],
|
||||
explicit_bounds=bounds,
|
||||
min=8,
|
||||
max=80,
|
||||
)
|
||||
name = "foo_histogram"
|
||||
label_sample_pairs = prom_rw._parse_histogram_data_point(dp, name)
|
||||
timestamp = timestamp // 1_000_000
|
||||
bounds.append("+Inf")
|
||||
for pos, bound in enumerate(bounds):
|
||||
# We have to attributes, we kinda assume the bucket label is last...
|
||||
assert ("le", str(bound)) == label_sample_pairs[pos][0][-1]
|
||||
# Check and make sure we are putting the bucket counts in there
|
||||
assert (dp.bucket_counts[pos], timestamp) == label_sample_pairs[pos][1]
|
||||
|
||||
# Last two are the sum & total count
|
||||
assert ("__name__", f"{name}_sum") in label_sample_pairs[-2][0]
|
||||
assert (dp.sum, timestamp) == label_sample_pairs[-2][1]
|
||||
|
||||
assert ("__name__", f"{name}_count") in label_sample_pairs[-1][0]
|
||||
assert (dp.count, timestamp) == label_sample_pairs[-1][1]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"metric",
|
||||
[
|
||||
"gauge",
|
||||
"sum",
|
||||
"histogram",
|
||||
],
|
||||
indirect=["metric"],
|
||||
)
|
||||
def test_parse_metric(metric, prom_rw):
|
||||
"""
|
||||
Ensures output from parse_metrics are TimeSeries with expected data/size
|
||||
"""
|
||||
attributes = {
|
||||
"service_name": "foo",
|
||||
"bool_value": True,
|
||||
}
|
||||
|
||||
assert (
|
||||
len(metric.data.data_points) == 1
|
||||
), "We can only support a single datapoint in tests"
|
||||
series = prom_rw._parse_metric(metric, tuple(attributes.items()))
|
||||
timestamp = metric.data.data_points[0].time_unix_nano // 1_000_000
|
||||
for single_series in series:
|
||||
labels = str(single_series.labels)
|
||||
# Its a bit easier to validate these stringified where we dont have to
|
||||
# worry about ordering and protobuf TimeSeries object structure
|
||||
# This doesn't guarantee the labels aren't mixed up, but our other
|
||||
# test cases already do.
|
||||
assert "__name__" in labels
|
||||
assert prom_rw._sanitize_string(metric.name, "name") in labels
|
||||
combined_attrs = list(attributes.items()) + list(
|
||||
metric.data.data_points[0].attributes.items()
|
||||
)
|
||||
for name, value in combined_attrs:
|
||||
assert prom_rw._sanitize_string(name, "label") in labels
|
||||
assert str(value) in labels
|
||||
if isinstance(metric.data, Histogram):
|
||||
values = [
|
||||
metric.data.data_points[0].count,
|
||||
metric.data.data_points[0].sum,
|
||||
metric.data.data_points[0].bucket_counts[0],
|
||||
metric.data.data_points[0].bucket_counts[1],
|
||||
]
|
||||
else:
|
||||
values = [
|
||||
metric.data.data_points[0].value,
|
||||
]
|
||||
for sample in single_series.samples:
|
||||
assert sample.timestamp == timestamp
|
||||
assert sample.value in values
|
||||
|
||||
|
||||
class TestValidation(unittest.TestCase):
|
||||
# Test cases to ensure exporter parameter validation works as intended
|
||||
def test_valid_standard_param(self):
|
||||
exporter = PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint",
|
||||
)
|
||||
self.assertEqual(exporter.endpoint, "/prom/test_endpoint")
|
||||
|
||||
def test_valid_basic_auth_param(self):
|
||||
exporter = PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint",
|
||||
basic_auth={
|
||||
"username": "test_username",
|
||||
"password": "test_password",
|
||||
},
|
||||
)
|
||||
self.assertEqual(exporter.basic_auth["username"], "test_username")
|
||||
self.assertEqual(exporter.basic_auth["password"], "test_password")
|
||||
|
||||
def test_invalid_no_endpoint_param(self):
|
||||
with self.assertRaises(ValueError):
|
||||
PrometheusRemoteWriteMetricsExporter("")
|
||||
|
||||
def test_invalid_no_username_param(self):
|
||||
with self.assertRaises(ValueError):
|
||||
PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint",
|
||||
basic_auth={"password": "test_password"},
|
||||
)
|
||||
|
||||
def test_invalid_no_password_param(self):
|
||||
with self.assertRaises(ValueError):
|
||||
PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint",
|
||||
basic_auth={"username": "test_username"},
|
||||
)
|
||||
|
||||
def test_invalid_conflicting_passwords_param(self):
|
||||
with self.assertRaises(ValueError):
|
||||
PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint",
|
||||
basic_auth={
|
||||
"username": "test_username",
|
||||
"password": "test_password",
|
||||
"password_file": "test_file",
|
||||
},
|
||||
)
|
||||
|
||||
def test_invalid_timeout_param(self):
|
||||
with self.assertRaises(ValueError):
|
||||
PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint", timeout=0
|
||||
)
|
||||
|
||||
def test_valid_tls_config_param(self):
|
||||
tls_config = {
|
||||
"ca_file": "test_ca_file",
|
||||
"cert_file": "test_cert_file",
|
||||
"key_file": "test_key_file",
|
||||
"insecure_skip_verify": True,
|
||||
}
|
||||
exporter = PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint", tls_config=tls_config
|
||||
)
|
||||
self.assertEqual(exporter.tls_config["ca_file"], tls_config["ca_file"])
|
||||
self.assertEqual(
|
||||
exporter.tls_config["cert_file"], tls_config["cert_file"]
|
||||
)
|
||||
self.assertEqual(
|
||||
exporter.tls_config["key_file"], tls_config["key_file"]
|
||||
)
|
||||
self.assertEqual(
|
||||
exporter.tls_config["insecure_skip_verify"],
|
||||
tls_config["insecure_skip_verify"],
|
||||
)
|
||||
|
||||
# if cert_file is provided, then key_file must also be provided
|
||||
def test_invalid_tls_config_cert_only_param(self):
|
||||
tls_config = {"cert_file": "value"}
|
||||
with self.assertRaises(ValueError):
|
||||
PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint", tls_config=tls_config
|
||||
)
|
||||
|
||||
# if cert_file is provided, then key_file must also be provided
|
||||
def test_invalid_tls_config_key_only_param(self):
|
||||
tls_config = {"cert_file": "value"}
|
||||
with self.assertRaises(ValueError):
|
||||
PrometheusRemoteWriteMetricsExporter(
|
||||
endpoint="/prom/test_endpoint", tls_config=tls_config
|
||||
)
|
||||
|
||||
|
||||
# Ensures export is successful with valid export_records and config
|
||||
@patch("requests.post")
|
||||
def test_valid_export(mock_post, prom_rw, metric):
|
||||
mock_post.return_value.configure_mock(**{"status_code": 200})
|
||||
|
||||
# Assumed a "None" for Scope or Resource aren't valid, so build them here
|
||||
scope = ScopeMetrics(
|
||||
InstrumentationScope(name="prom-rw-test"), [metric], None
|
||||
)
|
||||
resource = ResourceMetrics(
|
||||
Resource({"service.name": "foo"}), [scope], None
|
||||
)
|
||||
record = MetricsData([resource])
|
||||
|
||||
result = prom_rw.export(record)
|
||||
assert result == MetricExportResult.SUCCESS
|
||||
assert mock_post.call_count == 1
|
||||
|
||||
result = prom_rw.export([])
|
||||
assert result == MetricExportResult.SUCCESS
|
||||
|
||||
|
||||
def test_invalid_export(prom_rw):
|
||||
record = MetricsData([])
|
||||
|
||||
result = prom_rw.export(record)
|
||||
assert result == MetricExportResult.FAILURE
|
||||
|
||||
|
||||
@patch("requests.post")
|
||||
def test_valid_send_message(mock_post, prom_rw):
|
||||
mock_post.return_value.configure_mock(**{"ok": True})
|
||||
result = prom_rw._send_message(bytes(), {})
|
||||
assert mock_post.call_count == 1
|
||||
assert result == MetricExportResult.SUCCESS
|
||||
|
||||
|
||||
def test_invalid_send_message(prom_rw):
|
||||
result = prom_rw._send_message(bytes(), {})
|
||||
assert result == MetricExportResult.FAILURE
|
||||
|
||||
|
||||
# Verifies that build_message calls snappy.compress and returns SerializedString
|
||||
@patch("snappy.compress", return_value=bytes())
|
||||
def test_build_message(mock_compress, prom_rw):
|
||||
message = prom_rw._build_message([TimeSeries()])
|
||||
assert mock_compress.call_count == 1
|
||||
assert isinstance(message, bytes)
|
||||
|
||||
|
||||
# Ensure correct headers are added when valid config is provided
|
||||
def test_build_headers(prom_rw):
|
||||
prom_rw.headers = {"Custom Header": "test_header"}
|
||||
|
||||
headers = prom_rw._build_headers()
|
||||
assert headers["Content-Encoding"] == "snappy"
|
||||
assert headers["Content-Type"] == "application/x-protobuf"
|
||||
assert headers["X-Prometheus-Remote-Write-Version"] == "0.1.0"
|
||||
assert headers["Custom Header"] == "test_header"
|
7
tox.ini
7
tox.ini
@ -104,6 +104,9 @@ envlist =
|
||||
; opentelemetry-exporter-richconsole
|
||||
py3{7,8,9,10}-test-exporter-richconsole
|
||||
|
||||
; opentelemetry-exporter-prometheus-remote-write
|
||||
py3{6,7,8,9,10}-test-exporter-prometheus-remote-write
|
||||
|
||||
; opentelemetry-instrumentation-mysql
|
||||
py3{7,8,9,10}-test-instrumentation-mysql
|
||||
pypy3-test-instrumentation-mysql
|
||||
@ -300,6 +303,7 @@ changedir =
|
||||
test-propagator-aws: propagator/opentelemetry-propagator-aws-xray/tests
|
||||
test-propagator-ot-trace: propagator/opentelemetry-propagator-ot-trace/tests
|
||||
test-exporter-richconsole: exporter/opentelemetry-exporter-richconsole/tests
|
||||
test-exporter-prometheus-remote-write: exporter/opentelemetry-exporter-prometheus-remote-write/tests
|
||||
|
||||
commands_pre =
|
||||
; Install without -e to test the actual installation
|
||||
@ -387,6 +391,8 @@ commands_pre =
|
||||
|
||||
richconsole: pip install flaky {toxinidir}/exporter/opentelemetry-exporter-richconsole[test]
|
||||
|
||||
prometheus: pip install {toxinidir}/exporter/opentelemetry-exporter-prometheus-remote-write[test]
|
||||
|
||||
sklearn: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-sklearn[test]
|
||||
|
||||
sqlalchemy{11,14}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-sqlalchemy[test]
|
||||
@ -498,6 +504,7 @@ commands_pre =
|
||||
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-aws-lambda[test]
|
||||
python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-system-metrics[test]
|
||||
python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-richconsole[test]
|
||||
python -m pip install -e {toxinidir}/exporter/opentelemetry-exporter-prometheus-remote-write[test]
|
||||
python -m pip install -e {toxinidir}/sdk-extension/opentelemetry-sdk-extension-aws[test]
|
||||
python -m pip install -e {toxinidir}/propagator/opentelemetry-propagator-aws-xray[test]
|
||||
python -m pip install -e {toxinidir}/propagator/opentelemetry-propagator-ot-trace[test]
|
||||
|
Reference in New Issue
Block a user