diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bd167956..58b2fe944 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) - Update TraceState to adhere to specs ([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276)) +- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs + ([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269)) ### Removed - Remove Configuration diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index f963a2102..eeffea1b8 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -185,29 +185,33 @@ class GrpcInstrumentorClient(BaseInstrumentor): """ + # Figures out which channel type we need to wrap + def _which_channel(self, kwargs): + # handle legacy argument + if "channel_type" in kwargs: + if kwargs.get("channel_type") == "secure": + return ("secure_channel",) + return ("insecure_channel",) + + # handle modern arguments + types = [] + for ctype in ("secure_channel", "insecure_channel"): + if kwargs.get(ctype, True): + types.append(ctype) + + return tuple(types) + def _instrument(self, **kwargs): exporter = kwargs.get("exporter", None) interval = kwargs.get("interval", 30) - if kwargs.get("channel_type") == "secure": + for ctype in self._which_channel(kwargs): _wrap( - "grpc", - "secure_channel", - partial(self.wrapper_fn, exporter, interval), - ) - - else: - _wrap( - "grpc", - "insecure_channel", - partial(self.wrapper_fn, exporter, interval), + "grpc", ctype, partial(self.wrapper_fn, exporter, interval), ) def _uninstrument(self, **kwargs): - if kwargs.get("channel_type") == "secure": - unwrap(grpc, "secure_channel") - - else: - unwrap(grpc, "insecure_channel") + for ctype in self._which_channel(kwargs): + unwrap(grpc, ctype) def wrapper_fn( self, exporter, interval, original_func, instance, args, kwargs diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index b38fd4d51..3cf4c74df 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -102,8 +102,16 @@ class OpenTelemetryClientInterceptor( ) def _start_span(self, method): + service, meth = method.lstrip("/").split("/", 1) + attributes = { + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + "rpc.method": meth, + "rpc.service": service, + } + return self._tracer.start_as_current_span( - name=method, kind=trace.SpanKind.CLIENT + name=method, kind=trace.SpanKind.CLIENT, attributes=attributes ) # pylint:disable=no-self-use @@ -133,6 +141,7 @@ class OpenTelemetryClientInterceptor( self._metrics_recorder.record_bytes_in( response.ByteSize(), client_info.full_method ) + return result def _start_guarded_span(self, *args, **kwargs): @@ -175,11 +184,14 @@ class OpenTelemetryClientInterceptor( try: result = invoker(request, metadata) - except grpc.RpcError: + except grpc.RpcError as err: guarded_span.generated_span.set_status( Status(StatusCode.ERROR) ) - raise + guarded_span.generated_span.set_attribute( + "rpc.grpc.status_code", err.code().value[0] + ) + raise err return self._trace_result( guarded_span, rpc_info, result, client_info @@ -230,9 +242,12 @@ class OpenTelemetryClientInterceptor( response.ByteSize(), client_info.full_method ) yield response - except grpc.RpcError: + except grpc.RpcError as err: span.set_status(Status(StatusCode.ERROR)) - raise + span.set_attribute( + "rpc.grpc.status_code", err.code().value[0] + ) + raise err def intercept_stream( self, request_or_iterator, metadata, client_info, invoker @@ -268,11 +283,14 @@ class OpenTelemetryClientInterceptor( try: result = invoker(request_or_iterator, metadata) - except grpc.RpcError: + except grpc.RpcError as err: guarded_span.generated_span.set_status( Status(StatusCode.ERROR) ) - raise + guarded_span.generated_span.set_attribute( + "rpc.grpc.status_code", err.code().value[0], + ) + raise err return self._trace_result( guarded_span, rpc_info, result, client_info diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py index 9de95c972..078a98b9e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py @@ -72,33 +72,32 @@ class TimedMetricRecorder: def record_bytes_in(self, bytes_in, method): if self._meter: - labels = {"method": method} + labels = {"rpc.method": method} self._bytes_in.add(bytes_in, labels) def record_bytes_out(self, bytes_out, method): if self._meter: - labels = {"method": method} + labels = {"rpc.method": method} self._bytes_out.add(bytes_out, labels) @contextmanager def record_latency(self, method): start_time = time() labels = { - "method": method, - "status_code": grpc.StatusCode.OK, # pylint:disable=no-member + "rpc.method": method, + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.name, } try: yield labels except grpc.RpcError as exc: # pylint:disable=no-member if self._meter: # pylint: disable=no-member - labels["status_code"] = exc.code() + labels["rpc.grpc.status_code"] = exc.code().name self._error_count.add(1, labels) - labels["error"] = True + labels["error"] = "true" raise finally: if self._meter: - if "error" not in labels: - labels["error"] = False elapsed_time = (time() - start_time) * 1000 self._duration.record(elapsed_time, labels) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py index f272a9024..a13a9a99b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor.py @@ -73,21 +73,21 @@ class TestClientProto(TestBase): self.assertIsNotNone(bytes_out) self.assertEqual(bytes_out.instrument.name, "grpcio/client/bytes_out") - self.assertEqual(bytes_out.labels, (("method", method),)) + self.assertEqual(bytes_out.labels, (("rpc.method", method),)) self.assertIsNotNone(bytes_in) self.assertEqual(bytes_in.instrument.name, "grpcio/client/bytes_in") - self.assertEqual(bytes_in.labels, (("method", method),)) + self.assertEqual(bytes_in.labels, (("rpc.method", method),)) self.assertIsNotNone(duration) self.assertEqual(duration.instrument.name, "grpcio/client/duration") - self.assertEqual( - duration.labels, - ( - ("error", False), - ("method", method), - ("status_code", grpc.StatusCode.OK), - ), + self.assertSequenceEqual( + sorted(duration.labels), + [ + ("rpc.grpc.status_code", grpc.StatusCode.OK.name), + ("rpc.method", method), + ("rpc.system", "grpc"), + ], ) self.assertEqual(type(bytes_out.aggregator), SumAggregator) @@ -116,6 +116,16 @@ class TestClientProto(TestBase): self._verify_success_records(8, 8, "/GRPCTestServer/SimpleMethod") + self.assert_span_has_attributes( + span, + { + "rpc.method": "SimpleMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + def test_unary_stream(self): server_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -134,6 +144,16 @@ class TestClientProto(TestBase): 8, 40, "/GRPCTestServer/ServerStreamingMethod" ) + self.assert_span_has_attributes( + span, + { + "rpc.method": "ServerStreamingMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + def test_stream_unary(self): client_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -152,6 +172,16 @@ class TestClientProto(TestBase): 40, 8, "/GRPCTestServer/ClientStreamingMethod" ) + self.assert_span_has_attributes( + span, + { + "rpc.method": "ClientStreamingMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + def test_stream_stream(self): bidirectional_streaming_method(self._stub) spans = self.memory_exporter.get_finished_spans() @@ -172,6 +202,16 @@ class TestClientProto(TestBase): 40, 40, "/GRPCTestServer/BidirectionalStreamingMethod" ) + self.assert_span_has_attributes( + span, + { + "rpc.method": "BidirectionalStreamingMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + def _verify_error_records(self, method): # pylint: disable=protected-access,no-member self.channel._interceptor.controller.tick() @@ -195,21 +235,33 @@ class TestClientProto(TestBase): self.assertIsNotNone(duration) self.assertEqual(errors.instrument.name, "grpcio/client/errors") - self.assertEqual( - errors.labels, - ( - ("method", method), - ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + self.assertSequenceEqual( + sorted(errors.labels), + sorted( + ( + ( + "rpc.grpc.status_code", + grpc.StatusCode.INVALID_ARGUMENT.name, + ), + ("rpc.method", method), + ("rpc.system", "grpc"), + ) ), ) self.assertEqual(errors.aggregator.checkpoint, 1) - self.assertEqual( - duration.labels, - ( - ("error", True), - ("method", method), - ("status_code", grpc.StatusCode.INVALID_ARGUMENT), + self.assertSequenceEqual( + sorted(duration.labels), + sorted( + ( + ("error", "true"), + ("rpc.method", method), + ("rpc.system", "grpc"), + ( + "rpc.grpc.status_code", + grpc.StatusCode.INVALID_ARGUMENT.name, + ), + ) ), )