Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid reordering events due to split partition queues #2437

Merged
merged 5 commits into from
Dec 19, 2024

Conversation

jackkleeman
Copy link
Contributor

Great care is required when splitting queues to avoid losing ordering. We rely on ordering for dedupe so reordering -> dropped kafka messages.

Great care is required when splitting queues to avoid losing ordering.
We rely on ordering for dedupe so reordering -> dropped kafka messages.
@slinkydeveloper
Copy link
Contributor

We also need this on main I guess

@jackkleeman
Copy link
Contributor Author

ill base on main and we can cherry pick later

Copy link
Contributor

@slinkydeveloper slinkydeveloper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good to me, just minor comments

crates/ingress-kafka/src/consumer_task.rs Show resolved Hide resolved
crates/ingress-kafka/src/consumer_task.rs Outdated Show resolved Hide resolved
crates/ingress-kafka/src/consumer_task.rs Show resolved Hide resolved
crates/ingress-kafka/src/consumer_task.rs Show resolved Hide resolved
};
for task_id in topic_partition_tasks.into_values() {
self.task_center.cancel_task(task_id);
Rebalance::Error(_) => {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets propagated in the main loop i suppose?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, it doesnt get propagated, but also from my reading of librdkafka it cant happen, and actually their code generally assumes that it doesnt happen and has weird behaviour if it does. however, i can see that rust-rdkafka treats this scenario as equivalent to revoke, so i guess i can do the same

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i cant treat it as equivalent to revoke, as they dont give me a handle on what the provided partitions are. my feeling is we can either ignore or panic

Copy link
Contributor

@slinkydeveloper slinkydeveloper Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's panic only if there's a panic handler somewhere that makes sure the panic won't get propagated and tear down the whole node. I think it should be the case with task center/subscription controller?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo its better to just ignore it

crates/ingress-kafka/src/consumer_task.rs Outdated Show resolved Hide resolved
@jackkleeman jackkleeman changed the base branch from 1.1.5-pprof to release/1.1.5 December 18, 2024 09:00
crates/ingress-kafka/src/consumer_task.rs Outdated Show resolved Hide resolved
crates/ingress-kafka/src/consumer_task.rs Outdated Show resolved Hide resolved
crates/ingress-kafka/src/consumer_task.rs Outdated Show resolved Hide resolved
crates/ingress-kafka/src/consumer_task.rs Outdated Show resolved Hide resolved
crates/ingress-kafka/src/subscription_controller.rs Outdated Show resolved Hide resolved
@@ -418,13 +412,11 @@ impl ConsumerContext for RebalanceContext {
}
}

struct AbortOnDrop(TaskCenter, TaskId);
struct AbortOnDrop(TaskHandle<()>);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: This is a handy type if you want to move to task_center's so others can also use it.

Copy link
Contributor

@AhmedSoliman AhmedSoliman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes look good to me. I have no experience with librdkafka, but the rest of the parts make sense.

@jackkleeman jackkleeman merged commit aaecdc2 into restatedev:release/1.1.5 Dec 19, 2024
9 of 10 checks passed
@jackkleeman jackkleeman deleted the kafka-ordering branch December 19, 2024 16:11
slinkydeveloper pushed a commit to slinkydeveloper/restate that referenced this pull request Dec 19, 2024
* Avoid reordering events due to split partition queues

Great care is required when splitting queues to avoid losing ordering.
We rely on ordering for dedupe so reordering -> dropped kafka messages.

* Commit consumer state on rebalance

* Review comments

* Use spawn_unmanaged

* Install prometheus recorder earlier so kafka metrics work
jackkleeman added a commit that referenced this pull request Dec 19, 2024
* Avoid reordering events due to split partition queues

Great care is required when splitting queues to avoid losing ordering.
We rely on ordering for dedupe so reordering -> dropped kafka messages.

* Commit consumer state on rebalance

* Review comments

* Use spawn_unmanaged

* Install prometheus recorder earlier so kafka metrics work
jackkleeman added a commit that referenced this pull request Dec 19, 2024
* Avoid reordering events due to split partition queues (#2437)

* Avoid reordering events due to split partition queues

Great care is required when splitting queues to avoid losing ordering.
We rely on ordering for dedupe so reordering -> dropped kafka messages.

* Commit consumer state on rebalance

* Review comments

* Use spawn_unmanaged

* Install prometheus recorder earlier so kafka metrics work

* Rebase changes

* Handle panic with empty partitions

---------

Co-authored-by: Jack Kleeman <[email protected]>
jackkleeman added a commit that referenced this pull request Dec 20, 2024
* Avoid reordering events due to split partition queues

Great care is required when splitting queues to avoid losing ordering.
We rely on ordering for dedupe so reordering -> dropped kafka messages.

* Commit consumer state on rebalance

* Review comments

* Use spawn_unmanaged

* Install prometheus recorder earlier so kafka metrics work

* Handle panic with empty partitions
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants