From dc96a592b4e4ba72aebd2c87b2b4f50d02bdd49c Mon Sep 17 00:00:00 2001 From: Laurent Dechoux <34473609+ldechoux@users.noreply.github.com> Date: Wed, 28 Aug 2024 12:13:22 +0200 Subject: [PATCH] ENH - Set message max bytes (#27) * Add messages.max.bytes proprety * update tests * Update readme --- README.md | 5 +++++ config/config.go | 2 ++ config/config_test.go | 1 + internal/service/kafka.go | 1 + 4 files changed, 9 insertions(+) diff --git a/README.md b/README.md index e887b8c4..a98ad0c8 100644 --- a/README.md +++ b/README.md @@ -177,6 +177,11 @@ Configuration variables with prefix are first loaded and then without prefix. Fo A big value here can increase the heap memory of the application as all the payload that have to be sent to Kafka will be maintained in channel. +#### KAFKA_MESSAGE_MAX_BYTES +*Type*: integer + +*Description*: The maximum message size in bytes at the producer level (default: 1024*1024) + #### LOG_CLI_VERBOSE *Type*: boolean diff --git a/config/config.go b/config/config.go index 3090d884..3e0513ef 100644 --- a/config/config.go +++ b/config/config.go @@ -71,6 +71,7 @@ type Kafka struct { Topic string `config:"KAFKA_TOPIC"` ProduceChannelSize int `config:"KAFKA_PRODUCE_CHANNEL_SIZE"` WithDecorators bool `config:"KAFKA_WITH_DECORATORS"` + MessageMaxBytes int `config:"KAFKA_MESSAGE_MAX_BYTES"` } // NewBase returns a new base configuration @@ -107,6 +108,7 @@ func NewBase(ctx context.Context, configPrefix string) *Base { Topic: "kafka-mongo-watcher", ProduceChannelSize: 10000, WithDecorators: true, + MessageMaxBytes: 1024 * 1024, }, } diff --git a/config/config_test.go b/config/config_test.go index 6a5e7bab..ec9a5410 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -42,6 +42,7 @@ var cfg = &Base{ Topic: "kafka-mongo-watcher", ProduceChannelSize: 10000, WithDecorators: true, + MessageMaxBytes: 1024 * 1024, }, } diff --git a/internal/service/kafka.go b/internal/service/kafka.go index 1611e9a5..cdc71112 100644 --- a/internal/service/kafka.go +++ b/internal/service/kafka.go @@ -12,6 +12,7 @@ func (container *Container) GetKafkaProducer() *kafkaconfluent.Producer { producer, err := kafkaconfluent.NewProducer(&kafkaconfluent.ConfigMap{ "bootstrap.servers": container.Cfg.Kafka.BootstrapServers, "go.produce.channel.size": container.Cfg.Kafka.ProduceChannelSize, + "message.max.bytes": container.Cfg.Kafka.MessageMaxBytes, }) if err != nil { panic(err)