Skip to content

Commit

Permalink
Sleep Kafka writer before retrying (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Feb 1, 2024
1 parent f28a43a commit c23ac63
Showing 1 changed file with 17 additions and 14 deletions.
31 changes: 17 additions & 14 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewBatchWriter(ctx context.Context, cfg config.Kafka, statsD *mtr.Client) (
}

func (w *BatchWriter) reload(ctx context.Context) error {
slog.Info("Reloading kafka writer")
if err := w.writer.Close(); err != nil {
return err
}
Expand Down Expand Up @@ -88,6 +89,22 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
var kafkaErr error
chunk := iter.Next()
for attempts := 0; attempts < 10; attempts++ {
if attempts > 0 {
sleepMs := lib.JitterMs(baseJitterMs, maxJitterMs, attempts-1)
slog.Info("Failed to publish to kafka",
slog.Any("err", kafkaErr),
slog.Int("attempts", attempts-1),
slog.Int("sleepMs", sleepMs),
)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)

if RetryableError(kafkaErr) {
if reloadErr := w.reload(ctx); reloadErr != nil {
slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr))
}
}
}

kafkaErr = w.writer.WriteMessages(ctx, chunk...)
if kafkaErr == nil {
tags["what"] = "success"
Expand All @@ -99,20 +116,6 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e
kafkaErr = nil
break
}

sleepMs := lib.JitterMs(baseJitterMs, maxJitterMs, attempts)
slog.Info("Failed to publish to kafka",
slog.Any("err", kafkaErr),
slog.Int("attempts", attempts),
slog.Int("sleepMs", sleepMs),
)
time.Sleep(time.Duration(sleepMs) * time.Millisecond)

if RetryableError(kafkaErr) {
if reloadErr := w.reload(ctx); reloadErr != nil {
slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr))
}
}
}

if w.statsD != nil {
Expand Down

0 comments on commit c23ac63

Please sign in to comment.