Skip to content

Commit

Permalink
Add batch writer (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Jan 25, 2024
1 parent 4ecfaaa commit 875df05
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 20 deletions.
12 changes: 12 additions & 0 deletions lib/jitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package lib

import (
"math"
"math/rand"
)

// JitterMs returns a sleep duration in milliseconds using the "full jitter"
// exponential-backoff strategy:
//
//	sleep = random_between(0, min(maxMs, baseMs * 2 ** attempts))
//
// See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
//
// If the computed cap is not positive (e.g. baseMs <= 0 or maxMs <= 0),
// JitterMs returns 0 instead of panicking inside rand.Intn, which panics
// for arguments <= 0.
func JitterMs(baseMs, maxMs, attempts int) int {
	capMs := int(math.Min(float64(maxMs), float64(baseMs)*math.Pow(2, float64(attempts))))
	if capMs <= 0 {
		return 0
	}
	return rand.Intn(capMs)
}
18 changes: 18 additions & 0 deletions lib/kafkalib/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package kafkalib

import "strings"

func IsExceedMaxMessageBytesErr(err error) bool {
return err != nil && strings.Contains(err.Error(),
"Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum")
}

func IsBatchEmptyErr(err error) bool {
return err != nil && err.Error() == "batch is empty"
}

// RetryableError - returns true if the error is retryable
// If it's retryable, you need to reload the Kafka client.
func RetryableError(err error) bool {
return err != nil && strings.Contains(err.Error(), "Topic Authorization Failed: the client is not authorized to access the requested topic")
}
33 changes: 33 additions & 0 deletions lib/kafkalib/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package kafkalib

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestIsExceedMaxMessageBytesErr(t *testing.T) {
type _tc struct {
err error
expected bool
}

tcs := []_tc{
{
err: fmt.Errorf(""),
},
{
err: nil,
},
{
err: fmt.Errorf("Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum, bytes: 1223213213"),
expected: true,
},
}

for _, tc := range tcs {
actual := IsExceedMaxMessageBytesErr(tc.err)
assert.Equal(t, tc.expected, actual, tc.err)
}
}
20 changes: 0 additions & 20 deletions lib/kafkalib/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,3 @@ func NewWriter(ctx context.Context, cfg config.Kafka) (*kafka.Writer, error) {

return writer, nil
}

// ReloadableWriter wraps a kafka.Writer together with the config used to
// build it, so the underlying writer can be torn down and rebuilt.
type ReloadableWriter struct {
	*kafka.Writer

	cfg config.Kafka
}

// Reload closes the current underlying writer and replaces it with a freshly
// constructed one built from the stored config. If either step fails, the
// error is returned and the writer is left in its current state.
func (w *ReloadableWriter) Reload(ctx context.Context) error {
	if closeErr := w.Writer.Close(); closeErr != nil {
		return closeErr
	}

	newWriter, err := NewWriter(ctx, w.cfg)
	if err != nil {
		return err
	}

	w.Writer = newWriter
	return nil
}
126 changes: 126 additions & 0 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package kafkalib

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/artie-labs/transfer/lib/cdc/util"
"github.com/artie-labs/transfer/lib/size"
"github.com/segmentio/kafka-go"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
)

const (
	// baseJitterMs is the base backoff in milliseconds fed into lib.JitterMs.
	baseJitterMs = 300
	// maxJitterMs caps a single backoff sleep at 5 seconds.
	maxJitterMs = 5000
)

// BatchWriter wraps a kafka.Writer and publishes messages in chunks with
// retry and backoff. It keeps the Kafka config so the underlying writer can
// be rebuilt when a retryable error invalidates the client.
type BatchWriter struct {
	*kafka.Writer

	// NOTE(review): storing a context in a struct is discouraged in Go;
	// consider passing ctx to Write/reload instead — confirm with callers.
	ctx context.Context
	cfg config.Kafka
}

// NewBatchWriter builds a BatchWriter backed by a freshly constructed
// kafka.Writer. It returns the zero BatchWriter on failure.
func NewBatchWriter(ctx context.Context, cfg config.Kafka) (BatchWriter, error) {
	writer, err := NewWriter(ctx, cfg)
	if err != nil {
		return BatchWriter{}, err
	}

	return BatchWriter{
		Writer: writer,
		ctx:    ctx,
		cfg:    cfg,
	}, nil
}

// MaxMessageSize returns the configured maximum Kafka request size in bytes.
func (w *BatchWriter) MaxMessageSize() uint64 {
	return w.cfg.MaxRequestSize
}

// reload closes the current underlying writer and swaps in a newly built one
// using the stored config. It is invoked after retryable errors that require
// a fresh Kafka client.
func (w *BatchWriter) reload() error {
	if closeErr := w.Writer.Close(); closeErr != nil {
		return closeErr
	}

	newWriter, err := NewWriter(w.ctx, w.cfg)
	if err != nil {
		return err
	}

	w.Writer = newWriter
	return nil
}

// RawMessage is a CDC event that has not yet been converted into a
// kafka.Message.
type RawMessage struct {
	// TopicSuffix is appended to the configured topic prefix ("<prefix>.<suffix>").
	TopicSuffix string
	// PartitionKey determines which Kafka partition the message lands on.
	PartitionKey map[string]interface{}
	// Payload is the schema'd event payload to be published.
	Payload util.SchemaEventPayload
}

// buildKafkaMessages converts each RawMessage into a kafka.Message, forming
// the full topic name as "<cfg.TopicPrefix>.<TopicSuffix>". It stops at the
// first conversion error.
func buildKafkaMessages(cfg *config.Kafka, msgs []RawMessage) ([]kafka.Message, error) {
	out := make([]kafka.Message, 0, len(msgs))
	for _, rawMsg := range msgs {
		topic := fmt.Sprintf("%s.%s", cfg.TopicPrefix, rawMsg.TopicSuffix)
		kafkaMsg, err := NewMessage(topic, rawMsg.PartitionKey, rawMsg.Payload)
		if err != nil {
			return nil, err
		}
		out = append(out, kafkaMsg)
	}
	return out, nil
}

// Write converts rawMsgs into Kafka messages and publishes them in chunks of
// cfg.GetPublishSize(). Each chunk is attempted up to 10 times:
//   - "message too large" errors drop the chunk (logged, not fatal);
//   - retryable errors trigger a writer reload before the next attempt;
//   - other errors sleep with exponential backoff + jitter before retrying.
//
// An empty batch is a no-op. A chunk that still fails after all attempts
// aborts the whole write with a wrapped error.
//
// Fixes vs. previous version: error message typo ("build to kafka"),
// truncated skip-log message, and receiver renamed k -> w for consistency
// with the other BatchWriter methods.
func (w *BatchWriter) Write(rawMsgs []RawMessage) error {
	msgs, err := buildKafkaMessages(&w.cfg, rawMsgs)
	if err != nil {
		return fmt.Errorf("failed to build kafka messages: %w", err)
	}

	b := NewBatch(msgs, w.cfg.GetPublishSize())
	if batchErr := b.IsValid(); batchErr != nil {
		// An empty batch is not a failure; there is simply nothing to publish.
		if IsBatchEmptyErr(batchErr) {
			return nil
		}

		return fmt.Errorf("batch is not valid: %w", batchErr)
	}

	for b.HasNext() {
		var kafkaErr error
		chunk := b.NextChunk()
		for attempts := 0; attempts < 10; attempts++ {
			kafkaErr = w.WriteMessages(w.ctx, chunk...)
			if kafkaErr == nil {
				break
			}

			if IsExceedMaxMessageBytesErr(kafkaErr) {
				// The broker will never accept this chunk; skip it rather
				// than fail the entire batch.
				slog.Info("Skipping this chunk since it exceeded the server's maximum message size")
				kafkaErr = nil
				break
			}

			if RetryableError(kafkaErr) {
				// This class of error requires a fresh Kafka client before retrying.
				if reloadErr := w.reload(); reloadErr != nil {
					slog.Warn("Failed to reload kafka writer", slog.Any("err", reloadErr))
				}
			} else {
				sleepMs := lib.JitterMs(baseJitterMs, maxJitterMs, attempts)
				slog.Info("Failed to publish to kafka",
					slog.Any("err", kafkaErr),
					slog.Int("attempts", attempts),
					slog.Int("sleepMs", sleepMs),
				)
				time.Sleep(time.Duration(sleepMs) * time.Millisecond)
			}
		}

		if kafkaErr != nil {
			return fmt.Errorf("failed to write message: %w, approxSize: %d", kafkaErr, size.GetApproxSize(chunk))
		}
	}
	return nil
}

0 comments on commit 875df05

Please sign in to comment.