From 02ae29de0bda0aa39ad17f72683c196da50aa89c Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 13 Aug 2024 13:56:21 +0200 Subject: [PATCH] redis: Only set already_acked_or_nacked after ack succeeded --- omniqueue/src/backends/redis/fallback.rs | 4 ++-- omniqueue/src/backends/redis/streams.rs | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index fd8d0ca..039cce2 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -92,8 +92,6 @@ impl Acker for RedisFallbackAcker { return Err(QueueError::CannotAckOrNackTwice); } - self.already_acked_or_nacked = true; - let _: () = self .redis .get() @@ -103,6 +101,8 @@ impl Acker for RedisFallbackAcker { .await .map_err(QueueError::generic)?; + self.already_acked_or_nacked = true; + Ok(()) } diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index f495d02..451808f 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -139,17 +139,19 @@ impl Acker for RedisStreamsAcker { return Err(QueueError::CannotAckOrNackTwice); } - self.already_acked_or_nacked = true; - let mut pipeline = redis::pipe(); pipeline.xack(&self.queue_key, &self.consumer_group, &[&self.entry_id]); pipeline.xdel(&self.queue_key, &[&self.entry_id]); let mut conn = self.redis.get().await.map_err(QueueError::generic)?; - pipeline + let _: () = pipeline .query_async(&mut *conn) .await - .map_err(QueueError::generic) + .map_err(QueueError::generic)?; + + self.already_acked_or_nacked = true; + + Ok(()) } async fn nack(&mut self) -> Result<()> {