Add conversion to TimeSeries methods (#207)

Author: Shovnik Bhattacharya
Date: 2020-12-02 09:47:12 -05:00
Committed by: GitHub
Parent: 8b323e935d
Commit: f71bc2b79e
3 changed files with 369 additions and 48 deletions

View File

@@ -5,3 +5,5 @@
  ([#180](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/180))
- Add Exporter constructor validation methods
  ([#206](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206))
- Add conversion to TimeSeries methods
  ([#207](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207))

View File

@@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import re
from typing import Dict, Sequence
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
Label,
Sample,
@@ -24,6 +29,15 @@ from opentelemetry.sdk.metrics.export import (
MetricsExporter,
MetricsExportResult,
)
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)
logger = logging.getLogger(__name__)
class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
@@ -55,6 +69,14 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
self.tls_config = tls_config
self.proxies = proxies
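        # Maps each supported aggregator type to the method that converts its
        # checkpoint into Prometheus remote-write TimeSeries objects.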
self.converter_map = {
MinMaxSumCountAggregator: self._convert_from_min_max_sum_count,
SumAggregator: self._convert_from_sum,
HistogramAggregator: self._convert_from_histogram,
LastValueAggregator: self._convert_from_last_value,
ValueObserverAggregator: self._convert_from_value_observer,
}
@property
def endpoint(self):
return self._endpoint
@@ -142,50 +164,145 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
def shutdown(self) -> None:
raise NotImplementedError()
    def _convert_to_timeseries(
self, export_records: Sequence[ExportRecord]
) -> Sequence[TimeSeries]:
timeseries = []
for export_record in export_records:
aggregator_type = type(export_record.aggregator)
converter = self.converter_map.get(aggregator_type)
if converter:
timeseries.extend(converter(export_record))
else:
logger.warning(
"%s aggregator is not supported, record dropped",
aggregator_type,
)
return timeseries
def _convert_from_sum(
self, sum_record: ExportRecord
) -> Sequence[TimeSeries]:
return [
self._create_timeseries(
sum_record,
sum_record.instrument.name + "_sum",
sum_record.aggregator.checkpoint,
)
]
def _convert_from_min_max_sum_count(
self, min_max_sum_count_record: ExportRecord
) -> Sequence[TimeSeries]:
timeseries = []
for agg_type in ["min", "max", "sum", "count"]:
name = min_max_sum_count_record.instrument.name + "_" + agg_type
value = getattr(
min_max_sum_count_record.aggregator.checkpoint, agg_type
)
timeseries.append(
self._create_timeseries(min_max_sum_count_record, name, value)
)
return timeseries
def _convert_from_histogram(
self, histogram_record: ExportRecord
) -> Sequence[TimeSeries]:
timeseries = []
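        # Emit one series per bucket boundary, tagged with the Prometheus
        # "le" (less-than-or-equal) label; the infinite bound becomes "+Inf".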
for bound in histogram_record.aggregator.checkpoint.keys():
bound_str = "+Inf" if bound == float("inf") else str(bound)
value = histogram_record.aggregator.checkpoint[bound]
timeseries.append(
self._create_timeseries(
histogram_record,
histogram_record.instrument.name + "_histogram",
value,
extra_label=("le", bound_str),
)
)
return timeseries
def _convert_from_last_value(
self, last_value_record: ExportRecord
) -> Sequence[TimeSeries]:
return [
self._create_timeseries(
last_value_record,
last_value_record.instrument.name + "_last",
last_value_record.aggregator.checkpoint,
)
]
def _convert_from_value_observer(
self, value_observer_record: ExportRecord
) -> Sequence[TimeSeries]:
timeseries = []
for agg_type in ["min", "max", "sum", "count", "last"]:
timeseries.append(
self._create_timeseries(
value_observer_record,
value_observer_record.instrument.name + "_" + agg_type,
getattr(
value_observer_record.aggregator.checkpoint, agg_type
),
)
)
return timeseries
# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
def _convert_from_quantile(
self, summary_record: ExportRecord
) -> Sequence[TimeSeries]:
raise NotImplementedError()
    # (Removed by this commit: the former convert_from_sum,
    # convert_from_min_max_sum_count, convert_from_histogram,
    # convert_from_last_value, convert_from_value_observer,
    # convert_from_quantile, create_timeseries, create_sample and
    # create_label stubs, each of which raised NotImplementedError;
    # convert_to_timeseries was renamed to _convert_to_timeseries, and the
    # conversion logic now lives in the underscore-prefixed methods above
    # and in _create_timeseries below.)
# pylint: disable=no-member
    def _create_timeseries(
        self,
        export_record: ExportRecord,
        name: str,
        value: float,
        extra_label: (str, str) = None,
    ) -> TimeSeries:
timeseries = TimeSeries()
seen = set()
def add_label(label_name: str, label_value: str):
# Label name must contain only alphanumeric characters and underscores
label_name = re.sub("[^\\w_]", "_", label_name)
if label_name not in seen:
label = Label()
label.name = label_name
label.value = label_value
timeseries.labels.append(label)
seen.add(label_name)
else:
logger.warning(
"Duplicate label with name %s and value %s",
label_name,
label_value,
)
# The __name__ label is required by PromQL as its value appears as the metric_name
add_label("__name__", name)
if extra_label:
add_label(extra_label[0], extra_label[1])
if export_record.resource.attributes:
for (
label_name,
label_value,
) in export_record.resource.attributes.items():
add_label(label_name, str(label_value))
if export_record.labels:
for [label_name, label_value] in export_record.labels:
add_label(label_name, label_value)
sample = Sample()
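        # The aggregator timestamp is in nanoseconds; Prometheus remote write
        # expects milliseconds, hence the division by 1000000.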
sample.timestamp = int(
export_record.aggregator.last_update_timestamp / 1000000
)
sample.value = value
timeseries.samples.append(sample)
return timeseries
def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
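The snippet below is a minimal, illustrative sketch of how the new conversion path can be exercised on its own. It mirrors the calls made in the tests that follow (the endpoint value and the instrument arguments are taken from those tests, not from a real deployment), and it covers only conversion, since build_message and the HTTP export path are still unimplemented at this point.

```python
from opentelemetry.exporter.prometheus_remote_write import (
    PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics.export import ExportRecord
from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
from opentelemetry.sdk.resources import Resource

exporter = PrometheusRemoteWriteMetricsExporter(endpoint="/prom/test_endpoint")

# Build a record the same way the tests below do, then checkpoint it.
record = ExportRecord(
    Counter("testname", "testdesc", "testunit", int, None),
    None,
    SumAggregator(),
    Resource({}),
)
record.aggregator.update(5)
record.aggregator.take_checkpoint()

# A SumAggregator record yields a single TimeSeries named "testname_sum"
# carrying one (timestamp, value) sample.
for series in exporter._convert_to_timeseries([record]):
    print(series.labels, series.samples)
```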

View File

@@ -13,10 +13,28 @@
# limitations under the License.
import unittest
from logging import Logger
from unittest.mock import MagicMock, Mock, patch
from opentelemetry.exporter.prometheus_remote_write import (
PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
Label,
Sample,
TimeSeries,
)
from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics.export import ExportRecord, MetricsExportResult
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util import get_dict_as_key
class TestValidation(unittest.TestCase):
@@ -115,44 +133,228 @@ class TestValidation(unittest.TestCase):
class TestConversion(unittest.TestCase):
# Initializes test data that is reused across tests
def setUp(self):
self.exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="/prom/test_endpoint"
)
# Ensures conversion to timeseries function works with valid aggregation types
def test_valid_convert_to_timeseries(self):
test_records = [
ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
SumAggregator(),
Resource({}),
),
ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
MinMaxSumCountAggregator(),
Resource({}),
),
ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
HistogramAggregator(),
Resource({}),
),
ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
LastValueAggregator(),
Resource({}),
),
ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
ValueObserverAggregator(),
Resource({}),
),
]
for record in test_records:
record.aggregator.update(5)
record.aggregator.take_checkpoint()
data = self.exporter._convert_to_timeseries(test_records)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 13)
for timeseries in data:
self.assertIsInstance(timeseries, TimeSeries)
# Ensures records with unsupported aggregation types are dropped during conversion
def test_invalid_convert_to_timeseries(self):
data = self.exporter._convert_to_timeseries(
[ExportRecord(None, None, None, Resource({}))]
)
self.assertIsInstance(data, list)
self.assertEqual(len(data), 0)
# Ensures sum aggregator is correctly converted to timeseries
def test_convert_from_sum(self):
sum_record = ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
SumAggregator(),
Resource({}),
)
sum_record.aggregator.update(3)
sum_record.aggregator.update(2)
sum_record.aggregator.take_checkpoint()
expected_timeseries = self.exporter._create_timeseries(
sum_record, "testname_sum", 5.0
)
timeseries = self.exporter._convert_from_sum(sum_record)
self.assertEqual(timeseries[0], expected_timeseries)
# Ensures min_max_sum_count aggregator is correctly converted to timeseries
def test_convert_from_min_max_sum_count(self):
min_max_sum_count_record = ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
MinMaxSumCountAggregator(),
Resource({}),
)
min_max_sum_count_record.aggregator.update(5)
min_max_sum_count_record.aggregator.update(1)
min_max_sum_count_record.aggregator.take_checkpoint()
expected_min_timeseries = self.exporter._create_timeseries(
min_max_sum_count_record, "testname_min", 1.0
)
expected_max_timeseries = self.exporter._create_timeseries(
min_max_sum_count_record, "testname_max", 5.0
)
expected_sum_timeseries = self.exporter._create_timeseries(
min_max_sum_count_record, "testname_sum", 6.0
)
expected_count_timeseries = self.exporter._create_timeseries(
min_max_sum_count_record, "testname_count", 2.0
)
timeseries = self.exporter._convert_from_min_max_sum_count(
min_max_sum_count_record
)
self.assertEqual(timeseries[0], expected_min_timeseries)
self.assertEqual(timeseries[1], expected_max_timeseries)
self.assertEqual(timeseries[2], expected_sum_timeseries)
self.assertEqual(timeseries[3], expected_count_timeseries)
# Ensures histogram aggregator is correctly converted to timeseries
def test_convert_from_histogram(self):
histogram_record = ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
HistogramAggregator(),
Resource({}),
)
histogram_record.aggregator.update(5)
histogram_record.aggregator.update(2)
histogram_record.aggregator.update(-1)
histogram_record.aggregator.take_checkpoint()
expected_le_0_timeseries = self.exporter._create_timeseries(
histogram_record, "testname_histogram", 1.0, ("le", "0")
)
expected_le_inf_timeseries = self.exporter._create_timeseries(
histogram_record, "testname_histogram", 2.0, ("le", "+Inf")
)
timeseries = self.exporter._convert_from_histogram(histogram_record)
self.assertEqual(timeseries[0], expected_le_0_timeseries)
self.assertEqual(timeseries[1], expected_le_inf_timeseries)
# Ensures last value aggregator is correctly converted to timeseries
def test_convert_from_last_value(self):
last_value_record = ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
LastValueAggregator(),
Resource({}),
)
last_value_record.aggregator.update(1)
last_value_record.aggregator.update(5)
last_value_record.aggregator.take_checkpoint()
expected_timeseries = self.exporter._create_timeseries(
last_value_record, "testname_last", 5.0
)
timeseries = self.exporter._convert_from_last_value(last_value_record)
self.assertEqual(timeseries[0], expected_timeseries)
# Ensures value observer aggregator is correctly converted to timeseries
def test_convert_from_value_observer(self):
value_observer_record = ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
None,
ValueObserverAggregator(),
Resource({}),
)
value_observer_record.aggregator.update(5)
value_observer_record.aggregator.update(1)
value_observer_record.aggregator.update(2)
value_observer_record.aggregator.take_checkpoint()
expected_min_timeseries = self.exporter._create_timeseries(
value_observer_record, "testname_min", 1.0
)
expected_max_timeseries = self.exporter._create_timeseries(
value_observer_record, "testname_max", 5.0
)
expected_sum_timeseries = self.exporter._create_timeseries(
value_observer_record, "testname_sum", 8.0
)
expected_count_timeseries = self.exporter._create_timeseries(
value_observer_record, "testname_count", 3.0
)
expected_last_timeseries = self.exporter._create_timeseries(
value_observer_record, "testname_last", 2.0
)
timeseries = self.exporter._convert_from_value_observer(
value_observer_record
)
self.assertEqual(timeseries[0], expected_min_timeseries)
self.assertEqual(timeseries[1], expected_max_timeseries)
self.assertEqual(timeseries[2], expected_sum_timeseries)
self.assertEqual(timeseries[3], expected_count_timeseries)
self.assertEqual(timeseries[4], expected_last_timeseries)
    # TODO: Add test_convert_from_quantile once method is implemented
# Ensures timeseries produced contains appropriate sample and labels
def test_create_timeseries(self):
def create_label(name, value):
label = Label()
label.name = name
label.value = value
return label
sum_aggregator = SumAggregator()
sum_aggregator.update(5)
sum_aggregator.take_checkpoint()
export_record = ExportRecord(
Counter("testname", "testdesc", "testunit", int, None),
get_dict_as_key({"record_name": "record_value"}),
sum_aggregator,
Resource({"resource_name": "resource_value"}),
)
expected_timeseries = TimeSeries()
expected_timeseries.labels.append(create_label("__name__", "testname"))
expected_timeseries.labels.append(
create_label("resource_name", "resource_value")
)
expected_timeseries.labels.append(
create_label("record_name", "record_value")
)
sample = expected_timeseries.samples.add()
sample.timestamp = int(sum_aggregator.last_update_timestamp / 1000000)
sample.value = 5.0
timeseries = self.exporter._create_timeseries(
export_record, "testname", 5.0
)
self.assertEqual(timeseries, expected_timeseries)
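A side note on the test above: the add_label helper inside _create_timeseries sanitizes label names with re.sub("[^\\w_]", "_", label_name), so dotted OpenTelemetry attribute keys become valid Prometheus label names. A small illustration follows; the attribute keys here are made up for the example.

```python
import re

# Same substitution used by the add_label helper: any character that is not a
# word character or underscore is replaced with "_".
for raw in ["service.name", "http.status_code", "record_name"]:
    print(raw, "->", re.sub("[^\\w_]", "_", raw))
# service.name -> service_name
# http.status_code -> http_status_code
# record_name -> record_name
```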
class TestExport(unittest.TestCase):