Skip to content

Commit

Permalink
Add 'set_ack_deadline' to Delivery, and implement for SQS/GCP
Browse files Browse the repository at this point in the history
This allows changing the 'ack deadline' (e.g. the visibility
timeout in SQS) for an existing message, which allows the consumer
to spend more time processing the message before calling 'ack()'.

At the moment, this is only implemented for SQS and GCP.
To support this, I've added back the `QueueError::Unsupported`
enum variant (which is a breaking change).

This is gated behind an off-by-default 'beta' feature flag at
the moment
  • Loading branch information
svix-aaron1011 committed Jun 27, 2024
1 parent 379b34b commit e30bfee
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 1 deletion.
1 change: 1 addition & 0 deletions omniqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
redis_cluster = ["redis", "redis/cluster-async"]
sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
azure_queue_storage = ["dep:azure_storage", "dep:azure_storage_queues"]
beta = []
6 changes: 6 additions & 0 deletions omniqueue/src/backends/azure_queue_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ impl Acker for AqsAcker {
async fn nack(&mut self) -> Result<()> {
Ok(())
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not yet supported by InMemoryBackend",
))
}
}

impl AqsConsumer {
Expand Down
13 changes: 13 additions & 0 deletions omniqueue/src/backends/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,17 @@ impl Acker for GcpPubSubAcker {
async fn nack(&mut self) -> Result<()> {
self.recv_msg.nack().await.map_err(QueueError::generic)
}

async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> {
let duration_secs = duration.as_secs().try_into().map_err(|e| {
QueueError::Generic(Box::<dyn std::error::Error + Send + Sync>::from(format!(
"set_ack_deadline duration {duration:?} is too large: {e:?}"
)))
})?;

self.recv_msg
.modify_ack_deadline(duration_secs)
.await
.map_err(QueueError::generic)
}
}
6 changes: 6 additions & 0 deletions omniqueue/src/backends/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ impl Acker for InMemoryAcker {
.map_err(QueueError::generic)
}
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not yet supported by InMemoryBackend",
))
}
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions omniqueue/src/backends/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,10 @@ impl Acker for RabbitMqAcker {
.map(|_| ())
.map_err(QueueError::generic)
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not supported by RabbitMQ",
))
}
}
6 changes: 6 additions & 0 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {

Ok(())
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not yet supported by redis fallback backend",
))
}
}

pub(super) async fn add_to_main_queue(
Expand Down
6 changes: 6 additions & 0 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ impl<R: RedisConnection> Acker for RedisStreamsAcker<R> {

Ok(())
}

async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> {
Err(QueueError::Unsupported(
"set_ack_deadline is not yet supported by redis streams backend",
))
}
}

pub(super) async fn add_to_main_queue(
Expand Down
28 changes: 28 additions & 0 deletions omniqueue/src/backends/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,34 @@ impl Acker for SqsAcker {
async fn nack(&mut self) -> Result<()> {
Ok(())
}

async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> {
if let Some(receipt_handle) = &self.receipt_handle {
let duration_secs = duration.as_secs().try_into().map_err(|e| {
QueueError::Generic(Box::<dyn std::error::Error + Send + Sync>::from(format!(
"set_ack_deadline duration {duration:?} is too large: {e:?}"
)))
})?;
self.ack_client
.change_message_visibility()
.set_visibility_timeout(Some(duration_secs))
.queue_url(&self.queue_dsn)
.receipt_handle(receipt_handle)
.send()
.await
.map_err(aws_to_queue_error)?;

Ok(())
} else {
Err(QueueError::generic(
DeleteMessageError::ReceiptHandleIsInvalid(
ReceiptHandleIsInvalid::builder()
.message("receipt handle must be Some to set ack deadline")
.build(),
),
))
}
}
}

pub struct SqsProducer {
Expand Down
3 changes: 3 additions & 0 deletions omniqueue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ pub enum QueueError {

#[error("{0}")]
Generic(Box<dyn std::error::Error + Send + Sync>),

#[error("{0}")]
Unsupported(&'static str),
}

impl QueueError {
Expand Down
17 changes: 16 additions & 1 deletion omniqueue/src/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, future::Future};
use std::{fmt, future::Future, time::Duration};

use async_trait::async_trait;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -59,6 +59,19 @@ impl Delivery {
self.acker.ack().await.map_err(|e| (e, self))
}

#[cfg(feature = "beta")]
/// Sets the deadline for acknowledging this [`Delivery`] to `duration`,
/// starting from the time this method is called.
///
/// The exact nature of this will vary per backend, but usually ensures
/// that the same message will not be reprocessed if `ack()` is called
/// within an interval of `duration` from the time this method is
/// called. For example, this corresponds to the 'visibility timeout' in
/// SQS, and the 'ack deadline' in GCP
pub async fn set_ack_deadline(&mut self, duration: Duration) -> Result<(), QueueError> {
self.acker.set_ack_deadline(duration).await
}

/// Explicitly does not Acknowledge the successful processing of this
/// [`Delivery`].
///
Expand Down Expand Up @@ -102,4 +115,6 @@ impl fmt::Debug for Delivery {
pub(crate) trait Acker: Send + Sync {
async fn ack(&mut self) -> Result<()>;
async fn nack(&mut self) -> Result<()>;
#[cfg_attr(not(feature = "beta"), allow(dead_code))]
async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()>;
}

0 comments on commit e30bfee

Please sign in to comment.