mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-31 14:11:50 +08:00
Add type hints to HTTPX (#3098)
This commit is contained in:

committed by
GitHub

parent
e54256ddb7
commit
52871b82b6
@ -230,15 +230,13 @@ from opentelemetry.semconv.attributes.network_attributes import (
|
||||
NETWORK_PEER_ADDRESS,
|
||||
NETWORK_PEER_PORT,
|
||||
)
|
||||
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
|
||||
from opentelemetry.trace import SpanKind, Tracer, TracerProvider, get_tracer
|
||||
from opentelemetry.trace.span import Span
|
||||
from opentelemetry.trace.status import StatusCode
|
||||
from opentelemetry.util.http import remove_url_credentials, sanitize_method
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
URL = typing.Tuple[bytes, bytes, typing.Optional[int], bytes]
|
||||
Headers = typing.List[typing.Tuple[bytes, bytes]]
|
||||
RequestHook = typing.Callable[[Span, "RequestInfo"], None]
|
||||
ResponseHook = typing.Callable[[Span, "RequestInfo", "ResponseInfo"], None]
|
||||
AsyncRequestHook = typing.Callable[
|
||||
@ -253,17 +251,15 @@ class RequestInfo(typing.NamedTuple):
|
||||
method: bytes
|
||||
url: httpx.URL
|
||||
headers: httpx.Headers | None
|
||||
stream: typing.Optional[
|
||||
typing.Union[httpx.SyncByteStream, httpx.AsyncByteStream]
|
||||
]
|
||||
extensions: typing.Optional[dict]
|
||||
stream: httpx.SyncByteStream | httpx.AsyncByteStream | None
|
||||
extensions: dict[str, typing.Any] | None
|
||||
|
||||
|
||||
class ResponseInfo(typing.NamedTuple):
|
||||
status_code: int
|
||||
headers: httpx.Headers | None
|
||||
stream: typing.Iterable[bytes]
|
||||
extensions: typing.Optional[dict]
|
||||
stream: httpx.SyncByteStream | httpx.AsyncByteStream
|
||||
extensions: dict[str, typing.Any] | None
|
||||
|
||||
|
||||
def _get_default_span_name(method: str) -> str:
|
||||
@ -274,11 +270,19 @@ def _get_default_span_name(method: str) -> str:
|
||||
return method
|
||||
|
||||
|
||||
def _prepare_headers(headers: typing.Optional[Headers]) -> httpx.Headers:
|
||||
def _prepare_headers(headers: httpx.Headers | None) -> httpx.Headers:
|
||||
return httpx.Headers(headers)
|
||||
|
||||
|
||||
def _extract_parameters(args, kwargs):
|
||||
def _extract_parameters(
|
||||
args: tuple[typing.Any, ...], kwargs: dict[str, typing.Any]
|
||||
) -> tuple[
|
||||
bytes,
|
||||
httpx.URL,
|
||||
httpx.Headers | None,
|
||||
httpx.SyncByteStream | httpx.AsyncByteStream | None,
|
||||
dict[str, typing.Any],
|
||||
]:
|
||||
if isinstance(args[0], httpx.Request):
|
||||
# In httpx >= 0.20.0, handle_request receives a Request object
|
||||
request: httpx.Request = args[0]
|
||||
@ -311,10 +315,15 @@ def _inject_propagation_headers(headers, args, kwargs):
|
||||
|
||||
|
||||
def _extract_response(
|
||||
response: typing.Union[
|
||||
httpx.Response, typing.Tuple[int, Headers, httpx.SyncByteStream, dict]
|
||||
],
|
||||
) -> typing.Tuple[int, Headers, httpx.SyncByteStream, dict, str]:
|
||||
response: httpx.Response
|
||||
| tuple[int, httpx.Headers, httpx.SyncByteStream, dict[str, typing.Any]],
|
||||
) -> tuple[
|
||||
int,
|
||||
httpx.Headers,
|
||||
httpx.SyncByteStream | httpx.AsyncByteStream,
|
||||
dict[str, typing.Any],
|
||||
str,
|
||||
]:
|
||||
if isinstance(response, httpx.Response):
|
||||
status_code = response.status_code
|
||||
headers = response.headers
|
||||
@ -331,8 +340,8 @@ def _extract_response(
|
||||
|
||||
|
||||
def _apply_request_client_attributes_to_span(
|
||||
span_attributes: dict,
|
||||
url: typing.Union[str, URL, httpx.URL],
|
||||
span_attributes: dict[str, typing.Any],
|
||||
url: str | httpx.URL,
|
||||
method_original: str,
|
||||
semconv: _StabilityMode,
|
||||
):
|
||||
@ -407,9 +416,9 @@ class SyncOpenTelemetryTransport(httpx.BaseTransport):
|
||||
def __init__(
|
||||
self,
|
||||
transport: httpx.BaseTransport,
|
||||
tracer_provider: typing.Optional[TracerProvider] = None,
|
||||
request_hook: typing.Optional[RequestHook] = None,
|
||||
response_hook: typing.Optional[ResponseHook] = None,
|
||||
tracer_provider: TracerProvider | None = None,
|
||||
request_hook: RequestHook | None = None,
|
||||
response_hook: ResponseHook | None = None,
|
||||
):
|
||||
_OpenTelemetrySemanticConventionStability._initialize()
|
||||
self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
|
||||
@ -426,27 +435,27 @@ class SyncOpenTelemetryTransport(httpx.BaseTransport):
|
||||
self._request_hook = request_hook
|
||||
self._response_hook = response_hook
|
||||
|
||||
def __enter__(self) -> "SyncOpenTelemetryTransport":
|
||||
def __enter__(self) -> SyncOpenTelemetryTransport:
|
||||
self._transport.__enter__()
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: typing.Optional[typing.Type[BaseException]] = None,
|
||||
exc_value: typing.Optional[BaseException] = None,
|
||||
traceback: typing.Optional[TracebackType] = None,
|
||||
exc_type: type[BaseException] | None = None,
|
||||
exc_value: BaseException | None = None,
|
||||
traceback: TracebackType | None = None,
|
||||
) -> None:
|
||||
self._transport.__exit__(exc_type, exc_value, traceback)
|
||||
|
||||
# pylint: disable=R0914
|
||||
def handle_request(
|
||||
self,
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> typing.Union[
|
||||
typing.Tuple[int, "Headers", httpx.SyncByteStream, dict],
|
||||
httpx.Response,
|
||||
]:
|
||||
*args: typing.Any,
|
||||
**kwargs: typing.Any,
|
||||
) -> (
|
||||
tuple[int, httpx.Headers, httpx.SyncByteStream, dict[str, typing.Any]]
|
||||
| httpx.Response
|
||||
):
|
||||
"""Add request info to span."""
|
||||
if not is_http_instrumentation_enabled():
|
||||
return self._transport.handle_request(*args, **kwargs)
|
||||
@ -532,9 +541,9 @@ class AsyncOpenTelemetryTransport(httpx.AsyncBaseTransport):
|
||||
def __init__(
|
||||
self,
|
||||
transport: httpx.AsyncBaseTransport,
|
||||
tracer_provider: typing.Optional[TracerProvider] = None,
|
||||
request_hook: typing.Optional[AsyncRequestHook] = None,
|
||||
response_hook: typing.Optional[AsyncResponseHook] = None,
|
||||
tracer_provider: TracerProvider | None = None,
|
||||
request_hook: AsyncRequestHook | None = None,
|
||||
response_hook: AsyncResponseHook | None = None,
|
||||
):
|
||||
_OpenTelemetrySemanticConventionStability._initialize()
|
||||
self._sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
|
||||
@ -557,19 +566,19 @@ class AsyncOpenTelemetryTransport(httpx.AsyncBaseTransport):
|
||||
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: typing.Optional[typing.Type[BaseException]] = None,
|
||||
exc_value: typing.Optional[BaseException] = None,
|
||||
traceback: typing.Optional[TracebackType] = None,
|
||||
exc_type: typing.Type[BaseException] | None = None,
|
||||
exc_value: BaseException | None = None,
|
||||
traceback: TracebackType | None = None,
|
||||
) -> None:
|
||||
await self._transport.__aexit__(exc_type, exc_value, traceback)
|
||||
|
||||
# pylint: disable=R0914
|
||||
async def handle_async_request(
|
||||
self, *args, **kwargs
|
||||
) -> typing.Union[
|
||||
typing.Tuple[int, "Headers", httpx.AsyncByteStream, dict],
|
||||
httpx.Response,
|
||||
]:
|
||||
self, *args: typing.Any, **kwargs: typing.Any
|
||||
) -> (
|
||||
tuple[int, httpx.Headers, httpx.AsyncByteStream, dict[str, typing.Any]]
|
||||
| httpx.Response
|
||||
):
|
||||
"""Add request info to span."""
|
||||
if not is_http_instrumentation_enabled():
|
||||
return await self._transport.handle_async_request(*args, **kwargs)
|
||||
@ -653,7 +662,7 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
def instrumentation_dependencies(self) -> typing.Collection[str]:
|
||||
return _instruments
|
||||
|
||||
def _instrument(self, **kwargs):
|
||||
def _instrument(self, **kwargs: typing.Any):
|
||||
"""Instruments httpx Client and AsyncClient
|
||||
|
||||
Args:
|
||||
@ -716,20 +725,20 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
),
|
||||
)
|
||||
|
||||
def _uninstrument(self, **kwargs):
|
||||
def _uninstrument(self, **kwargs: typing.Any):
|
||||
unwrap(httpx.HTTPTransport, "handle_request")
|
||||
unwrap(httpx.AsyncHTTPTransport, "handle_async_request")
|
||||
|
||||
@staticmethod
|
||||
def _handle_request_wrapper( # pylint: disable=too-many-locals
|
||||
wrapped,
|
||||
instance,
|
||||
args,
|
||||
kwargs,
|
||||
tracer,
|
||||
sem_conv_opt_in_mode,
|
||||
request_hook,
|
||||
response_hook,
|
||||
wrapped: typing.Callable[..., typing.Any],
|
||||
instance: httpx.HTTPTransport,
|
||||
args: tuple[typing.Any, ...],
|
||||
kwargs: dict[str, typing.Any],
|
||||
tracer: Tracer,
|
||||
sem_conv_opt_in_mode: _StabilityMode,
|
||||
request_hook: RequestHook,
|
||||
response_hook: ResponseHook,
|
||||
):
|
||||
if not is_http_instrumentation_enabled():
|
||||
return wrapped(*args, **kwargs)
|
||||
@ -796,14 +805,14 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
|
||||
@staticmethod
|
||||
async def _handle_async_request_wrapper( # pylint: disable=too-many-locals
|
||||
wrapped,
|
||||
instance,
|
||||
args,
|
||||
kwargs,
|
||||
tracer,
|
||||
sem_conv_opt_in_mode,
|
||||
async_request_hook,
|
||||
async_response_hook,
|
||||
wrapped: typing.Callable[..., typing.Awaitable[typing.Any]],
|
||||
instance: httpx.AsyncHTTPTransport,
|
||||
args: tuple[typing.Any, ...],
|
||||
kwargs: dict[str, typing.Any],
|
||||
tracer: Tracer,
|
||||
sem_conv_opt_in_mode: _StabilityMode,
|
||||
async_request_hook: AsyncRequestHook,
|
||||
async_response_hook: AsyncResponseHook,
|
||||
):
|
||||
if not is_http_instrumentation_enabled():
|
||||
return await wrapped(*args, **kwargs)
|
||||
@ -872,14 +881,10 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
@classmethod
|
||||
def instrument_client(
|
||||
cls,
|
||||
client: typing.Union[httpx.Client, httpx.AsyncClient],
|
||||
tracer_provider: TracerProvider = None,
|
||||
request_hook: typing.Union[
|
||||
typing.Optional[RequestHook], typing.Optional[AsyncRequestHook]
|
||||
] = None,
|
||||
response_hook: typing.Union[
|
||||
typing.Optional[ResponseHook], typing.Optional[AsyncResponseHook]
|
||||
] = None,
|
||||
client: httpx.Client | httpx.AsyncClient,
|
||||
tracer_provider: TracerProvider | None = None,
|
||||
request_hook: RequestHook | AsyncRequestHook | None = None,
|
||||
response_hook: ResponseHook | AsyncResponseHook | None = None,
|
||||
) -> None:
|
||||
"""Instrument httpx Client or AsyncClient
|
||||
|
||||
@ -977,9 +982,7 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
client._is_instrumented_by_opentelemetry = True
|
||||
|
||||
@staticmethod
|
||||
def uninstrument_client(
|
||||
client: typing.Union[httpx.Client, httpx.AsyncClient],
|
||||
):
|
||||
def uninstrument_client(client: httpx.Client | httpx.AsyncClient) -> None:
|
||||
"""Disables instrumentation for the given client instance
|
||||
|
||||
Args:
|
||||
|
Reference in New Issue
Block a user