mirror of
https://github.com/open-telemetry/opentelemetry-python-contrib.git
synced 2025-08-02 19:47:17 +08:00
feat: add opentelemetry-instrumentation-aiokafka (#2082)
This commit is contained in:
18
.github/workflows/lint_0.yml
vendored
18
.github/workflows/lint_0.yml
vendored
@ -970,6 +970,24 @@ jobs:
|
||||
- name: Run tests
|
||||
run: tox -e lint-instrumentation-aio-pika
|
||||
|
||||
lint-instrumentation-aiokafka:
|
||||
name: instrumentation-aiokafka
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo @ SHA - ${{ github.sha }}
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python 3.12
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
|
||||
- name: Install tox
|
||||
run: pip install tox
|
||||
|
||||
- name: Run tests
|
||||
run: tox -e lint-instrumentation-aiokafka
|
||||
|
||||
lint-instrumentation-kafka-python:
|
||||
name: instrumentation-kafka-python
|
||||
runs-on: ubuntu-latest
|
||||
|
108
.github/workflows/test_1.yml
vendored
108
.github/workflows/test_1.yml
vendored
@ -3094,6 +3094,114 @@ jobs:
|
||||
- name: Run tests
|
||||
run: tox -e pypy3-test-instrumentation-aio-pika-3 -- -ra
|
||||
|
||||
py38-test-instrumentation-aiokafka_ubuntu-latest:
|
||||
name: instrumentation-aiokafka 3.8 Ubuntu
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo @ SHA - ${{ github.sha }}
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python 3.8
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.8"
|
||||
|
||||
- name: Install tox
|
||||
run: pip install tox
|
||||
|
||||
- name: Run tests
|
||||
run: tox -e py38-test-instrumentation-aiokafka -- -ra
|
||||
|
||||
py39-test-instrumentation-aiokafka_ubuntu-latest:
|
||||
name: instrumentation-aiokafka 3.9 Ubuntu
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo @ SHA - ${{ github.sha }}
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python 3.9
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.9"
|
||||
|
||||
- name: Install tox
|
||||
run: pip install tox
|
||||
|
||||
- name: Run tests
|
||||
run: tox -e py39-test-instrumentation-aiokafka -- -ra
|
||||
|
||||
py310-test-instrumentation-aiokafka_ubuntu-latest:
|
||||
name: instrumentation-aiokafka 3.10 Ubuntu
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo @ SHA - ${{ github.sha }}
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python 3.10
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
|
||||
- name: Install tox
|
||||
run: pip install tox
|
||||
|
||||
- name: Run tests
|
||||
run: tox -e py310-test-instrumentation-aiokafka -- -ra
|
||||
|
||||
py311-test-instrumentation-aiokafka_ubuntu-latest:
|
||||
name: instrumentation-aiokafka 3.11 Ubuntu
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo @ SHA - ${{ github.sha }}
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python 3.11
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.11"
|
||||
|
||||
- name: Install tox
|
||||
run: pip install tox
|
||||
|
||||
- name: Run tests
|
||||
run: tox -e py311-test-instrumentation-aiokafka -- -ra
|
||||
|
||||
py312-test-instrumentation-aiokafka_ubuntu-latest:
|
||||
name: instrumentation-aiokafka 3.12 Ubuntu
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo @ SHA - ${{ github.sha }}
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python 3.12
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
|
||||
- name: Install tox
|
||||
run: pip install tox
|
||||
|
||||
- name: Run tests
|
||||
run: tox -e py312-test-instrumentation-aiokafka -- -ra
|
||||
|
||||
pypy3-test-instrumentation-aiokafka_ubuntu-latest:
|
||||
name: instrumentation-aiokafka pypy-3.8 Ubuntu
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repo @ SHA - ${{ github.sha }}
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python pypy-3.8
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "pypy-3.8"
|
||||
|
||||
- name: Install tox
|
||||
run: pip install tox
|
||||
|
||||
- name: Run tests
|
||||
run: tox -e pypy3-test-instrumentation-aiokafka -- -ra
|
||||
|
||||
py38-test-instrumentation-kafka-python_ubuntu-latest:
|
||||
name: instrumentation-kafka-python 3.8 Ubuntu
|
||||
runs-on: ubuntu-latest
|
||||
|
@ -7,8 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## Unreleased
|
||||
|
||||
### Added
|
||||
|
||||
- `opentelemetry-instrumentation-fastapi` Add autoinstrumentation mechanism tests.
|
||||
([#2860](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2860))
|
||||
- `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka
|
||||
([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082))
|
||||
|
||||
## Version 1.27.0/0.48b0 ()
|
||||
|
||||
|
@ -20,6 +20,7 @@ django>=2.2
|
||||
# Required by instrumentation and exporter packages
|
||||
aio_pika~=7.2.0
|
||||
aiohttp~=3.0
|
||||
aiokafka~=0.11.0
|
||||
aiopg>=0.13.0,<1.3.0
|
||||
asyncpg>=0.12.0
|
||||
boto~=2.0
|
||||
|
10
docs/instrumentation/aiokafka/aiokafka.rst
Normal file
10
docs/instrumentation/aiokafka/aiokafka.rst
Normal file
@ -0,0 +1,10 @@
|
||||
.. include:: ../../../instrumentation/opentelemetry-instrumentation-aiokafka/README.rst
|
||||
:end-before: References
|
||||
|
||||
API
|
||||
---
|
||||
|
||||
.. automodule:: opentelemetry.instrumentation.aiokafka
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
@ -4,6 +4,7 @@
|
||||
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0, < 10.0.0 | No | experimental
|
||||
| [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No | migration
|
||||
| [opentelemetry-instrumentation-aiohttp-server](./opentelemetry-instrumentation-aiohttp-server) | aiohttp ~= 3.0 | No | experimental
|
||||
| [opentelemetry-instrumentation-aiokafka](./opentelemetry-instrumentation-aiokafka) | aiokafka >= 0.8, < 1.0 | No | experimental
|
||||
| [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No | experimental
|
||||
| [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | Yes | migration
|
||||
| [opentelemetry-instrumentation-asyncio](./opentelemetry-instrumentation-asyncio) | asyncio | No | experimental
|
||||
|
201
instrumentation/opentelemetry-instrumentation-aiokafka/LICENSE
Normal file
201
instrumentation/opentelemetry-instrumentation-aiokafka/LICENSE
Normal file
@ -0,0 +1,201 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
@ -0,0 +1,22 @@
|
||||
OpenTelemetry aiokafka instrumentation
|
||||
=======================================
|
||||
|
||||
|pypi|
|
||||
|
||||
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-aiokafka.svg
|
||||
:target: https://pypi.org/project/opentelemetry-instrumentation-aiokafka/
|
||||
|
||||
Installation
|
||||
------------
|
||||
|
||||
::
|
||||
|
||||
pip install opentelemetry-instrumentation-aiokafka
|
||||
|
||||
|
||||
References
|
||||
----------
|
||||
|
||||
* `OpenTelemetry aiokafka Instrumentation <https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/aiokafka/aiokafka.html>`_
|
||||
* `OpenTelemetry Project <https://opentelemetry.io/>`_
|
||||
* `OpenTelemetry Python Examples <https://github.com/open-telemetry/opentelemetry-python/tree/main/docs/examples>`_
|
@ -0,0 +1,49 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "opentelemetry-instrumentation-aiokafka"
|
||||
dynamic = ["version"]
|
||||
description = "OpenTelemetry aiokafka instrumentation"
|
||||
readme = "README.rst"
|
||||
license = "Apache-2.0"
|
||||
requires-python = ">=3.8"
|
||||
authors = [
|
||||
{ name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" },
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: Apache Software License",
|
||||
"Programming Language :: Python",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
]
|
||||
dependencies = [
|
||||
"opentelemetry-api ~= 1.27",
|
||||
"opentelemetry-instrumentation == 0.49b0.dev",
|
||||
"opentelemetry-semantic-conventions == 0.49b0.dev",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
instruments = ["aiokafka >= 0.8, < 1.0"]
|
||||
|
||||
[project.entry-points.opentelemetry_instrumentor]
|
||||
aiokafka = "opentelemetry.instrumentation.aiokafka:AIOKafkaInstrumentor"
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-aiokafka"
|
||||
|
||||
[tool.hatch.version]
|
||||
path = "src/opentelemetry/instrumentation/aiokafka/version.py"
|
||||
|
||||
[tool.hatch.build.targets.sdist]
|
||||
include = ["/src", "/tests"]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/opentelemetry"]
|
@ -0,0 +1,135 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Instrument aiokafka to report instrumentation-kafka produced and consumed messages
|
||||
|
||||
Usage
|
||||
-----
|
||||
|
||||
..code:: python
|
||||
|
||||
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
|
||||
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
|
||||
|
||||
# Instrument kafka
|
||||
AIOKafkaInstrumentor().instrument()
|
||||
|
||||
# report a span of type producer with the default settings
|
||||
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
|
||||
await producer.send('my-topic', b'raw_bytes')
|
||||
|
||||
# report a span of type consumer with the default settings
|
||||
consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
|
||||
async for message in consumer:
|
||||
# process message
|
||||
|
||||
The _instrument() method accepts the following keyword args:
|
||||
tracer_provider (TracerProvider) - an optional tracer provider
|
||||
async_produce_hook (Callable) - a function with extra user-defined logic to be performed before sending the message
|
||||
this function signature is:
|
||||
def async_produce_hook(span: Span, args, kwargs)
|
||||
async_consume_hook (Callable) - a function with extra user-defined logic to be performed after consuming a message
|
||||
this function signature is:
|
||||
def async_consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)
|
||||
for example:
|
||||
|
||||
.. code: python
|
||||
from opentelemetry.instrumentation.kafka import AIOKafkaInstrumentor
|
||||
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
|
||||
|
||||
async def async_produce_hook(span, args, kwargs):
|
||||
if span and span.is_recording():
|
||||
span.set_attribute("custom_user_attribute_from_async_response_hook", "some-value")
|
||||
async def async_consume_hook(span, record, args, kwargs):
|
||||
if span and span.is_recording():
|
||||
span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")
|
||||
|
||||
# instrument kafka with produce and consume hooks
|
||||
AIOKafkaInstrumentor().instrument(async_produce_hook=async_produce_hook, async_consume_hook=async_consume_hook)
|
||||
|
||||
# Using kafka as normal now will automatically generate spans,
|
||||
# including user custom attributes added from the hooks
|
||||
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
|
||||
await producer.send('my-topic', b'raw_bytes')
|
||||
|
||||
API
|
||||
___
|
||||
"""
|
||||
from asyncio import iscoroutinefunction
|
||||
from typing import Collection
|
||||
|
||||
import aiokafka
|
||||
from wrapt import wrap_function_wrapper
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.instrumentation.aiokafka.package import _instruments
|
||||
from opentelemetry.instrumentation.aiokafka.utils import (
|
||||
_wrap_anext,
|
||||
_wrap_send,
|
||||
)
|
||||
from opentelemetry.instrumentation.aiokafka.version import __version__
|
||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
||||
from opentelemetry.instrumentation.utils import unwrap
|
||||
from opentelemetry.semconv.schemas import Schemas
|
||||
|
||||
|
||||
class AIOKafkaInstrumentor(BaseInstrumentor):
|
||||
"""An instrumentor for kafka module
|
||||
See `BaseInstrumentor`
|
||||
"""
|
||||
|
||||
def instrumentation_dependencies(self) -> Collection[str]:
|
||||
return _instruments
|
||||
|
||||
def _instrument(self, **kwargs):
|
||||
"""Instruments the kafka module
|
||||
|
||||
Args:
|
||||
**kwargs: Optional arguments
|
||||
``tracer_provider``: a TracerProvider, defaults to global.
|
||||
``async_produce_hook``: a callable to be executed just before producing a message
|
||||
``async_consume_hook``: a callable to be executed just after consuming a message
|
||||
"""
|
||||
tracer_provider = kwargs.get("tracer_provider")
|
||||
|
||||
async_produce_hook = kwargs.get("async_produce_hook")
|
||||
if not iscoroutinefunction(async_produce_hook):
|
||||
async_produce_hook = None
|
||||
|
||||
async_consume_hook = kwargs.get("async_consume_hook")
|
||||
if not iscoroutinefunction(async_consume_hook):
|
||||
async_consume_hook = None
|
||||
|
||||
tracer = trace.get_tracer(
|
||||
__name__,
|
||||
__version__,
|
||||
tracer_provider=tracer_provider,
|
||||
schema_url=Schemas.V1_27_0.value,
|
||||
)
|
||||
|
||||
wrap_function_wrapper(
|
||||
aiokafka.AIOKafkaProducer,
|
||||
"send",
|
||||
_wrap_send(tracer, async_produce_hook),
|
||||
)
|
||||
wrap_function_wrapper(
|
||||
aiokafka.AIOKafkaConsumer,
|
||||
"__anext__",
|
||||
_wrap_anext(tracer, async_consume_hook),
|
||||
)
|
||||
|
||||
def _uninstrument(self, **kwargs):
|
||||
unwrap(aiokafka.AIOKafkaProducer, "send")
|
||||
unwrap(aiokafka.AIOKafkaConsumer, "__anext__")
|
@ -0,0 +1,16 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
_instruments = ("aiokafka >= 0.8, < 1.0",)
|
@ -0,0 +1,370 @@
|
||||
import json
|
||||
from logging import getLogger
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import aiokafka
|
||||
from aiokafka import ConsumerRecord
|
||||
|
||||
from opentelemetry import context, propagate, trace
|
||||
from opentelemetry.context import Context
|
||||
from opentelemetry.propagators import textmap
|
||||
from opentelemetry.semconv._incubating.attributes import messaging_attributes
|
||||
from opentelemetry.semconv.attributes import server_attributes
|
||||
from opentelemetry.trace import Tracer
|
||||
from opentelemetry.trace.span import Span
|
||||
|
||||
_LOG = getLogger(__name__)
|
||||
|
||||
|
||||
def _extract_bootstrap_servers(
|
||||
client: aiokafka.AIOKafkaClient,
|
||||
) -> Union[str, List[str]]:
|
||||
return client._bootstrap_servers
|
||||
|
||||
|
||||
def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str:
|
||||
return client._client_id
|
||||
|
||||
|
||||
def _extract_consumer_group(
|
||||
consumer: aiokafka.AIOKafkaConsumer,
|
||||
) -> Optional[str]:
|
||||
return consumer._group_id
|
||||
|
||||
|
||||
def _extract_argument(
|
||||
key: str,
|
||||
position: int,
|
||||
default_value: Any,
|
||||
args: Tuple[Any],
|
||||
kwargs: Dict[str, Any],
|
||||
) -> Any:
|
||||
if len(args) > position:
|
||||
return args[position]
|
||||
return kwargs.get(key, default_value)
|
||||
|
||||
|
||||
def _extract_send_topic(args: Tuple[Any], kwargs: Dict[str, Any]) -> str:
|
||||
"""extract topic from `send` method arguments in AIOKafkaProducer class"""
|
||||
return _extract_argument("topic", 0, "unknown", args, kwargs)
|
||||
|
||||
|
||||
def _extract_send_value(
|
||||
args: Tuple[Any], kwargs: Dict[str, Any]
|
||||
) -> Optional[Any]:
|
||||
"""extract value from `send` method arguments in AIOKafkaProducer class"""
|
||||
return _extract_argument("value", 1, None, args, kwargs)
|
||||
|
||||
|
||||
def _extract_send_key(
|
||||
args: Tuple[Any], kwargs: Dict[str, Any]
|
||||
) -> Optional[Any]:
|
||||
"""extract key from `send` method arguments in AIOKafkaProducer class"""
|
||||
return _extract_argument("key", 2, None, args, kwargs)
|
||||
|
||||
|
||||
def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]):
|
||||
"""extract headers from `send` method arguments in AIOKafkaProducer class"""
|
||||
return _extract_argument("headers", 5, None, args, kwargs)
|
||||
|
||||
|
||||
async def _extract_send_partition(
|
||||
instance: aiokafka.AIOKafkaProducer,
|
||||
args: Tuple[Any],
|
||||
kwargs: Dict[str, Any],
|
||||
) -> Optional[int]:
|
||||
"""extract partition `send` method arguments, using the `_partition` method in AIOKafkaProducer class"""
|
||||
try:
|
||||
topic = _extract_send_topic(args, kwargs)
|
||||
key = _extract_send_key(args, kwargs)
|
||||
value = _extract_send_value(args, kwargs)
|
||||
partition = _extract_argument("partition", 3, None, args, kwargs)
|
||||
key_bytes, value_bytes = instance._serialize(topic, key, value)
|
||||
valid_types = (bytes, bytearray, memoryview, type(None))
|
||||
if (
|
||||
type(key_bytes) not in valid_types
|
||||
or type(value_bytes) not in valid_types
|
||||
):
|
||||
return None
|
||||
|
||||
await instance.client._wait_on_metadata(topic)
|
||||
|
||||
return instance._partition(
|
||||
topic, partition, key, value, key_bytes, value_bytes
|
||||
)
|
||||
except Exception as exception: # pylint: disable=W0703
|
||||
_LOG.debug("Unable to extract partition: %s", exception)
|
||||
return None
|
||||
|
||||
|
||||
ProduceHookT = Optional[Callable[[Span, Tuple, Dict], Awaitable[None]]]
|
||||
ConsumeHookT = Optional[
|
||||
Callable[[Span, ConsumerRecord, Tuple, Dict], Awaitable[None]]
|
||||
]
|
||||
|
||||
HeadersT = List[Tuple[str, Optional[bytes]]]
|
||||
|
||||
|
||||
class AIOKafkaContextGetter(textmap.Getter[HeadersT]):
|
||||
def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]:
|
||||
if carrier is None:
|
||||
return None
|
||||
|
||||
for item_key, value in carrier:
|
||||
if item_key == key:
|
||||
if value is not None:
|
||||
return [value.decode()]
|
||||
return None
|
||||
|
||||
def keys(self, carrier: HeadersT) -> List[str]:
|
||||
if carrier is None:
|
||||
return []
|
||||
return [key for (key, value) in carrier]
|
||||
|
||||
|
||||
class AIOKafkaContextSetter(textmap.Setter[HeadersT]):
|
||||
def set(
|
||||
self, carrier: HeadersT, key: Optional[str], value: Optional[str]
|
||||
) -> None:
|
||||
if carrier is None or key is None:
|
||||
return
|
||||
|
||||
if value is not None:
|
||||
carrier.append((key, value.encode()))
|
||||
else:
|
||||
carrier.append((key, value))
|
||||
|
||||
|
||||
_aiokafka_getter = AIOKafkaContextGetter()
|
||||
_aiokafka_setter = AIOKafkaContextSetter()
|
||||
|
||||
|
||||
def _enrich_base_span(
|
||||
span: Span,
|
||||
*,
|
||||
bootstrap_servers: Union[str, List[str]],
|
||||
client_id: str,
|
||||
topic: str,
|
||||
partition: Optional[int],
|
||||
key: Optional[Any],
|
||||
) -> None:
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_SYSTEM,
|
||||
messaging_attributes.MessagingSystemValues.KAFKA.value,
|
||||
)
|
||||
span.set_attribute(
|
||||
server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers)
|
||||
)
|
||||
span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id)
|
||||
span.set_attribute(messaging_attributes.MESSAGING_DESTINATION_NAME, topic)
|
||||
|
||||
if partition is not None:
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID,
|
||||
str(partition),
|
||||
)
|
||||
|
||||
if key is not None:
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key
|
||||
)
|
||||
|
||||
|
||||
def _enrich_send_span(
|
||||
span: Span,
|
||||
*,
|
||||
bootstrap_servers: Union[str, List[str]],
|
||||
client_id: str,
|
||||
topic: str,
|
||||
partition: Optional[int],
|
||||
key: Optional[str],
|
||||
) -> None:
|
||||
if not span.is_recording():
|
||||
return
|
||||
|
||||
_enrich_base_span(
|
||||
span,
|
||||
bootstrap_servers=bootstrap_servers,
|
||||
client_id=client_id,
|
||||
topic=topic,
|
||||
partition=partition,
|
||||
key=key,
|
||||
)
|
||||
|
||||
span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send")
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_OPERATION_TYPE,
|
||||
messaging_attributes.MessagingOperationTypeValues.PUBLISH.value,
|
||||
)
|
||||
|
||||
|
||||
def _enrich_anext_span(
|
||||
span: Span,
|
||||
*,
|
||||
bootstrap_servers: Union[str, List[str]],
|
||||
client_id: str,
|
||||
consumer_group: Optional[str],
|
||||
topic: str,
|
||||
partition: Optional[int],
|
||||
key: Optional[str],
|
||||
offset: int,
|
||||
) -> None:
|
||||
if not span.is_recording():
|
||||
return
|
||||
|
||||
_enrich_base_span(
|
||||
span,
|
||||
bootstrap_servers=bootstrap_servers,
|
||||
client_id=client_id,
|
||||
topic=topic,
|
||||
partition=partition,
|
||||
key=key,
|
||||
)
|
||||
|
||||
if consumer_group is not None:
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group
|
||||
)
|
||||
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_OPERATION_NAME, "receive"
|
||||
)
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_OPERATION_TYPE,
|
||||
messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
|
||||
)
|
||||
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset
|
||||
)
|
||||
|
||||
# https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic
|
||||
# A message within Kafka is uniquely defined by its topic name, topic partition and offset.
|
||||
if partition is not None:
|
||||
span.set_attribute(
|
||||
messaging_attributes.MESSAGING_MESSAGE_ID,
|
||||
f"{topic}.{partition}.{offset}",
|
||||
)
|
||||
|
||||
|
||||
def _get_span_name(operation: str, topic: str):
|
||||
return f"{topic} {operation}"
|
||||
|
||||
|
||||
def _wrap_send(
|
||||
tracer: Tracer, async_produce_hook: ProduceHookT
|
||||
) -> Callable[..., Awaitable[None]]:
|
||||
async def _traced_send(
|
||||
func: Callable[..., Awaitable[None]],
|
||||
instance: aiokafka.AIOKafkaProducer,
|
||||
args: Tuple[Any],
|
||||
kwargs: Dict[str, Any],
|
||||
) -> None:
|
||||
headers = _extract_send_headers(args, kwargs)
|
||||
if headers is None:
|
||||
headers = []
|
||||
kwargs["headers"] = headers
|
||||
|
||||
topic = _extract_send_topic(args, kwargs)
|
||||
bootstrap_servers = _extract_bootstrap_servers(instance.client)
|
||||
client_id = _extract_client_id(instance.client)
|
||||
key = _extract_send_key(args, kwargs)
|
||||
partition = await _extract_send_partition(instance, args, kwargs)
|
||||
span_name = _get_span_name("send", topic)
|
||||
with tracer.start_as_current_span(
|
||||
span_name, kind=trace.SpanKind.PRODUCER
|
||||
) as span:
|
||||
_enrich_send_span(
|
||||
span,
|
||||
bootstrap_servers=bootstrap_servers,
|
||||
client_id=client_id,
|
||||
topic=topic,
|
||||
partition=partition,
|
||||
key=key,
|
||||
)
|
||||
propagate.inject(
|
||||
headers,
|
||||
context=trace.set_span_in_context(span),
|
||||
setter=_aiokafka_setter,
|
||||
)
|
||||
try:
|
||||
if async_produce_hook is not None:
|
||||
await async_produce_hook(span, args, kwargs)
|
||||
except Exception as hook_exception: # pylint: disable=W0703
|
||||
_LOG.exception(hook_exception)
|
||||
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return _traced_send
|
||||
|
||||
|
||||
async def _create_consumer_span(
|
||||
tracer: Tracer,
|
||||
async_consume_hook: ConsumeHookT,
|
||||
record: ConsumerRecord,
|
||||
extracted_context: Context,
|
||||
bootstrap_servers: Union[str, List[str]],
|
||||
client_id: str,
|
||||
consumer_group: Optional[str],
|
||||
args: Tuple[Any],
|
||||
kwargs: Dict[str, Any],
|
||||
):
|
||||
span_name = _get_span_name("receive", record.topic)
|
||||
with tracer.start_as_current_span(
|
||||
span_name,
|
||||
context=extracted_context,
|
||||
kind=trace.SpanKind.CONSUMER,
|
||||
) as span:
|
||||
new_context = trace.set_span_in_context(span, extracted_context)
|
||||
token = context.attach(new_context)
|
||||
_enrich_anext_span(
|
||||
span,
|
||||
bootstrap_servers=bootstrap_servers,
|
||||
client_id=client_id,
|
||||
consumer_group=consumer_group,
|
||||
topic=record.topic,
|
||||
partition=record.partition,
|
||||
key=record.key,
|
||||
offset=record.offset,
|
||||
)
|
||||
try:
|
||||
if async_consume_hook is not None:
|
||||
await async_consume_hook(span, record, args, kwargs)
|
||||
except Exception as hook_exception: # pylint: disable=W0703
|
||||
_LOG.exception(hook_exception)
|
||||
context.detach(token)
|
||||
|
||||
|
||||
def _wrap_anext(
|
||||
tracer: Tracer, async_consume_hook: ConsumeHookT
|
||||
) -> Callable[..., Awaitable[aiokafka.ConsumerRecord]]:
|
||||
async def _traced_next(
|
||||
func: Callable[..., Awaitable[aiokafka.ConsumerRecord]],
|
||||
instance: aiokafka.AIOKafkaConsumer,
|
||||
args: Tuple[Any],
|
||||
kwargs: Dict[str, Any],
|
||||
) -> aiokafka.ConsumerRecord:
|
||||
record = await func(*args, **kwargs)
|
||||
|
||||
if record:
|
||||
bootstrap_servers = _extract_bootstrap_servers(instance._client)
|
||||
client_id = _extract_client_id(instance._client)
|
||||
consumer_group = _extract_consumer_group(instance)
|
||||
|
||||
extracted_context = propagate.extract(
|
||||
record.headers, getter=_aiokafka_getter
|
||||
)
|
||||
await _create_consumer_span(
|
||||
tracer,
|
||||
async_consume_hook,
|
||||
record,
|
||||
extracted_context,
|
||||
bootstrap_servers,
|
||||
client_id,
|
||||
consumer_group,
|
||||
args,
|
||||
kwargs,
|
||||
)
|
||||
return record
|
||||
|
||||
return _traced_next
|
@ -0,0 +1,15 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
__version__ = "0.49b0.dev"
|
@ -0,0 +1,4 @@
|
||||
aiokafka==0.11.0
|
||||
pytest==7.4.4
|
||||
-e opentelemetry-instrumentation
|
||||
-e instrumentation/opentelemetry-instrumentation-aiokafka
|
@ -0,0 +1,40 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from unittest import TestCase
|
||||
|
||||
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
|
||||
from wrapt import BoundFunctionWrapper
|
||||
|
||||
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
|
||||
|
||||
|
||||
class TestAIOKafka(TestCase):
|
||||
def test_instrument_api(self) -> None:
|
||||
instrumentation = AIOKafkaInstrumentor()
|
||||
|
||||
instrumentation.instrument()
|
||||
self.assertTrue(
|
||||
isinstance(AIOKafkaProducer.send, BoundFunctionWrapper)
|
||||
)
|
||||
self.assertTrue(
|
||||
isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper)
|
||||
)
|
||||
|
||||
instrumentation.uninstrument()
|
||||
self.assertFalse(
|
||||
isinstance(AIOKafkaProducer.send, BoundFunctionWrapper)
|
||||
)
|
||||
self.assertFalse(
|
||||
isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper)
|
||||
)
|
@ -0,0 +1,306 @@
|
||||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# pylint: disable=unnecessary-dunder-call
|
||||
|
||||
from unittest import IsolatedAsyncioTestCase, mock
|
||||
|
||||
from opentelemetry.instrumentation.aiokafka.utils import (
|
||||
AIOKafkaContextGetter,
|
||||
AIOKafkaContextSetter,
|
||||
_aiokafka_getter,
|
||||
_aiokafka_setter,
|
||||
_create_consumer_span,
|
||||
_extract_send_partition,
|
||||
_get_span_name,
|
||||
_wrap_anext,
|
||||
_wrap_send,
|
||||
)
|
||||
from opentelemetry.trace import SpanKind
|
||||
|
||||
|
||||
class TestUtils(IsolatedAsyncioTestCase):
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.topic_name = "test_topic"
|
||||
self.args = [self.topic_name]
|
||||
self.headers = []
|
||||
self.kwargs = {"partition": 0, "headers": self.headers}
|
||||
|
||||
def test_context_setter(self) -> None:
|
||||
context_setter = AIOKafkaContextSetter()
|
||||
|
||||
carrier_list = [("key1", b"val1")]
|
||||
context_setter.set(carrier_list, "key2", "val2")
|
||||
self.assertTrue(("key2", "val2".encode()) in carrier_list)
|
||||
|
||||
def test_context_getter(self) -> None:
|
||||
context_setter = AIOKafkaContextSetter()
|
||||
context_getter = AIOKafkaContextGetter()
|
||||
|
||||
carrier_list = []
|
||||
context_setter.set(carrier_list, "key1", "val1")
|
||||
self.assertEqual(context_getter.get(carrier_list, "key1"), ["val1"])
|
||||
self.assertEqual(["key1"], context_getter.keys(carrier_list))
|
||||
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
|
||||
)
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
|
||||
)
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._enrich_send_span"
|
||||
)
|
||||
@mock.patch("opentelemetry.trace.set_span_in_context")
|
||||
@mock.patch("opentelemetry.propagate.inject")
|
||||
async def test_wrap_send_with_topic_as_arg(
|
||||
self,
|
||||
inject: mock.MagicMock,
|
||||
set_span_in_context: mock.MagicMock,
|
||||
enrich_span: mock.MagicMock,
|
||||
extract_send_partition: mock.MagicMock,
|
||||
extract_bootstrap_servers: mock.MagicMock,
|
||||
) -> None:
|
||||
await self.wrap_send_helper(
|
||||
inject,
|
||||
set_span_in_context,
|
||||
enrich_span,
|
||||
extract_send_partition,
|
||||
extract_bootstrap_servers,
|
||||
)
|
||||
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
|
||||
)
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
|
||||
)
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._enrich_send_span"
|
||||
)
|
||||
@mock.patch("opentelemetry.trace.set_span_in_context")
|
||||
@mock.patch("opentelemetry.propagate.inject")
|
||||
async def test_wrap_send_with_topic_as_kwarg(
|
||||
self,
|
||||
inject: mock.MagicMock,
|
||||
set_span_in_context: mock.MagicMock,
|
||||
enrich_span: mock.MagicMock,
|
||||
extract_send_partition: mock.AsyncMock,
|
||||
extract_bootstrap_servers: mock.MagicMock,
|
||||
) -> None:
|
||||
self.args = []
|
||||
self.kwargs["topic"] = self.topic_name
|
||||
await self.wrap_send_helper(
|
||||
inject,
|
||||
set_span_in_context,
|
||||
enrich_span,
|
||||
extract_send_partition,
|
||||
extract_bootstrap_servers,
|
||||
)
|
||||
|
||||
async def wrap_send_helper(
|
||||
self,
|
||||
inject: mock.MagicMock,
|
||||
set_span_in_context: mock.MagicMock,
|
||||
enrich_span: mock.MagicMock,
|
||||
extract_send_partition: mock.AsyncMock,
|
||||
extract_bootstrap_servers: mock.MagicMock,
|
||||
) -> None:
|
||||
tracer = mock.MagicMock()
|
||||
produce_hook = mock.AsyncMock()
|
||||
original_send_callback = mock.AsyncMock()
|
||||
kafka_producer = mock.MagicMock()
|
||||
expected_span_name = _get_span_name("send", self.topic_name)
|
||||
|
||||
wrapped_send = _wrap_send(tracer, produce_hook)
|
||||
retval = await wrapped_send(
|
||||
original_send_callback, kafka_producer, self.args, self.kwargs
|
||||
)
|
||||
|
||||
extract_bootstrap_servers.assert_called_once_with(
|
||||
kafka_producer.client
|
||||
)
|
||||
extract_send_partition.assert_awaited_once_with(
|
||||
kafka_producer, self.args, self.kwargs
|
||||
)
|
||||
tracer.start_as_current_span.assert_called_once_with(
|
||||
expected_span_name, kind=SpanKind.PRODUCER
|
||||
)
|
||||
|
||||
span = tracer.start_as_current_span().__enter__.return_value
|
||||
enrich_span.assert_called_once_with(
|
||||
span,
|
||||
bootstrap_servers=extract_bootstrap_servers.return_value,
|
||||
client_id=kafka_producer.client._client_id,
|
||||
topic=self.topic_name,
|
||||
partition=extract_send_partition.return_value,
|
||||
key=None,
|
||||
)
|
||||
|
||||
set_span_in_context.assert_called_once_with(span)
|
||||
context = set_span_in_context.return_value
|
||||
inject.assert_called_once_with(
|
||||
self.headers, context=context, setter=_aiokafka_setter
|
||||
)
|
||||
|
||||
produce_hook.assert_awaited_once_with(span, self.args, self.kwargs)
|
||||
|
||||
original_send_callback.assert_awaited_once_with(
|
||||
*self.args, **self.kwargs
|
||||
)
|
||||
self.assertEqual(retval, original_send_callback.return_value)
|
||||
|
||||
@mock.patch("opentelemetry.propagate.extract")
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._create_consumer_span"
|
||||
)
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
|
||||
)
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._extract_client_id"
|
||||
)
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group"
|
||||
)
|
||||
async def test_wrap_next(
|
||||
self,
|
||||
extract_consumer_group: mock.MagicMock,
|
||||
extract_client_id: mock.MagicMock,
|
||||
extract_bootstrap_servers: mock.MagicMock,
|
||||
_create_consumer_span: mock.MagicMock,
|
||||
extract: mock.MagicMock,
|
||||
) -> None:
|
||||
tracer = mock.MagicMock()
|
||||
consume_hook = mock.AsyncMock()
|
||||
original_next_callback = mock.AsyncMock()
|
||||
kafka_consumer = mock.MagicMock()
|
||||
|
||||
wrapped_next = _wrap_anext(tracer, consume_hook)
|
||||
record = await wrapped_next(
|
||||
original_next_callback, kafka_consumer, self.args, self.kwargs
|
||||
)
|
||||
|
||||
extract_bootstrap_servers.assert_called_once_with(
|
||||
kafka_consumer._client
|
||||
)
|
||||
bootstrap_servers = extract_bootstrap_servers.return_value
|
||||
|
||||
extract_client_id.assert_called_once_with(kafka_consumer._client)
|
||||
client_id = extract_client_id.return_value
|
||||
|
||||
extract_consumer_group.assert_called_once_with(kafka_consumer)
|
||||
consumer_group = extract_consumer_group.return_value
|
||||
|
||||
original_next_callback.assert_awaited_once_with(
|
||||
*self.args, **self.kwargs
|
||||
)
|
||||
self.assertEqual(record, original_next_callback.return_value)
|
||||
|
||||
extract.assert_called_once_with(
|
||||
record.headers, getter=_aiokafka_getter
|
||||
)
|
||||
context = extract.return_value
|
||||
|
||||
_create_consumer_span.assert_called_once_with(
|
||||
tracer,
|
||||
consume_hook,
|
||||
record,
|
||||
context,
|
||||
bootstrap_servers,
|
||||
client_id,
|
||||
consumer_group,
|
||||
self.args,
|
||||
self.kwargs,
|
||||
)
|
||||
|
||||
@mock.patch("opentelemetry.trace.set_span_in_context")
|
||||
@mock.patch("opentelemetry.context.attach")
|
||||
@mock.patch(
|
||||
"opentelemetry.instrumentation.aiokafka.utils._enrich_anext_span"
|
||||
)
|
||||
@mock.patch("opentelemetry.context.detach")
|
||||
async def test_create_consumer_span(
|
||||
self,
|
||||
detach: mock.MagicMock,
|
||||
enrich_span: mock.MagicMock,
|
||||
attach: mock.MagicMock,
|
||||
set_span_in_context: mock.MagicMock,
|
||||
) -> None:
|
||||
tracer = mock.MagicMock()
|
||||
consume_hook = mock.AsyncMock()
|
||||
bootstrap_servers = mock.MagicMock()
|
||||
extracted_context = mock.MagicMock()
|
||||
record = mock.MagicMock()
|
||||
client_id = mock.MagicMock()
|
||||
consumer_group = mock.MagicMock()
|
||||
|
||||
await _create_consumer_span(
|
||||
tracer,
|
||||
consume_hook,
|
||||
record,
|
||||
extracted_context,
|
||||
bootstrap_servers,
|
||||
client_id,
|
||||
consumer_group,
|
||||
self.args,
|
||||
self.kwargs,
|
||||
)
|
||||
|
||||
expected_span_name = _get_span_name("receive", record.topic)
|
||||
|
||||
tracer.start_as_current_span.assert_called_once_with(
|
||||
expected_span_name,
|
||||
context=extracted_context,
|
||||
kind=SpanKind.CONSUMER,
|
||||
)
|
||||
span = tracer.start_as_current_span.return_value.__enter__()
|
||||
set_span_in_context.assert_called_once_with(span, extracted_context)
|
||||
attach.assert_called_once_with(set_span_in_context.return_value)
|
||||
|
||||
enrich_span.assert_called_once_with(
|
||||
span,
|
||||
bootstrap_servers=bootstrap_servers,
|
||||
client_id=client_id,
|
||||
consumer_group=consumer_group,
|
||||
topic=record.topic,
|
||||
partition=record.partition,
|
||||
key=record.key,
|
||||
offset=record.offset,
|
||||
)
|
||||
consume_hook.assert_awaited_once_with(
|
||||
span, record, self.args, self.kwargs
|
||||
)
|
||||
detach.assert_called_once_with(attach.return_value)
|
||||
|
||||
async def test_kafka_properties_extractor(self):
|
||||
aiokafka_instance_mock = mock.Mock()
|
||||
aiokafka_instance_mock._serialize.return_value = None, None
|
||||
aiokafka_instance_mock._partition.return_value = "partition"
|
||||
aiokafka_instance_mock.client._wait_on_metadata = mock.AsyncMock()
|
||||
assert (
|
||||
await _extract_send_partition(
|
||||
aiokafka_instance_mock, self.args, self.kwargs
|
||||
)
|
||||
== "partition"
|
||||
)
|
||||
aiokafka_instance_mock.client._wait_on_metadata.side_effect = (
|
||||
Exception("mocked error")
|
||||
)
|
||||
assert (
|
||||
await _extract_send_partition(
|
||||
aiokafka_instance_mock, self.args, self.kwargs
|
||||
)
|
||||
is None
|
||||
)
|
@ -32,6 +32,7 @@ dependencies = [
|
||||
"opentelemetry-instrumentation-aio-pika==0.49b0.dev",
|
||||
"opentelemetry-instrumentation-aiohttp-client==0.49b0.dev",
|
||||
"opentelemetry-instrumentation-aiohttp-server==0.49b0.dev",
|
||||
"opentelemetry-instrumentation-aiokafka==0.49b0.dev",
|
||||
"opentelemetry-instrumentation-aiopg==0.49b0.dev",
|
||||
"opentelemetry-instrumentation-asgi==0.49b0.dev",
|
||||
"opentelemetry-instrumentation-asyncio==0.49b0.dev",
|
||||
|
@ -28,6 +28,10 @@ libraries = [
|
||||
"library": "aiohttp ~= 3.0",
|
||||
"instrumentation": "opentelemetry-instrumentation-aiohttp-server==0.49b0.dev",
|
||||
},
|
||||
{
|
||||
"library": "aiokafka >= 0.8, < 1.0",
|
||||
"instrumentation": "opentelemetry-instrumentation-aiokafka==0.49b0.dev",
|
||||
},
|
||||
{
|
||||
"library": "aiopg >= 0.13.0, < 2.0.0",
|
||||
"instrumentation": "opentelemetry-instrumentation-aiopg==0.49b0.dev",
|
||||
|
17
tox.ini
17
tox.ini
@ -342,6 +342,11 @@ envlist =
|
||||
pypy3-test-instrumentation-aio-pika-{0,1,2,3}
|
||||
lint-instrumentation-aio-pika
|
||||
|
||||
; opentelemetry-instrumentation-aiokafka
|
||||
py3{8,9,10,11,12}-test-instrumentation-aiokafka
|
||||
pypy3-test-instrumentation-aiokafka
|
||||
lint-instrumentation-aiokafka
|
||||
|
||||
; opentelemetry-instrumentation-kafka-python
|
||||
py3{8,9,10,11}-test-instrumentation-kafka-python
|
||||
py3{8,9,10,11,12}-test-instrumentation-kafka-pythonng
|
||||
@ -436,6 +441,11 @@ commands_pre =
|
||||
aio-pika-3: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika/test-requirements-3.txt
|
||||
lint-instrumentation-aio-pika: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika/test-requirements-3.txt
|
||||
|
||||
aiokafka: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api
|
||||
aiokafka: pip install opentelemetry-semantic-conventions@{env:CORE_REPO}\#egg=opentelemetry-semantic-conventions&subdirectory=opentelemetry-semantic-conventions
|
||||
aiokafka: pip install opentelemetry-sdk@{env:CORE_REPO}\#egg=opentelemetry-sdk&subdirectory=opentelemetry-sdk
|
||||
aiokafka: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka/test-requirements.txt
|
||||
|
||||
kafka-python: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api
|
||||
kafka-python: pip install opentelemetry-semantic-conventions@{env:CORE_REPO}\#egg=opentelemetry-semantic-conventions&subdirectory=opentelemetry-semantic-conventions
|
||||
kafka-python: pip install opentelemetry-sdk@{env:CORE_REPO}\#egg=opentelemetry-sdk&subdirectory=opentelemetry-sdk
|
||||
@ -930,6 +940,12 @@ commands =
|
||||
lint-instrumentation-jinja2: flake8 --config {toxinidir}/.flake8 {toxinidir}/instrumentation/opentelemetry-instrumentation-jinja2
|
||||
lint-instrumentation-jinja2: sh -c "cd instrumentation && pylint --rcfile ../.pylintrc opentelemetry-instrumentation-jinja2"
|
||||
|
||||
test-instrumentation-aiokafka: pytest {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka/tests {posargs}
|
||||
lint-instrumentation-aiokafka: black --diff --check --config {toxinidir}/pyproject.toml {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka
|
||||
lint-instrumentation-aiokafka: isort --diff --check-only --settings-path {toxinidir}/.isort.cfg {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka
|
||||
lint-instrumentation-aiokafka: flake8 --config {toxinidir}/.flake8 {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka
|
||||
lint-instrumentation-aiokafka: sh -c "cd instrumentation && pylint --rcfile ../.pylintrc opentelemetry-instrumentation-aiokafka"
|
||||
|
||||
test-instrumentation-kafka-python: pytest {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python/tests {posargs}
|
||||
lint-instrumentation-kafka-python: black --diff --check --config {toxinidir}/pyproject.toml {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python
|
||||
lint-instrumentation-kafka-python: isort --diff --check-only --settings-path {toxinidir}/.isort.cfg {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python
|
||||
@ -1277,6 +1293,7 @@ commands_pre =
|
||||
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-asyncpg \
|
||||
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery \
|
||||
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika \
|
||||
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka \
|
||||
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-kafka-python \
|
||||
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-confluent-kafka \
|
||||
-e {toxinidir}/instrumentation/opentelemetry-instrumentation-dbapi \
|
||||
|
Reference in New Issue
Block a user