Add gRPC filter (#1241)

This commit is contained in:
Yoshi Yamaguchi
2022-09-21 01:47:27 +09:00
committed by GitHub
parent d5ada286e1
commit 2ce0b69a8e
8 changed files with 1615 additions and 22 deletions

View File

@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- `opentelemetry-instrumentation-grpc` add supports to filter requests to instrument. ([#1241](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1241))
- Flask sqlalchemy psycopg2 integration
([#1224](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1224))
- Add metric instrumentation in fastapi
@ -125,7 +127,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `opentelemetry-instrumentation-starlette` Capture custom request/response headers in span attributes
([#1046])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1046)
([#1046](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1046))
### Fixed
- Prune autoinstrumentation sitecustomize module directory from PYTHONPATH immediately
@ -148,17 +150,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `opentelemetry-instrumentation-fastapi` Capture custom request/response headers in span attributes
([#1032])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1032)
([#1032](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1032))
- `opentelemetry-instrumentation-django` Capture custom request/response headers in span attributes
([#1024])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1024)
([#1024](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1024))
- `opentelemetry-instrumentation-asgi` Capture custom request/response headers in span attributes
([#1004])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004)
([#1004](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004))
- `opentelemetry-instrumentation-psycopg2` extended the sql commenter support of dbapi into psycopg2
([#940](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/940))
- `opentelemetry-instrumentation-falcon` Add support for falcon==1.4.1
([#1000])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1000)
([#1000](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1000))
- `opentelemetry-instrumentation-falcon` Falcon: Capture custom request/response headers in span attributes
([#1003])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1003)
([#1003](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1003))
- `opentelemetry-instrumentation-elasticsearch` no longer creates unique span names by including search target, replaces them with `<target>` and puts the value in attribute `elasticsearch.target`
([#1018](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1018))
- `opentelemetry-instrumentation-pyramid` Handle non-HTTPException exceptions
@ -166,17 +168,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-system-metrics` restore `SystemMetrics` instrumentation as `SystemMetricsInstrumentor`
([#1012](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1012))
- `opentelemetry-instrumentation-pyramid` Pyramid: Capture custom request/response headers in span attributes
([#1022])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1022)
([#1022](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1022))
## [1.10.0-0.29b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.10.0-0.29b0) - 2022-03-10
- `opentelemetry-instrumentation-wsgi` Capture custom request/response headers in span attributes
([#925])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925)
([#925](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925))
- `opentelemetry-instrumentation-flask` Flask: Capture custom request/response headers in span attributes
([#952])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/952)
([#952](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/952))
- `opentelemetry-instrumentation-tornado` Tornado: Capture custom request/response headers in span attributes
([#950])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/950)
([#950](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/950))
### Added
@ -971,7 +973,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#572](https://github.com/open-telemetry/opentelemetry-python/pull/572))
- `opentelemetry-ext-sqlite3` Initial release
- `opentelemetry-ext-psycopg2` Implement instrumentor interface, enabling auto-instrumentation
([#694]https://github.com/open-telemetry/opentelemetry-python/pull/694)
([#694](https://github.com/open-telemetry/opentelemetry-python/pull/694))
- `opentelemetry-ext-asgi` Add ASGI middleware
([#716](https://github.com/open-telemetry/opentelemetry-python/pull/716))
- `opentelemetry-ext-django` Add exclude list for paths and hosts to prevent from tracing

View File

@ -118,13 +118,62 @@ You can also add the instrumentor manually, rather than using
server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [server_interceptor()])
Filters
-------
If you prefer to filter specific requests to be instrumented, you can specify
the condition by assigning filters to instrumentors.
You can write a global server instrumentor as follows:
.. code-block::
from opentelemetry.instrumentation.grpc import filters, GrpcInstrumentorServer
grpc_server_instrumentor = GrpcInstrumentorServer(
filter_ = filters.any_of(
filters.method_name("SimpleMethod"),
filters.method_name("ComplexMethod"),
)
)
grpc_server_instrumentor.instrument()
You can also use the filters directly on the provided interceptors:
.. code-block::
my_interceptor = server_interceptor(
filter_ = filters.negate(filters.method_name("TestMethod"))
)
server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [my_interceptor])
``filter_`` option also applies to both global and manual client intrumentors.
Environment variable
--------------------
If you'd like to exclude specific services for the instrumentations, you can use
``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` environment variables.
For example, if you assign ``"GRPCTestServer,GRPCHealthServer"`` to the variable,
then the global interceptor automatically adds the filters to exclude requests to
services ``GRPCTestServer`` and ``GRPCHealthServer``.
"""
from typing import Collection
import os
from typing import Callable, Collection, List, Union
import grpc # pylint:disable=import-self
from wrapt import wrap_function_wrapper as _wrap
from opentelemetry import trace
from opentelemetry.instrumentation.grpc.filters import (
any_of,
negate,
service_name,
)
from opentelemetry.instrumentation.grpc.grpcext import intercept_channel
from opentelemetry.instrumentation.grpc.package import _instruments
from opentelemetry.instrumentation.grpc.version import __version__
@ -145,10 +194,26 @@ class GrpcInstrumentorServer(BaseInstrumentor):
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
If you want to add a filter that only intercept requests
to match the condition, pass ``filter_`` to GrpcInstrumentorServer.
grpc_server_instrumentor = GrpcInstrumentorServer(
filter_=filters.method_prefix("SimpleMethod"))
grpc_server_instrumentor.instrument()
"""
# pylint:disable=attribute-defined-outside-init, redefined-outer-name
def __init__(self, filter_=None):
excluded_service_filter = _excluded_service_filter()
if excluded_service_filter is not None:
if filter_ is None:
filter_ = excluded_service_filter
else:
filter_ = any_of(filter_, excluded_service_filter)
self._filter = filter_
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
@ -160,11 +225,16 @@ class GrpcInstrumentorServer(BaseInstrumentor):
if "interceptors" in kwargs:
# add our interceptor as the first
kwargs["interceptors"].insert(
0, server_interceptor(tracer_provider=tracer_provider)
0,
server_interceptor(
tracer_provider=tracer_provider, filter_=self._filter
),
)
else:
kwargs["interceptors"] = [
server_interceptor(tracer_provider=tracer_provider)
server_interceptor(
tracer_provider=tracer_provider, filter_=self._filter
)
]
return self._original_func(*args, **kwargs)
@ -183,8 +253,25 @@ class GrpcInstrumentorClient(BaseInstrumentor):
grpc_client_instrumentor = GrpcInstrumentorClient()
grpc_client_instrumentor.instrument()
If you want to add a filter that only intercept requests
to match the condition, pass ``filter_`` option to GrpcInstrumentorClient.
grpc_client_instrumentor = GrpcInstrumentorClient(
filter_=filters.negate(filters.health_check())
)
grpc_client_instrumentor.instrument()
"""
def __init__(self, filter_=None):
excluded_service_filter = _excluded_service_filter()
if excluded_service_filter is not None:
if filter_ is None:
filter_ = excluded_service_filter
else:
filter_ = any_of(filter_, excluded_service_filter)
self._filter = filter_
# Figures out which channel type we need to wrap
def _which_channel(self, kwargs):
# handle legacy argument
@ -221,16 +308,23 @@ class GrpcInstrumentorClient(BaseInstrumentor):
tracer_provider = kwargs.get("tracer_provider")
return intercept_channel(
channel,
client_interceptor(tracer_provider=tracer_provider),
client_interceptor(
tracer_provider=tracer_provider,
filter_=self._filter,
),
)
def client_interceptor(tracer_provider=None):
def client_interceptor(tracer_provider=None, filter_=None):
"""Create a gRPC client channel interceptor.
Args:
tracer: The tracer to use to create client-side spans.
filter_: filter function that returns True if gRPC requests
matches the condition. Default is None and intercept
all requests.
Returns:
An invocation-side interceptor object.
"""
@ -238,15 +332,19 @@ def client_interceptor(tracer_provider=None):
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
return _client.OpenTelemetryClientInterceptor(tracer)
return _client.OpenTelemetryClientInterceptor(tracer, filter_=filter_)
def server_interceptor(tracer_provider=None):
def server_interceptor(tracer_provider=None, filter_=None):
"""Create a gRPC server interceptor.
Args:
tracer: The tracer to use to create server-side spans.
filter_: filter function that returns True if gRPC requests
matches the condition. Default is None and intercept
all requests.
Returns:
A service-side interceptor object.
"""
@ -254,4 +352,24 @@ def server_interceptor(tracer_provider=None):
tracer = trace.get_tracer(__name__, __version__, tracer_provider)
return _server.OpenTelemetryServerInterceptor(tracer)
return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_)
def _excluded_service_filter() -> Union[Callable[[object], bool], None]:
services = _parse_services(
os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "")
)
if len(services) == 0:
return None
filters = (service_name(srv) for srv in services)
return negate(any_of(*filters))
def _parse_services(excluded_services: str) -> List[str]:
if excluded_services != "":
excluded_service_list = [
s.strip() for s in excluded_services.split(",")
]
else:
excluded_service_list = []
return excluded_service_list

View File

@ -62,8 +62,9 @@ def _make_future_done_callback(span, rpc_info):
class OpenTelemetryClientInterceptor(
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
):
def __init__(self, tracer):
def __init__(self, tracer, filter_=None):
self._tracer = tracer
self._filter = filter_
def _start_span(self, method, **kwargs):
service, meth = method.lstrip("/").split("/", 1)
@ -148,6 +149,8 @@ class OpenTelemetryClientInterceptor(
return self._trace_result(span, rpc_info, result)
def intercept_unary(self, request, metadata, client_info, invoker):
if self._filter is not None and not self._filter(client_info):
return invoker(request, metadata)
return self._intercept(request, metadata, client_info, invoker)
# For RPCs that stream responses, the result can be a generator. To record
@ -188,6 +191,9 @@ class OpenTelemetryClientInterceptor(
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return invoker(request_or_iterator, metadata)
if self._filter is not None and not self._filter(client_info):
return invoker(request_or_iterator, metadata)
if client_info.is_server_stream:
return self._intercept_server_stream(
request_or_iterator, metadata, client_info, invoker

View File

@ -173,9 +173,10 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
Usage::
tracer = some OpenTelemetry tracer
filter = filters.negate(filters.method_name("service.Foo"))
interceptors = [
OpenTelemetryServerInterceptor(tracer),
OpenTelemetryServerInterceptor(tracer, filter),
]
server = grpc.server(
@ -184,8 +185,9 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
"""
def __init__(self, tracer):
def __init__(self, tracer, filter_=None):
self._tracer = tracer
self._filter = filter_
@contextmanager
def _set_remote_context(self, servicer_context):
@ -261,6 +263,9 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
)
def intercept_service(self, continuation, handler_call_details):
if self._filter is not None and not self._filter(handler_call_details):
return continuation(handler_call_details)
def telemetry_wrapper(behavior, request_streaming, response_streaming):
def telemetry_interceptor(request_or_iterator, context):

View File

@ -0,0 +1,208 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Callable, TypeVar
import grpc
TCallDetails = TypeVar(
"TCallDetails", grpc.HandlerCallDetails, grpc.ClientCallDetails
)
Condition = Callable[[TCallDetails], bool]
def _full_method(metadata):
name = ""
if isinstance(metadata, grpc.HandlerCallDetails):
name = metadata.method
# NOTE: replace here if there's better way to match cases to handle
# grpcext._interceptor._UnaryClientInfo/_StreamClientInfo
elif hasattr(metadata, "full_method"):
name = metadata.full_method
return name
def _split_full_method(metadata):
name = _full_method(metadata)
service, method = os.path.split(name)
if service != "":
service = os.path.normpath(service)
service = service.lstrip("/")
return (service, method)
def all_of(*args: Condition[TCallDetails]) -> Condition[TCallDetails]:
"""Returns a filter function that returns True if all filter functions
assigned matches conditions.
Args:
args (function): a list of filter function
Returns:
A filter function that returns True if all filter functions
assigned matches conditions.
"""
def filter_fn(metadata):
return all(func(metadata) for func in args)
return filter_fn
def any_of(*args: Condition[TCallDetails]) -> Condition[TCallDetails]:
"""Returns a filter function that returns True if any of filter functions
assigned matches conditions.
Args:
args (function): a list of filter function
Returns:
A filter function that returns True if any of filter functions
assigned matches conditions.
"""
def filter_fn(metadata):
return any(func(metadata) for func in args)
return filter_fn
def negate(func: Condition[TCallDetails]) -> Condition[TCallDetails]:
"""Returns a filter function that negate the result of func
Args:
func (function): filter function to negate the result
Returns:
A filter function that negate the result of func
"""
def filter_fn(metadata):
return not func(metadata)
return filter_fn
def method_name(name: str) -> Condition[TCallDetails]:
"""Returns a filter function that return True if
request's gRPC method name matches name.
Args:
name (str): method name to match
Returns:
A filter function that returns True if request's gRPC method
name matches name
"""
def filter_fn(metadata):
_, method = _split_full_method(metadata)
return method == name
return filter_fn
def method_prefix(prefix: str) -> Condition[TCallDetails]:
"""Returns a filter function that return True if
request's gRPC method name starts with prefix.
Args:
prefix (str): method prefix to match
Returns:
A filter function that returns True if request's gRPC method
name starts with prefix
"""
def filter_fn(metadata):
_, method = _split_full_method(metadata)
return method.startswith(prefix)
return filter_fn
def full_method_name(name: str) -> Condition[TCallDetails]:
"""Returns a filter function that return True if
request's gRPC full method name matches name.
Args:
name (str): full method name to match
Returns:
A filter function that returns True if request's gRPC full
method name matches name
"""
def filter_fn(metadata):
fm = _full_method(metadata)
return fm == name
return filter_fn
def service_name(name: str) -> Condition[TCallDetails]:
"""Returns a filter function that return True if
request's gRPC service name matches name.
Args:
name (str): service name to match
Returns:
A filter function that returns True if request's gRPC service
name matches name
"""
def filter_fn(metadata):
service, _ = _split_full_method(metadata)
return service == name
return filter_fn
def service_prefix(prefix: str) -> Condition[TCallDetails]:
"""Returns a filter function that return True if
request's gRPC service name starts with prefix.
Args:
prefix (str): service prefix to match
Returns:
A filter function that returns True if request's gRPC service
name starts with prefix
"""
def filter_fn(metadata):
service, _ = _split_full_method(metadata)
return service.startswith(prefix)
return filter_fn
def health_check() -> Condition[TCallDetails]:
"""Returns a Filter that returns true if the request's
service name is health check defined by gRPC Health Checking Protocol.
https://github.com/grpc/grpc/blob/master/doc/health-checking.md
"""
return service_prefix("grpc.health.v1.Health")
__all__ = [
"method_name",
"method_prefix",
"full_method_name",
"service_name",
"service_prefix",
"health_check",
]

View File

@ -0,0 +1,682 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from unittest import mock
import grpc
from tests.protobuf import ( # pylint: disable=no-name-in-module
test_server_pb2_grpc,
)
import opentelemetry.instrumentation.grpc
from opentelemetry import context, trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient, filters
from opentelemetry.instrumentation.grpc._client import (
OpenTelemetryClientInterceptor,
)
from opentelemetry.instrumentation.grpc.grpcext._interceptor import (
_UnaryClientInfo,
)
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.propagate import get_global_textmap, set_global_textmap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.mock_textmap import MockTextMapPropagator
from opentelemetry.test.test_base import TestBase
from ._client import (
bidirectional_streaming_method,
client_streaming_method,
server_streaming_method,
simple_method,
simple_method_future,
)
from ._server import create_test_server
from .protobuf.test_server_pb2 import Request
# User defined interceptor. Is used in the tests along with the opentelemetry client interceptor.
class Interceptor(
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
):
def __init__(self):
pass
def intercept_unary_unary(
self, continuation, client_call_details, request
):
return self._intercept_call(continuation, client_call_details, request)
def intercept_unary_stream(
self, continuation, client_call_details, request
):
return self._intercept_call(continuation, client_call_details, request)
def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
return self._intercept_call(
continuation, client_call_details, request_iterator
)
def intercept_stream_stream(
self, continuation, client_call_details, request_iterator
):
return self._intercept_call(
continuation, client_call_details, request_iterator
)
@staticmethod
def _intercept_call(
continuation, client_call_details, request_or_iterator
):
return continuation(client_call_details, request_or_iterator)
class TestClientProtoFilterMethodName(TestBase):
def setUp(self):
super().setUp()
GrpcInstrumentorClient(
filter_=filters.method_name("SimpleMethod")
).instrument()
self.server = create_test_server(25565)
self.server.start()
# use a user defined interceptor along with the opentelemetry client interceptor
interceptors = [Interceptor()]
self.channel = grpc.insecure_channel("localhost:25565")
self.channel = grpc.intercept_channel(self.channel, *interceptors)
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
def tearDown(self):
super().tearDown()
GrpcInstrumentorClient().uninstrument()
self.server.stop(None)
self.channel.close()
def test_unary_unary_future(self):
simple_method_future(self._stub).result()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
def test_unary_unary(self):
simple_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
self.assertSpanHasAttributes(
span,
{
SpanAttributes.RPC_METHOD: "SimpleMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
0
],
},
)
def test_unary_stream(self):
server_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_stream_unary(self):
client_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_stream_stream(self):
bidirectional_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_error_simple(self):
with self.assertRaises(grpc.RpcError):
simple_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code,
trace.StatusCode.ERROR,
)
def test_error_stream_unary(self):
with self.assertRaises(grpc.RpcError):
client_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_error_unary_stream(self):
with self.assertRaises(grpc.RpcError):
server_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_error_stream_stream(self):
with self.assertRaises(grpc.RpcError):
bidirectional_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_client_interceptor_trace_context_propagation(
self,
): # pylint: disable=no-self-use
"""ensure that client interceptor correctly inject trace context into all outgoing requests."""
previous_propagator = get_global_textmap()
try:
set_global_textmap(MockTextMapPropagator())
interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer())
carrier = tuple()
def invoker(request, metadata):
nonlocal carrier
carrier = metadata
return {}
request = Request(client_id=1, request_data="data")
interceptor.intercept_unary(
request,
{},
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod", timeout=None
),
invoker=invoker,
)
assert len(carrier) == 2
assert carrier[0][0] == "mock-traceid"
assert carrier[0][1] == "0"
assert carrier[1][0] == "mock-spanid"
assert carrier[1][1] == "0"
finally:
set_global_textmap(previous_propagator)
class TestClientProtoFilterMethodPrefix(TestBase):
def setUp(self):
super().setUp()
GrpcInstrumentorClient(
filter_=filters.method_prefix("Simple")
).instrument()
self.server = create_test_server(25565)
self.server.start()
# use a user defined interceptor along with the opentelemetry client interceptor
interceptors = [Interceptor()]
self.channel = grpc.insecure_channel("localhost:25565")
self.channel = grpc.intercept_channel(self.channel, *interceptors)
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
def tearDown(self):
super().tearDown()
GrpcInstrumentorClient().uninstrument()
self.server.stop(None)
self.channel.close()
def test_unary_unary_future(self):
simple_method_future(self._stub).result()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
def test_unary_unary(self):
simple_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
self.assertSpanHasAttributes(
span,
{
SpanAttributes.RPC_METHOD: "SimpleMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
0
],
},
)
def test_unary_stream(self):
server_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_stream_unary(self):
client_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_stream_stream(self):
bidirectional_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_error_simple(self):
with self.assertRaises(grpc.RpcError):
simple_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code,
trace.StatusCode.ERROR,
)
def test_error_stream_unary(self):
with self.assertRaises(grpc.RpcError):
client_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_error_unary_stream(self):
with self.assertRaises(grpc.RpcError):
server_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_error_stream_stream(self):
with self.assertRaises(grpc.RpcError):
bidirectional_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_client_interceptor_trace_context_propagation(
self,
): # pylint: disable=no-self-use
"""ensure that client interceptor correctly inject trace context into all outgoing requests."""
previous_propagator = get_global_textmap()
try:
set_global_textmap(MockTextMapPropagator())
interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer())
carrier = tuple()
def invoker(request, metadata):
nonlocal carrier
carrier = metadata
return {}
request = Request(client_id=1, request_data="data")
interceptor.intercept_unary(
request,
{},
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod", timeout=None
),
invoker=invoker,
)
assert len(carrier) == 2
assert carrier[0][0] == "mock-traceid"
assert carrier[0][1] == "0"
assert carrier[1][0] == "mock-spanid"
assert carrier[1][1] == "0"
finally:
set_global_textmap(previous_propagator)
class TestClientProtoFilterByEnv(TestBase):
def setUp(self):
with mock.patch.dict(
os.environ,
{
"OTEL_PYTHON_GRPC_EXCLUDED_SERVICES": "GRPCMockServer,GRPCTestServer"
},
):
super().setUp()
GrpcInstrumentorClient().instrument()
self.server = create_test_server(25565)
self.server.start()
# use a user defined interceptor along with the opentelemetry client interceptor
interceptors = [Interceptor()]
self.channel = grpc.insecure_channel("localhost:25565")
self.channel = grpc.intercept_channel(self.channel, *interceptors)
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
def tearDown(self):
super().tearDown()
GrpcInstrumentorClient().uninstrument()
self.server.stop(None)
self.channel.close()
def test_unary_unary_future(self):
simple_method_future(self._stub).result()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_unary_unary(self):
simple_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
class TestClientProtoFilterByEnvAndOption(TestBase):
def setUp(self):
with mock.patch.dict(
os.environ,
{"OTEL_PYTHON_GRPC_EXCLUDED_SERVICES": "GRPCMockServer"},
):
super().setUp()
GrpcInstrumentorClient(
filter_=filters.service_prefix("GRPCTestServer")
).instrument()
self.server = create_test_server(25565)
self.server.start()
# use a user defined interceptor along with the opentelemetry client interceptor
interceptors = [Interceptor()]
self.channel = grpc.insecure_channel("localhost:25565")
self.channel = grpc.intercept_channel(self.channel, *interceptors)
self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel)
def tearDown(self):
super().tearDown()
GrpcInstrumentorClient().uninstrument()
self.server.stop(None)
self.channel.close()
def test_unary_unary_future(self):
simple_method_future(self._stub).result()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
def test_unary_unary(self):
simple_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/SimpleMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
self.assertSpanHasAttributes(
span,
{
SpanAttributes.RPC_METHOD: "SimpleMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
0
],
},
)
def test_unary_stream(self):
server_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/ServerStreamingMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
self.assertSpanHasAttributes(
span,
{
SpanAttributes.RPC_METHOD: "ServerStreamingMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
0
],
},
)
def test_stream_unary(self):
client_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(span.name, "/GRPCTestServer/ClientStreamingMethod")
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
self.assertSpanHasAttributes(
span,
{
SpanAttributes.RPC_METHOD: "ClientStreamingMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
0
],
},
)
def test_stream_stream(self):
bidirectional_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertEqual(
span.name, "/GRPCTestServer/BidirectionalStreamingMethod"
)
self.assertIs(span.kind, trace.SpanKind.CLIENT)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
self.assertSpanHasAttributes(
span,
{
SpanAttributes.RPC_METHOD: "BidirectionalStreamingMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
0
],
},
)
def test_error_simple(self):
with self.assertRaises(grpc.RpcError):
simple_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code,
trace.StatusCode.ERROR,
)
def test_error_stream_unary(self):
with self.assertRaises(grpc.RpcError):
client_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code,
trace.StatusCode.ERROR,
)
def test_error_unary_stream(self):
with self.assertRaises(grpc.RpcError):
server_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code,
trace.StatusCode.ERROR,
)
def test_error_stream_stream(self):
with self.assertRaises(grpc.RpcError):
bidirectional_streaming_method(self._stub, error=True)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self.assertIs(
span.status.status_code,
trace.StatusCode.ERROR,
)
def test_client_interceptor_trace_context_propagation(
self,
): # pylint: disable=no-self-use
"""ensure that client interceptor correctly inject trace context into all outgoing requests."""
previous_propagator = get_global_textmap()
try:
set_global_textmap(MockTextMapPropagator())
interceptor = OpenTelemetryClientInterceptor(trace.NoOpTracer())
carrier = tuple()
def invoker(request, metadata):
nonlocal carrier
carrier = metadata
return {}
request = Request(client_id=1, request_data="data")
interceptor.intercept_unary(
request,
{},
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod", timeout=None
),
invoker=invoker,
)
assert len(carrier) == 2
assert carrier[0][0] == "mock-traceid"
assert carrier[0][1] == "0"
assert carrier[1][0] == "mock-spanid"
assert carrier[1][1] == "0"
finally:
set_global_textmap(previous_propagator)
def test_unary_unary_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
simple_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
finally:
context.detach(token)
self.assertEqual(len(spans), 0)
def test_unary_stream_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
server_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
finally:
context.detach(token)
self.assertEqual(len(spans), 0)
def test_stream_unary_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
client_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
finally:
context.detach(token)
self.assertEqual(len(spans), 0)
def test_stream_stream_with_suppress_key(self):
token = context.attach(
context.set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)
)
try:
bidirectional_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
finally:
context.detach(token)
self.assertEqual(len(spans), 0)

View File

@ -0,0 +1,357 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import grpc
import pytest
from opentelemetry.instrumentation.grpc import filters
class _HandlerCallDetails(
collections.namedtuple(
"_HanlderCallDetails",
(
"method",
"invocation_metadata",
),
),
grpc.HandlerCallDetails,
):
pass
class _UnaryClientInfo(
collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout"))
):
pass
class _StreamClientInfo(
collections.namedtuple(
"_StreamClientInfo",
("full_method", "is_client_stream", "is_server_stream", "timeout"),
)
):
pass
@pytest.mark.parametrize(
"test_case",
[
(
True,
"SimpleMethod",
_HandlerCallDetails(
method="SimpleMethod",
invocation_metadata=[("tracer", "foo"), ("caller", "bar")],
),
),
(
False,
"SimpleMethod",
_HandlerCallDetails(
method="NotSimpleMethod",
invocation_metadata=[("tracer", "foo"), ("caller", "bar")],
),
),
(
True,
"SimpleMethod",
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
timeout=3000,
),
),
(
False,
"SimpleMethod",
_UnaryClientInfo(
full_method="/GRPCTestServer/NotSimpleMethod",
timeout=3000,
),
),
(
True,
"SimpleMethod",
_StreamClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
(
False,
"SimpleMethod",
_StreamClientInfo(
full_method="/GRPCTestServer/NotSimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
],
)
def test_method_name(test_case):
fn = filters.method_name(test_case[1])
assert test_case[0] == fn(test_case[2])
@pytest.mark.parametrize(
"test_case",
[
(
True,
"Simple",
_HandlerCallDetails(
method="SimpleMethod",
invocation_metadata=[("tracer", "foo"), ("caller", "bar")],
),
),
(
False,
"Simple",
_HandlerCallDetails(
method="NotSimpleMethod",
invocation_metadata=[("tracer", "foo"), ("caller", "bar")],
),
),
(
True,
"Simple",
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
timeout=3000,
),
),
(
False,
"Simple",
_UnaryClientInfo(
full_method="/GRPCTestServer/NotSimpleMethod",
timeout=3000,
),
),
(
True,
"Simple",
_StreamClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
(
False,
"Simple",
_StreamClientInfo(
full_method="/GRPCTestServer/NotSimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
],
)
def test_method_prefix(test_case):
fn = filters.method_prefix(test_case[1])
assert test_case[0] == fn(test_case[2])
@pytest.mark.parametrize(
"test_case",
[
(
True,
"GRPCTestServer",
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
timeout=3000,
),
),
(
False,
"GRPCTestServer",
_UnaryClientInfo(
full_method="/GRPCRealServer/SimpleMethod",
timeout=3000,
),
),
(
True,
"GRPCTestServer",
_StreamClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
(
False,
"GRPCTestServer",
_StreamClientInfo(
full_method="/GRPCRealServer/SimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
],
)
def test_service_name(test_case):
fn = filters.service_name(test_case[1])
assert test_case[0] == fn(test_case[2])
@pytest.mark.parametrize(
"test_case",
[
(
True,
"GRPCTest",
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
timeout=3000,
),
),
(
False,
"GRPCTest",
_UnaryClientInfo(
full_method="/GRPCRealServer/SimpleMethod",
timeout=3000,
),
),
(
True,
"GRPCTest",
_StreamClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
(
False,
"GRPCTest",
_StreamClientInfo(
full_method="/GRPCRealServer/SimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
],
)
def test_service_prefix(test_case):
fn = filters.service_prefix(test_case[1])
assert test_case[0] == fn(test_case[2])
@pytest.mark.parametrize(
"test_case",
[
(
True,
_UnaryClientInfo(
full_method="/grpc.health.v1.Health/Check",
timeout=3000,
),
),
(
False,
_UnaryClientInfo(
full_method="/GRPCRealServer/SimpleMethod",
timeout=3000,
),
),
(
True,
_StreamClientInfo(
full_method="/grpc.health.v1.Health/Check",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
(
False,
_StreamClientInfo(
full_method="/GRPCRealServer/SimpleMethod",
is_client_stream=True,
is_server_stream=False,
timeout=3000,
),
),
],
)
def test_health_check(test_case):
fn = filters.health_check()
assert test_case[0] == fn(test_case[1])
@pytest.mark.parametrize(
"test_case",
[
(
True,
filters.all_of(
filters.method_name("SimpleMethod"),
filters.service_name("GRPCTestServer"),
),
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
timeout=3000,
),
),
(
True,
filters.any_of(
filters.method_name("NotSimpleMethod"),
filters.service_name("GRPCTestServer"),
),
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
timeout=3000,
),
),
(
True,
filters.any_of(
filters.service_name("GRPCMockServer"),
filters.service_name("GRPCTestServer"),
),
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
timeout=3000,
),
),
(
False,
filters.negate(filters.method_name("SimpleMethod")),
_UnaryClientInfo(
full_method="/GRPCTestServer/SimpleMethod",
timeout=3000,
),
),
],
)
def test_all_any_negate(test_case):
fn = test_case[1]
assert test_case[0] == fn(test_case[2])

View File

@ -0,0 +1,215 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint:disable=unused-argument
# pylint:disable=no-self-use
from concurrent import futures
import grpc
import opentelemetry.instrumentation.grpc
from opentelemetry import trace
from opentelemetry.instrumentation.grpc import (
GrpcInstrumentorServer,
filters,
server_interceptor,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from .protobuf.test_server_pb2 import Request, Response
from .protobuf.test_server_pb2_grpc import (
GRPCTestServerServicer,
add_GRPCTestServerServicer_to_server,
)
class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
def __init__(self, handler):
self.request_streaming = False
self.response_streaming = False
self.request_deserializer = None
self.response_serializer = None
self.unary_unary = handler
self.unary_stream = None
self.stream_unary = None
self.stream_stream = None
class UnaryUnaryRpcHandler(grpc.GenericRpcHandler):
def __init__(self, handler):
self._unary_unary_handler = handler
def service(self, handler_call_details):
return UnaryUnaryMethodHandler(self._unary_unary_handler)
class Servicer(GRPCTestServerServicer):
"""Our test servicer"""
# pylint:disable=C0103
def SimpleMethod(self, request, context):
return Response(
server_id=request.client_id,
response_data=request.request_data,
)
# pylint:disable=C0103
def ServerStreamingMethod(self, request, context):
for data in ("one", "two", "three"):
yield Response(
server_id=request.client_id,
response_data=data,
)
class TestOpenTelemetryServerInterceptorFilterMethodName(TestBase):
def test_instrumentor(self):
def handler(request, context):
return b""
grpc_server_instrumentor = GrpcInstrumentorServer(
filter_=filters.method_name("handler")
)
grpc_server_instrumentor.instrument()
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
)
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
rpc_call = "TestServicer/handler"
try:
server.start()
channel.unary_unary(rpc_call)(b"test")
finally:
server.stop(None)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
# Check attributes
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
SpanAttributes.RPC_METHOD: "handler",
SpanAttributes.RPC_SERVICE: "TestServicer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
0
],
},
)
grpc_server_instrumentor.uninstrument()
def test_uninstrument(self):
def handler(request, context):
return b""
grpc_server_instrumentor = GrpcInstrumentorServer(
filter_=filters.method_name("SimpleMethod")
)
grpc_server_instrumentor.instrument()
grpc_server_instrumentor.uninstrument()
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
)
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
rpc_call = "TestServicer/test"
try:
server.start()
channel.unary_unary(rpc_call)(b"test")
finally:
server.stop(None)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)
def test_create_span(self):
"""Check that the interceptor wraps calls with spans server-side."""
# Intercept gRPC calls...
interceptor = server_interceptor(
filter_=filters.method_name("SimpleMethod")
)
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
add_GRPCTestServerServicer_to_server(Servicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
rpc_call = "/GRPCTestServer/SimpleMethod"
request = Request(client_id=1, request_data="test")
msg = request.SerializeToString()
try:
server.start()
channel.unary_unary(rpc_call)(msg)
finally:
server.stop(None)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, rpc_call)
self.assertIs(span.kind, trace.SpanKind.SERVER)
# Check version and name in span's instrumentation info
self.assertEqualSpanInstrumentationInfo(
span, opentelemetry.instrumentation.grpc
)
# Check attributes
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
SpanAttributes.RPC_METHOD: "SimpleMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[
0
],
},
)