Add hooks for aiohttp, asgi, starlette, fastAPI, urllib, urllib3 (#576)

This commit is contained in:
Ryo Kather
2021-07-26 14:34:50 -07:00
committed by GitHub
parent 1157eb294d
commit c5c6977584
15 changed files with 557 additions and 223 deletions

View File

@ -6,8 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.4.0-0.23b0...HEAD)
### Added
- `opentelemetry-sdk-extension-aws` Add AWS resource detectors to extension package
([#586](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/586))
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-aiohttp-client`, `openetelemetry-instrumentation-fastapi`,
`opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3` Added `request_hook` and `response_hook` callbacks
([#576](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/576))
## [1.4.0-0.23b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.4.0-0.23b0) - 2021-07-21

View File

@ -81,13 +81,25 @@ from opentelemetry.instrumentation.utils import (
)
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
from opentelemetry.trace import Span, SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import remove_url_credentials
_UrlFilterT = typing.Optional[typing.Callable[[str], str]]
_SpanNameT = typing.Optional[
typing.Union[typing.Callable[[aiohttp.TraceRequestStartParams], str], str]
_RequestHookT = typing.Optional[
typing.Callable[[Span, aiohttp.TraceRequestStartParams], None]
]
_ResponseHookT = typing.Optional[
typing.Callable[
[
Span,
typing.Union[
aiohttp.TraceRequestEndParams,
aiohttp.TraceRequestExceptionParams,
],
],
None,
]
]
@ -108,7 +120,8 @@ def url_path_span_name(params: aiohttp.TraceRequestStartParams) -> str:
def create_trace_config(
url_filter: _UrlFilterT = None,
span_name: _SpanNameT = None,
request_hook: _RequestHookT = None,
response_hook: _ResponseHookT = None,
tracer_provider: TracerProvider = None,
) -> aiohttp.TraceConfig:
"""Create an aiohttp-compatible trace configuration.
@ -134,15 +147,16 @@ def create_trace_config(
it as a span attribute. This can be useful to remove sensitive data
such as API keys or user personal information.
:param str span_name: Override the default span name.
:param Callable request_hook: Optional callback that can modify span name and request params.
:param Callable response_hook: Optional callback that can modify span name and response params.
:param tracer_provider: optional TracerProvider from which to get a Tracer
:return: An object suitable for use with :py:class:`aiohttp.ClientSession`.
:rtype: :py:class:`aiohttp.TraceConfig`
"""
# `aiohttp.TraceRequestStartParams` resolves to `aiohttp.tracing.TraceRequestStartParams`
# which doesn't exist in the aiottp intersphinx inventory.
# Explicitly specify the type for the `span_name` param and rtype to work
# which doesn't exist in the aiohttp intersphinx inventory.
# Explicitly specify the type for the `request_hook` and `response_hook` param and rtype to work
# around this issue.
tracer = get_tracer(__name__, __version__, tracer_provider)
@ -161,17 +175,15 @@ def create_trace_config(
return
http_method = params.method.upper()
if trace_config_ctx.span_name is None:
request_span_name = "HTTP {}".format(http_method)
elif callable(trace_config_ctx.span_name):
request_span_name = str(trace_config_ctx.span_name(params))
else:
request_span_name = str(trace_config_ctx.span_name)
trace_config_ctx.span = trace_config_ctx.tracer.start_span(
request_span_name, kind=SpanKind.CLIENT,
)
if callable(request_hook):
request_hook(trace_config_ctx.span, params)
if trace_config_ctx.span.is_recording():
attributes = {
SpanAttributes.HTTP_METHOD: http_method,
@ -198,6 +210,9 @@ def create_trace_config(
if trace_config_ctx.span is None:
return
if callable(response_hook):
response_hook(trace_config_ctx.span, params)
if trace_config_ctx.span.is_recording():
trace_config_ctx.span.set_status(
Status(http_status_to_status_code(int(params.response.status)))
@ -215,6 +230,9 @@ def create_trace_config(
if trace_config_ctx.span is None:
return
if callable(response_hook):
response_hook(trace_config_ctx.span, params)
if trace_config_ctx.span.is_recording() and params.exception:
trace_config_ctx.span.set_status(Status(StatusCode.ERROR))
trace_config_ctx.span.record_exception(params.exception)
@ -223,7 +241,7 @@ def create_trace_config(
def _trace_config_ctx_factory(**kwargs):
kwargs.setdefault("trace_request_ctx", {})
return types.SimpleNamespace(
span_name=span_name, tracer=tracer, url_filter=url_filter, **kwargs
tracer=tracer, url_filter=url_filter, **kwargs
)
trace_config = aiohttp.TraceConfig(
@ -240,7 +258,8 @@ def create_trace_config(
def _instrument(
tracer_provider: TracerProvider = None,
url_filter: _UrlFilterT = None,
span_name: _SpanNameT = None,
request_hook: _RequestHookT = None,
response_hook: _ResponseHookT = None,
):
"""Enables tracing of all ClientSessions
@ -256,7 +275,8 @@ def _instrument(
trace_config = create_trace_config(
url_filter=url_filter,
span_name=span_name,
request_hook=request_hook,
response_hook=response_hook,
tracer_provider=tracer_provider,
)
trace_config._is_instrumented_by_opentelemetry = True
@ -304,12 +324,14 @@ class AioHttpClientInstrumentor(BaseInstrumentor):
``url_filter``: A callback to process the requested URL prior to adding
it as a span attribute. This can be useful to remove sensitive data
such as API keys or user personal information.
``span_name``: Override the default span name.
``request_hook``: An optional callback that is invoked right after a span is created.
``response_hook``: An optional callback which is invoked right before the span is finished processing a response.
"""
_instrument(
tracer_provider=kwargs.get("tracer_provider"),
url_filter=kwargs.get("url_filter"),
span_name=kwargs.get("span_name"),
request_hook=kwargs.get("request_hook"),
response_hook=kwargs.get("response_hook"),
)
def _uninstrument(self, **kwargs):

View File

@ -33,7 +33,7 @@ from opentelemetry.instrumentation.aiohttp_client import (
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import StatusCode
from opentelemetry.trace import Span, StatusCode
def run_with_test_server(
@ -161,45 +161,50 @@ class TestAioHttpIntegration(TestBase):
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)
def test_span_name_option(self):
for span_name, method, path, expected in (
("static", "POST", "/static-span-name", "static"),
(
lambda params: "{} - {}".format(
params.method, params.url.path
),
"PATCH",
"/some/path",
"PATCH - /some/path",
),
def test_hooks(self):
method = "PATCH"
path = "/some/path"
expected = "PATCH - /some/path"
def request_hook(span: Span, params: aiohttp.TraceRequestStartParams):
span.update_name("{} - {}".format(params.method, params.url.path))
def response_hook(
span: Span,
params: typing.Union[
aiohttp.TraceRequestEndParams,
aiohttp.TraceRequestExceptionParams,
],
):
with self.subTest(span_name=span_name, method=method, path=path):
span.set_attribute("response_hook_attr", "value")
host, port = self._http_request(
trace_config=aiohttp_client.create_trace_config(
span_name=span_name
request_hook=request_hook, response_hook=response_hook,
),
method=method,
url=path,
status_code=HTTPStatus.OK,
)
self.assert_spans(
[
(
expected,
for span in self.memory_exporter.get_finished_spans():
self.assertEqual(span.name, expected)
self.assertEqual(
(span.status.status_code, span.status.description),
(StatusCode.UNSET, None),
{
SpanAttributes.HTTP_METHOD: method,
SpanAttributes.HTTP_URL: "http://{}:{}{}".format(
host, port, path
),
SpanAttributes.HTTP_STATUS_CODE: int(
HTTPStatus.OK
),
},
)
]
self.assertEqual(
span.attributes[SpanAttributes.HTTP_METHOD], method
)
self.assertEqual(
span.attributes[SpanAttributes.HTTP_URL],
"http://{}:{}{}".format(host, port, path),
)
self.assertEqual(
span.attributes[SpanAttributes.HTTP_STATUS_CODE], HTTPStatus.OK
)
self.assertIn("response_hook_attr", span.attributes)
self.assertEqual(span.attributes["response_hook_attr"], "value")
self.memory_exporter.clear()
def test_url_filter_option(self):
@ -501,12 +506,23 @@ class TestAioHttpClientInstrumentor(TestBase):
span.attributes[SpanAttributes.HTTP_URL],
)
def test_span_name(self):
def span_name_callback(params: aiohttp.TraceRequestStartParams) -> str:
return "{} - {}".format(params.method, params.url.path)
def test_hooks(self):
def request_hook(span: Span, params: aiohttp.TraceRequestStartParams):
span.update_name("{} - {}".format(params.method, params.url.path))
def response_hook(
span: Span,
params: typing.Union[
aiohttp.TraceRequestEndParams,
aiohttp.TraceRequestExceptionParams,
],
):
span.set_attribute("response_hook_attr", "value")
AioHttpClientInstrumentor().uninstrument()
AioHttpClientInstrumentor().instrument(span_name=span_name_callback)
AioHttpClientInstrumentor().instrument(
request_hook=request_hook, response_hook=response_hook
)
url = "/test-path"
run_with_test_server(
@ -514,6 +530,8 @@ class TestAioHttpClientInstrumentor(TestBase):
)
span = self.assert_spans(1)
self.assertEqual("GET - /test-path", span.name)
self.assertIn("response_hook_attr", span.attributes)
self.assertEqual(span.attributes["response_hook_attr"], "value")
class TestLoadingAioHttpInstrumentor(unittest.TestCase):

View File

@ -31,9 +31,14 @@ from opentelemetry.instrumentation.utils import http_status_to_status_code
from opentelemetry.propagate import extract
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import remove_url_credentials
_ServerRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]]
_ClientRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]]
_ClientResponseHookT = typing.Optional[typing.Callable[[Span, dict], None]]
class ASGIGetter(Getter):
def get(
@ -141,11 +146,9 @@ def set_status_code(span, status_code):
def get_default_span_details(scope: dict) -> Tuple[str, dict]:
"""Default implementation for span_details_callback
"""Default implementation for get_default_span_details
Args:
scope: the asgi scope dictionary
Returns:
a tuple of the span name, and any attributes to attach to the span.
"""
@ -164,10 +167,15 @@ class OpenTelemetryMiddleware:
Args:
app: The ASGI application callable to forward requests to.
span_details_callback: Callback which should return a string
and a tuple, representing the desired span name and a
default_span_details: Callback which should return a string and a tuple, representing the desired default span name and a
dictionary with any additional span attributes to set.
Optional: Defaults to get_default_span_details.
server_request_hook: Optional callback which is called with the server span and ASGI
scope object for every incoming request.
client_request_hook: Optional callback which is called with the internal span and an ASGI
scope which is sent as a dictionary for when the method recieve is called.
client_response_hook: Optional callback which is called with the internal span and an ASGI
event which is sent as a dictionary for when the method send is called.
tracer_provider: The optional tracer provider to use. If omitted
the current globally configured one is used.
"""
@ -176,15 +184,21 @@ class OpenTelemetryMiddleware:
self,
app,
excluded_urls=None,
span_details_callback=None,
default_span_details=None,
server_request_hook: _ServerRequestHookT = None,
client_request_hook: _ClientRequestHookT = None,
client_response_hook: _ClientResponseHookT = None,
tracer_provider=None,
):
self.app = guarantee_single_callable(app)
self.tracer = trace.get_tracer(__name__, __version__, tracer_provider)
self.span_details_callback = (
span_details_callback or get_default_span_details
)
self.excluded_urls = excluded_urls
self.default_span_details = (
default_span_details or get_default_span_details
)
self.server_request_hook = server_request_hook
self.client_request_hook = client_request_hook
self.client_response_hook = client_response_hook
async def __call__(self, scope, receive, send):
"""The ASGI application
@ -202,7 +216,7 @@ class OpenTelemetryMiddleware:
return await self.app(scope, receive, send)
token = context.attach(extract(scope, getter=asgi_getter))
span_name, additional_attributes = self.span_details_callback(scope)
span_name, additional_attributes = self.default_span_details(scope)
try:
with self.tracer.start_as_current_span(
@ -214,11 +228,16 @@ class OpenTelemetryMiddleware:
for key, value in attributes.items():
span.set_attribute(key, value)
if callable(self.server_request_hook):
self.server_request_hook(span, scope)
@wraps(receive)
async def wrapped_receive():
with self.tracer.start_as_current_span(
" ".join((span_name, scope["type"], "receive"))
) as receive_span:
if callable(self.client_request_hook):
self.client_request_hook(receive_span, scope)
message = await receive()
if receive_span.is_recording():
if message["type"] == "websocket.receive":
@ -231,6 +250,8 @@ class OpenTelemetryMiddleware:
with self.tracer.start_as_current_span(
" ".join((span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, message)
if send_span.is_recording():
if message["type"] == "http.response.start":
status_code = message["status"]

View File

@ -193,7 +193,7 @@ class TestAsgiApplication(AsgiTestBase):
self.validate_outputs(outputs, error=ValueError)
def test_override_span_name(self):
"""Test that span_names can be overwritten by our callback function."""
"""Test that default span_names can be overwritten by our callback function."""
span_name = "Dymaxion"
def get_predefined_span_details(_):
@ -210,7 +210,7 @@ class TestAsgiApplication(AsgiTestBase):
return expected
app = otel_asgi.OpenTelemetryMiddleware(
simple_asgi, span_details_callback=get_predefined_span_details
simple_asgi, default_span_details=get_predefined_span_details
)
self.seed_app(app)
self.send_default_request()
@ -367,6 +367,39 @@ class TestAsgiApplication(AsgiTestBase):
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 0)
def test_hooks(self):
def server_request_hook(span, scope):
span.update_name("name from server hook")
def client_request_hook(recieve_span, request):
recieve_span.update_name("name from client request hook")
def client_response_hook(send_span, response):
send_span.set_attribute("attr-from-hook", "value")
def update_expected_hook_results(expected):
for entry in expected:
if entry["kind"] == trace_api.SpanKind.SERVER:
entry["name"] = "name from server hook"
elif entry["name"] == "/ http receive":
entry["name"] = "name from client request hook"
elif entry["name"] == "/ http send":
entry["attributes"].update({"attr-from-hook": "value"})
return expected
app = otel_asgi.OpenTelemetryMiddleware(
simple_asgi,
server_request_hook=server_request_hook,
client_request_hook=client_request_hook,
client_response_hook=client_response_hook,
)
self.seed_app(app)
self.send_default_request()
outputs = self.get_all_output()
self.validate_outputs(
outputs, modifiers=[update_expected_hook_results]
)
class TestAsgiAttributes(unittest.TestCase):
def setUp(self):

View File

@ -13,6 +13,7 @@
# limitations under the License.
import logging
import typing
from typing import Collection
import fastapi
@ -23,11 +24,16 @@ from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
from opentelemetry.instrumentation.asgi.package import _instruments
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span
from opentelemetry.util.http import get_excluded_urls, parse_excluded_urls
_excluded_urls_from_env = get_excluded_urls("FASTAPI")
_logger = logging.getLogger(__name__)
_ServerRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]]
_ClientRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]]
_ClientResponseHookT = typing.Optional[typing.Callable[[Span, dict], None]]
class FastAPIInstrumentor(BaseInstrumentor):
"""An instrumentor for FastAPI
@ -39,7 +45,12 @@ class FastAPIInstrumentor(BaseInstrumentor):
@staticmethod
def instrument_app(
app: fastapi.FastAPI, tracer_provider=None, excluded_urls=None,
app: fastapi.FastAPI,
server_request_hook: _ServerRequestHookT = None,
client_request_hook: _ClientRequestHookT = None,
client_response_hook: _ClientResponseHookT = None,
tracer_provider=None,
excluded_urls=None,
):
"""Instrument an uninstrumented FastAPI application."""
if not hasattr(app, "_is_instrumented_by_opentelemetry"):
@ -54,7 +65,10 @@ class FastAPIInstrumentor(BaseInstrumentor):
app.add_middleware(
OpenTelemetryMiddleware,
excluded_urls=excluded_urls,
span_details_callback=_get_route_details,
default_span_details=_get_route_details,
server_request_hook=server_request_hook,
client_request_hook=client_request_hook,
client_response_hook=client_response_hook,
tracer_provider=tracer_provider,
)
app._is_instrumented_by_opentelemetry = True
@ -79,6 +93,15 @@ class FastAPIInstrumentor(BaseInstrumentor):
def _instrument(self, **kwargs):
self._original_fastapi = fastapi.FastAPI
_InstrumentedFastAPI._tracer_provider = kwargs.get("tracer_provider")
_InstrumentedFastAPI._server_request_hook = kwargs.get(
"server_request_hook"
)
_InstrumentedFastAPI._client_request_hook = kwargs.get(
"client_request_hook"
)
_InstrumentedFastAPI._client_response_hook = kwargs.get(
"client_response_hook"
)
_excluded_urls = kwargs.get("excluded_urls")
_InstrumentedFastAPI._excluded_urls = (
_excluded_urls_from_env
@ -94,13 +117,19 @@ class FastAPIInstrumentor(BaseInstrumentor):
class _InstrumentedFastAPI(fastapi.FastAPI):
_tracer_provider = None
_excluded_urls = None
_server_request_hook: _ServerRequestHookT = None
_client_request_hook: _ClientRequestHookT = None
_client_response_hook: _ClientResponseHookT = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.add_middleware(
OpenTelemetryMiddleware,
excluded_urls=_InstrumentedFastAPI._excluded_urls,
span_details_callback=_get_route_details,
default_span_details=_get_route_details,
server_request_hook=_InstrumentedFastAPI._server_request_hook,
client_request_hook=_InstrumentedFastAPI._client_request_hook,
client_response_hook=_InstrumentedFastAPI._client_response_hook,
tracer_provider=_InstrumentedFastAPI._tracer_provider,
)

View File

@ -29,13 +29,24 @@ from opentelemetry.util.http import get_excluded_urls
class TestFastAPIManualInstrumentation(TestBase):
def _create_app(self):
app = self._create_fastapi_app()
self._instrumentor.instrument_app(app)
self._instrumentor.instrument_app(
app=app,
server_request_hook=getattr(self, "server_request_hook", None),
client_request_hook=getattr(self, "client_request_hook", None),
client_response_hook=getattr(self, "client_response_hook", None),
)
return app
def _create_app_explicit_excluded_urls(self):
app = self._create_fastapi_app()
to_exclude = "/user/123,/foobar"
self._instrumentor.instrument_app(app, excluded_urls=to_exclude)
self._instrumentor.instrument_app(
app,
excluded_urls=to_exclude,
server_request_hook=getattr(self, "server_request_hook", None),
client_request_hook=getattr(self, "client_request_hook", None),
client_response_hook=getattr(self, "client_response_hook", None),
)
return app
def setUp(self):
@ -166,6 +177,56 @@ class TestFastAPIManualInstrumentation(TestBase):
return app
class TestFastAPIManualInstrumentationHooks(TestFastAPIManualInstrumentation):
_server_request_hook = None
_client_request_hook = None
_client_response_hook = None
def server_request_hook(self, span, scope):
if self._server_request_hook is not None:
self._server_request_hook(span, scope)
def client_request_hook(self, receive_span, request):
if self._client_request_hook is not None:
self._client_request_hook(receive_span, request)
def client_response_hook(self, send_span, response):
if self._client_response_hook is not None:
self._client_response_hook(send_span, response)
def test_hooks(self):
def server_request_hook(span, scope):
span.update_name("name from server hook")
def client_request_hook(receive_span, request):
receive_span.update_name("name from client hook")
receive_span.set_attribute("attr-from-request-hook", "set")
def client_response_hook(send_span, response):
send_span.update_name("name from response hook")
send_span.set_attribute("attr-from-response-hook", "value")
self._server_request_hook = server_request_hook
self._client_request_hook = client_request_hook
self._client_response_hook = client_response_hook
self._client.get("/foobar")
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(
len(spans), 3
) # 1 server span and 2 response spans (response start and body)
server_span = spans[2]
self.assertEqual(server_span.name, "name from server hook")
response_spans = spans[:2]
for span in response_spans:
self.assertEqual(span.name, "name from response hook")
self.assert_span_has_attributes(
span, {"attr-from-response-hook": "value"}
)
class TestAutoInstrumentation(TestFastAPIManualInstrumentation):
"""Test the auto-instrumented variant
@ -210,6 +271,47 @@ class TestAutoInstrumentation(TestFastAPIManualInstrumentation):
super().tearDown()
class TestAutoInstrumentationHooks(TestFastAPIManualInstrumentationHooks):
"""
Test the auto-instrumented variant for request and response hooks
Extending the manual instrumentation to inherit defined hooks and since most test cases apply
to both.
"""
def _create_app(self):
# instrumentation is handled by the instrument call
self._instrumentor.instrument(
server_request_hook=getattr(self, "server_request_hook", None),
client_request_hook=getattr(self, "client_request_hook", None),
client_response_hook=getattr(self, "client_response_hook", None),
)
return self._create_fastapi_app()
def _create_app_explicit_excluded_urls(self):
resource = Resource.create({"key1": "value1", "key2": "value2"})
tracer_provider, exporter = self.create_tracer_provider(
resource=resource
)
self.memory_exporter = exporter
to_exclude = "/user/123,/foobar"
self._instrumentor.uninstrument() # Disable previous instrumentation (setUp)
self._instrumentor.instrument(
tracer_provider=tracer_provider,
excluded_urls=to_exclude,
server_request_hook=getattr(self, "server_request_hook", None),
client_request_hook=getattr(self, "client_request_hook", None),
client_response_hook=getattr(self, "client_response_hook", None),
)
return self._create_fastapi_app()
def tearDown(self):
self._instrumentor.uninstrument()
super().tearDown()
class TestAutoInstrumentationLogic(unittest.TestCase):
def test_instrumentation(self):
"""Verify that instrumentation methods are instrumenting and

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import typing
from typing import Collection
from starlette import applications
@ -21,10 +22,15 @@ from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
from opentelemetry.instrumentation.asgi.package import _instruments
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span
from opentelemetry.util.http import get_excluded_urls
_excluded_urls = get_excluded_urls("STARLETTE")
_ServerRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]]
_ClientRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]]
_ClientResponseHookT = typing.Optional[typing.Callable[[Span, dict], None]]
class StarletteInstrumentor(BaseInstrumentor):
"""An instrumentor for starlette
@ -35,13 +41,22 @@ class StarletteInstrumentor(BaseInstrumentor):
_original_starlette = None
@staticmethod
def instrument_app(app: applications.Starlette, tracer_provider=None):
def instrument_app(
app: applications.Starlette,
server_request_hook: _ServerRequestHookT = None,
client_request_hook: _ClientRequestHookT = None,
client_response_hook: _ClientResponseHookT = None,
tracer_provider=None,
):
"""Instrument an uninstrumented Starlette application."""
if not getattr(app, "is_instrumented_by_opentelemetry", False):
app.add_middleware(
OpenTelemetryMiddleware,
excluded_urls=_excluded_urls,
span_details_callback=_get_route_details,
default_span_details=_get_route_details,
server_request_hook=server_request_hook,
client_request_hook=client_request_hook,
client_response_hook=client_response_hook,
tracer_provider=tracer_provider,
)
app.is_instrumented_by_opentelemetry = True
@ -52,6 +67,15 @@ class StarletteInstrumentor(BaseInstrumentor):
def _instrument(self, **kwargs):
self._original_starlette = applications.Starlette
_InstrumentedStarlette._tracer_provider = kwargs.get("tracer_provider")
_InstrumentedStarlette._server_request_hook = kwargs.get(
"server_request_hook"
)
_InstrumentedStarlette._client_request_hook = kwargs.get(
"client_request_hook"
)
_InstrumentedStarlette._client_response_hook = kwargs.get(
"client_response_hook"
)
applications.Starlette = _InstrumentedStarlette
def _uninstrument(self, **kwargs):
@ -60,13 +84,19 @@ class StarletteInstrumentor(BaseInstrumentor):
class _InstrumentedStarlette(applications.Starlette):
_tracer_provider = None
_server_request_hook: _ServerRequestHookT = None
_client_request_hook: _ClientRequestHookT = None
_client_response_hook: _ClientResponseHookT = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.add_middleware(
OpenTelemetryMiddleware,
excluded_urls=_excluded_urls,
span_details_callback=_get_route_details,
default_span_details=_get_route_details,
server_request_hook=_InstrumentedStarlette._server_request_hook,
client_request_hook=_InstrumentedStarlette._client_request_hook,
client_response_hook=_InstrumentedStarlette._client_response_hook,
tracer_provider=_InstrumentedStarlette._tracer_provider,
)

View File

@ -30,7 +30,12 @@ from opentelemetry.util.http import get_excluded_urls
class TestStarletteManualInstrumentation(TestBase):
def _create_app(self):
app = self._create_starlette_app()
self._instrumentor.instrument_app(app)
self._instrumentor.instrument_app(
app=app,
server_request_hook=getattr(self, "server_request_hook", None),
client_request_hook=getattr(self, "client_request_hook", None),
client_response_hook=getattr(self, "client_response_hook", None),
)
return app
def setUp(self):
@ -101,6 +106,58 @@ class TestStarletteManualInstrumentation(TestBase):
return app
class TestStarletteManualInstrumentationHooks(
TestStarletteManualInstrumentation
):
_server_request_hook = None
_client_request_hook = None
_client_response_hook = None
def server_request_hook(self, span, scope):
if self._server_request_hook is not None:
self._server_request_hook(span, scope)
def client_request_hook(self, receive_span, request):
if self._client_request_hook is not None:
self._client_request_hook(receive_span, request)
def client_response_hook(self, send_span, response):
if self._client_response_hook is not None:
self._client_response_hook(send_span, response)
def test_hooks(self):
def server_request_hook(span, scope):
span.update_name("name from server hook")
def client_request_hook(receive_span, request):
receive_span.update_name("name from client hook")
receive_span.set_attribute("attr-from-request-hook", "set")
def client_response_hook(send_span, response):
send_span.update_name("name from response hook")
send_span.set_attribute("attr-from-response-hook", "value")
self._server_request_hook = server_request_hook
self._client_request_hook = client_request_hook
self._client_response_hook = client_response_hook
self._client.get("/foobar")
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(
len(spans), 3
) # 1 server span and 2 response spans (response start and body)
server_span = spans[2]
self.assertEqual(server_span.name, "name from server hook")
response_spans = spans[:2]
for span in response_spans:
self.assertEqual(span.name, "name from response hook")
self.assert_span_has_attributes(
span, {"attr-from-response-hook": "value"}
)
class TestAutoInstrumentation(TestStarletteManualInstrumentation):
"""Test the auto-instrumented variant
@ -132,6 +189,26 @@ class TestAutoInstrumentation(TestStarletteManualInstrumentation):
self.assertEqual(span.resource.attributes["key2"], "value2")
class TestAutoInstrumentationHooks(TestStarletteManualInstrumentationHooks):
"""
Test the auto-instrumented variant for request and response hooks
"""
def _create_app(self):
# instrumentation is handled by the instrument call
self._instrumentor.instrument(
server_request_hook=getattr(self, "server_request_hook", None),
client_request_hook=getattr(self, "client_request_hook", None),
client_response_hook=getattr(self, "client_response_hook", None),
)
return self._create_starlette_app()
def tearDown(self):
self._instrumentor.uninstrument()
super().tearDown()
class TestAutoInstrumentationLogic(unittest.TestCase):
def test_instrumentation(self):
"""Verify that instrumentation methods are instrumenting and

View File

@ -38,7 +38,7 @@ Hooks
*******
Tornado instrumentation supports extending tracing behaviour with the help of hooks.
It's ``instrument()`` method accepts three optional functions that get called back with the
Its ``instrument()`` method accepts three optional functions that get called back with the
created span and some other contextual information. Example:
.. code-block:: python

View File

@ -505,7 +505,7 @@ class TornadoHookTest(TornadoTest):
def client_request_hook(span, request):
span.update_name("name from client hook")
def client_response_hook(span, request):
def client_response_hook(span, response):
span.set_attribute("attr-from-hook", "value")
self._server_request_hook = server_request_hook

View File

@ -32,12 +32,39 @@ Usage
req = request.Request('https://postman-echo.com/post', method="POST")
r = request.urlopen(req)
Hooks
*******
The urllib instrumentation supports extending tracing behavior with the help of
request and response hooks. These are functions that are called back by the instrumentation
right after a Span is created for a request and right before the span is finished processing a response respectively.
The hooks can be configured as follows:
..code:: python
# `request_obj` is an instance of urllib.request.Request
def request_hook(span, request_obj):
pass
# `request_obj` is an instance of urllib.request.Request
# `response` is an instance of http.client.HTTPResponse
def response_hook(span, request_obj, response)
pass
URLLibInstrumentor.instrument(
request_hook=request_hook, response_hook=response_hook)
)
API
---
"""
import functools
import types
import typing
# from urllib import response
from http import client
from typing import Collection
from urllib.request import ( # pylint: disable=no-name-in-module,import-error
OpenerDirector,
@ -54,7 +81,7 @@ from opentelemetry.instrumentation.utils import (
)
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.trace import Span, SpanKind, get_tracer
from opentelemetry.trace.status import Status
from opentelemetry.util.http import remove_url_credentials
@ -64,6 +91,11 @@ _SUPPRESS_HTTP_INSTRUMENTATION_KEY = context.create_key(
"suppress_http_instrumentation"
)
_RequestHookT = typing.Optional[typing.Callable[[Span, Request], None]]
_ResponseHookT = typing.Optional[
typing.Callable[[Span, Request, client.HTTPResponse], None]
]
class URLLibInstrumentor(BaseInstrumentor):
"""An instrumentor for urllib
@ -79,18 +111,15 @@ class URLLibInstrumentor(BaseInstrumentor):
Args:
**kwargs: Optional arguments
``tracer_provider``: a TracerProvider, defaults to global
``span_callback``: An optional callback invoked before returning the http response.
Invoked with Span and http.client.HTTPResponse
``name_callback``: Callback which calculates a generic span name for an
outgoing HTTP request based on the method and url.
Optional: Defaults to get_default_span_name.
``request_hook``: An optional callback invoked that is invoked right after a span is created.
``response_hook``: An optional callback which is invoked right before the span is finished processing a response
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
_instrument(
tracer,
span_callback=kwargs.get("span_callback"),
name_callback=kwargs.get("name_callback"),
request_hook=kwargs.get("request_hook"),
response_hook=kwargs.get("response_hook"),
)
def _uninstrument(self, **kwargs):
@ -103,12 +132,11 @@ class URLLibInstrumentor(BaseInstrumentor):
_uninstrument_from(opener, restore_as_bound_func=True)
def get_default_span_name(method):
"""Default implementation for name_callback, returns HTTP {method_name}."""
return "HTTP {}".format(method).strip()
def _instrument(tracer, span_callback=None, name_callback=None):
def _instrument(
tracer,
request_hook: _RequestHookT = None,
response_hook: _ResponseHookT = None,
):
"""Enables tracing of all requests calls that go through
:code:`urllib.Client._make_request`"""
@ -143,11 +171,7 @@ def _instrument(tracer, span_callback=None, name_callback=None):
method = request.get_method().upper()
url = request.full_url
span_name = ""
if name_callback is not None:
span_name = name_callback(method, url)
if not span_name or not isinstance(span_name, str):
span_name = get_default_span_name(method)
span_name = "HTTP {}".format(method).strip()
url = remove_url_credentials(url)
@ -160,6 +184,8 @@ def _instrument(tracer, span_callback=None, name_callback=None):
span_name, kind=SpanKind.CLIENT
) as span:
exception = None
if callable(request_hook):
request_hook(span, request)
if span.is_recording():
span.set_attribute(SpanAttributes.HTTP_METHOD, method)
span.set_attribute(SpanAttributes.HTTP_URL, url)
@ -193,8 +219,8 @@ def _instrument(tracer, span_callback=None, name_callback=None):
ver_[:1], ver_[:-1]
)
if span_callback is not None:
span_callback(span, result)
if callable(response_hook):
response_hook(span, request, result)
if exception is not None:
raise exception.with_traceback(exception.__traceback__)

View File

@ -15,7 +15,6 @@
import abc
import socket
import urllib
from http.client import HTTPResponse
from unittest import mock
from urllib import request
from urllib.error import HTTPError
@ -120,31 +119,6 @@ class RequestsIntegrationTestBase(abc.ABC):
span, opentelemetry.instrumentation.urllib
)
def test_name_callback(self):
def name_callback(method, url):
return "GET" + url
URLLibInstrumentor().uninstrument()
URLLibInstrumentor().instrument(name_callback=name_callback)
result = self.perform_request(self.URL)
self.assertEqual(result.read(), b"Hello!")
span = self.assert_span()
self.assertEqual(span.name, "GET" + self.URL)
def test_name_callback_default(self):
def name_callback(method, url):
return 123
URLLibInstrumentor().uninstrument()
URLLibInstrumentor().instrument(name_callback=name_callback)
result = self.perform_request(self.URL)
self.assertEqual(result.read(), b"Hello!")
span = self.assert_span()
self.assertEqual(span.name, "HTTP GET")
def test_not_foundbasic(self):
url_404 = "http://httpbin.org/status/404/"
httpretty.register_uri(
@ -252,31 +226,6 @@ class RequestsIntegrationTestBase(abc.ABC):
finally:
set_global_textmap(previous_propagator)
def test_span_callback(self):
URLLibInstrumentor().uninstrument()
def span_callback(span, result: HTTPResponse):
span.set_attribute("http.response.body", result.read())
URLLibInstrumentor().instrument(
tracer_provider=self.tracer_provider, span_callback=span_callback,
)
result = self.perform_request(self.URL)
self.assertEqual(result.read(), b"")
span = self.assert_span()
self.assertEqual(
span.attributes,
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: self.URL,
SpanAttributes.HTTP_STATUS_CODE: 200,
"http.response.body": "Hello!",
},
)
def test_custom_tracer_provider(self):
resource = resources.Resource.create({})
result = self.create_tracer_provider(resource=resource)
@ -330,6 +279,26 @@ class RequestsIntegrationTestBase(abc.ABC):
span = self.assert_span()
self.assertEqual(span.attributes[SpanAttributes.HTTP_URL], self.URL)
def test_hooks(self):
def request_hook(span, request_obj):
span.update_name("name set from hook")
def response_hook(span, request_obj, response):
span.set_attribute("response_hook_attr", "value")
URLLibInstrumentor().uninstrument()
URLLibInstrumentor().instrument(
request_hook=request_hook, response_hook=response_hook
)
result = self.perform_request(self.URL)
self.assertEqual(result.read(), b"Hello!")
span = self.assert_span()
self.assertEqual(span.name, "name set from hook")
self.assertIn("response_hook_attr", span.attributes)
self.assertEqual(span.attributes["response_hook_attr"], "value")
class TestRequestsIntegration(RequestsIntegrationTestBase, TestBase):
@staticmethod

View File

@ -21,25 +21,42 @@ Usage
.. code-block:: python
import urllib3
import urllib3.util
from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor
def strip_query_params(url: str) -> str:
return url.split("?")[0]
def span_name_callback(method: str, url: str, headers):
return urllib3.util.Url(url).path
URLLib3Instrumentor().instrument(
# Remove all query params from the URL attribute on the span.
url_filter=strip_query_params,
# Use the URL's path as the span name.
span_name_or_callback=span_name_callback
)
http = urllib3.PoolManager()
response = http.request("GET", "https://www.example.org/")
Hooks
*******
The urllib3 instrumentation supports extending tracing behavior with the help of
request and response hooks. These are functions that are called back by the instrumentation
right after a Span is created for a request and right before the span is finished processing a response respectively.
The hooks can be configured as follows:
.. code:: python
# `request` is an instance of urllib3.connectionpool.HTTPConnectionPool
def request_hook(span, request):
pass
# `request` is an instance of urllib3.connectionpool.HTTPConnectionPool
# `response` is an instance of urllib3.response.HTTPResponse
def response_hook(span, request, response):
pass
URLLib3Instrumentor.instrument(
request_hook=request_hook, response_hook=response_hook)
)
API
---
"""
@ -72,8 +89,18 @@ _SUPPRESS_HTTP_INSTRUMENTATION_KEY = context.create_key(
)
_UrlFilterT = typing.Optional[typing.Callable[[str], str]]
_SpanNameT = typing.Optional[
typing.Union[typing.Callable[[str, str, typing.Mapping], str], str]
_RequestHookT = typing.Optional[
typing.Callable[[Span, urllib3.connectionpool.HTTPConnectionPool], None]
]
_ResponseHookT = typing.Optional[
typing.Callable[
[
Span,
urllib3.connectionpool.HTTPConnectionPool,
urllib3.response.HTTPResponse,
],
None,
]
]
_URL_OPEN_ARG_TO_INDEX_MAPPING = {
@ -92,7 +119,8 @@ class URLLib3Instrumentor(BaseInstrumentor):
Args:
**kwargs: Optional arguments
``tracer_provider``: a TracerProvider, defaults to global.
``span_name_or_callback``: Override the default span name.
``request_hook``: An optional callback that is invoked right after a span is created.
``response_hook``: An optional callback which is invoked right before the span is finished processing a response.
``url_filter``: A callback to process the requested URL prior
to adding it as a span attribute.
"""
@ -100,7 +128,8 @@ class URLLib3Instrumentor(BaseInstrumentor):
tracer = get_tracer(__name__, __version__, tracer_provider)
_instrument(
tracer,
span_name_or_callback=kwargs.get("span_name"),
request_hook=kwargs.get("request_hook"),
response_hook=kwargs.get("response_hook"),
url_filter=kwargs.get("url_filter"),
)
@ -110,7 +139,8 @@ class URLLib3Instrumentor(BaseInstrumentor):
def _instrument(
tracer,
span_name_or_callback: _SpanNameT = None,
request_hook: _RequestHookT = None,
response_hook: _ResponseHookT = None,
url_filter: _UrlFilterT = None,
):
def instrumented_urlopen(wrapped, instance, args, kwargs):
@ -121,7 +151,7 @@ def _instrument(
url = _get_url(instance, args, kwargs, url_filter)
headers = _prepare_headers(kwargs)
span_name = _get_span_name(span_name_or_callback, method, url, headers)
span_name = "HTTP {}".format(method.strip())
span_attributes = {
SpanAttributes.HTTP_METHOD: method,
SpanAttributes.HTTP_URL: url,
@ -130,12 +160,16 @@ def _instrument(
with tracer.start_as_current_span(
span_name, kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
if callable(request_hook):
request_hook(span, instance)
inject(headers)
with _suppress_further_instrumentation():
response = wrapped(*args, **kwargs)
_apply_response(span, response)
if callable(response_hook):
response_hook(span, instance, response)
return response
wrapt.wrap_function_wrapper(
@ -195,20 +229,6 @@ def _prepare_headers(urlopen_kwargs: typing.Dict) -> typing.Dict:
return headers
def _get_span_name(
span_name_or_callback, method: str, url: str, headers: typing.Mapping
):
span_name = None
if callable(span_name_or_callback):
span_name = span_name_or_callback(method, url, headers)
elif isinstance(span_name_or_callback, str):
span_name = span_name_or_callback
if not span_name or not isinstance(span_name, str):
span_name = "HTTP {}".format(method.strip())
return span_name
def _apply_response(span: Span, response: urllib3.response.HTTPResponse):
if not span.is_recording():
return

View File

@ -244,44 +244,6 @@ class TestURLLib3Instrumentor(TestBase):
# expect only a single span (retries are ignored)
self.assert_exception_span(self.HTTP_URL)
def test_span_name_callback(self):
def span_name_callback(method, url, headers):
self.assertEqual("GET", method)
self.assertEqual(self.HTTP_URL, url)
self.assertEqual({"key": "value"}, headers)
return "test_span_name"
URLLib3Instrumentor().uninstrument()
URLLib3Instrumentor().instrument(span_name=span_name_callback)
response = self.perform_request(
self.HTTP_URL, headers={"key": "value"}
)
self.assertEqual(b"Hello!", response.data)
span = self.assert_span()
self.assertEqual("test_span_name", span.name)
def test_span_name_callback_invalid(self):
invalid_span_names = (None, 123, "")
for span_name in invalid_span_names:
self.memory_exporter.clear()
# pylint: disable=unused-argument
def span_name_callback(method, url, headers):
return span_name # pylint: disable=cell-var-from-loop
URLLib3Instrumentor().uninstrument()
URLLib3Instrumentor().instrument(span_name=span_name_callback)
with self.subTest(span_name=span_name):
response = self.perform_request(self.HTTP_URL)
self.assertEqual(b"Hello!", response.data)
span = self.assert_span()
self.assertEqual("HTTP GET", span.name)
def test_url_filter(self):
def url_filter(url):
return url.split("?")[0]
@ -297,3 +259,23 @@ class TestURLLib3Instrumentor(TestBase):
response = self.perform_request(url)
self.assert_success_span(response, self.HTTP_URL)
def test_hooks(self):
def request_hook(span, request):
span.update_name("name set from hook")
def response_hook(span, request, response):
span.set_attribute("response_hook_attr", "value")
URLLib3Instrumentor().uninstrument()
URLLib3Instrumentor().instrument(
request_hook=request_hook, response_hook=response_hook
)
response = self.perform_request(self.HTTP_URL)
self.assertEqual(b"Hello!", response.data)
span = self.assert_span()
self.assertEqual(span.name, "name set from hook")
self.assertIn("response_hook_attr", span.attributes)
self.assertEqual(span.attributes["response_hook_attr"], "value")