diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index aeefc1d..2fc9bee 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -170,14 +170,18 @@ impl Acker for RedisFallbackAcker { } pub(super) async fn add_to_main_queue( - keys: &[Vec], + keys: Vec>, main_queue_name: &str, conn: &mut impl AsyncCommands, ) -> Result<()> { + // We don't care about existing `num_receives` + // since we're pushing onto a different queue. let new_keys = keys - .iter() - .map(|x| regenerate_key(x)) - .collect::>>()?; + .into_iter() + // So reset it to avoid carrying state over: + .map(|x| InternalPayload::new(x.payload)) + .map(internal_to_list_payload) + .collect::>(); let _: () = conn .lpush(main_queue_name, new_keys) .await @@ -284,7 +288,3 @@ async fn reenqueue_timed_out_messages( Ok(()) } - -fn regenerate_key(key: &[u8]) -> Result { - Ok(internal_to_list_payload(internal_from_list(key)?)) -} diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index 46b9225..e2ef496 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -592,23 +592,31 @@ async fn background_task_delayed( let timestamp = unix_timestamp(SystemTime::now() - Duration::from_secs(1)) .map_err(QueueError::generic)?; - let keys: Vec = conn + let old_keys: Vec = conn .zrangebyscore_limit(delayed_queue_name, 0, timestamp, 0, BATCH_SIZE) .await .map_err(QueueError::generic)?; - if !keys.is_empty() { - trace!("Moving {} messages from delayed to main queue", keys.len()); + if !old_keys.is_empty() { + let new_keys = old_keys + .iter() + .map(|x| internal_from_list(x)) + .collect::>>()?; + trace!( + "Moving {} messages from delayed to main queue", + new_keys.len() + ); if use_redis_streams { - streams::add_to_main_queue(&keys, main_queue_name, payload_key, &mut *conn).await?; + streams::add_to_main_queue(new_keys, main_queue_name, payload_key, &mut *conn) + .await?; } else { - fallback::add_to_main_queue(&keys, main_queue_name, &mut *conn).await?; + fallback::add_to_main_queue(new_keys, main_queue_name, &mut *conn).await?; } // Then remove the tasks from the delayed queue so they aren't resent let _: () = conn - .zrem(delayed_queue_name, keys) + .zrem(delayed_queue_name, old_keys) .await .map_err(QueueError::generic)?; diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs index 32b1ab4..e034eea 100644 --- a/omniqueue/src/backends/redis/streams.rs +++ b/omniqueue/src/backends/redis/streams.rs @@ -13,8 +13,8 @@ use redis::{ use tracing::{error, trace}; use super::{ - internal_from_list, DeadLetterQueueConfig, InternalPayload, InternalPayloadOwned, - RedisConnection, RedisConsumer, RedisProducer, + DeadLetterQueueConfig, InternalPayload, InternalPayloadOwned, RedisConnection, RedisConsumer, + RedisProducer, }; use crate::{queue::Acker, Delivery, QueueError, Result}; @@ -241,20 +241,21 @@ impl Acker for RedisStreamsAcker { } pub(super) async fn add_to_main_queue( - keys: &[Vec], + keys: Vec>, main_queue_name: &str, payload_key: &str, conn: &mut impl redis::aio::ConnectionLike, ) -> Result<()> { let mut pipe = redis::pipe(); - for key in keys { - // We don't care about `num_receives` here since we're - // re-queuing from delayed queue: - let InternalPayload { payload, .. } = internal_from_list(key)?; + // We don't care about existing `num_receives` + // since we're pushing onto a different queue. + for InternalPayload { payload, .. } in keys { + // So reset it to avoid carrying state over: + let internal = InternalPayload::new(payload); let _ = pipe.xadd( main_queue_name, GENERATE_STREAM_ID, - internal_to_stream_payload!(InternalPayload::new(payload), payload_key), + internal_to_stream_payload!(internal, payload_key), ); }