diff --git a/rust/common/kafka/src/kafka_producer.rs b/rust/common/kafka/src/kafka_producer.rs index 4f30850125415..965406a1ce08c 100644 --- a/rust/common/kafka/src/kafka_producer.rs +++ b/rust/common/kafka/src/kafka_producer.rs @@ -3,12 +3,12 @@ use crate::config::KafkaConfig; use futures::future::join_all; use health::HealthHandle; use rdkafka::error::KafkaError; -use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::producer::{FutureProducer, FutureRecord, Producer}; use rdkafka::ClientConfig; use serde::Serialize; use serde_json::error::Error as SerdeError; use thiserror::Error; -use tracing::debug; +use tracing::{debug, error, info}; pub struct KafkaContext { liveness: HealthHandle, @@ -55,7 +55,22 @@ pub async fn create_kafka_producer( let api: FutureProducer = client_config.create_with_context(KafkaContext { liveness })?; - // TODO: ping the kafka brokers to confirm configuration is OK (copy capture) + // "Ping" the Kafka brokers by requesting metadata + match api + .client() + .fetch_metadata(None, std::time::Duration::from_secs(15)) + { + Ok(metadata) => { + info!( + "Successfully connected to Kafka brokers. Found {} topics.", + metadata.topics().len() + ); + } + Err(error) => { + error!("Failed to fetch metadata from Kafka brokers: {:?}", error); + return Err(error); + } + } Ok(api) }