add elasticsearch db.statement sanitization (#1598)

This commit is contained in:
Nimrod Shlagman
2023-02-10 02:50:42 +02:00
committed by GitHub
parent 7af87e1bec
commit df32e8ca7f
5 changed files with 228 additions and 23 deletions

View File

@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572))
- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization.
([#1598](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1598))
- `opentelemetry-instrumentation-celery` Record exceptions as events on the span.
([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573))
- Add metric instrumentation for urllib

View File

@ -44,6 +44,7 @@ environment variable or by passing the prefix as an argument to the instrumentor
The instrument() method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
sanitize_query (bool) - an optional query sanitization flag
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
this function signature is:
def request_hook(span: Span, method: str, url: str, kwargs)
@ -96,6 +97,8 @@ from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer
from .utils import sanitize_body
logger = getLogger(__name__)
@ -135,11 +138,16 @@ class ElasticsearchInstrumentor(BaseInstrumentor):
tracer = get_tracer(__name__, __version__, tracer_provider)
request_hook = kwargs.get("request_hook")
response_hook = kwargs.get("response_hook")
sanitize_query = kwargs.get("sanitize_query", False)
_wrap(
elasticsearch,
"Transport.perform_request",
_wrap_perform_request(
tracer, self._span_name_prefix, request_hook, response_hook
tracer,
sanitize_query,
self._span_name_prefix,
request_hook,
response_hook,
),
)
@ -154,7 +162,11 @@ _regex_search_url = re.compile(r"/([^/]+)/_search[/]?")
def _wrap_perform_request(
tracer, span_name_prefix, request_hook=None, response_hook=None
tracer,
sanitize_query,
span_name_prefix,
request_hook=None,
response_hook=None,
):
# pylint: disable=R0912,R0914
def wrapper(wrapped, _, args, kwargs):
@ -213,7 +225,10 @@ def _wrap_perform_request(
if method:
attributes["elasticsearch.method"] = method
if body:
attributes[SpanAttributes.DB_STATEMENT] = str(body)
statement = str(body)
if sanitize_query:
statement = sanitize_body(body)
attributes[SpanAttributes.DB_STATEMENT] = statement
if params:
attributes["elasticsearch.params"] = str(params)
if doc_id:

View File

@ -0,0 +1,59 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
sanitized_keys = (
"message",
"should",
"filter",
"query",
"queries",
"intervals",
"match",
)
sanitized_value = "?"
# pylint: disable=C0103
def _flatten_dict(d, parent_key=""):
items = []
for k, v in d.items():
new_key = parent_key + "." + k if parent_key else k
if isinstance(v, dict):
items.extend(_flatten_dict(v, new_key).items())
else:
items.append((new_key, v))
return dict(items)
def _unflatten_dict(d):
res = {}
for k, v in d.items():
keys = k.split(".")
d = res
for key in keys[:-1]:
if key not in d:
d[key] = {}
d = d[key]
d[keys[-1]] = v
return res
def sanitize_body(body) -> str:
flatten_body = _flatten_dict(body)
for key in flatten_body:
if key.endswith(sanitized_keys):
flatten_body[key] = sanitized_value
return str(_unflatten_dict(flatten_body))

View File

@ -0,0 +1,65 @@
interval_query = {
"query": {
"intervals": {
"my_text": {
"all_of": {
"ordered": True,
"intervals": [
{
"match": {
"query": "my favorite food",
"max_gaps": 0,
"ordered": True,
}
},
{
"any_of": {
"intervals": [
{"match": {"query": "hot water"}},
{"match": {"query": "cold porridge"}},
]
}
},
],
}
}
}
}
}
match_query = {"query": {"match": {"message": {"query": "this is a test"}}}}
filter_query = {
"query": {
"bool": {
"must": [
{"match": {"title": "Search"}},
{"match": {"content": "Elasticsearch"}},
],
"filter": [
{"term": {"status": "published"}},
{"range": {"publish_date": {"gte": "2015-01-01"}}},
],
}
}
}
interval_query_sanitized = {
"query": {
"intervals": {
"my_text": {"all_of": {"ordered": True, "intervals": "?"}}
}
}
}
match_query_sanitized = {"query": {"match": {"message": {"query": "?"}}}}
filter_query_sanitized = {
"query": {
"bool": {
"must": [
{"match": {"title": "Search"}},
{"match": {"content": "Elasticsearch"}},
],
"filter": "?",
}
}
}

View File

@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import threading
@ -27,10 +28,13 @@ from opentelemetry import trace
from opentelemetry.instrumentation.elasticsearch import (
ElasticsearchInstrumentor,
)
from opentelemetry.instrumentation.elasticsearch.utils import sanitize_body
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import StatusCode
from . import sanitization_queries # pylint: disable=no-name-in-module
major_version = elasticsearch.VERSION[0]
if major_version == 7:
@ -42,7 +46,6 @@ elif major_version == 5:
else:
from . import helpers_es2 as helpers # pylint: disable=no-name-in-module
Article = helpers.Article
@ -50,6 +53,22 @@ Article = helpers.Article
"elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request"
)
class TestElasticsearchIntegration(TestBase):
search_attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index/_search",
"elasticsearch.method": helpers.dsl_search_method,
"elasticsearch.target": "test-index",
SpanAttributes.DB_STATEMENT: str(
{"query": {"bool": {"filter": [{"term": {"author": "testing"}}]}}}
),
}
create_attributes = {
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index",
"elasticsearch.method": "HEAD",
}
def setUp(self):
super().setUp()
self.tracer = self.tracer_provider.get_tracer(__name__)
@ -241,21 +260,36 @@ class TestElasticsearchIntegration(TestBase):
self.assertIsNotNone(span.end_time)
self.assertEqual(
span.attributes,
self.search_attributes,
)
def test_dsl_search_sanitized(self, request_mock):
# Reset instrumentation to use sanitized query (default)
ElasticsearchInstrumentor().uninstrument()
ElasticsearchInstrumentor().instrument(sanitize_query=True)
# update expected attributes to match sanitized query
sanitized_search_attributes = self.search_attributes.copy()
sanitized_search_attributes.update(
{
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index/_search",
"elasticsearch.method": helpers.dsl_search_method,
"elasticsearch.target": "test-index",
SpanAttributes.DB_STATEMENT: str(
{
"query": {
"bool": {
"filter": [{"term": {"author": "testing"}}]
}
}
}
),
},
SpanAttributes.DB_STATEMENT: "{'query': {'bool': {'filter': '?'}}}"
}
)
request_mock.return_value = (1, {}, '{"hits": {"hits": []}}')
client = Elasticsearch()
search = Search(using=client, index="test-index").filter(
"term", author="testing"
)
search.execute()
spans = self.get_finished_spans()
span = spans[0]
self.assertEqual(1, len(spans))
self.assertEqual(span.name, "Elasticsearch/<target>/_search")
self.assertIsNotNone(span.end_time)
self.assertEqual(
span.attributes,
sanitized_search_attributes,
)
def test_dsl_create(self, request_mock):
@ -264,17 +298,14 @@ class TestElasticsearchIntegration(TestBase):
Article.init(using=client)
spans = self.get_finished_spans()
assert spans
self.assertEqual(2, len(spans))
span1 = spans.by_attr(key="elasticsearch.method", value="HEAD")
span2 = spans.by_attr(key="elasticsearch.method", value="PUT")
self.assertEqual(
span1.attributes,
{
SpanAttributes.DB_SYSTEM: "elasticsearch",
"elasticsearch.url": "/test-index",
"elasticsearch.method": "HEAD",
},
self.create_attributes,
)
attributes = {
@ -288,6 +319,25 @@ class TestElasticsearchIntegration(TestBase):
helpers.dsl_create_statement,
)
def test_dsl_create_sanitized(self, request_mock):
# Reset instrumentation to explicitly use sanitized query
ElasticsearchInstrumentor().uninstrument()
ElasticsearchInstrumentor().instrument(sanitize_query=True)
request_mock.return_value = (1, {}, {})
client = Elasticsearch()
Article.init(using=client)
spans = self.get_finished_spans()
assert spans
self.assertEqual(2, len(spans))
span = spans.by_attr(key="elasticsearch.method", value="HEAD")
self.assertEqual(
span.attributes,
self.create_attributes,
)
def test_dsl_index(self, request_mock):
request_mock.return_value = helpers.dsl_index_result
@ -412,3 +462,17 @@ class TestElasticsearchIntegration(TestBase):
json.dumps(response_payload),
spans[0].attributes[response_attribute_name],
)
def test_body_sanitization(self, _):
self.assertEqual(
sanitize_body(sanitization_queries.interval_query),
str(sanitization_queries.interval_query_sanitized),
)
self.assertEqual(
sanitize_body(sanitization_queries.match_query),
str(sanitization_queries.match_query_sanitized),
)
self.assertEqual(
sanitize_body(sanitization_queries.filter_query),
str(sanitization_queries.filter_query_sanitized),
)