diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index fd8d0ca..039cce2 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -92,8 +92,6 @@ impl Acker for RedisFallbackAcker { return Err(QueueError::CannotAckOrNackTwice); } - self.already_acked_or_nacked = true; - let _: () = self .redis .get() @@ -103,6 +101,8 @@ impl Acker for RedisFallbackAcker { .await .map_err(QueueError::generic)?; + self.already_acked_or_nacked = true; + Ok(()) } diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index f495d02..451808f 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -139,17 +139,19 @@ impl Acker for RedisStreamsAcker { return Err(QueueError::CannotAckOrNackTwice); } - self.already_acked_or_nacked = true; - let mut pipeline = redis::pipe(); pipeline.xack(&self.queue_key, &self.consumer_group, &[&self.entry_id]); pipeline.xdel(&self.queue_key, &[&self.entry_id]); let mut conn = self.redis.get().await.map_err(QueueError::generic)?; - pipeline + let _: () = pipeline .query_async(&mut *conn) .await - .map_err(QueueError::generic) + .map_err(QueueError::generic)?; + + self.already_acked_or_nacked = true; + + Ok(()) } async fn nack(&mut self) -> Result<()> {