Rename field for clarity, add tests
jaymell committed Aug 30, 2024
1 parent b86aa68 commit e7bbc16
Showing 4 changed files with 123 additions and 6 deletions.
21 changes: 16 additions & 5 deletions omniqueue/src/backends/redis/fallback.rs
@@ -50,7 +50,7 @@ async fn receive_with_timeout<R: RedisConnection>(
consumer: &RedisConsumer<R>,
timeout: Duration,
) -> Result<Option<Delivery>> {
-    let key: Option<Vec<u8>> = consumer
+    let payload: Option<Vec<u8>> = consumer
.redis
.get()
.await
@@ -66,15 +66,26 @@ async fn receive_with_timeout<R: RedisConnection>(
.await
.map_err(QueueError::generic)?;

-    key.and_then(|key| InternalPayload::from_list_item(&key).ok())
-        .map(|payload| payload.into_fallback_delivery(consumer))
+    payload
+        .and_then(|payload| {
+            if let Ok(new_payload) = InternalPayload::from_list_item(&payload) {
+                Some((payload, new_payload))
+            } else {
+                None
+            }
+        })
+        .map(|(old_payload, payload)| payload.into_fallback_delivery(consumer, &old_payload))
.transpose()
}

pub(super) struct RedisFallbackAcker<M: ManageConnection> {
pub(super) redis: bb8::Pool<M>,
pub(super) processing_queue_key: String,
-    pub(super) key: RawPayload,
+    // We delete the entry from the processing queue by matching on the
+    // exact payload bytes. `num_receives` is encoded in the payload and
+    // is incremented on receive, so we keep the original bytes
+    // (`old_payload`) exactly as they appear in the queue.
+    pub(super) old_payload: RawPayload,

pub(super) already_acked_or_nacked: bool,

@@ -93,7 +104,7 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
.get()
.await
.map_err(QueueError::generic)?
-        .lrem(&self.processing_queue_key, 1, &self.key)
+        .lrem(&self.processing_queue_key, 1, &self.old_payload)
.await
.map_err(QueueError::generic)?;

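A quick standalone sketch of the reasoning behind the `old_payload` field above. The names here (`SketchPayload`, `to_list_item`) are illustrative assumptions, not omniqueue's actual `InternalPayload` type or wire format; the only point is that `num_receives` is serialized into the list item and bumped on receive, so a re-encoded payload no longer byte-matches the entry still sitting in the processing list, and `LREM` must be given the original bytes.

// Hypothetical stand-in for the internal payload; the real type and wire
// encoding in omniqueue differ. Only the byte-equality point matters here.
struct SketchPayload {
    num_receives: usize,
    body: Vec<u8>,
}

impl SketchPayload {
    // Toy encoding: "<num_receives>|<body>", purely illustrative.
    fn to_list_item(&self) -> Vec<u8> {
        let mut item = format!("{}|", self.num_receives).into_bytes();
        item.extend_from_slice(&self.body);
        item
    }
}

fn main() {
    // The bytes exactly as they sit in the Redis processing list.
    let old_payload = SketchPayload { num_receives: 1, body: b"hello".to_vec() }.to_list_item();

    // After a receive, num_receives has been incremented, so re-encoding
    // yields different bytes.
    let reencoded = SketchPayload { num_receives: 2, body: b"hello".to_vec() }.to_list_item();

    // LREM removes list entries by exact byte equality, so only the original
    // bytes (`old_payload`) would match the stored entry.
    assert_ne!(old_payload, reencoded);
}

This is why the acker carries the raw list item (`old_payload`) from receive time through to the `lrem` call in `ack`.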
3 changes: 2 additions & 1 deletion omniqueue/src/backends/redis/mod.rs
@@ -211,6 +211,7 @@ impl InternalPayload {
fn into_fallback_delivery<R: RedisConnection>(
self,
consumer: &RedisConsumer<R>,
+        old_payload: &[u8],
) -> Result<Delivery> {
let InternalPayload {
payload,
@@ -222,7 +223,7 @@ impl InternalPayload {
RedisFallbackAcker {
redis: consumer.redis.clone(),
processing_queue_key: consumer.processing_queue_key.clone(),
-            key: payload.to_owned(),
+            old_payload: old_payload.to_owned(),
already_acked_or_nacked: false,
max_receives: consumer.max_receives,
num_receives,
55 changes: 55 additions & 0 deletions omniqueue/tests/it/redis.rs
@@ -291,3 +291,58 @@ async fn test_pending() {
.unwrap()
.is_empty());
}

#[tokio::test]
async fn test_max_receives() {
let payload = ExType { a: 1 };

let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
.take(8)
.collect();

let client = Client::open(ROOT_URL).unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();

let _: () = conn
.xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
.await
.unwrap();

let max_receives = 5;

let config = RedisConfig {
dsn: ROOT_URL.to_owned(),
max_connections: 8,
reinsert_on_nack: false,
queue_key: stream_name.clone(),
delayed_queue_key: format!("{stream_name}::delayed"),
delayed_lock_key: format!("{stream_name}::delayed_lock"),
consumer_group: "test_cg".to_owned(),
consumer_name: "test_cn".to_owned(),
payload_key: "payload".to_owned(),
ack_deadline_ms: 5_000,
max_receives: Some(max_receives),
};

let (builder, _drop) = (RedisBackend::builder(config), RedisStreamDrop(stream_name));

let (p, mut c) = builder.build_pair().await.unwrap();

p.send_serde_json(&payload).await.unwrap();

for _ in 0..max_receives {
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(&payload),
delivery.payload_serde_json().unwrap().as_ref()
);
}

// Give this some time because the reenqueuing can sleep for up to 500ms
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let delivery = c
.receive_all(1, std::time::Duration::from_millis(1))
.await
.unwrap();
assert!(delivery.is_empty());
}
50 changes: 50 additions & 0 deletions omniqueue/tests/it/redis_fallback.rs
@@ -290,3 +290,53 @@ async fn test_pending() {
.unwrap()
.is_empty());
}

#[tokio::test]
async fn test_max_receives() {
let payload = ExType { a: 1 };

let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric)
.take(8)
.collect();

let max_receives = 5;

let config = RedisConfig {
dsn: ROOT_URL.to_owned(),
max_connections: 8,
reinsert_on_nack: false,
queue_key: queue_key.clone(),
delayed_queue_key: format!("{queue_key}::delayed"),
delayed_lock_key: format!("{queue_key}::delayed_lock"),
consumer_group: "test_cg".to_owned(),
consumer_name: "test_cn".to_owned(),
payload_key: "payload".to_owned(),
ack_deadline_ms: 1,
max_receives: Some(max_receives),
};

let (builder, _drop) = (
RedisBackend::builder(config).use_redis_streams(false),
RedisKeyDrop(queue_key),
);

let (p, mut c) = builder.build_pair().await.unwrap();

p.send_serde_json(&payload).await.unwrap();

for _ in 0..max_receives {
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(&payload),
delivery.payload_serde_json().unwrap().as_ref()
);
}

// Give this some time because the reenqueuing can sleep for up to 500ms
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let delivery = c
.receive_all(1, std::time::Duration::from_millis(1))
.await
.unwrap();
assert!(delivery.is_empty());
}
