Files

269 lines
10 KiB
Python

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest.mock import patch
from pyramid.config import Configurator
from opentelemetry import trace
from opentelemetry.instrumentation.pyramid import PyramidInstrumentor
from opentelemetry.test.globals_test import reset_trace_globals
from opentelemetry.test.test_base import TestBase
from opentelemetry.test.wsgitestutil import WsgiTestBase
from opentelemetry.trace import SpanKind
from opentelemetry.util.http import (
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST,
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE,
)
# pylint: disable=import-error
from .pyramid_base_test import InstrumentationTest
class TestAutomatic(InstrumentationTest, TestBase, WsgiTestBase):
def setUp(self):
super().setUp()
PyramidInstrumentor().instrument()
self.config = Configurator()
self._common_initialization(self.config)
def tearDown(self):
super().tearDown()
with self.disable_logging():
PyramidInstrumentor().uninstrument()
def test_uninstrument(self):
# pylint: disable=access-member-before-definition
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
self.assertEqual([b"Hello: 123"], list(resp.response))
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
PyramidInstrumentor().uninstrument()
self.config = Configurator()
self._common_initialization(self.config)
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
self.assertEqual([b"Hello: 123"], list(resp.response))
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
def test_tween_list(self):
tween_list = "pyramid.tweens.excview_tween_factory"
config = Configurator(settings={"pyramid.tweens": tween_list})
self._common_initialization(config)
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
self.assertEqual([b"Hello: 123"], list(resp.response))
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
PyramidInstrumentor().uninstrument()
self.config = Configurator()
self._common_initialization(self.config)
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
self.assertEqual([b"Hello: 123"], list(resp.response))
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
def test_registry_name_is_this_module(self):
config = Configurator()
self.assertEqual(
config.registry.__name__, __name__.rsplit(".", maxsplit=1)[0]
)
class TestWrappedWithOtherFramework(
InstrumentationTest, TestBase, WsgiTestBase
):
def setUp(self):
super().setUp()
PyramidInstrumentor().instrument()
self.config = Configurator()
self._common_initialization(self.config)
def tearDown(self) -> None:
super().tearDown()
with self.disable_logging():
PyramidInstrumentor().uninstrument()
def test_with_existing_span(self):
tracer_provider, _ = self.create_tracer_provider()
tracer = tracer_provider.get_tracer(__name__)
with tracer.start_as_current_span(
"test", kind=SpanKind.SERVER
) as parent_span:
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(SpanKind.INTERNAL, span_list[0].kind)
self.assertEqual(
parent_span.get_span_context().span_id,
span_list[0].parent.span_id,
)
class TestCustomRequestResponseHeaders(
InstrumentationTest, TestBase, WsgiTestBase
):
def setUp(self):
super().setUp()
PyramidInstrumentor().instrument()
self.config = Configurator()
self._common_initialization(self.config)
self.env_patch = patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST: "Custom-Test-Header-1,Custom-Test-Header-2,invalid-header",
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE: "content-type,content-length,my-custom-header,invalid-header",
},
)
self.env_patch.start()
def tearDown(self) -> None:
super().tearDown()
self.env_patch.stop()
with self.disable_logging():
PyramidInstrumentor().uninstrument()
def test_custom_request_header_added_in_server_span(self):
headers = {
"Custom-Test-Header-1": "Test Value 1",
"Custom-Test-Header-2": "TestValue2,TestValue3",
"Custom-Test-Header-3": "TestValue4",
}
resp = self.client.get("/hello/123", headers=headers)
self.assertEqual(200, resp.status_code)
span = self.memory_exporter.get_finished_spans()[0]
expected = {
"http.request.header.custom_test_header_1": ("Test Value 1",),
"http.request.header.custom_test_header_2": (
"TestValue2,TestValue3",
),
}
not_expected = {
"http.request.header.custom_test_header_3": ("TestValue4",),
}
self.assertEqual(span.kind, SpanKind.SERVER)
self.assertSpanHasAttributes(span, expected)
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)
def test_custom_request_header_not_added_in_internal_span(self):
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("test", kind=SpanKind.SERVER):
headers = {
"Custom-Test-Header-1": "Test Value 1",
"Custom-Test-Header-2": "TestValue2,TestValue3",
}
resp = self.client.get("/hello/123", headers=headers)
self.assertEqual(200, resp.status_code)
span = self.memory_exporter.get_finished_spans()[0]
not_expected = {
"http.request.header.custom_test_header_1": ("Test Value 1",),
"http.request.header.custom_test_header_2": (
"TestValue2,TestValue3",
),
}
self.assertEqual(span.kind, SpanKind.INTERNAL)
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)
def test_custom_response_header_added_in_server_span(self):
resp = self.client.get("/test_custom_response_headers")
self.assertEqual(200, resp.status_code)
span = self.memory_exporter.get_finished_spans()[0]
expected = {
"http.response.header.content_type": (
"text/plain; charset=utf-8",
),
"http.response.header.content_length": ("7",),
"http.response.header.my_custom_header": (
"my-custom-value-1,my-custom-header-2",
),
}
not_expected = {
"http.response.header.dont_capture_me": ("test-value",)
}
self.assertEqual(span.kind, SpanKind.SERVER)
self.assertSpanHasAttributes(span, expected)
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)
def test_custom_response_header_not_added_in_internal_span(self):
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("test", kind=SpanKind.SERVER):
resp = self.client.get("/test_custom_response_headers")
self.assertEqual(200, resp.status_code)
span = self.memory_exporter.get_finished_spans()[0]
not_expected = {
"http.response.header.content_type": (
"text/plain; charset=utf-8",
),
"http.response.header.content_length": ("7",),
"http.response.header.my_custom_header": (
"my-custom-value-1,my-custom-header-2",
),
}
self.assertEqual(span.kind, SpanKind.INTERNAL)
for key, _ in not_expected.items():
self.assertNotIn(key, span.attributes)
class TestCustomHeadersNonRecordingSpan(
InstrumentationTest, TestBase, WsgiTestBase
):
def setUp(self):
super().setUp()
# This is done because set_tracer_provider cannot override the
# current tracer provider.
reset_trace_globals()
tracer_provider = trace.NoOpTracerProvider()
trace.set_tracer_provider(tracer_provider)
PyramidInstrumentor().instrument()
self.config = Configurator()
self._common_initialization(self.config)
self.env_patch = patch.dict(
"os.environ",
{
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST: "Custom-Test-Header-1,Custom-Test-Header-2,invalid-header",
OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE: "content-type,content-length,my-custom-header,invalid-header",
},
)
self.env_patch.start()
def tearDown(self) -> None:
super().tearDown()
self.env_patch.stop()
with self.disable_logging():
PyramidInstrumentor().uninstrument()
def test_custom_header_non_recording_span(self):
try:
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
except Exception as exc: # pylint: disable=W0703
self.fail(f"Exception raised with NonRecordingSpan {exc}")