mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-08-02 11:31:52 +08:00
37 lines
960 B
Python
37 lines
960 B
Python
import aiobotocore.session
|
|
|
|
from ddtrace import Pin
|
|
from contextlib import contextmanager
|
|
|
|
|
|
# Maps each AWS service name used by these helpers to the local endpoint URL
# its client is pointed at; every service has its own port on localhost.
LOCALSTACK_ENDPOINT_URL = {
    's3': 'http://localhost:5000',
    'ec2': 'http://localhost:5001',
    'kms': 'http://localhost:5002',
    'sqs': 'http://localhost:5003',
    'lambda': 'http://localhost:5004',
    'kinesis': 'http://localhost:5005',
}
|
|
|
|
|
|
@contextmanager
def aiobotocore_client(service, tracer):
    """Yield a new aiobotocore client that is closed when the context exits.

    The client is created for ``service`` in region ``us-west-2`` against the
    matching ``LOCALSTACK_ENDPOINT_URL`` entry, using the fixed ``'aws'``
    credentials, and is pinned to ``tracer`` via ``Pin.override``.

    :param service: key of ``LOCALSTACK_ENDPOINT_URL`` (for example ``'s3'``);
        also passed to ``create_client`` as the service name.
    :param tracer: tracer handed to ``Pin.override`` for the new client.
    :raises KeyError: if ``service`` has no entry in
        ``LOCALSTACK_ENDPOINT_URL``.
    """
    session = aiobotocore.session.get_session()
    endpoint = LOCALSTACK_ENDPOINT_URL[service]
    client = session.create_client(
        service,
        region_name='us-west-2',
        endpoint_url=endpoint,
        aws_access_key_id='aws',
        aws_secret_access_key='aws',
        aws_session_token='aws',
    )
    try:
        # Pin inside the try block so the client is still closed if pinning
        # itself raises, not only when the caller's body does.
        Pin.override(client, tracer=tracer)
        yield client
    finally:
        # NOTE(review): the result of close() is discarded; if the installed
        # aiobotocore version makes close() a coroutine it would need to be
        # awaited by the caller instead -- confirm against the pinned version.
        client.close()
|