Merge branch 'master' into dbindex-redis

This commit is contained in:
Leighton Chen
2020-11-24 09:32:30 -05:00
committed by GitHub
3 changed files with 89 additions and 129 deletions

View File

@ -2,6 +2,9 @@
## Unreleased
- Update asyncpg instrumentation to follow semantic conventions
([#188](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/188))
## Version 0.12b0
Released 2020-08-14

View File

@ -49,11 +49,30 @@ _APPLIED = "_opentelemetry_tracer"
def _hydrate_span_from_args(connection, query, parameters) -> dict:
span_attributes = {"db.type": "sql"}
"""Get network and database attributes from connection."""
span_attributes = {"db.system": "postgresql"}
# connection contains _params attribute which is a namedtuple ConnectionParameters.
# https://github.com/MagicStack/asyncpg/blob/master/asyncpg/connection.py#L68
params = getattr(connection, "_params", None)
span_attributes["db.instance"] = getattr(params, "database", None)
span_attributes["db.user"] = getattr(params, "user", None)
dbname = getattr(params, "database", None)
if dbname:
span_attributes["db.name"] = dbname
user = getattr(params, "user", None)
if user:
span_attributes["db.user"] = user
# connection contains _addr attribute which is either a host/port tuple, or unix socket string
# https://magicstack.github.io/asyncpg/current/_modules/asyncpg/connection.html
addr = getattr(connection, "_addr", None)
if isinstance(addr, tuple):
span_attributes["net.peer.name"] = addr[0]
span_attributes["net.peer.ip"] = addr[1]
span_attributes["net.transport"] = "IP.TCP"
elif isinstance(addr, str):
span_attributes["net.peer.name"] = addr
span_attributes["net.transport"] = "Unix"
if query is not None:
span_attributes["db.statement"] = query
@ -105,10 +124,10 @@ class AsyncPGInstrumentor(BaseInstrumentor):
tracer = getattr(asyncpg, _APPLIED)
exception = None
params = getattr(instance, "_params", {})
name = args[0] if args[0] else params.get("database", "postgresql")
with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:
with tracer.start_as_current_span(name, kind=SpanKind.CLIENT) as span:
if span.is_recording():
span_attributes = _hydrate_span_from_args(
instance,

View File

@ -7,11 +7,11 @@ from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.status import StatusCode
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432"))
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests")
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword")
POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser")
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT", "5432"))
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "opentelemetry-tests")
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_PASSWORD", "testpassword")
POSTGRES_USER = os.getenv("POSTGRESQL_USER", "testuser")
def async_call(coro):
@ -41,34 +41,28 @@ class TestFunctionalAsyncPG(TestBase):
def tearDownClass(cls):
AsyncPGInstrumentor().uninstrument()
def check_span(self, span):
self.assertEqual(span.attributes["db.system"], "postgresql")
self.assertEqual(span.attributes["db.name"], POSTGRES_DB_NAME)
self.assertEqual(span.attributes["db.user"], POSTGRES_USER)
self.assertEqual(span.attributes["net.peer.name"], POSTGRES_HOST)
self.assertEqual(span.attributes["net.peer.ip"], POSTGRES_PORT)
def test_instrumented_execute_method_without_arguments(self, *_, **__):
async_call(self._connection.execute("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT 42;",
},
)
self.check_span(spans[0])
self.assertEqual(spans[0].name, "SELECT 42;")
self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;")
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
async_call(self._connection.fetch("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT 42;",
},
)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;")
def test_instrumented_transaction_method(self, *_, **__):
async def _transaction_execute():
@ -79,35 +73,16 @@ class TestFunctionalAsyncPG(TestBase):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(3, len(spans))
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "BEGIN;",
},
spans[0].attributes,
)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "BEGIN;")
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "SELECT 42;",
},
spans[1].attributes,
)
self.check_span(spans[1])
self.assertEqual(spans[1].attributes["db.statement"], "SELECT 42;")
self.assertIs(StatusCode.UNSET, spans[1].status.status_code)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "COMMIT;",
},
spans[2].attributes,
)
self.check_span(spans[2])
self.assertEqual(spans[2].attributes["db.statement"], "COMMIT;")
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
def test_instrumented_failed_transaction_method(self, *_, **__):
@ -120,37 +95,19 @@ class TestFunctionalAsyncPG(TestBase):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(3, len(spans))
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "BEGIN;",
},
spans[0].attributes,
)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "BEGIN;")
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[1])
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "SELECT 42::uuid;",
},
spans[1].attributes,
)
self.assertEqual(
StatusCode.ERROR, spans[1].status.status_code,
)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "ROLLBACK;",
},
spans[2].attributes,
spans[1].attributes["db.statement"], "SELECT 42::uuid;"
)
self.assertEqual(StatusCode.ERROR, spans[1].status.status_code)
self.check_span(spans[2])
self.assertEqual(spans[2].attributes["db.statement"], "ROLLBACK;")
self.assertIs(StatusCode.UNSET, spans[2].status.status_code)
def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
@ -158,19 +115,8 @@ class TestFunctionalAsyncPG(TestBase):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
# This shouldn't be set because we don't capture parameters by
# default
#
# "db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;")
class TestFunctionalAsyncPG_CaptureParameters(TestBase):
@ -197,50 +143,46 @@ class TestFunctionalAsyncPG_CaptureParameters(TestBase):
def tearDownClass(cls):
AsyncPGInstrumentor().uninstrument()
def check_span(self, span):
self.assertEqual(span.attributes["db.system"], "postgresql")
self.assertEqual(span.attributes["db.name"], POSTGRES_DB_NAME)
self.assertEqual(span.attributes["db.user"], POSTGRES_USER)
self.assertEqual(span.attributes["net.peer.name"], POSTGRES_HOST)
self.assertEqual(span.attributes["net.peer.ip"], POSTGRES_PORT)
def test_instrumented_execute_method_with_arguments(self, *_, **__):
async_call(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.check_span(spans[0])
self.assertEqual(spans[0].name, "SELECT $1;")
self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;")
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
spans[0].attributes["db.statement.parameters"], "('1',)"
)
def test_instrumented_fetch_method_with_arguments(self, *_, **__):
async_call(self._connection.fetch("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;")
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
spans[0].attributes["db.statement.parameters"], "('1',)"
)
def test_instrumented_executemany_method_with_arguments(self, *_, **__):
async_call(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT $1;")
self.assertEqual(
{
"db.type": "sql",
"db.statement": "SELECT $1;",
"db.statement.parameters": "([['1'], ['2']],)",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
},
spans[0].attributes,
spans[0].attributes["db.statement.parameters"], "([['1'], ['2']],)"
)
def test_instrumented_execute_interface_error_method(self, *_, **__):
@ -248,13 +190,9 @@ class TestFunctionalAsyncPG_CaptureParameters(TestBase):
async_call(self._connection.execute("SELECT 42;", 1, 2, 3))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.check_span(spans[0])
self.assertEqual(spans[0].attributes["db.statement"], "SELECT 42;")
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.statement.parameters": "(1, 2, 3)",
"db.statement": "SELECT 42;",
},
spans[0].attributes["db.statement.parameters"], "(1, 2, 3)"
)