Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow retrying Delivery::{ack, nack} #24

Merged
merged 2 commits into from
Feb 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions omniqueue/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,29 @@ pub struct Delivery {
}

impl Delivery {
/// Acknowledges the receipt and successful processing of this [`Delivery`]. The exact nature of
/// this will vary per backend, but usually it ensures that the same message is not reprocessed.
pub async fn ack(mut self) -> Result<(), QueueError> {
self.acker.ack().await
/// Acknowledges the receipt and successful processing of this [`Delivery`].
///
/// On failure, `self` is returned alongside the error to allow retrying.
///
/// The exact nature of this will vary per backend, but usually it ensures that the same message
/// is not reprocessed.
pub async fn ack(mut self) -> Result<(), (QueueError, Self)> {
self.acker.ack().await.map_err(|e| (e, self))
}

/// Explicitly does not Acknowledge the successful processing of this [`Delivery`]. The exact
/// nature of this will vary by backend, but usually it ensures that the same message is either
/// reinserted into the same queue or is sent to a separate collection.
pub async fn nack(mut self) -> Result<(), QueueError> {
self.acker.nack().await
/// Explicitly does not Acknowledge the successful processing of this [`Delivery`].
///
/// On failure, `self` is returned alongside the error to allow retrying.
///
/// The exact nature of this will vary by backend, but usually it ensures that the same message
/// is either reinserted into the same queue or is sent to a separate collection.
pub async fn nack(mut self) -> Result<(), (QueueError, Self)> {
self.acker.nack().await.map_err(|e| (e, self))
}

/// This method will deserialize the contained bytes using the configured decoder. If a decoder
/// does not exist for the type parameter T, this function will return an error.
/// This method will deserialize the contained bytes using the configured decoder.
///
/// If a decoder does not exist for the type parameter T, this function will return an error.
///
/// This method does not consume the payload.
pub fn payload_custom<T: 'static>(&self) -> Result<Option<T>, QueueError> {
Expand Down
Loading