diff --git a/CHANGELOG.md b/CHANGELOG.md index 6876e2cf9..c9cafd2dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Breaking changes +- Add return statement to Confluent kafka Producer poll() and flush() calls when instrumented by ConfluentKafkaInstrumentor().instrument_producer() ([#2527](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2527)) - Rename `type` attribute to `asgi.event.type` in `opentelemetry-instrumentation-asgi` ([#2300](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2300)) - Rename AwsLambdaInstrumentor span attributes `faas.id` to `cloud.resource_id`, `faas.execution` to `faas.invocation_id` diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index c869d03dd..30181d39c 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -151,10 +151,10 @@ class ProxiedProducer(Producer): self._tracer = tracer def flush(self, timeout=-1): - self._producer.flush(timeout) + return self._producer.flush(timeout) def poll(self, timeout=-1): - self._producer.poll(timeout) + return self._producer.poll(timeout) def produce( self, topic, value=None, *args, **kwargs diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py index 21d5bd6f8..205de2773 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/test_instrumentation.py @@ -31,7 +31,7 @@ from opentelemetry.semconv.trace import ( ) from opentelemetry.test.test_base import TestBase -from .utils import MockConsumer, MockedMessage +from .utils import MockConsumer, MockedMessage, MockedProducer class TestConfluentKafka(TestBase): @@ -246,3 +246,35 @@ class TestConfluentKafka(TestBase): self.assertEqual( expected_attribute_value, span.attributes[attribute_key] ) + + def test_producer_poll(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + message_queue = [] + + producer = MockedProducer( + message_queue, + { + "bootstrap.servers": "localhost:29092", + }, + ) + + producer = instrumentation.instrument_producer(producer) + producer.produce(topic="topic-1", key="key-1", value="value-1") + msg = producer.poll() + self.assertIsNotNone(msg) + + def test_producer_flush(self) -> None: + instrumentation = ConfluentKafkaInstrumentor() + message_queue = [] + + producer = MockedProducer( + message_queue, + { + "bootstrap.servers": "localhost:29092", + }, + ) + + producer = instrumentation.instrument_producer(producer) + producer.produce(topic="topic-1", key="key-1", value="value-1") + msg = producer.flush() + self.assertIsNotNone(msg) diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py index 798daaeff..92e11798f 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/tests/utils.py @@ -1,4 +1,6 @@ -from confluent_kafka import Consumer +from typing import Optional + +from confluent_kafka import Consumer, Producer class MockConsumer(Consumer): @@ -20,11 +22,21 @@ class MockConsumer(Consumer): class MockedMessage: - def __init__(self, topic: str, partition: int, offset: int, headers): + def __init__( + self, + topic: str, + partition: int, + offset: int, + headers, + key: Optional[str] = None, + value: Optional[str] = None, + ): self._topic = topic self._partition = partition self._offset = offset self._headers = headers + self._key = key + self._value = value def topic(self): return self._topic @@ -37,3 +49,35 @@ class MockedMessage: def headers(self): return self._headers + + def key(self): + return self._key + + def value(self): + return self._value + + +class MockedProducer(Producer): + def __init__(self, queue, config): + self._queue = queue + super().__init__(config) + + def produce( + self, *args, **kwargs + ): # pylint: disable=keyword-arg-before-vararg + self._queue.append( + MockedMessage( + topic=kwargs.get("topic"), + partition=0, + offset=0, + headers=[], + key=kwargs.get("key"), + value=kwargs.get("value"), + ) + ) + + def poll(self, *args, **kwargs): + return len(self._queue) + + def flush(self, *args, **kwargs): + return len(self._queue)