From 7d4d2ce4dc1c05bb7ce00ac5cfa8b04da560cee0 Mon Sep 17 00:00:00 2001 From: Leighton Chen Date: Tue, 4 Aug 2020 19:10:51 -0700 Subject: [PATCH] Rename remaining framework packages from "ext" to "instrumentation" (#969) --- .../tests/asyncpg/test_asyncpg_functional.py | 277 +++++++++ .../tests/celery/conftest.py | 92 +++ .../tests/celery/test_celery_functional.py | 532 ++++++++++++++++++ .../tests/check_availability.py | 115 ++++ .../tests/docker-compose.yml | 47 ++ .../tests/mysql/test_mysql_functional.py | 98 ++++ .../test_opencensusexporter_functional.py | 60 ++ .../tests/postgres/test_aiopg_functional.py | 200 +++++++ .../tests/postgres/test_psycopg_functional.py | 105 ++++ .../tests/pymongo/test_pymongo_functional.py | 116 ++++ .../tests/pymysql/test_pymysql_functional.py | 97 ++++ .../tests/redis/test_redis_functional.py | 120 ++++ .../tests/sqlalchemy_tests/__init__.py | 13 + .../tests/sqlalchemy_tests/mixins.py | 184 ++++++ .../tests/sqlalchemy_tests/test_instrument.py | 72 +++ .../tests/sqlalchemy_tests/test_mysql.py | 83 +++ .../tests/sqlalchemy_tests/test_postgres.py | 98 ++++ .../tests/sqlalchemy_tests/test_sqlite.py | 61 ++ 18 files changed, 2370 insertions(+) create mode 100644 tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/celery/conftest.py create mode 100644 tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/check_availability.py create mode 100644 tests/opentelemetry-docker-tests/tests/docker-compose.yml create mode 100644 tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/opencensus/test_opencensusexporter_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/pymongo/test_pymongo_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py create mode 100644 tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/__init__.py create mode 100644 tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/mixins.py create mode 100644 tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_instrument.py create mode 100644 tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_mysql.py create mode 100644 tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_postgres.py create mode 100644 tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_sqlite.py diff --git a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py new file mode 100644 index 000000000..87382702f --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -0,0 +1,277 @@ +import asyncio +import os + +import asyncpg + +from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace.status import StatusCanonicalCode + +POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost") +POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432")) +POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests") +POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword") +POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser") + + +def _await(coro): + loop = asyncio.get_event_loop() + return loop.run_until_complete(coro) + + +class TestFunctionalAsyncPG(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + AsyncPGInstrumentor().instrument(tracer_provider=cls.tracer_provider) + cls._connection = _await( + asyncpg.connect( + database=POSTGRES_DB_NAME, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + ) + ) + + @classmethod + def tearDownClass(cls): + AsyncPGInstrumentor().uninstrument() + + def test_instrumented_execute_method_without_arguments(self, *_, **__): + _await(self._connection.execute("SELECT 42;")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT 42;", + }, + ) + + def test_instrumented_fetch_method_without_arguments(self, *_, **__): + _await(self._connection.fetch("SELECT 42;")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT 42;", + }, + ) + + def test_instrumented_transaction_method(self, *_, **__): + async def _transaction_execute(): + async with self._connection.transaction(): + await self._connection.execute("SELECT 42;") + + _await(_transaction_execute()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(3, len(spans)) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "BEGIN;", + }, + spans[0].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "SELECT 42;", + }, + spans[1].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[1].status.canonical_code + ) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "COMMIT;", + }, + spans[2].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[2].status.canonical_code + ) + + def test_instrumented_failed_transaction_method(self, *_, **__): + async def _transaction_execute(): + async with self._connection.transaction(): + await self._connection.execute("SELECT 42::uuid;") + + with self.assertRaises(asyncpg.CannotCoerceError): + _await(_transaction_execute()) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(3, len(spans)) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "BEGIN;", + }, + spans[0].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "SELECT 42::uuid;", + }, + spans[1].attributes, + ) + self.assertEqual( + StatusCanonicalCode.INVALID_ARGUMENT, + spans[1].status.canonical_code, + ) + self.assertEqual( + { + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.type": "sql", + "db.statement": "ROLLBACK;", + }, + spans[2].attributes, + ) + self.assertEqual( + StatusCanonicalCode.OK, spans[2].status.canonical_code + ) + + def test_instrumented_method_doesnt_capture_parameters(self, *_, **__): + _await(self._connection.execute("SELECT $1;", "1")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + # This shouldn't be set because we don't capture parameters by + # default + # + # "db.statement.parameters": "('1',)", + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT $1;", + }, + ) + + +class TestFunctionalAsyncPG_CaptureParameters(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + AsyncPGInstrumentor(capture_parameters=True).instrument( + tracer_provider=cls.tracer_provider + ) + cls._connection = _await( + asyncpg.connect( + database=POSTGRES_DB_NAME, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + ) + ) + + @classmethod + def tearDownClass(cls): + AsyncPGInstrumentor().uninstrument() + + def test_instrumented_execute_method_with_arguments(self, *_, **__): + _await(self._connection.execute("SELECT $1;", "1")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + StatusCanonicalCode.OK, spans[0].status.canonical_code + ) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.statement.parameters": "('1',)", + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT $1;", + }, + ) + + def test_instrumented_fetch_method_with_arguments(self, *_, **__): + _await(self._connection.fetch("SELECT $1;", "1")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.user": POSTGRES_USER, + "db.statement.parameters": "('1',)", + "db.instance": POSTGRES_DB_NAME, + "db.statement": "SELECT $1;", + }, + ) + + def test_instrumented_executemany_method_with_arguments(self, *_, **__): + _await(self._connection.executemany("SELECT $1;", [["1"], ["2"]])) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + { + "db.type": "sql", + "db.statement": "SELECT $1;", + "db.statement.parameters": "([['1'], ['2']],)", + "db.user": POSTGRES_USER, + "db.instance": POSTGRES_DB_NAME, + }, + spans[0].attributes, + ) + + def test_instrumented_execute_interface_error_method(self, *_, **__): + with self.assertRaises(asyncpg.InterfaceError): + _await(self._connection.execute("SELECT 42;", 1, 2, 3)) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + self.assertEqual( + spans[0].attributes, + { + "db.type": "sql", + "db.instance": POSTGRES_DB_NAME, + "db.user": POSTGRES_USER, + "db.statement.parameters": "(1, 2, 3)", + "db.statement": "SELECT 42;", + }, + ) diff --git a/tests/opentelemetry-docker-tests/tests/celery/conftest.py b/tests/opentelemetry-docker-tests/tests/celery/conftest.py new file mode 100644 index 000000000..085fe3bab --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/celery/conftest.py @@ -0,0 +1,92 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from functools import wraps + +import pytest + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.sdk.trace import TracerProvider, export +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) + +REDIS_HOST = os.getenv("REDIS_HOST", "localhost") +REDIS_PORT = int(os.getenv("REDIS_PORT ", "6379")) +REDIS_URL = "redis://{host}:{port}".format(host=REDIS_HOST, port=REDIS_PORT) +BROKER_URL = "{redis}/{db}".format(redis=REDIS_URL, db=0) +BACKEND_URL = "{redis}/{db}".format(redis=REDIS_URL, db=1) + + +@pytest.fixture(scope="session") +def celery_config(): + return {"broker_url": BROKER_URL, "result_backend": BACKEND_URL} + + +@pytest.fixture +def celery_worker_parameters(): + return { + # See https://github.com/celery/celery/issues/3642#issuecomment-457773294 + "perform_ping_check": False, + } + + +@pytest.fixture(autouse=True) +def patch_celery_app(celery_app, celery_worker): + """Patch task decorator on app fixture to reload worker""" + # See https://github.com/celery/celery/issues/3642 + def wrap_task(fn): + @wraps(fn) + def wrapper(*args, **kwargs): + result = fn(*args, **kwargs) + celery_worker.reload() + return result + + return wrapper + + celery_app.task = wrap_task(celery_app.task) + + +@pytest.fixture(autouse=True) +def instrument(tracer_provider, memory_exporter): + CeleryInstrumentor().instrument(tracer_provider=tracer_provider) + memory_exporter.clear() + + yield + + CeleryInstrumentor().uninstrument() + + +@pytest.fixture(scope="session") +def tracer_provider(memory_exporter): + original_tracer_provider = trace_api.get_tracer_provider() + + tracer_provider = TracerProvider() + + span_processor = export.SimpleExportSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + + trace_api.set_tracer_provider(tracer_provider) + + yield tracer_provider + + trace_api.set_tracer_provider(original_tracer_provider) + + +@pytest.fixture(scope="session") +def memory_exporter(): + memory_exporter = InMemorySpanExporter() + return memory_exporter diff --git a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py new file mode 100644 index 000000000..2714c8ee0 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py @@ -0,0 +1,532 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import time + +import celery +import pytest +from celery import signals +from celery.exceptions import Retry + +import opentelemetry.instrumentation.celery +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.sdk import resources +from opentelemetry.sdk.trace import TracerProvider, export +from opentelemetry.trace.status import StatusCanonicalCode + +# set a high timeout for async executions due to issues in CI +ASYNC_GET_TIMEOUT = 120 + + +class MyException(Exception): + pass + + +@pytest.mark.skip(reason="inconsistent test results") +def test_instrumentation_info(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + result = fn_task.apply_async() + assert result.get(timeout=ASYNC_GET_TIMEOUT) == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + async_span, run_span = spans + + assert ( + async_span.instrumentation_info.name + == opentelemetry.instrumentation.celery.__name__ + ) + assert ( + async_span.instrumentation_info.version + == opentelemetry.instrumentation.celery.__version__ + ) + assert ( + run_span.instrumentation_info.name + == opentelemetry.instrumentation.celery.__name__ + ) + assert ( + run_span.instrumentation_info.version + == opentelemetry.instrumentation.celery.__version__ + ) + + +def test_fn_task_run(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + t = fn_task.run() + assert t == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 0 + + +def test_fn_task_call(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + t = fn_task() + assert t == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 0 + + +def test_fn_task_apply(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + t = fn_task.apply() + assert t.successful() is True + assert t.result == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.fn_task" + assert span.attributes.get("messaging.message_id") == t.task_id + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + + +def test_fn_task_apply_bind(celery_app, memory_exporter): + @celery_app.task(bind=True) + def fn_task(self): + return self + + t = fn_task.apply() + assert t.successful() is True + assert "fn_task" in t.result.name + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.fn_task" + assert span.attributes.get("messaging.message_id") == t.task_id + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + + +@pytest.mark.skip(reason="inconsistent test results") +def test_fn_task_apply_async(celery_app, memory_exporter): + @celery_app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + result = fn_task_parameters.apply_async( + args=["user"], kwargs={"force_logout": True} + ) + assert result.get(timeout=ASYNC_GET_TIMEOUT) == ["user", True] + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + async_span, run_span = spans + + assert run_span.context.trace_id != async_span.context.trace_id + + assert async_span.status.is_ok is True + assert async_span.name == "test_celery_functional.fn_task_parameters" + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") == result.task_id + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.fn_task_parameters" + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + +@pytest.mark.skip(reason="inconsistent test results") +def test_concurrent_delays(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + results = [fn_task.delay() for _ in range(100)] + + for result in results: + assert result.get(timeout=ASYNC_GET_TIMEOUT) == 42 + + spans = memory_exporter.get_finished_spans() + + assert len(spans) == 200 + + +@pytest.mark.skip(reason="inconsistent test results") +def test_fn_task_delay(celery_app, memory_exporter): + @celery_app.task + def fn_task_parameters(user, force_logout=False): + return (user, force_logout) + + result = fn_task_parameters.delay("user", force_logout=True) + assert result.get(timeout=ASYNC_GET_TIMEOUT) == ["user", True] + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + async_span, run_span = spans + + assert run_span.context.trace_id != async_span.context.trace_id + + assert async_span.status.is_ok is True + assert async_span.name == "test_celery_functional.fn_task_parameters" + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") == result.task_id + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.fn_task_parameters" + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.fn_task_parameters" + ) + + +def test_fn_exception(celery_app, memory_exporter): + @celery_app.task + def fn_exception(): + raise Exception("Task class is failing") + + result = fn_exception.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is False + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.status.canonical_code == StatusCanonicalCode.UNKNOWN + assert span.attributes.get("messaging.message_id") == result.task_id + assert "Task class is failing" in span.status.description + + +def test_fn_exception_expected(celery_app, memory_exporter): + @celery_app.task(throws=(MyException,)) + def fn_exception(): + raise MyException("Task class is failing") + + result = fn_exception.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_fn_retry_exception(celery_app, memory_exporter): + @celery_app.task + def fn_exception(): + raise Retry("Task class is being retried") + + result = fn_exception.apply() + + assert result.failed() is False + assert "Task class is being retried" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.fn_exception" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "RETRY" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.fn_exception" + ) + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_class_task(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + def run(self): + return 42 + + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) + + result = task.apply() + + assert result.successful() is True + assert result.result == 42 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.BaseTask" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.BaseTask" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_class_task_exception(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + def run(self): + raise Exception("Task class is failing") + + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) + + result = task.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is False + assert span.name == "test_celery_functional.BaseTask" + assert ( + span.attributes.get("celery.task_name") + == "test_celery_functional.BaseTask" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert span.status.canonical_code == StatusCanonicalCode.UNKNOWN + assert span.attributes.get("messaging.message_id") == result.task_id + assert "Task class is failing" in span.status.description + + +def test_class_task_exception_excepted(celery_app, memory_exporter): + class BaseTask(celery_app.Task): + throws = (MyException,) + + def run(self): + raise MyException("Task class is failing") + + task = BaseTask() + # register the Task class if it's available (required in Celery 4.0+) + register_task = getattr(celery_app, "register_task", None) + if register_task is not None: + register_task(task) + + result = task.apply() + + assert result.failed() is True + assert "Task class is failing" in result.traceback + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.status.canonical_code == StatusCanonicalCode.OK + assert span.name == "test_celery_functional.BaseTask" + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "FAILURE" + assert span.attributes.get("messaging.message_id") == result.task_id + + +def test_shared_task(celery_app, memory_exporter): + """Ensure Django Shared Task are supported""" + + @celery.shared_task + def add(x, y): + return x + y + + result = add.apply([2, 2]) + assert result.result == 4 + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + + assert span.status.is_ok is True + assert span.name == "test_celery_functional.add" + assert ( + span.attributes.get("celery.task_name") == "test_celery_functional.add" + ) + assert span.attributes.get("celery.action") == "run" + assert span.attributes.get("celery.state") == "SUCCESS" + assert span.attributes.get("messaging.message_id") == result.task_id + + +@pytest.mark.skip(reason="inconsistent test results") +def test_apply_async_previous_style_tasks( + celery_app, celery_worker, memory_exporter +): + """Ensures apply_async is properly patched if Celery 1.0 style tasks are + used even in newer versions. This should extend support to previous versions + of Celery.""" + + class CelerySuperClass(celery.task.Task): + abstract = True + + @classmethod + def apply_async(cls, args=None, kwargs=None, **kwargs_): + return super(CelerySuperClass, cls).apply_async( + args=args, kwargs=kwargs, **kwargs_ + ) + + def run(self, *args, **kwargs): + if "stop" in kwargs: + # avoid call loop + return + CelerySubClass.apply_async(args=[], kwargs={"stop": True}).get( + timeout=ASYNC_GET_TIMEOUT + ) + + class CelerySubClass(CelerySuperClass): + pass + + celery_worker.reload() + + task = CelerySubClass() + result = task.apply() + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 3 + + async_span, async_run_span, run_span = spans + + assert run_span.status.is_ok is True + assert run_span.name == "test_celery_functional.CelerySubClass" + assert ( + run_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert run_span.attributes.get("celery.action") == "run" + assert run_span.attributes.get("celery.state") == "SUCCESS" + assert run_span.attributes.get("messaging.message_id") == result.task_id + + assert async_run_span.status.is_ok is True + assert async_run_span.name == "test_celery_functional.CelerySubClass" + assert ( + async_run_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert async_run_span.attributes.get("celery.action") == "run" + assert async_run_span.attributes.get("celery.state") == "SUCCESS" + assert ( + async_run_span.attributes.get("messaging.message_id") != result.task_id + ) + + assert async_span.status.is_ok is True + assert async_span.name == "test_celery_functional.CelerySubClass" + assert ( + async_span.attributes.get("celery.task_name") + == "test_celery_functional.CelerySubClass" + ) + assert async_span.attributes.get("celery.action") == "apply_async" + assert async_span.attributes.get("messaging.message_id") != result.task_id + assert async_span.attributes.get( + "messaging.message_id" + ) == async_run_span.attributes.get("messaging.message_id") + + +def test_custom_tracer_provider(celery_app, memory_exporter): + @celery_app.task + def fn_task(): + return 42 + + resource = resources.Resource.create({}) + tracer_provider = TracerProvider(resource=resource) + span_processor = export.SimpleExportSpanProcessor(memory_exporter) + tracer_provider.add_span_processor(span_processor) + + trace_api.set_tracer_provider(tracer_provider) + + CeleryInstrumentor().uninstrument() + CeleryInstrumentor().instrument(tracer_provider=tracer_provider) + + fn_task.delay() + + spans_list = memory_exporter.get_finished_spans() + assert len(spans_list) == 1 + + span = spans_list[0] + assert span.resource == resource diff --git a/tests/opentelemetry-docker-tests/tests/check_availability.py b/tests/opentelemetry-docker-tests/tests/check_availability.py new file mode 100644 index 000000000..91b8e5539 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/check_availability.py @@ -0,0 +1,115 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import os +import time + +import mysql.connector +import psycopg2 +import pymongo +import redis + +MONGODB_COLLECTION_NAME = "test" +MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME", "opentelemetry-tests") +MONGODB_HOST = os.getenv("MONGODB_HOST", "localhost") +MONGODB_PORT = int(os.getenv("MONGODB_PORT", "27017")) +MYSQL_DB_NAME = os.getenv("MYSQL_DB_NAME ", "opentelemetry-tests") +MYSQL_HOST = os.getenv("MYSQL_HOST ", "localhost") +MYSQL_PORT = int(os.getenv("MYSQL_PORT ", "3306")) +MYSQL_USER = os.getenv("MYSQL_USER ", "testuser") +MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD ", "testpassword") +POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "opentelemetry-tests") +POSTGRES_HOST = os.getenv("POSTGRESQL_HOST", "localhost") +POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST", "testpassword") +POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT", "5432")) +POSTGRES_USER = os.getenv("POSTGRESQL_HOST", "testuser") +REDIS_HOST = os.getenv("REDIS_HOST", "localhost") +REDIS_PORT = int(os.getenv("REDIS_PORT ", "6379")) +RETRY_COUNT = 5 +RETRY_INTERVAL = 5 # Seconds + +logger = logging.getLogger(__name__) + + +def retryable(func): + def wrapper(): + # Try to connect to DB + for i in range(RETRY_COUNT): + try: + func() + return + except Exception as ex: # pylint: disable=broad-except + logger.error( + "waiting for %s, retry %d/%d [%s]", + func.__name__, + i + 1, + RETRY_COUNT, + ex, + ) + time.sleep(RETRY_INTERVAL) + raise Exception("waiting for {} failed".format(func.__name__)) + + return wrapper + + +@retryable +def check_pymongo_connection(): + client = pymongo.MongoClient( + MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000 + ) + db = client[MONGODB_DB_NAME] + collection = db[MONGODB_COLLECTION_NAME] + collection.find_one() + client.close() + + +@retryable +def check_mysql_connection(): + connection = mysql.connector.connect( + user=MYSQL_USER, + password=MYSQL_PASSWORD, + host=MYSQL_HOST, + port=MYSQL_PORT, + database=MYSQL_DB_NAME, + ) + connection.close() + + +@retryable +def check_postgres_connection(): + connection = psycopg2.connect( + dbname=POSTGRES_DB_NAME, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + ) + connection.close() + + +@retryable +def check_redis_connection(): + connection = redis.Redis(host=REDIS_HOST, port=REDIS_PORT) + connection.hgetall("*") + + +def check_docker_services_availability(): + # Check if Docker services accept connections + check_pymongo_connection() + check_mysql_connection() + check_postgres_connection() + check_redis_connection() + + +check_docker_services_availability() diff --git a/tests/opentelemetry-docker-tests/tests/docker-compose.yml b/tests/opentelemetry-docker-tests/tests/docker-compose.yml new file mode 100644 index 000000000..bbb005a02 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/docker-compose.yml @@ -0,0 +1,47 @@ +version: '3' + +services: + otmongo: + ports: + - "27017:27017" + image: mongo:latest + otmysql: + ports: + - "3306:3306" + image: mysql:latest + restart: always + environment: + MYSQL_USER: testuser + MYSQL_PASSWORD: testpassword + MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + MYSQL_DATABASE: opentelemetry-tests + otpostgres: + image: postgres + ports: + - "5432:5432" + environment: + POSTGRES_USER: testuser + POSTGRES_PASSWORD: testpassword + POSTGRES_DB: opentelemetry-tests + otredis: + image: redis:4.0-alpine + ports: + - "127.0.0.1:6379:6379" + otjaeger: + image: jaegertracing/all-in-one:1.8 + environment: + COLLECTOR_ZIPKIN_HTTP_PORT: "9411" + ports: + - "5775:5775/udp" + - "6831:6831/udp" + - "6832:6832/udp" + - "5778:5778" + - "16686:16686" + - "14268:14268" + - "9411:9411" + otopencensus: + image: omnition/opencensus-collector:0.1.11 + command: --logging-exporter DEBUG + ports: + - "8888:8888" + - "55678:55678" diff --git a/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py b/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py new file mode 100644 index 000000000..f2b07293b --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/mysql/test_mysql_functional.py @@ -0,0 +1,98 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time + +import mysql.connector + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.mysql import MySQLInstrumentor +from opentelemetry.test.test_base import TestBase + +MYSQL_USER = os.getenv("MYSQL_USER ", "testuser") +MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD ", "testpassword") +MYSQL_HOST = os.getenv("MYSQL_HOST ", "localhost") +MYSQL_PORT = int(os.getenv("MYSQL_PORT ", "3306")) +MYSQL_DB_NAME = os.getenv("MYSQL_DB_NAME ", "opentelemetry-tests") + + +class TestFunctionalMysql(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + MySQLInstrumentor().instrument() + cls._connection = mysql.connector.connect( + user=MYSQL_USER, + password=MYSQL_PASSWORD, + host=MYSQL_HOST, + port=MYSQL_PORT, + database=MYSQL_DB_NAME, + ) + cls._cursor = cls._connection.cursor() + + @classmethod + def tearDownClass(cls): + if cls._connection: + cls._connection.close() + MySQLInstrumentor().uninstrument() + + def validate_spans(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + for span in spans: + if span.name == "rootSpan": + root_span = span + else: + db_span = span + self.assertIsInstance(span.start_time, int) + self.assertIsInstance(span.end_time, int) + self.assertIsNotNone(root_span) + self.assertIsNotNone(db_span) + self.assertEqual(root_span.name, "rootSpan") + self.assertEqual(db_span.name, "mysql.opentelemetry-tests") + self.assertIsNotNone(db_span.parent) + self.assertIs(db_span.parent, root_span.get_context()) + self.assertIs(db_span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual(db_span.attributes["db.instance"], MYSQL_DB_NAME) + self.assertEqual(db_span.attributes["net.peer.name"], MYSQL_HOST) + self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT) + + def test_execute(self): + """Should create a child span for execute + """ + with self._tracer.start_as_current_span("rootSpan"): + self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") + self.validate_spans() + + def test_executemany(self): + """Should create a child span for executemany + """ + with self._tracer.start_as_current_span("rootSpan"): + data = (("1",), ("2",), ("3",)) + stmt = "INSERT INTO test (id) VALUES (%s)" + self._cursor.executemany(stmt, data) + self.validate_spans() + + def test_callproc(self): + """Should create a child span for callproc + """ + with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( + Exception + ): + self._cursor.callproc("test", ()) + self.validate_spans() diff --git a/tests/opentelemetry-docker-tests/tests/opencensus/test_opencensusexporter_functional.py b/tests/opentelemetry-docker-tests/tests/opencensus/test_opencensusexporter_functional.py new file mode 100644 index 000000000..c79489178 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/opencensus/test_opencensusexporter_functional.py @@ -0,0 +1,60 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry import trace +from opentelemetry.context import attach, detach, set_value +from opentelemetry.exporter.opencensus.trace_exporter import ( + OpenCensusSpanExporter, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleExportSpanProcessor +from opentelemetry.test.test_base import TestBase + + +class ExportStatusSpanProcessor(SimpleExportSpanProcessor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.export_status = [] + + def on_end(self, span): + token = attach(set_value("suppress_instrumentation", True)) + self.export_status.append(self.span_exporter.export((span,))) + detach(token) + + +class TestOpenCensusSpanExporter(TestBase): + def setUp(self): + super().setUp() + + trace.set_tracer_provider(TracerProvider()) + self.tracer = trace.get_tracer(__name__) + self.span_processor = ExportStatusSpanProcessor( + OpenCensusSpanExporter( + service_name="basic-service", endpoint="localhost:55678" + ) + ) + + trace.get_tracer_provider().add_span_processor(self.span_processor) + + def test_export(self): + with self.tracer.start_as_current_span("foo"): + with self.tracer.start_as_current_span("bar"): + with self.tracer.start_as_current_span("baz"): + pass + + self.assertTrue(len(self.span_processor.export_status), 3) + + for export_status in self.span_processor.export_status: + self.assertEqual(export_status.name, "SUCCESS") + self.assertEqual(export_status.value, 0) diff --git a/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py b/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py new file mode 100644 index 000000000..1762da1d0 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/postgres/test_aiopg_functional.py @@ -0,0 +1,200 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +import os +import time + +import aiopg +import psycopg2 +import pytest + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.aiopg import AiopgInstrumentor +from opentelemetry.test.test_base import TestBase + +POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost") +POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432")) +POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests") +POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword") +POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser") + + +def async_call(coro): + loop = asyncio.get_event_loop() + return loop.run_until_complete(coro) + + +class TestFunctionalAiopgConnect(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + AiopgInstrumentor().instrument(tracer_provider=cls.tracer_provider) + cls._connection = async_call( + aiopg.connect( + dbname=POSTGRES_DB_NAME, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + ) + ) + cls._cursor = async_call(cls._connection.cursor()) + + @classmethod + def tearDownClass(cls): + if cls._cursor: + cls._cursor.close() + if cls._connection: + cls._connection.close() + AiopgInstrumentor().uninstrument() + + def validate_spans(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + for span in spans: + if span.name == "rootSpan": + root_span = span + else: + child_span = span + self.assertIsInstance(span.start_time, int) + self.assertIsInstance(span.end_time, int) + self.assertIsNotNone(root_span) + self.assertIsNotNone(child_span) + self.assertEqual(root_span.name, "rootSpan") + self.assertEqual(child_span.name, "postgresql.opentelemetry-tests") + self.assertIsNotNone(child_span.parent) + self.assertIs(child_span.parent, root_span.get_context()) + self.assertIs(child_span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual( + child_span.attributes["db.instance"], POSTGRES_DB_NAME + ) + self.assertEqual(child_span.attributes["net.peer.name"], POSTGRES_HOST) + self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) + + def test_execute(self): + """Should create a child span for execute method + """ + with self._tracer.start_as_current_span("rootSpan"): + async_call( + self._cursor.execute( + "CREATE TABLE IF NOT EXISTS test (id integer)" + ) + ) + self.validate_spans() + + def test_executemany(self): + """Should create a child span for executemany + """ + with pytest.raises(psycopg2.ProgrammingError): + with self._tracer.start_as_current_span("rootSpan"): + data = (("1",), ("2",), ("3",)) + stmt = "INSERT INTO test (id) VALUES (%s)" + async_call(self._cursor.executemany(stmt, data)) + self.validate_spans() + + def test_callproc(self): + """Should create a child span for callproc + """ + with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( + Exception + ): + async_call(self._cursor.callproc("test", ())) + self.validate_spans() + + +class TestFunctionalAiopgCreatePool(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + AiopgInstrumentor().instrument(tracer_provider=cls.tracer_provider) + cls._pool = async_call( + aiopg.create_pool( + dbname=POSTGRES_DB_NAME, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + ) + ) + cls._connection = async_call(cls._pool.acquire()) + cls._cursor = async_call(cls._connection.cursor()) + + @classmethod + def tearDownClass(cls): + if cls._cursor: + cls._cursor.close() + if cls._connection: + cls._connection.close() + if cls._pool: + cls._pool.close() + AiopgInstrumentor().uninstrument() + + def validate_spans(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + for span in spans: + if span.name == "rootSpan": + root_span = span + else: + child_span = span + self.assertIsInstance(span.start_time, int) + self.assertIsInstance(span.end_time, int) + self.assertIsNotNone(root_span) + self.assertIsNotNone(child_span) + self.assertEqual(root_span.name, "rootSpan") + self.assertEqual(child_span.name, "postgresql.opentelemetry-tests") + self.assertIsNotNone(child_span.parent) + self.assertIs(child_span.parent, root_span.get_context()) + self.assertIs(child_span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual( + child_span.attributes["db.instance"], POSTGRES_DB_NAME + ) + self.assertEqual(child_span.attributes["net.peer.name"], POSTGRES_HOST) + self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) + + def test_execute(self): + """Should create a child span for execute method + """ + with self._tracer.start_as_current_span("rootSpan"): + async_call( + self._cursor.execute( + "CREATE TABLE IF NOT EXISTS test (id integer)" + ) + ) + self.validate_spans() + + def test_executemany(self): + """Should create a child span for executemany + """ + with pytest.raises(psycopg2.ProgrammingError): + with self._tracer.start_as_current_span("rootSpan"): + data = (("1",), ("2",), ("3",)) + stmt = "INSERT INTO test (id) VALUES (%s)" + async_call(self._cursor.executemany(stmt, data)) + self.validate_spans() + + def test_callproc(self): + """Should create a child span for callproc + """ + with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( + Exception + ): + async_call(self._cursor.callproc("test", ())) + self.validate_spans() diff --git a/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py b/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py new file mode 100644 index 000000000..a8e07ddb2 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/postgres/test_psycopg_functional.py @@ -0,0 +1,105 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time + +import psycopg2 + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor +from opentelemetry.test.test_base import TestBase + +POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost") +POSTGRES_PORT = int(os.getenv("POSTGRESQL_PORT ", "5432")) +POSTGRES_DB_NAME = os.getenv("POSTGRESQL_DB_NAME ", "opentelemetry-tests") +POSTGRES_PASSWORD = os.getenv("POSTGRESQL_HOST ", "testpassword") +POSTGRES_USER = os.getenv("POSTGRESQL_HOST ", "testuser") + + +class TestFunctionalPsycopg(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + Psycopg2Instrumentor().instrument(tracer_provider=cls.tracer_provider) + cls._connection = psycopg2.connect( + dbname=POSTGRES_DB_NAME, + user=POSTGRES_USER, + password=POSTGRES_PASSWORD, + host=POSTGRES_HOST, + port=POSTGRES_PORT, + ) + cls._connection.set_session(autocommit=True) + cls._cursor = cls._connection.cursor() + + @classmethod + def tearDownClass(cls): + if cls._cursor: + cls._cursor.close() + if cls._connection: + cls._connection.close() + Psycopg2Instrumentor().uninstrument() + + def validate_spans(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + for span in spans: + if span.name == "rootSpan": + root_span = span + else: + child_span = span + self.assertIsInstance(span.start_time, int) + self.assertIsInstance(span.end_time, int) + self.assertIsNotNone(root_span) + self.assertIsNotNone(child_span) + self.assertEqual(root_span.name, "rootSpan") + self.assertEqual(child_span.name, "postgresql.opentelemetry-tests") + self.assertIsNotNone(child_span.parent) + self.assertIs(child_span.parent, root_span.get_context()) + self.assertIs(child_span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual( + child_span.attributes["db.instance"], POSTGRES_DB_NAME + ) + self.assertEqual(child_span.attributes["net.peer.name"], POSTGRES_HOST) + self.assertEqual(child_span.attributes["net.peer.port"], POSTGRES_PORT) + + def test_execute(self): + """Should create a child span for execute method + """ + with self._tracer.start_as_current_span("rootSpan"): + self._cursor.execute( + "CREATE TABLE IF NOT EXISTS test (id integer)" + ) + self.validate_spans() + + def test_executemany(self): + """Should create a child span for executemany + """ + with self._tracer.start_as_current_span("rootSpan"): + data = (("1",), ("2",), ("3",)) + stmt = "INSERT INTO test (id) VALUES (%s)" + self._cursor.executemany(stmt, data) + self.validate_spans() + + def test_callproc(self): + """Should create a child span for callproc + """ + with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( + Exception + ): + self._cursor.callproc("test", ()) + self.validate_spans() diff --git a/tests/opentelemetry-docker-tests/tests/pymongo/test_pymongo_functional.py b/tests/opentelemetry-docker-tests/tests/pymongo/test_pymongo_functional.py new file mode 100644 index 000000000..8c52ad065 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/pymongo/test_pymongo_functional.py @@ -0,0 +1,116 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from pymongo import MongoClient + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.pymongo import PymongoInstrumentor +from opentelemetry.test.test_base import TestBase + +MONGODB_HOST = os.getenv("MONGODB_HOST ", "localhost") +MONGODB_PORT = int(os.getenv("MONGODB_PORT ", "27017")) +MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME ", "opentelemetry-tests") +MONGODB_COLLECTION_NAME = "test" + + +class TestFunctionalPymongo(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._tracer = cls.tracer_provider.get_tracer(__name__) + PymongoInstrumentor().instrument() + client = MongoClient( + MONGODB_HOST, MONGODB_PORT, serverSelectionTimeoutMS=2000 + ) + db = client[MONGODB_DB_NAME] + cls._collection = db[MONGODB_COLLECTION_NAME] + + def validate_spans(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + for span in spans: + if span.name == "rootSpan": + root_span = span + else: + pymongo_span = span + self.assertIsInstance(span.start_time, int) + self.assertIsInstance(span.end_time, int) + self.assertIsNot(root_span, None) + self.assertIsNot(pymongo_span, None) + self.assertIsNotNone(pymongo_span.parent) + self.assertIs(pymongo_span.parent, root_span.get_context()) + self.assertIs(pymongo_span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual( + pymongo_span.attributes["db.instance"], MONGODB_DB_NAME + ) + self.assertEqual( + pymongo_span.attributes["net.peer.name"], MONGODB_HOST + ) + self.assertEqual( + pymongo_span.attributes["net.peer.port"], MONGODB_PORT + ) + + def test_insert(self): + """Should create a child span for insert + """ + with self._tracer.start_as_current_span("rootSpan"): + self._collection.insert_one( + {"name": "testName", "value": "testValue"} + ) + self.validate_spans() + + def test_update(self): + """Should create a child span for update + """ + with self._tracer.start_as_current_span("rootSpan"): + self._collection.update_one( + {"name": "testName"}, {"$set": {"value": "someOtherValue"}} + ) + self.validate_spans() + + def test_find(self): + """Should create a child span for find + """ + with self._tracer.start_as_current_span("rootSpan"): + self._collection.find_one() + self.validate_spans() + + def test_delete(self): + """Should create a child span for delete + """ + with self._tracer.start_as_current_span("rootSpan"): + self._collection.delete_one({"name": "testName"}) + self.validate_spans() + + def test_uninstrument(self): + # check that integration is working + self._collection.find_one() + spans = self.memory_exporter.get_finished_spans() + self.memory_exporter.clear() + self.assertEqual(len(spans), 1) + + # uninstrument and check not new spans are created + PymongoInstrumentor().uninstrument() + self._collection.find_one() + spans = self.memory_exporter.get_finished_spans() + self.memory_exporter.clear() + self.assertEqual(len(spans), 0) + + # re-enable and check that it works again + PymongoInstrumentor().instrument() + self._collection.find_one() + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) diff --git a/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py new file mode 100644 index 000000000..1636f85fb --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/pymysql/test_pymysql_functional.py @@ -0,0 +1,97 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import pymysql as pymy + +from opentelemetry import trace as trace_api +from opentelemetry.instrumentation.pymysql import PyMySQLInstrumentor +from opentelemetry.test.test_base import TestBase + +MYSQL_USER = os.getenv("MYSQL_USER ", "testuser") +MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD ", "testpassword") +MYSQL_HOST = os.getenv("MYSQL_HOST ", "localhost") +MYSQL_PORT = int(os.getenv("MYSQL_PORT ", "3306")) +MYSQL_DB_NAME = os.getenv("MYSQL_DB_NAME ", "opentelemetry-tests") + + +class TestFunctionalPyMysql(TestBase): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._connection = None + cls._cursor = None + cls._tracer = cls.tracer_provider.get_tracer(__name__) + PyMySQLInstrumentor().instrument() + cls._connection = pymy.connect( + user=MYSQL_USER, + password=MYSQL_PASSWORD, + host=MYSQL_HOST, + port=MYSQL_PORT, + database=MYSQL_DB_NAME, + ) + cls._cursor = cls._connection.cursor() + + @classmethod + def tearDownClass(cls): + if cls._connection: + cls._connection.close() + PyMySQLInstrumentor().uninstrument() + + def validate_spans(self): + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + for span in spans: + if span.name == "rootSpan": + root_span = span + else: + db_span = span + self.assertIsInstance(span.start_time, int) + self.assertIsInstance(span.end_time, int) + self.assertIsNotNone(root_span) + self.assertIsNotNone(db_span) + self.assertEqual(root_span.name, "rootSpan") + self.assertEqual(db_span.name, "mysql.opentelemetry-tests") + self.assertIsNotNone(db_span.parent) + self.assertIs(db_span.parent, root_span.get_context()) + self.assertIs(db_span.kind, trace_api.SpanKind.CLIENT) + self.assertEqual(db_span.attributes["db.instance"], MYSQL_DB_NAME) + self.assertEqual(db_span.attributes["net.peer.name"], MYSQL_HOST) + self.assertEqual(db_span.attributes["net.peer.port"], MYSQL_PORT) + + def test_execute(self): + """Should create a child span for execute + """ + with self._tracer.start_as_current_span("rootSpan"): + self._cursor.execute("CREATE TABLE IF NOT EXISTS test (id INT)") + self.validate_spans() + + def test_executemany(self): + """Should create a child span for executemany + """ + with self._tracer.start_as_current_span("rootSpan"): + data = (("1",), ("2",), ("3",)) + stmt = "INSERT INTO test (id) VALUES (%s)" + self._cursor.executemany(stmt, data) + self.validate_spans() + + def test_callproc(self): + """Should create a child span for callproc + """ + with self._tracer.start_as_current_span("rootSpan"), self.assertRaises( + Exception + ): + self._cursor.callproc("test", ()) + self.validate_spans() diff --git a/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py b/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py new file mode 100644 index 000000000..7e6ea2e04 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py @@ -0,0 +1,120 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import redis + +from opentelemetry import trace +from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.test.test_base import TestBase + + +class TestRedisInstrument(TestBase): + + test_service = "redis" + + def setUp(self): + super().setUp() + self.redis_client = redis.Redis(port=6379) + self.redis_client.flushall() + RedisInstrumentor().instrument(tracer_provider=self.tracer_provider) + + def tearDown(self): + super().tearDown() + RedisInstrumentor().uninstrument() + + def _check_span(self, span): + self.assertEqual(span.attributes["service"], self.test_service) + self.assertEqual(span.name, "redis.command") + self.assertIs( + span.status.canonical_code, trace.status.StatusCanonicalCode.OK + ) + self.assertEqual(span.attributes.get("db.instance"), 0) + self.assertEqual( + span.attributes.get("db.url"), "redis://localhost:6379" + ) + + def test_long_command(self): + self.redis_client.mget(*range(1000)) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span) + self.assertTrue( + span.attributes.get("db.statement").startswith("MGET 0 1 2 3") + ) + self.assertTrue(span.attributes.get("db.statement").endswith("...")) + + def test_basics(self): + self.assertIsNone(self.redis_client.get("cheese")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span) + self.assertEqual(span.attributes.get("db.statement"), "GET cheese") + self.assertEqual(span.attributes.get("redis.args_length"), 2) + + def test_pipeline_traced(self): + with self.redis_client.pipeline(transaction=False) as pipeline: + pipeline.set("blah", 32) + pipeline.rpush("foo", "éé") + pipeline.hgetall("xxx") + pipeline.execute() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span) + self.assertEqual( + span.attributes.get("db.statement"), + "SET blah 32\nRPUSH foo éé\nHGETALL xxx", + ) + self.assertEqual(span.attributes.get("redis.pipeline_length"), 3) + + def test_pipeline_immediate(self): + with self.redis_client.pipeline() as pipeline: + pipeline.set("a", 1) + pipeline.immediate_execute_command("SET", "b", 2) + pipeline.execute() + + spans = self.memory_exporter.get_finished_spans() + # expecting two separate spans here, rather than a + # single span for the whole pipeline + self.assertEqual(len(spans), 2) + span = spans[0] + self._check_span(span) + self.assertEqual(span.attributes.get("db.statement"), "SET b 2") + + def test_parent(self): + """Ensure OpenTelemetry works with redis.""" + ot_tracer = trace.get_tracer("redis_svc") + + with ot_tracer.start_as_current_span("redis_get"): + self.assertIsNone(self.redis_client.get("cheese")) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + child_span, parent_span = spans[0], spans[1] + + # confirm the parenting + self.assertIsNone(parent_span.parent) + self.assertIs(child_span.parent, parent_span.get_context()) + + self.assertEqual(parent_span.name, "redis_get") + self.assertEqual(parent_span.instrumentation_info.name, "redis_svc") + + self.assertEqual( + child_span.attributes.get("service"), self.test_service + ) + self.assertEqual(child_span.name, "redis.command") diff --git a/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/__init__.py b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/__init__.py new file mode 100644 index 000000000..b0a6f4284 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/mixins.py b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/mixins.py new file mode 100644 index 000000000..a438f58eb --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/mixins.py @@ -0,0 +1,184 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib + +from sqlalchemy import Column, Integer, String, create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + +from opentelemetry import trace +from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor +from opentelemetry.instrumentation.sqlalchemy.engine import _DB, _ROWS, _STMT +from opentelemetry.test.test_base import TestBase + +Base = declarative_base() + + +def _create_engine(engine_args): + # create a SQLAlchemy engine + config = dict(engine_args) + url = config.pop("url") + return create_engine(url, **config) + + +class Player(Base): + """Player entity used to test SQLAlchemy ORM""" + + __tablename__ = "players" + + id = Column(Integer, primary_key=True) + name = Column(String(20)) + + +class SQLAlchemyTestMixin(TestBase): + __test__ = False + + """SQLAlchemy test mixin that includes a complete set of tests + that must be executed for different engine. When a new test (or + a regression test) should be added to SQLAlchemy test suite, a new + entry must be appended here so that it will be executed for all + available and supported engines. If the test is specific to only + one engine, that test must be added to the specific `TestCase` + implementation. + + To support a new engine, create a new `TestCase` that inherits from + `SQLAlchemyTestMixin` and `TestCase`. Then you must define the following + static class variables: + * VENDOR: the database vendor name + * SQL_DB: the `db.type` tag that we expect (it's the name of the database available in the `.env` file) + * SERVICE: the service that we expect by default + * ENGINE_ARGS: all arguments required to create the engine + + To check specific tags in each test, you must implement the + `check_meta(self, span)` method. + """ + + VENDOR = None + SQL_DB = None + SERVICE = None + ENGINE_ARGS = None + + @contextlib.contextmanager + def connection(self): + # context manager that provides a connection + # to the underlying database + try: + conn = self.engine.connect() + yield conn + finally: + conn.close() + + def check_meta(self, span): + """function that can be implemented according to the + specific engine implementation + """ + + def setUp(self): + super().setUp() + # create an engine with the given arguments + self.engine = _create_engine(self.ENGINE_ARGS) + + # create the database / entities and prepare a session for the test + Base.metadata.drop_all(bind=self.engine) + Base.metadata.create_all(self.engine, checkfirst=False) + self.session = sessionmaker(bind=self.engine)() + # trace the engine + SQLAlchemyInstrumentor().instrument( + engine=self.engine, tracer_provider=self.tracer_provider + ) + self.memory_exporter.clear() + + def tearDown(self): + # pylint: disable=invalid-name + # clear the database and dispose the engine + self.session.close() + Base.metadata.drop_all(bind=self.engine) + self.engine.dispose() + SQLAlchemyInstrumentor().uninstrument() + super().tearDown() + + def _check_span(self, span): + self.assertEqual(span.name, "{}.query".format(self.VENDOR)) + self.assertEqual(span.attributes.get("service"), self.SERVICE) + self.assertEqual(span.attributes.get(_DB), self.SQL_DB) + self.assertIs( + span.status.canonical_code, trace.status.StatusCanonicalCode.OK + ) + self.assertGreater((span.end_time - span.start_time), 0) + + def test_orm_insert(self): + # ensures that the ORM session is traced + wayne = Player(id=1, name="wayne") + self.session.add(wayne) + self.session.commit() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span) + self.assertIn("INSERT INTO players", span.attributes.get(_STMT)) + self.assertEqual(span.attributes.get(_ROWS), 1) + self.check_meta(span) + + def test_session_query(self): + # ensures that the Session queries are traced + out = list(self.session.query(Player).filter_by(name="wayne")) + self.assertEqual(len(out), 0) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span) + self.assertIn( + "SELECT players.id AS players_id, players.name AS players_name \nFROM players \nWHERE players.name", + span.attributes.get(_STMT), + ) + self.check_meta(span) + + def test_engine_connect_execute(self): + # ensures that engine.connect() is properly traced + with self.connection() as conn: + rows = conn.execute("SELECT * FROM players").fetchall() + self.assertEqual(len(rows), 0) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self._check_span(span) + self.assertEqual(span.attributes.get(_STMT), "SELECT * FROM players") + self.check_meta(span) + + def test_parent(self): + """Ensure that sqlalchemy works with opentelemetry.""" + tracer = self.tracer_provider.get_tracer("sqlalch_svc") + + with tracer.start_as_current_span("sqlalch_op"): + with self.connection() as conn: + rows = conn.execute("SELECT * FROM players").fetchall() + self.assertEqual(len(rows), 0) + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + child_span, parent_span = spans + + # confirm the parenting + self.assertIsNone(parent_span.parent) + self.assertIs(child_span.parent, parent_span.get_context()) + + self.assertEqual(parent_span.name, "sqlalch_op") + self.assertEqual(parent_span.instrumentation_info.name, "sqlalch_svc") + + self.assertEqual(child_span.name, "{}.query".format(self.VENDOR)) + self.assertEqual(child_span.attributes.get("service"), self.SERVICE) diff --git a/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_instrument.py b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_instrument.py new file mode 100644 index 000000000..20c837c03 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_instrument.py @@ -0,0 +1,72 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +import sqlalchemy + +from opentelemetry import trace +from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor +from opentelemetry.test.test_base import TestBase + +POSTGRES_CONFIG = { + "host": "127.0.0.1", + "port": int(os.getenv("TEST_POSTGRES_PORT", "5432")), + "user": os.getenv("TEST_POSTGRES_USER", "testuser"), + "password": os.getenv("TEST_POSTGRES_PASSWORD", "testpassword"), + "dbname": os.getenv("TEST_POSTGRES_DB", "opentelemetry-tests"), +} + + +class SQLAlchemyInstrumentTestCase(TestBase): + """TestCase that checks if the engine is properly traced + when the `instrument()` method is used. + """ + + def setUp(self): + # create a traced engine with the given arguments + SQLAlchemyInstrumentor().instrument() + dsn = ( + "postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(dbname)s" + % POSTGRES_CONFIG + ) + self.engine = sqlalchemy.create_engine(dsn) + + # prepare a connection + self.conn = self.engine.connect() + super().setUp() + + def tearDown(self): + # clear the database and dispose the engine + self.conn.close() + self.engine.dispose() + SQLAlchemyInstrumentor().uninstrument() + + def test_engine_traced(self): + # ensures that the engine is traced + rows = self.conn.execute("SELECT 1").fetchall() + self.assertEqual(len(rows), 1) + + traces = self.memory_exporter.get_finished_spans() + # trace composition + self.assertEqual(len(traces), 1) + span = traces[0] + # check subset of span fields + self.assertEqual(span.name, "postgres.query") + self.assertEqual(span.attributes.get("service"), "postgres") + self.assertIs( + span.status.canonical_code, trace.status.StatusCanonicalCode.OK + ) + self.assertGreater((span.end_time - span.start_time), 0) diff --git a/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_mysql.py b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_mysql.py new file mode 100644 index 000000000..44c3501b1 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_mysql.py @@ -0,0 +1,83 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +import pytest +from sqlalchemy.exc import ProgrammingError + +from opentelemetry import trace +from opentelemetry.instrumentation.sqlalchemy.engine import ( + _DB, + _HOST, + _PORT, + _ROWS, + _STMT, +) + +from .mixins import SQLAlchemyTestMixin + +MYSQL_CONFIG = { + "host": "127.0.0.1", + "port": int(os.getenv("TEST_MYSQL_PORT", "3306")), + "user": os.getenv("TEST_MYSQL_USER", "testuser"), + "password": os.getenv("TEST_MYSQL_PASSWORD", "testpassword"), + "database": os.getenv("TEST_MYSQL_DATABASE", "opentelemetry-tests"), +} + + +class MysqlConnectorTestCase(SQLAlchemyTestMixin): + """TestCase for mysql-connector engine""" + + __test__ = True + + VENDOR = "mysql" + SQL_DB = "opentelemetry-tests" + SERVICE = "mysql" + ENGINE_ARGS = { + "url": "mysql+mysqlconnector://%(user)s:%(password)s@%(host)s:%(port)s/%(database)s" + % MYSQL_CONFIG + } + + def check_meta(self, span): + # check database connection tags + self.assertEqual(span.attributes.get(_HOST), MYSQL_CONFIG["host"]) + self.assertEqual(span.attributes.get(_PORT), MYSQL_CONFIG["port"]) + + def test_engine_execute_errors(self): + # ensures that SQL errors are reported + with pytest.raises(ProgrammingError): + with self.connection() as conn: + conn.execute("SELECT * FROM a_wrong_table").fetchall() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + # span fields + self.assertEqual(span.name, "{}.query".format(self.VENDOR)) + self.assertEqual(span.attributes.get("service"), self.SERVICE) + self.assertEqual( + span.attributes.get(_STMT), "SELECT * FROM a_wrong_table" + ) + self.assertEqual(span.attributes.get(_DB), self.SQL_DB) + self.assertIsNone(span.attributes.get(_ROWS)) + self.check_meta(span) + self.assertTrue(span.end_time - span.start_time > 0) + # check the error + self.assertIs( + span.status.canonical_code, + trace.status.StatusCanonicalCode.UNKNOWN, + ) + self.assertIn("a_wrong_table", span.status.description) diff --git a/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_postgres.py b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_postgres.py new file mode 100644 index 000000000..615a196f5 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_postgres.py @@ -0,0 +1,98 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +import psycopg2 +import pytest +from sqlalchemy.exc import ProgrammingError + +from opentelemetry import trace +from opentelemetry.instrumentation.sqlalchemy.engine import ( + _DB, + _HOST, + _PORT, + _ROWS, + _STMT, +) + +from .mixins import SQLAlchemyTestMixin + +POSTGRES_CONFIG = { + "host": "127.0.0.1", + "port": int(os.getenv("TEST_POSTGRES_PORT", "5432")), + "user": os.getenv("TEST_POSTGRES_USER", "testuser"), + "password": os.getenv("TEST_POSTGRES_PASSWORD", "testpassword"), + "dbname": os.getenv("TEST_POSTGRES_DB", "opentelemetry-tests"), +} + + +class PostgresTestCase(SQLAlchemyTestMixin): + """TestCase for Postgres Engine""" + + __test__ = True + + VENDOR = "postgres" + SQL_DB = "opentelemetry-tests" + SERVICE = "postgres" + ENGINE_ARGS = { + "url": "postgresql://%(user)s:%(password)s@%(host)s:%(port)s/%(dbname)s" + % POSTGRES_CONFIG + } + + def check_meta(self, span): + # check database connection tags + self.assertEqual(span.attributes.get(_HOST), POSTGRES_CONFIG["host"]) + self.assertEqual(span.attributes.get(_PORT), POSTGRES_CONFIG["port"]) + + def test_engine_execute_errors(self): + # ensures that SQL errors are reported + with pytest.raises(ProgrammingError): + with self.connection() as conn: + conn.execute("SELECT * FROM a_wrong_table").fetchall() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + # span fields + self.assertEqual(span.name, "{}.query".format(self.VENDOR)) + self.assertEqual(span.attributes.get("service"), self.SERVICE) + self.assertEqual( + span.attributes.get(_STMT), "SELECT * FROM a_wrong_table" + ) + self.assertEqual(span.attributes.get(_DB), self.SQL_DB) + self.assertIsNone(span.attributes.get(_ROWS)) + self.check_meta(span) + self.assertTrue(span.end_time - span.start_time > 0) + # check the error + self.assertIs( + span.status.canonical_code, + trace.status.StatusCanonicalCode.UNKNOWN, + ) + self.assertIn("a_wrong_table", span.status.description) + + +class PostgresCreatorTestCase(PostgresTestCase): + """TestCase for Postgres Engine that includes the same tests set + of `PostgresTestCase`, but it uses a specific `creator` function. + """ + + VENDOR = "postgres" + SQL_DB = "opentelemetry-tests" + SERVICE = "postgres" + ENGINE_ARGS = { + "url": "postgresql://", + "creator": lambda: psycopg2.connect(**POSTGRES_CONFIG), + } diff --git a/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_sqlite.py b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_sqlite.py new file mode 100644 index 000000000..4295fc045 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/sqlalchemy_tests/test_sqlite.py @@ -0,0 +1,61 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import pytest +from sqlalchemy.exc import OperationalError + +from opentelemetry import trace +from opentelemetry.instrumentation.sqlalchemy.engine import _DB, _ROWS, _STMT + +from .mixins import SQLAlchemyTestMixin + + +class SQLiteTestCase(SQLAlchemyTestMixin): + """TestCase for the SQLite engine""" + + __test__ = True + + VENDOR = "sqlite" + SQL_DB = ":memory:" + SERVICE = "sqlite" + ENGINE_ARGS = {"url": "sqlite:///:memory:"} + + def test_engine_execute_errors(self): + # ensures that SQL errors are reported + with pytest.raises(OperationalError): + with self.connection() as conn: + conn.execute("SELECT * FROM a_wrong_table").fetchall() + + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + # span fields + self.assertEqual(span.name, "{}.query".format(self.VENDOR)) + self.assertEqual(span.attributes.get("service"), self.SERVICE) + self.assertEqual( + span.attributes.get(_STMT), "SELECT * FROM a_wrong_table" + ) + self.assertEqual(span.attributes.get(_DB), self.SQL_DB) + self.assertIsNone(span.attributes.get(_ROWS)) + self.assertTrue((span.end_time - span.start_time) > 0) + # check the error + self.assertIs( + span.status.canonical_code, + trace.status.StatusCanonicalCode.UNKNOWN, + ) + self.assertEqual( + span.status.description, "no such table: a_wrong_table" + )