From f51a216079103aa93d7b6bf67ad9e2aaf010a63c Mon Sep 17 00:00:00 2001 From: Charlie Hornberger Date: Wed, 27 May 2020 18:39:45 +0200 Subject: [PATCH 1/2] adds MessageWithMetadata for consumers who need Kafka offset info --- kafka/consumer.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/kafka/consumer.go b/kafka/consumer.go index a108fa4..564cc5f 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -122,6 +122,24 @@ func (cm *consumerMessage) Data() []byte { return cm.cm.Value } +type MessageWithMetadata interface { + GetMetadata() Metadata +} + +type Metadata struct { + Topic string + Partition int32 + Offset int64 +} + +func (cm *consumerMessage) GetMetadata() Metadata { + return Metadata{ + Topic: cm.offset.topic, + Partition: cm.offset.partition, + Offset: cm.offset.offset, + } +} + func (cm *consumerMessage) DiscardPayload() { if cm.offset != nil { // already discarded From a0aa4b006506ccf36e3de1b257914dd252831e81 Mon Sep 17 00:00:00 2001 From: Charlie Hornberger Date: Thu, 28 May 2020 00:12:39 +0200 Subject: [PATCH 2/2] fixes use of consumerMessage.offset, which is nil until DiscardPayload() is called --- kafka/consumer.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index 564cc5f..88fcc24 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -133,10 +133,13 @@ type Metadata struct { } func (cm *consumerMessage) GetMetadata() Metadata { + if cm.cm == nil { + panic("GetMetadata: attempt to use payload after discarding.") + } return Metadata{ - Topic: cm.offset.topic, - Partition: cm.offset.partition, - Offset: cm.offset.offset, + Topic: cm.cm.Topic, + Partition: cm.cm.Partition, + Offset: cm.cm.Offset, } }