diff --git a/lib/iterator/batch.go b/lib/iterator/batch.go deleted file mode 100644 index 08ec3d7c..00000000 --- a/lib/iterator/batch.go +++ /dev/null @@ -1,32 +0,0 @@ -package iterator - -import "fmt" - -type batchIterator[T any] struct { - items []T - index int - step int -} - -// Batched returns an iterator that splits a list of items into batches of the given step size. -func Batched[T any](items []T, step int) Iterator[[]T] { - return &batchIterator[T]{ - items: items, - index: 0, - step: max(step, 1), - } -} - -func (i *batchIterator[T]) HasNext() bool { - return i.index < len(i.items) -} - -func (i *batchIterator[T]) Next() ([]T, error) { - if !i.HasNext() { - return nil, fmt.Errorf("iterator has finished") - } - end := min(i.index+i.step, len(i.items)) - result := i.items[i.index:end] - i.index = end - return result, nil -} diff --git a/lib/kafkalib/batch.go b/lib/kafkalib/batch.go new file mode 100644 index 00000000..a721a407 --- /dev/null +++ b/lib/kafkalib/batch.go @@ -0,0 +1,13 @@ +package kafkalib + +// batched splits a slice of items into a slice of step-sized slices. +func batched[T any](items []T, step int) [][]T { + step = max(step, 1) + var result [][]T + for index := 0; index < len(items); { + end := min(index+step, len(items)) + result = append(result, items[index:end]) + index = end + } + return result +} diff --git a/lib/iterator/batch_test.go b/lib/kafkalib/batch_test.go similarity index 50% rename from lib/iterator/batch_test.go rename to lib/kafkalib/batch_test.go index 54fa13e1..bf132a3f 100644 --- a/lib/iterator/batch_test.go +++ b/lib/kafkalib/batch_test.go @@ -1,4 +1,4 @@ -package iterator +package kafkalib import ( "testing" @@ -6,35 +6,30 @@ import ( "github.com/stretchr/testify/assert" ) -func TestBatchIterator(t *testing.T) { +func TestBatched(t *testing.T) { // length of items is 0 { - batches, err := Collect(Batched([]int{}, 2)) - assert.NoError(t, err) + batches := batched([]int{}, 2) assert.Empty(t, batches) } // length of items is 1 { - batches, err := Collect(Batched([]int{1}, 2)) - assert.NoError(t, err) + batches := batched([]int{1}, 2) assert.Equal(t, [][]int{{1}}, batches) } // n is 0 { - batches, err := Collect(Batched([]int{1, 2}, 0)) - assert.NoError(t, err) + batches := batched([]int{1, 2}, 0) assert.Equal(t, [][]int{{1}, {2}}, batches) } // length of items is a multiple of n { - batches, err := Collect(Batched([]int{1, 2, 3, 4}, 2)) - assert.NoError(t, err) + batches := batched([]int{1, 2, 3, 4}, 2) assert.Equal(t, [][]int{{1, 2}, {3, 4}}, batches) } // length of items is not a multiple of n { - batches, err := Collect(Batched([]int{1, 2, 3, 4, 5}, 2)) - assert.NoError(t, err) + batches := batched([]int{1, 2, 3, 4, 5}, 2) assert.Equal(t, [][]int{{1, 2}, {3, 4}, {5}}, batches) } } diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index c75abcc6..b0928c3c 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -4,14 +4,14 @@ import ( "context" "crypto/tls" "fmt" - awsCfg "github.com/aws/aws-sdk-go-v2/config" - "github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2" "log/slog" "time" + awsCfg "github.com/aws/aws-sdk-go-v2/config" + "github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2" + "github.com/artie-labs/reader/config" "github.com/artie-labs/reader/lib" - "github.com/artie-labs/reader/lib/iterator" "github.com/artie-labs/reader/lib/mtr" "github.com/artie-labs/transfer/lib/jitter" "github.com/artie-labs/transfer/lib/size" @@ -104,17 +104,12 @@ func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMes msgs = append(msgs, kafkaMsg) } - iter := iterator.Batched(msgs, int(b.cfg.GetPublishSize())) - for iter.HasNext() { + for _, batch := range batched(msgs, int(b.cfg.GetPublishSize())) { tags := map[string]string{ "what": "error", } var kafkaErr error - batch, err := iter.Next() - if err != nil { - return err - } for attempts := range 10 { if attempts > 0 { sleepDuration := jitter.Jitter(baseJitterMs, maxJitterMs, attempts-1)