Skip to content

Commit

Permalink
Handle panic with empty partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Dec 19, 2024
1 parent 51d0c63 commit 6cb3860
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl ConsumerContext for RebalanceContext {
};

match rebalance {
Rebalance::Assign(partitions) => {
Rebalance::Assign(partitions) if partitions.count() > 0 => {
for partition in partitions.elements() {
let partition: TopicPartition = partition.into();

Expand Down Expand Up @@ -393,7 +393,7 @@ impl ConsumerContext for RebalanceContext {
}
}
}
Rebalance::Revoke(partitions) => {
Rebalance::Revoke(partitions) if partitions.count() > 0 => {
for partition in partitions.elements() {
let partition = partition.into();
match topic_partition_tasks.remove(&partition)
Expand All @@ -415,6 +415,9 @@ impl ConsumerContext for RebalanceContext {
Err(error) => warn!("Failed to commit the current consumer state: {error}"),
}
}
// called with empty partitions; important to not call .elements() as this panics apparently.
// unclear why we are called with no partitions
Rebalance::Assign(_) | Rebalance::Revoke(_) => {}
Rebalance::Error(_) => {}
}
}
Expand Down

0 comments on commit 6cb3860

Please sign in to comment.