Skip to content

Commit

Permalink
Avoid reordering events due to split partition queues (#2437)
Browse files Browse the repository at this point in the history
* Avoid reordering events due to split partition queues

Great care is required when splitting queues to avoid losing ordering.
We rely on ordering for dedupe so reordering -> dropped kafka messages.

* Commit consumer state on rebalance

* Review comments

* Use spawn_unmanaged

* Install prometheus recorder earlier so kafka metrics work

* Handle panic with empty partitions
  • Loading branch information
jackkleeman committed Dec 20, 2024
1 parent 75a462d commit f13c77f
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 103 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/ingress-kafka/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ anyhow = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
parking_lot = { workspace = true }
rdkafka = { git = "https://github.com/restatedev/rust-rdkafka", rev = "4b5946309bdb669eb0c884cd9b7ad05578a0f6c6", features = ["libz-static", "cmake-build", "ssl-vendored"] }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
Expand Down
290 changes: 210 additions & 80 deletions crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,26 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::fmt::{self, Display};
use std::sync::{Arc, OnceLock, Weak};

use base64::Engine;
use bytes::Bytes;
use metrics::counter;
use opentelemetry::trace::TraceContextExt;
use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
use rdkafka::{ClientConfig, Message};
use tokio::sync::oneshot;
use tracing::{debug, info, info_span, Instrument};
use rdkafka::topic_partition_list::TopicPartitionListElem;
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{ClientConfig, ClientContext, Message};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, info_span, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use restate_core::{cancellation_watcher, TaskCenter, TaskId, TaskKind};
use restate_core::{TaskCenter, TaskHandle, TaskKind};
use restate_ingress_dispatcher::{
DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest,
};
Expand All @@ -52,11 +53,11 @@ pub enum Error {
},
#[error("ingress dispatcher channel is closed")]
IngressDispatcherClosed,
#[error("topic {0} partition {1} queue split didn't succeed")]
TopicPartitionSplit(String, i32),
#[error("received a message on the main partition queue for topic {0} partition {1} despite partitioned queues")]
UnexpectedMainQueueMessage(String, i32),
}

type MessageConsumer = StreamConsumer<DefaultConsumerContext>;
type MessageConsumer = StreamConsumer<RebalanceContext>;

#[derive(Debug, Hash)]
pub struct KafkaDeduplicationId {
Expand Down Expand Up @@ -232,92 +233,221 @@ impl ConsumerTask {
self.topics, self.client_config
);

let consumer: Arc<MessageConsumer> = Arc::new(self.client_config.create()?);
let (failures_tx, failures_rx) = mpsc::unbounded_channel();

let rebalance_context = RebalanceContext {
task_center: self.task_center.clone(),
consumer: OnceLock::new(),
topic_partition_tasks: parking_lot::Mutex::new(HashMap::new()),
failures_tx,
sender: self.sender.clone(),
consumer_group_id,
};
let consumer: Arc<MessageConsumer> =
Arc::new(self.client_config.create_with_context(rebalance_context)?);
// this OnceLock<Weak> dance is needed because the rebalance callbacks don't get a handle on the consumer,
// which is strange because practically everything you'd want to do with them involves the consumer.
_ = consumer.context().consumer.set(Arc::downgrade(&consumer));

// ensure partitioned tasks are cancelled when this function exits/stops being polled
let consumer = ConsumerDrop(consumer);

let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect();
consumer.subscribe(&topics)?;

let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default();

let result = loop {
tokio::select! {
res = consumer.recv() => {
let msg = match res {
Ok(msg) => msg,
Err(e) => break Err(e.into())
};
let topic = msg.topic().to_owned();
let partition = msg.partition();
let offset = msg.offset();

// If we didn't split the queue, let's do it and start the topic partition consumer
if let Entry::Vacant(e) = topic_partition_tasks.entry((topic.clone(), partition)) {
let topic_partition_consumer = match consumer
.split_partition_queue(&topic, partition) {
Some(q) => q,
None => break Err(Error::TopicPartitionSplit(topic.clone(), partition))
};

let task = topic_partition_queue_consumption_loop(
self.sender.clone(),
topic.clone(), partition,
topic_partition_consumer,
Arc::clone(&consumer),
consumer_group_id.clone()
);

if let Ok(task_id) = self.task_center.spawn_child(TaskKind::Ingress, "partition-queue", None, task) {
e.insert(task_id);
} else {
break Ok(());
}
}
let mut failures_rx = std::pin::pin!(failures_rx);

tokio::select! {
// we have to poll the main consumer for callbacks to be processed, but we expect to only see messages on the partitioned queues
res = consumer.recv() => {
match res {
// We shouldn't see any messages on the main consumer loop, because we split the queues into partitioned queues before they
// are ever assigned. Messages here should be treated as a bug in our assumptions.
Ok(msg) => Err(Error::UnexpectedMainQueueMessage(msg.topic().into(), msg.partition())),
Err(e) => Err(e.into()),
}
}
// watch for errors in the partitioned consumers - they should only ever abort, not return errors
Some(err) = failures_rx.recv() => {
Err(err)
}
_ = &mut rx => {
Ok(())
}
}
}
}

#[derive(derive_more::Deref)]
struct ConsumerDrop(Arc<MessageConsumer>);

// We got this message, let's send it through
if let Err(e) = self.sender.send(&consumer_group_id, msg).await {
break Err(e)
impl Drop for ConsumerDrop {
fn drop(&mut self) {
debug!(
"Stopping consumer with id {}",
self.context().consumer_group_id
);

// we have to clear this because the partitioned tasks themselves hold a reference to MessageConsumer
self.context().topic_partition_tasks.lock().clear();
}
}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct TopicPartition(String, i32);

impl<'a> From<TopicPartitionListElem<'a>> for TopicPartition {
fn from(value: TopicPartitionListElem<'a>) -> Self {
Self(value.topic().into(), value.partition())
}
}

impl Display for TopicPartition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.0, self.1)
}
}

struct RebalanceContext {
task_center: TaskCenter,
consumer: OnceLock<Weak<MessageConsumer>>,
topic_partition_tasks: parking_lot::Mutex<HashMap<TopicPartition, AbortOnDrop>>,
failures_tx: mpsc::UnboundedSender<Error>,
sender: MessageSender,
consumer_group_id: String,
}

impl ClientContext for RebalanceContext {}

// This callback is called synchronously with the poll of the main queue, so we don't want to block here.
// Once the pre balance steps finish assign() will be called. If we have not split at this point,
// then queues will be created defaulting to forward to the main loop - which we don't want.
// However, if we have split the partition before assign is called, the queue will be created
// with a flag RD_KAFKA_Q_F_FWD_APP and this flag will ensure that the queue will not be sent to the
// main loop. Therefore its critical that the splits happen synchronously before the pre_rebalance ends.
//
// On non-cooperative rebalance during assign all the existing partitions are revoked,
// and their queues are destroyed. Split partition queues will stop working in this case. We should ensure
// that they are not polled again after the assign. Then there will be a further rebalance callback after the revoke
// and we will set up new split partition streams before the assign.
impl ConsumerContext for RebalanceContext {
fn pre_rebalance(&self, rebalance: &Rebalance<'_>) {
let mut topic_partition_tasks = self.topic_partition_tasks.lock();
let consumer = self
.consumer
.get()
.expect("consumer must have been set in context at rebalance time");

let Some(consumer) = consumer.upgrade() else {
// if the consumer has been dropped, we don't need to maintain tasks any more
return;
};

match rebalance {
Rebalance::Assign(partitions) if partitions.count() > 0 => {
for partition in partitions.elements() {
let partition: TopicPartition = partition.into();

if let Some(task_id) = topic_partition_tasks.remove(&partition) {
// This probably implies a problem in our assumptions, because librdkafka shouldn't be assigning us a partition again without having revoked it.
// However its fair to assume that the existing partitioned consumer is now invalid.
warn!("Kafka informed us of an assigned partition {partition} which we already consider assigned, cancelling the existing partitioned consumer");
drop(task_id);
}

// This method tells rdkafka that we have processed this message,
// so its offset can be safely committed.
// rdkafka periodically commits these offsets asynchronously, with a period configurable
// with auto.commit.interval.ms
if let Err(e) = consumer.store_offset(&topic, partition, offset) {
break Err(e.into())
match consumer.split_partition_queue(&partition.0, partition.1) {
Some(queue) => {
let task = topic_partition_queue_consumption_loop(
self.sender.clone(),
partition.clone(),
queue,
Arc::clone(&consumer),
self.consumer_group_id.clone(),
self.failures_tx.clone(),
);

if let Ok(task_handle) = self.task_center.spawn_unmanaged(
TaskKind::Ingress,
"kafka-partition-ingest",
None,
task,
) {
topic_partition_tasks.insert(partition, AbortOnDrop(task_handle));
} else {
// shutting down
return;
}
}
None => {
warn!("Invalid partition {partition} given to us in rebalance, ignoring it");
continue;
}
}
}
}
Rebalance::Revoke(partitions) if partitions.count() > 0 => {
for partition in partitions.elements() {
let partition = partition.into();
match topic_partition_tasks.remove(&partition)
{
Some(task_id) => {
debug!("Stopping partitioned consumer for partition {partition} due to rebalance");
// The partitioned queue will not be polled again.
// It might be mid-poll right now, but if so its result will not be sent anywhere.
drop(task_id);
}
None => warn!("Kafka informed us of a revoked partition {partition} which we had no consumer task for"),
}
}
_ = &mut rx => {
break Ok(());

match consumer.commit_consumer_state(CommitMode::Async) {
Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
// Success
}
Err(error) => warn!("Failed to commit the current consumer state: {error}"),
}
}
};
for task_id in topic_partition_tasks.into_values() {
self.task_center.cancel_task(task_id);
// called with empty partitions; important to not call .elements() as this panics apparently.
// unclear why we are called with no partitions
Rebalance::Assign(_) | Rebalance::Revoke(_) => {}
Rebalance::Error(_) => {}
}
result
}
}

struct AbortOnDrop(TaskHandle<()>);

impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}

async fn topic_partition_queue_consumption_loop(
sender: MessageSender,
topic: String,
partition: i32,
topic_partition_consumer: StreamPartitionQueue<DefaultConsumerContext>,
partition: TopicPartition,
topic_partition_consumer: StreamPartitionQueue<impl ConsumerContext>,
consumer: Arc<MessageConsumer>,
consumer_group_id: String,
) -> Result<(), anyhow::Error> {
let mut shutdown = std::pin::pin!(cancellation_watcher());

loop {
tokio::select! {
res = topic_partition_consumer.recv() => {
let msg = res?;
let offset = msg.offset();
sender.send(&consumer_group_id, msg).await?;
consumer.store_offset(&topic, partition, offset)?;
}
_ = &mut shutdown => {
return Ok(())
}
failed: mpsc::UnboundedSender<Error>,
) {
debug!("Starting partitioned consumer for partition {partition}");

// this future will be aborted when the partition is no longer needed, so any exit is a failure
let err = loop {
let res = topic_partition_consumer.recv().await;
let msg = match res {
Ok(msg) => msg,
Err(err) => break err.into(),
};
let offset = msg.offset();
if let Err(err) = sender.send(&consumer_group_id, msg).await {
break err;
}
}
if let Err(err) = consumer.store_offset(&partition.0, partition.1, offset) {
break err.into();
}
};

_ = failed.send(err);
}
Loading

0 comments on commit f13c77f

Please sign in to comment.