Skip to content

Commit

Permalink
fixes consumer commit loss
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhang Balkundi committed Apr 3, 2024
1 parent 0af1dbf commit 2114d48
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [v2.0.20] 2024-03-25

- Manually commit uncommitted offsets before closing the Kafka Consumer

## [v2.0.18] 2024-03-25

# Removed
Expand Down
2 changes: 1 addition & 1 deletion cmd/ziggurat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func die(err error) {

//go:embed templates/*
var res embed.FS
var Version = "v2.0.18"
var Version = "v2.0.20"

var definePaths = func(basePath string) map[string]string {
return map[string]string{
Expand Down
6 changes: 6 additions & 0 deletions kafka/confluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type confluentConsumer interface {
Poll(int) kafka.Event
StoreOffsets([]kafka.TopicPartition) ([]kafka.TopicPartition, error)
Logs() chan kafka.LogEvent
Commit() ([]kafka.TopicPartition, error)
Close() error
}

Expand All @@ -28,6 +29,11 @@ func (m *MockConsumer) StoreOffsets(partitions []kafka.TopicPartition) ([]kafka.
return args.Get(0).([]kafka.TopicPartition), args.Error(1)
}

func (m *MockConsumer) Commit() ([]kafka.TopicPartition, error) {
args := m.Called()
return args.Get(0).([]kafka.TopicPartition), args.Error(1)
}

func (m *MockConsumer) Close() error {
return m.Called().Error(0)
}
Expand Down
1 change: 1 addition & 0 deletions kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestWorkerOrchestration(t *testing.T) {
mc.On("StoreOffsets", []kafka.TopicPartition{expectedTopicPartStoreOffsets}).
Return([]kafka.TopicPartition{expectedTopicPartStoreOffsets}, nil)
mc.On("Close").Return(nil)
mc.On("Commit").Return([]kafka.TopicPartition{}, nil)
mc.On("Logs").Return(logChan)

var msgCount int32
Expand Down
6 changes: 5 additions & 1 deletion kafka/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ type worker struct {
func (w *worker) run(ctx context.Context) {

defer func() {
err := w.consumer.Close()
_, err := w.consumer.Commit()
if err != nil {
w.logger.Error("pre-close commit error", err, map[string]interface{}{"Worker-ID": w.id})
}
err = w.consumer.Close()
w.logger.Error("error closing kafka consumer", err, map[string]interface{}{"Worker-ID": w.id})
}()

Expand Down
3 changes: 3 additions & 0 deletions kafka/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestWorker(t *testing.T) {
mc.On("Poll", 100).Return(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 1},
})
mc.On("Commit").Return([]kafka.TopicPartition{}, nil)
mc.On("StoreOffsets", mock.AnythingOfType("[]kafka.TopicPartition")).Return([]kafka.TopicPartition{}, nil)
mc.On("Close").Return(nil)
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
Expand Down Expand Up @@ -89,6 +90,7 @@ func TestWorker(t *testing.T) {

mc.On("Logs").Return(make(chan kafka.LogEvent))
mc.On("Poll", 100).Return(kafka.NewError(kafka.ErrAllBrokersDown, "fatal error", true))
mc.On("Commit").Return([]kafka.TopicPartition{}, nil)
mc.On("Close").Return(nil)
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
defer cancel()
Expand Down Expand Up @@ -116,6 +118,7 @@ func TestWorker(t *testing.T) {

topic := "foo"
mc.On("Logs").Return(make(chan kafka.LogEvent))
mc.On("Commit").Return([]kafka.TopicPartition{}, nil)
mc.On("StoreOffsets", mock.AnythingOfType("[]kafka.TopicPartition")).Return([]kafka.TopicPartition{}, nil)
mc.On("Poll", 100).Return(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 1},
Expand Down

0 comments on commit 2114d48

Please sign in to comment.