Added exporter request methods (#212)

This commit is contained in:
Shovnik Bhattacharya
2020-12-08 13:06:46 -05:00
committed by GitHub
parent 3eb27ca466
commit b310ec1728
5 changed files with 125 additions and 28 deletions

View File

@ -7,3 +7,5 @@
((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206]) ((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206])
- Add conversion to TimeSeries methods - Add conversion to TimeSeries methods
((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207]) ((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207])
- Add request methods
((#212)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/212])

View File

@ -39,6 +39,8 @@ package_dir=
=src =src
packages=find_namespace: packages=find_namespace:
install_requires = install_requires =
protobuf >= 3.13.0
requests == 2.25.0
opentelemetry-api == 0.17.dev0 opentelemetry-api == 0.17.dev0
opentelemetry-sdk == 0.17.dev0 opentelemetry-sdk == 0.17.dev0

View File

@ -16,6 +16,9 @@ import logging
import re import re
from typing import Dict, Sequence from typing import Dict, Sequence
import requests
import snappy
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest, WriteRequest,
) )
@ -48,7 +51,7 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
endpoint: url where data will be sent (Required) endpoint: url where data will be sent (Required)
basic_auth: username and password for authentication (Optional) basic_auth: username and password for authentication (Optional)
headers: additional headers for remote write request (Optional) headers: additional headers for remote write request (Optional)
timeout: timeout for requests to the remote write endpoint in seconds (Optional) timeout: timeout for remote write requests in seconds, defaults to 30 (Optional)
proxies: dict mapping request proxy protocols to proxy urls (Optional) proxies: dict mapping request proxy protocols to proxy urls (Optional)
tls_config: configuration for remote write TLS settings (Optional) tls_config: configuration for remote write TLS settings (Optional)
""" """
@ -96,15 +99,15 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
if basic_auth: if basic_auth:
if "username" not in basic_auth: if "username" not in basic_auth:
raise ValueError("username required in basic_auth") raise ValueError("username required in basic_auth")
if ( if "password_file" in basic_auth:
"password" not in basic_auth if "password" in basic_auth:
and "password_file" not in basic_auth raise ValueError(
): "basic_auth cannot contain password and password_file"
)
with open(basic_auth["password_file"]) as file:
basic_auth["password"] = file.readline().strip()
elif "password" not in basic_auth:
raise ValueError("password required in basic_auth") raise ValueError("password required in basic_auth")
if "password" in basic_auth and "password_file" in basic_auth:
raise ValueError(
"basic_auth cannot contain password and password_file"
)
self._basic_auth = basic_auth self._basic_auth = basic_auth
@property @property
@ -159,10 +162,20 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
def export( def export(
self, export_records: Sequence[ExportRecord] self, export_records: Sequence[ExportRecord]
) -> MetricsExportResult: ) -> MetricsExportResult:
raise NotImplementedError() if not export_records:
return MetricsExportResult.SUCCESS
timeseries = self._convert_to_timeseries(export_records)
if not timeseries:
logger.error(
"All records contain unsupported aggregators, export aborted"
)
return MetricsExportResult.FAILURE
message = self._build_message(timeseries)
headers = self._build_headers()
return self._send_message(message, headers)
def shutdown(self) -> None: def shutdown(self) -> None:
raise NotImplementedError() pass
def _convert_to_timeseries( def _convert_to_timeseries(
self, export_records: Sequence[ExportRecord] self, export_records: Sequence[ExportRecord]
@ -304,13 +317,60 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
timeseries.samples.append(sample) timeseries.samples.append(sample)
return timeseries return timeseries
def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: def _build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError() write_request = WriteRequest()
write_request.timeseries.extend(timeseries)
serialized_message = write_request.SerializeToString()
return snappy.compress(serialized_message)
def get_headers(self) -> Dict: def _build_headers(self) -> Dict:
raise NotImplementedError() headers = {
"Content-Encoding": "snappy",
"Content-Type": "application/x-protobuf",
"X-Prometheus-Remote-Write-Version": "0.1.0",
}
if self.headers:
for header_name, header_value in self.headers.items():
headers[header_name] = header_value
return headers
def send_message( def _send_message(
self, message: bytes, headers: Dict self, message: bytes, headers: Dict
) -> MetricsExportResult: ) -> MetricsExportResult:
raise NotImplementedError() auth = None
if self.basic_auth:
auth = (self.basic_auth["username"], self.basic_auth["password"])
cert = None
verify = True
if self.tls_config:
if "ca_file" in self.tls_config:
verify = self.tls_config["ca_file"]
elif "insecure_skip_verify" in self.tls_config:
verify = self.tls_config["insecure_skip_verify"]
if (
"cert_file" in self.tls_config
and "key_file" in self.tls_config
):
cert = (
self.tls_config["cert_file"],
self.tls_config["key_file"],
)
try:
response = requests.post(
self.endpoint,
data=message,
headers=headers,
auth=auth,
timeout=self.timeout,
proxies=self.proxies,
cert=cert,
verify=verify,
)
if not response.ok:
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error("Export POST request failed with reason: %s", e)
return MetricsExportResult.FAILURE
return MetricsExportResult.SUCCESS

View File

@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
__version__ = "0.16.dev0" __version__ = "0.17.dev0"

View File

@ -360,22 +360,55 @@ class TestConversion(unittest.TestCase):
class TestExport(unittest.TestCase): class TestExport(unittest.TestCase):
# Initializes test data that is reused across tests # Initializes test data that is reused across tests
def setUp(self): def setUp(self):
pass self.exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="/prom/test_endpoint"
)
# Ensures export is successful with valid export_records and config # Ensures export is successful with valid export_records and config
def test_export(self): @patch("requests.post")
pass def test_valid_export(self, mock_post):
mock_post.return_value.configure_mock(**{"status_code": 200})
test_metric = Counter("testname", "testdesc", "testunit", int, None)
labels = get_dict_as_key({"environment": "testing"})
record = ExportRecord(
test_metric, labels, SumAggregator(), Resource({})
)
result = self.exporter.export([record])
self.assertIs(result, MetricsExportResult.SUCCESS)
self.assertEqual(mock_post.call_count, 1)
def test_valid_send_message(self): result = self.exporter.export([])
pass self.assertIs(result, MetricsExportResult.SUCCESS)
def test_invalid_export(self):
record = ExportRecord(None, None, None, None)
result = self.exporter.export([record])
self.assertIs(result, MetricsExportResult.FAILURE)
@patch("requests.post")
def test_valid_send_message(self, mock_post):
mock_post.return_value.configure_mock(**{"ok": True})
result = self.exporter._send_message(bytes(), {})
self.assertEqual(mock_post.call_count, 1)
self.assertEqual(result, MetricsExportResult.SUCCESS)
def test_invalid_send_message(self): def test_invalid_send_message(self):
pass result = self.exporter._send_message(bytes(), {})
self.assertEqual(result, MetricsExportResult.FAILURE)
# Verifies that build_message calls snappy.compress and returns SerializedString # Verifies that build_message calls snappy.compress and returns SerializedString
def test_build_message(self): @patch("snappy.compress", return_value=bytes())
pass def test_build_message(self, mock_compress):
message = self.exporter._build_message([TimeSeries()])
self.assertEqual(mock_compress.call_count, 1)
self.assertIsInstance(message, bytes)
# Ensure correct headers are added when valid config is provided # Ensure correct headers are added when valid config is provided
def test_get_headers(self): def test_build_headers(self):
pass self.exporter.headers = {"Custom Header": "test_header"}
headers = self.exporter._build_headers()
self.assertEqual(headers["Content-Encoding"], "snappy")
self.assertEqual(headers["Content-Type"], "application/x-protobuf")
self.assertEqual(headers["X-Prometheus-Remote-Write-Version"], "0.1.0")
self.assertEqual(headers["Custom Header"], "test_header")