mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-07-29 13:12:39 +08:00
adding response_hook to elastic instrumentation (#670)
This commit is contained in:

committed by
GitHub

parent
b47328e134
commit
291e50813a
@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- `opentelemetry-sdk-extension-aws` Release AWS Python SDK Extension as 1.0.0
|
||||
([#667](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/667))
|
||||
|
||||
### Added
|
||||
- `opentelemetry-instrumentation-elasticsearch` Added `response_hook` and `request_hook` callbacks
|
||||
([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670))
|
||||
|
||||
### Changed
|
||||
- `opentelemetry-instrumentation-botocore` Unpatch botocore Endpoint.prepare_request on uninstrument
|
||||
([#664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/664))
|
||||
|
@ -44,6 +44,41 @@ environment variable or by passing the prefix as an argument to the instrumentor
|
||||
.. code-block:: python
|
||||
|
||||
ElasticsearchInstrumentor("my-custom-prefix").instrument()
|
||||
|
||||
|
||||
The `instrument` method accepts the following keyword args:
|
||||
|
||||
tracer_provider (TracerProvider) - an optional tracer provider
|
||||
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
|
||||
this function signature is:
|
||||
def request_hook(span: Span, method: str, url: str, kwargs)
|
||||
response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request
|
||||
this function signature is:
|
||||
def response_hook(span: Span, response: dict)
|
||||
|
||||
for example:
|
||||
|
||||
.. code: python
|
||||
|
||||
from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
|
||||
import elasticsearch
|
||||
|
||||
def request_hook(span, method, url, kwargs):
|
||||
if span and span.is_recording():
|
||||
span.set_attribute("custom_user_attribute_from_request_hook", "some-value")
|
||||
|
||||
def response_hook(span, response):
|
||||
if span and span.is_recording():
|
||||
span.set_attribute("custom_user_attribute_from_response_hook", "some-value")
|
||||
|
||||
# instrument elasticsearch with request and response hooks
|
||||
ElasticsearchInstrumentor().instrument(request_hook=request_hook, response_hook=response_hook)
|
||||
|
||||
# Using elasticsearch as normal now will automatically generate spans,
|
||||
# including user custom attributes added from the hooks
|
||||
es = elasticsearch.Elasticsearch()
|
||||
es.index(index='my-index', doc_type='my-type', id=1, body={'my': 'data', 'timestamp': datetime.now()})
|
||||
es.get(index='my-index', doc_type='my-type', id=1)
|
||||
"""
|
||||
|
||||
from logging import getLogger
|
||||
@ -97,17 +132,23 @@ class ElasticsearchInstrumentor(BaseInstrumentor):
|
||||
"""
|
||||
tracer_provider = kwargs.get("tracer_provider")
|
||||
tracer = get_tracer(__name__, __version__, tracer_provider)
|
||||
request_hook = kwargs.get("request_hook")
|
||||
response_hook = kwargs.get("response_hook")
|
||||
_wrap(
|
||||
elasticsearch,
|
||||
"Transport.perform_request",
|
||||
_wrap_perform_request(tracer, self._span_name_prefix),
|
||||
_wrap_perform_request(
|
||||
tracer, self._span_name_prefix, request_hook, response_hook
|
||||
),
|
||||
)
|
||||
|
||||
def _uninstrument(self, **kwargs):
|
||||
unwrap(elasticsearch.Transport, "perform_request")
|
||||
|
||||
|
||||
def _wrap_perform_request(tracer, span_name_prefix):
|
||||
def _wrap_perform_request(
|
||||
tracer, span_name_prefix, request_hook=None, response_hook=None
|
||||
):
|
||||
# pylint: disable=R0912
|
||||
def wrapper(wrapped, _, args, kwargs):
|
||||
method = url = None
|
||||
@ -127,6 +168,10 @@ def _wrap_perform_request(tracer, span_name_prefix):
|
||||
with tracer.start_as_current_span(
|
||||
op_name, kind=SpanKind.CLIENT,
|
||||
) as span:
|
||||
|
||||
if callable(request_hook):
|
||||
request_hook(span, method, url, kwargs)
|
||||
|
||||
if span.is_recording():
|
||||
attributes = {
|
||||
SpanAttributes.DB_SYSTEM: "elasticsearch",
|
||||
@ -150,6 +195,9 @@ def _wrap_perform_request(tracer, span_name_prefix):
|
||||
"elasticsearch.{0}".format(member),
|
||||
str(rv[member]),
|
||||
)
|
||||
|
||||
if callable(response_hook):
|
||||
response_hook(span, rv)
|
||||
return rv
|
||||
|
||||
return wrapper
|
||||
|
@ -11,7 +11,7 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
from ast import literal_eval
|
||||
@ -316,3 +316,101 @@ class TestElasticsearchIntegration(TestBase):
|
||||
"title": "About searching",
|
||||
},
|
||||
)
|
||||
|
||||
def test_request_hook(self, request_mock):
|
||||
request_hook_method_attribute = "request_hook.method"
|
||||
request_hook_url_attribute = "request_hook.url"
|
||||
request_hook_kwargs_attribute = "request_hook.kwargs"
|
||||
|
||||
def request_hook(span, method, url, kwargs):
|
||||
|
||||
attributes = {
|
||||
request_hook_method_attribute: method,
|
||||
request_hook_url_attribute: url,
|
||||
request_hook_kwargs_attribute: json.dumps(kwargs),
|
||||
}
|
||||
|
||||
if span and span.is_recording():
|
||||
span.set_attributes(attributes)
|
||||
|
||||
ElasticsearchInstrumentor().uninstrument()
|
||||
ElasticsearchInstrumentor().instrument(request_hook=request_hook)
|
||||
|
||||
request_mock.return_value = (
|
||||
1,
|
||||
{},
|
||||
'{"found": false, "timed_out": true, "took": 7}',
|
||||
)
|
||||
es = Elasticsearch()
|
||||
index = "test-index"
|
||||
doc_type = "tweet"
|
||||
doc_id = 1
|
||||
kwargs = {"params": {"test": True}}
|
||||
es.get(index=index, doc_type=doc_type, id=doc_id, **kwargs)
|
||||
|
||||
spans = self.get_finished_spans()
|
||||
|
||||
self.assertEqual(1, len(spans))
|
||||
self.assertEqual(
|
||||
"GET", spans[0].attributes[request_hook_method_attribute]
|
||||
)
|
||||
self.assertEqual(
|
||||
f"/{index}/{doc_type}/{doc_id}",
|
||||
spans[0].attributes[request_hook_url_attribute],
|
||||
)
|
||||
self.assertEqual(
|
||||
json.dumps(kwargs),
|
||||
spans[0].attributes[request_hook_kwargs_attribute],
|
||||
)
|
||||
|
||||
def test_response_hook(self, request_mock):
|
||||
response_attribute_name = "db.query_result"
|
||||
|
||||
def response_hook(span, response):
|
||||
if span and span.is_recording():
|
||||
span.set_attribute(
|
||||
response_attribute_name, json.dumps(response)
|
||||
)
|
||||
|
||||
ElasticsearchInstrumentor().uninstrument()
|
||||
ElasticsearchInstrumentor().instrument(response_hook=response_hook)
|
||||
|
||||
response_payload = {
|
||||
"took": 9,
|
||||
"timed_out": False,
|
||||
"_shards": {
|
||||
"total": 1,
|
||||
"successful": 1,
|
||||
"skipped": 0,
|
||||
"failed": 0,
|
||||
},
|
||||
"hits": {
|
||||
"total": {"value": 1, "relation": "eq"},
|
||||
"max_score": 0.18232156,
|
||||
"hits": [
|
||||
{
|
||||
"_index": "test-index",
|
||||
"_type": "tweet",
|
||||
"_id": "1",
|
||||
"_score": 0.18232156,
|
||||
"_source": {"name": "tester"},
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
request_mock.return_value = (
|
||||
1,
|
||||
{},
|
||||
json.dumps(response_payload),
|
||||
)
|
||||
es = Elasticsearch()
|
||||
es.get(index="test-index", doc_type="tweet", id=1)
|
||||
|
||||
spans = self.get_finished_spans()
|
||||
|
||||
self.assertEqual(1, len(spans))
|
||||
self.assertEqual(
|
||||
json.dumps(response_payload),
|
||||
spans[0].attributes[response_attribute_name],
|
||||
)
|
||||
|
Reference in New Issue
Block a user