mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-29 21:23:55 +08:00
feat(instrumentation-dbapi): add experimental sql commenter capability (#908)
* feat(instrumentation-dbapi): add experimental sql commenter capability * Update instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com> * Fix lint * Add CHANGELOG entry * Fix lint * Fix lint again Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
This commit is contained in:
@ -7,14 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.9.1-0.28b1...HEAD)
|
||||
|
||||
- `opentelemetry-instrumentation-wsgi` WSGI: Conditionally create SERVER spans
|
||||
([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903))
|
||||
### Added
|
||||
|
||||
- `opentelemetry-instrumentation-dbapi` add experimental sql commenter capability
|
||||
([#908](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/908))
|
||||
|
||||
### Fixed
|
||||
|
||||
- `opentelemetry-instrumentation-logging` retrieves service name defensively.
|
||||
([#890](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/890))
|
||||
|
||||
- `opentelemetry-instrumentation-wsgi` WSGI: Conditionally create SERVER spans
|
||||
([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903))
|
||||
|
||||
## [1.9.1-0.28b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.9.1-0.28b1) - 2022-01-29
|
||||
|
||||
|
||||
|
@ -45,11 +45,11 @@ import wrapt
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.instrumentation.dbapi.version import __version__
|
||||
from opentelemetry.instrumentation.utils import unwrap
|
||||
from opentelemetry.instrumentation.utils import _generate_sql_comment, unwrap
|
||||
from opentelemetry.semconv.trace import SpanAttributes
|
||||
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
|
||||
from opentelemetry.trace import Span, SpanKind, TracerProvider, get_tracer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def trace_integration(
|
||||
@ -59,6 +59,7 @@ def trace_integration(
|
||||
connection_attributes: typing.Dict = None,
|
||||
tracer_provider: typing.Optional[TracerProvider] = None,
|
||||
capture_parameters: bool = False,
|
||||
enable_commenter: bool = False,
|
||||
db_api_integration_factory=None,
|
||||
):
|
||||
"""Integrate with DB API library.
|
||||
@ -84,6 +85,7 @@ def trace_integration(
|
||||
version=__version__,
|
||||
tracer_provider=tracer_provider,
|
||||
capture_parameters=capture_parameters,
|
||||
enable_commenter=enable_commenter,
|
||||
db_api_integration_factory=db_api_integration_factory,
|
||||
)
|
||||
|
||||
@ -97,6 +99,7 @@ def wrap_connect(
|
||||
version: str = "",
|
||||
tracer_provider: typing.Optional[TracerProvider] = None,
|
||||
capture_parameters: bool = False,
|
||||
enable_commenter: bool = False,
|
||||
db_api_integration_factory=None,
|
||||
):
|
||||
"""Integrate with DB API library.
|
||||
@ -132,6 +135,7 @@ def wrap_connect(
|
||||
version=version,
|
||||
tracer_provider=tracer_provider,
|
||||
capture_parameters=capture_parameters,
|
||||
enable_commenter=enable_commenter,
|
||||
)
|
||||
return db_integration.wrapped_connection(wrapped, args, kwargs)
|
||||
|
||||
@ -140,7 +144,7 @@ def wrap_connect(
|
||||
connect_module, connect_method_name, wrap_connect_
|
||||
)
|
||||
except Exception as ex: # pylint: disable=broad-except
|
||||
logger.warning("Failed to integrate with DB API. %s", str(ex))
|
||||
_logger.warning("Failed to integrate with DB API. %s", str(ex))
|
||||
|
||||
|
||||
def unwrap_connect(
|
||||
@ -163,7 +167,8 @@ def instrument_connection(
|
||||
connection_attributes: typing.Dict = None,
|
||||
version: str = "",
|
||||
tracer_provider: typing.Optional[TracerProvider] = None,
|
||||
capture_parameters=False,
|
||||
capture_parameters: bool = False,
|
||||
enable_commenter: bool = False,
|
||||
):
|
||||
"""Enable instrumentation in a database connection.
|
||||
|
||||
@ -180,7 +185,7 @@ def instrument_connection(
|
||||
An instrumented connection.
|
||||
"""
|
||||
if isinstance(connection, wrapt.ObjectProxy):
|
||||
logger.warning("Connection already instrumented")
|
||||
_logger.warning("Connection already instrumented")
|
||||
return connection
|
||||
|
||||
db_integration = DatabaseApiIntegration(
|
||||
@ -190,6 +195,7 @@ def instrument_connection(
|
||||
version=version,
|
||||
tracer_provider=tracer_provider,
|
||||
capture_parameters=capture_parameters,
|
||||
enable_commenter=enable_commenter,
|
||||
)
|
||||
db_integration.get_connection_attributes(connection)
|
||||
return get_traced_connection_proxy(connection, db_integration)
|
||||
@ -207,7 +213,7 @@ def uninstrument_connection(connection):
|
||||
if isinstance(connection, wrapt.ObjectProxy):
|
||||
return connection.__wrapped__
|
||||
|
||||
logger.warning("Connection is not instrumented")
|
||||
_logger.warning("Connection is not instrumented")
|
||||
return connection
|
||||
|
||||
|
||||
@ -220,6 +226,7 @@ class DatabaseApiIntegration:
|
||||
version: str = "",
|
||||
tracer_provider: typing.Optional[TracerProvider] = None,
|
||||
capture_parameters: bool = False,
|
||||
enable_commenter: bool = False,
|
||||
):
|
||||
self.connection_attributes = connection_attributes
|
||||
if self.connection_attributes is None:
|
||||
@ -237,6 +244,7 @@ class DatabaseApiIntegration:
|
||||
tracer_provider=tracer_provider,
|
||||
)
|
||||
self.capture_parameters = capture_parameters
|
||||
self.enable_commenter = enable_commenter
|
||||
self.database_system = database_system
|
||||
self.connection_props = {}
|
||||
self.span_attributes = {}
|
||||
@ -313,8 +321,9 @@ def get_traced_connection_proxy(
|
||||
|
||||
|
||||
class CursorTracer:
|
||||
def __init__(self, db_api_integration: DatabaseApiIntegration):
|
||||
def __init__(self, db_api_integration: DatabaseApiIntegration) -> None:
|
||||
self._db_api_integration = db_api_integration
|
||||
self._commenter_enabled = self._db_api_integration.enable_commenter
|
||||
|
||||
def _populate_span(
|
||||
self,
|
||||
@ -355,6 +364,22 @@ class CursorTracer:
|
||||
return statement.decode("utf8", "replace")
|
||||
return statement
|
||||
|
||||
@staticmethod
|
||||
def _generate_comment(span: Span) -> str:
|
||||
span_context = span.get_span_context()
|
||||
meta = {}
|
||||
if span_context.is_valid:
|
||||
meta.update(
|
||||
{
|
||||
"trace_id": span_context.trace_id,
|
||||
"span_id": span_context.span_id,
|
||||
"trace_flags": span_context.trace_flags,
|
||||
"trace_state": span_context.trace_state.to_header(),
|
||||
}
|
||||
)
|
||||
# TODO(schekuri): revisit to enrich with info such as route, db_driver etc...
|
||||
return _generate_sql_comment(**meta)
|
||||
|
||||
def traced_execution(
|
||||
self,
|
||||
cursor,
|
||||
@ -374,6 +399,18 @@ class CursorTracer:
|
||||
name, kind=SpanKind.CLIENT
|
||||
) as span:
|
||||
self._populate_span(span, cursor, *args)
|
||||
if args and self._commenter_enabled:
|
||||
try:
|
||||
comment = self._generate_comment(span)
|
||||
if isinstance(args[0], bytes):
|
||||
comment = comment.encode("utf8")
|
||||
args_list = list(args)
|
||||
args_list[0] += comment
|
||||
args = tuple(args_list)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
_logger.exception(
|
||||
"Exception while generating sql comment: %s", exc
|
||||
)
|
||||
return query_method(*args, **kwargs)
|
||||
|
||||
|
||||
|
@ -228,6 +228,21 @@ class TestDBApiIntegration(TestBase):
|
||||
span.attributes[SpanAttributes.DB_STATEMENT], "Test query"
|
||||
)
|
||||
|
||||
def test_executemany_comment(self):
|
||||
db_integration = dbapi.DatabaseApiIntegration(
|
||||
"testname", "testcomponent", enable_commenter=True
|
||||
)
|
||||
mock_connection = db_integration.wrapped_connection(
|
||||
mock_connect, {}, {}
|
||||
)
|
||||
cursor = mock_connection.cursor()
|
||||
cursor.executemany("Test query")
|
||||
spans_list = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans_list), 1)
|
||||
span = spans_list[0]
|
||||
comment = dbapi.CursorTracer._generate_comment(span)
|
||||
self.assertIn(comment, cursor.query)
|
||||
|
||||
def test_callproc(self):
|
||||
db_integration = dbapi.DatabaseApiIntegration(
|
||||
"testname", "testcomponent"
|
||||
@ -308,6 +323,10 @@ class MockConnection:
|
||||
|
||||
|
||||
class MockCursor:
|
||||
def __init__(self) -> None:
|
||||
self.query = ""
|
||||
self.params = None
|
||||
|
||||
# pylint: disable=unused-argument, no-self-use
|
||||
def execute(self, query, params=None, throw_exception=False):
|
||||
if throw_exception:
|
||||
@ -317,6 +336,8 @@ class MockCursor:
|
||||
def executemany(self, query, params=None, throw_exception=False):
|
||||
if throw_exception:
|
||||
raise Exception("Test Exception")
|
||||
self.query = query
|
||||
self.params = params
|
||||
|
||||
# pylint: disable=unused-argument, no-self-use
|
||||
def callproc(self, query, params=None, throw_exception=False):
|
||||
|
@ -12,6 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import urllib.parse
|
||||
from typing import Dict, Sequence
|
||||
|
||||
from wrapt import ObjectProxy
|
||||
@ -115,3 +116,38 @@ def _start_internal_or_server_span(
|
||||
attributes=attributes,
|
||||
)
|
||||
return span, token
|
||||
|
||||
|
||||
_KEY_VALUE_DELIMITER = ","
|
||||
|
||||
|
||||
def _generate_sql_comment(**meta):
|
||||
"""
|
||||
Return a SQL comment with comma delimited key=value pairs created from
|
||||
**meta kwargs.
|
||||
"""
|
||||
if not meta: # No entries added.
|
||||
return ""
|
||||
|
||||
# Sort the keywords to ensure that caching works and that testing is
|
||||
# deterministic. It eases visual inspection as well.
|
||||
return (
|
||||
" /*"
|
||||
+ _KEY_VALUE_DELIMITER.join(
|
||||
"{}={!r}".format(_url_quote(key), _url_quote(value))
|
||||
for key, value in sorted(meta.items())
|
||||
if value is not None
|
||||
)
|
||||
+ "*/"
|
||||
)
|
||||
|
||||
|
||||
def _url_quote(s): # pylint: disable=invalid-name
|
||||
if not isinstance(s, (str, bytes)):
|
||||
return s
|
||||
quoted = urllib.parse.quote(s)
|
||||
# Since SQL uses '%' as a keyword, '%' is a by-product of url quoting
|
||||
# e.g. foo,bar --> foo%2Cbar
|
||||
# thus in our quoting, we need to escape it too to finally give
|
||||
# foo,bar --> foo%%2Cbar
|
||||
return quoted.replace("%", "%%")
|
||||
|
Reference in New Issue
Block a user