Skip to content

Commit

Permalink
redis: Only set already_acked_or_nacked after ack succeeded
Browse files Browse the repository at this point in the history
  • Loading branch information
svix-jplatte committed Aug 13, 2024
1 parent 1e1cca5 commit 8c132ca
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
4 changes: 2 additions & 2 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
return Err(QueueError::CannotAckOrNackTwice);
}

self.already_acked_or_nacked = true;

let _: () = self
.redis
.get()
Expand All @@ -103,6 +101,8 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
.await
.map_err(QueueError::generic)?;

self.already_acked_or_nacked = true;

Ok(())
}

Expand Down
10 changes: 6 additions & 4 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,19 @@ impl<R: RedisConnection> Acker for RedisStreamsAcker<R> {
return Err(QueueError::CannotAckOrNackTwice);
}

self.already_acked_or_nacked = true;

let mut pipeline = redis::pipe();
pipeline.xack(&self.queue_key, &self.consumer_group, &[&self.entry_id]);
pipeline.xdel(&self.queue_key, &[&self.entry_id]);

let mut conn = self.redis.get().await.map_err(QueueError::generic)?;
pipeline
let _: () = pipeline
.query_async(&mut *conn)
.await
.map_err(QueueError::generic)
.map_err(QueueError::generic)?;

self.already_acked_or_nacked = true;

Ok(())
}

async fn nack(&mut self) -> Result<()> {
Expand Down

0 comments on commit 8c132ca

Please sign in to comment.