Rename remaining framework packages from "ext" to "instrumentation" (#969)

This commit is contained in:
Leighton Chen
2020-08-04 19:10:51 -07:00
committed by GitHub
commit 7d4d2ce4dc
18 changed files with 2370 additions and 0 deletions

View File

@ -0,0 +1,277 @@
import asyncio
import os
import asyncpg
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.status import StatusCanonicalCode
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432"))
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests")
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword")
POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser")
def _await(coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
class TestFunctionalAsyncPG(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
AsyncPGInstrumentor().instrument(tracer_provider=cls.tracer_provider)
cls._connection = _await(
asyncpg.connect(
database=POSTGRES_DB_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
)
@classmethod
def tearDownClass(cls):
AsyncPGInstrumentor().uninstrument()
def test_instrumented_execute_method_without_arguments(self, *_, **__):
_await(self._connection.execute("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT 42;",
},
)
def test_instrumented_fetch_method_without_arguments(self, *_, **__):
_await(self._connection.fetch("SELECT 42;"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT 42;",
},
)
def test_instrumented_transaction_method(self, *_, **__):
async def _transaction_execute():
async with self._connection.transaction():
await self._connection.execute("SELECT 42;")
_await(_transaction_execute())
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(3, len(spans))
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "BEGIN;",
},
spans[0].attributes,
)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "SELECT 42;",
},
spans[1].attributes,
)
self.assertEqual(
StatusCanonicalCode.OK, spans[1].status.canonical_code
)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "COMMIT;",
},
spans[2].attributes,
)
self.assertEqual(
StatusCanonicalCode.OK, spans[2].status.canonical_code
)
def test_instrumented_failed_transaction_method(self, *_, **__):
async def _transaction_execute():
async with self._connection.transaction():
await self._connection.execute("SELECT 42::uuid;")
with self.assertRaises(asyncpg.CannotCoerceError):
_await(_transaction_execute())
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(3, len(spans))
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "BEGIN;",
},
spans[0].attributes,
)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "SELECT 42::uuid;",
},
spans[1].attributes,
)
self.assertEqual(
StatusCanonicalCode.INVALID_ARGUMENT,
spans[1].status.canonical_code,
)
self.assertEqual(
{
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.type": "sql",
"db.statement": "ROLLBACK;",
},
spans[2].attributes,
)
self.assertEqual(
StatusCanonicalCode.OK, spans[2].status.canonical_code
)
def test_instrumented_method_doesnt_capture_parameters(self, *_, **__):
_await(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
# This shouldn't be set because we don't capture parameters by
# default
#
# "db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)
class TestFunctionalAsyncPG_CaptureParameters(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
AsyncPGInstrumentor(capture_parameters=True).instrument(
tracer_provider=cls.tracer_provider
)
cls._connection = _await(
asyncpg.connect(
database=POSTGRES_DB_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
)
@classmethod
def tearDownClass(cls):
AsyncPGInstrumentor().uninstrument()
def test_instrumented_execute_method_with_arguments(self, *_, **__):
_await(self._connection.execute("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
StatusCanonicalCode.OK, spans[0].status.canonical_code
)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)
def test_instrumented_fetch_method_with_arguments(self, *_, **__):
_await(self._connection.fetch("SELECT $1;", "1"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.user": POSTGRES_USER,
"db.statement.parameters": "('1',)",
"db.instance": POSTGRES_DB_NAME,
"db.statement": "SELECT $1;",
},
)
def test_instrumented_executemany_method_with_arguments(self, *_, **__):
_await(self._connection.executemany("SELECT $1;", [["1"], ["2"]]))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
{
"db.type": "sql",
"db.statement": "SELECT $1;",
"db.statement.parameters": "([['1'], ['2']],)",
"db.user": POSTGRES_USER,
"db.instance": POSTGRES_DB_NAME,
},
spans[0].attributes,
)
def test_instrumented_execute_interface_error_method(self, *_, **__):
with self.assertRaises(asyncpg.InterfaceError):
_await(self._connection.execute("SELECT 42;", 1, 2, 3))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
self.assertEqual(
spans[0].attributes,
{
"db.type": "sql",
"db.instance": POSTGRES_DB_NAME,
"db.user": POSTGRES_USER,
"db.statement.parameters": "(1, 2, 3)",
"db.statement": "SELECT 42;",
},
)

View File

@ -0,0 +1,92 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from functools import wraps
import pytest
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.trace import TracerProvider, export
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT ", "6379"))
REDIS_URL = "redis://{host}:{port}".format(host=REDIS_HOST, port=REDIS_PORT)
BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0)
BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1)
@pytest.fixture(scope="session")
def celery_config():
return {"broker_url": BROKER_URL, "result_backend": BACKEND_URL}
@pytest.fixture
def celery_worker_parameters():
return {
# See https://github.com/celery/celery/issues/3642#issuecomment-457773294
"perform_ping_check": False,
}
@pytest.fixture(autouse=True)
def patch_celery_app(celery_app, celery_worker):
"""Patch task decorator on app fixture to reload worker"""
# See https://github.com/celery/celery/issues/3642
def wrap_task(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
result = fn(*args, **kwargs)
celery_worker.reload()
return result
return wrapper
celery_app.task = wrap_task(celery_app.task)
@pytest.fixture(autouse=True)
def instrument(tracer_provider, memory_exporter):
CeleryInstrumentor().instrument(tracer_provider=tracer_provider)
memory_exporter.clear()
yield
CeleryInstrumentor().uninstrument()
@pytest.fixture(scope="session")
def tracer_provider(memory_exporter):
original_tracer_provider = trace_api.get_tracer_provider()
tracer_provider = TracerProvider()
span_processor = export.SimpleExportSpanProcessor(memory_exporter)
tracer_provider.add_span_processor(span_processor)
trace_api.set_tracer_provider(tracer_provider)
yield tracer_provider
trace_api.set_tracer_provider(original_tracer_provider)
@pytest.fixture(scope="session")
def memory_exporter():
memory_exporter = InMemorySpanExporter()
return memory_exporter

View File

@ -0,0 +1,532 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import celery
import pytest
from celery import signals
from celery.exceptions import Retry
import opentelemetry.instrumentation.celery
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk import resources
from opentelemetry.sdk.trace import TracerProvider, export
from opentelemetry.trace.status import StatusCanonicalCode
# set a high timeout for async executions due to issues in CI
ASYNC_GET_TIMEOUT = 120
class MyException(Exception):
pass
@pytest.mark.skip(reason="inconsistent test results")
def test_instrumentation_info(celery_app, memory_exporter):
@celery_app.task
def fn_task():
return 42
result = fn_task.apply_async()
assert result.get(timeout=ASYNC_GET_TIMEOUT) == 42
spans = memory_exporter.get_finished_spans()
assert len(spans) == 2
async_span, run_span = spans
assert (
async_span.instrumentation_info.name
== opentelemetry.instrumentation.celery.__name__
)
assert (
async_span.instrumentation_info.version
== opentelemetry.instrumentation.celery.__version__
)
assert (
run_span.instrumentation_info.name
== opentelemetry.instrumentation.celery.__name__
)
assert (
run_span.instrumentation_info.version
== opentelemetry.instrumentation.celery.__version__
)
def test_fn_task_run(celery_app, memory_exporter):
@celery_app.task
def fn_task():
return 42
t = fn_task.run()
assert t == 42
spans = memory_exporter.get_finished_spans()
assert len(spans) == 0
def test_fn_task_call(celery_app, memory_exporter):
@celery_app.task
def fn_task():
return 42
t = fn_task()
assert t == 42
spans = memory_exporter.get_finished_spans()
assert len(spans) == 0
def test_fn_task_apply(celery_app, memory_exporter):
@celery_app.task
def fn_task():
return 42
t = fn_task.apply()
assert t.successful() is True
assert t.result == 42
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is True
assert span.name == "test_celery_functional.fn_task"
assert span.attributes.get("messaging.message_id") == t.task_id
assert (
span.attributes.get("celery.task_name")
== "test_celery_functional.fn_task"
)
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "SUCCESS"
def test_fn_task_apply_bind(celery_app, memory_exporter):
@celery_app.task(bind=True)
def fn_task(self):
return self
t = fn_task.apply()
assert t.successful() is True
assert "fn_task" in t.result.name
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is True
assert span.name == "test_celery_functional.fn_task"
assert span.attributes.get("messaging.message_id") == t.task_id
assert (
span.attributes.get("celery.task_name")
== "test_celery_functional.fn_task"
)
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "SUCCESS"
@pytest.mark.skip(reason="inconsistent test results")
def test_fn_task_apply_async(celery_app, memory_exporter):
@celery_app.task
def fn_task_parameters(user, force_logout=False):
return (user, force_logout)
result = fn_task_parameters.apply_async(
args=["user"], kwargs={"force_logout": True}
)
assert result.get(timeout=ASYNC_GET_TIMEOUT) == ["user", True]
spans = memory_exporter.get_finished_spans()
assert len(spans) == 2
async_span, run_span = spans
assert run_span.context.trace_id != async_span.context.trace_id
assert async_span.status.is_ok is True
assert async_span.name == "test_celery_functional.fn_task_parameters"
assert async_span.attributes.get("celery.action") == "apply_async"
assert async_span.attributes.get("messaging.message_id") == result.task_id
assert (
async_span.attributes.get("celery.task_name")
== "test_celery_functional.fn_task_parameters"
)
assert run_span.status.is_ok is True
assert run_span.name == "test_celery_functional.fn_task_parameters"
assert run_span.attributes.get("celery.action") == "run"
assert run_span.attributes.get("celery.state") == "SUCCESS"
assert run_span.attributes.get("messaging.message_id") == result.task_id
assert (
run_span.attributes.get("celery.task_name")
== "test_celery_functional.fn_task_parameters"
)
@pytest.mark.skip(reason="inconsistent test results")
def test_concurrent_delays(celery_app, memory_exporter):
@celery_app.task
def fn_task():
return 42
results = [fn_task.delay() for _ in range(100)]
for result in results:
assert result.get(timeout=ASYNC_GET_TIMEOUT) == 42
spans = memory_exporter.get_finished_spans()
assert len(spans) == 200
@pytest.mark.skip(reason="inconsistent test results")
def test_fn_task_delay(celery_app, memory_exporter):
@celery_app.task
def fn_task_parameters(user, force_logout=False):
return (user, force_logout)
result = fn_task_parameters.delay("user", force_logout=True)
assert result.get(timeout=ASYNC_GET_TIMEOUT) == ["user", True]
spans = memory_exporter.get_finished_spans()
assert len(spans) == 2
async_span, run_span = spans
assert run_span.context.trace_id != async_span.context.trace_id
assert async_span.status.is_ok is True
assert async_span.name == "test_celery_functional.fn_task_parameters"
assert async_span.attributes.get("celery.action") == "apply_async"
assert async_span.attributes.get("messaging.message_id") == result.task_id
assert (
async_span.attributes.get("celery.task_name")
== "test_celery_functional.fn_task_parameters"
)
assert run_span.status.is_ok is True
assert run_span.name == "test_celery_functional.fn_task_parameters"
assert run_span.attributes.get("celery.action") == "run"
assert run_span.attributes.get("celery.state") == "SUCCESS"
assert run_span.attributes.get("messaging.message_id") == result.task_id
assert (
run_span.attributes.get("celery.task_name")
== "test_celery_functional.fn_task_parameters"
)
def test_fn_exception(celery_app, memory_exporter):
@celery_app.task
def fn_exception():
raise Exception("Task class is failing")
result = fn_exception.apply()
assert result.failed() is True
assert "Task class is failing" in result.traceback
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is False
assert span.name == "test_celery_functional.fn_exception"
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "FAILURE"
assert (
span.attributes.get("celery.task_name")
== "test_celery_functional.fn_exception"
)
assert span.status.canonical_code == StatusCanonicalCode.UNKNOWN
assert span.attributes.get("messaging.message_id") == result.task_id
assert "Task class is failing" in span.status.description
def test_fn_exception_expected(celery_app, memory_exporter):
@celery_app.task(throws=(MyException,))
def fn_exception():
raise MyException("Task class is failing")
result = fn_exception.apply()
assert result.failed() is True
assert "Task class is failing" in result.traceback
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is True
assert span.status.canonical_code == StatusCanonicalCode.OK
assert span.name == "test_celery_functional.fn_exception"
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "FAILURE"
assert (
span.attributes.get("celery.task_name")
== "test_celery_functional.fn_exception"
)
assert span.attributes.get("messaging.message_id") == result.task_id
def test_fn_retry_exception(celery_app, memory_exporter):
@celery_app.task
def fn_exception():
raise Retry("Task class is being retried")
result = fn_exception.apply()
assert result.failed() is False
assert "Task class is being retried" in result.traceback
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is True
assert span.status.canonical_code == StatusCanonicalCode.OK
assert span.name == "test_celery_functional.fn_exception"
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "RETRY"
assert (
span.attributes.get("celery.task_name")
== "test_celery_functional.fn_exception"
)
assert span.attributes.get("messaging.message_id") == result.task_id
def test_class_task(celery_app, memory_exporter):
class BaseTask(celery_app.Task):
def run(self):
return 42
task = BaseTask()
# register the Task class if it's available (required in Celery 4.0+)
register_task = getattr(celery_app, "register_task", None)
if register_task is not None:
register_task(task)
result = task.apply()
assert result.successful() is True
assert result.result == 42
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is True
assert span.name == "test_celery_functional.BaseTask"
assert (
span.attributes.get("celery.task_name")
== "test_celery_functional.BaseTask"
)
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "SUCCESS"
assert span.attributes.get("messaging.message_id") == result.task_id
def test_class_task_exception(celery_app, memory_exporter):
class BaseTask(celery_app.Task):
def run(self):
raise Exception("Task class is failing")
task = BaseTask()
# register the Task class if it's available (required in Celery 4.0+)
register_task = getattr(celery_app, "register_task", None)
if register_task is not None:
register_task(task)
result = task.apply()
assert result.failed() is True
assert "Task class is failing" in result.traceback
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is False
assert span.name == "test_celery_functional.BaseTask"
assert (
span.attributes.get("celery.task_name")
== "test_celery_functional.BaseTask"
)
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "FAILURE"
assert span.status.canonical_code == StatusCanonicalCode.UNKNOWN
assert span.attributes.get("messaging.message_id") == result.task_id
assert "Task class is failing" in span.status.description
def test_class_task_exception_excepted(celery_app, memory_exporter):
class BaseTask(celery_app.Task):
throws = (MyException,)
def run(self):
raise MyException("Task class is failing")
task = BaseTask()
# register the Task class if it's available (required in Celery 4.0+)
register_task = getattr(celery_app, "register_task", None)
if register_task is not None:
register_task(task)
result = task.apply()
assert result.failed() is True
assert "Task class is failing" in result.traceback
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is True
assert span.status.canonical_code == StatusCanonicalCode.OK
assert span.name == "test_celery_functional.BaseTask"
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "FAILURE"
assert span.attributes.get("messaging.message_id") == result.task_id
def test_shared_task(celery_app, memory_exporter):
"""Ensure Django Shared Task are supported"""
@celery.shared_task
def add(x, y):
return x + y
result = add.apply([2, 2])
assert result.result == 4
spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
span = spans[0]
assert span.status.is_ok is True
assert span.name == "test_celery_functional.add"
assert (
span.attributes.get("celery.task_name") == "test_celery_functional.add"
)
assert span.attributes.get("celery.action") == "run"
assert span.attributes.get("celery.state") == "SUCCESS"
assert span.attributes.get("messaging.message_id") == result.task_id
@pytest.mark.skip(reason="inconsistent test results")
def test_apply_async_previous_style_tasks(
celery_app, celery_worker, memory_exporter
):
"""Ensures apply_async is properly patched if Celery 1.0 style tasks are
used even in newer versions. This should extend support to previous versions
of Celery."""
class CelerySuperClass(celery.task.Task):
abstract = True
@classmethod
def apply_async(cls, args=None, kwargs=None, **kwargs_):
return super(CelerySuperClass, cls).apply_async(
args=args, kwargs=kwargs, **kwargs_
)
def run(self, *args, **kwargs):
if "stop" in kwargs:
# avoid call loop
return
CelerySubClass.apply_async(args=[], kwargs={"stop": True}).get(
timeout=ASYNC_GET_TIMEOUT
)
class CelerySubClass(CelerySuperClass):
pass
celery_worker.reload()
task = CelerySubClass()
result = task.apply()
spans = memory_exporter.get_finished_spans()
assert len(spans) == 3
async_span, async_run_span, run_span = spans
assert run_span.status.is_ok is True
assert run_span.name == "test_celery_functional.CelerySubClass"
assert (
run_span.attributes.get("celery.task_name")
== "test_celery_functional.CelerySubClass"
)
assert run_span.attributes.get("celery.action") == "run"
assert run_span.attributes.get("celery.state") == "SUCCESS"
assert run_span.attributes.get("messaging.message_id") == result.task_id
assert async_run_span.status.is_ok is True
assert async_run_span.name == "test_celery_functional.CelerySubClass"
assert (
async_run_span.attributes.get("celery.task_name")
== "test_celery_functional.CelerySubClass"
)
assert async_run_span.attributes.get("celery.action") == "run"
assert async_run_span.attributes.get("celery.state") == "SUCCESS"
assert (
async_run_span.attributes.get("messaging.message_id") != result.task_id
)
assert async_span.status.is_ok is True
assert async_span.name == "test_celery_functional.CelerySubClass"
assert (
async_span.attributes.get("celery.task_name")
== "test_celery_functional.CelerySubClass"
)
assert async_span.attributes.get("celery.action") == "apply_async"
assert async_span.attributes.get("messaging.message_id") != result.task_id
assert async_span.attributes.get(
"messaging.message_id"
) == async_run_span.attributes.get("messaging.message_id")
def test_custom_tracer_provider(celery_app, memory_exporter):
@celery_app.task
def fn_task():
return 42
resource = resources.Resource.create({})
tracer_provider = TracerProvider(resource=resource)
span_processor = export.SimpleExportSpanProcessor(memory_exporter)
tracer_provider.add_span_processor(span_processor)
trace_api.set_tracer_provider(tracer_provider)
CeleryInstrumentor().uninstrument()
CeleryInstrumentor().instrument(tracer_provider=tracer_provider)
fn_task.delay()
spans_list = memory_exporter.get_finished_spans()
assert len(spans_list) == 1
span = spans_list[0]
assert span.resource == resource

View File

@ -0,0 +1,115 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import time
import mysql.connector
import psycopg2
import pymongo
import redis
MONGODB_COLLECTION_NAME = "test"
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME", "opentelemetry-tests")
MONGODB_HOST = os.getenv("MONGODB_HOST", "localhost")
MONGODB_PORT = int(os.getenv("MONGODB_PORT", "27017"))
MYSQL_DB_NAME = os.getenv("MYSQL_DB_NAME ", "opentelemetry-tests")
MYSQL_HOST = os.getenv("MYSQL_HOST ", "localhost")
MYSQL_PORT = int(os.getenv("MYSQL_PORT ", "3306"))
MYSQL_USER = os.getenv("MYSQL_USER ", "testuser")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD ", "testpassword")
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "opentelemetry-tests")
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST", "localhost")
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST", "testpassword")
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT", "5432"))
POSTGRES_USER = os.getenv("POSTGRESQL_HOST", "testuser")
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT ", "6379"))
RETRY_COUNT = 5
RETRY_INTERVAL = 5 # Seconds
logger = logging.getLogger(__name__)
def retryable(func):
def wrapper():
# Try to connect to DB
for i in range(RETRY_COUNT):
try:
func()
return
except Exception as ex: # pylint: disable=broad-except
logger.error(
"waiting for %s, retry %d/%d [%s]",
func.__name__,
i + 1,
RETRY_COUNT,
ex,
)
time.sleep(RETRY_INTERVAL)
raise Exception("waiting for {} failed".format(func.__name__))
return wrapper
@retryable
def check_pymongo_connection():
client = pymongo.MongoClient(
MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000
)
db = client[MONGODB_DB_NAME]
collection = db[MONGODB_COLLECTION_NAME]
collection.find_one()
client.close()
@retryable
def check_mysql_connection():
connection = mysql.connector.connect(
user=MYSQL_USER,
password=MYSQL_PASSWORD,
host=MYSQL_HOST,
port=MYSQL_PORT,
database=MYSQL_DB_NAME,
)
connection.close()
@retryable
def check_postgres_connection():
connection = psycopg2.connect(
dbname=POSTGRES_DB_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
connection.close()
@retryable
def check_redis_connection():
connection = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
connection.hgetall("*")
def check_docker_services_availability():
# Check if Docker services accept connections
check_pymongo_connection()
check_mysql_connection()
check_postgres_connection()
check_redis_connection()
check_docker_services_availability()

View File

@ -0,0 +1,47 @@
version: '3'
services:
otmongo:
ports:
- "27017:27017"
image: mongo:latest
otmysql:
ports:
- "3306:3306"
image: mysql:latest
restart: always
environment:
MYSQL_USER: testuser
MYSQL_PASSWORD: testpassword
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
MYSQL_DATABASE: opentelemetry-tests
otpostgres:
image: postgres
ports:
- "5432:5432"
environment:
POSTGRES_USER: testuser
POSTGRES_PASSWORD: testpassword
POSTGRES_DB: opentelemetry-tests
otredis:
image: redis:4.0-alpine
ports:
- "127.0.0.1:6379:6379"
otjaeger:
image: jaegertracing/all-in-one:1.8
environment:
COLLECTOR_ZIPKIN_HTTP_PORT: "9411"
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"
- "9411:9411"
otopencensus:
image: omnition/opencensus-collector:0.1.11
command: --logging-exporter DEBUG
ports:
- "8888:8888"
- "55678:55678"

View File

@ -0,0 +1,98 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import mysql.connector
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.mysql import MySQLInstrumentor
from opentelemetry.test.test_base import TestBase
MYSQL_USER = os.getenv("MYSQL_USER ", "testuser")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD ", "testpassword")
MYSQL_HOST = os.getenv("MYSQL_HOST ", "localhost")
MYSQL_PORT = int(os.getenv("MYSQL_PORT ", "3306"))
MYSQL_DB_NAME = os.getenv("MYSQL_DB_NAME ", "opentelemetry-tests")
class TestFunctionalMysql(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
MySQLInstrumentor().instrument()
cls._connection = mysql.connector.connect(
user=MYSQL_USER,
password=MYSQL_PASSWORD,
host=MYSQL_HOST,
port=MYSQL_PORT,
database=MYSQL_DB_NAME,
)
cls._cursor = cls._connection.cursor()
@classmethod
def tearDownClass(cls):
if cls._connection:
cls._connection.close()
MySQLInstrumentor().uninstrument()
def validate_spans(self):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
if span.name == "rootSpan":
root_span = span
else:
db_span = span
self.assertIsInstance(span.start_time, int)
self.assertIsInstance(span.end_time, int)
self.assertIsNotNone(root_span)
self.assertIsNotNone(db_span)
self.assertEqual(root_span.name, "rootSpan")
self.assertEqual(db_span.name, "mysql.opentelemetry-tests")
self.assertIsNotNone(db_span.parent)
self.assertIs(db_span.parent, root_span.get_context())
self.assertIs(db_span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(db_span.attributes["db.instance"], MYSQL_DB_NAME)
self.assertEqual(db_span.attributes["net.peer.name"], MYSQL_HOST)
self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT)
def test_execute(self):
"""Should create a child span for execute
"""
with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)")
self.validate_spans()
def test_executemany(self):
"""Should create a child span for executemany
"""
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
stmt = "INSERT INTO test (id) VALUES (%s)"
self._cursor.executemany(stmt, data)
self.validate_spans()
def test_callproc(self):
"""Should create a child span for callproc
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception
):
self._cursor.callproc("test", ())
self.validate_spans()

View File

@ -0,0 +1,60 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from opentelemetry import trace
from opentelemetry.context import attach, detach, set_value
from opentelemetry.exporter.opencensus.trace_exporter import (
OpenCensusSpanExporter,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor
from opentelemetry.test.test_base import TestBase
class ExportStatusSpanProcessor(SimpleExportSpanProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.export_status = []
def on_end(self, span):
token = attach(set_value("suppress_instrumentation", True))
self.export_status.append(self.span_exporter.export((span,)))
detach(token)
class TestOpenCensusSpanExporter(TestBase):
def setUp(self):
super().setUp()
trace.set_tracer_provider(TracerProvider())
self.tracer = trace.get_tracer(__name__)
self.span_processor = ExportStatusSpanProcessor(
OpenCensusSpanExporter(
service_name="basic-service", endpoint="localhost:55678"
)
)
trace.get_tracer_provider().add_span_processor(self.span_processor)
def test_export(self):
with self.tracer.start_as_current_span("foo"):
with self.tracer.start_as_current_span("bar"):
with self.tracer.start_as_current_span("baz"):
pass
self.assertTrue(len(self.span_processor.export_status), 3)
for export_status in self.span_processor.export_status:
self.assertEqual(export_status.name, "SUCCESS")
self.assertEqual(export_status.value, 0)

View File

@ -0,0 +1,200 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
import time
import aiopg
import psycopg2
import pytest
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.aiopg import AiopgInstrumentor
from opentelemetry.test.test_base import TestBase
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432"))
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests")
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword")
POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser")
def async_call(coro):
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
class TestFunctionalAiopgConnect(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
AiopgInstrumentor().instrument(tracer_provider=cls.tracer_provider)
cls._connection = async_call(
aiopg.connect(
dbname=POSTGRES_DB_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
)
cls._cursor = async_call(cls._connection.cursor())
@classmethod
def tearDownClass(cls):
if cls._cursor:
cls._cursor.close()
if cls._connection:
cls._connection.close()
AiopgInstrumentor().uninstrument()
def validate_spans(self):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
if span.name == "rootSpan":
root_span = span
else:
child_span = span
self.assertIsInstance(span.start_time, int)
self.assertIsInstance(span.end_time, int)
self.assertIsNotNone(root_span)
self.assertIsNotNone(child_span)
self.assertEqual(root_span.name, "rootSpan")
self.assertEqual(child_span.name, "postgresql.opentelemetry-tests")
self.assertIsNotNone(child_span.parent)
self.assertIs(child_span.parent, root_span.get_context())
self.assertIs(child_span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(
child_span.attributes["db.instance"], POSTGRES_DB_NAME
)
self.assertEqual(child_span.attributes["net.peer.name"], POSTGRES_HOST)
self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT)
def test_execute(self):
"""Should create a child span for execute method
"""
with self._tracer.start_as_current_span("rootSpan"):
async_call(
self._cursor.execute(
"CREATE TABLE IF NOT EXISTS test (id integer)"
)
)
self.validate_spans()
def test_executemany(self):
"""Should create a child span for executemany
"""
with pytest.raises(psycopg2.ProgrammingError):
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
stmt = "INSERT INTO test (id) VALUES (%s)"
async_call(self._cursor.executemany(stmt, data))
self.validate_spans()
def test_callproc(self):
"""Should create a child span for callproc
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception
):
async_call(self._cursor.callproc("test", ()))
self.validate_spans()
class TestFunctionalAiopgCreatePool(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
AiopgInstrumentor().instrument(tracer_provider=cls.tracer_provider)
cls._pool = async_call(
aiopg.create_pool(
dbname=POSTGRES_DB_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
)
cls._connection = async_call(cls._pool.acquire())
cls._cursor = async_call(cls._connection.cursor())
@classmethod
def tearDownClass(cls):
if cls._cursor:
cls._cursor.close()
if cls._connection:
cls._connection.close()
if cls._pool:
cls._pool.close()
AiopgInstrumentor().uninstrument()
def validate_spans(self):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
if span.name == "rootSpan":
root_span = span
else:
child_span = span
self.assertIsInstance(span.start_time, int)
self.assertIsInstance(span.end_time, int)
self.assertIsNotNone(root_span)
self.assertIsNotNone(child_span)
self.assertEqual(root_span.name, "rootSpan")
self.assertEqual(child_span.name, "postgresql.opentelemetry-tests")
self.assertIsNotNone(child_span.parent)
self.assertIs(child_span.parent, root_span.get_context())
self.assertIs(child_span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(
child_span.attributes["db.instance"], POSTGRES_DB_NAME
)
self.assertEqual(child_span.attributes["net.peer.name"], POSTGRES_HOST)
self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT)
def test_execute(self):
"""Should create a child span for execute method
"""
with self._tracer.start_as_current_span("rootSpan"):
async_call(
self._cursor.execute(
"CREATE TABLE IF NOT EXISTS test (id integer)"
)
)
self.validate_spans()
def test_executemany(self):
"""Should create a child span for executemany
"""
with pytest.raises(psycopg2.ProgrammingError):
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
stmt = "INSERT INTO test (id) VALUES (%s)"
async_call(self._cursor.executemany(stmt, data))
self.validate_spans()
def test_callproc(self):
"""Should create a child span for callproc
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception
):
async_call(self._cursor.callproc("test", ()))
self.validate_spans()

View File

@ -0,0 +1,105 @@
# Copyright 2020, OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import psycopg2
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.test.test_base import TestBase
POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost")
POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432"))
POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests")
POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword")
POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser")
class TestFunctionalPsycopg(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
Psycopg2Instrumentor().instrument(tracer_provider=cls.tracer_provider)
cls._connection = psycopg2.connect(
dbname=POSTGRES_DB_NAME,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
cls._connection.set_session(autocommit=True)
cls._cursor = cls._connection.cursor()
@classmethod
def tearDownClass(cls):
if cls._cursor:
cls._cursor.close()
if cls._connection:
cls._connection.close()
Psycopg2Instrumentor().uninstrument()
def validate_spans(self):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
if span.name == "rootSpan":
root_span = span
else:
child_span = span
self.assertIsInstance(span.start_time, int)
self.assertIsInstance(span.end_time, int)
self.assertIsNotNone(root_span)
self.assertIsNotNone(child_span)
self.assertEqual(root_span.name, "rootSpan")
self.assertEqual(child_span.name, "postgresql.opentelemetry-tests")
self.assertIsNotNone(child_span.parent)
self.assertIs(child_span.parent, root_span.get_context())
self.assertIs(child_span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(
child_span.attributes["db.instance"], POSTGRES_DB_NAME
)
self.assertEqual(child_span.attributes["net.peer.name"], POSTGRES_HOST)
self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT)
def test_execute(self):
"""Should create a child span for execute method
"""
with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute(
"CREATE TABLE IF NOT EXISTS test (id integer)"
)
self.validate_spans()
def test_executemany(self):
"""Should create a child span for executemany
"""
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
stmt = "INSERT INTO test (id) VALUES (%s)"
self._cursor.executemany(stmt, data)
self.validate_spans()
def test_callproc(self):
"""Should create a child span for callproc
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception
):
self._cursor.callproc("test", ())
self.validate_spans()

View File

@ -0,0 +1,116 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from pymongo import MongoClient
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.pymongo import PymongoInstrumentor
from opentelemetry.test.test_base import TestBase
MONGODB_HOST = os.getenv("MONGODB_HOST ", "localhost")
MONGODB_PORT = int(os.getenv("MONGODB_PORT ", "27017"))
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME ", "opentelemetry-tests")
MONGODB_COLLECTION_NAME = "test"
class TestFunctionalPymongo(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._tracer = cls.tracer_provider.get_tracer(__name__)
PymongoInstrumentor().instrument()
client = MongoClient(
MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000
)
db = client[MONGODB_DB_NAME]
cls._collection = db[MONGODB_COLLECTION_NAME]
def validate_spans(self):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
if span.name == "rootSpan":
root_span = span
else:
pymongo_span = span
self.assertIsInstance(span.start_time, int)
self.assertIsInstance(span.end_time, int)
self.assertIsNot(root_span, None)
self.assertIsNot(pymongo_span, None)
self.assertIsNotNone(pymongo_span.parent)
self.assertIs(pymongo_span.parent, root_span.get_context())
self.assertIs(pymongo_span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(
pymongo_span.attributes["db.instance"], MONGODB_DB_NAME
)
self.assertEqual(
pymongo_span.attributes["net.peer.name"], MONGODB_HOST
)
self.assertEqual(
pymongo_span.attributes["net.peer.port"], MONGODB_PORT
)
def test_insert(self):
"""Should create a child span for insert
"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.insert_one(
{"name": "testName", "value": "testValue"}
)
self.validate_spans()
def test_update(self):
"""Should create a child span for update
"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.update_one(
{"name": "testName"}, {"$set": {"value": "someOtherValue"}}
)
self.validate_spans()
def test_find(self):
"""Should create a child span for find
"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.find_one()
self.validate_spans()
def test_delete(self):
"""Should create a child span for delete
"""
with self._tracer.start_as_current_span("rootSpan"):
self._collection.delete_one({"name": "testName"})
self.validate_spans()
def test_uninstrument(self):
# check that integration is working
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.memory_exporter.clear()
self.assertEqual(len(spans), 1)
# uninstrument and check not new spans are created
PymongoInstrumentor().uninstrument()
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.memory_exporter.clear()
self.assertEqual(len(spans), 0)
# re-enable and check that it works again
PymongoInstrumentor().instrument()
self._collection.find_one()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)

View File

@ -0,0 +1,97 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pymysql as pymy
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation.pymysql import PyMySQLInstrumentor
from opentelemetry.test.test_base import TestBase
MYSQL_USER = os.getenv("MYSQL_USER ", "testuser")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD ", "testpassword")
MYSQL_HOST = os.getenv("MYSQL_HOST ", "localhost")
MYSQL_PORT = int(os.getenv("MYSQL_PORT ", "3306"))
MYSQL_DB_NAME = os.getenv("MYSQL_DB_NAME ", "opentelemetry-tests")
class TestFunctionalPyMysql(TestBase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
PyMySQLInstrumentor().instrument()
cls._connection = pymy.connect(
user=MYSQL_USER,
password=MYSQL_PASSWORD,
host=MYSQL_HOST,
port=MYSQL_PORT,
database=MYSQL_DB_NAME,
)
cls._cursor = cls._connection.cursor()
@classmethod
def tearDownClass(cls):
if cls._connection:
cls._connection.close()
PyMySQLInstrumentor().uninstrument()
def validate_spans(self):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
for span in spans:
if span.name == "rootSpan":
root_span = span
else:
db_span = span
self.assertIsInstance(span.start_time, int)
self.assertIsInstance(span.end_time, int)
self.assertIsNotNone(root_span)
self.assertIsNotNone(db_span)
self.assertEqual(root_span.name, "rootSpan")
self.assertEqual(db_span.name, "mysql.opentelemetry-tests")
self.assertIsNotNone(db_span.parent)
self.assertIs(db_span.parent, root_span.get_context())
self.assertIs(db_span.kind, trace_api.SpanKind.CLIENT)
self.assertEqual(db_span.attributes["db.instance"], MYSQL_DB_NAME)
self.assertEqual(db_span.attributes["net.peer.name"], MYSQL_HOST)
self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT)
def test_execute(self):
"""Should create a child span for execute
"""
with self._tracer.start_as_current_span("rootSpan"):
self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)")
self.validate_spans()
def test_executemany(self):
"""Should create a child span for executemany
"""
with self._tracer.start_as_current_span("rootSpan"):
data = (("1",), ("2",), ("3",))
stmt = "INSERT INTO test (id) VALUES (%s)"
self._cursor.executemany(stmt, data)
self.validate_spans()
def test_callproc(self):
"""Should create a child span for callproc
"""
with self._tracer.start_as_current_span("rootSpan"), self.assertRaises(
Exception
):
self._cursor.callproc("test", ())
self.validate_spans()

View File

@ -0,0 +1,120 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import redis
from opentelemetry import trace
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.test.test_base import TestBase
class TestRedisInstrument(TestBase):
test_service = "redis"
def setUp(self):
super().setUp()
self.redis_client = redis.Redis(port=6379)
self.redis_client.flushall()
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)
def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()
def _check_span(self, span):
self.assertEqual(span.attributes["service"], self.test_service)
self.assertEqual(span.name, "redis.command")
self.assertIs(
span.status.canonical_code, trace.status.StatusCanonicalCode.OK
)
self.assertEqual(span.attributes.get("db.instance"), 0)
self.assertEqual(
span.attributes.get("db.url"), "redis://localhost:6379"
)
def test_long_command(self):
self.redis_client.mget(*range(1000))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span)
self.assertTrue(
span.attributes.get("db.statement").startswith("MGET 0 1 2 3")
)
self.assertTrue(span.attributes.get("db.statement").endswith("..."))
def test_basics(self):
self.assertIsNone(self.redis_client.get("cheese"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span)
self.assertEqual(span.attributes.get("db.statement"), "GET cheese")
self.assertEqual(span.attributes.get("redis.args_length"), 2)
def test_pipeline_traced(self):
with self.redis_client.pipeline(transaction=False) as pipeline:
pipeline.set("blah", 32)
pipeline.rpush("foo", "éé")
pipeline.hgetall("xxx")
pipeline.execute()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span)
self.assertEqual(
span.attributes.get("db.statement"),
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
)
self.assertEqual(span.attributes.get("redis.pipeline_length"), 3)
def test_pipeline_immediate(self):
with self.redis_client.pipeline() as pipeline:
pipeline.set("a", 1)
pipeline.immediate_execute_command("SET", "b", 2)
pipeline.execute()
spans = self.memory_exporter.get_finished_spans()
# expecting two separate spans here, rather than a
# single span for the whole pipeline
self.assertEqual(len(spans), 2)
span = spans[0]
self._check_span(span)
self.assertEqual(span.attributes.get("db.statement"), "SET b 2")
def test_parent(self):
"""Ensure OpenTelemetry works with redis."""
ot_tracer = trace.get_tracer("redis_svc")
with ot_tracer.start_as_current_span("redis_get"):
self.assertIsNone(self.redis_client.get("cheese"))
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans[0], spans[1]
# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_context())
self.assertEqual(parent_span.name, "redis_get")
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")
self.assertEqual(
child_span.attributes.get("service"), self.test_service
)
self.assertEqual(child_span.name, "redis.command")

View File

@ -0,0 +1,13 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,184 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.sqlalchemy.engine import _DB, _ROWS, _STMT
from opentelemetry.test.test_base import TestBase
Base = declarative_base()
def _create_engine(engine_args):
# create a SQLAlchemy engine
config = dict(engine_args)
url = config.pop("url")
return create_engine(url, **config)
class Player(Base):
"""Player entity used to test SQLAlchemy ORM"""
__tablename__ = "players"
id = Column(Integer, primary_key=True)
name = Column(String(20))
class SQLAlchemyTestMixin(TestBase):
__test__ = False
"""SQLAlchemy test mixin that includes a complete set of tests
that must be executed for different engine. When a new test (or
a regression test) should be added to SQLAlchemy test suite, a new
entry must be appended here so that it will be executed for all
available and supported engines. If the test is specific to only
one engine, that test must be added to the specific `TestCase`
implementation.
To support a new engine, create a new `TestCase` that inherits from
`SQLAlchemyTestMixin` and `TestCase`. Then you must define the following
static class variables:
* VENDOR: the database vendor name
* SQL_DB: the `db.type` tag that we expect (it's the name of the database available in the `.env` file)
* SERVICE: the service that we expect by default
* ENGINE_ARGS: all arguments required to create the engine
To check specific tags in each test, you must implement the
`check_meta(self, span)` method.
"""
VENDOR = None
SQL_DB = None
SERVICE = None
ENGINE_ARGS = None
@contextlib.contextmanager
def connection(self):
# context manager that provides a connection
# to the underlying database
try:
conn = self.engine.connect()
yield conn
finally:
conn.close()
def check_meta(self, span):
"""function that can be implemented according to the
specific engine implementation
"""
def setUp(self):
super().setUp()
# create an engine with the given arguments
self.engine = _create_engine(self.ENGINE_ARGS)
# create the database / entities and prepare a session for the test
Base.metadata.drop_all(bind=self.engine)
Base.metadata.create_all(self.engine, checkfirst=False)
self.session = sessionmaker(bind=self.engine)()
# trace the engine
SQLAlchemyInstrumentor().instrument(
engine=self.engine, tracer_provider=self.tracer_provider
)
self.memory_exporter.clear()
def tearDown(self):
# pylint: disable=invalid-name
# clear the database and dispose the engine
self.session.close()
Base.metadata.drop_all(bind=self.engine)
self.engine.dispose()
SQLAlchemyInstrumentor().uninstrument()
super().tearDown()
def _check_span(self, span):
self.assertEqual(span.name, "{}.query".format(self.VENDOR))
self.assertEqual(span.attributes.get("service"), self.SERVICE)
self.assertEqual(span.attributes.get(_DB), self.SQL_DB)
self.assertIs(
span.status.canonical_code, trace.status.StatusCanonicalCode.OK
)
self.assertGreater((span.end_time - span.start_time), 0)
def test_orm_insert(self):
# ensures that the ORM session is traced
wayne = Player(id=1, name="wayne")
self.session.add(wayne)
self.session.commit()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span)
self.assertIn("INSERT INTO players", span.attributes.get(_STMT))
self.assertEqual(span.attributes.get(_ROWS), 1)
self.check_meta(span)
def test_session_query(self):
# ensures that the Session queries are traced
out = list(self.session.query(Player).filter_by(name="wayne"))
self.assertEqual(len(out), 0)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span)
self.assertIn(
"SELECT players.id AS players_id, players.name AS players_name \nFROM players \nWHERE players.name",
span.attributes.get(_STMT),
)
self.check_meta(span)
def test_engine_connect_execute(self):
# ensures that engine.connect() is properly traced
with self.connection() as conn:
rows = conn.execute("SELECT * FROM players").fetchall()
self.assertEqual(len(rows), 0)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
self._check_span(span)
self.assertEqual(span.attributes.get(_STMT), "SELECT * FROM players")
self.check_meta(span)
def test_parent(self):
"""Ensure that sqlalchemy works with opentelemetry."""
tracer = self.tracer_provider.get_tracer("sqlalch_svc")
with tracer.start_as_current_span("sqlalch_op"):
with self.connection() as conn:
rows = conn.execute("SELECT * FROM players").fetchall()
self.assertEqual(len(rows), 0)
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
child_span, parent_span = spans
# confirm the parenting
self.assertIsNone(parent_span.parent)
self.assertIs(child_span.parent, parent_span.get_context())
self.assertEqual(parent_span.name, "sqlalch_op")
self.assertEqual(parent_span.instrumentation_info.name, "sqlalch_svc")
self.assertEqual(child_span.name, "{}.query".format(self.VENDOR))
self.assertEqual(child_span.attributes.get("service"), self.SERVICE)

View File

@ -0,0 +1,72 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import sqlalchemy
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.test.test_base import TestBase
POSTGRES_CONFIG = {
"host": "127.0.0.1",
"port": int(os.getenv("TEST_POSTGRES_PORT", "5432")),
"user": os.getenv("TEST_POSTGRES_USER", "testuser"),
"password": os.getenv("TEST_POSTGRES_PASSWORD", "testpassword"),
"dbname": os.getenv("TEST_POSTGRES_DB", "opentelemetry-tests"),
}
class SQLAlchemyInstrumentTestCase(TestBase):
"""TestCase that checks if the engine is properly traced
when the `instrument()` method is used.
"""
def setUp(self):
# create a traced engine with the given arguments
SQLAlchemyInstrumentor().instrument()
dsn = (
"postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(dbname)s"
% POSTGRES_CONFIG
)
self.engine = sqlalchemy.create_engine(dsn)
# prepare a connection
self.conn = self.engine.connect()
super().setUp()
def tearDown(self):
# clear the database and dispose the engine
self.conn.close()
self.engine.dispose()
SQLAlchemyInstrumentor().uninstrument()
def test_engine_traced(self):
# ensures that the engine is traced
rows = self.conn.execute("SELECT 1").fetchall()
self.assertEqual(len(rows), 1)
traces = self.memory_exporter.get_finished_spans()
# trace composition
self.assertEqual(len(traces), 1)
span = traces[0]
# check subset of span fields
self.assertEqual(span.name, "postgres.query")
self.assertEqual(span.attributes.get("service"), "postgres")
self.assertIs(
span.status.canonical_code, trace.status.StatusCanonicalCode.OK
)
self.assertGreater((span.end_time - span.start_time), 0)

View File

@ -0,0 +1,83 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import pytest
from sqlalchemy.exc import ProgrammingError
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy.engine import (
_DB,
_HOST,
_PORT,
_ROWS,
_STMT,
)
from .mixins import SQLAlchemyTestMixin
MYSQL_CONFIG = {
"host": "127.0.0.1",
"port": int(os.getenv("TEST_MYSQL_PORT", "3306")),
"user": os.getenv("TEST_MYSQL_USER", "testuser"),
"password": os.getenv("TEST_MYSQL_PASSWORD", "testpassword"),
"database": os.getenv("TEST_MYSQL_DATABASE", "opentelemetry-tests"),
}
class MysqlConnectorTestCase(SQLAlchemyTestMixin):
"""TestCase for mysql-connector engine"""
__test__ = True
VENDOR = "mysql"
SQL_DB = "opentelemetry-tests"
SERVICE = "mysql"
ENGINE_ARGS = {
"url": "mysql+mysqlconnector://%(user)s:%(password)s@%(host)s:%(port)s/%(database)s"
% MYSQL_CONFIG
}
def check_meta(self, span):
# check database connection tags
self.assertEqual(span.attributes.get(_HOST), MYSQL_CONFIG["host"])
self.assertEqual(span.attributes.get(_PORT), MYSQL_CONFIG["port"])
def test_engine_execute_errors(self):
# ensures that SQL errors are reported
with pytest.raises(ProgrammingError):
with self.connection() as conn:
conn.execute("SELECT * FROM a_wrong_table").fetchall()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# span fields
self.assertEqual(span.name, "{}.query".format(self.VENDOR))
self.assertEqual(span.attributes.get("service"), self.SERVICE)
self.assertEqual(
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
)
self.assertEqual(span.attributes.get(_DB), self.SQL_DB)
self.assertIsNone(span.attributes.get(_ROWS))
self.check_meta(span)
self.assertTrue(span.end_time - span.start_time > 0)
# check the error
self.assertIs(
span.status.canonical_code,
trace.status.StatusCanonicalCode.UNKNOWN,
)
self.assertIn("a_wrong_table", span.status.description)

View File

@ -0,0 +1,98 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import psycopg2
import pytest
from sqlalchemy.exc import ProgrammingError
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy.engine import (
_DB,
_HOST,
_PORT,
_ROWS,
_STMT,
)
from .mixins import SQLAlchemyTestMixin
POSTGRES_CONFIG = {
"host": "127.0.0.1",
"port": int(os.getenv("TEST_POSTGRES_PORT", "5432")),
"user": os.getenv("TEST_POSTGRES_USER", "testuser"),
"password": os.getenv("TEST_POSTGRES_PASSWORD", "testpassword"),
"dbname": os.getenv("TEST_POSTGRES_DB", "opentelemetry-tests"),
}
class PostgresTestCase(SQLAlchemyTestMixin):
"""TestCase for Postgres Engine"""
__test__ = True
VENDOR = "postgres"
SQL_DB = "opentelemetry-tests"
SERVICE = "postgres"
ENGINE_ARGS = {
"url": "postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(dbname)s"
% POSTGRES_CONFIG
}
def check_meta(self, span):
# check database connection tags
self.assertEqual(span.attributes.get(_HOST), POSTGRES_CONFIG["host"])
self.assertEqual(span.attributes.get(_PORT), POSTGRES_CONFIG["port"])
def test_engine_execute_errors(self):
# ensures that SQL errors are reported
with pytest.raises(ProgrammingError):
with self.connection() as conn:
conn.execute("SELECT * FROM a_wrong_table").fetchall()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# span fields
self.assertEqual(span.name, "{}.query".format(self.VENDOR))
self.assertEqual(span.attributes.get("service"), self.SERVICE)
self.assertEqual(
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
)
self.assertEqual(span.attributes.get(_DB), self.SQL_DB)
self.assertIsNone(span.attributes.get(_ROWS))
self.check_meta(span)
self.assertTrue(span.end_time - span.start_time > 0)
# check the error
self.assertIs(
span.status.canonical_code,
trace.status.StatusCanonicalCode.UNKNOWN,
)
self.assertIn("a_wrong_table", span.status.description)
class PostgresCreatorTestCase(PostgresTestCase):
"""TestCase for Postgres Engine that includes the same tests set
of `PostgresTestCase`, but it uses a specific `creator` function.
"""
VENDOR = "postgres"
SQL_DB = "opentelemetry-tests"
SERVICE = "postgres"
ENGINE_ARGS = {
"url": "postgresql://",
"creator": lambda: psycopg2.connect(**POSTGRES_CONFIG),
}

View File

@ -0,0 +1,61 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import pytest
from sqlalchemy.exc import OperationalError
from opentelemetry import trace
from opentelemetry.instrumentation.sqlalchemy.engine import _DB, _ROWS, _STMT
from .mixins import SQLAlchemyTestMixin
class SQLiteTestCase(SQLAlchemyTestMixin):
"""TestCase for the SQLite engine"""
__test__ = True
VENDOR = "sqlite"
SQL_DB = ":memory:"
SERVICE = "sqlite"
ENGINE_ARGS = {"url": "sqlite:///:memory:"}
def test_engine_execute_errors(self):
# ensures that SQL errors are reported
with pytest.raises(OperationalError):
with self.connection() as conn:
conn.execute("SELECT * FROM a_wrong_table").fetchall()
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 1)
span = spans[0]
# span fields
self.assertEqual(span.name, "{}.query".format(self.VENDOR))
self.assertEqual(span.attributes.get("service"), self.SERVICE)
self.assertEqual(
span.attributes.get(_STMT), "SELECT * FROM a_wrong_table"
)
self.assertEqual(span.attributes.get(_DB), self.SQL_DB)
self.assertIsNone(span.attributes.get(_ROWS))
self.assertTrue((span.end_time - span.start_time) > 0)
# check the error
self.assertIs(
span.status.canonical_code,
trace.status.StatusCanonicalCode.UNKNOWN,
)
self.assertEqual(
span.status.description, "no such table: a_wrong_table"
)