From 102f65bf396465a8a0a13e7f92873cae7657a5f8 Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Thu, 29 Aug 2024 12:48:46 +0100 Subject: [PATCH] feat: allow configuring client.id for rust capture (#24662) --- rust/capture/src/config.rs | 2 ++ rust/capture/src/sinks/kafka.rs | 5 +++++ rust/capture/tests/common.rs | 1 + 3 files changed, 8 insertions(+) diff --git a/rust/capture/src/config.rs b/rust/capture/src/config.rs index c6ca77461d11c..d012c354a5f0e 100644 --- a/rust/capture/src/config.rs +++ b/rust/capture/src/config.rs @@ -85,4 +85,6 @@ pub struct KafkaConfig { pub kafka_heatmaps_topic: String, #[envconfig(default = "false")] pub kafka_tls: bool, + #[envconfig(default = "")] + pub kafka_client_id: String, } diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs index 498b2b71e837a..17266af2a7d07 100644 --- a/rust/capture/src/sinks/kafka.rs +++ b/rust/capture/src/sinks/kafka.rs @@ -144,6 +144,10 @@ impl KafkaSink { (config.kafka_producer_queue_mib * 1024).to_string(), ); + if !&config.kafka_client_id.is_empty() { + client_config.set("client.id", &config.kafka_client_id); + } + if config.kafka_tls { client_config .set("security.protocol", "ssl") @@ -358,6 +362,7 @@ mod tests { kafka_exceptions_topic: "events_plugin_ingestion".to_string(), kafka_heatmaps_topic: "events_plugin_ingestion".to_string(), kafka_tls: false, + kafka_client_id: "".to_string(), }; let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink"); (cluster, sink) diff --git a/rust/capture/tests/common.rs b/rust/capture/tests/common.rs index e59e2c0af2acc..3f97270dea69a 100644 --- a/rust/capture/tests/common.rs +++ b/rust/capture/tests/common.rs @@ -50,6 +50,7 @@ pub static DEFAULT_CONFIG: Lazy = Lazy::new(|| Config { kafka_exceptions_topic: "events_plugin_ingestion".to_string(), kafka_heatmaps_topic: "events_plugin_ingestion".to_string(), kafka_tls: false, + kafka_client_id: "".to_string(), }, otel_url: None, otel_sampling_rate: 0.0,