Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer (#1656)

This commit is contained in:
Rajashree Mandaogane
2023-02-13 11:02:27 -08:00
committed by GitHub
parent b8d7448f34
commit 3f8fdf2620
3 changed files with 20 additions and 0 deletions

View File

@ -46,6 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1435](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1435))
- mongo db - fix db statement capturing
([#1512](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1512))
- Add commit method for ConfluentKafkaInstrumentor's ProxiedConsumer
([#1656](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1656))
## Version 1.15.0/0.36b0 (2022-12-10)

View File

@ -173,6 +173,9 @@ class ProxiedConsumer(Consumer):
def committed(self, partitions, timeout=-1):
return self._consumer.committed(partitions, timeout)
def commit(self, *args, **kwargs):
return self._consumer.commit(*args, **kwargs)
def consume(
self, num_messages=1, *args, **kwargs
): # pylint: disable=keyword-arg-before-vararg

View File

@ -58,3 +58,18 @@ class TestConfluentKafka(TestCase):
consumer = instrumentation.uninstrument_consumer(consumer)
self.assertEqual(consumer.__class__, Consumer)
def test_consumer_commit_method_exists(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
consumer = Consumer(
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
}
)
consumer = instrumentation.instrument_consumer(consumer)
self.assertEqual(consumer.__class__, ProxiedConsumer)
self.assertTrue(hasattr(consumer, "commit"))