mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-28 12:43:39 +08:00
mongo db - fix db statement capturing (#1512)
This commit is contained in:
@ -27,7 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
([#1555](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1555))
|
||||
- `opentelemetry-instrumentation-asgi` Fix keys() in class ASGIGetter to correctly fetch values from carrier headers.
|
||||
([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435))
|
||||
|
||||
- mongo db - fix db statement capturing
|
||||
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
|
||||
|
||||
## Version 1.15.0/0.36b0 (2022-12-10)
|
||||
|
||||
|
@ -46,6 +46,7 @@ this function signature is: def response_hook(span: Span, event: CommandSucceed
|
||||
failed_hook (Callable) -
|
||||
a function with extra user-defined logic to be performed after the query returns with a failed response
|
||||
this function signature is: def failed_hook(span: Span, event: CommandFailedEvent) -> None
|
||||
capture_statement (bool) - an optional value to enable capturing the database statement that is being executed
|
||||
|
||||
for example:
|
||||
|
||||
@ -81,6 +82,9 @@ from pymongo import monitoring
|
||||
from opentelemetry import context
|
||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
||||
from opentelemetry.instrumentation.pymongo.package import _instruments
|
||||
from opentelemetry.instrumentation.pymongo.utils import (
|
||||
COMMAND_TO_ATTRIBUTE_MAPPING,
|
||||
)
|
||||
from opentelemetry.instrumentation.pymongo.version import __version__
|
||||
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
|
||||
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
|
||||
@ -106,6 +110,7 @@ class CommandTracer(monitoring.CommandListener):
|
||||
request_hook: RequestHookT = dummy_callback,
|
||||
response_hook: ResponseHookT = dummy_callback,
|
||||
failed_hook: FailedHookT = dummy_callback,
|
||||
capture_statement: bool = False,
|
||||
):
|
||||
self._tracer = tracer
|
||||
self._span_dict = {}
|
||||
@ -113,6 +118,7 @@ class CommandTracer(monitoring.CommandListener):
|
||||
self.start_hook = request_hook
|
||||
self.success_hook = response_hook
|
||||
self.failed_hook = failed_hook
|
||||
self.capture_statement = capture_statement
|
||||
|
||||
def started(self, event: monitoring.CommandStartedEvent):
|
||||
"""Method to handle a pymongo CommandStartedEvent"""
|
||||
@ -120,16 +126,13 @@ class CommandTracer(monitoring.CommandListener):
|
||||
_SUPPRESS_INSTRUMENTATION_KEY
|
||||
):
|
||||
return
|
||||
command = event.command.get(event.command_name, "")
|
||||
name = event.database_name
|
||||
name += "." + event.command_name
|
||||
statement = event.command_name
|
||||
if command:
|
||||
statement += " " + str(command)
|
||||
command_name = event.command_name
|
||||
span_name = f"{event.database_name}.{command_name}"
|
||||
statement = self._get_statement_by_command_name(command_name, event)
|
||||
collection = event.command.get(event.command_name)
|
||||
|
||||
try:
|
||||
span = self._tracer.start_span(name, kind=SpanKind.CLIENT)
|
||||
span = self._tracer.start_span(span_name, kind=SpanKind.CLIENT)
|
||||
if span.is_recording():
|
||||
span.set_attribute(
|
||||
SpanAttributes.DB_SYSTEM, DbSystemValues.MONGODB.value
|
||||
@ -196,6 +199,14 @@ class CommandTracer(monitoring.CommandListener):
|
||||
def _pop_span(self, event):
|
||||
return self._span_dict.pop(_get_span_dict_key(event), None)
|
||||
|
||||
def _get_statement_by_command_name(self, command_name, event):
|
||||
statement = command_name
|
||||
command_attribute = COMMAND_TO_ATTRIBUTE_MAPPING.get(command_name)
|
||||
command = event.command.get(command_attribute)
|
||||
if command and self.capture_statement:
|
||||
statement += " " + str(command)
|
||||
return statement
|
||||
|
||||
|
||||
def _get_span_dict_key(event):
|
||||
if event.connection_id is not None:
|
||||
@ -228,6 +239,7 @@ class PymongoInstrumentor(BaseInstrumentor):
|
||||
request_hook = kwargs.get("request_hook", dummy_callback)
|
||||
response_hook = kwargs.get("response_hook", dummy_callback)
|
||||
failed_hook = kwargs.get("failed_hook", dummy_callback)
|
||||
capture_statement = kwargs.get("capture_statement")
|
||||
# Create and register a CommandTracer only the first time
|
||||
if self._commandtracer_instance is None:
|
||||
tracer = get_tracer(__name__, __version__, tracer_provider)
|
||||
@ -237,6 +249,7 @@ class PymongoInstrumentor(BaseInstrumentor):
|
||||
request_hook=request_hook,
|
||||
response_hook=response_hook,
|
||||
failed_hook=failed_hook,
|
||||
capture_statement=capture_statement,
|
||||
)
|
||||
monitoring.register(self._commandtracer_instance)
|
||||
# If already created, just enable it
|
||||
|
@ -0,0 +1,20 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
COMMAND_TO_ATTRIBUTE_MAPPING = {
|
||||
"insert": "documents",
|
||||
"delete": "deletes",
|
||||
"update": "updates",
|
||||
"find": "filter",
|
||||
}
|
@ -64,7 +64,7 @@ class TestPymongo(TestBase):
|
||||
span.attributes[SpanAttributes.DB_NAME], "database_name"
|
||||
)
|
||||
self.assertEqual(
|
||||
span.attributes[SpanAttributes.DB_STATEMENT], "command_name find"
|
||||
span.attributes[SpanAttributes.DB_STATEMENT], "command_name"
|
||||
)
|
||||
self.assertEqual(
|
||||
span.attributes[SpanAttributes.NET_PEER_NAME], "test.com"
|
||||
|
@ -34,6 +34,7 @@ class TestFunctionalPymongo(TestBase):
|
||||
self.instrumentor = PymongoInstrumentor()
|
||||
self.instrumentor.instrument()
|
||||
self.instrumentor._commandtracer_instance._tracer = self._tracer
|
||||
self.instrumentor._commandtracer_instance.capture_statement = True
|
||||
client = MongoClient(
|
||||
MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000
|
||||
)
|
||||
@ -44,7 +45,7 @@ class TestFunctionalPymongo(TestBase):
|
||||
self.instrumentor.uninstrument()
|
||||
super().tearDown()
|
||||
|
||||
def validate_spans(self):
|
||||
def validate_spans(self, expected_db_statement):
|
||||
spans = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans), 2)
|
||||
for span in spans:
|
||||
@ -72,14 +73,24 @@ class TestFunctionalPymongo(TestBase):
|
||||
pymongo_span.attributes[SpanAttributes.DB_MONGODB_COLLECTION],
|
||||
MONGODB_COLLECTION_NAME,
|
||||
)
|
||||
self.assertEqual(
|
||||
pymongo_span.attributes[SpanAttributes.DB_STATEMENT],
|
||||
expected_db_statement,
|
||||
)
|
||||
|
||||
def test_insert(self):
|
||||
"""Should create a child span for insert"""
|
||||
with self._tracer.start_as_current_span("rootSpan"):
|
||||
self._collection.insert_one(
|
||||
insert_result = self._collection.insert_one(
|
||||
{"name": "testName", "value": "testValue"}
|
||||
)
|
||||
self.validate_spans()
|
||||
insert_result_id = insert_result.inserted_id
|
||||
|
||||
expected_db_statement = (
|
||||
f"insert [{{'name': 'testName', 'value': 'testValue', '_id': "
|
||||
f"ObjectId('{insert_result_id}')}}]"
|
||||
)
|
||||
self.validate_spans(expected_db_statement)
|
||||
|
||||
def test_update(self):
|
||||
"""Should create a child span for update"""
|
||||
@ -87,19 +98,40 @@ class TestFunctionalPymongo(TestBase):
|
||||
self._collection.update_one(
|
||||
{"name": "testName"}, {"$set": {"value": "someOtherValue"}}
|
||||
)
|
||||
self.validate_spans()
|
||||
|
||||
expected_db_statement = (
|
||||
"update [SON([('q', {'name': 'testName'}), ('u', "
|
||||
"{'$set': {'value': 'someOtherValue'}}), ('multi', False), ('upsert', False)])]"
|
||||
)
|
||||
self.validate_spans(expected_db_statement)
|
||||
|
||||
def test_find(self):
|
||||
"""Should create a child span for find"""
|
||||
with self._tracer.start_as_current_span("rootSpan"):
|
||||
self._collection.find_one()
|
||||
self.validate_spans()
|
||||
self._collection.find_one({"name": "testName"})
|
||||
|
||||
expected_db_statement = "find {'name': 'testName'}"
|
||||
self.validate_spans(expected_db_statement)
|
||||
|
||||
def test_delete(self):
|
||||
"""Should create a child span for delete"""
|
||||
with self._tracer.start_as_current_span("rootSpan"):
|
||||
self._collection.delete_one({"name": "testName"})
|
||||
self.validate_spans()
|
||||
|
||||
expected_db_statement = (
|
||||
"delete [SON([('q', {'name': 'testName'}), ('limit', 1)])]"
|
||||
)
|
||||
self.validate_spans(expected_db_statement)
|
||||
|
||||
def test_find_without_capture_statement(self):
|
||||
"""Should create a child span for find"""
|
||||
self.instrumentor._commandtracer_instance.capture_statement = False
|
||||
|
||||
with self._tracer.start_as_current_span("rootSpan"):
|
||||
self._collection.find_one({"name": "testName"})
|
||||
|
||||
expected_db_statement = "find"
|
||||
self.validate_spans(expected_db_statement)
|
||||
|
||||
def test_uninstrument(self):
|
||||
# check that integration is working
|
||||
|
Reference in New Issue
Block a user