From f3703b00941ff71e5f509998b7d2632f18659f77 Mon Sep 17 00:00:00 2001 From: James Lucas Date: Thu, 5 Sep 2024 23:58:19 -0500 Subject: [PATCH] Fix `LRANGE` usage in non-stream redis We `LPUSH` onto front of processing queue, so we should `LRANGE` the end of the queue when looking for messages to reenqueue. --- omniqueue/src/backends/redis/fallback.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index aeefc1d..20789a0 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -240,7 +240,7 @@ async fn reenqueue_timed_out_messages( let mut conn = pool.get().await?; - let keys: Vec = conn.lrange(processing_queue_key, 0, 1).await?; + let keys: Vec = conn.lrange(processing_queue_key, -1, -1).await?; // If the key is older than now, it means we should be processing keys let validity_limit = KsuidMs::new(Some(OffsetDateTime::now_utc() - ack_deadline), None) @@ -248,7 +248,7 @@ async fn reenqueue_timed_out_messages( .into_bytes(); if !keys.is_empty() && keys[0] <= validity_limit { - let keys: Vec = conn.lrange(processing_queue_key, 0, BATCH_SIZE).await?; + let keys: Vec = conn.lrange(processing_queue_key, -BATCH_SIZE, -1).await?; for key in keys { if key <= validity_limit { let internal = internal_from_list(&key)?;