Use an sha 256 hash of the system instructions as it's upload filename (#3814)

* Use md5 hash of system instructions as the filename for system instructions.

* Fix typecheck add changelog

* Respond to comments

* Merge to main and fix changelog

* Switch to Sha 256

* Make changes

* Fix hashing

* Fix lint, typecheck, broke test

* Respond to comments

* fix test

* Fix linter

* Fix assert statements and update changelog
This commit is contained in:
DylanRussell
2025-10-14 16:51:47 -04:00
committed by GitHub
parent ba0644f938
commit ebe5bdf465
5 changed files with 147 additions and 21 deletions

Submodule opentelemetry-operations-python added at 6358cf5626

View File

@@ -27,5 +27,6 @@ class AbstractFileSystem(RealAbstractFileSystem):
def open(
self, path: str, mode: Literal["w"], *args: Any, **kwargs: Any
) -> io.TextIOWrapper: ...
def exists(self, path: str) -> bool: ...
def url_to_fs(url: str) -> tuple[AbstractFileSystem, str]: ...

View File

@@ -15,8 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3795](#3795))
- Make inputs / outputs / system instructions optional params to `on_completion`,
([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3802](#3802)).
- `opentelemetry-instrumentation-google-genai`: migrate off the deprecated events API to use the logs API
([#3625](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3624))
- Use a SHA256 hash of the system instructions as it's upload filename, and check
if the file exists before re-uploading it, ([https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3814](#3814)).
## Version 0.1b0 (2025-09-25)

View File

@@ -15,9 +15,11 @@
from __future__ import annotations
import hashlib
import logging
import posixpath
import threading
from collections import OrderedDict
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import ExitStack
from dataclasses import asdict, dataclass
@@ -73,8 +75,16 @@ class CompletionRefs:
JsonEncodeable = list[dict[str, Any]]
# mapping of upload path to function computing upload data dict
UploadData = dict[str, Callable[[], JsonEncodeable]]
# mapping of upload path and whether the contents were hashed to the filename to function computing upload data dict
UploadData = dict[tuple[str, bool], Callable[[], JsonEncodeable]]
def is_system_instructions_hashable(
system_instruction: list[types.MessagePart] | None,
) -> bool:
return bool(system_instruction) and all(
isinstance(x, types.Text) for x in system_instruction
)
class UploadCompletionHook(CompletionHook):
@@ -97,10 +107,13 @@ class UploadCompletionHook(CompletionHook):
base_path: str,
max_size: int = 20,
upload_format: Format | None = None,
lru_cache_max_size: int = 1024,
) -> None:
self._max_size = max_size
self._fs, base_path = fsspec.url_to_fs(base_path)
self._base_path = self._fs.unstrip_protocol(base_path)
self.lru_dict: OrderedDict[str, bool] = OrderedDict()
self.lru_cache_max_size = lru_cache_max_size
if upload_format not in _FORMATS + (None,):
raise ValueError(
@@ -132,7 +145,10 @@ class UploadCompletionHook(CompletionHook):
finally:
self._semaphore.release()
for path, json_encodeable in upload_data.items():
for (
path,
contents_hashed_to_filename,
), json_encodeable in upload_data.items():
# could not acquire, drop data
if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with
_logger.warning(
@@ -143,7 +159,10 @@ class UploadCompletionHook(CompletionHook):
try:
fut = self._executor.submit(
self._do_upload, path, json_encodeable
self._do_upload,
path,
contents_hashed_to_filename,
json_encodeable,
)
fut.add_done_callback(done)
except RuntimeError:
@@ -152,10 +171,20 @@ class UploadCompletionHook(CompletionHook):
)
self._semaphore.release()
def _calculate_ref_path(self) -> CompletionRefs:
def _calculate_ref_path(
self, system_instruction: list[types.MessagePart]
) -> CompletionRefs:
# TODO: experimental with using the trace_id and span_id, or fetching
# gen_ai.response.id from the active span.
system_instruction_hash = None
if is_system_instructions_hashable(system_instruction):
# Get a hash of the text.
system_instruction_hash = hashlib.sha256(
"\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType]
"utf-8"
),
usedforsecurity=False,
).hexdigest()
uuid_str = str(uuid4())
return CompletionRefs(
inputs_ref=posixpath.join(
@@ -166,13 +195,32 @@ class UploadCompletionHook(CompletionHook):
),
system_instruction_ref=posixpath.join(
self._base_path,
f"{uuid_str}_system_instruction.{self._format}",
f"{system_instruction_hash or uuid_str}_system_instruction.{self._format}",
),
)
def _file_exists(self, path: str) -> bool:
if path in self.lru_dict:
self.lru_dict.move_to_end(path)
return True
# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists
file_exists = self._fs.exists(path)
# don't cache this because soon the file will exist..
if not file_exists:
return False
self.lru_dict[path] = True
if len(self.lru_dict) > self.lru_cache_max_size:
self.lru_dict.popitem(last=False)
return True
def _do_upload(
self, path: str, json_encodeable: Callable[[], JsonEncodeable]
self,
path: str,
contents_hashed_to_filename: bool,
json_encodeable: Callable[[], JsonEncodeable],
) -> None:
if contents_hashed_to_filename and self._file_exists(path):
return
if self._format == "json":
# output as a single line with the json messages array
message_lines = [json_encodeable()]
@@ -194,6 +242,11 @@ class UploadCompletionHook(CompletionHook):
gen_ai_json_dump(message, file)
file.write("\n")
if contents_hashed_to_filename:
self.lru_dict[path] = True
if len(self.lru_dict) > self.lru_cache_max_size:
self.lru_dict.popitem(last=False)
def on_completion(
self,
*,
@@ -213,7 +266,7 @@ class UploadCompletionHook(CompletionHook):
system_instruction=system_instruction or None,
)
# generate the paths to upload to
ref_names = self._calculate_ref_path()
ref_names = self._calculate_ref_path(system_instruction)
def to_dict(
dataclass_list: list[types.InputMessage]
@@ -223,35 +276,40 @@ class UploadCompletionHook(CompletionHook):
return [asdict(dc) for dc in dataclass_list]
references = [
(ref_name, ref, ref_attr)
for ref_name, ref, ref_attr in [
(ref_name, ref, ref_attr, contents_hashed_to_filename)
for ref_name, ref, ref_attr, contents_hashed_to_filename in [
(
ref_names.inputs_ref,
completion.inputs,
GEN_AI_INPUT_MESSAGES_REF,
False,
),
(
ref_names.outputs_ref,
completion.outputs,
GEN_AI_OUTPUT_MESSAGES_REF,
False,
),
(
ref_names.system_instruction_ref,
completion.system_instruction,
GEN_AI_SYSTEM_INSTRUCTIONS_REF,
is_system_instructions_hashable(
completion.system_instruction
),
),
]
if ref
if ref # Filter out empty input/output/sys instruction
]
self._submit_all(
{
ref_name: partial(to_dict, ref)
for ref_name, ref, _ in references
(ref_name, contents_hashed_to_filename): partial(to_dict, ref)
for ref_name, ref, _, contents_hashed_to_filename in references
}
)
# stamp the refs on telemetry
references = {ref_attr: name for name, _, ref_attr in references}
references = {ref_attr: name for name, _, ref_attr, _ in references}
if span:
span.set_attributes(references)
if log_record:

View File

@@ -121,10 +121,10 @@ class TestUploadCompletionHook(TestCase):
mock_fsspec = self._fsspec_patcher.start()
self.mock_fs = ThreadSafeMagicMock()
mock_fsspec.url_to_fs.return_value = self.mock_fs, ""
self.mock_fs.exists.return_value = False
self.hook = UploadCompletionHook(
base_path=BASE_PATH,
max_size=MAXSIZE,
base_path=BASE_PATH, max_size=MAXSIZE, lru_cache_max_size=5
)
def tearDown(self) -> None:
@@ -157,13 +157,46 @@ class TestUploadCompletionHook(TestCase):
# all items should be consumed
self.hook.shutdown()
# TODO: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3812 fix flaky test that requires sleep.
time.sleep(2)
time.sleep(0.5)
self.assertEqual(
self.mock_fs.open.call_count,
3,
"should have uploaded 3 files",
)
def test_lru_cache_works(self):
record = LogRecord()
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=FAKE_SYSTEM_INSTRUCTION,
log_record=record,
)
# Wait a bit for file upload to finish..
time.sleep(0.5)
self.assertIsNotNone(record.attributes)
self.assertTrue(
self.hook._file_exists(
record.attributes["gen_ai.system_instructions_ref"]
)
)
# LRU cache has a size of 5. So only AFTER 5 uploads should the original file be removed from the cache.
for iteration in range(5):
self.assertTrue(
record.attributes["gen_ai.system_instructions_ref"]
in self.hook.lru_dict
)
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=[types.Text(content=str(iteration))],
)
self.hook.shutdown()
self.assertFalse(
record.attributes["gen_ai.system_instructions_ref"]
in self.hook.lru_dict
)
def test_upload_when_inputs_outputs_empty(self):
record = LogRecord()
self.hook.on_completion(
@@ -180,7 +213,7 @@ class TestUploadCompletionHook(TestCase):
1,
"should have uploaded 1 file",
)
assert record.attributes is not None
self.assertIsNotNone(record.attributes)
for ref_key in [
"gen_ai.input.messages_ref",
"gen_ai.output.messages_ref",
@@ -335,6 +368,39 @@ class TestUploadCompletionHookIntegration(TestBase):
with fsspec.open(path, "r") as file:
self.assertEqual(file.read(), value)
def test_system_insruction_is_hashed_to_avoid_reupload(self):
expected_hash = (
"7e35acac4feca03ab47929d4cc6cfef1df2190ae1ee1752196a05ffc2a6cb360"
)
# Create the file before upload..
expected_file_name = (
f"memory://{expected_hash}_system_instruction.json"
)
with fsspec.open(expected_file_name, "wb") as file:
file.write(b"asg")
# FIle should exist.
self.assertTrue(self.hook._file_exists(expected_file_name))
system_instructions = [
types.Text(content="You are a helpful assistant."),
types.Text(content="You will do your best."),
]
record = LogRecord()
self.hook.on_completion(
inputs=[],
outputs=[],
system_instruction=system_instructions,
log_record=record,
)
self.hook.shutdown()
self.assertIsNotNone(record.attributes)
self.assertEqual(
record.attributes["gen_ai.system_instructions_ref"],
expected_file_name,
)
# Content should not have been overwritten.
self.assert_fsspec_equal(expected_file_name, "asg")
def test_upload_completions(self):
tracer = self.tracer_provider.get_tracer(__name__)
log_record = LogRecord()