mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-31 22:23:12 +08:00
Instrument RedisCluster clients (#1177)
* Instrument RedisCluster clients * reformat files * update changelogs * refactor _traced_execute_pipeline * handle AttributeError * handle IndexError * refactor _traced_execute_pipeline * move hasattr check to _set_connection_attributes function Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc2-0.32b0...HEAD)
|
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc2-0.32b0...HEAD)
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients
|
||||||
|
([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177))
|
||||||
|
|
||||||
## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01
|
## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01
|
||||||
|
|
||||||
|
|
||||||
|
@ -122,9 +122,12 @@ _REDIS_ASYNCIO_VERSION = (4, 2, 0)
|
|||||||
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
|
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
|
||||||
import redis.asyncio
|
import redis.asyncio
|
||||||
|
|
||||||
|
_REDIS_CLUSTER_VERSION = (4, 1, 0)
|
||||||
|
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 0)
|
||||||
|
|
||||||
|
|
||||||
def _set_connection_attributes(span, conn):
|
def _set_connection_attributes(span, conn):
|
||||||
if not span.is_recording():
|
if not span.is_recording() or not hasattr(conn, "connection_pool"):
|
||||||
return
|
return
|
||||||
for key, value in _extract_conn_attributes(
|
for key, value in _extract_conn_attributes(
|
||||||
conn.connection_pool.connection_kwargs
|
conn.connection_pool.connection_kwargs
|
||||||
@ -159,10 +162,29 @@ def _instrument(
|
|||||||
return response
|
return response
|
||||||
|
|
||||||
def _traced_execute_pipeline(func, instance, args, kwargs):
|
def _traced_execute_pipeline(func, instance, args, kwargs):
|
||||||
cmds = [_format_command_args(c) for c, _ in instance.command_stack]
|
try:
|
||||||
|
command_stack = (
|
||||||
|
instance.command_stack
|
||||||
|
if hasattr(instance, "command_stack")
|
||||||
|
else instance._command_stack
|
||||||
|
)
|
||||||
|
|
||||||
|
cmds = [
|
||||||
|
_format_command_args(c.args if hasattr(c, "args") else c[0])
|
||||||
|
for c in command_stack
|
||||||
|
]
|
||||||
resource = "\n".join(cmds)
|
resource = "\n".join(cmds)
|
||||||
|
|
||||||
span_name = " ".join([args[0] for args, _ in instance.command_stack])
|
span_name = " ".join(
|
||||||
|
[
|
||||||
|
(c.args[0] if hasattr(c, "args") else c[0][0])
|
||||||
|
for c in command_stack
|
||||||
|
]
|
||||||
|
)
|
||||||
|
except (AttributeError, IndexError):
|
||||||
|
command_stack = []
|
||||||
|
resource = ""
|
||||||
|
span_name = ""
|
||||||
|
|
||||||
with tracer.start_as_current_span(
|
with tracer.start_as_current_span(
|
||||||
span_name, kind=trace.SpanKind.CLIENT
|
span_name, kind=trace.SpanKind.CLIENT
|
||||||
@ -171,7 +193,7 @@ def _instrument(
|
|||||||
span.set_attribute(SpanAttributes.DB_STATEMENT, resource)
|
span.set_attribute(SpanAttributes.DB_STATEMENT, resource)
|
||||||
_set_connection_attributes(span, instance)
|
_set_connection_attributes(span, instance)
|
||||||
span.set_attribute(
|
span.set_attribute(
|
||||||
"db.redis.pipeline_length", len(instance.command_stack)
|
"db.redis.pipeline_length", len(command_stack)
|
||||||
)
|
)
|
||||||
response = func(*args, **kwargs)
|
response = func(*args, **kwargs)
|
||||||
if callable(response_hook):
|
if callable(response_hook):
|
||||||
@ -196,6 +218,17 @@ def _instrument(
|
|||||||
f"{pipeline_class}.immediate_execute_command",
|
f"{pipeline_class}.immediate_execute_command",
|
||||||
_traced_execute_command,
|
_traced_execute_command,
|
||||||
)
|
)
|
||||||
|
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
|
||||||
|
wrap_function_wrapper(
|
||||||
|
"redis.cluster",
|
||||||
|
"RedisCluster.execute_command",
|
||||||
|
_traced_execute_command,
|
||||||
|
)
|
||||||
|
wrap_function_wrapper(
|
||||||
|
"redis.cluster",
|
||||||
|
"ClusterPipeline.execute",
|
||||||
|
_traced_execute_pipeline,
|
||||||
|
)
|
||||||
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
|
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
|
||||||
wrap_function_wrapper(
|
wrap_function_wrapper(
|
||||||
"redis.asyncio",
|
"redis.asyncio",
|
||||||
@ -212,6 +245,17 @@ def _instrument(
|
|||||||
f"{pipeline_class}.immediate_execute_command",
|
f"{pipeline_class}.immediate_execute_command",
|
||||||
_traced_execute_command,
|
_traced_execute_command,
|
||||||
)
|
)
|
||||||
|
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
|
||||||
|
wrap_function_wrapper(
|
||||||
|
"redis.asyncio.cluster",
|
||||||
|
"RedisCluster.execute_command",
|
||||||
|
_traced_execute_command,
|
||||||
|
)
|
||||||
|
wrap_function_wrapper(
|
||||||
|
"redis.asyncio.cluster",
|
||||||
|
"ClusterPipeline.execute",
|
||||||
|
_traced_execute_pipeline,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class RedisInstrumentor(BaseInstrumentor):
|
class RedisInstrumentor(BaseInstrumentor):
|
||||||
@ -258,8 +302,14 @@ class RedisInstrumentor(BaseInstrumentor):
|
|||||||
unwrap(redis.Redis, "pipeline")
|
unwrap(redis.Redis, "pipeline")
|
||||||
unwrap(redis.client.Pipeline, "execute")
|
unwrap(redis.client.Pipeline, "execute")
|
||||||
unwrap(redis.client.Pipeline, "immediate_execute_command")
|
unwrap(redis.client.Pipeline, "immediate_execute_command")
|
||||||
|
if redis.VERSION >= _REDIS_CLUSTER_VERSION:
|
||||||
|
unwrap(redis.cluster.RedisCluster, "execute_command")
|
||||||
|
unwrap(redis.cluster.ClusterPipeline, "execute")
|
||||||
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
|
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
|
||||||
unwrap(redis.asyncio.Redis, "execute_command")
|
unwrap(redis.asyncio.Redis, "execute_command")
|
||||||
unwrap(redis.asyncio.Redis, "pipeline")
|
unwrap(redis.asyncio.Redis, "pipeline")
|
||||||
unwrap(redis.asyncio.client.Pipeline, "execute")
|
unwrap(redis.asyncio.client.Pipeline, "execute")
|
||||||
unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command")
|
unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command")
|
||||||
|
if redis.VERSION >= _REDIS_ASYNCIO_CLUSTER_VERSION:
|
||||||
|
unwrap(redis.asyncio.cluster.RedisCluster, "execute_command")
|
||||||
|
unwrap(redis.asyncio.cluster.ClusterPipeline, "execute")
|
||||||
|
@ -27,6 +27,17 @@ services:
|
|||||||
image: redis:4.0-alpine
|
image: redis:4.0-alpine
|
||||||
ports:
|
ports:
|
||||||
- "127.0.0.1:6379:6379"
|
- "127.0.0.1:6379:6379"
|
||||||
|
otrediscluster:
|
||||||
|
image: grokzen/redis-cluster:6.2.0
|
||||||
|
environment:
|
||||||
|
- IP=0.0.0.0
|
||||||
|
ports:
|
||||||
|
- "127.0.0.1:7000:7000"
|
||||||
|
- "127.0.0.1:7001:7001"
|
||||||
|
- "127.0.0.1:7002:7002"
|
||||||
|
- "127.0.0.1:7003:7003"
|
||||||
|
- "127.0.0.1:7004:7004"
|
||||||
|
- "127.0.0.1:7005:7005"
|
||||||
otjaeger:
|
otjaeger:
|
||||||
image: jaegertracing/all-in-one:1.8
|
image: jaegertracing/all-in-one:1.8
|
||||||
environment:
|
environment:
|
||||||
|
@ -124,6 +124,72 @@ class TestRedisInstrument(TestBase):
|
|||||||
self.assertEqual(child_span.name, "GET")
|
self.assertEqual(child_span.name, "GET")
|
||||||
|
|
||||||
|
|
||||||
|
class TestRedisClusterInstrument(TestBase):
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
self.redis_client = redis.cluster.RedisCluster(
|
||||||
|
host="localhost", port=7000
|
||||||
|
)
|
||||||
|
self.redis_client.flushall()
|
||||||
|
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super().tearDown()
|
||||||
|
RedisInstrumentor().uninstrument()
|
||||||
|
|
||||||
|
def _check_span(self, span, name):
|
||||||
|
self.assertEqual(span.name, name)
|
||||||
|
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)
|
||||||
|
|
||||||
|
def test_basics(self):
|
||||||
|
self.assertIsNone(self.redis_client.get("cheese"))
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans), 1)
|
||||||
|
span = spans[0]
|
||||||
|
self._check_span(span, "GET")
|
||||||
|
self.assertEqual(
|
||||||
|
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
|
||||||
|
)
|
||||||
|
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)
|
||||||
|
|
||||||
|
def test_pipeline_traced(self):
|
||||||
|
with self.redis_client.pipeline(transaction=False) as pipeline:
|
||||||
|
pipeline.set("blah", 32)
|
||||||
|
pipeline.rpush("foo", "éé")
|
||||||
|
pipeline.hgetall("xxx")
|
||||||
|
pipeline.execute()
|
||||||
|
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans), 1)
|
||||||
|
span = spans[0]
|
||||||
|
self._check_span(span, "SET RPUSH HGETALL")
|
||||||
|
self.assertEqual(
|
||||||
|
span.attributes.get(SpanAttributes.DB_STATEMENT),
|
||||||
|
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
|
||||||
|
)
|
||||||
|
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)
|
||||||
|
|
||||||
|
def test_parent(self):
|
||||||
|
"""Ensure OpenTelemetry works with redis."""
|
||||||
|
ot_tracer = trace.get_tracer("redis_svc")
|
||||||
|
|
||||||
|
with ot_tracer.start_as_current_span("redis_get"):
|
||||||
|
self.assertIsNone(self.redis_client.get("cheese"))
|
||||||
|
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans), 2)
|
||||||
|
child_span, parent_span = spans[0], spans[1]
|
||||||
|
|
||||||
|
# confirm the parenting
|
||||||
|
self.assertIsNone(parent_span.parent)
|
||||||
|
self.assertIs(child_span.parent, parent_span.get_span_context())
|
||||||
|
|
||||||
|
self.assertEqual(parent_span.name, "redis_get")
|
||||||
|
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")
|
||||||
|
|
||||||
|
self.assertEqual(child_span.name, "GET")
|
||||||
|
|
||||||
|
|
||||||
def async_call(coro):
|
def async_call(coro):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
return loop.run_until_complete(coro)
|
return loop.run_until_complete(coro)
|
||||||
@ -238,6 +304,77 @@ class TestAsyncRedisInstrument(TestBase):
|
|||||||
self.assertEqual(child_span.name, "GET")
|
self.assertEqual(child_span.name, "GET")
|
||||||
|
|
||||||
|
|
||||||
|
class TestAsyncRedisClusterInstrument(TestBase):
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
self.redis_client = redis.asyncio.cluster.RedisCluster(
|
||||||
|
host="localhost", port=7000
|
||||||
|
)
|
||||||
|
async_call(self.redis_client.flushall())
|
||||||
|
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
super().tearDown()
|
||||||
|
RedisInstrumentor().uninstrument()
|
||||||
|
|
||||||
|
def _check_span(self, span, name):
|
||||||
|
self.assertEqual(span.name, name)
|
||||||
|
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)
|
||||||
|
|
||||||
|
def test_basics(self):
|
||||||
|
self.assertIsNone(async_call(self.redis_client.get("cheese")))
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans), 1)
|
||||||
|
span = spans[0]
|
||||||
|
self._check_span(span, "GET")
|
||||||
|
self.assertEqual(
|
||||||
|
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
|
||||||
|
)
|
||||||
|
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)
|
||||||
|
|
||||||
|
def test_pipeline_traced(self):
|
||||||
|
async def pipeline_simple():
|
||||||
|
async with self.redis_client.pipeline(
|
||||||
|
transaction=False
|
||||||
|
) as pipeline:
|
||||||
|
pipeline.set("blah", 32)
|
||||||
|
pipeline.rpush("foo", "éé")
|
||||||
|
pipeline.hgetall("xxx")
|
||||||
|
await pipeline.execute()
|
||||||
|
|
||||||
|
async_call(pipeline_simple())
|
||||||
|
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans), 1)
|
||||||
|
span = spans[0]
|
||||||
|
self._check_span(span, "SET RPUSH HGETALL")
|
||||||
|
self.assertEqual(
|
||||||
|
span.attributes.get(SpanAttributes.DB_STATEMENT),
|
||||||
|
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
|
||||||
|
)
|
||||||
|
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)
|
||||||
|
|
||||||
|
def test_parent(self):
|
||||||
|
"""Ensure OpenTelemetry works with redis."""
|
||||||
|
ot_tracer = trace.get_tracer("redis_svc")
|
||||||
|
|
||||||
|
with ot_tracer.start_as_current_span("redis_get"):
|
||||||
|
self.assertIsNone(async_call(self.redis_client.get("cheese")))
|
||||||
|
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
self.assertEqual(len(spans), 2)
|
||||||
|
child_span, parent_span = spans[0], spans[1]
|
||||||
|
|
||||||
|
# confirm the parenting
|
||||||
|
self.assertIsNone(parent_span.parent)
|
||||||
|
self.assertIs(child_span.parent, parent_span.get_span_context())
|
||||||
|
|
||||||
|
self.assertEqual(parent_span.name, "redis_get")
|
||||||
|
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")
|
||||||
|
|
||||||
|
self.assertEqual(child_span.name, "GET")
|
||||||
|
|
||||||
|
|
||||||
class TestRedisDBIndexInstrument(TestBase):
|
class TestRedisDBIndexInstrument(TestBase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super().setUp()
|
super().setUp()
|
||||||
|
2
tox.ini
2
tox.ini
@ -501,7 +501,7 @@ deps =
|
|||||||
psycopg2 ~= 2.8.4
|
psycopg2 ~= 2.8.4
|
||||||
aiopg >= 0.13.0, < 1.3.0
|
aiopg >= 0.13.0, < 1.3.0
|
||||||
sqlalchemy ~= 1.4
|
sqlalchemy ~= 1.4
|
||||||
redis ~= 4.2
|
redis ~= 4.3
|
||||||
celery[pytest] >= 4.0, < 6.0
|
celery[pytest] >= 4.0, < 6.0
|
||||||
protobuf~=3.13
|
protobuf~=3.13
|
||||||
requests==2.25.0
|
requests==2.25.0
|
||||||
|
Reference in New Issue
Block a user