diff --git a/kafka/consumer.go b/kafka/consumer.go index a108fa4..88fcc24 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -122,6 +122,27 @@ func (cm *consumerMessage) Data() []byte { return cm.cm.Value } +type MessageWithMetadata interface { + GetMetadata() Metadata +} + +type Metadata struct { + Topic string + Partition int32 + Offset int64 +} + +func (cm *consumerMessage) GetMetadata() Metadata { + if cm.cm == nil { + panic("GetMetadata: attempt to use payload after discarding.") + } + return Metadata{ + Topic: cm.cm.Topic, + Partition: cm.cm.Partition, + Offset: cm.cm.Offset, + } +} + func (cm *consumerMessage) DiscardPayload() { if cm.offset != nil { // already discarded