Avoid reordering events due to split partition queues #2437

Merged · 5 commits · Dec 19, 2024

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions crates/ingress-kafka/Cargo.toml
@@ -22,8 +22,10 @@ anyhow = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
parking_lot = { workspace = true }
rdkafka = { git = "https://github.com/restatedev/rust-rdkafka", rev = "4b5946309bdb669eb0c884cd9b7ad05578a0f6c6", features = ["libz-static", "cmake-build", "ssl-vendored"] }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
287 changes: 207 additions & 80 deletions crates/ingress-kafka/src/consumer_task.rs
@@ -8,25 +8,26 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use std::fmt::{self, Display};
use std::sync::{Arc, OnceLock, Weak};

use base64::Engine;
use bytes::Bytes;
use metrics::counter;
use opentelemetry::trace::TraceContextExt;
use rdkafka::consumer::stream_consumer::StreamPartitionQueue;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
use rdkafka::{ClientConfig, Message};
use tokio::sync::oneshot;
use tracing::{debug, info, info_span, Instrument};
use rdkafka::topic_partition_list::TopicPartitionListElem;
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{ClientConfig, ClientContext, Message};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info, info_span, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use restate_core::{cancellation_watcher, TaskCenter, TaskId, TaskKind};
use restate_core::{TaskCenter, TaskHandle, TaskKind};
use restate_ingress_dispatcher::{
DeduplicationId, DispatchIngressRequest, IngressDispatcher, IngressDispatcherRequest,
};
@@ -52,11 +52,11 @@ pub enum Error {
},
#[error("ingress dispatcher channel is closed")]
IngressDispatcherClosed,
#[error("topic {0} partition {1} queue split didn't succeed")]
TopicPartitionSplit(String, i32),
#[error("received a message on the main partition queue for topic {0} partition {1} despite partitioned queues")]
UnexpectedMainQueueMessage(String, i32),
}

type MessageConsumer = StreamConsumer<DefaultConsumerContext>;
type MessageConsumer = StreamConsumer<RebalanceContext>;

#[derive(Debug, Hash)]
pub struct KafkaDeduplicationId {
@@ -232,92 +233,218 @@ impl ConsumerTask {
self.topics, self.client_config
);

let consumer: Arc<MessageConsumer> = Arc::new(self.client_config.create()?);
let (failures_tx, failures_rx) = mpsc::unbounded_channel();

let rebalance_context = RebalanceContext {
task_center: self.task_center.clone(),
consumer: OnceLock::new(),
topic_partition_tasks: parking_lot::Mutex::new(HashMap::new()),
failures_tx,
sender: self.sender.clone(),
consumer_group_id,
};
let consumer: Arc<MessageConsumer> =
Arc::new(self.client_config.create_with_context(rebalance_context)?);
// this OnceLock<Weak> dance is needed because the rebalance callbacks don't get a handle on the consumer,
// which is strange because practically everything you'd want to do with them involves the consumer.
_ = consumer.context().consumer.set(Arc::downgrade(&consumer));

// ensure partitioned tasks are cancelled when this function exits/stops being polled
let consumer = ConsumerDrop(consumer);

let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect();
consumer.subscribe(&topics)?;

let mut topic_partition_tasks: HashMap<(String, i32), TaskId> = Default::default();

let result = loop {
tokio::select! {
res = consumer.recv() => {
let msg = match res {
Ok(msg) => msg,
Err(e) => break Err(e.into())
};
let topic = msg.topic().to_owned();
let partition = msg.partition();
let offset = msg.offset();

// If we didn't split the queue, let's do it and start the topic partition consumer
if let Entry::Vacant(e) = topic_partition_tasks.entry((topic.clone(), partition)) {
let topic_partition_consumer = match consumer
.split_partition_queue(&topic, partition) {
Some(q) => q,
None => break Err(Error::TopicPartitionSplit(topic.clone(), partition))
};

let task = topic_partition_queue_consumption_loop(
self.sender.clone(),
topic.clone(), partition,
topic_partition_consumer,
Arc::clone(&consumer),
consumer_group_id.clone()
);

if let Ok(task_id) = self.task_center.spawn_child(TaskKind::Ingress, "partition-queue", None, task) {
e.insert(task_id);
} else {
break Ok(());
}
}
let mut failures_rx = std::pin::pin!(failures_rx);

tokio::select! {
// we have to poll the main consumer for callbacks to be processed, but we expect to only see messages on the partitioned queues
res = consumer.recv() => {
match res {
// We shouldn't see any messages on the main consumer loop, because we split the queues into partitioned queues before they
// are ever assigned. Messages here should be treated as a bug in our assumptions.
Ok(msg) => Err(Error::UnexpectedMainQueueMessage(msg.topic().into(), msg.partition())),
Err(e) => Err(e.into()),
}
}
// watch for errors in the partitioned consumers - they should only ever abort, not return errors
Some(err) = failures_rx.recv() => {
Err(err)
}
_ = &mut rx => {
Ok(())
}
}
}
}

#[derive(derive_more::Deref)]
struct ConsumerDrop(Arc<MessageConsumer>);

// We got this message, let's send it through
if let Err(e) = self.sender.send(&consumer_group_id, msg).await {
break Err(e)
impl Drop for ConsumerDrop {
fn drop(&mut self) {
debug!(
"Stopping consumer with id {}",
self.context().consumer_group_id
);

// we have to clear this because the partitioned tasks themselves hold a reference to MessageConsumer
self.context().topic_partition_tasks.lock().clear();
}
}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct TopicPartition(String, i32);

impl<'a> From<TopicPartitionListElem<'a>> for TopicPartition {
fn from(value: TopicPartitionListElem<'a>) -> Self {
Self(value.topic().into(), value.partition())
}
}

impl Display for TopicPartition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.0, self.1)
}
}

struct RebalanceContext {
task_center: TaskCenter,
consumer: OnceLock<Weak<MessageConsumer>>,
topic_partition_tasks: parking_lot::Mutex<HashMap<TopicPartition, AbortOnDrop>>,
failures_tx: mpsc::UnboundedSender<Error>,
sender: MessageSender,
consumer_group_id: String,
}

impl ClientContext for RebalanceContext {}

// This callback is called synchronously with the poll of the main queue, so we don't want to block here.
// Once the pre-rebalance steps finish, assign() will be called. If we have not split at this point,
// then queues will be created defaulting to forward to the main loop - which we don't want.
// However, if we have split the partition before assign is called, the queue will be created
// with a flag RD_KAFKA_Q_F_FWD_APP and this flag will ensure that the queue will not be sent to the
// main loop. Therefore it's critical that the splits happen synchronously before the pre_rebalance ends.
//
// On non-cooperative rebalance during assign, all the existing partitions are revoked,
// and their queues are destroyed. Split partition queues will stop working in this case. We should ensure
// that they are not polled again after the assign. Then there will be a further rebalance callback after the revoke
// and we will set up new split partition streams before the assign.
impl ConsumerContext for RebalanceContext {
fn pre_rebalance(&self, rebalance: &Rebalance<'_>) {
let mut topic_partition_tasks = self.topic_partition_tasks.lock();
let consumer = self
.consumer
.get()
.expect("consumer must have been set in context at rebalance time");

let Some(consumer) = consumer.upgrade() else {
// if the consumer has been dropped, we don't need to maintain tasks any more
return;
};

match rebalance {
Rebalance::Assign(partitions) => {
for partition in partitions.elements() {
let partition: TopicPartition = partition.into();

if let Some(task_id) = topic_partition_tasks.remove(&partition) {
// This probably implies a problem in our assumptions, because librdkafka shouldn't be assigning us a partition again without having revoked it.
// However it's fair to assume that the existing partitioned consumer is now invalid.
warn!("Kafka informed us of an assigned partition {partition} which we already consider assigned, cancelling the existing partitioned consumer");
drop(task_id);
}

// This method tells rdkafka that we have processed this message,
// so its offset can be safely committed.
// rdkafka periodically commits these offsets asynchronously, with a period configurable
// with auto.commit.interval.ms
if let Err(e) = consumer.store_offset(&topic, partition, offset) {
break Err(e.into())
match consumer.split_partition_queue(&partition.0, partition.1) {
Some(queue) => {
let task = topic_partition_queue_consumption_loop(
self.sender.clone(),
partition.clone(),
queue,
Arc::clone(&consumer),
self.consumer_group_id.clone(),
self.failures_tx.clone(),
);

if let Ok(task_handle) = self.task_center.spawn_unmanaged(
TaskKind::Ingress,
"kafka-partition-ingest",
None,
task,
) {
topic_partition_tasks.insert(partition, AbortOnDrop(task_handle));
} else {
// shutting down
return;
}
}
None => {
warn!("Invalid partition {partition} given to us in rebalance, ignoring it");
continue;
}
}
}
}
Rebalance::Revoke(partitions) => {
for partition in partitions.elements() {
let partition = partition.into();
match topic_partition_tasks.remove(&partition)
{
Some(task_id) => {
debug!("Stopping partitioned consumer for partition {partition} due to rebalance");
// The partitioned queue will not be polled again.
// It might be mid-poll right now, but if so its result will not be sent anywhere.
drop(task_id);
}
None => warn!("Kafka informed us of a revoked partition {partition} which we had no consumer task for"),
}
}
_ = &mut rx => {
break Ok(());

match consumer.commit_consumer_state(CommitMode::Async) {
Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
// Success
}
Err(error) => warn!("Failed to commit the current consumer state: {error}"),
}
}
};
for task_id in topic_partition_tasks.into_values() {
self.task_center.cancel_task(task_id);
Rebalance::Error(_) => {}
Contributor: This gets propagated in the main loop, I suppose?

Contributor (author): No, it doesn't get propagated, but from my reading of librdkafka it can't happen; their code generally assumes it doesn't and behaves oddly if it does. However, I can see that rust-rdkafka treats this scenario as equivalent to a revoke, so I guess I can do the same.

Contributor (author): Actually, I can't treat it as equivalent to a revoke, as they don't give me a handle on what the provided partitions are. My feeling is we can either ignore it or panic.

slinkydeveloper (Contributor), Dec 18, 2024: Let's panic only if there's a panic handler somewhere that makes sure the panic won't get propagated and tear down the whole node. I think that should be the case with the task center / subscription controller?

Contributor (author): IMO it's better to just ignore it.
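
For illustration, a minimal sketch of the "ignore but log" option discussed in this thread; it is not what the PR merged (the merged arm stays empty). The context type name is invented, and the single-argument pre_rebalance signature follows the pinned rust-rdkafka fork used by this crate; upstream releases may also pass the consumer as an argument.

use rdkafka::consumer::{ConsumerContext, Rebalance};
use rdkafka::ClientContext;
use tracing::warn;

struct LoggingRebalanceContext;

impl ClientContext for LoggingRebalanceContext {}

impl ConsumerContext for LoggingRebalanceContext {
    fn pre_rebalance(&self, rebalance: &Rebalance<'_>) {
        match rebalance {
            Rebalance::Assign(_partitions) => { /* split partition queues here */ }
            Rebalance::Revoke(_partitions) => { /* stop the partitioned consumers here */ }
            // The Error variant carries no partition list, so there is nothing to revoke;
            // surface it in the logs and carry on rather than tearing the consumer down.
            Rebalance::Error(e) => warn!("librdkafka reported a rebalance error: {e}"),
        }
    }
}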

}
result
}
}

struct AbortOnDrop(TaskHandle<()>);
Contributor: Optional: this is a handy type; you might want to move it to task_center so others can also use it.


impl Drop for AbortOnDrop {
fn drop(&mut self) {
self.0.abort();
}
}
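
Picking up the suggestion in the thread above, a rough sketch of what a reusable, generic guard could look like if it moved into task_center. The generic parameter and the public visibility are assumptions for illustration, not part of this PR; only TaskHandle and its abort() come from the code above.

use restate_core::TaskHandle;

// Hypothetical generic variant of the guard above: aborting in Drop ties the spawned
// task's lifetime to whichever struct owns the guard.
pub struct AbortOnDrop<T>(pub TaskHandle<T>);

impl<T> Drop for AbortOnDrop<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}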

async fn topic_partition_queue_consumption_loop(
sender: MessageSender,
topic: String,
partition: i32,
topic_partition_consumer: StreamPartitionQueue<DefaultConsumerContext>,
partition: TopicPartition,
topic_partition_consumer: StreamPartitionQueue<impl ConsumerContext>,
consumer: Arc<MessageConsumer>,
consumer_group_id: String,
) -> Result<(), anyhow::Error> {
let mut shutdown = std::pin::pin!(cancellation_watcher());

loop {
tokio::select! {
res = topic_partition_consumer.recv() => {
let msg = res?;
let offset = msg.offset();
sender.send(&consumer_group_id, msg).await?;
consumer.store_offset(&topic, partition, offset)?;
}
_ = &mut shutdown => {
return Ok(())
}
failed: mpsc::UnboundedSender<Error>,
) {
debug!("Starting partitioned consumer for partition {partition}");

// this future will be aborted when the partition is no longer needed, so any exit is a failure
let err = loop {
let res = topic_partition_consumer.recv().await;
let msg = match res {
Ok(msg) => msg,
Err(err) => break err.into(),
};
let offset = msg.offset();
if let Err(err) = sender.send(&consumer_group_id, msg).await {
break err;
}
}
if let Err(err) = consumer.store_offset(&partition.0, partition.1, offset) {
break err.into();
}
};

_ = failed.send(err);
}
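
As an aside on the OnceLock<Weak<..>> dance noted in the comment near the top of the new consumer setup: a self-contained illustration of the pattern, with made-up Owner and Context names. The back-reference can only be filled in after the Arc exists, and holding it as a Weak avoids a reference cycle between the consumer and its context.

use std::sync::{Arc, OnceLock, Weak};

struct Context {
    owner: OnceLock<Weak<Owner>>,
}

struct Owner {
    context: Context,
}

fn main() {
    // The context has to be constructed before the owner that embeds it...
    let owner = Arc::new(Owner {
        context: Context { owner: OnceLock::new() },
    });
    // ...so the back-reference is set afterwards, and only as a Weak.
    let _ = owner.context.owner.set(Arc::downgrade(&owner));

    // Callbacks running against the context can reach the owner while it is still alive.
    assert!(owner.context.owner.get().and_then(Weak::upgrade).is_some());
}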