mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-08-01 06:33:52 +08:00
Refactor httpx instrumentation (#577)
This commit is contained in:
14
CHANGELOG.md
14
CHANGELOG.md
@ -6,12 +6,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.3.0-0.22b0...HEAD)
|
||||
- `opentelemetry-sdk-extension-aws` Update AWS entry points to match spec
|
||||
([#566](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/566))
|
||||
- Include Flask 2.0 as compatible with existing flask instrumentation
|
||||
([#545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/545))
|
||||
- `openelemetry-sdk-extension-aws` Take a dependency on `opentelemetry-sdk`
|
||||
([#558](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/558))
|
||||
|
||||
### Changed
|
||||
- `opentelemetry-instrumentation-tornado` properly instrument work done in tornado on_finish method.
|
||||
@ -36,6 +30,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
([#567](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/567))
|
||||
- `opentelemetry-instrumentation-grpc` Fixed asynchonous unary call traces
|
||||
([#536](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/536))
|
||||
- `opentelemetry-sdk-extension-aws` Update AWS entry points to match spec
|
||||
([#566](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/566))
|
||||
- Include Flask 2.0 as compatible with existing flask instrumentation
|
||||
([#545](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/545))
|
||||
- `openelemetry-sdk-extension-aws` Take a dependency on `opentelemetry-sdk`
|
||||
([#558](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/558))
|
||||
- Change `opentelemetry-instrumentation-httpx` to replace `client` classes with instrumented versions.
|
||||
([#577](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/577))
|
||||
|
||||
### Added
|
||||
- `opentelemetry-instrumentation-httpx` Add `httpx` instrumentation
|
||||
|
@ -12,6 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import typing
|
||||
|
||||
import httpx
|
||||
@ -31,6 +32,8 @@ from opentelemetry.trace import SpanKind, Tracer, TracerProvider, get_tracer
|
||||
from opentelemetry.trace.span import Span
|
||||
from opentelemetry.trace.status import Status
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
URL = typing.Tuple[bytes, bytes, typing.Optional[int], bytes]
|
||||
Headers = typing.List[typing.Tuple[bytes, bytes]]
|
||||
RequestHook = typing.Callable[[Span, "RequestInfo"], None]
|
||||
@ -258,98 +261,48 @@ class AsyncOpenTelemetryTransport(httpx.AsyncBaseTransport):
|
||||
return status_code, headers, stream, extensions
|
||||
|
||||
|
||||
def _instrument(
|
||||
tracer_provider: TracerProvider = None,
|
||||
request_hook: typing.Optional[RequestHook] = None,
|
||||
response_hook: typing.Optional[ResponseHook] = None,
|
||||
) -> None:
|
||||
"""Enables tracing of all Client and AsyncClient instances
|
||||
class _InstrumentedClient(httpx.Client):
|
||||
|
||||
When a Client or AsyncClient gets created, a telemetry transport is passed
|
||||
in to the instance.
|
||||
"""
|
||||
# pylint:disable=unused-argument
|
||||
def instrumented_sync_send(wrapped, instance, args, kwargs):
|
||||
if context.get_value("suppress_instrumentation"):
|
||||
return wrapped(*args, **kwargs)
|
||||
_tracer_provider = None
|
||||
_request_hook = None
|
||||
_response_hook = None
|
||||
|
||||
transport = instance._transport or httpx.HTTPTransport()
|
||||
telemetry_transport = SyncOpenTelemetryTransport(
|
||||
transport,
|
||||
tracer_provider=tracer_provider,
|
||||
request_hook=request_hook,
|
||||
response_hook=response_hook,
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self._original_transport = self._transport
|
||||
self._is_instrumented_by_opentelemetry = True
|
||||
|
||||
self._transport = SyncOpenTelemetryTransport(
|
||||
self._transport,
|
||||
tracer_provider=_InstrumentedClient._tracer_provider,
|
||||
request_hook=_InstrumentedClient._request_hook,
|
||||
response_hook=_InstrumentedClient._response_hook,
|
||||
)
|
||||
|
||||
instance._transport = telemetry_transport
|
||||
return wrapped(*args, **kwargs)
|
||||
|
||||
async def instrumented_async_send(wrapped, instance, args, kwargs):
|
||||
if context.get_value("suppress_instrumentation"):
|
||||
return await wrapped(*args, **kwargs)
|
||||
class _InstrumentedAsyncClient(httpx.AsyncClient):
|
||||
|
||||
transport = instance._transport or httpx.AsyncHTTPTransport()
|
||||
telemetry_transport = AsyncOpenTelemetryTransport(
|
||||
transport,
|
||||
tracer_provider=tracer_provider,
|
||||
request_hook=request_hook,
|
||||
response_hook=response_hook,
|
||||
_tracer_provider = None
|
||||
_request_hook = None
|
||||
_response_hook = None
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self._original_transport = self._transport
|
||||
self._is_instrumented_by_opentelemetry = True
|
||||
|
||||
self._transport = AsyncOpenTelemetryTransport(
|
||||
self._transport,
|
||||
tracer_provider=_InstrumentedAsyncClient._tracer_provider,
|
||||
request_hook=_InstrumentedAsyncClient._request_hook,
|
||||
response_hook=_InstrumentedAsyncClient._response_hook,
|
||||
)
|
||||
|
||||
instance._transport = telemetry_transport
|
||||
return await wrapped(*args, **kwargs)
|
||||
|
||||
wrapt.wrap_function_wrapper(httpx.Client, "send", instrumented_sync_send)
|
||||
|
||||
wrapt.wrap_function_wrapper(
|
||||
httpx.AsyncClient, "send", instrumented_async_send
|
||||
)
|
||||
|
||||
|
||||
def _instrument_client(
|
||||
client: typing.Union[httpx.Client, httpx.AsyncClient],
|
||||
tracer_provider: TracerProvider = None,
|
||||
request_hook: typing.Optional[RequestHook] = None,
|
||||
response_hook: typing.Optional[ResponseHook] = None,
|
||||
) -> None:
|
||||
"""Enables instrumentation for the given Client or AsyncClient"""
|
||||
# pylint: disable=protected-access
|
||||
if isinstance(client, httpx.Client):
|
||||
transport = client._transport or httpx.HTTPTransport()
|
||||
telemetry_transport = SyncOpenTelemetryTransport(
|
||||
transport,
|
||||
tracer_provider=tracer_provider,
|
||||
request_hook=request_hook,
|
||||
response_hook=response_hook,
|
||||
)
|
||||
elif isinstance(client, httpx.AsyncClient):
|
||||
transport = client._transport or httpx.AsyncHTTPTransport()
|
||||
telemetry_transport = AsyncOpenTelemetryTransport(
|
||||
transport,
|
||||
tracer_provider=tracer_provider,
|
||||
request_hook=request_hook,
|
||||
response_hook=response_hook,
|
||||
)
|
||||
else:
|
||||
raise TypeError("Invalid client provided")
|
||||
client._transport = telemetry_transport
|
||||
|
||||
|
||||
def _uninstrument() -> None:
|
||||
"""Disables instrumenting for all newly created Client and AsyncClient instances"""
|
||||
unwrap(httpx.Client, "send")
|
||||
unwrap(httpx.AsyncClient, "send")
|
||||
|
||||
|
||||
def _uninstrument_client(
|
||||
client: typing.Union[httpx.Client, httpx.AsyncClient]
|
||||
) -> None:
|
||||
"""Disables instrumentation for the given Client or AsyncClient"""
|
||||
# pylint: disable=protected-access
|
||||
unwrap(client, "send")
|
||||
|
||||
|
||||
class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
# pylint: disable=protected-access,attribute-defined-outside-init
|
||||
"""An instrumentor for httpx Client and AsyncClient
|
||||
|
||||
See `BaseInstrumentor`
|
||||
@ -369,14 +322,31 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
``response_hook``: A hook that receives the span, request, and response
|
||||
that is called right before the span ends
|
||||
"""
|
||||
_instrument(
|
||||
tracer_provider=kwargs.get("tracer_provider"),
|
||||
request_hook=kwargs.get("request_hook"),
|
||||
response_hook=kwargs.get("response_hook"),
|
||||
)
|
||||
self._original_client = httpx.Client
|
||||
self._original_async_client = httpx.AsyncClient
|
||||
request_hook = kwargs.get("request_hook")
|
||||
response_hook = kwargs.get("response_hook")
|
||||
if callable(request_hook):
|
||||
_InstrumentedClient._request_hook = request_hook
|
||||
_InstrumentedAsyncClient._request_hook = request_hook
|
||||
if callable(response_hook):
|
||||
_InstrumentedClient._response_hook = response_hook
|
||||
_InstrumentedAsyncClient._response_hook = response_hook
|
||||
tracer_provider = kwargs.get("tracer_provider")
|
||||
_InstrumentedClient._tracer_provider = tracer_provider
|
||||
_InstrumentedAsyncClient._tracer_provider = tracer_provider
|
||||
httpx.Client = _InstrumentedClient
|
||||
httpx.AsyncClient = _InstrumentedAsyncClient
|
||||
|
||||
def _uninstrument(self, **kwargs):
|
||||
_uninstrument()
|
||||
httpx.Client = self._original_client
|
||||
httpx.AsyncClient = self._original_async_client
|
||||
_InstrumentedClient._tracer_provider = None
|
||||
_InstrumentedClient._request_hook = None
|
||||
_InstrumentedClient._response_hook = None
|
||||
_InstrumentedAsyncClient._tracer_provider = None
|
||||
_InstrumentedAsyncClient._request_hook = None
|
||||
_InstrumentedAsyncClient._response_hook = None
|
||||
|
||||
@staticmethod
|
||||
def instrument_client(
|
||||
@ -395,12 +365,34 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
response_hook: A hook that receives the span, request, and response
|
||||
that is called right before the span ends
|
||||
"""
|
||||
_instrument_client(
|
||||
client,
|
||||
tracer_provider=tracer_provider,
|
||||
request_hook=request_hook,
|
||||
response_hook=response_hook,
|
||||
)
|
||||
# pylint: disable=protected-access
|
||||
if not hasattr(client, "_is_instrumented_by_opentelemetry"):
|
||||
client._is_instrumented_by_opentelemetry = False
|
||||
|
||||
if not client._is_instrumented_by_opentelemetry:
|
||||
if isinstance(client, httpx.Client):
|
||||
client._original_transport = client._transport
|
||||
transport = client._transport or httpx.HTTPTransport()
|
||||
client._transport = SyncOpenTelemetryTransport(
|
||||
transport,
|
||||
tracer_provider=tracer_provider,
|
||||
request_hook=request_hook,
|
||||
response_hook=response_hook,
|
||||
)
|
||||
client._is_instrumented_by_opentelemetry = True
|
||||
if isinstance(client, httpx.AsyncClient):
|
||||
transport = client._transport or httpx.AsyncHTTPTransport()
|
||||
client._transport = AsyncOpenTelemetryTransport(
|
||||
transport,
|
||||
tracer_provider=tracer_provider,
|
||||
request_hook=request_hook,
|
||||
response_hook=response_hook,
|
||||
)
|
||||
client._is_instrumented_by_opentelemetry = True
|
||||
else:
|
||||
_logger.warning(
|
||||
"Attempting to instrument Httpx client while already instrumented"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def uninstrument_client(
|
||||
@ -411,4 +403,12 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
|
||||
Args:
|
||||
client: The httpx Client or AsyncClient instance
|
||||
"""
|
||||
_uninstrument_client(client)
|
||||
if hasattr(client, "_original_transport"):
|
||||
client._transport = client._original_transport
|
||||
del client._original_transport
|
||||
client._is_instrumented_by_opentelemetry = False
|
||||
else:
|
||||
_logger.warning(
|
||||
"Attempting to uninstrument Httpx "
|
||||
"client while already uninstrumented"
|
||||
)
|
||||
|
@ -157,6 +157,11 @@ class BaseTestCases:
|
||||
span, opentelemetry.instrumentation.httpx
|
||||
)
|
||||
|
||||
def test_basic_multiple(self):
|
||||
self.perform_request(self.URL)
|
||||
self.perform_request(self.URL)
|
||||
self.assert_span(num_spans=2)
|
||||
|
||||
def test_not_foundbasic(self):
|
||||
url_404 = "http://httpbin.org/status/404"
|
||||
|
||||
@ -375,12 +380,9 @@ class BaseTestCases:
|
||||
pass
|
||||
|
||||
def setUp(self):
|
||||
self.client = self.create_client()
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
super().setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
self.client = self.create_client()
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
|
||||
def test_custom_tracer_provider(self):
|
||||
@ -388,7 +390,6 @@ class BaseTestCases:
|
||||
result = self.create_tracer_provider(resource=resource)
|
||||
tracer_provider, exporter = result
|
||||
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
HTTPXClientInstrumentor().instrument(
|
||||
tracer_provider=tracer_provider
|
||||
)
|
||||
@ -398,9 +399,9 @@ class BaseTestCases:
|
||||
self.assertEqual(result.text, "Hello!")
|
||||
span = self.assert_span(exporter=exporter)
|
||||
self.assertIs(span.resource, resource)
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
|
||||
def test_response_hook(self):
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
HTTPXClientInstrumentor().instrument(
|
||||
tracer_provider=self.tracer_provider,
|
||||
response_hook=self.response_hook,
|
||||
@ -419,9 +420,9 @@ class BaseTestCases:
|
||||
HTTP_RESPONSE_BODY: "Hello!",
|
||||
},
|
||||
)
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
|
||||
def test_request_hook(self):
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
HTTPXClientInstrumentor().instrument(
|
||||
tracer_provider=self.tracer_provider,
|
||||
request_hook=self.request_hook,
|
||||
@ -432,9 +433,9 @@ class BaseTestCases:
|
||||
self.assertEqual(result.text, "Hello!")
|
||||
span = self.assert_span()
|
||||
self.assertEqual(span.name, "GET" + self.URL)
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
|
||||
def test_request_hook_no_span_update(self):
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
HTTPXClientInstrumentor().instrument(
|
||||
tracer_provider=self.tracer_provider,
|
||||
request_hook=self.no_update_request_hook,
|
||||
@ -445,10 +446,10 @@ class BaseTestCases:
|
||||
self.assertEqual(result.text, "Hello!")
|
||||
span = self.assert_span()
|
||||
self.assertEqual(span.name, "HTTP GET")
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
|
||||
def test_not_recording(self):
|
||||
with mock.patch("opentelemetry.trace.INVALID_SPAN") as mock_span:
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
HTTPXClientInstrumentor().instrument(
|
||||
tracer_provider=trace._DefaultTracerProvider()
|
||||
)
|
||||
@ -463,8 +464,10 @@ class BaseTestCases:
|
||||
self.assertTrue(mock_span.is_recording.called)
|
||||
self.assertFalse(mock_span.set_attribute.called)
|
||||
self.assertFalse(mock_span.set_status.called)
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
|
||||
def test_suppress_instrumentation_new_client(self):
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
token = context.attach(
|
||||
context.set_value("suppress_instrumentation", True)
|
||||
)
|
||||
@ -476,32 +479,22 @@ class BaseTestCases:
|
||||
context.detach(token)
|
||||
|
||||
self.assert_span(num_spans=0)
|
||||
|
||||
def test_existing_client(self):
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
client = self.create_client()
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
result = self.perform_request(self.URL, client=client)
|
||||
self.assertEqual(result.text, "Hello!")
|
||||
self.assert_span(num_spans=1)
|
||||
|
||||
def test_instrument_client(self):
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
client = self.create_client()
|
||||
HTTPXClientInstrumentor().instrument_client(client)
|
||||
result = self.perform_request(self.URL, client=client)
|
||||
self.assertEqual(result.text, "Hello!")
|
||||
self.assert_span(num_spans=1)
|
||||
# instrument again to avoid annoying warning message
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
|
||||
def test_uninstrument(self):
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
HTTPXClientInstrumentor().uninstrument()
|
||||
result = self.perform_request(self.URL)
|
||||
client = self.create_client()
|
||||
result = self.perform_request(self.URL, client=client)
|
||||
self.assertEqual(result.text, "Hello!")
|
||||
self.assert_span(num_spans=0)
|
||||
# instrument again to avoid annoying warning message
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
|
||||
def test_uninstrument_client(self):
|
||||
HTTPXClientInstrumentor().uninstrument_client(self.client)
|
||||
@ -512,6 +505,7 @@ class BaseTestCases:
|
||||
self.assert_span(num_spans=0)
|
||||
|
||||
def test_uninstrument_new_client(self):
|
||||
HTTPXClientInstrumentor().instrument()
|
||||
client1 = self.create_client()
|
||||
HTTPXClientInstrumentor().uninstrument_client(client1)
|
||||
|
||||
|
Reference in New Issue
Block a user