diff --git a/changelog.d/21036_kafka_sink_error_status.enhancement.md b/changelog.d/21036_kafka_sink_error_status.enhancement.md new file mode 100644 index 0000000000000..f9dc85d8e0d05 --- /dev/null +++ b/changelog.d/21036_kafka_sink_error_status.enhancement.md @@ -0,0 +1,4 @@ +The `kafka` sink now retries sending events that failed to be sent for transient reasons. Previously +it would reject these events. + +authors: frankh diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 607e7cd5fd4ea..087795864b594 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -14,6 +14,7 @@ use rdkafka::{ producer::{FutureProducer, FutureRecord}, types::RDKafkaErrorCode, }; +use vector_lib::config; use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*}; @@ -34,11 +35,12 @@ pub struct KafkaRequestMetadata { pub struct KafkaResponse { event_byte_size: GroupedCountByteSize, raw_byte_size: usize, + event_status: EventStatus, } impl DriverResponse for KafkaResponse { fn event_status(&self) -> EventStatus { - EventStatus::Delivered + self.event_status } fn events_sent(&self) -> &GroupedCountByteSize { @@ -153,6 +155,7 @@ impl Service for KafkaService { .map(|_| KafkaResponse { event_byte_size, raw_byte_size, + event_status: EventStatus::Delivered, }) .map_err(|(err, _)| err); } @@ -168,8 +171,30 @@ impl Service for KafkaService { record = original_record; tokio::time::sleep(Duration::from_millis(100)).await; } - // A different error occurred. - Err((err, _)) => return Err(err), + // A final/non-retriable error occurred. + Err(( + err @ KafkaError::MessageProduction( + RDKafkaErrorCode::InvalidMessage + | RDKafkaErrorCode::InvalidMessageSize + | RDKafkaErrorCode::MessageSizeTooLarge + | RDKafkaErrorCode::UnknownTopicOrPartition + | RDKafkaErrorCode::InvalidRecord + | RDKafkaErrorCode::InvalidRequiredAcks + | RDKafkaErrorCode::TopicAuthorizationFailed + | RDKafkaErrorCode::UnsupportedForMessageFormat + | RDKafkaErrorCode::ClusterAuthorizationFailed, + ), + _, + )) => return Err(err), + + // A different error occurred. Set event status to Errored not Rejected. + Err(_) => { + return Ok(KafkaResponse { + event_byte_size: config::telemetry().create_request_count_byte_size(), + raw_byte_size: 0, + event_status: EventStatus::Errored, + }) + } }; } })