Skip to content

Commit

Permalink
[kafka] Return slice instead of iterator from Batched (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Apr 4, 2024
1 parent ec558fe commit 57a7bda
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 53 deletions.
32 changes: 0 additions & 32 deletions lib/iterator/batch.go

This file was deleted.

13 changes: 13 additions & 0 deletions lib/kafkalib/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package kafkalib

// batched splits a slice of items into a slice of step-sized slices.
func batched[T any](items []T, step int) [][]T {
step = max(step, 1)
var result [][]T
for index := 0; index < len(items); {
end := min(index+step, len(items))
result = append(result, items[index:end])
index = end
}
return result
}
19 changes: 7 additions & 12 deletions lib/iterator/batch_test.go → lib/kafkalib/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,35 @@
package iterator
package kafkalib

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBatchIterator(t *testing.T) {
func TestBatched(t *testing.T) {
// length of items is 0
{
batches, err := Collect(Batched([]int{}, 2))
assert.NoError(t, err)
batches := batched([]int{}, 2)
assert.Empty(t, batches)
}
// length of items is 1
{
batches, err := Collect(Batched([]int{1}, 2))
assert.NoError(t, err)
batches := batched([]int{1}, 2)
assert.Equal(t, [][]int{{1}}, batches)
}
// n is 0
{
batches, err := Collect(Batched([]int{1, 2}, 0))
assert.NoError(t, err)
batches := batched([]int{1, 2}, 0)
assert.Equal(t, [][]int{{1}, {2}}, batches)
}
// length of items is a multiple of n
{
batches, err := Collect(Batched([]int{1, 2, 3, 4}, 2))
assert.NoError(t, err)
batches := batched([]int{1, 2, 3, 4}, 2)
assert.Equal(t, [][]int{{1, 2}, {3, 4}}, batches)
}
// length of items is not a multiple of n
{
batches, err := Collect(Batched([]int{1, 2, 3, 4, 5}, 2))
assert.NoError(t, err)
batches := batched([]int{1, 2, 3, 4, 5}, 2)
assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}}, batches)
}
}
13 changes: 4 additions & 9 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"crypto/tls"
"fmt"
awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"
"log/slog"
"time"

awsCfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2"

"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"github.com/artie-labs/reader/lib/iterator"
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/transfer/lib/jitter"
"github.com/artie-labs/transfer/lib/size"
Expand Down Expand Up @@ -104,17 +104,12 @@ func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMes
msgs = append(msgs, kafkaMsg)
}

iter := iterator.Batched(msgs, int(b.cfg.GetPublishSize()))
for iter.HasNext() {
for _, batch := range batched(msgs, int(b.cfg.GetPublishSize())) {
tags := map[string]string{
"what": "error",
}

var kafkaErr error
batch, err := iter.Next()
if err != nil {
return err
}
for attempts := range 10 {
if attempts > 0 {
sleepDuration := jitter.Jitter(baseJitterMs, maxJitterMs, attempts-1)
Expand Down

0 comments on commit 57a7bda

Please sign in to comment.