From db617eb3fd0224c9fd6a8a45a9c3b6c983ae2904 Mon Sep 17 00:00:00 2001 From: Andre Murbach Maidl Date: Fri, 21 Mar 2025 11:25:38 -0300 Subject: [PATCH] Improve confluent-kafka instrumentation examples (#3369) * Improve confluent-kafka instrumentation example * Improve confluent-kafka tracer provider example * Apply suggestions from code review --------- Co-authored-by: Riccardo Magliocchetti --- .../instrumentation/confluent_kafka/__init__.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 48b2bd2c2..db422db7f 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -34,6 +34,9 @@ Usage # report a span of type consumer with the default settings consumer = Consumer(conf2) + def msg_process(msg): + print(msg) + def basic_consume_loop(consumer, topics): try: consumer.subscribe(topics) @@ -44,7 +47,7 @@ Usage if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event - sys.stderr.write(f"{msg.topic() [{msg.partition()}] reached end at offset {msg.offset()}}") + sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}") elif msg.error(): raise KafkaException(msg.error()) else: @@ -53,7 +56,7 @@ Usage # Close down consumer to commit final offsets. consumer.close() - basic_consume_loop(consumer, "my-topic") + basic_consume_loop(consumer, ["my-topic"]) The _instrument method accepts the following keyword args: tracer_provider (TracerProvider) - an optional tracer provider @@ -72,14 +75,16 @@ The _instrument method accepts the following keyword args: .. code:: python from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor + from opentelemetry.trace import get_tracer_provider from confluent_kafka import Producer, Consumer inst = ConfluentKafkaInstrumentor() + tracer_provider = get_tracer_provider() - p = confluent_kafka.Producer({'bootstrap.servers': 'localhost:29092'}) - c = confluent_kafka.Consumer({ - 'bootstrap.servers': 'localhost:29092', + p = Producer({'bootstrap.servers': 'localhost:9092'}) + c = Consumer({ + 'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup', 'auto.offset.reset': 'earliest' })