Skip to content

Commit

Permalink
[kafka] Move newMessage and improve error checking (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
nathan-artie authored Apr 4, 2024
1 parent 57a7bda commit 9a51b75
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 30 deletions.
19 changes: 17 additions & 2 deletions lib/kafkalib/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,29 @@ package kafkalib
import (
"context"
"errors"
"log/slog"
"strings"

"github.com/segmentio/kafka-go"
)

func isExceedMaxMessageBytesErr(err error) bool {
return err != nil && strings.Contains(err.Error(),
"Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum")
if err == nil {
return false
}

if errors.Is(err, kafka.MessageSizeTooLarge) {
return true
}

if strings.Contains(err.Error(),
"Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum") {
// TODO: Remove this if we don't see it in the logs
slog.Error("Matched 'Message Size Too Large' error but not kafka.MessageSizeTooLarge")
return true
}

return false
}

// isRetryableError - returns true if the error is retryable
Expand Down
15 changes: 14 additions & 1 deletion lib/kafkalib/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package kafkalib
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"testing"

"github.com/segmentio/kafka-go"

"github.com/stretchr/testify/assert"
)

Expand All @@ -26,6 +27,14 @@ func TestIsExceedMaxMessageBytesErr(t *testing.T) {
err: fmt.Errorf("Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum, bytes: 1223213213"),
expected: true,
},
{
err: kafka.TopicAuthorizationFailed,
expected: false,
},
{
err: kafka.MessageSizeTooLarge,
expected: true,
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -57,6 +66,10 @@ func TestIsRetryableError(t *testing.T) {
err: context.DeadlineExceeded,
expected: true,
},
{
err: kafka.MessageSizeTooLarge,
expected: false,
},
}

for _, tc := range tcs {
Expand Down
27 changes: 0 additions & 27 deletions lib/kafkalib/message.go

This file was deleted.

19 changes: 19 additions & 0 deletions lib/kafkalib/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafkalib
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"log/slog"
"time"
Expand Down Expand Up @@ -147,3 +148,21 @@ func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMes
}
return nil
}

func newMessage(topicPrefix string, rawMessage lib.RawMessage) (kafka.Message, error) {
valueBytes, err := json.Marshal(rawMessage.Event())
if err != nil {
return kafka.Message{}, err
}

keyBytes, err := json.Marshal(rawMessage.PartitionKey())
if err != nil {
return kafka.Message{}, err
}

return kafka.Message{
Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()),
Key: keyBytes,
Value: valueBytes,
}, nil
}
File renamed without changes.

0 comments on commit 9a51b75

Please sign in to comment.