botocore: Add DynamoDB extension (#735)

* botocore: Add dynamodb extension

* extract addtional DynamoDB specific attributes according to the spec
* move DynamoDB tests to separate test module

* changelog

* add license info

Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
This commit is contained in:
Mario Jonke
2021-10-18 11:53:06 +02:00
committed by GitHub
parent bf41b2e33f
commit 3058281f5e
8 changed files with 975 additions and 52 deletions

View File

@ -13,6 +13,8 @@ automatically by the auto instrumentor.
([#745](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/745))
- `opentelemetry-instrumentation-pika` Bugfix use properties.headers. It will prevent the header injection from raising.
([#740](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/740))
- `opentelemetry-instrumentation-botocore` Add extension for DynamoDB
([#735](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/735))
## [1.6.0-0.25b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.0-0.25b0) - 2021-10-13
### Added

View File

@ -214,7 +214,6 @@ class BotocoreInstrumentor(BaseInstrumentor):
if BotocoreInstrumentor._is_lambda_invoke(call_context):
BotocoreInstrumentor._patch_lambda_invoke(call_context.params)
_set_api_call_attributes(span, call_context)
_safe_invoke(extension.before_service_call, span)
self._call_request_hook(span, call_context)
@ -261,14 +260,6 @@ class BotocoreInstrumentor(BaseInstrumentor):
)
def _set_api_call_attributes(span, call_context: _AwsSdkCallContext):
if not span.is_recording():
return
if "TableName" in call_context.params:
span.set_attribute("aws.table_name", call_context.params["TableName"])
def _apply_response_attributes(span: Span, result):
if result is None or not span.is_recording():
return

View File

@ -1,3 +1,17 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import logging
@ -18,6 +32,7 @@ def _lazy_load(module, cls):
_KNOWN_EXTENSIONS = {
"dynamodb": _lazy_load(".dynamodb", "_DynamoDbExtension"),
"sqs": _lazy_load(".sqs", "_SqsExtension"),
}

View File

@ -0,0 +1,424 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import inspect
import json
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkCallContext,
_AwsSdkExtension,
_BotoResultT,
)
from opentelemetry.semconv.trace import DbSystemValues, SpanAttributes
from opentelemetry.trace.span import Span
from opentelemetry.util.types import AttributeValue
_AttributePathT = Union[str, Tuple[str]]
# converter functions
def _conv_val_to_single_attr_tuple(value: str) -> Tuple[str]:
return None if value is None else (value,)
def _conv_dict_to_key_tuple(value: Dict[str, Any]) -> Optional[Tuple[str]]:
return tuple(value.keys()) if isinstance(value, Dict) else None
def _conv_list_to_json_list(value: List) -> Optional[List[str]]:
return (
[json.dumps(item) for item in value]
if isinstance(value, List)
else None
)
def _conv_val_to_single_json_tuple(value: str) -> Optional[Tuple[str]]:
return (json.dumps(value),) if value is not None else None
def _conv_dict_to_json_str(value: Dict) -> Optional[str]:
return json.dumps(value) if isinstance(value, Dict) else None
def _conv_val_to_len(value) -> Optional[int]:
return len(value) if value is not None else None
################################################################################
# common request attributes
################################################################################
_REQ_TABLE_NAME = ("TableName", _conv_val_to_single_attr_tuple)
_REQ_REQITEMS_TABLE_NAMES = ("RequestItems", _conv_dict_to_key_tuple)
_REQ_GLOBAL_SEC_INDEXES = ("GlobalSecondaryIndexes", _conv_list_to_json_list)
_REQ_LOCAL_SEC_INDEXES = ("LocalSecondaryIndexes", _conv_list_to_json_list)
_REQ_PROV_READ_CAP = (("ProvisionedThroughput", "ReadCapacityUnits"), None)
_REQ_PROV_WRITE_CAP = (("ProvisionedThroughput", "WriteCapacityUnits"), None)
_REQ_CONSISTENT_READ = ("ConsistentRead", None)
_REQ_PROJECTION = ("ProjectionExpression", None)
_REQ_ATTRS_TO_GET = ("AttributesToGet", None)
_REQ_LIMIT = ("Limit", None)
_REQ_SELECT = ("Select", None)
_REQ_INDEX_NAME = ("IndexName", None)
################################################################################
# common response attributes
################################################################################
_RES_CONSUMED_CAP = ("ConsumedCapacity", _conv_list_to_json_list)
_RES_CONSUMED_CAP_SINGLE = ("ConsumedCapacity", _conv_val_to_single_json_tuple)
_RES_ITEM_COL_METRICS = ("ItemCollectionMetrics", _conv_dict_to_json_str)
################################################################################
# DynamoDB operations with enhanced attributes
################################################################################
_AttrSpecT = Tuple[_AttributePathT, Optional[Callable]]
class _DynamoDbOperation(abc.ABC):
start_attributes = None # type: Optional[Dict[str, _AttrSpecT]]
request_attributes = None # type: Optional[Dict[str, _AttrSpecT]]
response_attributes = None # type: Optional[Dict[str, _AttrSpecT]]
@classmethod
@abc.abstractmethod
def operation_name(cls):
pass
@classmethod
def add_start_attributes(
cls, call_context: _AwsSdkCallContext, attributes: _AttributeMapT
):
pass
@classmethod
def add_response_attributes(
cls, call_context: _AwsSdkCallContext, span: Span, result: _BotoResultT
):
pass
class _OpBatchGetItem(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_REQITEMS_TABLE_NAMES,
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY: _RES_CONSUMED_CAP,
}
@classmethod
def operation_name(cls):
return "BatchGetItem"
class _OpBatchWriteItem(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_REQITEMS_TABLE_NAMES,
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY: _RES_CONSUMED_CAP,
SpanAttributes.AWS_DYNAMODB_ITEM_COLLECTION_METRICS: _RES_ITEM_COL_METRICS,
}
@classmethod
def operation_name(cls):
return "BatchWriteItem"
class _OpCreateTable(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
request_attributes = {
SpanAttributes.AWS_DYNAMODB_GLOBAL_SECONDARY_INDEXES: _REQ_GLOBAL_SEC_INDEXES,
SpanAttributes.AWS_DYNAMODB_LOCAL_SECONDARY_INDEXES: _REQ_LOCAL_SEC_INDEXES,
SpanAttributes.AWS_DYNAMODB_PROVISIONED_READ_CAPACITY: _REQ_PROV_READ_CAP,
SpanAttributes.AWS_DYNAMODB_PROVISIONED_WRITE_CAPACITY: _REQ_PROV_WRITE_CAP,
}
@classmethod
def operation_name(cls):
return "CreateTable"
class _OpDeleteItem(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY: _RES_CONSUMED_CAP_SINGLE,
SpanAttributes.AWS_DYNAMODB_ITEM_COLLECTION_METRICS: _RES_ITEM_COL_METRICS,
}
@classmethod
def operation_name(cls):
return "DeleteItem"
class _OpDeleteTable(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
@classmethod
def operation_name(cls):
return "DeleteTable"
class _OpDescribeTable(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
@classmethod
def operation_name(cls):
return "DescribeTable"
class _OpGetItem(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
request_attributes = {
SpanAttributes.AWS_DYNAMODB_CONSISTENT_READ: _REQ_CONSISTENT_READ,
SpanAttributes.AWS_DYNAMODB_PROJECTION: _REQ_PROJECTION,
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY: _RES_CONSUMED_CAP_SINGLE,
}
@classmethod
def operation_name(cls):
return "GetItem"
class _OpListTables(_DynamoDbOperation):
request_attributes = {
SpanAttributes.AWS_DYNAMODB_EXCLUSIVE_START_TABLE: (
"ExclusiveStartTableName",
None,
),
SpanAttributes.AWS_DYNAMODB_LIMIT: _REQ_LIMIT,
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_COUNT: (
"TableNames",
_conv_val_to_len,
),
}
@classmethod
def operation_name(cls):
return "ListTables"
class _OpPutItem(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY: _RES_CONSUMED_CAP_SINGLE,
SpanAttributes.AWS_DYNAMODB_ITEM_COLLECTION_METRICS: _RES_ITEM_COL_METRICS,
}
@classmethod
def operation_name(cls):
return "PutItem"
class _OpQuery(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
request_attributes = {
SpanAttributes.AWS_DYNAMODB_SCAN_FORWARD: ("ScanIndexForward", None),
SpanAttributes.AWS_DYNAMODB_ATTRIBUTES_TO_GET: _REQ_ATTRS_TO_GET,
SpanAttributes.AWS_DYNAMODB_CONSISTENT_READ: _REQ_CONSISTENT_READ,
SpanAttributes.AWS_DYNAMODB_INDEX_NAME: _REQ_INDEX_NAME,
SpanAttributes.AWS_DYNAMODB_LIMIT: _REQ_LIMIT,
SpanAttributes.AWS_DYNAMODB_PROJECTION: _REQ_PROJECTION,
SpanAttributes.AWS_DYNAMODB_SELECT: _REQ_SELECT,
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY: _RES_CONSUMED_CAP_SINGLE,
}
@classmethod
def operation_name(cls):
return "Query"
class _OpScan(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
request_attributes = {
SpanAttributes.AWS_DYNAMODB_SEGMENT: ("Segment", None),
SpanAttributes.AWS_DYNAMODB_TOTAL_SEGMENTS: ("TotalSegments", None),
SpanAttributes.AWS_DYNAMODB_ATTRIBUTES_TO_GET: _REQ_ATTRS_TO_GET,
SpanAttributes.AWS_DYNAMODB_CONSISTENT_READ: _REQ_CONSISTENT_READ,
SpanAttributes.AWS_DYNAMODB_INDEX_NAME: _REQ_INDEX_NAME,
SpanAttributes.AWS_DYNAMODB_LIMIT: _REQ_LIMIT,
SpanAttributes.AWS_DYNAMODB_PROJECTION: _REQ_PROJECTION,
SpanAttributes.AWS_DYNAMODB_SELECT: _REQ_SELECT,
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_COUNT: ("Count", None),
SpanAttributes.AWS_DYNAMODB_SCANNED_COUNT: ("ScannedCount", None),
SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY: _RES_CONSUMED_CAP_SINGLE,
}
@classmethod
def operation_name(cls):
return "Scan"
class _OpUpdateItem(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
response_attributes = {
SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY: _RES_CONSUMED_CAP_SINGLE,
SpanAttributes.AWS_DYNAMODB_ITEM_COLLECTION_METRICS: _RES_ITEM_COL_METRICS,
}
@classmethod
def operation_name(cls):
return "UpdateItem"
class _OpUpdateTable(_DynamoDbOperation):
start_attributes = {
SpanAttributes.AWS_DYNAMODB_TABLE_NAMES: _REQ_TABLE_NAME,
}
request_attributes = {
SpanAttributes.AWS_DYNAMODB_ATTRIBUTE_DEFINITIONS: (
"AttributeDefinitions",
_conv_list_to_json_list,
),
SpanAttributes.AWS_DYNAMODB_GLOBAL_SECONDARY_INDEX_UPDATES: (
"GlobalSecondaryIndexUpdates",
_conv_list_to_json_list,
),
SpanAttributes.AWS_DYNAMODB_PROVISIONED_READ_CAPACITY: _REQ_PROV_READ_CAP,
SpanAttributes.AWS_DYNAMODB_PROVISIONED_WRITE_CAPACITY: _REQ_PROV_WRITE_CAP,
}
@classmethod
def operation_name(cls):
return "UpdateTable"
################################################################################
# DynamoDB extension
################################################################################
_OPERATION_MAPPING = {
op.operation_name(): op
for op in globals().values()
if inspect.isclass(op)
and issubclass(op, _DynamoDbOperation)
and not inspect.isabstract(op)
} # type: Dict[str, _DynamoDbOperation]
class _DynamoDbExtension(_AwsSdkExtension):
def __init__(self, call_context: _AwsSdkCallContext):
super().__init__(call_context)
self._op = _OPERATION_MAPPING.get(call_context.operation)
def extract_attributes(self, attributes: _AttributeMapT):
attributes[SpanAttributes.DB_SYSTEM] = DbSystemValues.DYNAMODB.value
attributes[SpanAttributes.DB_OPERATION] = self._call_context.operation
attributes[SpanAttributes.NET_PEER_NAME] = self._get_peer_name()
if self._op is None:
return
def attr_setter(key: str, value: AttributeValue):
attributes[key] = value
self._add_attributes(
self._call_context.params, self._op.start_attributes, attr_setter
)
def _get_peer_name(self) -> str:
return urlparse(self._call_context.endpoint_url).netloc
def before_service_call(self, span: Span):
if not span.is_recording() or self._op is None:
return
self._add_attributes(
self._call_context.params,
self._op.request_attributes,
span.set_attribute,
)
def on_success(self, span: Span, result: _BotoResultT):
if not span.is_recording():
return
if self._op is None:
return
self._add_attributes(
result, self._op.response_attributes, span.set_attribute
)
def _add_attributes(
self,
provider: Dict[str, Any],
attributes: Dict[str, _AttrSpecT],
setter: Callable[[str, AttributeValue], None],
):
if attributes is None:
return
for attr_key, attr_spec in attributes.items():
attr_path, converter = attr_spec
value = self._get_attr_value(provider, attr_path)
if value is None:
continue
if converter is not None:
value = converter(value)
if value is None:
continue
setter(attr_key, value)
@staticmethod
def _get_attr_value(provider: Dict[str, Any], attr_path: _AttributePathT):
if isinstance(attr_path, str):
return provider.get(attr_path)
value = provider
for path_part in attr_path:
value = value.get(path_part)
if value is None:
return None
return None if value is provider else value

View File

@ -1,3 +1,17 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,

View File

@ -1,3 +1,17 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Dict, Optional, Tuple

View File

@ -0,0 +1,506 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from unittest import mock
import botocore.session
from moto import mock_dynamodb2 # pylint: disable=import-error
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.instrumentation.botocore.extensions.dynamodb import (
_DynamoDbExtension,
)
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.span import Span
# pylint: disable=too-many-public-methods
class TestDynamoDbExtension(TestBase):
def setUp(self):
super().setUp()
BotocoreInstrumentor().instrument()
session = botocore.session.get_session()
session.set_credentials(
access_key="access-key", secret_key="secret-key"
)
self.client = session.create_client(
"dynamodb", region_name="us-west-2"
)
self.default_table_name = "test_table"
def tearDown(self):
super().tearDown()
BotocoreInstrumentor().uninstrument()
def _create_table(self, **kwargs):
create_args = {
"TableName": self.default_table_name,
"AttributeDefinitions": [
{"AttributeName": "id", "AttributeType": "S"},
{"AttributeName": "idl", "AttributeType": "S"},
{"AttributeName": "idg", "AttributeType": "S"},
],
"KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5,
},
"LocalSecondaryIndexes": [
{
"IndexName": "lsi",
"KeySchema": [{"AttributeName": "idl", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "KEYS_ONLY"},
}
],
"GlobalSecondaryIndexes": [
{
"IndexName": "gsi",
"KeySchema": [{"AttributeName": "idg", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "KEYS_ONLY"},
}
],
}
create_args.update(kwargs)
self.client.create_table(**create_args)
def _create_prepared_table(self, **kwargs):
self._create_table(**kwargs)
table = kwargs.get("TableName", self.default_table_name)
self.client.put_item(
TableName=table,
Item={"id": {"S": "1"}, "idl": {"S": "2"}, "idg": {"S": "3"}},
)
self.memory_exporter.clear()
@staticmethod
def _create_extension(operation: str) -> _DynamoDbExtension:
call_context = mock.MagicMock(operation=operation)
return _DynamoDbExtension(call_context)
def assert_span(self, operation: str) -> Span:
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(1, len(spans))
span = spans[0]
self.assertEqual("dynamodb", span.attributes[SpanAttributes.DB_SYSTEM])
self.assertEqual(
operation, span.attributes[SpanAttributes.DB_OPERATION]
)
self.assertEqual(
"dynamodb.us-west-2.amazonaws.com",
span.attributes[SpanAttributes.NET_PEER_NAME],
)
return span
def assert_table_names(self, span: Span, *table_names):
self.assertEqual(
tuple(table_names),
span.attributes[SpanAttributes.AWS_DYNAMODB_TABLE_NAMES],
)
def assert_consumed_capacity(self, span: Span, *table_names):
cap = span.attributes[SpanAttributes.AWS_DYNAMODB_CONSUMED_CAPACITY]
self.assertEqual(len(cap), len(table_names))
cap_tables = set()
for item in cap:
# should be like {"TableName": name, "CapacityUnits": number, ...}
deserialized = json.loads(item)
cap_tables.add(deserialized["TableName"])
for table_name in table_names:
self.assertIn(table_name, cap_tables)
def assert_item_col_metrics(self, span: Span):
actual = span.attributes[
SpanAttributes.AWS_DYNAMODB_ITEM_COLLECTION_METRICS
]
self.assertIsNotNone(actual)
json.loads(actual)
def assert_provisioned_read_cap(self, span: Span, expected: int):
actual = span.attributes[
SpanAttributes.AWS_DYNAMODB_PROVISIONED_READ_CAPACITY
]
self.assertEqual(expected, actual)
def assert_provisioned_write_cap(self, span: Span, expected: int):
actual = span.attributes[
SpanAttributes.AWS_DYNAMODB_PROVISIONED_WRITE_CAPACITY
]
self.assertEqual(expected, actual)
def assert_consistent_read(self, span: Span, expected: bool):
actual = span.attributes[SpanAttributes.AWS_DYNAMODB_CONSISTENT_READ]
self.assertEqual(expected, actual)
def assert_projection(self, span: Span, expected: str):
actual = span.attributes[SpanAttributes.AWS_DYNAMODB_PROJECTION]
self.assertEqual(expected, actual)
def assert_attributes_to_get(self, span: Span, *attrs):
self.assertEqual(
tuple(attrs),
span.attributes[SpanAttributes.AWS_DYNAMODB_ATTRIBUTES_TO_GET],
)
def assert_index_name(self, span: Span, expected: str):
self.assertEqual(
expected, span.attributes[SpanAttributes.AWS_DYNAMODB_INDEX_NAME]
)
def assert_limit(self, span: Span, expected: int):
self.assertEqual(
expected, span.attributes[SpanAttributes.AWS_DYNAMODB_LIMIT]
)
def assert_select(self, span: Span, expected: str):
self.assertEqual(
expected, span.attributes[SpanAttributes.AWS_DYNAMODB_SELECT]
)
def assert_extension_item_col_metrics(self, operation: str):
span = self.tracer_provider.get_tracer("test").start_span("test")
extension = self._create_extension(operation)
extension.on_success(
span, {"ItemCollectionMetrics": {"ItemCollectionKey": {"id": "1"}}}
)
self.assert_item_col_metrics(span)
@mock_dynamodb2
def test_batch_get_item(self):
table_name1 = "test_table1"
table_name2 = "test_table2"
self._create_prepared_table(TableName=table_name1)
self._create_prepared_table(TableName=table_name2)
self.client.batch_get_item(
RequestItems={
table_name1: {"Keys": [{"id": {"S": "test_key"}}]},
table_name2: {"Keys": [{"id": {"S": "test_key2"}}]},
},
ReturnConsumedCapacity="TOTAL",
)
span = self.assert_span("BatchGetItem")
self.assert_table_names(span, table_name1, table_name2)
self.assert_consumed_capacity(span, table_name1, table_name2)
@mock_dynamodb2
def test_batch_write_item(self):
table_name1 = "test_table1"
table_name2 = "test_table2"
self._create_prepared_table(TableName=table_name1)
self._create_prepared_table(TableName=table_name2)
self.client.batch_write_item(
RequestItems={
table_name1: [{"PutRequest": {"Item": {"id": {"S": "123"}}}}],
table_name2: [{"PutRequest": {"Item": {"id": {"S": "456"}}}}],
},
ReturnConsumedCapacity="TOTAL",
ReturnItemCollectionMetrics="SIZE",
)
span = self.assert_span("BatchWriteItem")
self.assert_table_names(span, table_name1, table_name2)
self.assert_consumed_capacity(span, table_name1, table_name2)
self.assert_item_col_metrics(span)
@mock_dynamodb2
def test_create_table(self):
local_sec_idx = {
"IndexName": "local_sec_idx",
"KeySchema": [{"AttributeName": "value", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "KEYS_ONLY"},
}
global_sec_idx = {
"IndexName": "global_sec_idx",
"KeySchema": [{"AttributeName": "value", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "KEYS_ONLY"},
}
self.client.create_table(
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
{"AttributeName": "value", "AttributeType": "S"},
],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
LocalSecondaryIndexes=[local_sec_idx],
GlobalSecondaryIndexes=[global_sec_idx],
ProvisionedThroughput={
"ReadCapacityUnits": 42,
"WriteCapacityUnits": 17,
},
TableName=self.default_table_name,
)
span = self.assert_span("CreateTable")
self.assert_table_names(span, self.default_table_name)
self.assertEqual(
(json.dumps(global_sec_idx),),
span.attributes[
SpanAttributes.AWS_DYNAMODB_GLOBAL_SECONDARY_INDEXES
],
)
self.assertEqual(
(json.dumps(local_sec_idx),),
span.attributes[
SpanAttributes.AWS_DYNAMODB_LOCAL_SECONDARY_INDEXES
],
)
self.assert_provisioned_read_cap(span, 42)
@mock_dynamodb2
def test_delete_item(self):
self._create_prepared_table()
self.client.delete_item(
TableName=self.default_table_name,
Key={"id": {"S": "1"}},
ReturnConsumedCapacity="TOTAL",
ReturnItemCollectionMetrics="SIZE",
)
span = self.assert_span("DeleteItem")
self.assert_table_names(span, self.default_table_name)
# moto does not seem to return these:
# self.assert_consumed_capacity(span, self.default_table_name)
# self.assert_item_coll_metrics(span)
def test_delete_item_consumed_capacity(self):
span = self.tracer_provider.get_tracer("test").start_span("test")
extension = self._create_extension("DeleteItem")
extension.on_success(
span, {"ConsumedCapacity": {"TableName": "table"}}
)
self.assert_consumed_capacity(span, "table")
def test_delete_item_item_collection_metrics(self):
self.assert_extension_item_col_metrics("DeleteItem")
@mock_dynamodb2
def test_delete_table(self):
self._create_prepared_table()
self.client.delete_table(TableName=self.default_table_name)
span = self.assert_span("DeleteTable")
self.assert_table_names(span, self.default_table_name)
@mock_dynamodb2
def test_describe_table(self):
self._create_prepared_table()
self.client.describe_table(TableName=self.default_table_name)
span = self.assert_span("DescribeTable")
self.assert_table_names(span, self.default_table_name)
@mock_dynamodb2
def test_get_item(self):
self._create_prepared_table()
self.client.get_item(
TableName=self.default_table_name,
Key={"id": {"S": "1"}},
ConsistentRead=True,
AttributesToGet=["id"],
ProjectionExpression="1,2",
ReturnConsumedCapacity="TOTAL",
)
span = self.assert_span("GetItem")
self.assert_table_names(span, self.default_table_name)
self.assert_consistent_read(span, True)
self.assert_projection(span, "1,2")
self.assert_consumed_capacity(span, self.default_table_name)
@mock_dynamodb2
def test_list_tables(self):
self._create_table(TableName="my_table")
self._create_prepared_table()
self.client.list_tables(ExclusiveStartTableName="my_table", Limit=5)
span = self.assert_span("ListTables")
self.assertEqual(
"my_table",
span.attributes[SpanAttributes.AWS_DYNAMODB_EXCLUSIVE_START_TABLE],
)
self.assertEqual(
1, span.attributes[SpanAttributes.AWS_DYNAMODB_TABLE_COUNT]
)
self.assertEqual(5, span.attributes[SpanAttributes.AWS_DYNAMODB_LIMIT])
@mock_dynamodb2
def test_put_item(self):
table = "test_table"
self._create_prepared_table(TableName=table)
self.client.put_item(
TableName=table,
Item={"id": {"S": "1"}, "idl": {"S": "2"}, "idg": {"S": "3"}},
ReturnConsumedCapacity="TOTAL",
ReturnItemCollectionMetrics="SIZE",
)
span = self.assert_span("PutItem")
self.assert_table_names(span, table)
self.assert_consumed_capacity(span, table)
# moto does not seem to return these:
# self.assert_item_coll_metrics(span)
def test_put_item_item_collection_metrics(self):
self.assert_extension_item_col_metrics("PutItem")
@mock_dynamodb2
def test_query(self):
self._create_prepared_table()
self.client.query(
TableName=self.default_table_name,
IndexName="lsi",
Select="ALL_ATTRIBUTES",
AttributesToGet=["id"],
Limit=42,
ConsistentRead=True,
KeyConditions={
"id": {
"AttributeValueList": [{"S": "123"}],
"ComparisonOperator": "EQ",
}
},
ScanIndexForward=True,
ProjectionExpression="1,2",
ReturnConsumedCapacity="TOTAL",
)
span = self.assert_span("Query")
self.assert_table_names(span, self.default_table_name)
self.assertTrue(
span.attributes[SpanAttributes.AWS_DYNAMODB_SCAN_FORWARD]
)
self.assert_attributes_to_get(span, "id")
self.assert_consistent_read(span, True)
self.assert_index_name(span, "lsi")
self.assert_limit(span, 42)
self.assert_projection(span, "1,2")
self.assert_select(span, "ALL_ATTRIBUTES")
self.assert_consumed_capacity(span, self.default_table_name)
@mock_dynamodb2
def test_scan(self):
self._create_prepared_table()
self.client.scan(
TableName=self.default_table_name,
IndexName="lsi",
AttributesToGet=["id", "idl"],
Limit=42,
Select="ALL_ATTRIBUTES",
TotalSegments=17,
Segment=21,
ProjectionExpression="1,2",
ConsistentRead=True,
ReturnConsumedCapacity="TOTAL",
)
span = self.assert_span("Scan")
self.assert_table_names(span, self.default_table_name)
self.assertEqual(
21, span.attributes[SpanAttributes.AWS_DYNAMODB_SEGMENT]
)
self.assertEqual(
17, span.attributes[SpanAttributes.AWS_DYNAMODB_TOTAL_SEGMENTS]
)
self.assertEqual(1, span.attributes[SpanAttributes.AWS_DYNAMODB_COUNT])
self.assertEqual(
1, span.attributes[SpanAttributes.AWS_DYNAMODB_SCANNED_COUNT]
)
self.assert_attributes_to_get(span, "id", "idl")
self.assert_consistent_read(span, True)
self.assert_index_name(span, "lsi")
self.assert_limit(span, 42)
self.assert_projection(span, "1,2")
self.assert_select(span, "ALL_ATTRIBUTES")
self.assert_consumed_capacity(span, self.default_table_name)
@mock_dynamodb2
def test_update_item(self):
self._create_prepared_table()
self.client.update_item(
TableName=self.default_table_name,
Key={"id": {"S": "123"}},
AttributeUpdates={"id": {"Value": {"S": "456"}, "Action": "PUT"}},
ReturnConsumedCapacity="TOTAL",
ReturnItemCollectionMetrics="SIZE",
)
span = self.assert_span("UpdateItem")
self.assert_table_names(span, self.default_table_name)
self.assert_consumed_capacity(span, self.default_table_name)
# moto does not seem to return these:
# self.assert_item_coll_metrics(span)
def test_update_item_item_collection_metrics(self):
self.assert_extension_item_col_metrics("UpdateItem")
@mock_dynamodb2
def test_update_table(self):
self._create_prepared_table()
global_sec_idx_updates = {
"Update": {
"IndexName": "gsi",
"ProvisionedThroughput": {
"ReadCapacityUnits": 777,
"WriteCapacityUnits": 666,
},
}
}
attr_definition = {"AttributeName": "id", "AttributeType": "N"}
self.client.update_table(
TableName=self.default_table_name,
AttributeDefinitions=[attr_definition],
ProvisionedThroughput={
"ReadCapacityUnits": 23,
"WriteCapacityUnits": 19,
},
GlobalSecondaryIndexUpdates=[global_sec_idx_updates],
)
span = self.assert_span("UpdateTable")
self.assert_table_names(span, self.default_table_name)
self.assert_provisioned_read_cap(span, 23)
self.assert_provisioned_write_cap(span, 19)
self.assertEqual(
(json.dumps(attr_definition),),
span.attributes[SpanAttributes.AWS_DYNAMODB_ATTRIBUTE_DEFINITIONS],
)
self.assertEqual(
(json.dumps(global_sec_idx_updates),),
span.attributes[
SpanAttributes.AWS_DYNAMODB_GLOBAL_SECONDARY_INDEX_UPDATES
],
)

View File

@ -20,7 +20,6 @@ from unittest.mock import Mock, patch
import botocore.session
from botocore.exceptions import ParamValidationError
from moto import ( # pylint: disable=import-error
mock_dynamodb2,
mock_ec2,
mock_iam,
mock_kinesis,
@ -418,48 +417,6 @@ class TestBotocoreInstrumentor(TestBase):
detach(token)
self.assertEqual(0, len(self.get_finished_spans()))
@mock_dynamodb2
def test_dynamodb_client(self):
ddb = self._make_client("dynamodb")
test_table_name = "test_table_name"
ddb.create_table(
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
ProvisionedThroughput={
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5,
},
TableName=test_table_name,
)
self.assert_span(
"DynamoDB",
"CreateTable",
request_id=_REQUEST_ID_REGEX_MATCH,
attributes={"aws.table_name": test_table_name},
)
self.memory_exporter.clear()
ddb.put_item(TableName=test_table_name, Item={"id": {"S": "test_key"}})
self.assert_span(
"DynamoDB",
"PutItem",
request_id=_REQUEST_ID_REGEX_MATCH,
attributes={"aws.table_name": test_table_name},
)
self.memory_exporter.clear()
ddb.get_item(TableName=test_table_name, Key={"id": {"S": "test_key"}})
self.assert_span(
"DynamoDB",
"GetItem",
request_id=_REQUEST_ID_REGEX_MATCH,
attributes={"aws.table_name": test_table_name},
)
@mock_s3
def test_request_hook(self):
request_hook_service_attribute_name = "request_hook.service_name"