Skip to content

Commit

Permalink
Merge pull request #24 from svix/jplatte/retry-delivery-ack
Browse files Browse the repository at this point in the history
Allow retrying Delivery::{ack, nack}
  • Loading branch information
svix-gabriel authored Feb 7, 2024
2 parents e93eb01 + ca9ce8b commit 823be74
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions omniqueue/src/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,29 @@ pub struct Delivery {
}

impl Delivery {
/// Acknowledges the receipt and successful processing of this [`Delivery`]. The exact nature of
/// this will vary per backend, but usually it ensures that the same message is not reprocessed.
pub async fn ack(mut self) -> Result<(), QueueError> {
self.acker.ack().await
/// Acknowledges the receipt and successful processing of this [`Delivery`].
///
/// On failure, `self` is returned alongside the error to allow retrying.
///
/// The exact nature of this will vary per backend, but usually it ensures that the same message
/// is not reprocessed.
pub async fn ack(mut self) -> Result<(), (QueueError, Self)> {
self.acker.ack().await.map_err(|e| (e, self))
}

/// Explicitly does not Acknowledge the successful processing of this [`Delivery`]. The exact
/// nature of this will vary by backend, but usually it ensures that the same message is either
/// reinserted into the same queue or is sent to a separate collection.
pub async fn nack(mut self) -> Result<(), QueueError> {
self.acker.nack().await
/// Explicitly does not Acknowledge the successful processing of this [`Delivery`].
///
/// On failure, `self` is returned alongside the error to allow retrying.
///
/// The exact nature of this will vary by backend, but usually it ensures that the same message
/// is either reinserted into the same queue or is sent to a separate collection.
pub async fn nack(mut self) -> Result<(), (QueueError, Self)> {
self.acker.nack().await.map_err(|e| (e, self))
}

/// This method will deserialize the contained bytes using the configured decoder. If a decoder
/// does not exist for the type parameter T, this function will return an error.
/// This method will deserialize the contained bytes using the configured decoder.
///
/// If a decoder does not exist for the type parameter T, this function will return an error.
///
/// This method does not consume the payload.
pub fn payload_custom<T: 'static>(&self) -> Result<Option<T>, QueueError> {
Expand Down

0 comments on commit 823be74

Please sign in to comment.