Skip to content

Commit

Permalink
Log consumer assignment on startup, log subscription id and consumer …
Browse files Browse the repository at this point in the history
…group id (#2285)
  • Loading branch information
slinkydeveloper authored Nov 13, 2024
1 parent ecd19ac commit 3848fa9
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ impl MessageSender {
messaging.system = "kafka",
messaging.operation = "receive",
messaging.source.name = msg.topic(),
messaging.destination.name = %self.subscription.sink()
messaging.destination.name = %self.subscription.sink(),
restate.subscription.id = %self.subscription.id(),
messaging.consumer.group.name = consumer_group_id
);
info!(parent: &ingress_span, "Processing Kafka ingress request");
let ingress_span_context = ingress_span.context().span().span_context().clone();
Expand Down Expand Up @@ -229,6 +231,8 @@ impl ConsumerTask {
.expect("group.id must be set")
.to_string();
debug!(
restate.subscription.id = %self.sender.subscription.id(),
messaging.consumer.group.name = consumer_group_id,
"Starting consumer for topics {:?} with configuration {:?}",
self.topics, self.client_config
);
Expand All @@ -237,6 +241,13 @@ impl ConsumerTask {
let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect();
consumer.subscribe(&topics)?;

debug!(
restate.subscription.id = %self.sender.subscription.id(),
messaging.consumer.group.name = consumer_group_id,
"Assigned topic/partitions/offset: {:?}",
consumer.assignment()?
);

let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default();

let result = loop {
Expand Down

0 comments on commit 3848fa9

Please sign in to comment.