Switched langchain llm callbacks to use genai utils handler (#3889)

* Removed telemetry from inference callbacks and added calls to genai utils apis instead.

* Fixed errors

* Fixed typecheck errors

* Fixed typecheck errors

* Fixed precommit errors

* added util dependancy

* updated invocation manager

* removed unnecessary dependancies

* fixed precommit

* fixed typecheck

* fixed precommit

* fixed test

* removed unnecessary line

* addressed comments

* fixed errors

* fixed precommit

* removed get_property_value method

* fixed format

* Make tox -e typecheck install langchain, and fixed some of the type
errors. The remaining ones seem like real issues that need to be fixed.

* Please fix reportPossiblyUnboundVariable

* fixed typecheck

* added and updated tests

* Fixed conflicts and removed unnecessary changes

* Fixed precommit

---------

Co-authored-by: aaronabbott <aaronabbott@google.com>
This commit is contained in:
Ridhima Satam
2026-02-06 06:55:32 -08:00
committed by GitHub
parent e01a4c195d
commit e381a36718
17 changed files with 4427 additions and 3736 deletions

View File

@@ -7,5 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
- Added support to call genai utils handler for langchain LLM invocations.
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3889](#3889))
- Added span support for genAI langchain llm invocation.
([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665))

View File

@@ -26,6 +26,8 @@ Next, set up a virtual environment like this:
source .venv/bin/activate
pip install "python-dotenv[cli]"
pip install -r requirements.txt
pip install opentelemetry-instrumentation-langchain
pip install util/opentelemetry-util-genai (once opentelemetry-util-genai package is released remove it from here and add dependency in opentelemetry-instrumentation-langchain)
Run
---

View File

@@ -25,9 +25,7 @@ classifiers = [
"Programming Language :: Python :: 3.13",
]
dependencies = [
"opentelemetry-api >= 1.31.0",
"opentelemetry-instrumentation ~= 0.57b0",
"opentelemetry-semantic-conventions ~= 0.57b0"
]
[project.optional-dependencies]

View File

@@ -38,7 +38,7 @@ API
from typing import Any, Callable, Collection
from langchain_core.callbacks import BaseCallbackHandler # type: ignore
from langchain_core.callbacks import BaseCallbackHandler
from wrapt import wrap_function_wrapper # type: ignore
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
@@ -46,10 +46,8 @@ from opentelemetry.instrumentation.langchain.callback_handler import (
OpenTelemetryLangChainCallbackHandler,
)
from opentelemetry.instrumentation.langchain.package import _instruments
from opentelemetry.instrumentation.langchain.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas
from opentelemetry.trace import get_tracer
from opentelemetry.util.genai.handler import get_telemetry_handler
class LangChainInstrumentor(BaseInstrumentor):
@@ -72,15 +70,12 @@ class LangChainInstrumentor(BaseInstrumentor):
Enable Langchain instrumentation.
"""
tracer_provider = kwargs.get("tracer_provider")
tracer = get_tracer(
__name__,
__version__,
tracer_provider,
schema_url=Schemas.V1_37_0.value,
)
telemetry_handler = get_telemetry_handler(
tracer_provider=tracer_provider
)
otel_callback_handler = OpenTelemetryLangChainCallbackHandler(
tracer=tracer,
telemetry_handler=telemetry_handler
)
wrap_function_wrapper(
@@ -109,7 +104,7 @@ class _BaseCallbackManagerInitWrapper:
def __call__(
self,
wrapped: Callable[..., None],
instance: BaseCallbackHandler, # type: ignore
instance: BaseCallbackHandler,
args: tuple[Any, ...],
kwargs: dict[str, Any],
):

View File

@@ -14,44 +14,45 @@
from __future__ import annotations
from typing import Any
from typing import Any, Optional
from uuid import UUID
from langchain_core.callbacks import BaseCallbackHandler # type: ignore
from langchain_core.messages import BaseMessage # type: ignore
from langchain_core.outputs import LLMResult # type: ignore
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage
from langchain_core.outputs import LLMResult
from opentelemetry.instrumentation.langchain.span_manager import _SpanManager
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
from opentelemetry.instrumentation.langchain.invocation_manager import (
_InvocationManager,
)
from opentelemetry.util.genai.handler import TelemetryHandler
from opentelemetry.util.genai.types import (
Error,
InputMessage,
LLMInvocation,
OutputMessage,
Text,
)
from opentelemetry.trace import Tracer
class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler): # type: ignore[misc]
class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler):
"""
A callback handler for LangChain that uses OpenTelemetry to create spans for LLM calls and chains, tools etc,. in future.
"""
def __init__(
self,
tracer: Tracer,
) -> None:
super().__init__() # type: ignore
self.span_manager = _SpanManager(
tracer=tracer,
)
def __init__(self, telemetry_handler: TelemetryHandler) -> None:
super().__init__()
self._telemetry_handler = telemetry_handler
self._invocation_manager = _InvocationManager()
def on_chat_model_start(
self,
serialized: dict[str, Any],
messages: list[list[BaseMessage]], # type: ignore
messages: list[list[BaseMessage]],
*,
run_id: UUID,
tags: list[str] | None,
parent_run_id: UUID | None,
metadata: dict[str, Any] | None,
parent_run_id: Optional[UUID] = None,
tags: Optional[list[str]] = None,
metadata: Optional[dict[str, Any]] = None,
**kwargs: Any,
) -> None:
# Other providers/LLMs may be supported in the future and telemetry for them is skipped for now.
@@ -82,77 +83,99 @@ class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler): # type: ignor
if request_model == "unknown":
return
span = self.span_manager.create_chat_span(
run_id=run_id,
parent_run_id=parent_run_id,
request_model=request_model,
)
# Initialize variables with default values to avoid "possibly unbound" errors
top_p = None
frequency_penalty = None
presence_penalty = None
stop_sequences = None
seed = None
temperature = None
max_tokens = None
if params is not None:
top_p = params.get("top_p")
if top_p is not None:
span.set_attribute(GenAI.GEN_AI_REQUEST_TOP_P, top_p)
frequency_penalty = params.get("frequency_penalty")
if frequency_penalty is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, frequency_penalty
)
presence_penalty = params.get("presence_penalty")
if presence_penalty is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, presence_penalty
)
stop_sequences = params.get("stop")
if stop_sequences is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_STOP_SEQUENCES, stop_sequences
)
seed = params.get("seed")
if seed is not None:
span.set_attribute(GenAI.GEN_AI_REQUEST_SEED, seed)
# ChatOpenAI
temperature = params.get("temperature")
if temperature is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature
)
# ChatOpenAI
max_tokens = params.get("max_completion_tokens")
if max_tokens is not None:
span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
provider = "unknown"
if metadata is not None:
provider = metadata.get("ls_provider")
if provider is not None:
span.set_attribute("gen_ai.provider.name", provider)
# ChatBedrock
temperature = metadata.get("ls_temperature")
if temperature is not None:
span.set_attribute(
GenAI.GEN_AI_REQUEST_TEMPERATURE, temperature
)
# ChatBedrock
max_tokens = metadata.get("ls_max_tokens")
if max_tokens is not None:
span.set_attribute(GenAI.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
provider = metadata.get("ls_provider", "unknown")
# Override with ChatBedrock values if present
if "ls_temperature" in metadata:
temperature = metadata.get("ls_temperature")
if "ls_max_tokens" in metadata:
max_tokens = metadata.get("ls_max_tokens")
input_messages: list[InputMessage] = []
for sub_messages in messages:
for message in sub_messages:
# Cast to Any to avoid type checking issues with LangChain's complex content type
raw_content: Any = message.content # type: ignore[misc]
role = message.type
parts: list[Text] = []
if isinstance(raw_content, str):
parts = [Text(content=raw_content, type="text")]
elif isinstance(raw_content, list):
for item in raw_content: # type: ignore[misc]
if isinstance(item, str):
parts.append(Text(content=item, type="text"))
elif isinstance(item, dict):
# Safely extract text content from dict
text_value = item.get("text") # type: ignore[misc]
if isinstance(text_value, str) and text_value:
parts.append(
Text(content=text_value, type="text")
)
input_messages.append(InputMessage(parts=parts, role=role))
llm_invocation = LLMInvocation(
request_model=request_model,
input_messages=input_messages,
provider=provider,
top_p=top_p,
frequency_penalty=frequency_penalty,
presence_penalty=presence_penalty,
stop_sequences=stop_sequences,
seed=seed,
temperature=temperature,
max_tokens=max_tokens,
)
llm_invocation = self._telemetry_handler.start_llm(
invocation=llm_invocation
)
self._invocation_manager.add_invocation_state(
run_id=run_id,
parent_run_id=parent_run_id,
invocation=llm_invocation,
)
def on_llm_end(
self,
response: LLMResult, # type: ignore [reportUnknownParameterType]
response: LLMResult,
*,
run_id: UUID,
parent_run_id: UUID | None,
parent_run_id: UUID | None = None,
**kwargs: Any,
) -> None:
span = self.span_manager.get_span(run_id)
if span is None:
# If the span does not exist, we cannot set attributes or end it
llm_invocation = self._invocation_manager.get_invocation(run_id=run_id)
if llm_invocation is None or not isinstance(
llm_invocation, LLMInvocation
):
# If the invocation does not exist, we cannot set attributes or end it
return
finish_reasons: list[str] = []
for generation in getattr(response, "generations", []): # type: ignore
output_messages: list[OutputMessage] = []
for generation in getattr(response, "generations", []):
for chat_generation in generation:
# Get finish reason
finish_reason = "unknown" # Default value
generation_info = getattr(
chat_generation, "generation_info", None
)
@@ -160,9 +183,9 @@ class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler): # type: ignor
finish_reason = generation_info.get(
"finish_reason", "unknown"
)
if finish_reason is not None:
finish_reasons.append(str(finish_reason))
if chat_generation.message:
# Get finish reason if generation_info is None above
if (
generation_info is None
and chat_generation.message.response_metadata
@@ -172,53 +195,76 @@ class OpenTelemetryLangChainCallbackHandler(BaseCallbackHandler): # type: ignor
"stopReason", "unknown"
)
)
if finish_reason is not None:
finish_reasons.append(str(finish_reason))
# Get message content
parts = [
Text(
content=chat_generation.message.content,
type="text",
)
]
role = chat_generation.message.type
output_message = OutputMessage(
role=role,
parts=parts,
finish_reason=finish_reason,
)
output_messages.append(output_message)
# Get token usage if available
if chat_generation.message.usage_metadata:
input_tokens = (
chat_generation.message.usage_metadata.get(
"input_tokens", 0
)
)
llm_invocation.input_tokens = input_tokens
output_tokens = (
chat_generation.message.usage_metadata.get(
"output_tokens", 0
)
)
span.set_attribute(
GenAI.GEN_AI_USAGE_INPUT_TOKENS, input_tokens
)
span.set_attribute(
GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens
)
llm_invocation.output_tokens = output_tokens
span.set_attribute(
GenAI.GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons
)
llm_invocation.output_messages = output_messages
llm_output = getattr(response, "llm_output", None) # type: ignore
llm_output = getattr(response, "llm_output", None)
if llm_output is not None:
response_model = llm_output.get("model_name") or llm_output.get(
"model"
)
if response_model is not None:
span.set_attribute(
GenAI.GEN_AI_RESPONSE_MODEL, str(response_model)
)
llm_invocation.response_model_name = str(response_model)
response_id = llm_output.get("id")
if response_id is not None:
span.set_attribute(GenAI.GEN_AI_RESPONSE_ID, str(response_id))
llm_invocation.response_id = str(response_id)
# End the LLM span
self.span_manager.end_span(run_id)
llm_invocation = self._telemetry_handler.stop_llm(
invocation=llm_invocation
)
if llm_invocation.span and not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)
def on_llm_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: UUID | None,
parent_run_id: UUID | None = None,
**kwargs: Any,
) -> None:
self.span_manager.handle_error(error, run_id)
llm_invocation = self._invocation_manager.get_invocation(run_id=run_id)
if llm_invocation is None or not isinstance(
llm_invocation, LLMInvocation
):
# If the invocation does not exist, we cannot set attributes or end it
return
error_otel = Error(message=str(error), type=type(error))
llm_invocation = self._telemetry_handler.fail_llm(
invocation=llm_invocation, error=error_otel
)
if llm_invocation.span and not llm_invocation.span.is_recording():
self._invocation_manager.delete_invocation_state(run_id=run_id)

View File

@@ -0,0 +1,61 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from uuid import UUID
from opentelemetry.util.genai.types import GenAIInvocation
__all__ = ["_InvocationManager"]
@dataclass
class _InvocationState:
invocation: GenAIInvocation
children: List[UUID] = field(default_factory=lambda: list())
class _InvocationManager:
def __init__(
self,
) -> None:
# Map from run_id -> _InvocationState, to keep track of invocations and parent/child relationships
# TODO: TTL cache to avoid memory leaks in long-running processes.
self._invocations: Dict[UUID, _InvocationState] = {}
def add_invocation_state(
self,
run_id: UUID,
parent_run_id: Optional[UUID],
invocation: GenAIInvocation,
):
invocation_state = _InvocationState(invocation=invocation)
self._invocations[run_id] = invocation_state
if parent_run_id is not None and parent_run_id in self._invocations:
parent_invocation_state = self._invocations[parent_run_id]
parent_invocation_state.children.append(run_id)
def get_invocation(self, run_id: UUID) -> Optional[GenAIInvocation]:
invocation_state = self._invocations.get(run_id)
return invocation_state.invocation if invocation_state else None
def delete_invocation_state(self, run_id: UUID) -> None:
invocation_state = self._invocations.get(run_id)
if not invocation_state:
return
for child_id in list(invocation_state.children):
self._invocations.pop(child_id, None)
self._invocations.pop(run_id, None)

View File

@@ -0,0 +1,159 @@
interactions:
- request:
body: |-
{
"messages": [
{
"content": "You are a helpful assistant!",
"role": "system"
},
{
"content": "What is the capital of France?",
"role": "user"
}
],
"model": "gpt-3.5-turbo",
"frequency_penalty": 0.5,
"max_completion_tokens": 100,
"presence_penalty": 0.5,
"seed": 100,
"stop": [
"\n",
"Human:",
"AI:"
],
"stream": false,
"temperature": 0.1,
"top_p": 0.9
}
headers:
Accept:
- application/json
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '316'
Content-Type:
- application/json
Host:
- api.openai.com
User-Agent:
- OpenAI/Python 2.16.0
X-Stainless-Arch:
- arm64
X-Stainless-Async:
- 'false'
X-Stainless-Lang:
- python
X-Stainless-OS:
- MacOS
X-Stainless-Package-Version:
- 2.16.0
X-Stainless-Raw-Response:
- 'true'
X-Stainless-Runtime:
- CPython
X-Stainless-Runtime-Version:
- 3.13.5
authorization:
- Bearer test_openai_api_key
cookie:
- test_cookie
x-stainless-retry-count:
- '0'
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: |-
{
"id": "chatcmpl-D5js6XcubHki2LoPJ6cG4o0tPn8LV",
"object": "chat.completion",
"created": 1770260342,
"model": "gpt-3.5-turbo-0125",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "The capital of France is Paris.",
"refusal": null,
"annotations": []
},
"logprobs": null,
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 24,
"completion_tokens": 7,
"total_tokens": 31,
"prompt_tokens_details": {
"cached_tokens": 0,
"audio_tokens": 0
},
"completion_tokens_details": {
"reasoning_tokens": 0,
"audio_tokens": 0,
"accepted_prediction_tokens": 0,
"rejected_prediction_tokens": 0
}
},
"service_tier": "default",
"system_fingerprint": null
}
headers:
CF-RAY:
- 9c8f263d4d5fa6c4-SJC
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 05 Feb 2026 02:59:02 GMT
Server:
- cloudflare
Set-Cookie: test_set_cookie
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '822'
openai-organization: test_openai_org_id
openai-processing-ms:
- '195'
openai-project:
- proj_3o0Aqh32nPiGbrex8BJtPTCm
openai-version:
- '2020-10-01'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '10000'
x-ratelimit-limit-tokens:
- '10000000'
x-ratelimit-remaining-requests:
- '9999'
x-ratelimit-remaining-tokens:
- '9999981'
x-ratelimit-reset-requests:
- 6ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_11889cdfe9f548169adad4e04412b63c
status:
code: 200
message: OK
version: 1

View File

@@ -27,49 +27,49 @@ interactions:
"top_p": 0.9
}
headers:
accept:
Accept:
- application/json
accept-encoding:
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '316'
Content-Type:
- application/json
Host:
- api.openai.com
User-Agent:
- OpenAI/Python 2.16.0
X-Stainless-Arch:
- arm64
X-Stainless-Async:
- 'false'
X-Stainless-Lang:
- python
X-Stainless-OS:
- MacOS
X-Stainless-Package-Version:
- 2.16.0
X-Stainless-Raw-Response:
- 'true'
X-Stainless-Runtime:
- CPython
X-Stainless-Runtime-Version:
- 3.13.5
authorization:
- Bearer test_openai_api_key
connection:
- keep-alive
content-length:
- '316'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.106.1
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.106.1
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.5
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: |-
{
"id": "chatcmpl-CCAQbtjsmG2294sQ6utRc16OQWeol",
"id": "chatcmpl-D5js4ShPGJHKK6gtQ8R5PeLuwWDtN",
"object": "chat.completion",
"created": 1757016057,
"created": 1770260340,
"model": "gpt-3.5-turbo-0125",
"choices": [
{
@@ -104,13 +104,13 @@ interactions:
}
headers:
CF-RAY:
- 97a01376ad4d2af1-LAX
- 9c8f263889cd78eb-SJC
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 04 Sep 2025 20:00:57 GMT
- Thu, 05 Feb 2026 02:59:01 GMT
Server:
- cloudflare
Set-Cookie: test_set_cookie
@@ -130,27 +130,27 @@ interactions:
- '822'
openai-organization: test_openai_org_id
openai-processing-ms:
- '282'
- '323'
openai-project:
- proj_GLiYlAc06hF0Fm06IMReZLy4
- proj_3o0Aqh32nPiGbrex8BJtPTCm
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '287'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '10000'
x-ratelimit-limit-tokens:
- '200000'
- '10000000'
x-ratelimit-remaining-requests:
- '9999'
x-ratelimit-remaining-tokens:
- '199982'
- '9999982'
x-ratelimit-reset-requests:
- 8.64s
- 6ms
x-ratelimit-reset-tokens:
- 5ms
- 0s
x-request-id:
- req_0e343602788d4f33869d09afcc7d4819
- req_40bc4456e47993c0a5d59c8f606ef54d
status:
code: 200
message: OK

View File

@@ -0,0 +1,111 @@
interactions:
- request:
body: |-
{
"messages": [
{
"content": "You are a helpful assistant!",
"role": "system"
},
{
"content": "What is the capital of France?",
"role": "user"
}
],
"model": "gpt-3.5-turbo",
"frequency_penalty": 0.5,
"max_completion_tokens": 100,
"presence_penalty": 0.5,
"seed": 100,
"stop": [
"\n",
"Human:",
"AI:"
],
"stream": false,
"temperature": 0.1,
"top_p": 0.9
}
headers:
Accept:
- application/json
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '316'
Content-Type:
- application/json
Host:
- api.openai.com
User-Agent:
- OpenAI/Python 2.16.0
X-Stainless-Arch:
- arm64
X-Stainless-Async:
- 'false'
X-Stainless-Lang:
- python
X-Stainless-OS:
- MacOS
X-Stainless-Package-Version:
- 2.16.0
X-Stainless-Raw-Response:
- 'true'
X-Stainless-Runtime:
- CPython
X-Stainless-Runtime-Version:
- 3.13.5
authorization:
- Bearer test_openai_api_key
cookie:
- test_cookie
x-stainless-retry-count:
- '0'
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: |-
{
"error": {
"message": "Incorrect API key provided: test_ope*******_key. You can find your API key at https://platform.openai.com/account/api-keys.",
"type": "invalid_request_error",
"param": null,
"code": "invalid_api_key"
}
}
headers:
CF-RAY:
- 9c8f28c1b80f6ad3-SJC
Connection:
- keep-alive
Content-Length:
- '269'
Content-Type:
- application/json; charset=utf-8
Date:
- Thu, 05 Feb 2026 03:00:44 GMT
Server:
- cloudflare
Set-Cookie: test_set_cookie
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization: test_openai_org_id
vary:
- Origin
x-openai-proxy-wasm:
- v0.1
x-request-id:
- req_d4d2b3c13bb64894b34e5b02f02b1611
status:
code: 401
message: Unauthorized
version: 1

View File

@@ -0,0 +1,109 @@
interactions:
- request:
body: |-
{
"messages": [
{
"content": "You are a helpful assistant!",
"role": "system"
},
{
"content": "What is the capital of France?",
"role": "user"
}
],
"model": "gpt-3.5-turbo",
"frequency_penalty": 0.5,
"max_completion_tokens": 100,
"presence_penalty": 0.5,
"seed": 100,
"stop": [
"\n",
"Human:",
"AI:"
],
"stream": false,
"temperature": 0.1,
"top_p": 0.9
}
headers:
Accept:
- application/json
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '316'
Content-Type:
- application/json
Host:
- api.openai.com
User-Agent:
- OpenAI/Python 2.16.0
X-Stainless-Arch:
- arm64
X-Stainless-Async:
- 'false'
X-Stainless-Lang:
- python
X-Stainless-OS:
- MacOS
X-Stainless-Package-Version:
- 2.16.0
X-Stainless-Raw-Response:
- 'true'
X-Stainless-Runtime:
- CPython
X-Stainless-Runtime-Version:
- 3.13.5
authorization:
- Bearer test_openai_api_key
x-stainless-retry-count:
- '0'
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: |-
{
"error": {
"message": "Incorrect API key provided: test_ope*******_key. You can find your API key at https://platform.openai.com/account/api-keys.",
"type": "invalid_request_error",
"param": null,
"code": "invalid_api_key"
}
}
headers:
CF-RAY:
- 9c8f28c12f51938c-SJC
Connection:
- keep-alive
Content-Length:
- '269'
Content-Type:
- application/json; charset=utf-8
Date:
- Thu, 05 Feb 2026 03:00:44 GMT
Server:
- cloudflare
Set-Cookie: test_set_cookie
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization: test_openai_org_id
vary:
- Origin
x-openai-proxy-wasm:
- v0.1
x-request-id:
- req_c707e654b3bb460a8fc6b5ded4ee43c9
status:
code: 401
message: Unauthorized
version: 1

View File

@@ -0,0 +1,141 @@
# tests/test_invocation_manager.py
import uuid
from unittest import mock
import pytest
from opentelemetry.instrumentation.langchain.invocation_manager import (
_InvocationManager,
)
from opentelemetry.util.genai.types import GenAIInvocation
@pytest.fixture
def invocation_manager():
return _InvocationManager()
@pytest.fixture
def mock_invocation():
return mock.Mock(spec=GenAIInvocation)
def test_add_invocation_state_without_parent(
invocation_manager, mock_invocation
):
run_id = uuid.uuid4()
invocation_manager.add_invocation_state(
run_id=run_id,
parent_run_id=None,
invocation=mock_invocation,
)
assert invocation_manager.get_invocation(run_id) == mock_invocation
assert len(invocation_manager._invocations) == 1
assert invocation_manager._invocations[run_id].children == []
def test_add_invocation_state_with_parent(invocation_manager):
parent_id = uuid.uuid4()
child_id = uuid.uuid4()
parent_invocation = mock.Mock(spec=GenAIInvocation)
child_invocation = mock.Mock(spec=GenAIInvocation)
# Add parent first
invocation_manager.add_invocation_state(
run_id=parent_id,
parent_run_id=None,
invocation=parent_invocation,
)
# Then add child with parent reference
invocation_manager.add_invocation_state(
run_id=child_id,
parent_run_id=parent_id,
invocation=child_invocation,
)
# Check that parent has child in its children list
assert child_id in invocation_manager._invocations[parent_id].children
assert invocation_manager.get_invocation(child_id) == child_invocation
assert invocation_manager.get_invocation(parent_id) == parent_invocation
def test_add_invocation_state_with_nonexistent_parent(
invocation_manager, mock_invocation
):
run_id = uuid.uuid4()
nonexistent_parent_id = uuid.uuid4()
# Adding with a parent that doesn't exist should still add the child without error
invocation_manager.add_invocation_state(
run_id=run_id,
parent_run_id=nonexistent_parent_id,
invocation=mock_invocation,
)
assert invocation_manager.get_invocation(run_id) == mock_invocation
assert len(invocation_manager._invocations) == 1
def test_get_nonexistent_invocation(invocation_manager):
nonexistent_id = uuid.uuid4()
assert invocation_manager.get_invocation(nonexistent_id) is None
def test_delete_invocation_state(invocation_manager, mock_invocation):
run_id = uuid.uuid4()
invocation_manager.add_invocation_state(
run_id=run_id,
parent_run_id=None,
invocation=mock_invocation,
)
# Verify it was added
assert invocation_manager.get_invocation(run_id) == mock_invocation
# Delete it
invocation_manager.delete_invocation_state(run_id)
# Verify it was removed
assert run_id not in invocation_manager._invocations
def test_delete_invocation_state_with_children(invocation_manager):
parent_id = uuid.uuid4()
child1_id = uuid.uuid4()
child2_id = uuid.uuid4()
parent_invocation = mock.Mock(spec=GenAIInvocation)
child1_invocation = mock.Mock(spec=GenAIInvocation)
child2_invocation = mock.Mock(spec=GenAIInvocation)
# Add parent and children
invocation_manager.add_invocation_state(
run_id=parent_id,
parent_run_id=None,
invocation=parent_invocation,
)
invocation_manager.add_invocation_state(
run_id=child1_id,
parent_run_id=parent_id,
invocation=child1_invocation,
)
invocation_manager.add_invocation_state(
run_id=child2_id,
parent_run_id=parent_id,
invocation=child2_invocation,
)
# Verify initial state
assert len(invocation_manager._invocations) == 3
assert len(invocation_manager._invocations[parent_id].children) == 2
# Delete parent
invocation_manager.delete_invocation_state(parent_id)
# Verify parent and all children were removed
assert parent_id not in invocation_manager._invocations
assert child1_id not in invocation_manager._invocations
assert child2_id not in invocation_manager._invocations
assert len(invocation_manager._invocations) == 0

View File

@@ -2,16 +2,30 @@ from typing import Optional
import pytest
from langchain_core.messages import HumanMessage, SystemMessage
from openai import AuthenticationError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
from opentelemetry.semconv.attributes import error_attributes
# span_exporter, start_instrumentation, chat_openai_gpt_3_5_turbo_model are coming from fixtures defined in conftest.py
@pytest.mark.vcr()
@pytest.mark.parametrize("capture_content", ["SPAN_ONLY", "NO_CONTENT"])
def test_chat_openai_gpt_3_5_turbo_model_llm_call(
span_exporter, start_instrumentation, chat_openai_gpt_3_5_turbo_model
span_exporter,
start_instrumentation,
chat_openai_gpt_3_5_turbo_model,
monkeypatch,
capture_content,
):
monkeypatch.setenv(
"OTEL_SEMCONV_STABILITY_OPT_IN", "gen_ai_latest_experimental"
)
monkeypatch.setenv(
"OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", capture_content
)
messages = [
SystemMessage(content="You are a helpful assistant!"),
HumanMessage(content="What is the capital of France?"),
@@ -26,7 +40,57 @@ def test_chat_openai_gpt_3_5_turbo_model_llm_call(
for span in spans:
print(f"span: {span}")
print(f"span attributes: {span.attributes}")
assert_openai_completion_attributes(spans[0], response)
verifyContent: bool = True if capture_content == "SPAN_ONLY" else False
assert_openai_completion_attributes(
spans[0] if len(spans) > 0 else None,
response,
verifyContent=verifyContent,
)
# span_exporter, start_instrumentation, chat_openai_gpt_3_5_turbo_model are coming from fixtures defined in conftest.py
@pytest.mark.vcr()
@pytest.mark.parametrize("capture_content", ["SPAN_ONLY", "NO_CONTENT"])
def test_chat_openai_gpt_3_5_turbo_model_llm_call_with_error(
span_exporter,
start_instrumentation,
chat_openai_gpt_3_5_turbo_model,
monkeypatch,
capture_content,
):
monkeypatch.setenv(
"OTEL_SEMCONV_STABILITY_OPT_IN", "gen_ai_latest_experimental"
)
monkeypatch.setenv(
"OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", capture_content
)
messages = [
SystemMessage(content="You are a helpful assistant!"),
HumanMessage(content="What is the capital of France?"),
]
response = None
try:
response = chat_openai_gpt_3_5_turbo_model.invoke(messages)
except Exception as e:
# For this test, to get error, cassettes were recorded with no OPENAI_API_KEY, so an error is expected here.
assert isinstance(e, AuthenticationError)
assert response is None
# verify spans
spans = span_exporter.get_finished_spans()
print(f"spans: {spans}")
for span in spans:
print(f"span: {span}")
print(f"span attributes: {span.attributes}")
verifyContent: bool = True if capture_content == "SPAN_ONLY" else False
assert_openai_completion_attributes_with_error(
spans[0] if len(spans) > 0 else None, verifyContent=verifyContent
)
# span_exporter, start_instrumentation, us_amazon_nova_lite_v1_0 are coming from fixtures defined in conftest.py
@@ -71,63 +135,165 @@ def test_gemini(span_exporter, start_instrumentation, gemini):
def assert_openai_completion_attributes(
span: ReadableSpan, response: Optional
span: ReadableSpan, response: Optional, verifyContent: bool = True
):
assert span.name == "chat gpt-3.5-turbo"
assert span.attributes[gen_ai_attributes.GEN_AI_OPERATION_NAME] == "chat"
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_MODEL]
== "gpt-3.5-turbo"
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_RESPONSE_MODEL]
== "gpt-3.5-turbo-0125"
)
assert span.attributes[gen_ai_attributes.GEN_AI_REQUEST_MAX_TOKENS] == 100
assert span.attributes[gen_ai_attributes.GEN_AI_REQUEST_TEMPERATURE] == 0.1
assert span.attributes["gen_ai.provider.name"] == "openai"
assert gen_ai_attributes.GEN_AI_RESPONSE_ID in span.attributes
assert span.attributes[gen_ai_attributes.GEN_AI_REQUEST_TOP_P] == 0.9
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_FREQUENCY_PENALTY]
== 0.5
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_PRESENCE_PENALTY]
== 0.5
)
stop_sequences = span.attributes.get(
gen_ai_attributes.GEN_AI_REQUEST_STOP_SEQUENCES
)
assert all(seq in ["\n", "Human:", "AI:"] for seq in stop_sequences)
assert span.attributes[gen_ai_attributes.GEN_AI_REQUEST_SEED] == 100
input_tokens = response.response_metadata.get("token_usage").get(
"prompt_tokens"
)
if input_tokens:
if span:
assert span.name == "chat gpt-3.5-turbo"
assert (
input_tokens
== span.attributes[gen_ai_attributes.GEN_AI_USAGE_INPUT_TOKENS]
span.attributes[gen_ai_attributes.GEN_AI_OPERATION_NAME] == "chat"
)
else:
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_MODEL]
== "gpt-3.5-turbo"
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_RESPONSE_MODEL]
== "gpt-3.5-turbo-0125"
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_MAX_TOKENS] == 100
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_TEMPERATURE]
== 0.1
)
assert span.attributes["gen_ai.provider.name"] == "openai"
assert gen_ai_attributes.GEN_AI_RESPONSE_ID in span.attributes
assert span.attributes[gen_ai_attributes.GEN_AI_REQUEST_TOP_P] == 0.9
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_FREQUENCY_PENALTY]
== 0.5
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_PRESENCE_PENALTY]
== 0.5
)
stop_sequences = span.attributes.get(
gen_ai_attributes.GEN_AI_REQUEST_STOP_SEQUENCES
)
assert all(seq in ["\n", "Human:", "AI:"] for seq in stop_sequences)
assert span.attributes[gen_ai_attributes.GEN_AI_REQUEST_SEED] == 100
input_tokens = response.response_metadata.get("token_usage").get(
"prompt_tokens"
)
if input_tokens:
assert (
input_tokens
== span.attributes[gen_ai_attributes.GEN_AI_USAGE_INPUT_TOKENS]
)
else:
assert (
gen_ai_attributes.GEN_AI_USAGE_INPUT_TOKENS
not in span.attributes
)
output_tokens = response.response_metadata.get("token_usage").get(
"completion_tokens"
)
if output_tokens:
assert (
output_tokens
== span.attributes[
gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS
]
)
else:
assert (
gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS
not in span.attributes
)
if verifyContent:
input_message = span.attributes[
gen_ai_attributes.GEN_AI_INPUT_MESSAGES
]
assert input_message is not None
assert '"role":"system"' in input_message
assert '"content":"You are a helpful assistant!"' in input_message
assert '"role":"human"' in input_message
assert (
'"content":"What is the capital of France?"' in input_message
)
# Assert output message
output_message = span.attributes[
gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES
]
assert output_message is not None
assert '"role":"ai"' in output_message
assert (
'"content":"The capital of France is Paris."' in output_message
)
assert '"finish_reason":"stop"' in output_message
def assert_openai_completion_attributes_with_error(
span: ReadableSpan, verifyContent: bool = True
):
if span is not None:
assert span.name == "chat gpt-3.5-turbo"
assert (
span.attributes[error_attributes.ERROR_TYPE]
== "AuthenticationError"
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_OPERATION_NAME] == "chat"
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_MODEL]
== "gpt-3.5-turbo"
)
assert gen_ai_attributes.GEN_AI_RESPONSE_MODEL not in span.attributes
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_MAX_TOKENS] == 100
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_TEMPERATURE]
== 0.1
)
assert span.attributes["gen_ai.provider.name"] == "openai"
assert gen_ai_attributes.GEN_AI_RESPONSE_ID not in span.attributes
assert span.attributes[gen_ai_attributes.GEN_AI_REQUEST_TOP_P] == 0.9
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_FREQUENCY_PENALTY]
== 0.5
)
assert (
span.attributes[gen_ai_attributes.GEN_AI_REQUEST_PRESENCE_PENALTY]
== 0.5
)
stop_sequences = span.attributes.get(
gen_ai_attributes.GEN_AI_REQUEST_STOP_SEQUENCES
)
assert all(seq in ["\n", "Human:", "AI:"] for seq in stop_sequences)
assert span.attributes[gen_ai_attributes.GEN_AI_REQUEST_SEED] == 100
assert (
gen_ai_attributes.GEN_AI_USAGE_INPUT_TOKENS not in span.attributes
)
output_tokens = response.response_metadata.get("token_usage").get(
"completion_tokens"
)
if output_tokens:
assert (
output_tokens
== span.attributes[gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS]
)
else:
assert (
gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS not in span.attributes
)
if verifyContent:
input_message = span.attributes[
gen_ai_attributes.GEN_AI_INPUT_MESSAGES
]
assert input_message is not None
assert '"role":"system"' in input_message
assert '"content":"You are a helpful assistant!"' in input_message
assert '"role":"human"' in input_message
assert (
'"content":"What is the capital of France?"' in input_message
)
# Assert output message
assert (
gen_ai_attributes.GEN_AI_OUTPUT_MESSAGES not in span.attributes
)
def assert_bedrock_completion_attributes(
span: ReadableSpan, response: Optional

View File

@@ -1,100 +0,0 @@
import unittest.mock
import uuid
import pytest
from opentelemetry.instrumentation.langchain.span_manager import (
_SpanManager,
_SpanState,
)
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.trace.span import Span
class TestSpanManager:
@pytest.fixture
def tracer(self):
return get_tracer("test_tracer")
@pytest.fixture
def handler(self, tracer):
return _SpanManager(tracer=tracer)
@pytest.mark.parametrize(
"parent_run_id,parent_in_spans",
[
(None, False), # No parent
(uuid.uuid4(), False), # Parent not in spans
(uuid.uuid4(), True), # Parent in spans
],
)
def test_create_span(
self, handler, tracer, parent_run_id, parent_in_spans
):
# Arrange
run_id = uuid.uuid4()
span_name = "test_span"
kind = SpanKind.INTERNAL
mock_span = unittest.mock.Mock(spec=Span)
# Setup parent if needed
if parent_run_id is not None and parent_in_spans:
parent_mock_span = unittest.mock.Mock(spec=Span)
handler.spans[parent_run_id] = _SpanState(span=parent_mock_span)
with (
unittest.mock.patch.object(
tracer, "start_span", return_value=mock_span
) as mock_start_span,
unittest.mock.patch(
"opentelemetry.instrumentation.langchain.span_manager.set_span_in_context"
) as mock_set_span_in_context,
):
# Act
result = handler._create_span(
run_id, parent_run_id, span_name, kind
)
# Assert
assert result == mock_span
assert run_id in handler.spans
assert handler.spans[run_id].span == mock_span
# Verify parent-child relationship
if parent_run_id is not None and parent_in_spans:
mock_set_span_in_context.assert_called_once_with(
handler.spans[parent_run_id].span
)
mock_start_span.assert_called_once_with(
name=span_name,
kind=kind,
context=mock_set_span_in_context.return_value,
)
assert run_id in handler.spans[parent_run_id].children
else:
mock_start_span.assert_called_once_with(
name=span_name, kind=kind
)
mock_set_span_in_context.assert_called_once_with(mock_span)
def test_end_span(self, handler):
# Arrange
run_id = uuid.uuid4()
mock_span = unittest.mock.Mock(spec=Span)
handler.spans[run_id] = _SpanState(span=mock_span)
# Add a child to verify it's removed
child_run_id = uuid.uuid4()
child_mock_span = unittest.mock.Mock(spec=Span)
handler.spans[child_run_id] = _SpanState(span=child_mock_span)
handler.spans[run_id].children.append(child_run_id)
# Act
handler.end_span(run_id)
# Assert
mock_span.end.assert_called_once()
child_mock_span.end.assert_called_once()
assert run_id not in handler.spans
assert child_run_id not in handler.spans

View File

@@ -1106,6 +1106,7 @@ deps =
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-anthropic[instruments]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-langchain[instruments]
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]
{toxinidir}/instrumentation/opentelemetry-instrumentation-asyncclick[instruments]
{toxinidir}/exporter/opentelemetry-exporter-credential-provider-gcp

View File

@@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3944](#3944))
- Add metrics to LLMInvocation traces
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3891](#3891))
- Add parent class genAI invocation
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3889](#3889))
## Version 0.2b0 (2025-10-14)

View File

@@ -93,16 +93,21 @@ def _new_str_any_dict() -> dict[str, Any]:
@dataclass
class LLMInvocation:
class GenAIInvocation:
context_token: ContextToken | None = None
span: Span | None = None
attributes: dict[str, Any] = field(default_factory=_new_str_any_dict)
@dataclass
class LLMInvocation(GenAIInvocation):
"""
Represents a single LLM call invocation. When creating an LLMInvocation object,
only update the data attributes. The span and context_token attributes are
set by the TelemetryHandler.
"""
request_model: str
context_token: ContextToken | None = None
span: Span | None = None
request_model: str | None = None
input_messages: list[InputMessage] = field(
default_factory=_new_input_messages
)

6862
uv.lock generated
View File

File diff suppressed because it is too large Load Diff