fix(grpc): Allow gRPC connections via Unix socket (#1833)

* fix(grpc): Allow gRPC connections via Unix socket

This commit addresses issue #1832.

The way `NET_PEER_IP` and `NET_PEER_PORT` are retrieved raises a `ValueError`
when gRPC connections are handled via Unix sockets.

```py
ip, port = (
    context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1)
)
```

When using an address like `unix:///tmp/grpc.sock` the value of `context.peer()` is `"unix:"`.
Substituting that in the function above...

```py
ip, port = "unix:".split(",")[0].split(":", 1)[1].rsplit(":", 1)
ip, port = ["unix:"][0].split(":", 1)[1].rsplit(":", 1)
ip, port = "unix:".split(":", 1)[1].rsplit(":", 1)
ip, port = ["unix", ""][1].rsplit(":", 1)
ip, port = "".rsplit(":", 1)
ip, port = [""]  # ValueError
```

I "addressed" the issue by guarding the retrieval of `net.peer.*` values under
an `if` statement that checks if we are using a Unix socket.

I extended the `server_interceptor` tests to run against TCP and Unix socket configurations.

---

**Open Questions**

- [ ] The socket tests will fail on Windows. Is there a way to annotate that?
- [ ] Are there other span values we should be setting for the unix socket?

* Update CHANGELOG

* Add placeholder attributes for linter

* fix lint

---------

Co-authored-by: Matt Oberle <mattoberle@users.noreply.github.com>
Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com>
This commit is contained in:
Matt Oberle
2023-06-21 08:30:35 -04:00
committed by GitHub
parent ffc9334dd7
commit 32ae65ed55
3 changed files with 108 additions and 122 deletions

View File

@ -25,6 +25,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1679))
- `opentelemetry-instrumentation-asgi` Add `http.server.response.size` metric
([#1789](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1789))
- `opentelemetry-instrumentation-grpc` Allow gRPC connections via Unix socket
([#1833](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1833))
## Version 1.18.0/0.39b0 (2023-05-10)

View File

@ -250,24 +250,30 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
# * ipv4:127.0.0.1:57284
# * ipv4:10.2.1.1:57284,127.0.0.1:57284
#
try:
ip, port = (
context.peer().split(",")[0].split(":", 1)[1].rsplit(":", 1)
)
ip = unquote(ip)
attributes.update(
{
SpanAttributes.NET_PEER_IP: ip,
SpanAttributes.NET_PEER_PORT: port,
}
)
if context.peer() != "unix:":
try:
ip, port = (
context.peer()
.split(",")[0]
.split(":", 1)[1]
.rsplit(":", 1)
)
ip = unquote(ip)
attributes.update(
{
SpanAttributes.NET_PEER_IP: ip,
SpanAttributes.NET_PEER_PORT: port,
}
)
# other telemetry sources add this, so we will too
if ip in ("[::1]", "127.0.0.1"):
attributes[SpanAttributes.NET_PEER_NAME] = "localhost"
# other telemetry sources add this, so we will too
if ip in ("[::1]", "127.0.0.1"):
attributes[SpanAttributes.NET_PEER_NAME] = "localhost"
except IndexError:
logger.warning("Failed to parse peer address '%s'", context.peer())
except IndexError:
logger.warning(
"Failed to parse peer address '%s'", context.peer()
)
return self._tracer.start_as_current_span(
name=handler_call_details.method,

View File

@ -15,6 +15,8 @@
# pylint:disable=unused-argument
# pylint:disable=no-self-use
import contextlib
import tempfile
import threading
from concurrent import futures
@ -78,23 +80,32 @@ class Servicer(GRPCTestServerServicer):
class TestOpenTelemetryServerInterceptor(TestBase):
net_peer_span_attributes = {
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
}
@contextlib.contextmanager
def server(self, max_workers=1, interceptors=None):
with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=interceptors or [],
)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
yield server, channel
def test_instrumentor(self):
def handler(request, context):
return b""
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
)
with self.server(max_workers=1) as (server, channel):
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
rpc_call = "TestServicer/handler"
try:
server.start()
@ -117,8 +128,7 @@ class TestOpenTelemetryServerInterceptor(TestBase):
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "handler",
SpanAttributes.RPC_SERVICE: "TestServicer",
SpanAttributes.RPC_SYSTEM: "grpc",
@ -137,17 +147,8 @@ class TestOpenTelemetryServerInterceptor(TestBase):
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
grpc_server_instrumentor.uninstrument()
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
)
with self.server(max_workers=1) as (server, channel):
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
rpc_call = "TestServicer/test"
try:
server.start()
@ -164,15 +165,11 @@ class TestOpenTelemetryServerInterceptor(TestBase):
# Intercept gRPC calls...
interceptor = server_interceptor()
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
add_GRPCTestServerServicer_to_server(Servicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
rpc_call = "/GRPCTestServer/SimpleMethod"
request = Request(client_id=1, request_data="test")
@ -199,8 +196,7 @@ class TestOpenTelemetryServerInterceptor(TestBase):
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "SimpleMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
@ -231,15 +227,11 @@ class TestOpenTelemetryServerInterceptor(TestBase):
interceptor = server_interceptor()
# setup the server
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
# setup the RPC
rpc_call = "/GRPCTestServer/SimpleMethod"
@ -268,8 +260,7 @@ class TestOpenTelemetryServerInterceptor(TestBase):
self.assertSpanHasAttributes(
parent_span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "SimpleMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
@ -292,15 +283,11 @@ class TestOpenTelemetryServerInterceptor(TestBase):
# Intercept gRPC calls...
interceptor = server_interceptor()
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
add_GRPCTestServerServicer_to_server(Servicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
# setup the RPC
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
@ -328,8 +315,7 @@ class TestOpenTelemetryServerInterceptor(TestBase):
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "ServerStreamingMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
@ -360,15 +346,11 @@ class TestOpenTelemetryServerInterceptor(TestBase):
# Intercept gRPC calls...
interceptor = server_interceptor()
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server)
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
# setup the RPC
rpc_call = "/GRPCTestServer/ServerStreamingMethod"
@ -397,8 +379,7 @@ class TestOpenTelemetryServerInterceptor(TestBase):
self.assertSpanHasAttributes(
parent_span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "ServerStreamingMethod",
SpanAttributes.RPC_SERVICE: "GRPCTestServer",
SpanAttributes.RPC_SYSTEM: "grpc",
@ -427,17 +408,12 @@ class TestOpenTelemetryServerInterceptor(TestBase):
active_span_in_handler = trace.get_current_span()
return b""
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
active_span_before_call = trace.get_current_span()
try:
server.start()
@ -463,17 +439,12 @@ class TestOpenTelemetryServerInterceptor(TestBase):
active_spans_in_handler.append(trace.get_current_span())
return b""
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
try:
server.start()
channel.unary_unary("TestServicer/handler")(b"")
@ -496,8 +467,7 @@ class TestOpenTelemetryServerInterceptor(TestBase):
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "handler",
SpanAttributes.RPC_SERVICE: "TestServicer",
SpanAttributes.RPC_SYSTEM: "grpc",
@ -527,17 +497,12 @@ class TestOpenTelemetryServerInterceptor(TestBase):
active_spans_in_handler.append(trace.get_current_span())
return b""
with futures.ThreadPoolExecutor(max_workers=2) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
with self.server(
max_workers=2,
interceptors=[interceptor],
) as (server, channel):
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
try:
server.start()
# Interleave calls so spans are active on each thread at the same
@ -568,8 +533,7 @@ class TestOpenTelemetryServerInterceptor(TestBase):
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "handler",
SpanAttributes.RPC_SERVICE: "TestServicer",
SpanAttributes.RPC_SYSTEM: "grpc",
@ -592,18 +556,11 @@ class TestOpenTelemetryServerInterceptor(TestBase):
def handler(request, context):
context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message)
with futures.ThreadPoolExecutor(max_workers=1) as executor:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=[interceptor],
)
with self.server(
max_workers=1,
interceptors=[interceptor],
) as (server, channel):
server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),))
port = server.add_insecure_port("[::]:0")
channel = grpc.insecure_channel(f"localhost:{port:d}")
rpc_call = "TestServicer/handler"
server.start()
@ -635,8 +592,7 @@ class TestOpenTelemetryServerInterceptor(TestBase):
self.assertSpanHasAttributes(
span,
{
SpanAttributes.NET_PEER_IP: "[::1]",
SpanAttributes.NET_PEER_NAME: "localhost",
**self.net_peer_span_attributes,
SpanAttributes.RPC_METHOD: "handler",
SpanAttributes.RPC_SERVICE: "TestServicer",
SpanAttributes.RPC_SYSTEM: "grpc",
@ -647,6 +603,28 @@ class TestOpenTelemetryServerInterceptor(TestBase):
)
class TestOpenTelemetryServerInterceptorUnix(
TestOpenTelemetryServerInterceptor,
):
net_peer_span_attributes = {}
@contextlib.contextmanager
def server(self, max_workers=1, interceptors=None):
with futures.ThreadPoolExecutor(
max_workers=max_workers
) as executor, tempfile.TemporaryDirectory() as tmp:
server = grpc.server(
executor,
options=(("grpc.so_reuseport", 0),),
interceptors=interceptors or [],
)
sock = f"unix://{tmp}/grpc.sock"
server.add_insecure_port(sock)
channel = grpc.insecure_channel(sock)
yield server, channel
def get_latch(num):
"""Get a countdown latch function for use in n threads."""
cv = threading.Condition()