diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs
index 039cce2..87206bb 100644
--- a/omniqueue/src/backends/redis/fallback.rs
+++ b/omniqueue/src/backends/redis/fallback.rs
@@ -9,7 +9,10 @@ use svix_ksuid::{KsuidLike as _, KsuidMs};
 use time::OffsetDateTime;
 use tracing::{error, trace};
 
-use super::{from_key, to_key, RawPayload, RedisConnection, RedisConsumer, RedisProducer};
+use super::{
+    internal_from_list, internal_to_list_payload, InternalPayloadOwned, RawPayload,
+    RedisConnection, RedisConsumer, RedisProducer,
+};
 use crate::{queue::Acker, Delivery, QueueError, Result};
 
 pub(super) async fn send_raw<R: RedisConnection>(
@@ -21,7 +24,7 @@ pub(super) async fn send_raw(
         .get()
         .await
         .map_err(QueueError::generic)?
-        .lpush(&producer.queue_key, to_key(payload))
+        .lpush(&producer.queue_key, internal_to_list_payload((payload, 0)))
         .await
         .map_err(QueueError::generic)
 }
@@ -45,7 +48,7 @@ async fn receive_with_timeout(
     consumer: &RedisConsumer<R>,
     timeout: Duration,
 ) -> Result<Option<Delivery>> {
-    let key: Option<Vec<u8>> = consumer
+    let payload: Option<Vec<u8>> = consumer
         .redis
         .get()
         .await
@@ -61,29 +64,52 @@ async fn receive_with_timeout(
         .await
         .map_err(QueueError::generic)?;
 
-    key.map(|key| make_delivery(consumer, &key)).transpose()
+    match payload {
+        Some(old_payload) => {
+            let (payload, num_receives) = internal_from_list(&old_payload)?;
+            Some(internal_to_delivery(
+                (payload.to_vec(), num_receives),
+                consumer,
+                old_payload,
+            ))
+            .transpose()
+        }
+        None => Ok(None),
+    }
 }
 
-fn make_delivery<R: RedisConnection>(consumer: &RedisConsumer<R>, key: &[u8]) -> Result<Delivery> {
-    let (_, payload) = from_key(key)?;
-
+fn internal_to_delivery<R: RedisConnection>(
+    internal: InternalPayloadOwned,
+    consumer: &RedisConsumer<R>,
+    old_payload: Vec<u8>,
+) -> Result<Delivery> {
+    let (payload, num_receives) = internal;
     Ok(Delivery::new(
-        payload.to_owned(),
+        payload,
         RedisFallbackAcker {
             redis: consumer.redis.clone(),
             processing_queue_key: consumer.processing_queue_key.clone(),
-            key: key.to_owned(),
+            old_payload,
             already_acked_or_nacked: false,
+            max_receives: consumer.max_receives,
+            num_receives,
         },
     ))
 }
 
-struct RedisFallbackAcker<R: RedisConnection> {
-    redis: bb8::Pool<R>,
-    processing_queue_key: String,
-    key: RawPayload,
+pub(super) struct RedisFallbackAcker<R: RedisConnection> {
+    pub(super) redis: bb8::Pool<R>,
+    pub(super) processing_queue_key: String,
+    // We delete from the processing queue by matching on the payload, and
+    // `num_receives` -- which is encoded into the payload -- changes after
+    // every receive. So we keep the payload exactly as we received it
+    // (`old_payload`) to match against.
+    pub(super) old_payload: RawPayload,
+
+    pub(super) already_acked_or_nacked: bool,
 
-    already_acked_or_nacked: bool,
+    pub(super) max_receives: usize,
+    pub(super) num_receives: usize,
 }
 
 impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
@@ -97,7 +123,7 @@ impl Acker for RedisFallbackAcker {
             .get()
             .await
             .map_err(QueueError::generic)?
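+            // LREM deletes by exact value; `old_payload` is byte-for-byte the
+            // entry we originally popped, so only that entry is removed.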
-            .lrem(&self.processing_queue_key, 1, &self.key)
+            .lrem(&self.processing_queue_key, 1, &self.old_payload)
             .await
             .map_err(QueueError::generic)?;
 
@@ -107,6 +133,11 @@ impl Acker for RedisFallbackAcker {
     }
 
     async fn nack(&mut self) -> Result<()> {
+        if self.num_receives >= self.max_receives {
+            trace!("Maximum attempts reached");
+            return self.ack().await;
+        }
+
         if self.already_acked_or_nacked {
             return Err(QueueError::CannotAckOrNackTwice);
         }
@@ -144,13 +175,19 @@ pub(super) async fn background_task_processing(
     queue_key: String,
     processing_queue_key: String,
     ack_deadline_ms: i64,
+    max_receives: usize,
 ) -> Result<()> {
     // FIXME: ack_deadline_ms should be unsigned
     let ack_deadline = Duration::from_millis(ack_deadline_ms as _);
     loop {
-        if let Err(err) =
-            reenqueue_timed_out_messages(&pool, &queue_key, &processing_queue_key, ack_deadline)
-                .await
+        if let Err(err) = reenqueue_timed_out_messages(
+            &pool,
+            &queue_key,
+            &processing_queue_key,
+            ack_deadline,
+            max_receives,
+        )
+        .await
         {
             error!("{err}");
             tokio::time::sleep(Duration::from_millis(500)).await;
@@ -164,6 +201,7 @@ async fn reenqueue_timed_out_messages(
     queue_key: &str,
     processing_queue_key: &str,
     ack_deadline: Duration,
+    max_receives: usize,
 ) -> Result<(), Box<dyn std::error::Error>> {
     const BATCH_SIZE: isize = 50;
 
@@ -180,10 +218,24 @@
     let keys: Vec<RawPayload> = conn.lrange(processing_queue_key, 0, BATCH_SIZE).await?;
     for key in keys {
         if key <= validity_limit {
+            let internal = internal_from_list(&key)?;
+            let num_receives = internal.1;
+            if num_receives >= max_receives {
+                trace!(
+                    num_receives = num_receives,
+                    "Maximum attempts reached for message, not reenqueuing",
+                );
+            } else {
+                trace!(
+                    num_receives = num_receives,
+                    "Pushing back overdue task to queue"
+                );
+                let _: () = conn
+                    .rpush(queue_key, internal_to_list_payload(internal))
+                    .await?;
+            }
+
             // We use LREM to be sure we only delete the keys we should be deleting
-            trace!("Pushing back overdue task to queue");
-            let refreshed_key = regenerate_key(&key)?;
-            let _: () = conn.rpush(queue_key, &refreshed_key).await?;
             let _: () = conn.lrem(processing_queue_key, 1, &key).await?;
         }
     }
@@ -196,6 +248,5 @@
 }
 
 fn regenerate_key(key: &[u8]) -> Result<RawPayload> {
-    let (_, payload) = from_key(key)?;
-    Ok(to_key(payload))
+    Ok(internal_to_list_payload(internal_from_list(key)?))
 }
diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs
index 34007d8..ca6a255 100644
--- a/omniqueue/src/backends/redis/mod.rs
+++ b/omniqueue/src/backends/redis/mod.rs
@@ -85,6 +85,63 @@ impl RedisConnection for RedisClusterConnectionManager {
     }
 }
 
+// First element is the raw payload slice, second
+// is `num_receives`, the number of times the
+// message has previously been received.
+type InternalPayload<'a> = (&'a [u8], usize);
+
+// The same as `InternalPayload` but with an
+// owned payload.
+type InternalPayloadOwned = (Vec<u8>, usize);
+
+fn internal_from_list(payload: &[u8]) -> Result<InternalPayload<'_>> {
+    // All the information is stored in the key: the ID and the [optional]
+    // number of prior receives are separated by a `#`, and the JSON-formatted
+    // task is delimited by a `|`. So, take the ID, then the optional receive
+    // count, then everything after the `|` as the payload.
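+    //
+    // For example (illustrative values only): a key such as
+    // `26Fq0fjEYdyGyhLpuvKqyK6q8mV#2|{"a":1}` decodes to the payload
+    // `{"a":1}` with two prior receives, while a pre-upgrade key like
+    // `26Fq0fjEYdyGyhLpuvKqyK6q8mV|{"a":1}` carries no receive count.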
+    let count_sep_pos = payload.iter().position(|&byte| byte == b'#');
+    let payload_sep_pos = payload
+        .iter()
+        .position(|&byte| byte == b'|')
+        .ok_or_else(|| QueueError::Generic("Improper key format".into()))?;
+
+    let id_end_pos = match count_sep_pos {
+        Some(count_sep_pos) if count_sep_pos < payload_sep_pos => count_sep_pos,
+        _ => payload_sep_pos,
+    };
+    let _id = str::from_utf8(&payload[..id_end_pos])
+        .map_err(|_| QueueError::Generic("Non-UTF8 key ID".into()))?;
+
+    // This should be backward-compatible with messages that don't include
+    // `num_receives`
+    let num_receives = if let Some(count_sep_pos) = count_sep_pos {
+        let num_receives = std::str::from_utf8(&payload[(count_sep_pos + 1)..payload_sep_pos])
+            .map_err(|_| QueueError::Generic("Improper key format".into()))?
+            .parse::<usize>()
+            .map_err(|_| QueueError::Generic("Improper key format".into()))?;
+        num_receives + 1
+    } else {
+        1
+    };
+
+    Ok((&payload[payload_sep_pos + 1..], num_receives))
+}
+
+fn internal_to_list_payload(internal: InternalPayload) -> Vec<u8> {
+    let id = delayed_key_id();
+    let (payload, num_receives) = internal;
+    let num_receives = num_receives.to_string();
+    let mut result =
+        Vec::with_capacity(id.len() + num_receives.as_bytes().len() + payload.len() + 3);
+    result.extend(id.as_bytes());
+    result.push(b'#');
+    result.extend(num_receives.as_bytes());
+    result.push(b'|');
+    result.extend(payload);
+    result
+}
+
 #[derive(Debug, Error)]
 enum EvictionCheckError {
     #[error("Unable to verify eviction policy. Ensure `maxmemory-policy` set to `noeviction` or `volatile-*`")]
@@ -139,6 +196,7 @@ pub struct RedisConfig {
     pub consumer_name: String,
     pub payload_key: String,
     pub ack_deadline_ms: i64,
+    pub max_receives: Option<usize>,
 }
 
 pub struct RedisBackend<R = RedisConnectionManager>(PhantomData<R>);
@@ -290,6 +348,7 @@ impl RedisBackendBuilder {
                 payload_key: self.config.payload_key,
                 use_redis_streams: self.use_redis_streams,
                 _background_tasks: background_tasks.clone(),
+                max_receives: self.config.max_receives.unwrap_or(usize::MAX),
             },
         ))
     }
@@ -332,6 +391,7 @@ impl RedisBackendBuilder {
             consumer_name: self.config.consumer_name,
             payload_key: self.config.payload_key,
             use_redis_streams: self.use_redis_streams,
+            max_receives: self.config.max_receives.unwrap_or(usize::MAX),
             _background_tasks,
         })
     }
@@ -395,6 +455,8 @@ impl RedisBackendBuilder {
                 self.config.consumer_group.to_owned(),
                 self.config.consumer_name.to_owned(),
                 self.config.ack_deadline_ms,
+                self.config.max_receives.unwrap_or(usize::MAX),
+                self.config.payload_key.to_owned(),
             ));
         } else {
             join_set.spawn(fallback::background_task_processing(
@@ -402,6 +464,7 @@
                 self.config.queue_key.to_owned(),
                 self.get_processing_queue_key(),
                 self.config.ack_deadline_ms,
+                self.config.max_receives.unwrap_or(usize::MAX),
             ));
         }
 
@@ -555,7 +618,11 @@ impl RedisProducer {
             .get()
             .await
             .map_err(QueueError::generic)?
-            .zadd(&self.delayed_queue_key, to_key(payload), timestamp)
+            .zadd(
+                &self.delayed_queue_key,
+                internal_to_list_payload((payload, 0)),
+                timestamp,
+            )
             .await
             .map_err(QueueError::generic)?;
 
@@ -594,31 +661,6 @@ fn delayed_key_id() -> String {
     svix_ksuid::Ksuid::new(None, None).to_base62()
 }
 
-/// Prefixes a payload with an id, separated by a pipe, e.g `ID|payload`.
-fn to_key(payload: &[u8]) -> RawPayload {
-    let id = delayed_key_id();
-
-    let mut result = Vec::with_capacity(id.len() + payload.len() + 1);
-    result.extend(id.as_bytes());
-    result.push(b'|');
-    result.extend(payload);
-    result
-}
-
-/// Splits a key encoded with [`to_key`] into ID and payload.
-fn from_key(key: &[u8]) -> Result<(&str, &[u8])> {
-    // All information is stored in the key in which the ID and JSON formatted task
-    // are separated by a `|`. So, take the key, then take the part after the `|`.
-    let sep_pos = key
-        .iter()
-        .position(|&byte| byte == b'|')
-        .ok_or_else(|| QueueError::Generic("Improper key format".into()))?;
-    let id = str::from_utf8(&key[..sep_pos])
-        .map_err(|_| QueueError::Generic("Non-UTF8 key ID".into()))?;
-
-    Ok((id, &key[sep_pos + 1..]))
-}
-
 pub struct RedisConsumer<R: RedisConnection> {
     redis: bb8::Pool<R>,
     queue_key: String,
@@ -627,6 +669,7 @@ pub struct RedisConsumer {
     consumer_name: String,
     payload_key: String,
     use_redis_streams: bool,
+    max_receives: usize,
     _background_tasks: Arc<JoinSet<Result<()>>>,
 }
 
diff --git a/omniqueue/src/backends/redis/streams.rs b/omniqueue/src/backends/redis/streams.rs
index c926630..e4f73dd 100644
--- a/omniqueue/src/backends/redis/streams.rs
+++ b/omniqueue/src/backends/redis/streams.rs
@@ -11,7 +11,9 @@ use redis::{
 };
 use tracing::{error, trace};
 
-use super::{from_key, RedisConnection, RedisConsumer, RedisProducer};
+use super::{
+    internal_from_list, InternalPayloadOwned, RedisConnection, RedisConsumer, RedisProducer,
+};
 use crate::{queue::Acker, Delivery, QueueError, Result};
 
 /// Special ID for XADD command's which generates a stream ID automatically
@@ -24,6 +26,15 @@ const LISTEN_STREAM_ID: &str = ">";
 
 // FIXME(onelson): expose in config?
 const PENDING_BATCH_SIZE: usize = 1000;
 
+macro_rules! internal_to_stream_payload {
+    (($payload:expr, $num_receives:expr), $payload_key:expr) => {
+        &[
+            ($payload_key, $payload),
+            (NUM_RECEIVES, $num_receives.to_string().as_bytes()),
+        ]
+    };
+}
+
 pub(super) async fn send_raw<R: RedisConnection>(
     producer: &RedisProducer<R>,
     payload: &[u8],
@@ -36,7 +47,7 @@
         .xadd(
             &producer.queue_key,
             GENERATE_STREAM_ID,
-            &[(&producer.payload_key, payload)],
+            internal_to_stream_payload!((payload, 0), producer.payload_key.as_str()),
         )
         .await
         .map_err(QueueError::generic)
@@ -63,7 +74,8 @@ pub(super) async fn receive(consumer: &RedisConsumer) ->
     let queue = read_out.keys.into_iter().next().ok_or(QueueError::NoData)?;
     let entry = queue.ids.into_iter().next().ok_or(QueueError::NoData)?;
 
-    wrap_entry(consumer, entry)
+    let internal = internal_from_stream(&entry, &consumer.payload_key)?;
+    Ok(internal_to_delivery(internal, consumer, entry.id))
 }
 
 pub(super) async fn receive_all<R: RedisConnection>(
@@ -96,25 +108,44 @@ pub(super) async fn receive_all(
     if let Some(queue) = read_out.keys.into_iter().next() {
         for entry in queue.ids {
-            let wrapped = wrap_entry(consumer, entry)?;
-            out.push(wrapped);
+            let internal = internal_from_stream(&entry, &consumer.payload_key)?;
+            let delivery = internal_to_delivery(internal, consumer, entry.id);
+            out.push(delivery);
         }
     }
 
     Ok(out)
 }
 
-fn wrap_entry(
+const NUM_RECEIVES: &str = "num_receives";
+
+fn internal_from_stream(stream_id: &StreamId, payload_key: &str) -> Result<InternalPayloadOwned> {
+    let StreamId { map, .. } = stream_id;
+
+    let num_receives = if let Some(redis::Value::BulkString(data)) = map.get(NUM_RECEIVES) {
+        let count = std::str::from_utf8(data)
+            .map_err(|_| QueueError::Generic("Improper key format".into()))?
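+            // (The stream field stores the count of completed receives; the
+            // `+ 1` below makes the result include the current delivery.)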
+            .parse::<usize>()
+            .map_err(QueueError::generic)?;
+        count + 1
+    } else {
+        1
+    };
+
+    let payload: Vec<u8> = map
+        .get(payload_key)
+        .ok_or(QueueError::NoData)
+        .and_then(|x| redis::from_redis_value(x).map_err(QueueError::generic))?;
+
+    Ok((payload, num_receives))
+}
+
+fn internal_to_delivery<R: RedisConnection>(
+    internal: InternalPayloadOwned,
     consumer: &RedisConsumer<R>,
-    entry: StreamId,
-) -> Result<Delivery> {
-    let entry_id = entry.id.clone();
-    let payload = entry
-        .map
-        .get(&consumer.payload_key)
-        .ok_or(QueueError::NoData)?;
-    let payload: Vec<u8> = redis::from_redis_value(payload).map_err(QueueError::generic)?;
-
-    Ok(Delivery::new(
+    entry_id: String,
+) -> Delivery {
+    let (payload, num_receives) = internal;
+    Delivery::new(
         payload,
         RedisStreamsAcker {
             redis: consumer.redis.clone(),
@@ -122,17 +153,21 @@ fn wrap_entry(
             consumer_group: consumer.consumer_group.to_owned(),
             entry_id,
             already_acked_or_nacked: false,
+            max_receives: consumer.max_receives,
+            num_receives,
         },
-    ))
+    )
 }
 
-struct RedisStreamsAcker<R: RedisConnection> {
-    redis: bb8::Pool<R>,
-    queue_key: String,
-    consumer_group: String,
-    entry_id: String,
+pub(super) struct RedisStreamsAcker<R: RedisConnection> {
+    pub(super) redis: bb8::Pool<R>,
+    pub(super) queue_key: String,
+    pub(super) consumer_group: String,
+    pub(super) entry_id: String,
 
-    already_acked_or_nacked: bool,
+    pub(super) already_acked_or_nacked: bool,
+    pub(super) max_receives: usize,
+    pub(super) num_receives: usize,
 }
 
 impl<R: RedisConnection> Acker for RedisStreamsAcker<R> {
@@ -157,6 +192,11 @@
     }
 
     async fn nack(&mut self) -> Result<()> {
+        if self.num_receives >= self.max_receives {
+            trace!(entry_id = self.entry_id, "Maximum attempts reached");
+            return self.ack().await;
+        }
+
         if self.already_acked_or_nacked {
             return Err(QueueError::CannotAckOrNackTwice);
         }
@@ -181,11 +221,13 @@ pub(super) async fn add_to_main_queue(
 ) -> Result<()> {
     let mut pipe = redis::pipe();
     for key in keys {
-        let (_, payload) = from_key(key)?;
+        // We don't care about `num_receives` here since we're
+        // re-queuing from the delayed queue:
+        let (payload, _) = internal_from_list(key)?;
         let _ = pipe.xadd(
             main_queue_name,
             GENERATE_STREAM_ID,
-            &[(payload_key, payload)],
+            internal_to_stream_payload!((payload, 0), payload_key),
         );
     }
 
@@ -223,6 +265,8 @@ pub(super) async fn background_task_pending(
     consumer_group: String,
     consumer_name: String,
     ack_deadline_ms: i64,
+    max_receives: usize,
+    payload_key: String,
 ) -> Result<()> {
     loop {
         if let Err(err) = reenqueue_timed_out_messages(
@@ -231,6 +275,8 @@
             &consumer_group,
             &consumer_name,
             ack_deadline_ms,
+            max_receives,
+            &payload_key,
         )
         .await
         {
@@ -247,6 +293,8 @@ async fn reenqueue_timed_out_messages(
     consumer_group: &str,
     consumer_name: &str,
     ack_deadline_ms: i64,
+    max_receives: usize,
+    payload_key: &str,
 ) -> Result<()> {
     let mut conn = pool.get().await.map_err(QueueError::generic)?;
 
@@ -270,19 +318,20 @@
         let mut pipe = redis::pipe();
 
         // And reinsert the map of KV pairs into the MAIN queue with a new stream ID
-        for StreamId { map, .. } in &ids {
+        for stream_id in &ids {
+            let (payload, num_receives) = internal_from_stream(stream_id, payload_key)?;
+            if num_receives >= max_receives {
+                trace!(
+                    entry_id = stream_id.id,
+                    "Maximum attempts reached for message, not reenqueuing",
+                );
+                continue;
+            }
+
             let _ = pipe.xadd(
                 main_queue_name,
                 GENERATE_STREAM_ID,
-                &map.iter()
-                    .filter_map(|(k, v)| {
-                        if let redis::Value::BulkString(data) = v {
-                            Some((k.as_str(), data.as_slice()))
-                        } else {
-                            None
-                        }
-                    })
-                    .collect::<Vec<_>>(),
+                internal_to_stream_payload!((payload.as_slice(), num_receives), payload_key),
             );
         }
diff --git a/omniqueue/tests/it/redis.rs b/omniqueue/tests/it/redis.rs
index 6869fa7..69abfbd 100644
--- a/omniqueue/tests/it/redis.rs
+++ b/omniqueue/tests/it/redis.rs
@@ -48,6 +48,7 @@ async fn make_test_queue() -> (RedisBackendBuilder, RedisStreamDrop) {
         consumer_name: "test_cn".to_owned(),
         payload_key: "payload".to_owned(),
         ack_deadline_ms: 5_000,
+        max_receives: None,
     };
 
     (RedisBackend::builder(config), RedisStreamDrop(stream_name))
 }
@@ -290,3 +291,126 @@ async fn test_pending() {
         .unwrap()
         .is_empty());
 }
+
+#[tokio::test]
+async fn test_max_receives() {
+    let payload = ExType { a: 1 };
+
+    let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
+        .take(8)
+        .collect();
+
+    let client = Client::open(ROOT_URL).unwrap();
+    let mut conn = client.get_multiplexed_async_connection().await.unwrap();
+
+    let _: () = conn
+        .xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
+        .await
+        .unwrap();
+
+    let max_receives = 5;
+
+    let config = RedisConfig {
+        dsn: ROOT_URL.to_owned(),
+        max_connections: 8,
+        reinsert_on_nack: false,
+        queue_key: stream_name.clone(),
+        delayed_queue_key: format!("{stream_name}::delayed"),
+        delayed_lock_key: format!("{stream_name}::delayed_lock"),
+        consumer_group: "test_cg".to_owned(),
+        consumer_name: "test_cn".to_owned(),
+        payload_key: "payload".to_owned(),
+        ack_deadline_ms: 1,
+        max_receives: Some(max_receives),
+    };
+
+    let (builder, _drop) = (RedisBackend::builder(config), RedisStreamDrop(stream_name));
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+
+    for _ in 0..max_receives {
+        let delivery = c.receive().await.unwrap();
+        assert_eq!(
+            Some(&payload),
+            delivery.payload_serde_json().unwrap().as_ref()
+        );
+    }
+
+    // Give this some time because the reenqueuing can sleep for up to 500ms
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    let delivery = c
+        .receive_all(1, std::time::Duration::from_millis(1))
+        .await
+        .unwrap();
+    assert!(delivery.is_empty());
+}
+
+// A message without a `num_receives` field shouldn't
+// cause issues:
+#[tokio::test]
+async fn test_backward_compatible() {
+    let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
+        .take(8)
+        .collect();
+
+    let client = Client::open(ROOT_URL).unwrap();
+    let mut conn = client.get_multiplexed_async_connection().await.unwrap();
+
+    let _: () = conn
+        .xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
+        .await
+        .unwrap();
+
+    let max_receives = 5;
+
+    let config = RedisConfig {
+        dsn: ROOT_URL.to_owned(),
+        max_connections: 8,
+        reinsert_on_nack: false,
+        queue_key: stream_name.clone(),
+        delayed_queue_key: format!("{stream_name}::delayed"),
+        delayed_lock_key: format!("{stream_name}::delayed_lock"),
+        consumer_group: "test_cg".to_owned(),
+        consumer_name: "test_cn".to_owned(),
+        payload_key: "payload".to_owned(),
+        ack_deadline_ms: 1,
+        max_receives: Some(max_receives),
+    };
+
+    let (builder, _drop) = (
+        RedisBackend::builder(config),
+        RedisStreamDrop(stream_name.clone()),
+    );
+
+    let (_p, mut c) = builder.build_pair().await.unwrap();
+
+    let org_payload = ExType { a: 1 };
+    let org_payload_str = serde_json::to_string(&org_payload).unwrap();
+
+    let _: () = conn
+        .xadd(
+            &stream_name,
+            "*",
+            &[("payload", org_payload_str.as_bytes())],
+        )
+        .await
+        .unwrap();
+
+    for _ in 0..max_receives {
+        let delivery = c.receive().await.unwrap();
+        assert_eq!(
+            Some(&org_payload),
+            delivery.payload_serde_json().unwrap().as_ref()
+        );
+    }
+
+    // Give this some time because the reenqueuing can sleep for up to 500ms
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    let delivery = c
+        .receive_all(1, std::time::Duration::from_millis(1))
+        .await
+        .unwrap();
+    assert!(delivery.is_empty());
+}
diff --git a/omniqueue/tests/it/redis_cluster.rs b/omniqueue/tests/it/redis_cluster.rs
index 70af224..13077d0 100644
--- a/omniqueue/tests/it/redis_cluster.rs
+++ b/omniqueue/tests/it/redis_cluster.rs
@@ -48,6 +48,7 @@ async fn make_test_queue() -> (RedisClusterBackendBuilder, RedisStreamDrop) {
         consumer_name: "test_cn".to_owned(),
         payload_key: "payload".to_owned(),
        ack_deadline_ms: 5_000,
+        max_receives: None,
     };
 
     (
diff --git a/omniqueue/tests/it/redis_fallback.rs b/omniqueue/tests/it/redis_fallback.rs
index 5606770..885dd6c 100644
--- a/omniqueue/tests/it/redis_fallback.rs
+++ b/omniqueue/tests/it/redis_fallback.rs
@@ -1,8 +1,10 @@
+use core::str;
 use std::time::{Duration, Instant};
 
 use omniqueue::backends::{redis::RedisBackendBuilder, RedisBackend, RedisConfig};
-use redis::{Client, Commands};
+use redis::{AsyncCommands, Client, Commands};
 use serde::{Deserialize, Serialize};
+use svix_ksuid::KsuidLike;
 
 const ROOT_URL: &str = "redis://localhost";
@@ -40,6 +42,7 @@ async fn make_test_queue() -> (RedisBackendBuilder, RedisKeyDrop) {
         consumer_name: "test_cn".to_owned(),
         payload_key: "payload".to_owned(),
        ack_deadline_ms: 5_000,
+        max_receives: None,
     };
 
     (
@@ -289,3 +292,115 @@ async fn test_pending() {
         .unwrap()
         .is_empty());
 }
+
+#[tokio::test]
+async fn test_max_receives() {
+    let payload = ExType { a: 1 };
+
+    let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric)
+        .take(8)
+        .collect();
+
+    let max_receives = 5;
+
+    let config = RedisConfig {
+        dsn: ROOT_URL.to_owned(),
+        max_connections: 8,
+        reinsert_on_nack: false,
+        queue_key: queue_key.clone(),
+        delayed_queue_key: format!("{queue_key}::delayed"),
+        delayed_lock_key: format!("{queue_key}::delayed_lock"),
+        consumer_group: "test_cg".to_owned(),
+        consumer_name: "test_cn".to_owned(),
+        payload_key: "payload".to_owned(),
+        ack_deadline_ms: 1,
+        max_receives: Some(max_receives),
+    };
+
+    let (builder, _drop) = (
+        RedisBackend::builder(config).use_redis_streams(false),
+        RedisKeyDrop(queue_key),
+    );
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+
+    for _ in 0..max_receives {
+        let delivery = c.receive().await.unwrap();
+        assert_eq!(
+            Some(&payload),
+            delivery.payload_serde_json().unwrap().as_ref()
+        );
+    }
+
+    // Give this some time because the reenqueuing can sleep for up to 500ms
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    let delivery = c
+        .receive_all(1, std::time::Duration::from_millis(1))
+        .await
+        .unwrap();
+    assert!(delivery.is_empty());
+}
+
+// A message without a `num_receives` field shouldn't
+// cause issues:
+#[tokio::test]
+async fn test_backward_compatible() {
+    let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric)
+        .take(8)
+        .collect();
+
+    let max_receives = 5;
+
+    let config = RedisConfig {
+        dsn: ROOT_URL.to_owned(),
+        max_connections: 8,
+        reinsert_on_nack: false,
+        queue_key: queue_key.clone(),
+        delayed_queue_key: format!("{queue_key}::delayed"),
+        delayed_lock_key: format!("{queue_key}::delayed_lock"),
+        consumer_group: "test_cg".to_owned(),
+        consumer_name: "test_cn".to_owned(),
+        payload_key: "payload".to_owned(),
+        ack_deadline_ms: 1,
+        max_receives: Some(max_receives),
+    };
+
+    let (builder, _drop) = (
+        RedisBackend::builder(config).use_redis_streams(false),
+        RedisKeyDrop(queue_key.clone()),
+    );
+
+    let (_p, mut c) = builder.build_pair().await.unwrap();
+
+    let org_payload = ExType { a: 1 };
+
+    // Old payload format:
+    let id = svix_ksuid::Ksuid::new(None, None).to_base62();
+    let org_payload_str = serde_json::to_string(&org_payload).unwrap();
+    let mut payload = Vec::with_capacity(id.len() + org_payload_str.as_bytes().len() + 1);
+    payload.extend(id.as_bytes());
+    payload.push(b'|');
+    payload.extend(org_payload_str.as_bytes());
+
+    let client = Client::open(ROOT_URL).unwrap();
+    let mut conn = client.get_multiplexed_async_connection().await.unwrap();
+    let _: () = conn.lpush(&queue_key, &payload).await.unwrap();
+
+    for _ in 0..max_receives {
+        let delivery = c.receive().await.unwrap();
+        assert_eq!(
+            Some(&org_payload),
+            delivery.payload_serde_json().unwrap().as_ref()
+        );
+    }
+
+    // Give this some time because the reenqueuing can sleep for up to 500ms
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    let delivery = c
+        .receive_all(1, std::time::Duration::from_millis(1))
+        .await
+        .unwrap();
+    assert!(delivery.is_empty());
+}
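
Note on the list-payload encoding used by the fallback backend: each message is a single byte string of the form `<ksuid>#<num_receives>|<json payload>`, and a key without the `#<count>` segment parses as a first receive, which is what keeps pre-upgrade messages working (see `internal_from_list` and `internal_to_list_payload` in the diff above). The standalone sketch below round-trips that format under those assumptions; the `encode`/`decode` helper names are illustrative stand-ins, not omniqueue APIs.

// Illustrative sketch only -- not omniqueue code. Round-trips the fallback
// backend's list-payload format: `<id>#<num_receives>|<payload>`, where the
// `#<count>` segment may be absent in pre-upgrade messages.
fn encode(id: &str, payload: &[u8], num_receives: usize) -> Vec<u8> {
    let count = num_receives.to_string();
    let mut out = Vec::with_capacity(id.len() + count.len() + payload.len() + 2);
    out.extend(id.as_bytes());
    out.push(b'#');
    out.extend(count.as_bytes());
    out.push(b'|');
    out.extend(payload);
    out
}

// Returns (payload, receives-including-this-one). A key with no `#` before
// the `|` is treated as a first receive, mirroring `internal_from_list`.
fn decode(key: &[u8]) -> Option<(&[u8], usize)> {
    let pipe = key.iter().position(|&b| b == b'|')?;
    let hash = key.iter().position(|&b| b == b'#').filter(|&p| p < pipe);
    let num_receives = match hash {
        Some(h) => std::str::from_utf8(&key[h + 1..pipe])
            .ok()?
            .parse::<usize>()
            .ok()?
            + 1,
        None => 1,
    };
    Some((&key[pipe + 1..], num_receives))
}

fn main() {
    // A message received twice before: stored count is 2, decoded count is 3.
    let key = encode("26Fq0fjEYdyGyhLpuvKqyK6q8mV", br#"{"a":1}"#, 2);
    let (payload, n) = decode(&key).unwrap();
    assert_eq!(payload, &br#"{"a":1}"#[..]);
    assert_eq!(n, 3);

    // Old-format key (no `#<count>`): parsed as the first receive.
    let (_, n) = decode(b"26Fq0fjEYdyGyhLpuvKqyK6q8mV|{}").unwrap();
    assert_eq!(n, 1);
}

Because `decode` (like `internal_from_list`) returns the count including the current delivery, re-encoding whatever was just decoded is exactly what advances a message toward `max_receives` on each redelivery.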