Skip to content

Commit

Permalink
enhancement(kafka sink): update service to set Errored status on even…
Browse files Browse the repository at this point in the history
…ts (#21036)

* update kafka-sink service to set Errored status on events

currently all kafka errors result it a "Rejected" event, which for the
HTTP sink returns a 400 to the user.

however most errors should be "Errored", which implies an error happened
and not a bad event that can't be sent to the sink

* spelling fix

* add changelog entry

* Update changelog

Signed-off-by: Jesse Szwedko <[email protected]>

* cargo fmt

---------

Signed-off-by: Jesse Szwedko <[email protected]>
Co-authored-by: Jesse Szwedko <[email protected]>
  • Loading branch information
frankh and jszwedko authored Aug 12, 2024
1 parent 38fdd46 commit dc0b408
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
4 changes: 4 additions & 0 deletions changelog.d/21036_kafka_sink_error_status.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
The `kafka` sink now retries sending events that failed to be sent for transient reasons. Previously
it would reject these events.

authors: frankh
31 changes: 28 additions & 3 deletions src/sinks/kafka/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use rdkafka::{
producer::{FutureProducer, FutureRecord},
types::RDKafkaErrorCode,
};
use vector_lib::config;

use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};

Expand All @@ -34,11 +35,12 @@ pub struct KafkaRequestMetadata {
pub struct KafkaResponse {
event_byte_size: GroupedCountByteSize,
raw_byte_size: usize,
event_status: EventStatus,
}

impl DriverResponse for KafkaResponse {
fn event_status(&self) -> EventStatus {
EventStatus::Delivered
self.event_status
}

fn events_sent(&self) -> &GroupedCountByteSize {
Expand Down Expand Up @@ -153,6 +155,7 @@ impl Service<KafkaRequest> for KafkaService {
.map(|_| KafkaResponse {
event_byte_size,
raw_byte_size,
event_status: EventStatus::Delivered,
})
.map_err(|(err, _)| err);
}
Expand All @@ -168,8 +171,30 @@ impl Service<KafkaRequest> for KafkaService {
record = original_record;
tokio::time::sleep(Duration::from_millis(100)).await;
}
// A different error occurred.
Err((err, _)) => return Err(err),
// A final/non-retriable error occurred.
Err((
err @ KafkaError::MessageProduction(
RDKafkaErrorCode::InvalidMessage
| RDKafkaErrorCode::InvalidMessageSize
| RDKafkaErrorCode::MessageSizeTooLarge
| RDKafkaErrorCode::UnknownTopicOrPartition
| RDKafkaErrorCode::InvalidRecord
| RDKafkaErrorCode::InvalidRequiredAcks
| RDKafkaErrorCode::TopicAuthorizationFailed
| RDKafkaErrorCode::UnsupportedForMessageFormat
| RDKafkaErrorCode::ClusterAuthorizationFailed,
),
_,
)) => return Err(err),

// A different error occurred. Set event status to Errored not Rejected.
Err(_) => {
return Ok(KafkaResponse {
event_byte_size: config::telemetry().create_request_count_byte_size(),
raw_byte_size: 0,
event_status: EventStatus::Errored,
})
}
};
}
})
Expand Down

0 comments on commit dc0b408

Please sign in to comment.