From 8823655b1c0d4641261c98e0421c97ed8dda359b Mon Sep 17 00:00:00 2001 From: Sungwon Han Date: Thu, 7 Jul 2022 22:00:09 +0900 Subject: [PATCH] Instrument RedisCluster clients (#1177) * Instrument RedisCluster clients * reformat files * update changelogs * refactor _traced_execute_pipeline * handle AttributeError * handle IndexError * refactor _traced_execute_pipeline * move hasattr check to _set_connection_attributes function Co-authored-by: Srikanth Chekuri --- CHANGELOG.md | 4 + .../instrumentation/redis/__init__.py | 60 +++++++- .../tests/docker-compose.yml | 11 ++ .../tests/redis/test_redis_functional.py | 137 ++++++++++++++++++ tox.ini | 2 +- 5 files changed, 208 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9ec0ed3d..c8669d975 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc2-0.32b0...HEAD) +### Added +- `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients + ([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177)) + ## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01 diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index 8aaec8795..4db552326 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -122,9 +122,12 @@ _REDIS_ASYNCIO_VERSION = (4, 2, 0) if redis.VERSION >= _REDIS_ASYNCIO_VERSION: import redis.asyncio +_REDIS_CLUSTER_VERSION = (4, 1, 0) +_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 0) + def _set_connection_attributes(span, conn): - if not span.is_recording(): + if not span.is_recording() or not hasattr(conn, "connection_pool"): return for key, value in _extract_conn_attributes( conn.connection_pool.connection_kwargs @@ -159,10 +162,29 @@ def _instrument( return response def _traced_execute_pipeline(func, instance, args, kwargs): - cmds = [_format_command_args(c) for c, _ in instance.command_stack] - resource = "\n".join(cmds) + try: + command_stack = ( + instance.command_stack + if hasattr(instance, "command_stack") + else instance._command_stack + ) - span_name = " ".join([args[0] for args, _ in instance.command_stack]) + cmds = [ + _format_command_args(c.args if hasattr(c, "args") else c[0]) + for c in command_stack + ] + resource = "\n".join(cmds) + + span_name = " ".join( + [ + (c.args[0] if hasattr(c, "args") else c[0][0]) + for c in command_stack + ] + ) + except (AttributeError, IndexError): + command_stack = [] + resource = "" + span_name = "" with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT @@ -171,7 +193,7 @@ def _instrument( span.set_attribute(SpanAttributes.DB_STATEMENT, resource) _set_connection_attributes(span, instance) span.set_attribute( - "db.redis.pipeline_length", len(instance.command_stack) + "db.redis.pipeline_length", len(command_stack) ) response = func(*args, **kwargs) if callable(response_hook): @@ -196,6 +218,17 @@ def _instrument( f"{pipeline_class}.immediate_execute_command", _traced_execute_command, ) + if redis.VERSION >= _REDIS_CLUSTER_VERSION: + wrap_function_wrapper( + "redis.cluster", + "RedisCluster.execute_command", + _traced_execute_command, + ) + wrap_function_wrapper( + "redis.cluster", + "ClusterPipeline.execute", + _traced_execute_pipeline, + ) if redis.VERSION >= _REDIS_ASYNCIO_VERSION: wrap_function_wrapper( "redis.asyncio", @@ -212,6 +245,17 @@ def _instrument( f"{pipeline_class}.immediate_execute_command", _traced_execute_command, ) + if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION: + wrap_function_wrapper( + "redis.asyncio.cluster", + "RedisCluster.execute_command", + _traced_execute_command, + ) + wrap_function_wrapper( + "redis.asyncio.cluster", + "ClusterPipeline.execute", + _traced_execute_pipeline, + ) class RedisInstrumentor(BaseInstrumentor): @@ -258,8 +302,14 @@ class RedisInstrumentor(BaseInstrumentor): unwrap(redis.Redis, "pipeline") unwrap(redis.client.Pipeline, "execute") unwrap(redis.client.Pipeline, "immediate_execute_command") + if redis.VERSION >= _REDIS_CLUSTER_VERSION: + unwrap(redis.cluster.RedisCluster, "execute_command") + unwrap(redis.cluster.ClusterPipeline, "execute") if redis.VERSION >= _REDIS_ASYNCIO_VERSION: unwrap(redis.asyncio.Redis, "execute_command") unwrap(redis.asyncio.Redis, "pipeline") unwrap(redis.asyncio.client.Pipeline, "execute") unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command") + if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION: + unwrap(redis.asyncio.cluster.RedisCluster, "execute_command") + unwrap(redis.asyncio.cluster.ClusterPipeline, "execute") diff --git a/tests/opentelemetry-docker-tests/tests/docker-compose.yml b/tests/opentelemetry-docker-tests/tests/docker-compose.yml index 8a33adb79..2f89e3388 100644 --- a/tests/opentelemetry-docker-tests/tests/docker-compose.yml +++ b/tests/opentelemetry-docker-tests/tests/docker-compose.yml @@ -27,6 +27,17 @@ services: image: redis:4.0-alpine ports: - "127.0.0.1:6379:6379" + otrediscluster: + image: grokzen/redis-cluster:6.2.0 + environment: + - IP=0.0.0.0 + ports: + - "127.0.0.1:7000:7000" + - "127.0.0.1:7001:7001" + - "127.0.0.1:7002:7002" + - "127.0.0.1:7003:7003" + - "127.0.0.1:7004:7004" + - "127.0.0.1:7005:7005" otjaeger: image: jaegertracing/all-in-one:1.8 environment: diff --git a/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py b/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py index 0b5d49ca1..6a4ef5021 100644 --- a/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py +++ b/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py @@ -124,6 +124,72 @@ class TestRedisInstrument(TestBase): self.assertEqual(child_span.name, "GET") +class TestRedisClusterInstrument(TestBase): + def setUp(self): + super().setUp() + self.redis_client = redis.cluster.RedisCluster( + host="localhost", port=7000 + ) + self.redis_client.flushall() + RedisInstrumentor().instrument(tracer_provider=self.tracer_provider) + + def tearDown(self): + super().tearDown() + RedisInstrumentor().uninstrument() + + def _check_span(self, span, name): + self.assertEqual(span.name, name) + self.assertIs(span.status.status_code, trace.StatusCode.UNSET) + + def test_basics(self): + self.assertIsNone(self.redis_client.get("cheese")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span, "GET") + self.assertEqual( + span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese" + ) + self.assertEqual(span.attributes.get("db.redis.args_length"), 2) + + def test_pipeline_traced(self): + with self.redis_client.pipeline(transaction=False) as pipeline: + pipeline.set("blah", 32) + pipeline.rpush("foo", "éé") + pipeline.hgetall("xxx") + pipeline.execute() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span, "SET RPUSH HGETALL") + self.assertEqual( + span.attributes.get(SpanAttributes.DB_STATEMENT), + "SET blah 32\nRPUSH foo éé\nHGETALL xxx", + ) + self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3) + + def test_parent(self): + """Ensure OpenTelemetry works with redis.""" + ot_tracer = trace.get_tracer("redis_svc") + + with ot_tracer.start_as_current_span("redis_get"): + self.assertIsNone(self.redis_client.get("cheese")) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + child_span, parent_span = spans[0], spans[1] + + # confirm the parenting + self.assertIsNone(parent_span.parent) + self.assertIs(child_span.parent, parent_span.get_span_context()) + + self.assertEqual(parent_span.name, "redis_get") + self.assertEqual(parent_span.instrumentation_info.name, "redis_svc") + + self.assertEqual(child_span.name, "GET") + + def async_call(coro): loop = asyncio.get_event_loop() return loop.run_until_complete(coro) @@ -238,6 +304,77 @@ class TestAsyncRedisInstrument(TestBase): self.assertEqual(child_span.name, "GET") +class TestAsyncRedisClusterInstrument(TestBase): + def setUp(self): + super().setUp() + self.redis_client = redis.asyncio.cluster.RedisCluster( + host="localhost", port=7000 + ) + async_call(self.redis_client.flushall()) + RedisInstrumentor().instrument(tracer_provider=self.tracer_provider) + + def tearDown(self): + super().tearDown() + RedisInstrumentor().uninstrument() + + def _check_span(self, span, name): + self.assertEqual(span.name, name) + self.assertIs(span.status.status_code, trace.StatusCode.UNSET) + + def test_basics(self): + self.assertIsNone(async_call(self.redis_client.get("cheese"))) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span, "GET") + self.assertEqual( + span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese" + ) + self.assertEqual(span.attributes.get("db.redis.args_length"), 2) + + def test_pipeline_traced(self): + async def pipeline_simple(): + async with self.redis_client.pipeline( + transaction=False + ) as pipeline: + pipeline.set("blah", 32) + pipeline.rpush("foo", "éé") + pipeline.hgetall("xxx") + await pipeline.execute() + + async_call(pipeline_simple()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span, "SET RPUSH HGETALL") + self.assertEqual( + span.attributes.get(SpanAttributes.DB_STATEMENT), + "SET blah 32\nRPUSH foo éé\nHGETALL xxx", + ) + self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3) + + def test_parent(self): + """Ensure OpenTelemetry works with redis.""" + ot_tracer = trace.get_tracer("redis_svc") + + with ot_tracer.start_as_current_span("redis_get"): + self.assertIsNone(async_call(self.redis_client.get("cheese"))) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + child_span, parent_span = spans[0], spans[1] + + # confirm the parenting + self.assertIsNone(parent_span.parent) + self.assertIs(child_span.parent, parent_span.get_span_context()) + + self.assertEqual(parent_span.name, "redis_get") + self.assertEqual(parent_span.instrumentation_info.name, "redis_svc") + + self.assertEqual(child_span.name, "GET") + + class TestRedisDBIndexInstrument(TestBase): def setUp(self): super().setUp() diff --git a/tox.ini b/tox.ini index f7a106442..64210a8c0 100644 --- a/tox.ini +++ b/tox.ini @@ -501,7 +501,7 @@ deps = psycopg2 ~= 2.8.4 aiopg >= 0.13.0, < 1.3.0 sqlalchemy ~= 1.4 - redis ~= 4.2 + redis ~= 4.3 celery[pytest] >= 4.0, < 6.0 protobuf~=3.13 requests==2.25.0