diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index aeefc1d..20789a0 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -240,7 +240,7 @@ async fn reenqueue_timed_out_messages( let mut conn = pool.get().await?; - let keys: Vec = conn.lrange(processing_queue_key, 0, 1).await?; + let keys: Vec = conn.lrange(processing_queue_key, -1, -1).await?; // If the key is older than now, it means we should be processing keys let validity_limit = KsuidMs::new(Some(OffsetDateTime::now_utc() - ack_deadline), None) @@ -248,7 +248,7 @@ async fn reenqueue_timed_out_messages( .into_bytes(); if !keys.is_empty() && keys[0] <= validity_limit { - let keys: Vec = conn.lrange(processing_queue_key, 0, BATCH_SIZE).await?; + let keys: Vec = conn.lrange(processing_queue_key, -BATCH_SIZE, -1).await?; for key in keys { if key <= validity_limit { let internal = internal_from_list(&key)?;