GenAI Utils | Add metrics to LLMInvocations (#3891)

* add metrics to genai utils

* add Instruments class for GenAI metrics and refactor metric recording

* refactor: streamline metric payload handling in InvocationMetricsRecorder

* fix: grammar

* doc: added changelog

* fix: update version for semconvs

* Lint fixes

* Fix Nits, remove overly defensive code

* Add monotonic timing support for LLM invocation duration calculations

* small cleanups to types to simplify code

* cleanup test

* Simplify histogram duration time calculations

* Remove unused constants and helper function

* Refactor histogram creation to use standalone functions instead of a class

---------

Co-authored-by: aaronabbott <aaronabbott@google.com>
This commit is contained in:
Keith Decker
2025-12-03 09:29:37 -07:00
committed by GitHub
parent 0870f96c1c
commit 61641aa383
6 changed files with 346 additions and 6 deletions

View File

@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3943](#3943))
- Add more Semconv attributes to LLMInvocation spans.
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3862](#3862))
- Add metrics to LLMInvocation traces
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3891](#3891))
## Version 0.2b0 (2025-10-14)

View File

@@ -60,20 +60,24 @@ Usage:
from __future__ import annotations
import timeit
from contextlib import contextmanager
from typing import Iterator
from opentelemetry import context as otel_context
from opentelemetry.metrics import MeterProvider, get_meter
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.trace import (
Span,
SpanKind,
TracerProvider,
get_tracer,
set_span_in_context,
)
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
from opentelemetry.util.genai.span_utils import (
_apply_error_attributes,
_apply_finish_attributes,
@@ -88,13 +92,35 @@ class TelemetryHandler:
them as spans, metrics, and events.
"""
def __init__(self, tracer_provider: TracerProvider | None = None):
def __init__(
self,
tracer_provider: TracerProvider | None = None,
meter_provider: MeterProvider | None = None,
):
self._tracer = get_tracer(
__name__,
__version__,
tracer_provider,
schema_url=Schemas.V1_37_0.value,
)
self._metrics_recorder: InvocationMetricsRecorder | None = None
meter = get_meter(__name__, meter_provider=meter_provider)
self._metrics_recorder = InvocationMetricsRecorder(meter)
def _record_llm_metrics(
self,
invocation: LLMInvocation,
span: Span | None = None,
*,
error_type: str | None = None,
) -> None:
if self._metrics_recorder is None or span is None:
return
self._metrics_recorder.record(
span,
invocation,
error_type=error_type,
)
def start_llm(
self,
@@ -106,6 +132,9 @@ class TelemetryHandler:
name=f"{GenAI.GenAiOperationNameValues.CHAT.value} {invocation.request_model}",
kind=SpanKind.CLIENT,
)
# Record a monotonic start timestamp (seconds) for duration
# calculation using timeit.default_timer.
invocation.monotonic_start_s = timeit.default_timer()
invocation.span = span
invocation.context_token = otel_context.attach(
set_span_in_context(span)
@@ -118,10 +147,12 @@ class TelemetryHandler:
# TODO: Provide feedback that this invocation was not started
return invocation
_apply_finish_attributes(invocation.span, invocation)
span = invocation.span
_apply_finish_attributes(span, invocation)
self._record_llm_metrics(invocation, span)
# Detach context and end span
otel_context.detach(invocation.context_token)
invocation.span.end()
span.end()
return invocation
def fail_llm( # pylint: disable=no-self-use
@@ -132,11 +163,14 @@ class TelemetryHandler:
# TODO: Provide feedback that this invocation was not started
return invocation
span = invocation.span
_apply_finish_attributes(invocation.span, invocation)
_apply_error_attributes(invocation.span, error)
_apply_error_attributes(span, error)
error_type = getattr(error.type, "__qualname__", None)
self._record_llm_metrics(invocation, span, error_type=error_type)
# Detach context and end span
otel_context.detach(invocation.context_token)
invocation.span.end()
span.end()
return invocation
@contextmanager
@@ -166,6 +200,7 @@ class TelemetryHandler:
def get_telemetry_handler(
tracer_provider: TracerProvider | None = None,
meter_provider: MeterProvider | None = None,
) -> TelemetryHandler:
"""
Returns a singleton TelemetryHandler instance.
@@ -174,6 +209,9 @@ def get_telemetry_handler(
get_telemetry_handler, "_default_handler", None
)
if handler is None:
handler = TelemetryHandler(tracer_provider=tracer_provider)
handler = TelemetryHandler(
tracer_provider=tracer_provider,
meter_provider=meter_provider,
)
setattr(get_telemetry_handler, "_default_handler", handler)
return handler

View File

@@ -0,0 +1,54 @@
from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.metrics import gen_ai_metrics
_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS = [
0.01,
0.02,
0.04,
0.08,
0.16,
0.32,
0.64,
1.28,
2.56,
5.12,
10.24,
20.48,
40.96,
81.92,
]
_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS = [
1,
4,
16,
64,
256,
1024,
4096,
16384,
65536,
262144,
1048576,
4194304,
16777216,
67108864,
]
def create_duration_histogram(meter: Meter) -> Histogram:
return meter.create_histogram(
name=gen_ai_metrics.GEN_AI_CLIENT_OPERATION_DURATION,
description="Duration of GenAI client operation",
unit="s",
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_OPERATION_DURATION_BUCKETS,
)
def create_token_histogram(meter: Meter) -> Histogram:
return meter.create_histogram(
name=gen_ai_metrics.GEN_AI_CLIENT_TOKEN_USAGE,
description="Number of input and output tokens used by GenAI clients",
unit="{token}",
explicit_bucket_boundaries_advisory=_GEN_AI_CLIENT_TOKEN_USAGE_BUCKETS,
)

View File

@@ -0,0 +1,98 @@
"""Helpers for emitting GenAI metrics from LLM invocations."""
from __future__ import annotations
import timeit
from numbers import Number
from typing import Dict, Optional
from opentelemetry.metrics import Histogram, Meter
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.trace import Span, set_span_in_context
from opentelemetry.util.genai.instruments import (
create_duration_histogram,
create_token_histogram,
)
from opentelemetry.util.genai.types import LLMInvocation
from opentelemetry.util.types import AttributeValue
class InvocationMetricsRecorder:
"""Records duration and token usage histograms for GenAI invocations."""
def __init__(self, meter: Meter):
self._duration_histogram: Histogram = create_duration_histogram(meter)
self._token_histogram: Histogram = create_token_histogram(meter)
def record(
self,
span: Optional[Span],
invocation: LLMInvocation,
*,
error_type: Optional[str] = None,
) -> None:
"""Record duration and token metrics for an invocation if possible."""
if span is None:
return
token_counts: list[tuple[int, str]] = []
if invocation.input_tokens is not None:
token_counts.append(
(
invocation.input_tokens,
GenAI.GenAiTokenTypeValues.INPUT.value,
)
)
if invocation.output_tokens is not None:
token_counts.append(
(
invocation.output_tokens,
GenAI.GenAiTokenTypeValues.OUTPUT.value,
)
)
attributes: Dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: GenAI.GenAiOperationNameValues.CHAT.value
}
if invocation.request_model:
attributes[GenAI.GEN_AI_REQUEST_MODEL] = invocation.request_model
if invocation.provider:
attributes[GenAI.GEN_AI_PROVIDER_NAME] = invocation.provider
if invocation.response_model_name:
attributes[GenAI.GEN_AI_RESPONSE_MODEL] = (
invocation.response_model_name
)
# Calculate duration from span timing or invocation monotonic start
duration_seconds: Optional[float] = None
if invocation.monotonic_start_s is not None:
duration_seconds = max(
timeit.default_timer() - invocation.monotonic_start_s, 0.0
)
span_context = set_span_in_context(span)
if error_type:
attributes["error.type"] = error_type
if (
duration_seconds is not None
and isinstance(duration_seconds, Number)
and duration_seconds >= 0
):
self._duration_histogram.record(
duration_seconds,
attributes=attributes,
context=span_context,
)
for token_count, token_type in token_counts:
self._token_histogram.record(
token_count,
attributes=attributes | {GenAI.GEN_AI_TOKEN_TYPE: token_type},
context=span_context,
)
__all__ = ["InvocationMetricsRecorder"]

View File

@@ -123,6 +123,12 @@ class LLMInvocation:
max_tokens: int | None = None
stop_sequences: list[str] | None = None
seed: int | None = None
monotonic_start_s: float | None = None
"""
Monotonic start time in seconds (from timeit.default_timer) used
for duration calculations to avoid mixing clock sources. This is
populated by the TelemetryHandler when starting an invocation.
"""
@dataclass

View File

@@ -0,0 +1,142 @@
from __future__ import annotations
from typing import Any, Dict, List
from unittest import TestCase
from unittest.mock import patch
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import Error, LLMInvocation
class TelemetryHandlerMetricsTest(TestCase):
def setUp(self) -> None:
self.metric_reader = InMemoryMetricReader()
self.meter_provider = MeterProvider(
metric_readers=[self.metric_reader]
)
self.span_exporter = InMemorySpanExporter()
self.tracer_provider = TracerProvider()
self.tracer_provider.add_span_processor(
SimpleSpanProcessor(self.span_exporter)
)
def test_stop_llm_records_duration_and_tokens(self) -> None:
handler = TelemetryHandler(
tracer_provider=self.tracer_provider,
meter_provider=self.meter_provider,
)
invocation = LLMInvocation(request_model="model", provider="prov")
invocation.input_tokens = 5
invocation.output_tokens = 7
# Patch default_timer during start to ensure monotonic_start_s
with patch("timeit.default_timer", return_value=1000.0):
handler.start_llm(invocation)
# Simulate 2 seconds of elapsed monotonic time (seconds)
with patch(
"timeit.default_timer",
return_value=1002.0,
):
handler.stop_llm(invocation)
metrics = self._harvest_metrics()
self.assertIn("gen_ai.client.operation.duration", metrics)
duration_points = metrics["gen_ai.client.operation.duration"]
self.assertEqual(len(duration_points), 1)
duration_point = duration_points[0]
self.assertEqual(
duration_point.attributes[GenAI.GEN_AI_OPERATION_NAME],
GenAI.GenAiOperationNameValues.CHAT.value,
)
self.assertEqual(
duration_point.attributes[GenAI.GEN_AI_REQUEST_MODEL], "model"
)
self.assertEqual(
duration_point.attributes[GenAI.GEN_AI_PROVIDER_NAME], "prov"
)
self.assertAlmostEqual(duration_point.sum, 2.0, places=3)
self.assertIn("gen_ai.client.token.usage", metrics)
token_points = metrics["gen_ai.client.token.usage"]
token_by_type = {
point.attributes[GenAI.GEN_AI_TOKEN_TYPE]: point
for point in token_points
}
self.assertEqual(len(token_by_type), 2)
self.assertAlmostEqual(
token_by_type[GenAI.GenAiTokenTypeValues.INPUT.value].sum,
5.0,
places=3,
)
self.assertAlmostEqual(
token_by_type[GenAI.GenAiTokenTypeValues.COMPLETION.value].sum,
7.0,
places=3,
)
def test_fail_llm_records_error_and_available_tokens(self) -> None:
handler = TelemetryHandler(
tracer_provider=self.tracer_provider,
meter_provider=self.meter_provider,
)
invocation = LLMInvocation(request_model="err-model", provider=None)
invocation.input_tokens = 11
# Patch default_timer during start to ensure monotonic_start_s
with patch("timeit.default_timer", return_value=2000.0):
handler.start_llm(invocation)
error = Error(message="boom", type=ValueError)
with patch(
"timeit.default_timer",
return_value=2001.0,
):
handler.fail_llm(invocation, error)
metrics = self._harvest_metrics()
self.assertIn("gen_ai.client.operation.duration", metrics)
duration_points = metrics["gen_ai.client.operation.duration"]
self.assertEqual(len(duration_points), 1)
duration_point = duration_points[0]
self.assertEqual(
duration_point.attributes.get("error.type"), "ValueError"
)
self.assertEqual(
duration_point.attributes.get(GenAI.GEN_AI_REQUEST_MODEL),
"err-model",
)
self.assertAlmostEqual(duration_point.sum, 1.0, places=3)
self.assertIn("gen_ai.client.token.usage", metrics)
token_points = metrics["gen_ai.client.token.usage"]
self.assertEqual(len(token_points), 1)
token_point = token_points[0]
self.assertEqual(
token_point.attributes[GenAI.GEN_AI_TOKEN_TYPE],
GenAI.GenAiTokenTypeValues.INPUT.value,
)
self.assertAlmostEqual(token_point.sum, 11.0, places=3)
def _harvest_metrics(self) -> Dict[str, List[Any]]:
try:
self.meter_provider.force_flush()
except Exception: # pylint: disable=broad-except
pass
self.metric_reader.collect()
metrics_by_name: Dict[str, List[Any]] = {}
data = self.metric_reader.get_metrics_data()
for resource_metric in (data and data.resource_metrics) or []:
for scope_metric in resource_metric.scope_metrics or []:
for metric in scope_metric.metrics or []:
points = metric.data.data_points or []
metrics_by_name.setdefault(metric.name, []).extend(points)
return metrics_by_name