mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-08-02 19:47:17 +08:00
@ -5,6 +5,7 @@ import pytest
|
||||
from asyncpg import Connection, Record, cursor
|
||||
from wrapt import ObjectProxy
|
||||
|
||||
from opentelemetry import trace as trace_api
|
||||
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
|
||||
from opentelemetry.test.test_base import TestBase
|
||||
|
||||
@ -105,3 +106,36 @@ class TestAsyncPGInstrumentation(TestBase):
|
||||
spans = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans), 2)
|
||||
self.assertEqual([span.status.is_ok for span in spans], [True, True])
|
||||
|
||||
def test_no_op_tracer_provider(self):
|
||||
AsyncPGInstrumentor().uninstrument()
|
||||
AsyncPGInstrumentor().instrument(
|
||||
tracer_provider=trace_api.NoOpTracerProvider()
|
||||
)
|
||||
|
||||
# Mock out all interaction with postgres
|
||||
async def bind_mock(*args, **kwargs):
|
||||
return []
|
||||
|
||||
async def exec_mock(*args, **kwargs):
|
||||
return [], None, True
|
||||
|
||||
conn = mock.Mock()
|
||||
conn.is_closed = lambda: False
|
||||
|
||||
conn._protocol = mock.Mock()
|
||||
conn._protocol.bind = bind_mock
|
||||
conn._protocol.execute = exec_mock
|
||||
conn._protocol.bind_execute = exec_mock
|
||||
conn._protocol.close_portal = bind_mock
|
||||
|
||||
state = mock.Mock()
|
||||
state.closed = False
|
||||
|
||||
# init the cursor and fetch a single record
|
||||
crs = cursor.Cursor(conn, "SELECT * FROM test", state, [], Record)
|
||||
asyncio.run(crs._init(1))
|
||||
asyncio.run(crs.fetch(1))
|
||||
|
||||
spans = self.memory_exporter.get_finished_spans()
|
||||
self.assertEqual(len(spans), 0)
|
||||
|
Reference in New Issue
Block a user