botocore: add basic tracing for bedrock ConverseStream (#3204)

* Add tracing for ConverseStream

* Add converse stream example
This commit is contained in:
Riccardo Magliocchetti
2025-01-27 10:01:07 +01:00
committed by GitHub
parent 2756c1edff
commit 0bb1c42a78
11 changed files with 395 additions and 6 deletions

View File

@ -45,6 +45,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3161](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3161)) ([#3161](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3161))
- `opentelemetry-opentelemetry-botocore` Add basic support for GenAI attributes for AWS Bedrock InvokeModel API - `opentelemetry-opentelemetry-botocore` Add basic support for GenAI attributes for AWS Bedrock InvokeModel API
([#3200](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3200)) ([#3200](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3200))
- `opentelemetry-instrumentation-botocore` Add basic support for GenAI attributes for AWS Bedrock ConverseStream API
([#3204](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3204))
### Fixed ### Fixed

View File

@ -18,6 +18,8 @@ Available examples
------------------ ------------------
- `converse.py` uses `bedrock-runtime` `Converse API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html>_`. - `converse.py` uses `bedrock-runtime` `Converse API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html>_`.
- `converse_stream.py` uses `bedrock-runtime` `ConverseStream API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html>_`.
- `invoke_model.py` uses `bedrock-runtime` `InvokeModel API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html>_`.
Setup Setup
----- -----

View File

@ -0,0 +1,26 @@
import os
import boto3
def main():
    """Stream a chat completion from Bedrock and print the assembled text."""
    client = boto3.client("bedrock-runtime")
    stream = client.converse_stream(
        modelId=os.getenv("CHAT_MODEL", "amazon.titan-text-lite-v1"),
        messages=[
            {
                "role": "user",
                "content": [{"text": "Write a short poem on OpenTelemetry."}],
            },
        ],
    )

    # Consume the event stream, collecting the incremental text deltas.
    chunks = []
    for event in stream["stream"]:
        if "contentBlockDelta" in event:
            chunks.append(event["contentBlockDelta"]["delta"]["text"])
    print("".join(chunks))


if __name__ == "__main__":
    main()

View File

@ -188,11 +188,15 @@ class BotocoreInstrumentor(BaseInstrumentor):
} }
_safe_invoke(extension.extract_attributes, attributes) _safe_invoke(extension.extract_attributes, attributes)
end_span_on_exit = extension.should_end_span_on_exit()
with self._tracer.start_as_current_span( with self._tracer.start_as_current_span(
call_context.span_name, call_context.span_name,
kind=call_context.span_kind, kind=call_context.span_kind,
attributes=attributes, attributes=attributes,
# tracing streaming services require to close the span manually
# at a later time after the stream has been consumed
end_on_exit=end_span_on_exit,
) as span: ) as span:
_safe_invoke(extension.before_service_call, span) _safe_invoke(extension.before_service_call, span)
self._call_request_hook(span, call_context) self._call_request_hook(span, call_context)

View File

@ -23,8 +23,12 @@ import json
import logging import logging
from typing import Any from typing import Any
from botocore.eventstream import EventStream
from botocore.response import StreamingBody from botocore.response import StreamingBody
from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
ConverseStreamWrapper,
)
from opentelemetry.instrumentation.botocore.extensions.types import ( from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT, _AttributeMapT,
_AwsSdkExtension, _AwsSdkExtension,
@ -62,7 +66,14 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
Amazon Bedrock Runtime</a>. Amazon Bedrock Runtime</a>.
""" """
_HANDLED_OPERATIONS = {"Converse", "InvokeModel"} _HANDLED_OPERATIONS = {"Converse", "ConverseStream", "InvokeModel"}
_DONT_CLOSE_SPAN_ON_END_OPERATIONS = {"ConverseStream"}
def should_end_span_on_exit(self):
    """Return False for streaming operations whose span must stay open
    until the response stream has been fully consumed."""
    operation = self._call_context.operation
    return operation not in self._DONT_CLOSE_SPAN_ON_END_OPERATIONS
def extract_attributes(self, attributes: _AttributeMapT): def extract_attributes(self, attributes: _AttributeMapT):
if self._call_context.operation not in self._HANDLED_OPERATIONS: if self._call_context.operation not in self._HANDLED_OPERATIONS:
@ -77,7 +88,7 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
GenAiOperationNameValues.CHAT.value GenAiOperationNameValues.CHAT.value
) )
# Converse # Converse / ConverseStream
if inference_config := self._call_context.params.get( if inference_config := self._call_context.params.get(
"inferenceConfig" "inferenceConfig"
): ):
@ -251,6 +262,20 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
return return
if not span.is_recording(): if not span.is_recording():
if not self.should_end_span_on_exit():
span.end()
return
# ConverseStream
if "stream" in result and isinstance(result["stream"], EventStream):
def stream_done_callback(response):
self._converse_on_success(span, response)
span.end()
result["stream"] = ConverseStreamWrapper(
result["stream"], stream_done_callback
)
return return
# Converse # Converse
@ -328,3 +353,6 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
span.set_status(Status(StatusCode.ERROR, str(exception))) span.set_status(Status(StatusCode.ERROR, str(exception)))
if span.is_recording(): if span.is_recording():
span.set_attribute(ERROR_TYPE, type(exception).__qualname__) span.set_attribute(ERROR_TYPE, type(exception).__qualname__)
if not self.should_end_span_on_exit():
span.end()

View File

@ -0,0 +1,74 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from botocore.eventstream import EventStream
from wrapt import ObjectProxy
# pylint: disable=abstract-method
# pylint: disable=abstract-method
class ConverseStreamWrapper(ObjectProxy):
    """Wrapper for botocore.eventstream.EventStream.

    Transparently proxies the wrapped stream while accumulating the data
    needed to build a response in the same shape as the non-streaming
    Converse API. Once the final "metadata" event has been processed the
    callback is invoked with the accumulated response.
    """

    def __init__(
        self,
        stream: EventStream,
        stream_done_callback,
    ):
        super().__init__(stream)
        # Store state with the wrapt "_self_" prefix so it lives on the proxy
        # itself: plain attribute assignment on an ObjectProxy is forwarded to
        # the wrapped object, which would mutate the underlying EventStream.
        self._self_stream_done_callback = stream_done_callback
        # accumulating things in the same shape of non-streaming version
        # {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
        self._self_response = {}

    def __iter__(self):
        # Observe every event while still yielding it unchanged to the caller.
        for event in self.__wrapped__:
            self._process_event(event)
            yield event

    def _process_event(self, event):
        """Accumulate stop reason and token usage from one stream event."""
        if "messageStart" in event:
            # {'messageStart': {'role': 'assistant'}}
            pass

        if "contentBlockDelta" in event:
            # {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}}
            pass

        if "contentBlockStop" in event:
            # {'contentBlockStop': {'contentBlockIndex': 0}}
            pass

        if "messageStop" in event:
            # {'messageStop': {'stopReason': 'end_turn'}}
            if stop_reason := event["messageStop"].get("stopReason"):
                self._self_response["stopReason"] = stop_reason

        if "metadata" in event:
            # {'metadata': {'usage': {'inputTokens': 12, 'outputTokens': 15, 'totalTokens': 27}, 'metrics': {'latencyMs': 2980}}}
            if usage := event["metadata"].get("usage"):
                self._self_response["usage"] = {}
                if input_tokens := usage.get("inputTokens"):
                    self._self_response["usage"]["inputTokens"] = input_tokens
                if output_tokens := usage.get("outputTokens"):
                    self._self_response["usage"]["outputTokens"] = output_tokens
            # metadata is the last event of the stream: report the result
            self._self_stream_done_callback(self._self_response)

View File

@ -101,6 +101,14 @@ class _AwsSdkExtension:
""" """
return True return True
def should_end_span_on_exit(self) -> bool:  # pylint:disable=no-self-use
    """Tell the instrumentor whether to close the span automatically on exit.

    The default is True. An extension may override this to return False when
    it has to keep the span open and end it itself at a later point (for
    example after a response stream has been fully consumed).
    """
    return True
def extract_attributes(self, attributes: _AttributeMapT): def extract_attributes(self, attributes: _AttributeMapT):
"""Callback which gets invoked before the span is created. """Callback which gets invoked before the span is created.

View File

@ -91,7 +91,7 @@ def assert_completion_attributes_from_streaming_body(
) )
def assert_completion_attributes( def assert_converse_completion_attributes(
span: ReadableSpan, span: ReadableSpan,
request_model: str, request_model: str,
response: dict[str, Any] | None, response: dict[str, Any] | None,
@ -128,6 +128,34 @@ def assert_completion_attributes(
) )
def assert_converse_stream_completion_attributes(
    span: ReadableSpan,
    request_model: str,
    input_tokens: int | None = None,
    output_tokens: int | None = None,
    finish_reason: tuple[str] | None = None,
    operation_name: str = "chat",
    request_top_p: int | None = None,
    request_temperature: int | None = None,
    request_max_tokens: int | None = None,
    request_stop_sequences: list[str] | None = None,
):
    """Assert the GenAI span attributes recorded for a ConverseStream call."""
    # Span attributes store sequences as tuples, so normalize before comparing.
    stop_sequences = request_stop_sequences
    if stop_sequences is not None:
        stop_sequences = tuple(stop_sequences)
    return assert_all_attributes(
        span,
        request_model,
        input_tokens,
        output_tokens,
        finish_reason,
        operation_name,
        request_top_p,
        request_temperature,
        request_max_tokens,
        stop_sequences,
    )
def assert_equal_or_not_present(value, attribute_name, span): def assert_equal_or_not_present(value, attribute_name, span):
if value is not None: if value is not None:
assert value == span.attributes[attribute_name] assert value == span.attributes[attribute_name]

View File

@ -0,0 +1,69 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}],
"inferenceConfig": {"maxTokens": 10, "temperature": 0.8, "topP": 1, "stopSequences":
["|"]}}'
headers:
Content-Length:
- '170'
Content-Type:
- !!binary |
YXBwbGljYXRpb24vanNvbg==
User-Agent:
- !!binary |
Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
X-Amz-Date:
- !!binary |
MjAyNTAxMjNUMDk1MTU2Wg==
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- !!binary |
Um9vdD0xLTA0YmY4MjVjLTAxMTY5NjdhYWM1NmIxM2RlMDI1N2QwMjtQYXJlbnQ9MDdkM2U3N2Rl
OGFjMzJhNDtTYW1wbGVkPTE=
amz-sdk-invocation-id:
- !!binary |
ZGQ1MTZiNTEtOGU1Yi00NGYyLTk5MzMtZjAwYzBiOGFkYWYw
amz-sdk-request:
- !!binary |
YXR0ZW1wdD0x
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream
response:
body:
string: !!binary |
AAAAlAAAAFLEwW5hCzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh
cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u
b3BxcnN0dXZ3Iiwicm9sZSI6ImFzc2lzdGFudCJ9P+wfRAAAAMQAAABXjLhVJQs6ZXZlbnQtdHlw
ZQcAEWNvbnRlbnRCbG9ja0RlbHRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTpt
ZXNzYWdlLXR5cGUHAAVldmVudHsiY29udGVudEJsb2NrSW5kZXgiOjAsImRlbHRhIjp7InRleHQi
OiJIaSEgSG93IGNhbiBJIGhlbHAgeW91In0sInAiOiJhYmNkZWZnaGlqa2xtbm9wcXJzdHUifeBJ
9mIAAACJAAAAVlvc+UsLOmV2ZW50LXR5cGUHABBjb250ZW50QmxvY2tTdG9wDTpjb250ZW50LXR5
cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsiY29udGVudEJsb2Nr
SW5kZXgiOjAsInAiOiJhYmNkZSJ95xzwrwAAAKcAAABRu0n9jQs6ZXZlbnQtdHlwZQcAC21lc3Nh
Z2VTdG9wDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl
dmVudHsicCI6ImFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6QUJDREVGR0hJSiIsInN0b3BSZWFz
b24iOiJtYXhfdG9rZW5zIn1LR3pNAAAAygAAAE5X40OECzpldmVudC10eXBlBwAIbWV0YWRhdGEN
OmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJt
ZXRyaWNzIjp7ImxhdGVuY3lNcyI6NjA4fSwicCI6ImFiY2RlZmdoaWprIiwidXNhZ2UiOnsiaW5w
dXRUb2tlbnMiOjgsIm91dHB1dFRva2VucyI6MTAsInRvdGFsVG9rZW5zIjoxOH19iiQr+w==
headers:
Connection:
- keep-alive
Content-Type:
- application/vnd.amazon.eventstream
Date:
- Thu, 23 Jan 2025 09:51:56 GMT
Set-Cookie: test_set_cookie
Transfer-Encoding:
- chunked
x-amzn-RequestId:
- 2b74a5d3-615a-4f81-b00f-f0b10a618e23
status:
code: 200
message: OK
version: 1

View File

@ -0,0 +1,54 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}]}'
headers:
Content-Length:
- '77'
Content-Type:
- !!binary |
YXBwbGljYXRpb24vanNvbg==
User-Agent:
- !!binary |
Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
X-Amz-Date:
- !!binary |
MjAyNTAxMjNUMDk1MTU3Wg==
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- !!binary |
Um9vdD0xLTI5NzA1OTZhLTEyZWI5NDk2ODA1ZjZhYzE5YmU3ODM2NztQYXJlbnQ9Y2M0OTA0YWE2
ZjQ2NmYxYTtTYW1wbGVkPTE=
amz-sdk-invocation-id:
- !!binary |
MjQzZWY2ZDgtNGJhNy00YTVlLWI0MGEtYThiNDE2ZDIzYjhk
amz-sdk-request:
- !!binary |
YXR0ZW1wdD0x
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/does-not-exist/converse-stream
response:
body:
string: '{"message":"The provided model identifier is invalid."}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json
Date:
- Thu, 23 Jan 2025 09:51:57 GMT
Set-Cookie: test_set_cookie
x-amzn-ErrorType:
- ValidationException:http://internal.amazon.com/coral/com.amazon.bedrock/
x-amzn-RequestId:
- 358b122c-d045-4d8f-a5bb-b0bd8cf6ee59
status:
code: 400
message: Bad Request
version: 1

View File

@ -25,8 +25,9 @@ from opentelemetry.semconv._incubating.attributes.error_attributes import (
from opentelemetry.trace.status import StatusCode from opentelemetry.trace.status import StatusCode
from .bedrock_utils import ( from .bedrock_utils import (
assert_completion_attributes,
assert_completion_attributes_from_streaming_body, assert_completion_attributes_from_streaming_body,
assert_converse_completion_attributes,
assert_converse_stream_completion_attributes,
) )
BOTO3_VERSION = tuple(int(x) for x in boto3.__version__.split(".")) BOTO3_VERSION = tuple(int(x) for x in boto3.__version__.split("."))
@ -58,7 +59,7 @@ def test_converse_with_content(
) )
(span,) = span_exporter.get_finished_spans() (span,) = span_exporter.get_finished_spans()
assert_completion_attributes( assert_converse_completion_attributes(
span, span,
llm_model_value, llm_model_value,
response, response,
@ -93,7 +94,7 @@ def test_converse_with_invalid_model(
) )
(span,) = span_exporter.get_finished_spans() (span,) = span_exporter.get_finished_spans()
assert_completion_attributes( assert_converse_completion_attributes(
span, span,
llm_model_value, llm_model_value,
None, None,
@ -107,6 +108,99 @@ def test_converse_with_invalid_model(
assert len(logs) == 0 assert len(logs) == 0
@pytest.mark.skipif(
    BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
)
@pytest.mark.vcr()
def test_converse_stream_with_content(
    span_exporter,
    log_exporter,
    bedrock_runtime_client,
    instrument_with_content,
):
    """A consumed ConverseStream response produces a chat span with usage."""
    # pylint:disable=too-many-locals
    messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}]

    llm_model_value = "amazon.titan-text-lite-v1"
    max_tokens, temperature, top_p, stop_sequences = 10, 0.8, 1, ["|"]
    response = bedrock_runtime_client.converse_stream(
        modelId=llm_model_value,
        messages=messages,
        inferenceConfig={
            "maxTokens": max_tokens,
            "temperature": temperature,
            "topP": top_p,
            "stopSequences": stop_sequences,
        },
    )

    # consume the stream in order to have it traced
    finish_reason = None
    input_tokens, output_tokens = None, None
    chunks = []
    for event in response["stream"]:
        if "contentBlockDelta" in event:
            chunks.append(event["contentBlockDelta"]["delta"]["text"])
        if "messageStop" in event:
            finish_reason = (event["messageStop"]["stopReason"],)
        if "metadata" in event:
            usage = event["metadata"]["usage"]
            input_tokens = usage["inputTokens"]
            output_tokens = usage["outputTokens"]
    assert "".join(chunks)

    (span,) = span_exporter.get_finished_spans()
    assert_converse_stream_completion_attributes(
        span,
        llm_model_value,
        input_tokens,
        output_tokens,
        finish_reason,
        "chat",
        top_p,
        temperature,
        max_tokens,
        stop_sequences,
    )

    logs = log_exporter.get_finished_logs()
    assert len(logs) == 0
@pytest.mark.skipif(
    BOTO3_VERSION < (1, 35, 56), reason="ConverseStream API not available"
)
@pytest.mark.vcr()
def test_converse_stream_with_invalid_model(
    span_exporter,
    log_exporter,
    bedrock_runtime_client,
    instrument_with_content,
):
    """A ConverseStream call with a bad model id records an errored chat span."""
    llm_model_value = "does-not-exist"
    messages = [{"role": "user", "content": [{"text": "Say this is a test"}]}]
    with pytest.raises(bedrock_runtime_client.exceptions.ValidationException):
        bedrock_runtime_client.converse_stream(
            modelId=llm_model_value,
            messages=messages,
        )

    (span,) = span_exporter.get_finished_spans()
    assert_converse_stream_completion_attributes(
        span,
        llm_model_value,
        operation_name="chat",
    )

    # The failed call must surface as an errored span with the exception type.
    assert span.status.status_code == StatusCode.ERROR
    assert span.attributes[ERROR_TYPE] == "ValidationException"

    logs = log_exporter.get_finished_logs()
    assert len(logs) == 0
def get_invoke_model_body( def get_invoke_model_body(
llm_model, llm_model,
max_tokens=None, max_tokens=None,