From e7bbc16971f7cb1d2ae2b7f0ea5d73e8646f9816 Mon Sep 17 00:00:00 2001 From: James Lucas Date: Thu, 29 Aug 2024 20:11:32 -0500 Subject: [PATCH] Rename field for clarity, add tests --- omniqueue/src/backends/redis/fallback.rs | 21 ++++++--- omniqueue/src/backends/redis/mod.rs | 3 +- omniqueue/tests/it/redis.rs | 55 ++++++++++++++++++++++++ omniqueue/tests/it/redis_fallback.rs | 50 +++++++++++++++++++++ 4 files changed, 123 insertions(+), 6 deletions(-) diff --git a/omniqueue/src/backends/redis/fallback.rs b/omniqueue/src/backends/redis/fallback.rs index a9fa34f..2689c0d 100644 --- a/omniqueue/src/backends/redis/fallback.rs +++ b/omniqueue/src/backends/redis/fallback.rs @@ -50,7 +50,7 @@ async fn receive_with_timeout( consumer: &RedisConsumer, timeout: Duration, ) -> Result> { - let key: Option> = consumer + let payload: Option> = consumer .redis .get() .await @@ -66,15 +66,26 @@ async fn receive_with_timeout( .await .map_err(QueueError::generic)?; - key.and_then(|key| InternalPayload::from_list_item(&key).ok()) - .map(|payload| payload.into_fallback_delivery(consumer)) + payload + .and_then(|payload| { + if let Ok(new_payload) = InternalPayload::from_list_item(&payload) { + Some((payload, new_payload)) + } else { + None + } + }) + .map(|(old_payload, payload)| payload.into_fallback_delivery(consumer, &old_payload)) .transpose() } pub(super) struct RedisFallbackAcker { pub(super) redis: bb8::Pool, pub(super) processing_queue_key: String, - pub(super) key: RawPayload, + // We delete based on the payload -- and since the + // `num_receives` changes after receiving it's the + // `old_payload`, since `num_receives` is part of the + // payload. Make sense? + pub(super) old_payload: RawPayload, pub(super) already_acked_or_nacked: bool, @@ -93,7 +104,7 @@ impl Acker for RedisFallbackAcker { .get() .await .map_err(QueueError::generic)? - .lrem(&self.processing_queue_key, 1, &self.key) + .lrem(&self.processing_queue_key, 1, &self.old_payload) .await .map_err(QueueError::generic)?; diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs index c2a5b85..aaf2c4c 100644 --- a/omniqueue/src/backends/redis/mod.rs +++ b/omniqueue/src/backends/redis/mod.rs @@ -211,6 +211,7 @@ impl InternalPayload { fn into_fallback_delivery( self, consumer: &RedisConsumer, + old_payload: &[u8], ) -> Result { let InternalPayload { payload, @@ -222,7 +223,7 @@ impl InternalPayload { RedisFallbackAcker { redis: consumer.redis.clone(), processing_queue_key: consumer.processing_queue_key.clone(), - key: payload.to_owned(), + old_payload: old_payload.to_owned(), already_acked_or_nacked: false, max_receives: consumer.max_receives, num_receives, diff --git a/omniqueue/tests/it/redis.rs b/omniqueue/tests/it/redis.rs index c3c3ad3..376240e 100644 --- a/omniqueue/tests/it/redis.rs +++ b/omniqueue/tests/it/redis.rs @@ -291,3 +291,58 @@ async fn test_pending() { .unwrap() .is_empty()); } + +#[tokio::test] +async fn test_max_receives() { + let payload = ExType { a: 1 }; + + let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let client = Client::open(ROOT_URL).unwrap(); + let mut conn = client.get_multiplexed_async_connection().await.unwrap(); + + let _: () = conn + .xgroup_create_mkstream(&stream_name, "test_cg", 0i8) + .await + .unwrap(); + + let max_receives = 5; + + let config = RedisConfig { + dsn: ROOT_URL.to_owned(), + max_connections: 8, + reinsert_on_nack: false, + queue_key: stream_name.clone(), + delayed_queue_key: format!("{stream_name}::delayed"), + delayed_lock_key: format!("{stream_name}::delayed_lock"), + consumer_group: "test_cg".to_owned(), + consumer_name: "test_cn".to_owned(), + payload_key: "payload".to_owned(), + ack_deadline_ms: 5_000, + max_receives: Some(max_receives), + }; + + let (builder, _drop) = (RedisBackend::builder(config), RedisStreamDrop(stream_name)); + + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_serde_json(&payload).await.unwrap(); + + for _ in 0..max_receives { + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(&payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + } + + // Give this some time because the reenqueuing can sleep for up to 500ms + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let delivery = c + .receive_all(1, std::time::Duration::from_millis(1)) + .await + .unwrap(); + assert!(delivery.is_empty()); +} diff --git a/omniqueue/tests/it/redis_fallback.rs b/omniqueue/tests/it/redis_fallback.rs index e48a0d5..dc3a6b1 100644 --- a/omniqueue/tests/it/redis_fallback.rs +++ b/omniqueue/tests/it/redis_fallback.rs @@ -290,3 +290,53 @@ async fn test_pending() { .unwrap() .is_empty()); } + +#[tokio::test] +async fn test_max_receives() { + let payload = ExType { a: 1 }; + + let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric) + .take(8) + .collect(); + + let max_receives = 5; + + let config = RedisConfig { + dsn: ROOT_URL.to_owned(), + max_connections: 8, + reinsert_on_nack: false, + queue_key: queue_key.clone(), + delayed_queue_key: format!("{queue_key}::delayed"), + delayed_lock_key: format!("{queue_key}::delayed_lock"), + consumer_group: "test_cg".to_owned(), + consumer_name: "test_cn".to_owned(), + payload_key: "payload".to_owned(), + ack_deadline_ms: 1, + max_receives: Some(max_receives), + }; + + let (builder, _drop) = ( + RedisBackend::builder(config).use_redis_streams(false), + RedisKeyDrop(queue_key), + ); + + let (p, mut c) = builder.build_pair().await.unwrap(); + + p.send_serde_json(&payload).await.unwrap(); + + for _ in 0..max_receives { + let delivery = c.receive().await.unwrap(); + assert_eq!( + Some(&payload), + delivery.payload_serde_json().unwrap().as_ref() + ); + } + + // Give this some time because the reenqueuing can sleep for up to 500ms + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + let delivery = c + .receive_all(1, std::time::Duration::from_millis(1)) + .await + .unwrap(); + assert!(delivery.is_empty()); +}