diff --git a/src/workflows/airqo_etl_utils/message_broker_utils.py b/src/workflows/airqo_etl_utils/message_broker_utils.py index 3c0b487fcf..1a4838e63a 100644 --- a/src/workflows/airqo_etl_utils/message_broker_utils.py +++ b/src/workflows/airqo_etl_utils/message_broker_utils.py @@ -357,7 +357,7 @@ def publish_to_topic( if not auto_partition: self.partition_loads[selected_partition] += 1 - producer.close() + producer.flush() def consume_from_topic( self,