diff --git a/config/config.go b/config/config.go index 6fa1dfb2..3072a6d5 100644 --- a/config/config.go +++ b/config/config.go @@ -14,7 +14,8 @@ type Kafka struct { BootstrapServers string `yaml:"bootstrapServers"` TopicPrefix string `yaml:"topicPrefix"` AwsEnabled bool `yaml:"awsEnabled"` - PublishSize int `yaml:"publishSize"` + PublishSize int `yaml:"publishSize,omitempty"` + MaxRequestSize int64 `yaml:"maxRequestSize,omitempty"` } func (k *Kafka) GenerateDefault() { diff --git a/lib/kafkalib/kafka.go b/lib/kafkalib/kafka.go index 8222d2f7..8898cd92 100644 --- a/lib/kafkalib/kafka.go +++ b/lib/kafkalib/kafka.go @@ -3,17 +3,16 @@ package kafkalib import ( "context" "crypto/tls" - "github.com/artie-labs/reader/config" - "github.com/artie-labs/reader/constants" - "github.com/artie-labs/reader/lib/logger" awsCfg "github.com/aws/aws-sdk-go-v2/config" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2" "strings" "time" -) -const maxKafkaItemBytes = 4000000 + "github.com/artie-labs/reader/config" + "github.com/artie-labs/reader/constants" + "github.com/artie-labs/reader/lib/logger" +) func FromContext(ctx context.Context) *kafka.Writer { log := logger.FromContext(ctx) @@ -42,13 +41,16 @@ func InjectIntoContext(ctx context.Context) context.Context { writer := &kafka.Writer{ Addr: kafka.TCP(strings.Split(cfg.Kafka.BootstrapServers, ",")...), - BatchBytes: maxKafkaItemBytes, Compression: kafka.Gzip, Balancer: &kafka.LeastBytes{}, WriteTimeout: 5 * time.Second, AllowAutoTopicCreation: true, } + if cfg.Kafka.MaxRequestSize > 0 { + writer.BatchBytes = cfg.Kafka.MaxRequestSize + } + if cfg.Kafka.AwsEnabled { saslCfg, err := awsCfg.LoadDefaultConfig(ctx) if err != nil {