mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-08-02 19:47:17 +08:00
botocore: Introduce instrumentation extensions (#718)
* botocore: Introduce instrumentation extensions * add extensions that are invoked before and after an AWS SDK service call to enrich the span with service specific request and response attirbutes * move SQS specific parts to a separate extension * changelog Co-authored-by: Owais Lone <owais@users.noreply.github.com> Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
This commit is contained in:
@ -35,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664))
|
||||
- `opentelemetry-instrumentation-botocore` Fix span injection for lambda invoke
|
||||
([#663](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/663))
|
||||
- `opentelemetry-instrumentation-botocore` Introduce instrumentation extensions
|
||||
([#718](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/718))
|
||||
|
||||
### Changed
|
||||
|
||||
|
@ -80,7 +80,7 @@ for example:
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Collection, Dict, Optional, Tuple
|
||||
from typing import Any, Callable, Collection, Dict, Optional, Tuple
|
||||
|
||||
from botocore.client import BaseClient
|
||||
from botocore.endpoint import Endpoint
|
||||
@ -88,6 +88,7 @@ from botocore.exceptions import ClientError
|
||||
from wrapt import wrap_function_wrapper
|
||||
|
||||
from opentelemetry import context as context_api
|
||||
from opentelemetry.instrumentation.botocore.extensions import _find_extension
|
||||
from opentelemetry.instrumentation.botocore.extensions.types import (
|
||||
_AwsSdkCallContext,
|
||||
)
|
||||
@ -190,6 +191,10 @@ class BotocoreInstrumentor(BaseInstrumentor):
|
||||
if call_context is None:
|
||||
return original_func(*args, **kwargs)
|
||||
|
||||
extension = _find_extension(call_context)
|
||||
if not extension.should_trace_service_call():
|
||||
return original_func(*args, **kwargs)
|
||||
|
||||
attributes = {
|
||||
SpanAttributes.RPC_SYSTEM: "aws-api",
|
||||
SpanAttributes.RPC_SERVICE: call_context.service_id,
|
||||
@ -198,6 +203,8 @@ class BotocoreInstrumentor(BaseInstrumentor):
|
||||
"aws.region": call_context.region,
|
||||
}
|
||||
|
||||
_safe_invoke(extension.extract_attributes, attributes)
|
||||
|
||||
with self._tracer.start_as_current_span(
|
||||
call_context.span_name,
|
||||
kind=call_context.span_kind,
|
||||
@ -208,6 +215,7 @@ class BotocoreInstrumentor(BaseInstrumentor):
|
||||
BotocoreInstrumentor._patch_lambda_invoke(call_context.params)
|
||||
|
||||
_set_api_call_attributes(span, call_context)
|
||||
_safe_invoke(extension.before_service_call, span)
|
||||
self._call_request_hook(span, call_context)
|
||||
|
||||
token = context_api.attach(
|
||||
@ -220,11 +228,14 @@ class BotocoreInstrumentor(BaseInstrumentor):
|
||||
except ClientError as error:
|
||||
result = getattr(error, "response", None)
|
||||
_apply_response_attributes(span, result)
|
||||
_safe_invoke(extension.on_error, span, error)
|
||||
raise
|
||||
else:
|
||||
_apply_response_attributes(span, result)
|
||||
_safe_invoke(extension.on_success, span, result)
|
||||
finally:
|
||||
context_api.detach(token)
|
||||
_safe_invoke(extension.after_service_call)
|
||||
|
||||
self._call_response_hook(span, call_context, result)
|
||||
|
||||
@ -254,8 +265,6 @@ def _set_api_call_attributes(span, call_context: _AwsSdkCallContext):
|
||||
if not span.is_recording():
|
||||
return
|
||||
|
||||
if "QueueUrl" in call_context.params:
|
||||
span.set_attribute("aws.queue_url", call_context.params["QueueUrl"])
|
||||
if "TableName" in call_context.params:
|
||||
span.set_attribute("aws.table_name", call_context.params["TableName"])
|
||||
|
||||
@ -309,3 +318,14 @@ def _determine_call_context(
|
||||
# extracting essential attributes ('service' and 'operation') failed.
|
||||
logger.error("Error when initializing call context", exc_info=ex)
|
||||
return None
|
||||
|
||||
|
||||
def _safe_invoke(function: Callable, *args):
|
||||
function_name = "<unknown>"
|
||||
try:
|
||||
function_name = function.__name__
|
||||
function(*args)
|
||||
except Exception as ex: # pylint:disable=broad-except
|
||||
logger.error(
|
||||
"Error when invoking function '%s'", function_name, exc_info=ex
|
||||
)
|
||||
|
@ -0,0 +1,35 @@
|
||||
import importlib
|
||||
import logging
|
||||
|
||||
from opentelemetry.instrumentation.botocore.extensions.types import (
|
||||
_AwsSdkCallContext,
|
||||
_AwsSdkExtension,
|
||||
)
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _lazy_load(module, cls):
|
||||
def loader():
|
||||
imported_mod = importlib.import_module(module, __name__)
|
||||
return getattr(imported_mod, cls, None)
|
||||
|
||||
return loader
|
||||
|
||||
|
||||
_KNOWN_EXTENSIONS = {
|
||||
"sqs": _lazy_load(".sqs", "_SqsExtension"),
|
||||
}
|
||||
|
||||
|
||||
def _find_extension(call_context: _AwsSdkCallContext) -> _AwsSdkExtension:
|
||||
try:
|
||||
loader = _KNOWN_EXTENSIONS.get(call_context.service)
|
||||
if loader is None:
|
||||
return _AwsSdkExtension(call_context)
|
||||
|
||||
extension_cls = loader()
|
||||
return extension_cls(call_context)
|
||||
except Exception as ex: # pylint: disable=broad-except
|
||||
_logger.error("Error when loading extension: %s", ex)
|
||||
return _AwsSdkExtension(call_context)
|
||||
|
@ -0,0 +1,12 @@
|
||||
from opentelemetry.instrumentation.botocore.extensions.types import (
|
||||
_AttributeMapT,
|
||||
_AwsSdkExtension,
|
||||
)
|
||||
|
||||
|
||||
class _SqsExtension(_AwsSdkExtension):
|
||||
def extract_attributes(self, attributes: _AttributeMapT):
|
||||
queue_url = self._call_context.params.get("QueueUrl")
|
||||
if queue_url:
|
||||
# TODO: update when semantic conventions exist
|
||||
attributes["aws.queue_url"] = queue_url
|
@ -2,12 +2,17 @@ import logging
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
from opentelemetry.trace import SpanKind
|
||||
from opentelemetry.trace.span import Span
|
||||
from opentelemetry.util.types import AttributeValue
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
_BotoClientT = "botocore.client.BaseClient"
|
||||
_BotoResultT = Dict[str, Any]
|
||||
_BotoClientErrorT = "botocore.exceptions.ClientError"
|
||||
|
||||
_OperationParamsT = Dict[str, Any]
|
||||
_AttributeMapT = Dict[str, AttributeValue]
|
||||
|
||||
|
||||
class _AwsSdkCallContext:
|
||||
@ -70,3 +75,49 @@ class _AwsSdkCallContext:
|
||||
except AttributeError:
|
||||
_logger.warning("Could not get attribute '%s'", name)
|
||||
return default
|
||||
|
||||
|
||||
class _AwsSdkExtension:
|
||||
def __init__(self, call_context: _AwsSdkCallContext):
|
||||
self._call_context = call_context
|
||||
|
||||
def should_trace_service_call(self) -> bool: # pylint:disable=no-self-use
|
||||
"""Returns if the AWS SDK service call should be traced or not
|
||||
|
||||
Extensions might override this function to disable tracing for certain
|
||||
operations.
|
||||
"""
|
||||
return True
|
||||
|
||||
def extract_attributes(self, attributes: _AttributeMapT):
|
||||
"""Callback which gets invoked before the span is created.
|
||||
|
||||
Extensions might override this function to extract additional attributes.
|
||||
"""
|
||||
|
||||
def before_service_call(self, span: Span):
|
||||
"""Callback which gets invoked after the span is created but before the
|
||||
AWS SDK service is called.
|
||||
|
||||
Extensions might override this function e.g. for injecting the span into
|
||||
a carrier.
|
||||
"""
|
||||
|
||||
def on_success(self, span: Span, result: _BotoResultT):
|
||||
"""Callback that gets invoked when the AWS SDK call returns
|
||||
successfully.
|
||||
|
||||
Extensions might override this function e.g. to extract and set response
|
||||
attributes on the span.
|
||||
"""
|
||||
|
||||
def on_error(self, span: Span, exception: _BotoClientErrorT):
|
||||
"""Callback that gets invoked when the AWS SDK service call raises a
|
||||
ClientError.
|
||||
"""
|
||||
|
||||
def after_service_call(self):
|
||||
"""Callback that gets invoked after the AWS SDK service was called.
|
||||
|
||||
Extensions might override this function to do some cleanup tasks.
|
||||
"""
|
||||
|
Reference in New Issue
Block a user