diff --git a/lib/kafkalib/errors.go b/lib/kafkalib/errors.go index 6d281142..84563885 100644 --- a/lib/kafkalib/errors.go +++ b/lib/kafkalib/errors.go @@ -3,14 +3,29 @@ package kafkalib import ( "context" "errors" + "log/slog" "strings" "github.com/segmentio/kafka-go" ) func isExceedMaxMessageBytesErr(err error) bool { - return err != nil && strings.Contains(err.Error(), - "Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum") + if err == nil { + return false + } + + if errors.Is(err, kafka.MessageSizeTooLarge) { + return true + } + + if strings.Contains(err.Error(), + "Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum") { + // TODO: Remove this if we don't see it in the logs + slog.Error("Matched 'Message Size Too Large' error but not kafka.MessageSizeTooLarge") + return true + } + + return false } // isRetryableError - returns true if the error is retryable diff --git a/lib/kafkalib/errors_test.go b/lib/kafkalib/errors_test.go index ab6a30ab..79defa9a 100644 --- a/lib/kafkalib/errors_test.go +++ b/lib/kafkalib/errors_test.go @@ -3,9 +3,10 @@ package kafkalib import ( "context" "fmt" - "github.com/segmentio/kafka-go" "testing" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" ) @@ -26,6 +27,14 @@ func TestIsExceedMaxMessageBytesErr(t *testing.T) { err: fmt.Errorf("Message Size Too Large: the server has a configurable maximum message size to avoid unbounded memory allocation and the client attempted to produce a message larger than this maximum, bytes: 1223213213"), expected: true, }, + { + err: kafka.TopicAuthorizationFailed, + expected: false, + }, + { + err: kafka.MessageSizeTooLarge, + expected: true, + }, } for _, tc := range tcs { @@ -57,6 +66,10 @@ func TestIsRetryableError(t *testing.T) { err: context.DeadlineExceeded, expected: true, }, + { + err: kafka.MessageSizeTooLarge, + expected: false, + }, } for _, tc := range tcs { diff --git a/lib/kafkalib/message.go b/lib/kafkalib/message.go deleted file mode 100644 index 09abaf11..00000000 --- a/lib/kafkalib/message.go +++ /dev/null @@ -1,27 +0,0 @@ -package kafkalib - -import ( - "encoding/json" - "fmt" - - "github.com/artie-labs/reader/lib" - "github.com/segmentio/kafka-go" -) - -func newMessage(topicPrefix string, rawMessage lib.RawMessage) (kafka.Message, error) { - valueBytes, err := json.Marshal(rawMessage.Event()) - if err != nil { - return kafka.Message{}, err - } - - keyBytes, err := json.Marshal(rawMessage.PartitionKey()) - if err != nil { - return kafka.Message{}, err - } - - return kafka.Message{ - Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()), - Key: keyBytes, - Value: valueBytes, - }, nil -} diff --git a/lib/kafkalib/writer.go b/lib/kafkalib/writer.go index b0928c3c..29fe1b09 100644 --- a/lib/kafkalib/writer.go +++ b/lib/kafkalib/writer.go @@ -3,6 +3,7 @@ package kafkalib import ( "context" "crypto/tls" + "encoding/json" "fmt" "log/slog" "time" @@ -147,3 +148,21 @@ func (b *BatchWriter) WriteRawMessages(ctx context.Context, rawMsgs []lib.RawMes } return nil } + +func newMessage(topicPrefix string, rawMessage lib.RawMessage) (kafka.Message, error) { + valueBytes, err := json.Marshal(rawMessage.Event()) + if err != nil { + return kafka.Message{}, err + } + + keyBytes, err := json.Marshal(rawMessage.PartitionKey()) + if err != nil { + return kafka.Message{}, err + } + + return kafka.Message{ + Topic: fmt.Sprintf("%s.%s", topicPrefix, rawMessage.TopicSuffix()), + Key: keyBytes, + Value: valueBytes, + }, nil +} diff --git a/lib/kafkalib/message_test.go b/lib/kafkalib/writer_test.go similarity index 100% rename from lib/kafkalib/message_test.go rename to lib/kafkalib/writer_test.go