Updated dbapi and psycopg2 instrumentations. (#246)

Changes:

- Update dbapi instrumentation to use the SQL statement name as the span
instead of the entire SQL query.
- Renamed TracedCursor with CursorTracing. The class was not a valid
Cursor so the name was confusing.
- Updated CursorTracing's (previously TracedCursor) traced_execution
method to accept the cursor instance as the first argument. This is
required as for some dbapi implementations, we need a reference to the
cursor in order to correctly format the SQL query.
- Updated psycopg2 instrumentation to leverage dbapi's `cursor_factory`
mechanism instead of wrapping the cursor with wrapt. This results in a
simpler instrumentation without monkey patching objects at runtime and
allows psycopg2's type registration system to work. This should make it
possible to use psycopg2 instrumentation when using the JSONB feature or
with frameworks like Django.
This commit is contained in:
Owais Lone
2021-01-21 00:15:28 +05:30
committed by GitHub
parent 8c8f2785bd
commit 8b9202be6f
13 changed files with 292 additions and 91 deletions

View File

@ -78,7 +78,7 @@ disable=missing-docstring,
protected-access, # temp-pylint-upgrade
super-init-not-called, # temp-pylint-upgrade
invalid-overridden-method, # temp-pylint-upgrade
missing-module-docstring, # temp-pylint-upgrad, # temp-pylint-upgradee
missing-module-docstring, # temp-pylint-upgrade
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option

View File

@ -56,9 +56,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-aiopg` Fix AttributeError `__aexit__` when `aiopg.connect` and `aio[g].create_pool` used with async context manager
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
- `opentelemetry-exporter-datadog` `opentelemetry-sdk-extension-aws` Fix reference to ids_generator in sdk
([#235](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/235))
([#283](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/283))
- `opentelemetry-instrumentation-sqlalchemy` Use SQL operation and DB name as span name.
([#254](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/254))
- `opentelemetry-instrumentation-dbapi`, `TracedCursor` replaced by `CursorTracer`
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- `opentelemetry-instrumentation-psycopg2`, Added support for psycopg2 registered types.
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- `opentelemetry-instrumentation-dbapi`, `opentelemetry-instrumentation-psycopg2`, `opentelemetry-instrumentation-mysql`, `opentelemetry-instrumentation-pymysql`, `opentelemetry-instrumentation-aiopg` Use SQL command name as the span operation name instead of the entire query.
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
## [0.16b1](https://github.com/open-telemetry/opentelemetry-python-contrib/releases/tag/v0.16b1) - 2020-11-26

View File

@ -4,8 +4,8 @@ import wrapt
from aiopg.utils import _ContextManager, _PoolAcquireContextManager
from opentelemetry.instrumentation.dbapi import (
CursorTracer,
DatabaseApiIntegration,
TracedCursor,
)
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode
@ -94,25 +94,29 @@ def get_traced_pool_proxy(pool, db_api_integration, *args, **kwargs):
return TracedPoolProxy(pool, *args, **kwargs)
class AsyncTracedCursor(TracedCursor):
class AsyncCursorTracer(CursorTracer):
async def traced_execution(
self,
cursor,
query_method: typing.Callable[..., typing.Any],
*args: typing.Tuple[typing.Any, typing.Any],
**kwargs: typing.Dict[typing.Any, typing.Any]
):
name = ""
if len(args) > 0 and args[0]:
name = args[0]
elif self._db_api_integration.database:
name = self._db_api_integration.database
else:
name = self._db_api_integration.name
if args:
name = self.get_operation_name(cursor, args)
if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)
with self._db_api_integration.get_tracer().start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
self._populate_span(span, *args)
self._populate_span(span, cursor, *args)
try:
result = await query_method(*args, **kwargs)
return result
@ -123,10 +127,10 @@ class AsyncTracedCursor(TracedCursor):
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
_traced_cursor = AsyncTracedCursor(db_api_integration)
_traced_cursor = AsyncCursorTracer(db_api_integration)
# pylint: disable=abstract-method
class AsyncTracedCursorProxy(AsyncProxyObject):
class AsyncCursorTracerProxy(AsyncProxyObject):
# pylint: disable=unused-argument
def __init__(self, cursor, *args, **kwargs):
@ -134,20 +138,20 @@ def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
async def execute(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
self, self.__wrapped__.execute, *args, **kwargs
)
return result
async def executemany(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
self, self.__wrapped__.executemany, *args, **kwargs
)
return result
async def callproc(self, *args, **kwargs):
result = await _traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
self, self.__wrapped__.callproc, *args, **kwargs
)
return result
return AsyncTracedCursorProxy(cursor, *args, **kwargs)
return AsyncCursorTracerProxy(cursor, *args, **kwargs)

View File

@ -256,7 +256,7 @@ class TestAiopgIntegration(TestBase):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(span.attributes["component"], "testcomponent")

View File

@ -60,6 +60,7 @@ def trace_integration(
connection_attributes: typing.Dict = None,
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
db_api_integration_factory=None,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
@ -86,6 +87,7 @@ def trace_integration(
version=__version__,
tracer_provider=tracer_provider,
capture_parameters=capture_parameters,
db_api_integration_factory=db_api_integration_factory,
)
@ -99,6 +101,7 @@ def wrap_connect(
version: str = "",
tracer_provider: typing.Optional[TracerProvider] = None,
capture_parameters: bool = False,
db_api_integration_factory=None,
):
"""Integrate with DB API library.
https://www.python.org/dev/peps/pep-0249/
@ -115,6 +118,9 @@ def wrap_connect(
capture_parameters: Configure if db.statement.parameters should be captured.
"""
db_api_integration_factory = (
db_api_integration_factory or DatabaseApiIntegration
)
# pylint: disable=unused-argument
def wrap_connect_(
@ -123,7 +129,7 @@ def wrap_connect(
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
db_integration = DatabaseApiIntegration(
db_integration = db_api_integration_factory(
name,
database_component,
database_type=database_type,
@ -314,16 +320,19 @@ def get_traced_connection_proxy(
return TracedConnectionProxy(connection, *args, **kwargs)
class TracedCursor:
class CursorTracer:
def __init__(self, db_api_integration: DatabaseApiIntegration):
self._db_api_integration = db_api_integration
def _populate_span(
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
self,
span: trace_api.Span,
cursor,
*args: typing.Tuple[typing.Any, typing.Any]
):
if not span.is_recording():
return
statement = args[0] if args else ""
statement = self.get_statement(cursor, args)
span.set_attribute(
"component", self._db_api_integration.database_component
)
@ -342,24 +351,38 @@ class TracedCursor:
if self._db_api_integration.capture_parameters and len(args) > 1:
span.set_attribute("db.statement.parameters", str(args[1]))
def get_operation_name(self, cursor, args): # pylint: disable=no-self-use
if args and isinstance(args[0], str):
return args[0].split()[0]
return ""
def get_statement(self, cursor, args): # pylint: disable=no-self-use
if not args:
return ""
statement = args[0]
if isinstance(statement, bytes):
return statement.decode("utf8", "replace")
return statement
def traced_execution(
self,
cursor,
query_method: typing.Callable[..., typing.Any],
*args: typing.Tuple[typing.Any, typing.Any],
**kwargs: typing.Dict[typing.Any, typing.Any]
):
name = ""
if args:
name = args[0]
elif self._db_api_integration.database:
name = self._db_api_integration.database
else:
name = self._db_api_integration.name
name = self.get_operation_name(cursor, args)
if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)
with self._db_api_integration.get_tracer().start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
self._populate_span(span, *args)
self._populate_span(span, cursor, *args)
try:
result = query_method(*args, **kwargs)
return result
@ -370,7 +393,7 @@ class TracedCursor:
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
_traced_cursor = TracedCursor(db_api_integration)
_cursor_tracer = CursorTracer(db_api_integration)
# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):
@ -380,18 +403,18 @@ def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, cursor)
def execute(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
)
def executemany(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
)
def callproc(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
)
def __enter__(self):

View File

@ -50,7 +50,7 @@ class TestDBApiIntegration(TestBase):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(span.attributes["component"], "testcomponent")
@ -65,6 +65,27 @@ class TestDBApiIntegration(TestBase):
span.status.status_code, trace_api.status.StatusCode.UNSET
)
def test_span_name(self):
db_integration = dbapi.DatabaseApiIntegration(
self.tracer, "testcomponent", "testtype", {}
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, {}
)
cursor = mock_connection.cursor()
cursor.execute("Test query", ("param1Value", False))
cursor.execute(
"""multi
line
query"""
)
cursor.execute("tab\tseparated query")
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 3)
self.assertEqual(spans_list[0].name, "Test")
self.assertEqual(spans_list[1].name, "multi")
self.assertEqual(spans_list[2].name, "tab")
def test_span_succeeded_with_capture_of_statement_parameters(self):
connection_props = {
"database": "testdatabase",
@ -93,7 +114,7 @@ class TestDBApiIntegration(TestBase):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
span = spans_list[0]
self.assertEqual(span.name, "Test query")
self.assertEqual(span.name, "Test")
self.assertIs(span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(span.attributes["component"], "testcomponent")

View File

@ -39,12 +39,19 @@ API
---
"""
import typing
import psycopg2
from psycopg2.extensions import (
cursor as pg_cursor, # pylint: disable=no-name-in-module
)
from psycopg2.sql import Composed # pylint: disable=no-name-in-module
from opentelemetry.instrumentation import dbapi
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.psycopg2.version import __version__
from opentelemetry.trace import get_tracer
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"
class Psycopg2Instrumentor(BaseInstrumentor):
@ -62,7 +69,6 @@ class Psycopg2Instrumentor(BaseInstrumentor):
"""Integrate with PostgreSQL Psycopg library.
Psycopg: http://initd.org/psycopg/
"""
tracer_provider = kwargs.get("tracer_provider")
dbapi.wrap_connect(
@ -74,39 +80,101 @@ class Psycopg2Instrumentor(BaseInstrumentor):
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiIntegration,
)
def _uninstrument(self, **kwargs):
""""Disable Psycopg2 instrumentation"""
dbapi.unwrap_connect(psycopg2, "connect")
# pylint:disable=no-self-use
def instrument_connection(self, connection):
"""Enable instrumentation in a Psycopg2 connection.
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
def instrument_connection(self, connection): # pylint: disable=no-self-use
setattr(
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
)
connection.cursor_factory = _new_cursor_factory()
return connection
Args:
connection: The connection to instrument.
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
def uninstrument_connection(
self, connection
): # pylint: disable=no-self-use
connection.cursor_factory = getattr(
connection, _OTEL_CURSOR_FACTORY_KEY, None
)
return connection
Returns:
An instrumented connection.
"""
tracer = get_tracer(__name__, __version__)
return dbapi.instrument_connection(
tracer,
connection,
self._DATABASE_COMPONENT,
self._DATABASE_TYPE,
self._CONNECTION_ATTRIBUTES,
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
class DatabaseApiIntegration(dbapi.DatabaseApiIntegration):
def wrapped_connection(
self,
connect_method: typing.Callable[..., typing.Any],
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
"""Add object proxy to connection object."""
base_cursor_factory = kwargs.pop("cursor_factory", None)
new_factory_kwargs = {"db_api": self}
if base_cursor_factory:
new_factory_kwargs["base_factory"] = base_cursor_factory
kwargs["cursor_factory"] = _new_cursor_factory(**new_factory_kwargs)
connection = connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return connection
class CursorTracer(dbapi.CursorTracer):
def get_operation_name(self, cursor, args):
if not args:
return ""
statement = args[0]
if isinstance(statement, Composed):
statement = statement.as_string(cursor)
if isinstance(statement, str):
return statement.split()[0]
return ""
def get_statement(self, cursor, args):
if not args:
return ""
statement = args[0]
if isinstance(statement, Composed):
statement = statement.as_string(cursor)
return statement
def _new_cursor_factory(db_api=None, base_factory=None):
if not db_api:
db_api = DatabaseApiIntegration(
__name__,
Psycopg2Instrumentor._DATABASE_COMPONENT,
database_type=Psycopg2Instrumentor._DATABASE_TYPE,
connection_attributes=Psycopg2Instrumentor._CONNECTION_ATTRIBUTES,
version=__version__,
)
def uninstrument_connection(self, connection):
"""Disable instrumentation in a Psycopg2 connection.
base_factory = base_factory or pg_cursor
_cursor_tracer = CursorTracer(db_api)
Args:
connection: The connection to uninstrument.
class TracedCursorFactory(base_factory):
def execute(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().execute, *args, **kwargs
)
Returns:
An uninstrumented connection.
"""
return dbapi.uninstrument_connection(connection)
def executemany(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().executemany, *args, **kwargs
)
def callproc(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().callproc, *args, **kwargs
)
return TracedCursorFactory

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import types
from unittest import mock
import psycopg2
@ -22,15 +23,69 @@ from opentelemetry.sdk import resources
from opentelemetry.test.test_base import TestBase
class MockCursor:
execute = mock.MagicMock(spec=types.MethodType)
execute.__name__ = "execute"
executemany = mock.MagicMock(spec=types.MethodType)
executemany.__name__ = "executemany"
callproc = mock.MagicMock(spec=types.MethodType)
callproc.__name__ = "callproc"
rowcount = "SomeRowCount"
def __init__(self, *args, **kwargs):
pass
def __enter__(self):
return self
def __exit__(self, *args):
return self
class MockConnection:
commit = mock.MagicMock(spec=types.MethodType)
commit.__name__ = "commit"
rollback = mock.MagicMock(spec=types.MethodType)
rollback.__name__ = "rollback"
def __init__(self, *args, **kwargs):
self.cursor_factory = kwargs.pop("cursor_factory", None)
def cursor(self):
if self.cursor_factory:
return self.cursor_factory(self)
return MockCursor()
def get_dsn_parameters(self): # pylint: disable=no-self-use
return dict(dbname="test")
class TestPostgresqlIntegration(TestBase):
def setUp(self):
self.cursor_mock = mock.patch(
"opentelemetry.instrumentation.psycopg2.pg_cursor", MockCursor
)
self.connection_mock = mock.patch("psycopg2.connect", MockConnection)
self.cursor_mock.start()
self.connection_mock.start()
def tearDown(self):
super().tearDown()
self.memory_exporter.clear()
self.cursor_mock.stop()
self.connection_mock.stop()
with self.disable_logging():
Psycopg2Instrumentor().uninstrument()
@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_instrumentor(self, mock_connect):
def test_instrumentor(self):
Psycopg2Instrumentor().instrument()
cnx = psycopg2.connect(database="test")
@ -60,9 +115,8 @@ class TestPostgresqlIntegration(TestBase):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_not_recording(self, mock_connect):
def test_not_recording(self):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
@ -83,9 +137,8 @@ class TestPostgresqlIntegration(TestBase):
Psycopg2Instrumentor().uninstrument()
@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_custom_tracer_provider(self, mock_connect):
def test_custom_tracer_provider(self):
resource = resources.Resource.create({})
result = self.create_tracer_provider(resource=resource)
tracer_provider, exporter = result
@ -103,9 +156,8 @@ class TestPostgresqlIntegration(TestBase):
self.assertIs(span.resource, resource)
@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_instrument_connection(self, mock_connect):
def test_instrument_connection(self):
cnx = psycopg2.connect(database="test")
query = "SELECT * FROM test"
cursor = cnx.cursor()
@ -121,9 +173,8 @@ class TestPostgresqlIntegration(TestBase):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_uninstrument_connection(self, mock_connect):
def test_uninstrument_connection(self):
Psycopg2Instrumentor().instrument()
cnx = psycopg2.connect(database="test")
query = "SELECT * FROM test"

View File

@ -60,7 +60,7 @@ class TestSQLite3(TestBase):
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_executemany(self):
"""Should create a child span for executemany"""
@ -68,7 +68,7 @@ class TestSQLite3(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
data = [("1",), ("2",), ("3",)]
self._cursor.executemany(stmt, data)
self.validate_spans(stmt)
self.validate_spans("INSERT")
def test_callproc(self):
"""Should create a child span for callproc"""

View File

@ -81,7 +81,7 @@ class TestFunctionalMysql(TestBase):
stmt = "CREATE TABLE IF NOT EXISTS test (id INT)"
with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_execute_with_connection_context_manager(self):
"""Should create a child span for execute with connection context"""
@ -90,7 +90,7 @@ class TestFunctionalMysql(TestBase):
with self._connection as conn:
cursor = conn.cursor()
cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_execute_with_cursor_context_manager(self):
"""Should create a child span for execute with cursor context"""
@ -98,7 +98,7 @@ class TestFunctionalMysql(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
with self._connection.cursor() as cursor:
cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_executemany(self):
"""Should create a child span for executemany"""
@ -106,7 +106,7 @@ class TestFunctionalMysql(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
self._cursor.executemany(stmt, data)
self.validate_spans(stmt)
self.validate_spans("INSERT")
def test_callproc(self):
"""Should create a child span for callproc"""

View File

@ -89,7 +89,7 @@ class TestFunctionalAiopgConnect(TestBase):
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
with self._tracer.start_as_current_span("rootSpan"):
async_call(self._cursor.execute(stmt))
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_executemany(self):
"""Should create a child span for executemany"""
@ -98,7 +98,7 @@ class TestFunctionalAiopgConnect(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
async_call(self._cursor.executemany(stmt, data))
self.validate_spans(stmt)
self.validate_spans("INSERT")
def test_callproc(self):
"""Should create a child span for callproc"""
@ -167,7 +167,7 @@ class TestFunctionalAiopgCreatePool(TestBase):
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
with self._tracer.start_as_current_span("rootSpan"):
async_call(self._cursor.execute(stmt))
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_executemany(self):
"""Should create a child span for executemany"""
@ -176,7 +176,7 @@ class TestFunctionalAiopgCreatePool(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
async_call(self._cursor.executemany(stmt, data))
self.validate_spans(stmt)
self.validate_spans("INSERT")
def test_callproc(self):
"""Should create a child span for callproc"""

View File

@ -15,6 +15,7 @@
import os
import psycopg2
from psycopg2 import sql
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
@ -81,7 +82,7 @@ class TestFunctionalPsycopg(TestBase):
stmt = "CREATE TABLE IF NOT EXISTS test (id integer)"
with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_execute_with_connection_context_manager(self):
"""Should create a child span for execute with connection context"""
@ -90,7 +91,7 @@ class TestFunctionalPsycopg(TestBase):
with self._connection as conn:
cursor = conn.cursor()
cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_execute_with_cursor_context_manager(self):
"""Should create a child span for execute with cursor context"""
@ -98,7 +99,7 @@ class TestFunctionalPsycopg(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
with self._connection.cursor() as cursor:
cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
self.assertTrue(cursor.closed)
def test_executemany(self):
@ -107,7 +108,7 @@ class TestFunctionalPsycopg(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
self._cursor.executemany(stmt, data)
self.validate_spans(stmt)
self.validate_spans("INSERT")
def test_callproc(self):
"""Should create a child span for callproc"""
@ -116,3 +117,30 @@ class TestFunctionalPsycopg(TestBase):
):
self._cursor.callproc("test", ())
self.validate_spans("test")
def test_register_types(self):
psycopg2.extras.register_default_jsonb(
conn_or_curs=self._cursor, loads=lambda x: x
)
def test_composed_queries(self):
stmt = "CREATE TABLE IF NOT EXISTS users (id integer, name varchar)"
with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute(stmt)
self.validate_spans("CREATE")
self._cursor.execute(
sql.SQL("SELECT FROM {table} where {field}='{value}'").format(
table=sql.Identifier("users"),
field=sql.Identifier("name"),
value=sql.Identifier("abc"),
)
)
spans = self.memory_exporter.get_finished_spans()
span = spans[2]
self.assertEqual(span.name, "SELECT")
self.assertEqual(
span.attributes["db.statement"],
'SELECT FROM "users" where "name"=\'"abc"\'',
)

View File

@ -78,7 +78,7 @@ class TestFunctionalPyMysql(TestBase):
stmt = "CREATE TABLE IF NOT EXISTS test (id INT)"
with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_execute_with_cursor_context_manager(self):
"""Should create a child span for execute with cursor context"""
@ -86,7 +86,7 @@ class TestFunctionalPyMysql(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
with self._connection.cursor() as cursor:
cursor.execute(stmt)
self.validate_spans(stmt)
self.validate_spans("CREATE")
def test_executemany(self):
"""Should create a child span for executemany"""
@ -94,7 +94,7 @@ class TestFunctionalPyMysql(TestBase):
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
self._cursor.executemany(stmt, data)
self.validate_spans(stmt)
self.validate_spans("INSERT")
def test_callproc(self):
"""Should create a child span for callproc"""