From 61fe56dc7ee4ca7cdd09b231f4cd07df35a35a6c Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Thu, 14 Nov 2024 11:30:03 +0100 Subject: [PATCH] Even better log statement in Kafka (#2293) --- crates/ingress-kafka/src/consumer_task.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index f90430afe..cf2ae8484 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -241,13 +241,6 @@ impl ConsumerTask { let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect(); consumer.subscribe(&topics)?; - debug!( - restate.subscription.id = %self.sender.subscription.id(), - messaging.consumer.group.name = consumer_group_id, - "Assigned topic/partitions/offset: {:?}", - consumer.assignment()? - ); - let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default(); let result = loop { @@ -269,6 +262,12 @@ impl ConsumerTask { None => break Err(Error::TopicPartitionSplit(topic.clone(), partition)) }; + debug!( + restate.subscription.id = %self.sender.subscription.id(), + messaging.consumer.group.name = consumer_group_id, + "Starting topic '{topic}' partition '{partition}' consumption loop from offset '{offset}'" + ); + let task = topic_partition_queue_consumption_loop( self.sender.clone(), topic.clone(), partition,