mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-08-06 14:59:11 +08:00
gRPC streaming bugfix (#260)
This commit is contained in:
@ -16,6 +16,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
([#312](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/312))
|
([#312](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/312))
|
||||||
- `opentelemetry-instrumentation-boto` updated to set span attributes instead of overriding the resource.
|
- `opentelemetry-instrumentation-boto` updated to set span attributes instead of overriding the resource.
|
||||||
([#310](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/310))
|
([#310](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/310))
|
||||||
|
- `opentelemetry-instrumentation-grpc` Fix issue tracking child spans in streaming responses
|
||||||
|
([#260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/260))
|
||||||
|
- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs
|
||||||
|
([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269))
|
||||||
|
|
||||||
## [0.17b0](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.17b0) - 2021-01-20
|
## [0.17b0](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.17b0) - 2021-01-20
|
||||||
|
|
||||||
@ -80,8 +84,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
|
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
|
||||||
- Update TraceState to adhere to specs
|
- Update TraceState to adhere to specs
|
||||||
([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276))
|
([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276))
|
||||||
- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs
|
|
||||||
([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269))
|
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
- Remove Configuration
|
- Remove Configuration
|
||||||
|
@ -239,6 +239,15 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
|
|||||||
def telemetry_wrapper(behavior, request_streaming, response_streaming):
|
def telemetry_wrapper(behavior, request_streaming, response_streaming):
|
||||||
def telemetry_interceptor(request_or_iterator, context):
|
def telemetry_interceptor(request_or_iterator, context):
|
||||||
|
|
||||||
|
# handle streaming responses specially
|
||||||
|
if response_streaming:
|
||||||
|
return self._intercept_server_stream(
|
||||||
|
behavior,
|
||||||
|
handler_call_details,
|
||||||
|
request_or_iterator,
|
||||||
|
context,
|
||||||
|
)
|
||||||
|
|
||||||
with self._set_remote_context(context):
|
with self._set_remote_context(context):
|
||||||
with self._start_span(
|
with self._start_span(
|
||||||
handler_call_details, context
|
handler_call_details, context
|
||||||
@ -249,6 +258,7 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
|
|||||||
# And now we run the actual RPC.
|
# And now we run the actual RPC.
|
||||||
try:
|
try:
|
||||||
return behavior(request_or_iterator, context)
|
return behavior(request_or_iterator, context)
|
||||||
|
|
||||||
except Exception as error:
|
except Exception as error:
|
||||||
# Bare exceptions are likely to be gRPC aborts, which
|
# Bare exceptions are likely to be gRPC aborts, which
|
||||||
# we handle in our context wrapper.
|
# we handle in our context wrapper.
|
||||||
@ -263,3 +273,23 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
|
|||||||
return _wrap_rpc_behavior(
|
return _wrap_rpc_behavior(
|
||||||
continuation(handler_call_details), telemetry_wrapper
|
continuation(handler_call_details), telemetry_wrapper
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Handle streaming responses separately - we have to do this
|
||||||
|
# to return a *new* generator or various upstream things
|
||||||
|
# get confused, or we'll lose the consistent trace
|
||||||
|
def _intercept_server_stream(
|
||||||
|
self, behavior, handler_call_details, request_or_iterator, context
|
||||||
|
):
|
||||||
|
|
||||||
|
with self._set_remote_context(context):
|
||||||
|
with self._start_span(handler_call_details, context) as span:
|
||||||
|
context = _OpenTelemetryServicerContext(context, span)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield from behavior(request_or_iterator, context)
|
||||||
|
|
||||||
|
except Exception as error:
|
||||||
|
# pylint:disable=unidiomatic-typecheck
|
||||||
|
if type(error) != Exception:
|
||||||
|
span.record_exception(error)
|
||||||
|
raise error
|
||||||
|
@ -30,6 +30,12 @@ from opentelemetry.sdk import trace as trace_sdk
|
|||||||
from opentelemetry.test.test_base import TestBase
|
from opentelemetry.test.test_base import TestBase
|
||||||
from opentelemetry.trace.status import StatusCode
|
from opentelemetry.trace.status import StatusCode
|
||||||
|
|
||||||
|
from .protobuf.test_server_pb2 import Request, Response
|
||||||
|
from .protobuf.test_server_pb2_grpc import (
|
||||||
|
GRPCTestServerServicer,
|
||||||
|
add_GRPCTestServerServicer_to_server,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
|
class UnaryUnaryMethodHandler(grpc.RpcMethodHandler):
|
||||||
def __init__(self, handler):
|
def __init__(self, handler):
|
||||||
@ -51,6 +57,23 @@ class UnaryUnaryRpcHandler(grpc.GenericRpcHandler):
|
|||||||
return UnaryUnaryMethodHandler(self._unary_unary_handler)
|
return UnaryUnaryMethodHandler(self._unary_unary_handler)
|
||||||
|
|
||||||
|
|
||||||
|
class Servicer(GRPCTestServerServicer):
|
||||||
|
"""Our test servicer"""
|
||||||
|
|
||||||
|
# pylint:disable=C0103
|
||||||
|
def SimpleMethod(self, request, context):
|
||||||
|
return Response(
|
||||||
|
server_id=request.client_id, response_data=request.request_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
# pylint:disable=C0103
|
||||||
|
def ServerStreamingMethod(self, request, context):
|
||||||
|
for data in ("one", "two", "three"):
|
||||||
|
yield Response(
|
||||||
|
server_id=request.client_id, response_data=data,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestOpenTelemetryServerInterceptor(TestBase):
|
class TestOpenTelemetryServerInterceptor(TestBase):
|
||||||
def test_instrumentor(self):
|
def test_instrumentor(self):
|
||||||
def handler(request, context):
|
def handler(request, context):
|
||||||
@ -134,25 +157,21 @@ class TestOpenTelemetryServerInterceptor(TestBase):
|
|||||||
# Intercept gRPC calls...
|
# Intercept gRPC calls...
|
||||||
interceptor = server_interceptor()
|
interceptor = server_interceptor()
|
||||||
|
|
||||||
# No-op RPC handler
|
|
||||||
def handler(request, context):
|
|
||||||
return b""
|
|
||||||
|
|
||||||
server = grpc.server(
|
server = grpc.server(
|
||||||
futures.ThreadPoolExecutor(max_workers=1),
|
futures.ThreadPoolExecutor(max_workers=1),
|
||||||
options=(("grpc.so_reuseport", 0),),
|
options=(("grpc.so_reuseport", 0),),
|
||||||
interceptors=[interceptor],
|
interceptors=[interceptor],
|
||||||
)
|
)
|
||||||
|
add_GRPCTestServerServicer_to_server(Servicer(), server)
|
||||||
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
|
|
||||||
|
|
||||||
port = server.add_insecure_port("[::]:0")
|
port = server.add_insecure_port("[::]:0")
|
||||||
channel = grpc.insecure_channel("localhost:{:d}".format(port))
|
channel = grpc.insecure_channel("localhost:{:d}".format(port))
|
||||||
|
|
||||||
rpc_call = "TestServicer/handler"
|
rpc_call = "/GRPCTestServer/SimpleMethod"
|
||||||
|
request = Request(client_id=1, request_data="test")
|
||||||
|
msg = request.SerializeToString()
|
||||||
try:
|
try:
|
||||||
server.start()
|
server.start()
|
||||||
channel.unary_unary(rpc_call)(b"")
|
channel.unary_unary(rpc_call)(msg)
|
||||||
finally:
|
finally:
|
||||||
server.stop(None)
|
server.stop(None)
|
||||||
|
|
||||||
@ -174,13 +193,211 @@ class TestOpenTelemetryServerInterceptor(TestBase):
|
|||||||
{
|
{
|
||||||
"net.peer.ip": "[::1]",
|
"net.peer.ip": "[::1]",
|
||||||
"net.peer.name": "localhost",
|
"net.peer.name": "localhost",
|
||||||
"rpc.method": "handler",
|
"rpc.method": "SimpleMethod",
|
||||||
"rpc.service": "TestServicer",
|
"rpc.service": "GRPCTestServer",
|
||||||
"rpc.system": "grpc",
|
"rpc.system": "grpc",
|
||||||
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
|
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_create_two_spans(self):
|
||||||
|
"""Verify that the interceptor captures sub spans within the given
|
||||||
|
trace"""
|
||||||
|
|
||||||
|
class TwoSpanServicer(GRPCTestServerServicer):
|
||||||
|
# pylint:disable=C0103
|
||||||
|
def SimpleMethod(self, request, context):
|
||||||
|
|
||||||
|
# create another span
|
||||||
|
tracer = trace.get_tracer(__name__)
|
||||||
|
with tracer.start_as_current_span("child") as child:
|
||||||
|
child.add_event("child event")
|
||||||
|
|
||||||
|
return Response(
|
||||||
|
server_id=request.client_id,
|
||||||
|
response_data=request.request_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Intercept gRPC calls...
|
||||||
|
interceptor = server_interceptor()
|
||||||
|
|
||||||
|
# setup the server
|
||||||
|
server = grpc.server(
|
||||||
|
futures.ThreadPoolExecutor(max_workers=1),
|
||||||
|
options=(("grpc.so_reuseport", 0),),
|
||||||
|
interceptors=[interceptor],
|
||||||
|
)
|
||||||
|
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
|
||||||
|
port = server.add_insecure_port("[::]:0")
|
||||||
|
channel = grpc.insecure_channel("localhost:{:d}".format(port))
|
||||||
|
|
||||||
|
# setup the RPC
|
||||||
|
rpc_call = "/GRPCTestServer/SimpleMethod"
|
||||||
|
request = Request(client_id=1, request_data="test")
|
||||||
|
msg = request.SerializeToString()
|
||||||
|
try:
|
||||||
|
server.start()
|
||||||
|
channel.unary_unary(rpc_call)(msg)
|
||||||
|
finally:
|
||||||
|
server.stop(None)
|
||||||
|
|
||||||
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans_list), 2)
|
||||||
|
child_span = spans_list[0]
|
||||||
|
parent_span = spans_list[1]
|
||||||
|
|
||||||
|
self.assertEqual(parent_span.name, rpc_call)
|
||||||
|
self.assertIs(parent_span.kind, trace.SpanKind.SERVER)
|
||||||
|
|
||||||
|
# Check version and name in span's instrumentation info
|
||||||
|
self.check_span_instrumentation_info(
|
||||||
|
parent_span, opentelemetry.instrumentation.grpc
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check attributes
|
||||||
|
self.assert_span_has_attributes(
|
||||||
|
parent_span,
|
||||||
|
{
|
||||||
|
"net.peer.ip": "[::1]",
|
||||||
|
"net.peer.name": "localhost",
|
||||||
|
"rpc.method": "SimpleMethod",
|
||||||
|
"rpc.service": "GRPCTestServer",
|
||||||
|
"rpc.system": "grpc",
|
||||||
|
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check the child span
|
||||||
|
self.assertEqual(child_span.name, "child")
|
||||||
|
self.assertEqual(
|
||||||
|
parent_span.context.trace_id, child_span.context.trace_id
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_create_span_streaming(self):
|
||||||
|
"""Check that the interceptor wraps calls with spans server-side, on a
|
||||||
|
streaming call."""
|
||||||
|
|
||||||
|
# Intercept gRPC calls...
|
||||||
|
interceptor = server_interceptor()
|
||||||
|
|
||||||
|
# setup the server
|
||||||
|
server = grpc.server(
|
||||||
|
futures.ThreadPoolExecutor(max_workers=1),
|
||||||
|
options=(("grpc.so_reuseport", 0),),
|
||||||
|
interceptors=[interceptor],
|
||||||
|
)
|
||||||
|
add_GRPCTestServerServicer_to_server(Servicer(), server)
|
||||||
|
port = server.add_insecure_port("[::]:0")
|
||||||
|
channel = grpc.insecure_channel("localhost:{:d}".format(port))
|
||||||
|
|
||||||
|
# setup the RPC
|
||||||
|
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
|
||||||
|
request = Request(client_id=1, request_data="test")
|
||||||
|
msg = request.SerializeToString()
|
||||||
|
try:
|
||||||
|
server.start()
|
||||||
|
list(channel.unary_stream(rpc_call)(msg))
|
||||||
|
finally:
|
||||||
|
server.stop(None)
|
||||||
|
|
||||||
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans_list), 1)
|
||||||
|
span = spans_list[0]
|
||||||
|
|
||||||
|
self.assertEqual(span.name, rpc_call)
|
||||||
|
self.assertIs(span.kind, trace.SpanKind.SERVER)
|
||||||
|
|
||||||
|
# Check version and name in span's instrumentation info
|
||||||
|
self.check_span_instrumentation_info(
|
||||||
|
span, opentelemetry.instrumentation.grpc
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check attributes
|
||||||
|
self.assert_span_has_attributes(
|
||||||
|
span,
|
||||||
|
{
|
||||||
|
"net.peer.ip": "[::1]",
|
||||||
|
"net.peer.name": "localhost",
|
||||||
|
"rpc.method": "ServerStreamingMethod",
|
||||||
|
"rpc.service": "GRPCTestServer",
|
||||||
|
"rpc.system": "grpc",
|
||||||
|
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_create_two_spans_streaming(self):
|
||||||
|
"""Verify that the interceptor captures sub spans in a
|
||||||
|
streaming call, within the given trace"""
|
||||||
|
|
||||||
|
class TwoSpanServicer(GRPCTestServerServicer):
|
||||||
|
# pylint:disable=C0103
|
||||||
|
def ServerStreamingMethod(self, request, context):
|
||||||
|
|
||||||
|
# create another span
|
||||||
|
tracer = trace.get_tracer(__name__)
|
||||||
|
with tracer.start_as_current_span("child") as child:
|
||||||
|
child.add_event("child event")
|
||||||
|
|
||||||
|
for data in ("one", "two", "three"):
|
||||||
|
yield Response(
|
||||||
|
server_id=request.client_id, response_data=data,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Intercept gRPC calls...
|
||||||
|
interceptor = server_interceptor()
|
||||||
|
|
||||||
|
# setup the server
|
||||||
|
server = grpc.server(
|
||||||
|
futures.ThreadPoolExecutor(max_workers=1),
|
||||||
|
options=(("grpc.so_reuseport", 0),),
|
||||||
|
interceptors=[interceptor],
|
||||||
|
)
|
||||||
|
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
|
||||||
|
port = server.add_insecure_port("[::]:0")
|
||||||
|
channel = grpc.insecure_channel("localhost:{:d}".format(port))
|
||||||
|
|
||||||
|
# setup the RPC
|
||||||
|
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
|
||||||
|
request = Request(client_id=1, request_data="test")
|
||||||
|
msg = request.SerializeToString()
|
||||||
|
try:
|
||||||
|
server.start()
|
||||||
|
list(channel.unary_stream(rpc_call)(msg))
|
||||||
|
finally:
|
||||||
|
server.stop(None)
|
||||||
|
|
||||||
|
spans_list = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans_list), 2)
|
||||||
|
child_span = spans_list[0]
|
||||||
|
parent_span = spans_list[1]
|
||||||
|
|
||||||
|
self.assertEqual(parent_span.name, rpc_call)
|
||||||
|
self.assertIs(parent_span.kind, trace.SpanKind.SERVER)
|
||||||
|
|
||||||
|
# Check version and name in span's instrumentation info
|
||||||
|
self.check_span_instrumentation_info(
|
||||||
|
parent_span, opentelemetry.instrumentation.grpc
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check attributes
|
||||||
|
self.assert_span_has_attributes(
|
||||||
|
parent_span,
|
||||||
|
{
|
||||||
|
"net.peer.ip": "[::1]",
|
||||||
|
"net.peer.name": "localhost",
|
||||||
|
"rpc.method": "ServerStreamingMethod",
|
||||||
|
"rpc.service": "GRPCTestServer",
|
||||||
|
"rpc.system": "grpc",
|
||||||
|
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check the child span
|
||||||
|
self.assertEqual(child_span.name, "child")
|
||||||
|
self.assertEqual(
|
||||||
|
parent_span.context.trace_id, child_span.context.trace_id
|
||||||
|
)
|
||||||
|
|
||||||
def test_span_lifetime(self):
|
def test_span_lifetime(self):
|
||||||
"""Check that the span is active for the duration of the call."""
|
"""Check that the span is active for the duration of the call."""
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user