mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-31 22:23:12 +08:00
Flush meter provider at end of lambda function handler (#1613)
* Flush meter provider at end of lambda function handler Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com> * Update `force_flush()` check based on PR feedback Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com> --------- Signed-off-by: Anthony J Mirabella <a9@aneurysm9.com>
This commit is contained in:

committed by
GitHub

parent
6ed2c56eca
commit
d8788b68dd
@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553))
|
([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553))
|
||||||
- `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available
|
- `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available
|
||||||
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
|
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
|
||||||
|
- `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation.
|
||||||
|
([#1613](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1613))
|
||||||
- Fix aiohttp bug with unset `trace_configs` ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))
|
- Fix aiohttp bug with unset `trace_configs` ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
@ -45,6 +45,7 @@ API
|
|||||||
The `instrument` method accepts the following keyword args:
|
The `instrument` method accepts the following keyword args:
|
||||||
|
|
||||||
tracer_provider (TracerProvider) - an optional tracer provider
|
tracer_provider (TracerProvider) - an optional tracer provider
|
||||||
|
meter_provider (MeterProvider) - an optional meter provider
|
||||||
event_context_extractor (Callable) - a function that returns an OTel Trace
|
event_context_extractor (Callable) - a function that returns an OTel Trace
|
||||||
Context given the Lambda Event the AWS Lambda was invoked with
|
Context given the Lambda Event the AWS Lambda was invoked with
|
||||||
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
|
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
|
||||||
@ -68,6 +69,7 @@ for example:
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
from importlib import import_module
|
from importlib import import_module
|
||||||
from typing import Any, Callable, Collection
|
from typing import Any, Callable, Collection
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
@ -79,6 +81,10 @@ from opentelemetry.instrumentation.aws_lambda.package import _instruments
|
|||||||
from opentelemetry.instrumentation.aws_lambda.version import __version__
|
from opentelemetry.instrumentation.aws_lambda.version import __version__
|
||||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
||||||
from opentelemetry.instrumentation.utils import unwrap
|
from opentelemetry.instrumentation.utils import unwrap
|
||||||
|
from opentelemetry.metrics import (
|
||||||
|
MeterProvider,
|
||||||
|
get_meter_provider,
|
||||||
|
)
|
||||||
from opentelemetry.propagate import get_global_textmap
|
from opentelemetry.propagate import get_global_textmap
|
||||||
from opentelemetry.propagators.aws.aws_xray_propagator import (
|
from opentelemetry.propagators.aws.aws_xray_propagator import (
|
||||||
TRACE_HEADER_KEY,
|
TRACE_HEADER_KEY,
|
||||||
@ -274,6 +280,7 @@ def _instrument(
|
|||||||
event_context_extractor: Callable[[Any], Context],
|
event_context_extractor: Callable[[Any], Context],
|
||||||
tracer_provider: TracerProvider = None,
|
tracer_provider: TracerProvider = None,
|
||||||
disable_aws_context_propagation: bool = False,
|
disable_aws_context_propagation: bool = False,
|
||||||
|
meter_provider: MeterProvider = None,
|
||||||
):
|
):
|
||||||
def _instrumented_lambda_handler_call(
|
def _instrumented_lambda_handler_call(
|
||||||
call_wrapped, instance, args, kwargs
|
call_wrapped, instance, args, kwargs
|
||||||
@ -352,15 +359,33 @@ def _instrument(
|
|||||||
result.get("statusCode"),
|
result.get("statusCode"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
_tracer_provider = tracer_provider or get_tracer_provider()
|
_tracer_provider = tracer_provider or get_tracer_provider()
|
||||||
try:
|
if hasattr(_tracer_provider, "force_flush"):
|
||||||
# NOTE: `force_flush` before function quit in case of Lambda freeze.
|
try:
|
||||||
# Assumes we are using the OpenTelemetry SDK implementation of the
|
# NOTE: `force_flush` before function quit in case of Lambda freeze.
|
||||||
# `TracerProvider`.
|
_tracer_provider.force_flush(flush_timeout)
|
||||||
_tracer_provider.force_flush(flush_timeout)
|
except Exception: # pylint: disable=broad-except
|
||||||
except Exception: # pylint: disable=broad-except
|
logger.exception(
|
||||||
logger.error(
|
f"TracerProvider failed to flush traces"
|
||||||
"TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
|
)
|
||||||
|
else:
|
||||||
|
logger.warning("TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation.")
|
||||||
|
|
||||||
|
_meter_provider = meter_provider or get_meter_provider()
|
||||||
|
if hasattr(_meter_provider, "force_flush"):
|
||||||
|
rem = flush_timeout - (time.time()-now)*1000
|
||||||
|
if rem > 0:
|
||||||
|
try:
|
||||||
|
# NOTE: `force_flush` before function quit in case of Lambda freeze.
|
||||||
|
_meter_provider.force_flush(rem)
|
||||||
|
except Exception: # pylint: disable=broad-except
|
||||||
|
logger.exception(
|
||||||
|
f"MeterProvider failed to flush metrics"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
|
||||||
)
|
)
|
||||||
|
|
||||||
return result
|
return result
|
||||||
@ -385,6 +410,7 @@ class AwsLambdaInstrumentor(BaseInstrumentor):
|
|||||||
Args:
|
Args:
|
||||||
**kwargs: Optional arguments
|
**kwargs: Optional arguments
|
||||||
``tracer_provider``: a TracerProvider, defaults to global
|
``tracer_provider``: a TracerProvider, defaults to global
|
||||||
|
``meter_provider``: a MeterProvider, defaults to global
|
||||||
``event_context_extractor``: a method which takes the Lambda
|
``event_context_extractor``: a method which takes the Lambda
|
||||||
Event as input and extracts an OTel Context from it. By default,
|
Event as input and extracts an OTel Context from it. By default,
|
||||||
the context is extracted from the HTTP headers of an API Gateway
|
the context is extracted from the HTTP headers of an API Gateway
|
||||||
@ -432,6 +458,7 @@ class AwsLambdaInstrumentor(BaseInstrumentor):
|
|||||||
),
|
),
|
||||||
tracer_provider=kwargs.get("tracer_provider"),
|
tracer_provider=kwargs.get("tracer_provider"),
|
||||||
disable_aws_context_propagation=disable_aws_context_propagation,
|
disable_aws_context_propagation=disable_aws_context_propagation,
|
||||||
|
meter_provider=kwargs.get("meter_provider"),
|
||||||
)
|
)
|
||||||
|
|
||||||
def _uninstrument(self, **kwargs):
|
def _uninstrument(self, **kwargs):
|
||||||
|
Reference in New Issue
Block a user