mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2026-03-13 08:10:39 +08:00
Fix psycopg2 (un)instrument_connection to use weakref, not mutate connection object (#4257)
* Fix psycopg2 (un)instrument_connection to use weakref, not mutate object * Changelog * conditional import for docs types * Lint * SImplify test * Simplify * Fix docs --------- Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
This commit is contained in:
@@ -127,6 +127,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
([#4258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4258))
|
||||
- `opentelemetry-instrumentation-threading`: fix AttributeError when Thread is run without starting
|
||||
([#4246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4246))
|
||||
- `opentelemetry-instrumentation-psycopg2`: Fix AttributeError by using instrumented connections weakref, instead of mutating connection object
|
||||
([#4257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4257))
|
||||
|
||||
### Breaking changes
|
||||
|
||||
|
||||
@@ -118,6 +118,8 @@ extensions = [
|
||||
|
||||
intersphinx_mapping = {
|
||||
"python": ("https://docs.python.org/3/", None),
|
||||
"psycopg": ("https://www.psycopg.org/psycopg3/docs/", None),
|
||||
"psycopg2": ("https://www.psycopg.org/docs/", None),
|
||||
"opentracing": (
|
||||
"https://opentracing-python.readthedocs.io/en/latest/",
|
||||
None,
|
||||
|
||||
@@ -140,8 +140,12 @@ API
|
||||
---
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import typing
|
||||
import weakref
|
||||
from importlib.metadata import PackageNotFoundError, distribution
|
||||
from typing import Collection
|
||||
|
||||
@@ -151,6 +155,7 @@ from psycopg2.extensions import (
|
||||
)
|
||||
from psycopg2.sql import Composed # pylint: disable=no-name-in-module
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.instrumentation import dbapi
|
||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
||||
from opentelemetry.instrumentation.psycopg2.package import (
|
||||
@@ -161,7 +166,11 @@ from opentelemetry.instrumentation.psycopg2.package import (
|
||||
from opentelemetry.instrumentation.psycopg2.version import __version__
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from psycopg2.extensions import ( # pylint: disable=no-name-in-module
|
||||
connection as PgConnection,
|
||||
)
|
||||
|
||||
|
||||
class Psycopg2Instrumentor(BaseInstrumentor):
|
||||
@@ -173,6 +182,8 @@ class Psycopg2Instrumentor(BaseInstrumentor):
|
||||
}
|
||||
|
||||
_DATABASE_SYSTEM = "postgresql"
|
||||
_INSTRUMENTED_CONNECTIONS = weakref.WeakKeyDictionary()
|
||||
_INSTRUMENTED_CONNECTIONS_LOCK = threading.Lock()
|
||||
|
||||
def instrumentation_dependencies(self) -> Collection[str]:
|
||||
# Determine which package of psycopg2 is installed
|
||||
@@ -222,11 +233,17 @@ class Psycopg2Instrumentor(BaseInstrumentor):
|
||||
|
||||
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
|
||||
@staticmethod
|
||||
def instrument_connection(connection, tracer_provider=None):
|
||||
def instrument_connection(
|
||||
connection: PgConnection,
|
||||
tracer_provider: typing.Optional[trace_api.TracerProvider] = None,
|
||||
) -> PgConnection:
|
||||
"""Enable instrumentation in a psycopg2 connection.
|
||||
|
||||
Uses `_INSTRUMENTED_CONNECTIONS` to store the original `cursor_factory`
|
||||
per connection.
|
||||
|
||||
Args:
|
||||
connection: psycopg2.extensions.connection
|
||||
connection:
|
||||
The psycopg2 connection object to be instrumented.
|
||||
tracer_provider: opentelemetry.trace.TracerProvider, optional
|
||||
The TracerProvider to use for instrumentation. If not specified,
|
||||
@@ -236,29 +253,38 @@ class Psycopg2Instrumentor(BaseInstrumentor):
|
||||
An instrumented psycopg2 connection object.
|
||||
"""
|
||||
|
||||
if not hasattr(connection, "_is_instrumented_by_opentelemetry"):
|
||||
connection._is_instrumented_by_opentelemetry = False
|
||||
with Psycopg2Instrumentor._INSTRUMENTED_CONNECTIONS_LOCK:
|
||||
if connection in Psycopg2Instrumentor._INSTRUMENTED_CONNECTIONS:
|
||||
_logger.warning(
|
||||
"Attempting to instrument Psycopg connection while already instrumented"
|
||||
)
|
||||
return connection
|
||||
|
||||
if not connection._is_instrumented_by_opentelemetry:
|
||||
setattr(
|
||||
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
|
||||
)
|
||||
original_cursor_factory = connection.cursor_factory
|
||||
connection.cursor_factory = _new_cursor_factory(
|
||||
tracer_provider=tracer_provider
|
||||
base_factory=original_cursor_factory,
|
||||
tracer_provider=tracer_provider,
|
||||
)
|
||||
connection._is_instrumented_by_opentelemetry = True
|
||||
else:
|
||||
_logger.warning(
|
||||
"Attempting to instrument Psycopg connection while already instrumented"
|
||||
Psycopg2Instrumentor._INSTRUMENTED_CONNECTIONS[connection] = (
|
||||
original_cursor_factory
|
||||
)
|
||||
|
||||
return connection
|
||||
|
||||
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
|
||||
@staticmethod
|
||||
def uninstrument_connection(connection):
|
||||
connection.cursor_factory = getattr(
|
||||
connection, _OTEL_CURSOR_FACTORY_KEY, None
|
||||
)
|
||||
def uninstrument_connection(connection: PgConnection) -> PgConnection:
|
||||
"""Disable instrumentation for a psycopg2 connection.
|
||||
|
||||
Restores the original `cursor_factory` from `_INSTRUMENTED_CONNECTIONS`.
|
||||
"""
|
||||
with Psycopg2Instrumentor._INSTRUMENTED_CONNECTIONS_LOCK:
|
||||
original_cursor_factory = (
|
||||
Psycopg2Instrumentor._INSTRUMENTED_CONNECTIONS.pop(
|
||||
connection, None
|
||||
)
|
||||
)
|
||||
connection.cursor_factory = original_cursor_factory
|
||||
|
||||
return connection
|
||||
|
||||
|
||||
@@ -215,6 +215,96 @@ class TestPostgresqlIntegration(TestBase):
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def test_instrument_connection_is_idempotent(self):
|
||||
cnx = psycopg2.connect(database="test")
|
||||
query = "SELECT * FROM test"
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 0)
|
||||
|
||||
instrumentor = Psycopg2Instrumentor()
|
||||
cnx = instrumentor.instrument_connection(cnx)
|
||||
cnx = instrumentor.instrument_connection(cnx)
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
def test_instrument_connection_with_custom_cursor_factory_instrument_then_uninstrument(
|
||||
self,
|
||||
):
|
||||
instrumentor = Psycopg2Instrumentor()
|
||||
cnx = psycopg2.connect(database="test", cursor_factory=MockCursor)
|
||||
query = "SELECT * FROM test"
|
||||
|
||||
self.assertIs(cnx.cursor_factory, MockCursor)
|
||||
|
||||
cnx = instrumentor.instrument_connection(cnx)
|
||||
self.assertIsNot(cnx.cursor_factory, MockCursor)
|
||||
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
cnx = instrumentor.uninstrument_connection(cnx)
|
||||
self.assertIs(cnx.cursor_factory, MockCursor)
|
||||
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
def test_uninstrument_connection_is_idempotent(self):
|
||||
instrumentor = Psycopg2Instrumentor()
|
||||
cnx = psycopg2.connect(database="test")
|
||||
query = "SELECT * FROM test"
|
||||
|
||||
cnx = instrumentor.instrument_connection(cnx)
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
cnx = instrumentor.uninstrument_connection(cnx)
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
cnx = instrumentor.uninstrument_connection(cnx)
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
def test_instrument_connection_reinstrument_after_uninstrument(self):
|
||||
instrumentor = Psycopg2Instrumentor()
|
||||
cnx = psycopg2.connect(database="test")
|
||||
query = "SELECT * FROM test"
|
||||
|
||||
cnx = instrumentor.instrument_connection(cnx)
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
cnx = instrumentor.uninstrument_connection(cnx)
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
|
||||
cnx = instrumentor.instrument_connection(cnx)
|
||||
cursor = cnx.cursor()
|
||||
cursor.execute(query)
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 2)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def test_uninstrument_connection_with_instrument(self):
|
||||
Psycopg2Instrumentor().instrument()
|
||||
|
||||
Reference in New Issue
Block a user