From dc0b4087095b4968cca0201e233919de8cff9918 Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Mon, 12 Aug 2024 22:27:14 +0100 Subject: [PATCH] enhancement(kafka sink): update service to set Errored status on events (#21036) * update kafka-sink service to set Errored status on events currently all kafka errors result in a "Rejected" event, which for the HTTP sink returns a 400 to the user. however most errors should be "Errored", which implies an error happened and not a bad event that can't be sent to the sink * spelling fix * add changelog entry * Update changelog Signed-off-by: Jesse Szwedko * cargo fmt --------- Signed-off-by: Jesse Szwedko Co-authored-by: Jesse Szwedko --- ...036_kafka_sink_error_status.enhancement.md | 4 +++ src/sinks/kafka/service.rs | 31 +++++++++++++++++-- 2 files changed, 32 insertions(+), 3 deletions(-) create mode 100644 changelog.d/21036_kafka_sink_error_status.enhancement.md diff --git a/changelog.d/21036_kafka_sink_error_status.enhancement.md b/changelog.d/21036_kafka_sink_error_status.enhancement.md new file mode 100644 index 0000000000000..f9dc85d8e0d05 --- /dev/null +++ b/changelog.d/21036_kafka_sink_error_status.enhancement.md @@ -0,0 +1,4 @@ +The `kafka` sink now retries sending events that failed to be sent for transient reasons. Previously +it would reject these events. 
+ +authors: frankh diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 607e7cd5fd4ea..087795864b594 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -14,6 +14,7 @@ use rdkafka::{ producer::{FutureProducer, FutureRecord}, types::RDKafkaErrorCode, }; +use vector_lib::config; use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*}; @@ -34,11 +35,12 @@ pub struct KafkaRequestMetadata { pub struct KafkaResponse { event_byte_size: GroupedCountByteSize, raw_byte_size: usize, + event_status: EventStatus, } impl DriverResponse for KafkaResponse { fn event_status(&self) -> EventStatus { - EventStatus::Delivered + self.event_status } fn events_sent(&self) -> &GroupedCountByteSize { @@ -153,6 +155,7 @@ impl Service for KafkaService { .map(|_| KafkaResponse { event_byte_size, raw_byte_size, + event_status: EventStatus::Delivered, }) .map_err(|(err, _)| err); } @@ -168,8 +171,30 @@ impl Service for KafkaService { record = original_record; tokio::time::sleep(Duration::from_millis(100)).await; } - // A different error occurred. - Err((err, _)) => return Err(err), + // A final/non-retriable error occurred. + Err(( + err @ KafkaError::MessageProduction( + RDKafkaErrorCode::InvalidMessage + | RDKafkaErrorCode::InvalidMessageSize + | RDKafkaErrorCode::MessageSizeTooLarge + | RDKafkaErrorCode::UnknownTopicOrPartition + | RDKafkaErrorCode::InvalidRecord + | RDKafkaErrorCode::InvalidRequiredAcks + | RDKafkaErrorCode::TopicAuthorizationFailed + | RDKafkaErrorCode::UnsupportedForMessageFormat + | RDKafkaErrorCode::ClusterAuthorizationFailed, + ), + _, + )) => return Err(err), + + // A different error occurred. Set event status to Errored not Rejected. + Err(_) => { + return Ok(KafkaResponse { + event_byte_size: config::telemetry().create_request_count_byte_size(), + raw_byte_size: 0, + event_status: EventStatus::Errored, + }) + } }; } })