mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-28 20:52:57 +08:00
Handle redis.exceptions.WatchError as a non-error event in instrumentation (#2668)
This commit is contained in:
@ -40,7 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
([#2682](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2682))
|
||||
|
||||
### Fixed
|
||||
|
||||
- Handle `redis.exceptions.WatchError` as a non-error event in redis instrumentation
|
||||
([#2668](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2668))
|
||||
- `opentelemetry-instrumentation-httpx` Ensure httpx.get or httpx.request like methods are instrumented
|
||||
([#2538](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2538))
|
||||
- Add Python 3.12 support
|
||||
|
@ -106,7 +106,7 @@ from opentelemetry.instrumentation.redis.util import (
|
||||
from opentelemetry.instrumentation.redis.version import __version__
|
||||
from opentelemetry.instrumentation.utils import unwrap
|
||||
from opentelemetry.semconv.trace import SpanAttributes
|
||||
from opentelemetry.trace import Span
|
||||
from opentelemetry.trace import Span, StatusCode
|
||||
|
||||
_DEFAULT_SERVICE = "redis"
|
||||
|
||||
@ -212,9 +212,16 @@ def _instrument(
|
||||
span.set_attribute(
|
||||
"db.redis.pipeline_length", len(command_stack)
|
||||
)
|
||||
response = func(*args, **kwargs)
|
||||
|
||||
response = None
|
||||
try:
|
||||
response = func(*args, **kwargs)
|
||||
except redis.WatchError:
|
||||
span.set_status(StatusCode.UNSET)
|
||||
|
||||
if callable(response_hook):
|
||||
response_hook(span, instance, response)
|
||||
|
||||
return response
|
||||
|
||||
pipeline_class = (
|
||||
@ -281,7 +288,13 @@ def _instrument(
|
||||
span.set_attribute(
|
||||
"db.redis.pipeline_length", len(command_stack)
|
||||
)
|
||||
response = await func(*args, **kwargs)
|
||||
|
||||
response = None
|
||||
try:
|
||||
response = await func(*args, **kwargs)
|
||||
except redis.WatchError:
|
||||
span.set_status(StatusCode.UNSET)
|
||||
|
||||
if callable(response_hook):
|
||||
response_hook(span, instance, response)
|
||||
return response
|
||||
|
@ -1,6 +1,7 @@
|
||||
asgiref==3.7.2
|
||||
async-timeout==4.0.3
|
||||
Deprecated==1.2.14
|
||||
fakeredis==2.23.3
|
||||
importlib-metadata==6.11.0
|
||||
iniconfig==2.0.0
|
||||
packaging==24.0
|
||||
|
@ -12,11 +12,16 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import asyncio
|
||||
from unittest import mock
|
||||
from unittest import IsolatedAsyncioTestCase, mock
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import fakeredis
|
||||
import pytest
|
||||
import redis
|
||||
import redis.asyncio
|
||||
from fakeredis.aioredis import FakeRedis
|
||||
from redis.exceptions import ConnectionError as redis_ConnectionError
|
||||
from redis.exceptions import WatchError
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.instrumentation.redis import RedisInstrumentor
|
||||
@ -311,3 +316,113 @@ class TestRedis(TestBase):
|
||||
span.attributes[SpanAttributes.NET_TRANSPORT],
|
||||
NetTransportValues.OTHER.value,
|
||||
)
|
||||
|
||||
def test_connection_error(self):
|
||||
server = fakeredis.FakeServer()
|
||||
server.connected = False
|
||||
redis_client = fakeredis.FakeStrictRedis(server=server)
|
||||
try:
|
||||
redis_client.set("foo", "bar")
|
||||
except redis_ConnectionError:
|
||||
pass
|
||||
|
||||
spans = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans), 1)
|
||||
span = spans[0]
|
||||
|
||||
self.assertEqual(span.name, "SET")
|
||||
self.assertEqual(span.kind, SpanKind.CLIENT)
|
||||
self.assertEqual(span.status.status_code, trace.StatusCode.ERROR)
|
||||
|
||||
def test_response_error(self):
|
||||
redis_client = fakeredis.FakeStrictRedis()
|
||||
redis_client.lpush("mylist", "value")
|
||||
try:
|
||||
redis_client.incr(
|
||||
"mylist"
|
||||
) # Trying to increment a list, which is invalid
|
||||
except redis.ResponseError:
|
||||
pass
|
||||
|
||||
spans = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans), 2)
|
||||
|
||||
span = spans[0]
|
||||
self.assertEqual(span.name, "LPUSH")
|
||||
self.assertEqual(span.kind, SpanKind.CLIENT)
|
||||
self.assertEqual(span.status.status_code, trace.StatusCode.UNSET)
|
||||
|
||||
span = spans[1]
|
||||
self.assertEqual(span.name, "INCRBY")
|
||||
self.assertEqual(span.kind, SpanKind.CLIENT)
|
||||
self.assertEqual(span.status.status_code, trace.StatusCode.ERROR)
|
||||
|
||||
def test_watch_error_sync(self):
|
||||
def redis_operations():
|
||||
try:
|
||||
redis_client = fakeredis.FakeStrictRedis()
|
||||
pipe = redis_client.pipeline(transaction=True)
|
||||
pipe.watch("a")
|
||||
redis_client.set("a", "bad") # This will cause the WatchError
|
||||
pipe.multi()
|
||||
pipe.set("a", "1")
|
||||
pipe.execute()
|
||||
except WatchError:
|
||||
pass
|
||||
|
||||
redis_operations()
|
||||
|
||||
spans = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans), 3)
|
||||
|
||||
# there should be 3 tests, we start watch operation and have 2 set operation on same key
|
||||
self.assertEqual(len(spans), 3)
|
||||
|
||||
self.assertEqual(spans[0].attributes.get("db.statement"), "WATCH ?")
|
||||
self.assertEqual(spans[0].kind, SpanKind.CLIENT)
|
||||
self.assertEqual(spans[0].status.status_code, trace.StatusCode.UNSET)
|
||||
|
||||
for span in spans[1:]:
|
||||
self.assertEqual(span.attributes.get("db.statement"), "SET ? ?")
|
||||
self.assertEqual(span.kind, SpanKind.CLIENT)
|
||||
self.assertEqual(span.status.status_code, trace.StatusCode.UNSET)
|
||||
|
||||
|
||||
class TestRedisAsync(TestBase, IsolatedAsyncioTestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
RedisInstrumentor().uninstrument()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_watch_error_async(self):
|
||||
async def redis_operations():
|
||||
try:
|
||||
redis_client = FakeRedis()
|
||||
async with redis_client.pipeline(transaction=False) as pipe:
|
||||
await pipe.watch("a")
|
||||
await redis_client.set("a", "bad")
|
||||
pipe.multi()
|
||||
await pipe.set("a", "1")
|
||||
await pipe.execute()
|
||||
except WatchError:
|
||||
pass
|
||||
|
||||
await redis_operations()
|
||||
|
||||
spans = self.memory_exporter.get_finished_spans()
|
||||
|
||||
# there should be 3 tests, we start watch operation and have 2 set operation on same key
|
||||
self.assertEqual(len(spans), 3)
|
||||
|
||||
self.assertEqual(spans[0].attributes.get("db.statement"), "WATCH ?")
|
||||
self.assertEqual(spans[0].kind, SpanKind.CLIENT)
|
||||
self.assertEqual(spans[0].status.status_code, trace.StatusCode.UNSET)
|
||||
|
||||
for span in spans[1:]:
|
||||
self.assertEqual(span.attributes.get("db.statement"), "SET ? ?")
|
||||
self.assertEqual(span.kind, SpanKind.CLIENT)
|
||||
self.assertEqual(span.status.status_code, trace.StatusCode.UNSET)
|
||||
|
Reference in New Issue
Block a user