Consistent way of not instrumenting multiple times (#549)

This commit is contained in:
Leighton Chen
2021-07-09 09:55:44 -07:00
committed by GitHub
parent bf97e172f0
commit 56da6d74df
17 changed files with 238 additions and 84 deletions

View File

@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#538](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/538))
- Changed the psycopg2-binary to psycopg2 as dependency in production
([#543](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/543))
- Implement consistent way of checking if instrumentation is already active
([#549](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/549))
- Require aiopg to be less than 1.3.0
([#560](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/560))
- `opentelemetry-instrumentation-django` Migrated Django middleware to new-style.

View File

@ -259,7 +259,7 @@ def _instrument(
span_name=span_name,
tracer_provider=tracer_provider,
)
trace_config.opentelemetry_aiohttp_instrumented = True
trace_config._is_instrumented_by_opentelemetry = True
trace_configs.append(trace_config)
kwargs["trace_configs"] = trace_configs
@ -282,7 +282,7 @@ def _uninstrument_session(client_session: aiohttp.ClientSession):
client_session._trace_configs = [
trace_config
for trace_config in trace_configs
if not hasattr(trace_config, "opentelemetry_aiohttp_instrumented")
if not hasattr(trace_config, "_is_instrumented_by_opentelemetry")
]

View File

@ -1,10 +1,7 @@
import typing
import wrapt
from aiopg.utils import ( # pylint: disable=no-name-in-module
_ContextManager,
_PoolAcquireContextManager,
)
from aiopg.utils import _ContextManager, _PoolAcquireContextManager
from opentelemetry.instrumentation.dbapi import (
CursorTracer,
@ -64,9 +61,7 @@ def get_traced_connection_proxy(
def cursor(self, *args, **kwargs):
coro = self._cursor(*args, **kwargs)
return _ContextManager( # pylint: disable=no-value-for-parameter
coro
)
return _ContextManager(coro)
async def _cursor(self, *args, **kwargs):
# pylint: disable=protected-access

View File

@ -41,6 +41,7 @@ from aiopg.utils import ( # pylint: disable=no-name-in-module
from opentelemetry.instrumentation.aiopg.aiopg_integration import (
AiopgIntegration,
AsyncProxyObject,
get_traced_connection_proxy,
)
from opentelemetry.instrumentation.aiopg.version import __version__
@ -153,6 +154,10 @@ def instrument_connection(
Returns:
An instrumented connection.
"""
if isinstance(connection, AsyncProxyObject):
logger.warning("Connection already instrumented")
return connection
db_integration = AiopgIntegration(
name,
database_system,
@ -173,7 +178,7 @@ def uninstrument_connection(connection):
Returns:
An uninstrumented connection.
"""
if isinstance(connection, wrapt.ObjectProxy):
if isinstance(connection, AsyncProxyObject):
return connection.__wrapped__
logger.warning("Connection is not instrumented")

View File

@ -207,6 +207,23 @@ class TestAiopgInstrumentor(TestBase):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
def test_instrument_connection_after_instrument(self):
cnx = async_call(aiopg.connect(database="test"))
query = "SELECT * FROM test"
cursor = async_call(cnx.cursor())
async_call(cursor.execute(query))
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)
AiopgInstrumentor().instrument()
cnx = AiopgInstrumentor().instrument_connection(cnx)
cursor = async_call(cnx.cursor())
async_call(cursor.execute(query))
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
def test_custom_tracer_provider_instrument_connection(self):
resource = resources.Resource.create(
{"service.name": "db-test-service"}

View File

@ -180,6 +180,10 @@ def instrument_connection(
Returns:
An instrumented connection.
"""
if isinstance(connection, wrapt.ObjectProxy):
logger.warning("Connection already instrumented")
return connection
db_integration = DatabaseApiIntegration(
name,
database_system,

View File

@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Collection
import fastapi
from starlette import middleware
from starlette.routing import Match
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
@ -24,6 +26,7 @@ from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.util.http import get_excluded_urls, parse_excluded_urls
_excluded_urls_from_env = get_excluded_urls("FASTAPI")
_logger = logging.getLogger(__name__)
class FastAPIInstrumentor(BaseInstrumentor):
@ -39,7 +42,10 @@ class FastAPIInstrumentor(BaseInstrumentor):
app: fastapi.FastAPI, tracer_provider=None, excluded_urls=None,
):
"""Instrument an uninstrumented FastAPI application."""
if not getattr(app, "is_instrumented_by_opentelemetry", False):
if not hasattr(app, "_is_instrumented_by_opentelemetry"):
app._is_instrumented_by_opentelemetry = False
if not getattr(app, "_is_instrumented_by_opentelemetry", False):
if excluded_urls is None:
excluded_urls = _excluded_urls_from_env
else:
@ -51,7 +57,21 @@ class FastAPIInstrumentor(BaseInstrumentor):
span_details_callback=_get_route_details,
tracer_provider=tracer_provider,
)
app.is_instrumented_by_opentelemetry = True
app._is_instrumented_by_opentelemetry = True
else:
_logger.warning(
"Attempting to instrument FastAPI app while already instrumented"
)
@staticmethod
def uninstrument_app(app: fastapi.FastAPI):
app.user_middleware = [
x
for x in app.user_middleware
if x.cls is not OpenTelemetryMiddleware
]
app.middleware_stack = app.build_middleware_stack()
app._is_instrumented_by_opentelemetry = False
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

View File

@ -19,6 +19,7 @@ import fastapi
from fastapi.testclient import TestClient
import opentelemetry.instrumentation.fastapi as otel_fastapi
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
@ -57,6 +58,47 @@ class TestFastAPIManualInstrumentation(TestBase):
super().tearDown()
self.env_patch.stop()
self.exclude_patch.stop()
with self.disable_logging():
self._instrumentor.uninstrument()
self._instrumentor.uninstrument_app(self._app)
def test_instrument_app_with_instrument(self):
if not isinstance(self, TestAutoInstrumentation):
self._instrumentor.instrument()
self._client.get("/foobar")
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 3)
for span in spans:
self.assertIn("/foobar", span.name)
def test_uninstrument_app(self):
self._client.get("/foobar")
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 3)
# pylint: disable=import-outside-toplevel
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
self._app.add_middleware(HTTPSRedirectMiddleware)
self._instrumentor.uninstrument_app(self._app)
print(self._app.user_middleware[0].cls)
self.assertFalse(
isinstance(
self._app.user_middleware[0].cls, OpenTelemetryMiddleware
)
)
self._client = TestClient(self._app)
resp = self._client.get("/foobar")
self.assertEqual(200, resp.status_code)
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 3)
def test_uninstrument_app_after_instrument(self):
if not isinstance(self, TestAutoInstrumentation):
self._instrumentor.instrument()
self._instrumentor.uninstrument_app(self._app)
self._client.get("/foobar")
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 0)
def test_basic_fastapi_call(self):
self._client.get("/foobar")

View File

@ -193,7 +193,8 @@ class _InstrumentedFlask(flask.Flask):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._original_wsgi_ = self.wsgi_app
self._original_wsgi_app = self.wsgi_app
self._is_instrumented_by_opentelemetry = True
self.wsgi_app = _rewrapped_app(
self.wsgi_app, _InstrumentedFlask._response_hook
@ -229,18 +230,21 @@ class FlaskInstrumentor(BaseInstrumentor):
_InstrumentedFlask._request_hook = request_hook
if callable(response_hook):
_InstrumentedFlask._response_hook = response_hook
flask.Flask = _InstrumentedFlask
tracer_provider = kwargs.get("tracer_provider")
_InstrumentedFlask._tracer_provider = tracer_provider
flask.Flask = _InstrumentedFlask
def instrument_app(
self, app, request_hook=None, response_hook=None, tracer_provider=None
): # pylint: disable=no-self-use
if not hasattr(app, "_is_instrumented"):
app._is_instrumented = False
def _uninstrument(self, **kwargs):
flask.Flask = self._original_flask
if not app._is_instrumented:
@staticmethod
def instrument_app(
app, request_hook=None, response_hook=None, tracer_provider=None
):
if not hasattr(app, "_is_instrumented_by_opentelemetry"):
app._is_instrumented_by_opentelemetry = False
if not app._is_instrumented_by_opentelemetry:
app._original_wsgi_app = app.wsgi_app
app.wsgi_app = _rewrapped_app(app.wsgi_app, response_hook)
@ -250,28 +254,22 @@ class FlaskInstrumentor(BaseInstrumentor):
app._before_request = _before_request
app.before_request(_before_request)
app.teardown_request(_teardown_request)
app._is_instrumented = True
app._is_instrumented_by_opentelemetry = True
else:
_logger.warning(
"Attempting to instrument Flask app while already instrumented"
)
def _uninstrument(self, **kwargs):
flask.Flask = self._original_flask
def uninstrument_app(self, app): # pylint: disable=no-self-use
if not hasattr(app, "_is_instrumented"):
app._is_instrumented = False
if app._is_instrumented:
@staticmethod
def uninstrument_app(app):
if hasattr(app, "_original_wsgi_app"):
app.wsgi_app = app._original_wsgi_app
# FIXME add support for other Flask blueprints that are not None
app.before_request_funcs[None].remove(app._before_request)
app.teardown_request_funcs[None].remove(_teardown_request)
del app._original_wsgi_app
app._is_instrumented = False
app._is_instrumented_by_opentelemetry = False
else:
_logger.warning(
"Attempting to uninstrument Flask "

View File

@ -79,7 +79,16 @@ class TestProgrammatic(InstrumentationTest, TestBase, WsgiTestBase):
with self.disable_logging():
FlaskInstrumentor().uninstrument_app(self.app)
def test_uninstrument(self):
def test_instrument_app_and_instrument(self):
FlaskInstrumentor().instrument()
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
self.assertEqual([b"Hello: 123"], list(resp.response))
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
FlaskInstrumentor().uninstrument()
def test_uninstrument_app(self):
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
self.assertEqual([b"Hello: 123"], list(resp.response))
@ -94,6 +103,16 @@ class TestProgrammatic(InstrumentationTest, TestBase, WsgiTestBase):
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 1)
def test_uninstrument_app_after_instrument(self):
FlaskInstrumentor().instrument()
FlaskInstrumentor().uninstrument_app(self.app)
resp = self.client.get("/hello/123")
self.assertEqual(200, resp.status_code)
self.assertEqual([b"Hello: 123"], list(resp.response))
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(span_list), 0)
FlaskInstrumentor().uninstrument()
# pylint: disable=no-member
def test_only_strings_in_environ(self):
"""

View File

@ -39,6 +39,7 @@ API
---
"""
import logging
import typing
from typing import Collection
@ -53,6 +54,7 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.psycopg2.package import _instruments
from opentelemetry.instrumentation.psycopg2.version import __version__
_logger = logging.getLogger(__name__)
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"
@ -91,24 +93,32 @@ class Psycopg2Instrumentor(BaseInstrumentor):
dbapi.unwrap_connect(psycopg2, "connect")
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
def instrument_connection(
self, connection, tracer_provider=None
): # pylint: disable=no-self-use
setattr(
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
)
connection.cursor_factory = _new_cursor_factory(
tracer_provider=tracer_provider
)
@staticmethod
def instrument_connection(connection, tracer_provider=None):
if not hasattr(connection, "_is_instrumented_by_opentelemetry"):
connection._is_instrumented_by_opentelemetry = False
if not connection._is_instrumented_by_opentelemetry:
setattr(
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
)
connection.cursor_factory = _new_cursor_factory(
tracer_provider=tracer_provider
)
connection._is_instrumented_by_opentelemetry = True
else:
_logger.warning(
"Attempting to instrument Psycopg connection while already instrumented"
)
return connection
# TODO(owais): check if core dbapi can do this for all dbapi implementations e.g, pymysql and mysql
def uninstrument_connection(
self, connection
): # pylint: disable=no-self-use
@staticmethod
def uninstrument_connection(connection):
connection.cursor_factory = getattr(
connection, _OTEL_CURSOR_FACTORY_KEY, None
)
return connection

View File

@ -172,7 +172,25 @@ class TestPostgresqlIntegration(TestBase):
self.assertEqual(len(spans_list), 1)
# pylint: disable=unused-argument
def test_uninstrument_connection(self):
def test_instrument_connection_with_instrument(self):
cnx = psycopg2.connect(database="test")
query = "SELECT * FROM test"
cursor = cnx.cursor()
cursor.execute(query)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 0)
Psycopg2Instrumentor().instrument()
cnx = Psycopg2Instrumentor().instrument_connection(cnx)
cursor = cnx.cursor()
cursor.execute(query)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
# pylint: disable=unused-argument
def test_uninstrument_connection_with_instrument(self):
Psycopg2Instrumentor().instrument()
cnx = psycopg2.connect(database="test")
query = "SELECT * FROM test"
@ -188,3 +206,21 @@ class TestPostgresqlIntegration(TestBase):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
# pylint: disable=unused-argument
def test_uninstrument_connection_with_instrument_connection(self):
cnx = psycopg2.connect(database="test")
Psycopg2Instrumentor().instrument_connection(cnx)
query = "SELECT * FROM test"
cursor = cnx.cursor()
cursor.execute(query)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)
cnx = Psycopg2Instrumentor().uninstrument_connection(cnx)
cursor = cnx.cursor()
cursor.execute(query)
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

View File

@ -49,17 +49,16 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.pymysql.package import _instruments
from opentelemetry.instrumentation.pymysql.version import __version__
_CONNECTION_ATTRIBUTES = {
"database": "db",
"port": "port",
"host": "host",
"user": "user",
}
_DATABASE_SYSTEM = "mysql"
class PyMySQLInstrumentor(BaseInstrumentor):
_CONNECTION_ATTRIBUTES = {
"database": "db",
"port": "port",
"host": "host",
"user": "user",
}
_DATABASE_SYSTEM = "mysql"
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
@ -73,8 +72,8 @@ class PyMySQLInstrumentor(BaseInstrumentor):
__name__,
pymysql,
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
_DATABASE_SYSTEM,
_CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
@ -83,8 +82,8 @@ class PyMySQLInstrumentor(BaseInstrumentor):
""""Disable PyMySQL instrumentation"""
dbapi.unwrap_connect(pymysql, "connect")
# pylint:disable=no-self-use
def instrument_connection(self, connection, tracer_provider=None):
@staticmethod
def instrument_connection(connection, tracer_provider=None):
"""Enable instrumentation in a PyMySQL connection.
Args:
@ -99,13 +98,14 @@ class PyMySQLInstrumentor(BaseInstrumentor):
return dbapi.instrument_connection(
__name__,
connection,
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
_DATABASE_SYSTEM,
_CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
def uninstrument_connection(self, connection):
@staticmethod
def uninstrument_connection(connection):
"""Disable instrumentation in a PyMySQL connection.
Args:

View File

@ -140,8 +140,8 @@ class PyramidInstrumentor(BaseInstrumentor):
""""Disable Pyramid instrumentation"""
unwrap(Configurator, "__init__")
# pylint:disable=no-self-use
def instrument_config(self, config):
@staticmethod
def instrument_config(config):
"""Enable instrumentation in a Pyramid configurator.
Args:
@ -149,5 +149,6 @@ class PyramidInstrumentor(BaseInstrumentor):
"""
config.include("opentelemetry.instrumentation.pyramid.callbacks")
def uninstrument_config(self, config):
@staticmethod
def uninstrument_config(config):
config.add_settings({SETTING_TRACE_ENABLED: False})

View File

@ -48,13 +48,13 @@ from opentelemetry.instrumentation.sqlite3.package import _instruments
from opentelemetry.instrumentation.sqlite3.version import __version__
from opentelemetry.trace import get_tracer
# No useful attributes of sqlite3 connection object
_CONNECTION_ATTRIBUTES = {}
_DATABASE_SYSTEM = "sqlite"
class SQLite3Instrumentor(BaseInstrumentor):
# No useful attributes of sqlite3 connection object
_CONNECTION_ATTRIBUTES = {}
_DATABASE_SYSTEM = "sqlite"
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
@ -68,8 +68,8 @@ class SQLite3Instrumentor(BaseInstrumentor):
__name__,
sqlite3,
"connect",
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
_DATABASE_SYSTEM,
_CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
@ -78,8 +78,8 @@ class SQLite3Instrumentor(BaseInstrumentor):
""""Disable SQLite3 instrumentation"""
dbapi.unwrap_connect(sqlite3, "connect")
# pylint:disable=no-self-use
def instrument_connection(self, connection, tracer_provider=None):
@staticmethod
def instrument_connection(connection, tracer_provider=None):
"""Enable instrumentation in a SQLite connection.
Args:
@ -94,13 +94,14 @@ class SQLite3Instrumentor(BaseInstrumentor):
return dbapi.instrument_connection(
__name__,
connection,
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
_DATABASE_SYSTEM,
_CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
)
def uninstrument_connection(self, connection):
@staticmethod
def uninstrument_connection(connection):
"""Disable instrumentation in a SQLite connection.
Args:

View File

@ -43,7 +43,7 @@ class BaseInstrumentor(ABC):
"""
_instance = None
_is_instrumented = False
_is_instrumented_by_opentelemetry = False
def __new__(cls, *args, **kwargs):
@ -52,6 +52,10 @@ class BaseInstrumentor(ABC):
return cls._instance
@property
def is_instrumented_by_opentelemetry(self):
return self._is_instrumented_by_opentelemetry
@abstractmethod
def instrumentation_dependencies(self) -> Collection[str]:
"""Return a list of python packages with versions that the will be instrumented.
@ -90,7 +94,7 @@ class BaseInstrumentor(ABC):
``opentelemetry-instrument`` command does.
"""
if self._is_instrumented:
if self._is_instrumented_by_opentelemetry:
_LOG.warning("Attempting to instrument while already instrumented")
return None
@ -99,13 +103,13 @@ class BaseInstrumentor(ABC):
if not skip_dep_check:
conflict = self._check_dependency_conflicts()
if conflict:
_LOG.warning(conflict)
_LOG.error(conflict)
return None
result = self._instrument( # pylint: disable=assignment-from-no-return
**kwargs
)
self._is_instrumented = True
self._is_instrumented_by_opentelemetry = True
return result
def uninstrument(self, **kwargs):
@ -115,9 +119,9 @@ class BaseInstrumentor(ABC):
usage of ``kwargs``.
"""
if self._is_instrumented:
if self._is_instrumented_by_opentelemetry:
result = self._uninstrument(**kwargs)
self._is_instrumented = False
self._is_instrumented_by_opentelemetry = False
return result
_LOG.warning("Attempting to uninstrument while already uninstrumented")

View File

@ -53,6 +53,6 @@ class TestDistro(TestCase):
instrumentor = MockInstrumetor()
entry_point = MockEntryPoint(MockInstrumetor)
self.assertFalse(instrumentor._is_instrumented)
self.assertFalse(instrumentor._is_instrumented_by_opentelemetry)
distro.load_instrumentor(entry_point)
self.assertTrue(instrumentor._is_instrumented)
self.assertTrue(instrumentor._is_instrumented_by_opentelemetry)