diff --git a/rust/capture/src/config.rs b/rust/capture/src/config.rs
index 0a659e9ea18f69..ae502e5f20b8d7 100644
--- a/rust/capture/src/config.rs
+++ b/rust/capture/src/config.rs
@@ -89,4 +89,8 @@ pub struct KafkaConfig {
     pub kafka_tls: bool,
     #[envconfig(default = "")]
     pub kafka_client_id: String,
+    #[envconfig(default = "60000")]
+    pub kafka_metadata_max_age_ms: u32,
+    #[envconfig(default = "2")]
+    pub kafka_producer_max_retries: u32,
 }
diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs
index 2e008a900ccd6c..b1d51713903475 100644
--- a/rust/capture/src/sinks/kafka.rs
+++ b/rust/capture/src/sinks/kafka.rs
@@ -129,6 +129,14 @@ impl KafkaSink {
             .set("bootstrap.servers", &config.kafka_hosts)
             .set("statistics.interval.ms", "10000")
             .set("partitioner", "murmur2_random") // Compatibility with python-kafka
+            .set(
+                "metadata.max.age.ms",
+                config.kafka_metadata_max_age_ms.to_string(),
+            )
+            .set(
+                "message.send.max.retries",
+                config.kafka_producer_max_retries.to_string(),
+            )
             .set("linger.ms", config.kafka_producer_linger_ms.to_string())
             .set(
                 "message.max.bytes",
@@ -371,6 +379,8 @@ mod tests {
             kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
             kafka_tls: false,
             kafka_client_id: "".to_string(),
+            kafka_metadata_max_age_ms: 60000,
+            kafka_producer_max_retries: 2,
         };
         let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
         (cluster, sink)
diff --git a/rust/capture/tests/common.rs b/rust/capture/tests/common.rs
index fe7e5bb9d8cccd..e9b636ac9a7355 100644
--- a/rust/capture/tests/common.rs
+++ b/rust/capture/tests/common.rs
@@ -51,6 +51,8 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
         kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
         kafka_tls: false,
         kafka_client_id: "".to_string(),
+        kafka_metadata_max_age_ms: 60000,
+        kafka_producer_max_retries: 2,
     },
     otel_url: None,
     otel_sampling_rate: 0.0,