Improve confluent-kafka instrumentation examples (#3369)

* Improve confluent-kafka instrumentation example

* Improve confluent-kafka tracer provider example

* Apply suggestions from code review

---------

Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
This commit is contained in:
Andre Murbach Maidl
2025-03-21 11:25:38 -03:00
committed by GitHub
parent 3c60b62ad1
commit db617eb3fd

View File

@ -34,6 +34,9 @@ Usage
# report a span of type consumer with the default settings # report a span of type consumer with the default settings
consumer = Consumer(conf2) consumer = Consumer(conf2)
def msg_process(msg):
print(msg)
def basic_consume_loop(consumer, topics): def basic_consume_loop(consumer, topics):
try: try:
consumer.subscribe(topics) consumer.subscribe(topics)
@ -44,7 +47,7 @@ Usage
if msg.error(): if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF: if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event # End of partition event
sys.stderr.write(f"{msg.topic() [{msg.partition()}] reached end at offset {msg.offset()}}") sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}")
elif msg.error(): elif msg.error():
raise KafkaException(msg.error()) raise KafkaException(msg.error())
else: else:
@ -53,7 +56,7 @@ Usage
# Close down consumer to commit final offsets. # Close down consumer to commit final offsets.
consumer.close() consumer.close()
basic_consume_loop(consumer, "my-topic") basic_consume_loop(consumer, ["my-topic"])
The _instrument method accepts the following keyword args: The _instrument method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider tracer_provider (TracerProvider) - an optional tracer provider
@ -72,14 +75,16 @@ The _instrument method accepts the following keyword args:
.. code:: python .. code:: python
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
from opentelemetry.trace import get_tracer_provider
from confluent_kafka import Producer, Consumer from confluent_kafka import Producer, Consumer
inst = ConfluentKafkaInstrumentor() inst = ConfluentKafkaInstrumentor()
tracer_provider = get_tracer_provider()
p = confluent_kafka.Producer({'bootstrap.servers': 'localhost:29092'}) p = Producer({'bootstrap.servers': 'localhost:9092'})
c = confluent_kafka.Consumer({ c = Consumer({
'bootstrap.servers': 'localhost:29092', 'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup', 'group.id': 'mygroup',
'auto.offset.reset': 'earliest' 'auto.offset.reset': 'earliest'
}) })