From 044d6334a0ac6b89241e6ef47a5fd32007c8afe5 Mon Sep 17 00:00:00 2001 From: Elvis <43846394+Elvis339@users.noreply.github.com> Date: Fri, 6 Sep 2024 20:22:32 +0400 Subject: [PATCH] feat(kafka-producer): ping kafka brokers (#24836) Co-authored-by: Brett Hoerner --- rust/common/kafka/src/kafka_producer.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/rust/common/kafka/src/kafka_producer.rs b/rust/common/kafka/src/kafka_producer.rs index 4f30850125415..965406a1ce08c 100644 --- a/rust/common/kafka/src/kafka_producer.rs +++ b/rust/common/kafka/src/kafka_producer.rs @@ -3,12 +3,12 @@ use crate::config::KafkaConfig; use futures::future::join_all; use health::HealthHandle; use rdkafka::error::KafkaError; -use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::producer::{FutureProducer, FutureRecord, Producer}; use rdkafka::ClientConfig; use serde::Serialize; use serde_json::error::Error as SerdeError; use thiserror::Error; -use tracing::debug; +use tracing::{debug, error, info}; pub struct KafkaContext { liveness: HealthHandle, @@ -55,7 +55,22 @@ pub async fn create_kafka_producer( let api: FutureProducer = client_config.create_with_context(KafkaContext { liveness })?; - // TODO: ping the kafka brokers to confirm configuration is OK (copy capture) + // "Ping" the Kafka brokers by requesting metadata + match api + .client() + .fetch_metadata(None, std::time::Duration::from_secs(15)) + { + Ok(metadata) => { + info!( + "Successfully connected to Kafka brokers. Found {} topics.", + metadata.topics().len() + ); + } + Err(error) => { + error!("Failed to fetch metadata from Kafka brokers: {:?}", error); + return Err(error); + } + } Ok(api) }