Add confluent kafka docs (#1668)

* add elasticsearch to docs

* add confluent kafka to docs

* tox generate fix

* tox docs fix

---------

Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
This commit is contained in:
Nimrod Shlagman
2023-02-15 14:17:28 +02:00
committed by GitHub
parent 5e4766ed66
commit 1e89854832
5 changed files with 58 additions and 41 deletions

View File

@ -25,6 +25,7 @@ asyncpg>=0.12.0
boto~=2.0 boto~=2.0
botocore~=1.0 botocore~=1.0
celery>=4.0 celery>=4.0
confluent-kafka>= 1.8.2,< 2.0.0
elasticsearch>=2.0,<9.0 elasticsearch>=2.0,<9.0
flask~=2.0 flask~=2.0
falcon~=2.0 falcon~=2.0

View File

@ -126,25 +126,14 @@ def getlistcfg(strval):
] ]
if "class_references" in mcfg: ignore_categories = ["py-class", "py-func", "py-exc", "any"]
class_references = getlistcfg(mcfg["class_references"])
for class_reference in class_references: for category in ignore_categories:
nitpick_ignore.append( if category in mcfg:
( items = getlistcfg(mcfg[category])
"py:class", for item in items:
class_reference, nitpick_ignore.append((category.replace("-", ":"), item))
)
)
if "anys" in mcfg:
anys = getlistcfg(mcfg["anys"])
for _any in anys:
nitpick_ignore.append(
(
"any",
_any,
)
)
# Add any paths that contain templates here, relative to this directory. # Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"] templates_path = ["_templates"]

View File

@ -0,0 +1,7 @@
.. include:: ../../../instrumentation/opentelemetry-instrumentation-confluent-kafka/README.rst
.. automodule:: opentelemetry.instrumentation.confluent_kafka
:members:
:undoc-members:
:show-inheritance:
:noindex:

View File

@ -1,5 +1,5 @@
[default] [default]
class_references= py-class=
; TODO: Understand why sphinx is not able to find this local class ; TODO: Understand why sphinx is not able to find this local class
opentelemetry.propagators.textmap.CarrierT opentelemetry.propagators.textmap.CarrierT
opentelemetry.propagators.textmap.Setter opentelemetry.propagators.textmap.Setter
@ -11,6 +11,8 @@ class_references=
opentelemetry.propagators.textmap.Getter opentelemetry.propagators.textmap.Getter
; - AWSXRayPropagator ; - AWSXRayPropagator
opentelemetry.sdk.trace.id_generator.IdGenerator opentelemetry.sdk.trace.id_generator.IdGenerator
opentelemetry.instrumentation.confluent_kafka.ProxiedProducer
opentelemetry.instrumentation.confluent_kafka.ProxiedConsumer
; - AwsXRayIdGenerator ; - AwsXRayIdGenerator
TextMapPropagator TextMapPropagator
CarrierT CarrierT
@ -26,8 +28,16 @@ class_references=
httpx.AsyncByteStream httpx.AsyncByteStream
httpx.Response httpx.Response
yarl.URL yarl.URL
cimpl.Producer
cimpl.Consumer
func
Message
TopicPartition
callable
Consumer
confluent_kafka.Message
anys= any=
; API ; API
opentelemetry.propagators.textmap.TextMapPropagator.fields opentelemetry.propagators.textmap.TextMapPropagator.fields
; - AWSXRayPropagator ; - AWSXRayPropagator
@ -44,3 +54,12 @@ anys=
; - instrumentation.* ; - instrumentation.*
Setter Setter
httpx httpx
;
py-func=
poll
flush
Message.error
py-exc=
KafkaException
KafkaError

View File

@ -13,12 +13,12 @@
# limitations under the License. # limitations under the License.
""" """
Instrument `confluent-kafka-python` to report instrumentation-confluent-kafka produced and consumed messages Instrument confluent-kafka-python to report instrumentation-confluent-kafka produced and consumed messages
Usage Usage
----- -----
..code:: python .. code-block:: python
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from confluent_kafka import Producer, Consumer from confluent_kafka import Producer, Consumer
@ -30,12 +30,10 @@ Usage
conf1 = {'bootstrap.servers': "localhost:9092"} conf1 = {'bootstrap.servers': "localhost:9092"}
producer = Producer(conf1) producer = Producer(conf1)
producer.produce('my-topic',b'raw_bytes') producer.produce('my-topic',b'raw_bytes')
conf2 = {'bootstrap.servers': "localhost:9092", 'group.id': "foo", 'auto.offset.reset': 'smallest'}
conf2 = {'bootstrap.servers': "localhost:9092",
'group.id': "foo",
'auto.offset.reset': 'smallest'}
# report a span of type consumer with the default settings # report a span of type consumer with the default settings
consumer = Consumer(conf2) consumer = Consumer(conf2)
def basic_consume_loop(consumer, topics): def basic_consume_loop(consumer, topics):
try: try:
consumer.subscribe(topics) consumer.subscribe(topics)
@ -43,11 +41,10 @@ Usage
while running: while running:
msg = consumer.poll(timeout=1.0) msg = consumer.poll(timeout=1.0)
if msg is None: continue if msg is None: continue
if msg.error(): if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF: if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event # End of partition event
sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}}\n") sys.stderr.write(f"{msg.topic() [{msg.partition()}] reached end at offset {msg.offset()}}")
elif msg.error(): elif msg.error():
raise KafkaException(msg.error()) raise KafkaException(msg.error())
else: else:
@ -57,19 +54,26 @@ Usage
consumer.close() consumer.close()
basic_consume_loop(consumer, "my-topic") basic_consume_loop(consumer, "my-topic")
---
The _instrument method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message
this function signature is:
def instrument_producer(producer: Producer, tracer_provider=None)
instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message
this function signature is:
def instrument_consumer(consumer: Consumer, tracer_provider=None)
for example:
.. code:: python
The `_instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message
this function signature is:
def instrument_producer(producer: Producer, tracer_provider=None)
instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message
this function signature is:
def instrument_consumer(consumer: Consumer, tracer_provider=None)
for example:
.. code: python
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from confluent_kafka import Producer, Consumer from confluent_kafka import Producer, Consumer
inst = ConfluentKafkaInstrumentor() inst = ConfluentKafkaInstrumentor()
@ -85,15 +89,12 @@ for example:
p = inst.instrument_producer(p, tracer_provider) p = inst.instrument_producer(p, tracer_provider)
c = inst.instrument_consumer(c, tracer_provider=tracer_provider) c = inst.instrument_consumer(c, tracer_provider=tracer_provider)
# Using kafka as normal now will automatically generate spans, # Using kafka as normal now will automatically generate spans,
# including user custom attributes added from the hooks # including user custom attributes added from the hooks
conf = {'bootstrap.servers': "localhost:9092"} conf = {'bootstrap.servers': "localhost:9092"}
p.produce('my-topic',b'raw_bytes') p.produce('my-topic',b'raw_bytes')
msg = c.poll() msg = c.poll()
API
___ ___
""" """
from typing import Collection from typing import Collection