diff --git a/omniqueue/Cargo.toml b/omniqueue/Cargo.toml index 2289c9d..cac40f1 100644 --- a/omniqueue/Cargo.toml +++ b/omniqueue/Cargo.toml @@ -50,3 +50,4 @@ redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"] redis_cluster = ["redis", "redis/cluster-async"] sqs = ["dep:aws-config", "dep:aws-sdk-sqs"] azure_queue_storage = ["dep:azure_storage", "dep:azure_storage_queues"] +beta = [] diff --git a/omniqueue/src/backends/azure_queue_storage.rs b/omniqueue/src/backends/azure_queue_storage.rs index 993df46..49e68c5 100644 --- a/omniqueue/src/backends/azure_queue_storage.rs +++ b/omniqueue/src/backends/azure_queue_storage.rs @@ -170,6 +170,12 @@ impl Acker for AqsAcker { async fn nack(&mut self) -> Result<()> { Ok(()) } + + async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> { + Err(QueueError::Unsupported( + "set_ack_deadline is not yet supported by InMemoryBackend", + )) + } } impl AqsConsumer { diff --git a/omniqueue/src/backends/gcp_pubsub.rs b/omniqueue/src/backends/gcp_pubsub.rs index 17b4988..958a7ee 100644 --- a/omniqueue/src/backends/gcp_pubsub.rs +++ b/omniqueue/src/backends/gcp_pubsub.rs @@ -281,4 +281,17 @@ impl Acker for GcpPubSubAcker { async fn nack(&mut self) -> Result<()> { self.recv_msg.nack().await.map_err(QueueError::generic) } + + async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> { + let duration_secs = duration.as_secs().try_into().map_err(|e| { + QueueError::Generic(Box::::from(format!( + "set_ack_deadline duration {duration:?} is too large: {e:?}" + ))) + })?; + + self.recv_msg + .modify_ack_deadline(duration_secs) + .await + .map_err(QueueError::generic) + } } diff --git a/omniqueue/src/backends/in_memory.rs b/omniqueue/src/backends/in_memory.rs index 08c93c5..01ce3ad 100644 --- a/omniqueue/src/backends/in_memory.rs +++ b/omniqueue/src/backends/in_memory.rs @@ -176,6 +176,12 @@ impl Acker for InMemoryAcker { .map_err(QueueError::generic) } } + + async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> { + Err(QueueError::Unsupported( + "set_ack_deadline is not yet supported by InMemoryBackend", + )) + } } #[cfg(test)] diff --git a/omniqueue/src/backends/rabbitmq.rs b/omniqueue/src/backends/rabbitmq.rs index a550b28..8ef46b4 100644 --- a/omniqueue/src/backends/rabbitmq.rs +++ b/omniqueue/src/backends/rabbitmq.rs @@ -301,4 +301,10 @@ impl Acker for RabbitMqAcker { .map(|_| ()) .map_err(QueueError::generic) } + + async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> { + Err(QueueError::Unsupported( + "set_ack_deadline is not supported by RabbitMQ", + )) + } } diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index a774414..f36a768 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -116,6 +116,12 @@ impl Acker for RedisFallbackAcker { Ok(()) } + + async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> { + Err(QueueError::Unsupported( + "set_ack_deadline is not yet supported by redis fallback backend", + )) + } } pub(super) async fn add_to_main_queue( diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index 3f50843..ac12b4b 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -163,6 +163,12 @@ impl Acker for RedisStreamsAcker { Ok(()) } + + async fn set_ack_deadline(&mut self, _duration: Duration) -> Result<()> { + Err(QueueError::Unsupported( + "set_ack_deadline is not yet supported by redis streams backend", + )) + } } pub(super) async fn add_to_main_queue( diff --git a/omniqueue/src/backends/sqs.rs b/omniqueue/src/backends/sqs.rs index 8e6903c..12c7ad6 100644 --- a/omniqueue/src/backends/sqs.rs +++ b/omniqueue/src/backends/sqs.rs @@ -225,6 +225,34 @@ impl Acker for SqsAcker { async fn nack(&mut self) -> Result<()> { Ok(()) } + + async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()> { + if let Some(receipt_handle) = &self.receipt_handle { + let duration_secs = duration.as_secs().try_into().map_err(|e| { + QueueError::Generic(Box::::from(format!( + "set_ack_deadline duration {duration:?} is too large: {e:?}" + ))) + })?; + self.ack_client + .change_message_visibility() + .set_visibility_timeout(Some(duration_secs)) + .queue_url(&self.queue_dsn) + .receipt_handle(receipt_handle) + .send() + .await + .map_err(aws_to_queue_error)?; + + Ok(()) + } else { + Err(QueueError::generic( + DeleteMessageError::ReceiptHandleIsInvalid( + ReceiptHandleIsInvalid::builder() + .message("receipt handle must be Some to set ack deadline") + .build(), + ), + )) + } + } } pub struct SqsProducer { diff --git a/omniqueue/src/lib.rs b/omniqueue/src/lib.rs index b6069f2..1a32766 100644 --- a/omniqueue/src/lib.rs +++ b/omniqueue/src/lib.rs @@ -134,6 +134,9 @@ pub enum QueueError { #[error("{0}")] Generic(Box), + + #[error("{0}")] + Unsupported(&'static str), } impl QueueError { diff --git a/omniqueue/src/queue/mod.rs b/omniqueue/src/queue/mod.rs index f63d386..d3845b3 100644 --- a/omniqueue/src/queue/mod.rs +++ b/omniqueue/src/queue/mod.rs @@ -1,4 +1,4 @@ -use std::{fmt, future::Future}; +use std::{fmt, future::Future, time::Duration}; use async_trait::async_trait; use serde::de::DeserializeOwned; @@ -59,6 +59,19 @@ impl Delivery { self.acker.ack().await.map_err(|e| (e, self)) } + #[cfg(feature = "beta")] + /// Sets the deadline for acknowledging this [`Delivery`] to `duration`, + /// starting from the time this method is called. + /// + /// The exact nature of this will vary per backend, but usually ensures + /// that the same message will not be reprocessed if `ack()` is called + /// within an interval of `duration` from the time this method is + /// called. For example, this corresponds to the 'visibility timeout' in + /// SQS, and the 'ack deadline' in GCP + pub async fn set_ack_deadline(&mut self, duration: Duration) -> Result<(), QueueError> { + self.acker.set_ack_deadline(duration).await + } + /// Explicitly does not Acknowledge the successful processing of this /// [`Delivery`]. /// @@ -102,4 +115,6 @@ impl fmt::Debug for Delivery { pub(crate) trait Acker: Send + Sync { async fn ack(&mut self) -> Result<()>; async fn nack(&mut self) -> Result<()>; + #[cfg_attr(not(feature = "beta"), allow(dead_code))] + async fn set_ack_deadline(&mut self, duration: Duration) -> Result<()>; }