diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index 0970e677..4d6673ea 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -35,6 +35,7 @@ func NewBatchWriter(ctx context.Context, cfg config.Kafka, statsD *mtr.Client) ( } func (w *BatchWriter) reload(ctx context.Context) error { + slog.Info("Reloading kafka writer") if err := w.writer.Close(); err != nil { return err } @@ -88,6 +89,22 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e var kafkaErr error chunk := iter.Next() for attempts := 0; attempts < 10; attempts++ { + if attempts > 0 { + sleepMs := lib.JitterMs(baseJitterMs, maxJitterMs, attempts-1) + slog.Info("Failed to publish to kafka", + slog.Any("err", kafkaErr), + slog.Int("attempts", attempts-1), + slog.Int("sleepMs", sleepMs), + ) + time.Sleep(time.Duration(sleepMs) * time.Millisecond) + + if RetryableError(kafkaErr) { + if reloadErr := w.reload(ctx); reloadErr != nil { + slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr)) + } + } + } + kafkaErr = w.writer.WriteMessages(ctx, chunk...) if kafkaErr == nil { tags["what"] = "success" @@ -99,20 +116,6 @@ func (w *BatchWriter) WriteMessages(ctx context.Context, msgs []kafka.Message) e kafkaErr = nil break } - - sleepMs := lib.JitterMs(baseJitterMs, maxJitterMs, attempts) - slog.Info("Failed to publish to kafka", - slog.Any("err", kafkaErr), - slog.Int("attempts", attempts), - slog.Int("sleepMs", sleepMs), - ) - time.Sleep(time.Duration(sleepMs) * time.Millisecond) - - if RetryableError(kafkaErr) { - if reloadErr := w.reload(ctx); reloadErr != nil { - slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr)) - } - } } if w.statsD != nil {