Add metrics instrumentation sqlalchemy (#1645)

This commit is contained in:
Shalev Roda
2023-02-26 16:51:42 +02:00
committed by GitHub
parent 0417141a70
commit 7ffbfc302e
6 changed files with 250 additions and 37 deletions

View File

@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased ## Unreleased
- Add metrics instrumentation for sqlalchemy
([#1645](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1645))
- Fix exception in Urllib3 when dealing with filelike body. - Fix exception in Urllib3 when dealing with filelike body.
([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399)) ([#1399](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1399))

View File

@ -34,7 +34,7 @@
| [opentelemetry-instrumentation-remoulade](./opentelemetry-instrumentation-remoulade) | remoulade >= 0.50 | No | [opentelemetry-instrumentation-remoulade](./opentelemetry-instrumentation-remoulade) | remoulade >= 0.50 | No
| [opentelemetry-instrumentation-requests](./opentelemetry-instrumentation-requests) | requests ~= 2.0 | Yes | [opentelemetry-instrumentation-requests](./opentelemetry-instrumentation-requests) | requests ~= 2.0 | Yes
| [opentelemetry-instrumentation-sklearn](./opentelemetry-instrumentation-sklearn) | scikit-learn ~= 0.24.0 | No | [opentelemetry-instrumentation-sklearn](./opentelemetry-instrumentation-sklearn) | scikit-learn ~= 0.24.0 | No
| [opentelemetry-instrumentation-sqlalchemy](./opentelemetry-instrumentation-sqlalchemy) | sqlalchemy | No | [opentelemetry-instrumentation-sqlalchemy](./opentelemetry-instrumentation-sqlalchemy) | sqlalchemy | Yes
| [opentelemetry-instrumentation-sqlite3](./opentelemetry-instrumentation-sqlite3) | sqlite3 | No | [opentelemetry-instrumentation-sqlite3](./opentelemetry-instrumentation-sqlite3) | sqlite3 | No
| [opentelemetry-instrumentation-starlette](./opentelemetry-instrumentation-starlette) | starlette ~= 0.13.0 | Yes | [opentelemetry-instrumentation-starlette](./opentelemetry-instrumentation-starlette) | starlette ~= 0.13.0 | Yes
| [opentelemetry-instrumentation-system-metrics](./opentelemetry-instrumentation-system-metrics) | psutil >= 5 | No | [opentelemetry-instrumentation-system-metrics](./opentelemetry-instrumentation-system-metrics) | psutil >= 5 | No

View File

@ -105,13 +105,16 @@ from wrapt import wrap_function_wrapper as _w
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.sqlalchemy.engine import ( from opentelemetry.instrumentation.sqlalchemy.engine import (
EngineTracer, EngineTracer,
_get_tracer,
_wrap_connect, _wrap_connect,
_wrap_create_async_engine, _wrap_create_async_engine,
_wrap_create_engine, _wrap_create_engine,
) )
from opentelemetry.instrumentation.sqlalchemy.package import _instruments from opentelemetry.instrumentation.sqlalchemy.package import _instruments
from opentelemetry.instrumentation.sqlalchemy.version import __version__
from opentelemetry.instrumentation.utils import unwrap from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import get_meter
from opentelemetry.semconv.metrics import MetricInstruments
from opentelemetry.trace import get_tracer
class SQLAlchemyInstrumentor(BaseInstrumentor): class SQLAlchemyInstrumentor(BaseInstrumentor):
@ -136,32 +139,47 @@ class SQLAlchemyInstrumentor(BaseInstrumentor):
An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise. An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise.
""" """
tracer_provider = kwargs.get("tracer_provider") tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(__name__, __version__, tracer_provider)
meter_provider = kwargs.get("meter_provider")
meter = get_meter(__name__, __version__, meter_provider)
connections_usage = meter.create_up_down_counter(
name=MetricInstruments.DB_CLIENT_CONNECTIONS_USAGE,
unit="connections",
description="The number of connections that are currently in state described by the state attribute.",
)
enable_commenter = kwargs.get("enable_commenter", False) enable_commenter = kwargs.get("enable_commenter", False)
_w( _w(
"sqlalchemy", "sqlalchemy",
"create_engine", "create_engine",
_wrap_create_engine(tracer_provider, enable_commenter), _wrap_create_engine(tracer, connections_usage, enable_commenter),
) )
_w( _w(
"sqlalchemy.engine", "sqlalchemy.engine",
"create_engine", "create_engine",
_wrap_create_engine(tracer_provider, enable_commenter), _wrap_create_engine(tracer, connections_usage, enable_commenter),
) )
_w( _w(
"sqlalchemy.engine.base", "sqlalchemy.engine.base",
"Engine.connect", "Engine.connect",
_wrap_connect(tracer_provider), _wrap_connect(tracer),
) )
if parse_version(sqlalchemy.__version__).release >= (1, 4): if parse_version(sqlalchemy.__version__).release >= (1, 4):
_w( _w(
"sqlalchemy.ext.asyncio", "sqlalchemy.ext.asyncio",
"create_async_engine", "create_async_engine",
_wrap_create_async_engine(tracer_provider, enable_commenter), _wrap_create_async_engine(
tracer, connections_usage, enable_commenter
),
) )
if kwargs.get("engine") is not None: if kwargs.get("engine") is not None:
return EngineTracer( return EngineTracer(
_get_tracer(tracer_provider), tracer,
kwargs.get("engine"), kwargs.get("engine"),
connections_usage,
kwargs.get("enable_commenter", False), kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}), kwargs.get("commenter_options", {}),
) )
@ -170,8 +188,9 @@ class SQLAlchemyInstrumentor(BaseInstrumentor):
): ):
return [ return [
EngineTracer( EngineTracer(
_get_tracer(tracer_provider), tracer,
engine, engine,
connections_usage,
kwargs.get("enable_commenter", False), kwargs.get("enable_commenter", False),
kwargs.get("commenter_options", {}), kwargs.get("commenter_options", {}),
) )

View File

@ -20,9 +20,6 @@ from sqlalchemy.event import ( # pylint: disable=no-name-in-module
) )
from opentelemetry import trace from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy.package import (
_instrumenting_module_name,
)
from opentelemetry.instrumentation.sqlalchemy.version import __version__ from opentelemetry.instrumentation.sqlalchemy.version import __version__
from opentelemetry.instrumentation.sqlcommenter_utils import _add_sql_comment from opentelemetry.instrumentation.sqlcommenter_utils import _add_sql_comment
from opentelemetry.instrumentation.utils import _get_opentelemetry_values from opentelemetry.instrumentation.utils import _get_opentelemetry_values
@ -44,15 +41,9 @@ def _normalize_vendor(vendor):
return vendor return vendor
def _get_tracer(tracer_provider=None): def _wrap_create_async_engine(
return trace.get_tracer( tracer, connections_usage, enable_commenter=False
_instrumenting_module_name, ):
__version__,
tracer_provider=tracer_provider,
)
def _wrap_create_async_engine(tracer_provider=None, enable_commenter=False):
# pylint: disable=unused-argument # pylint: disable=unused-argument
def _wrap_create_async_engine_internal(func, module, args, kwargs): def _wrap_create_async_engine_internal(func, module, args, kwargs):
"""Trace the SQLAlchemy engine, creating an `EngineTracer` """Trace the SQLAlchemy engine, creating an `EngineTracer`
@ -60,33 +51,26 @@ def _wrap_create_async_engine(tracer_provider=None, enable_commenter=False):
""" """
engine = func(*args, **kwargs) engine = func(*args, **kwargs)
EngineTracer( EngineTracer(
_get_tracer(tracer_provider), engine.sync_engine, enable_commenter tracer, engine.sync_engine, connections_usage, enable_commenter
) )
return engine return engine
return _wrap_create_async_engine_internal return _wrap_create_async_engine_internal
def _wrap_create_engine(tracer_provider=None, enable_commenter=False): def _wrap_create_engine(tracer, connections_usage, enable_commenter=False):
# pylint: disable=unused-argument def _wrap_create_engine_internal(func, _module, args, kwargs):
def _wrap_create_engine_internal(func, module, args, kwargs):
"""Trace the SQLAlchemy engine, creating an `EngineTracer` """Trace the SQLAlchemy engine, creating an `EngineTracer`
object that will listen to SQLAlchemy events. object that will listen to SQLAlchemy events.
""" """
engine = func(*args, **kwargs) engine = func(*args, **kwargs)
EngineTracer(_get_tracer(tracer_provider), engine, enable_commenter) EngineTracer(tracer, engine, connections_usage, enable_commenter)
return engine return engine
return _wrap_create_engine_internal return _wrap_create_engine_internal
def _wrap_connect(tracer_provider=None): def _wrap_connect(tracer):
tracer = trace.get_tracer(
_instrumenting_module_name,
__version__,
tracer_provider=tracer_provider,
)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def _wrap_connect_internal(func, module, args, kwargs): def _wrap_connect_internal(func, module, args, kwargs):
with tracer.start_as_current_span( with tracer.start_as_current_span(
@ -107,10 +91,16 @@ class EngineTracer:
_remove_event_listener_params = [] _remove_event_listener_params = []
def __init__( def __init__(
self, tracer, engine, enable_commenter=False, commenter_options=None self,
tracer,
engine,
connections_usage,
enable_commenter=False,
commenter_options=None,
): ):
self.tracer = tracer self.tracer = tracer
self.engine = engine self.engine = engine
self.connections_usage = connections_usage
self.vendor = _normalize_vendor(engine.name) self.vendor = _normalize_vendor(engine.name)
self.enable_commenter = enable_commenter self.enable_commenter = enable_commenter
self.commenter_options = commenter_options if commenter_options else {} self.commenter_options = commenter_options if commenter_options else {}
@ -123,6 +113,49 @@ class EngineTracer:
engine, "after_cursor_execute", _after_cur_exec engine, "after_cursor_execute", _after_cur_exec
) )
self._register_event_listener(engine, "handle_error", _handle_error) self._register_event_listener(engine, "handle_error", _handle_error)
self._register_event_listener(engine, "connect", self._pool_connect)
self._register_event_listener(engine, "close", self._pool_close)
self._register_event_listener(engine, "checkin", self._pool_checkin)
self._register_event_listener(engine, "checkout", self._pool_checkout)
def _get_pool_name(self):
return self.engine.pool.logging_name or ""
def _add_idle_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
"state": "idle",
},
)
def _add_used_to_connection_usage(self, value):
self.connections_usage.add(
value,
attributes={
"pool.name": self._get_pool_name(),
"state": "used",
},
)
def _pool_connect(self, _dbapi_connection, _connection_record):
self._add_idle_to_connection_usage(1)
def _pool_close(self, _dbapi_connection, _connection_record):
self._add_idle_to_connection_usage(-1)
# Called when a connection returns to the pool.
def _pool_checkin(self, _dbapi_connection, _connection_record):
self._add_used_to_connection_usage(-1)
self._add_idle_to_connection_usage(1)
# Called when a connection is retrieved from the Pool.
def _pool_checkout(
self, _dbapi_connection, _connection_record, _connection_proxy
):
self._add_idle_to_connection_usage(-1)
self._add_used_to_connection_usage(1)
@classmethod @classmethod
def _register_event_listener(cls, target, identifier, func, *args, **kw): def _register_event_listener(cls, target, identifier, func, *args, **kw):
@ -153,9 +186,8 @@ class EngineTracer:
return self.vendor return self.vendor
return " ".join(parts) return " ".join(parts)
# pylint: disable=unused-argument
def _before_cur_exec( def _before_cur_exec(
self, conn, cursor, statement, params, context, executemany self, conn, cursor, statement, params, context, _executemany
): ):
attrs, found = _get_attributes_from_url(conn.engine.url) attrs, found = _get_attributes_from_url(conn.engine.url)
if not found: if not found:

View File

@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
_instrumenting_module_name = "opentelemetry.instrumentation.sqlalchemy"
_instruments = ("sqlalchemy",) _instruments = ("sqlalchemy",)
_supports_metrics = True

View File

@ -0,0 +1,159 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy
from sqlalchemy.pool import QueuePool
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.test.test_base import TestBase
class TestSqlalchemyMetricsInstrumentation(TestBase):
def setUp(self):
super().setUp()
SQLAlchemyInstrumentor().instrument(
tracer_provider=self.tracer_provider,
)
def tearDown(self):
super().tearDown()
SQLAlchemyInstrumentor().uninstrument()
def assert_pool_idle_used_expected(self, pool_name, idle, used):
metrics = self.get_sorted_metrics()
self.assertEqual(len(metrics), 1)
self.assert_metric_expected(
metrics[0],
[
self.create_number_data_point(
value=idle,
attributes={"pool.name": pool_name, "state": "idle"},
),
self.create_number_data_point(
value=used,
attributes={"pool.name": pool_name, "state": "used"},
),
],
)
def test_metrics_one_connection(self):
pool_name = "pool_test_name"
engine = sqlalchemy.create_engine(
"sqlite:///:memory:",
pool_size=5,
poolclass=QueuePool,
pool_logging_name=pool_name,
)
metrics = self.get_sorted_metrics()
self.assertEqual(len(metrics), 0)
with engine.connect():
self.assert_pool_idle_used_expected(
pool_name=pool_name, idle=0, used=1
)
# After the connection is closed
self.assert_pool_idle_used_expected(
pool_name=pool_name, idle=1, used=0
)
def test_metrics_without_pool_name(self):
pool_name = ""
engine = sqlalchemy.create_engine(
"sqlite:///:memory:",
pool_size=5,
poolclass=QueuePool,
)
metrics = self.get_sorted_metrics()
self.assertEqual(len(metrics), 0)
with engine.connect():
self.assert_pool_idle_used_expected(
pool_name=pool_name, idle=0, used=1
)
# After the connection is closed
self.assert_pool_idle_used_expected(
pool_name=pool_name, idle=1, used=0
)
def test_metrics_two_connections(self):
pool_name = "pool_test_name"
engine = sqlalchemy.create_engine(
"sqlite:///:memory:",
pool_size=5,
poolclass=QueuePool,
pool_logging_name=pool_name,
)
metrics = self.get_sorted_metrics()
self.assertEqual(len(metrics), 0)
with engine.connect():
with engine.connect():
self.assert_pool_idle_used_expected(pool_name, idle=0, used=2)
# After the first connection is closed
self.assert_pool_idle_used_expected(pool_name, idle=1, used=1)
# After the two connections are closed
self.assert_pool_idle_used_expected(pool_name, idle=2, used=0)
def test_metrics_connections(self):
pool_name = "pool_test_name"
engine = sqlalchemy.create_engine(
"sqlite:///:memory:",
pool_size=5,
poolclass=QueuePool,
pool_logging_name=pool_name,
)
metrics = self.get_sorted_metrics()
self.assertEqual(len(metrics), 0)
with engine.connect():
with engine.connect():
self.assert_pool_idle_used_expected(
pool_name=pool_name, idle=0, used=2
)
# After the first connection is closed
self.assert_pool_idle_used_expected(
pool_name=pool_name, idle=1, used=1
)
# Resume from idle to used
with engine.connect():
self.assert_pool_idle_used_expected(
pool_name=pool_name, idle=0, used=2
)
# After the two connections are closed
self.assert_pool_idle_used_expected(
pool_name=pool_name, idle=2, used=0
)
def test_metric_uninstrument(self):
SQLAlchemyInstrumentor().uninstrument()
engine = sqlalchemy.create_engine(
"sqlite:///:memory:",
poolclass=QueuePool,
)
engine.connect()
metrics = self.get_sorted_metrics()
self.assertEqual(len(metrics), 0)