mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-28 12:43:39 +08:00
Add additional attributes for redis.search methods create_index, search (#2635)
This commit is contained in:
@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
([#2860](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2860))
|
([#2860](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2860))
|
||||||
- `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka
|
- `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka
|
||||||
([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082))
|
([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082))
|
||||||
|
- `opentelemetry-instrumentation-redis` Add additional attributes for methods create_index and search, rename those spans
|
||||||
|
([#2635](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2635))
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
|
@ -102,6 +102,8 @@ from opentelemetry.instrumentation.redis.package import _instruments
|
|||||||
from opentelemetry.instrumentation.redis.util import (
|
from opentelemetry.instrumentation.redis.util import (
|
||||||
_extract_conn_attributes,
|
_extract_conn_attributes,
|
||||||
_format_command_args,
|
_format_command_args,
|
||||||
|
_set_span_attribute_if_value,
|
||||||
|
_value_or_none,
|
||||||
)
|
)
|
||||||
from opentelemetry.instrumentation.redis.version import __version__
|
from opentelemetry.instrumentation.redis.version import __version__
|
||||||
from opentelemetry.instrumentation.utils import unwrap
|
from opentelemetry.instrumentation.utils import unwrap
|
||||||
@ -126,6 +128,8 @@ if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
|
|||||||
_REDIS_CLUSTER_VERSION = (4, 1, 0)
|
_REDIS_CLUSTER_VERSION = (4, 1, 0)
|
||||||
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 2)
|
_REDIS_ASYNCIO_CLUSTER_VERSION = (4, 3, 2)
|
||||||
|
|
||||||
|
_FIELD_TYPES = ["NUMERIC", "TEXT", "GEO", "TAG", "VECTOR"]
|
||||||
|
|
||||||
|
|
||||||
def _set_connection_attributes(span, conn):
|
def _set_connection_attributes(span, conn):
|
||||||
if not span.is_recording() or not hasattr(conn, "connection_pool"):
|
if not span.is_recording() or not hasattr(conn, "connection_pool"):
|
||||||
@ -138,7 +142,12 @@ def _set_connection_attributes(span, conn):
|
|||||||
|
|
||||||
def _build_span_name(instance, cmd_args):
|
def _build_span_name(instance, cmd_args):
|
||||||
if len(cmd_args) > 0 and cmd_args[0]:
|
if len(cmd_args) > 0 and cmd_args[0]:
|
||||||
name = cmd_args[0]
|
if cmd_args[0] == "FT.SEARCH":
|
||||||
|
name = "redis.search"
|
||||||
|
elif cmd_args[0] == "FT.CREATE":
|
||||||
|
name = "redis.create_index"
|
||||||
|
else:
|
||||||
|
name = cmd_args[0]
|
||||||
else:
|
else:
|
||||||
name = instance.connection_pool.connection_kwargs.get("db", 0)
|
name = instance.connection_pool.connection_kwargs.get("db", 0)
|
||||||
return name
|
return name
|
||||||
@ -181,7 +190,6 @@ def _instrument(
|
|||||||
def _traced_execute_command(func, instance, args, kwargs):
|
def _traced_execute_command(func, instance, args, kwargs):
|
||||||
query = _format_command_args(args)
|
query = _format_command_args(args)
|
||||||
name = _build_span_name(instance, args)
|
name = _build_span_name(instance, args)
|
||||||
|
|
||||||
with tracer.start_as_current_span(
|
with tracer.start_as_current_span(
|
||||||
name, kind=trace.SpanKind.CLIENT
|
name, kind=trace.SpanKind.CLIENT
|
||||||
) as span:
|
) as span:
|
||||||
@ -189,9 +197,14 @@ def _instrument(
|
|||||||
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
|
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
|
||||||
_set_connection_attributes(span, instance)
|
_set_connection_attributes(span, instance)
|
||||||
span.set_attribute("db.redis.args_length", len(args))
|
span.set_attribute("db.redis.args_length", len(args))
|
||||||
|
if span.name == "redis.create_index":
|
||||||
|
_add_create_attributes(span, args)
|
||||||
if callable(request_hook):
|
if callable(request_hook):
|
||||||
request_hook(span, instance, args, kwargs)
|
request_hook(span, instance, args, kwargs)
|
||||||
response = func(*args, **kwargs)
|
response = func(*args, **kwargs)
|
||||||
|
if span.is_recording():
|
||||||
|
if span.name == "redis.search":
|
||||||
|
_add_search_attributes(span, response, args)
|
||||||
if callable(response_hook):
|
if callable(response_hook):
|
||||||
response_hook(span, instance, response)
|
response_hook(span, instance, response)
|
||||||
return response
|
return response
|
||||||
@ -202,9 +215,7 @@ def _instrument(
|
|||||||
resource,
|
resource,
|
||||||
span_name,
|
span_name,
|
||||||
) = _build_span_meta_data_for_pipeline(instance)
|
) = _build_span_meta_data_for_pipeline(instance)
|
||||||
|
|
||||||
exception = None
|
exception = None
|
||||||
|
|
||||||
with tracer.start_as_current_span(
|
with tracer.start_as_current_span(
|
||||||
span_name, kind=trace.SpanKind.CLIENT
|
span_name, kind=trace.SpanKind.CLIENT
|
||||||
) as span:
|
) as span:
|
||||||
@ -230,6 +241,60 @@ def _instrument(
|
|||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
def _add_create_attributes(span, args):
|
||||||
|
_set_span_attribute_if_value(
|
||||||
|
span, "redis.create_index.index", _value_or_none(args, 1)
|
||||||
|
)
|
||||||
|
# According to: https://github.com/redis/redis-py/blob/master/redis/commands/search/commands.py#L155 schema is last argument for execute command
|
||||||
|
try:
|
||||||
|
schema_index = args.index("SCHEMA")
|
||||||
|
except ValueError:
|
||||||
|
return
|
||||||
|
schema = args[schema_index:]
|
||||||
|
field_attribute = ""
|
||||||
|
# Schema in format:
|
||||||
|
# [first_field_name, first_field_type, first_field_some_attribute1, first_field_some_attribute2, second_field_name, ...]
|
||||||
|
field_attribute = "".join(
|
||||||
|
f"Field(name: {schema[index - 1]}, type: {schema[index]});"
|
||||||
|
for index in range(1, len(schema))
|
||||||
|
if schema[index] in _FIELD_TYPES
|
||||||
|
)
|
||||||
|
_set_span_attribute_if_value(
|
||||||
|
span,
|
||||||
|
"redis.create_index.fields",
|
||||||
|
field_attribute,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _add_search_attributes(span, response, args):
|
||||||
|
_set_span_attribute_if_value(
|
||||||
|
span, "redis.search.index", _value_or_none(args, 1)
|
||||||
|
)
|
||||||
|
_set_span_attribute_if_value(
|
||||||
|
span, "redis.search.query", _value_or_none(args, 2)
|
||||||
|
)
|
||||||
|
# Parse response from search
|
||||||
|
# https://redis.io/docs/latest/commands/ft.search/
|
||||||
|
# Response in format:
|
||||||
|
# [number_of_returned_documents, index_of_first_returned_doc, first_doc(as a list), index_of_second_returned_doc, second_doc(as a list) ...]
|
||||||
|
# Returned documents in array format:
|
||||||
|
# [first_field_name, first_field_value, second_field_name, second_field_value ...]
|
||||||
|
number_of_returned_documents = _value_or_none(response, 0)
|
||||||
|
_set_span_attribute_if_value(
|
||||||
|
span, "redis.search.total", number_of_returned_documents
|
||||||
|
)
|
||||||
|
if "NOCONTENT" in args or not number_of_returned_documents:
|
||||||
|
return
|
||||||
|
for document_number in range(number_of_returned_documents):
|
||||||
|
document_index = _value_or_none(response, 1 + 2 * document_number)
|
||||||
|
if document_index:
|
||||||
|
document = response[2 + 2 * document_number]
|
||||||
|
for attribute_name_index in range(0, len(document), 2):
|
||||||
|
_set_span_attribute_if_value(
|
||||||
|
span,
|
||||||
|
f"redis.search.xdoc_{document_index}.{document[attribute_name_index]}",
|
||||||
|
document[attribute_name_index + 1],
|
||||||
|
)
|
||||||
|
|
||||||
pipeline_class = (
|
pipeline_class = (
|
||||||
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
|
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
|
||||||
)
|
)
|
||||||
|
@ -68,3 +68,15 @@ def _format_command_args(args):
|
|||||||
out_str = ""
|
out_str = ""
|
||||||
|
|
||||||
return out_str
|
return out_str
|
||||||
|
|
||||||
|
|
||||||
|
def _set_span_attribute_if_value(span, name, value):
|
||||||
|
if value is not None and value != "":
|
||||||
|
span.set_attribute(name, value)
|
||||||
|
|
||||||
|
|
||||||
|
def _value_or_none(values, n):
|
||||||
|
try:
|
||||||
|
return values[n]
|
||||||
|
except IndexError:
|
||||||
|
return None
|
||||||
|
@ -24,7 +24,7 @@ services:
|
|||||||
POSTGRES_PASSWORD: testpassword
|
POSTGRES_PASSWORD: testpassword
|
||||||
POSTGRES_DB: opentelemetry-tests
|
POSTGRES_DB: opentelemetry-tests
|
||||||
otredis:
|
otredis:
|
||||||
image: redis:4.0-alpine
|
image: redis/redis-stack:7.2.0-v12
|
||||||
ports:
|
ports:
|
||||||
- "127.0.0.1:6379:6379"
|
- "127.0.0.1:6379:6379"
|
||||||
otrediscluster:
|
otrediscluster:
|
||||||
|
@ -18,6 +18,15 @@ from time import time_ns
|
|||||||
import redis
|
import redis
|
||||||
import redis.asyncio
|
import redis.asyncio
|
||||||
|
|
||||||
|
from redis.exceptions import ResponseError
|
||||||
|
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
|
||||||
|
from redis.commands.search.aggregation import AggregateRequest
|
||||||
|
from redis.commands.search.query import Query
|
||||||
|
from redis.commands.search.field import (
|
||||||
|
TextField,
|
||||||
|
VectorField,
|
||||||
|
)
|
||||||
|
|
||||||
from opentelemetry import trace
|
from opentelemetry import trace
|
||||||
from opentelemetry.instrumentation.redis import RedisInstrumentor
|
from opentelemetry.instrumentation.redis import RedisInstrumentor
|
||||||
from opentelemetry.semconv.trace import SpanAttributes
|
from opentelemetry.semconv.trace import SpanAttributes
|
||||||
@ -614,3 +623,63 @@ class TestRedisDBIndexInstrument(TestBase):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET ?"
|
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET ?"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRedisearchInstrument(TestBase):
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
self.redis_client = redis.Redis(port=6379)
|
||||||
|
self.redis_client.flushall()
|
||||||
|
self.embedding_dim = 256
|
||||||
|
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)
|
||||||
|
self.prepare_data()
|
||||||
|
self.create_index()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
RedisInstrumentor().uninstrument()
|
||||||
|
super().tearDown()
|
||||||
|
|
||||||
|
def prepare_data(self):
|
||||||
|
try:
|
||||||
|
self.redis_client.ft("idx:test_vss").dropindex(True)
|
||||||
|
except ResponseError:
|
||||||
|
print("No such index")
|
||||||
|
item = {"name": "test",
|
||||||
|
"value": "test_value",
|
||||||
|
"embeddings": [0.1] * 256}
|
||||||
|
pipeline = self.redis_client.pipeline()
|
||||||
|
pipeline.json().set(f"test:001", "$", item)
|
||||||
|
res = pipeline.execute()
|
||||||
|
assert False not in res
|
||||||
|
|
||||||
|
def create_index(self):
|
||||||
|
schema = (
|
||||||
|
TextField("$.name", no_stem=True, as_name="name"),
|
||||||
|
TextField("$.value", no_stem=True, as_name="value"),
|
||||||
|
VectorField("$.embeddings",
|
||||||
|
"FLAT",
|
||||||
|
{
|
||||||
|
"TYPE": "FLOAT32",
|
||||||
|
"DIM": self.embedding_dim,
|
||||||
|
"DISTANCE_METRIC": "COSINE",
|
||||||
|
},
|
||||||
|
as_name="vector",),
|
||||||
|
)
|
||||||
|
definition = IndexDefinition(prefix=["test:"], index_type=IndexType.JSON)
|
||||||
|
res = self.redis_client.ft("idx:test_vss").create_index(fields=schema, definition=definition)
|
||||||
|
assert "OK" in str(res)
|
||||||
|
|
||||||
|
def test_redis_create_index(self):
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
span = next(span for span in spans if span.name == "redis.create_index")
|
||||||
|
assert "redis.create_index.fields" in span.attributes
|
||||||
|
|
||||||
|
def test_redis_query(self):
|
||||||
|
query = "@name:test"
|
||||||
|
res = self.redis_client.ft("idx:test_vss").search(Query(query))
|
||||||
|
|
||||||
|
spans = self.memory_exporter.get_finished_spans()
|
||||||
|
span = next(span for span in spans if span.name == "redis.search")
|
||||||
|
|
||||||
|
assert span.attributes.get("redis.search.query") == query
|
||||||
|
assert span.attributes.get("redis.search.total") == 1
|
||||||
|
Reference in New Issue
Block a user