diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ddb3f488b..f0fd2243dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2573)) - `opentelemetry-instrumentation-confluent-kafka` Add support for version 2.4.0 of confluent_kafka ([#2616](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2616)) +- `opentelemetry-instrumentation-confluent-kafka` Add support for produce purge + ([#2638](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2638)) ### Breaking changes diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 30181d39c2..45d45ccb63 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -156,6 +156,9 @@ def flush(self, timeout=-1): def poll(self, timeout=-1): return self._producer.poll(timeout) + def purge(self, in_queue=True, in_flight=True, blocking=True): + self._producer.purge(in_queue, in_flight, blocking) + def produce( self, topic, value=None, *args, **kwargs ): # pylint: disable=keyword-arg-before-vararg