mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-29 21:23:55 +08:00
Keep client interceptors in sync with grpc client interceptors (#442)
This commit is contained in:
@ -64,6 +64,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
|
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
|
||||||
- `opentelemetry-instrumenation-flask` now supports trace response headers.
|
- `opentelemetry-instrumenation-flask` now supports trace response headers.
|
||||||
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
|
([#436](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/436))
|
||||||
|
- `opentelemetry-instrumentation-grpc` Keep client interceptor in sync with grpc client interceptors.
|
||||||
|
([#442](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/442))
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
- Remove `http.status_text` from span attributes
|
- Remove `http.status_text` from span attributes
|
||||||
|
@ -13,9 +13,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
# pylint:disable=relative-beyond-top-level
|
# pylint:disable=relative-beyond-top-level
|
||||||
# pylint:disable=arguments-differ
|
|
||||||
# pylint:disable=no-member
|
# pylint:disable=no-member
|
||||||
# pylint:disable=signature-differs
|
|
||||||
|
|
||||||
"""Implementation of gRPC Python interceptors."""
|
"""Implementation of gRPC Python interceptors."""
|
||||||
|
|
||||||
@ -48,9 +46,24 @@ class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
|
|||||||
self._base_callable = base_callable
|
self._base_callable = base_callable
|
||||||
self._interceptor = interceptor
|
self._interceptor = interceptor
|
||||||
|
|
||||||
def __call__(self, request, timeout=None, metadata=None, credentials=None):
|
def __call__(
|
||||||
|
self,
|
||||||
|
request,
|
||||||
|
timeout=None,
|
||||||
|
metadata=None,
|
||||||
|
credentials=None,
|
||||||
|
wait_for_ready=None,
|
||||||
|
compression=None,
|
||||||
|
):
|
||||||
def invoker(request, metadata):
|
def invoker(request, metadata):
|
||||||
return self._base_callable(request, timeout, metadata, credentials)
|
return self._base_callable(
|
||||||
|
request,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
credentials,
|
||||||
|
wait_for_ready,
|
||||||
|
compression,
|
||||||
|
)
|
||||||
|
|
||||||
client_info = _UnaryClientInfo(self._method, timeout)
|
client_info = _UnaryClientInfo(self._method, timeout)
|
||||||
return self._interceptor.intercept_unary(
|
return self._interceptor.intercept_unary(
|
||||||
@ -58,11 +71,22 @@ class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def with_call(
|
def with_call(
|
||||||
self, request, timeout=None, metadata=None, credentials=None
|
self,
|
||||||
|
request,
|
||||||
|
timeout=None,
|
||||||
|
metadata=None,
|
||||||
|
credentials=None,
|
||||||
|
wait_for_ready=None,
|
||||||
|
compression=None,
|
||||||
):
|
):
|
||||||
def invoker(request, metadata):
|
def invoker(request, metadata):
|
||||||
return self._base_callable.with_call(
|
return self._base_callable.with_call(
|
||||||
request, timeout, metadata, credentials
|
request,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
credentials,
|
||||||
|
wait_for_ready,
|
||||||
|
compression,
|
||||||
)
|
)
|
||||||
|
|
||||||
client_info = _UnaryClientInfo(self._method, timeout)
|
client_info = _UnaryClientInfo(self._method, timeout)
|
||||||
@ -70,10 +94,23 @@ class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
|
|||||||
request, metadata, client_info, invoker
|
request, metadata, client_info, invoker
|
||||||
)
|
)
|
||||||
|
|
||||||
def future(self, request, timeout=None, metadata=None, credentials=None):
|
def future(
|
||||||
|
self,
|
||||||
|
request,
|
||||||
|
timeout=None,
|
||||||
|
metadata=None,
|
||||||
|
credentials=None,
|
||||||
|
wait_for_ready=None,
|
||||||
|
compression=None,
|
||||||
|
):
|
||||||
def invoker(request, metadata):
|
def invoker(request, metadata):
|
||||||
return self._base_callable.future(
|
return self._base_callable.future(
|
||||||
request, timeout, metadata, credentials
|
request,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
credentials,
|
||||||
|
wait_for_ready,
|
||||||
|
compression,
|
||||||
)
|
)
|
||||||
|
|
||||||
client_info = _UnaryClientInfo(self._method, timeout)
|
client_info = _UnaryClientInfo(self._method, timeout)
|
||||||
@ -88,9 +125,24 @@ class _InterceptorUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
|
|||||||
self._base_callable = base_callable
|
self._base_callable = base_callable
|
||||||
self._interceptor = interceptor
|
self._interceptor = interceptor
|
||||||
|
|
||||||
def __call__(self, request, timeout=None, metadata=None, credentials=None):
|
def __call__(
|
||||||
|
self,
|
||||||
|
request,
|
||||||
|
timeout=None,
|
||||||
|
metadata=None,
|
||||||
|
credentials=None,
|
||||||
|
wait_for_ready=None,
|
||||||
|
compression=None,
|
||||||
|
):
|
||||||
def invoker(request, metadata):
|
def invoker(request, metadata):
|
||||||
return self._base_callable(request, timeout, metadata, credentials)
|
return self._base_callable(
|
||||||
|
request,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
credentials,
|
||||||
|
wait_for_ready,
|
||||||
|
compression,
|
||||||
|
)
|
||||||
|
|
||||||
client_info = _StreamClientInfo(self._method, False, True, timeout)
|
client_info = _StreamClientInfo(self._method, False, True, timeout)
|
||||||
return self._interceptor.intercept_stream(
|
return self._interceptor.intercept_stream(
|
||||||
@ -105,11 +157,22 @@ class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
|
|||||||
self._interceptor = interceptor
|
self._interceptor = interceptor
|
||||||
|
|
||||||
def __call__(
|
def __call__(
|
||||||
self, request_iterator, timeout=None, metadata=None, credentials=None
|
self,
|
||||||
|
request_iterator,
|
||||||
|
timeout=None,
|
||||||
|
metadata=None,
|
||||||
|
credentials=None,
|
||||||
|
wait_for_ready=None,
|
||||||
|
compression=None,
|
||||||
):
|
):
|
||||||
def invoker(request_iterator, metadata):
|
def invoker(request_iterator, metadata):
|
||||||
return self._base_callable(
|
return self._base_callable(
|
||||||
request_iterator, timeout, metadata, credentials
|
request_iterator,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
credentials,
|
||||||
|
wait_for_ready,
|
||||||
|
compression,
|
||||||
)
|
)
|
||||||
|
|
||||||
client_info = _StreamClientInfo(self._method, True, False, timeout)
|
client_info = _StreamClientInfo(self._method, True, False, timeout)
|
||||||
@ -118,11 +181,22 @@ class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def with_call(
|
def with_call(
|
||||||
self, request_iterator, timeout=None, metadata=None, credentials=None
|
self,
|
||||||
|
request_iterator,
|
||||||
|
timeout=None,
|
||||||
|
metadata=None,
|
||||||
|
credentials=None,
|
||||||
|
wait_for_ready=None,
|
||||||
|
compression=None,
|
||||||
):
|
):
|
||||||
def invoker(request_iterator, metadata):
|
def invoker(request_iterator, metadata):
|
||||||
return self._base_callable.with_call(
|
return self._base_callable.with_call(
|
||||||
request_iterator, timeout, metadata, credentials
|
request_iterator,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
credentials,
|
||||||
|
wait_for_ready,
|
||||||
|
compression,
|
||||||
)
|
)
|
||||||
|
|
||||||
client_info = _StreamClientInfo(self._method, True, False, timeout)
|
client_info = _StreamClientInfo(self._method, True, False, timeout)
|
||||||
@ -131,11 +205,22 @@ class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def future(
|
def future(
|
||||||
self, request_iterator, timeout=None, metadata=None, credentials=None
|
self,
|
||||||
|
request_iterator,
|
||||||
|
timeout=None,
|
||||||
|
metadata=None,
|
||||||
|
credentials=None,
|
||||||
|
wait_for_ready=None,
|
||||||
|
compression=None,
|
||||||
):
|
):
|
||||||
def invoker(request_iterator, metadata):
|
def invoker(request_iterator, metadata):
|
||||||
return self._base_callable.future(
|
return self._base_callable.future(
|
||||||
request_iterator, timeout, metadata, credentials
|
request_iterator,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
credentials,
|
||||||
|
wait_for_ready,
|
||||||
|
compression,
|
||||||
)
|
)
|
||||||
|
|
||||||
client_info = _StreamClientInfo(self._method, True, False, timeout)
|
client_info = _StreamClientInfo(self._method, True, False, timeout)
|
||||||
@ -151,11 +236,22 @@ class _InterceptorStreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
|
|||||||
self._interceptor = interceptor
|
self._interceptor = interceptor
|
||||||
|
|
||||||
def __call__(
|
def __call__(
|
||||||
self, request_iterator, timeout=None, metadata=None, credentials=None
|
self,
|
||||||
|
request_iterator,
|
||||||
|
timeout=None,
|
||||||
|
metadata=None,
|
||||||
|
credentials=None,
|
||||||
|
wait_for_ready=None,
|
||||||
|
compression=None,
|
||||||
):
|
):
|
||||||
def invoker(request_iterator, metadata):
|
def invoker(request_iterator, metadata):
|
||||||
return self._base_callable(
|
return self._base_callable(
|
||||||
request_iterator, timeout, metadata, credentials
|
request_iterator,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
credentials,
|
||||||
|
wait_for_ready,
|
||||||
|
compression,
|
||||||
)
|
)
|
||||||
|
|
||||||
client_info = _StreamClientInfo(self._method, True, True, timeout)
|
client_info = _StreamClientInfo(self._method, True, True, timeout)
|
||||||
|
@ -41,13 +41,57 @@ from ._server import create_test_server
|
|||||||
from .protobuf.test_server_pb2 import Request
|
from .protobuf.test_server_pb2 import Request
|
||||||
|
|
||||||
|
|
||||||
|
# User defined interceptor. Is used in the tests along with the opentelemetry client interceptor.
|
||||||
|
class Interceptor(
|
||||||
|
grpc.UnaryUnaryClientInterceptor,
|
||||||
|
grpc.UnaryStreamClientInterceptor,
|
||||||
|
grpc.StreamUnaryClientInterceptor,
|
||||||
|
grpc.StreamStreamClientInterceptor,
|
||||||
|
):
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def intercept_unary_unary(
|
||||||
|
self, continuation, client_call_details, request
|
||||||
|
):
|
||||||
|
return self._intercept_call(continuation, client_call_details, request)
|
||||||
|
|
||||||
|
def intercept_unary_stream(
|
||||||
|
self, continuation, client_call_details, request
|
||||||
|
):
|
||||||
|
return self._intercept_call(continuation, client_call_details, request)
|
||||||
|
|
||||||
|
def intercept_stream_unary(
|
||||||
|
self, continuation, client_call_details, request_iterator
|
||||||
|
):
|
||||||
|
return self._intercept_call(
|
||||||
|
continuation, client_call_details, request_iterator
|
||||||
|
)
|
||||||
|
|
||||||
|
def intercept_stream_stream(
|
||||||
|
self, continuation, client_call_details, request_iterator
|
||||||
|
):
|
||||||
|
return self._intercept_call(
|
||||||
|
continuation, client_call_details, request_iterator
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _intercept_call(
|
||||||
|
continuation, client_call_details, request_or_iterator
|
||||||
|
):
|
||||||
|
return continuation(client_call_details, request_or_iterator)
|
||||||
|
|
||||||
|
|
||||||
class TestClientProto(TestBase):
|
class TestClientProto(TestBase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super().setUp()
|
super().setUp()
|
||||||
GrpcInstrumentorClient().instrument()
|
GrpcInstrumentorClient().instrument()
|
||||||
self.server = create_test_server(25565)
|
self.server = create_test_server(25565)
|
||||||
self.server.start()
|
self.server.start()
|
||||||
|
# use a user defined interceptor along with the opentelemetry client interceptor
|
||||||
|
interceptors = [Interceptor()]
|
||||||
self.channel = grpc.insecure_channel("localhost:25565")
|
self.channel = grpc.insecure_channel("localhost:25565")
|
||||||
|
self.channel = grpc.intercept_channel(self.channel, *interceptors)
|
||||||
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
|
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
Reference in New Issue
Block a user