Skip to content

Commit

Permalink
Add test of redis DLQ ordering
Browse files Browse the repository at this point in the history
I'm not sure if we need to generically guarantee this in our docs,
but it seems reasonable to validate it in our tests for now.
  • Loading branch information
jaymell committed Sep 9, 2024
1 parent d0b518b commit ef7d82e
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 1 deletion.
97 changes: 96 additions & 1 deletion omniqueue/tests/it/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async fn test_bytes_send_recv() {
d.ack().await.unwrap();
}

#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub struct ExType {
a: u8,
}
Expand Down Expand Up @@ -421,6 +421,101 @@ async fn test_deadletter_config() {
check_dlq(0).await;
}

// Asssert that ordering is as expected. I don't know

Check warning on line 424 in omniqueue/tests/it/redis.rs

View workflow job for this annotation

GitHub Actions / Check for typos

"Asssert" should be "Assert".
// that we need to guarantee this in our docs, but it's
// good to at least validate it for now:
#[tokio::test]
async fn test_deadletter_config_order() {
let payload1 = ExType { a: 1 };
let payload2 = ExType { a: 2 };
let payload3 = ExType { a: 3 };

let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
.take(8)
.collect();

let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric)
.take(8)
.collect();

let client = Client::open(ROOT_URL).unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();

let _: () = conn
.xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
.await
.unwrap();

let max_receives = 1;

let config = RedisConfig {
dsn: ROOT_URL.to_owned(),
max_connections: 8,
reinsert_on_nack: false,
queue_key: stream_name.clone(),
delayed_queue_key: format!("{stream_name}::delayed"),
delayed_lock_key: format!("{stream_name}::delayed_lock"),
consumer_group: "test_cg".to_owned(),
consumer_name: "test_cn".to_owned(),
payload_key: "payload".to_owned(),
ack_deadline_ms: 20,
dlq_config: Some(DeadLetterQueueConfig {
queue_key: dlq_key.to_owned(),
max_receives,
}),
};

let check_dlq = |asserted_len: usize| {
let dlq_key = dlq_key.clone();
async move {
let client = Client::open(ROOT_URL).unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
let mut res: Vec<String> = conn.lrange(&dlq_key, 0, -1).await.unwrap();
assert!(res.len() == asserted_len);
res.pop()
}
};

let (builder, _drop) = (
RedisBackend::builder(config),
RedisStreamDrop(stream_name.clone()),
);

let (p, mut c) = builder.build_pair().await.unwrap();

// Test send to DLQ via `ack_deadline_ms` expiration:
p.send_serde_json(&payload1).await.unwrap();
p.send_serde_json(&payload2).await.unwrap();
p.send_serde_json(&payload3).await.unwrap();

for payload in [&payload1, &payload2, &payload3] {
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(payload),
delivery.payload_serde_json().unwrap().as_ref()
);
delivery.nack().await.unwrap();
}

// Give this some time because the reenqueuing can sleep for up to 500ms
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Expected messages should be on DLQ:
check_dlq(3).await;

// Redrive DLQ, receive from main queue, ack:
p.redrive_dlq().await.unwrap();

for payload in [&payload1, &payload2, &payload3] {
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(payload),
delivery.payload_serde_json().unwrap().as_ref()
);
delivery.ack().await.unwrap();
}
}

// A message without a `num_receives` field shouldn't
// cause issues:
#[tokio::test]
Expand Down
91 changes: 91 additions & 0 deletions omniqueue/tests/it/redis_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,97 @@ async fn test_deadletter_config() {
check_dlq(0).await;
}

#[tokio::test]
async fn test_deadletter_config_order() {
let payload1 = ExType { a: 1 };
let payload2 = ExType { a: 2 };
let payload3 = ExType { a: 3 };

let stream_name: String = std::iter::repeat_with(fastrand::alphanumeric)
.take(8)
.collect();

let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric)
.take(8)
.collect();

let client = ClusterClient::new(vec![ROOT_URL]).unwrap();
let mut conn = client.get_async_connection().await.unwrap();

let _: () = conn
.xgroup_create_mkstream(&stream_name, "test_cg", 0i8)
.await
.unwrap();

let max_receives = 1;

let config = RedisConfig {
dsn: ROOT_URL.to_owned(),
max_connections: 8,
reinsert_on_nack: false,
queue_key: stream_name.clone(),
delayed_queue_key: format!("{stream_name}::delayed"),
delayed_lock_key: format!("{stream_name}::delayed_lock"),
consumer_group: "test_cg".to_owned(),
consumer_name: "test_cn".to_owned(),
payload_key: "payload".to_owned(),
ack_deadline_ms: 20,
dlq_config: Some(DeadLetterQueueConfig {
queue_key: dlq_key.to_owned(),
max_receives,
}),
};

let check_dlq = |asserted_len: usize| {
let dlq_key = dlq_key.clone();
let client = client.clone();
async move {
let mut conn = client.get_async_connection().await.unwrap();
let mut res: Vec<String> = conn.lrange(&dlq_key, 0, -1).await.unwrap();
assert!(res.len() == asserted_len);
res.pop()
}
};

let (builder, _drop) = (
RedisBackend::builder(config).cluster(),
RedisStreamDrop(stream_name.clone()),
);

let (p, mut c) = builder.build_pair().await.unwrap();

// Test send to DLQ via `ack_deadline_ms` expiration:
p.send_serde_json(&payload1).await.unwrap();
p.send_serde_json(&payload2).await.unwrap();
p.send_serde_json(&payload3).await.unwrap();

for payload in [&payload1, &payload2, &payload3] {
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(payload),
delivery.payload_serde_json().unwrap().as_ref()
);
delivery.nack().await.unwrap();
}

// Give this some time because the reenqueuing can sleep for up to 500ms
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Expected messages should be on DLQ:
check_dlq(3).await;

// Redrive DLQ, receive from main queue, ack:
p.redrive_dlq().await.unwrap();

for payload in [&payload1, &payload2, &payload3] {
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(payload),
delivery.payload_serde_json().unwrap().as_ref()
);
delivery.ack().await.unwrap();
}
}
// A message without a `num_receives` field shouldn't
// cause issues:
#[tokio::test]
Expand Down
84 changes: 84 additions & 0 deletions omniqueue/tests/it/redis_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,90 @@ async fn test_deadletter_config() {
*/
}

#[tokio::test]
async fn test_deadletter_config_order() {
let payload1 = ExType { a: 1 };
let payload2 = ExType { a: 2 };
let payload3 = ExType { a: 3 };

let queue_key: String = std::iter::repeat_with(fastrand::alphanumeric)
.take(8)
.collect();

let dlq_key: String = std::iter::repeat_with(fastrand::alphanumeric)
.take(8)
.collect();

let max_receives = 1;

let config = RedisConfig {
dsn: ROOT_URL.to_owned(),
max_connections: 8,
reinsert_on_nack: false,
queue_key: queue_key.clone(),
delayed_queue_key: format!("{queue_key}::delayed"),
delayed_lock_key: format!("{queue_key}::delayed_lock"),
consumer_group: "test_cg".to_owned(),
consumer_name: "test_cn".to_owned(),
payload_key: "payload".to_owned(),
ack_deadline_ms: 1,
dlq_config: Some(DeadLetterQueueConfig {
queue_key: dlq_key.to_owned(),
max_receives,
}),
};

let check_dlq = |asserted_len: usize| {
let dlq_key = dlq_key.clone();
async move {
let client = Client::open(ROOT_URL).unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
let mut res: Vec<String> = conn.lrange(&dlq_key, 0, -1).await.unwrap();
assert!(res.len() == asserted_len);
res.pop()
}
};

let (builder, _drop) = (
RedisBackend::builder(config).use_redis_streams(false),
RedisKeyDrop(queue_key),
);

let (p, mut c) = builder.build_pair().await.unwrap();

// Test send to DLQ via `ack_deadline_ms` expiration:
p.send_serde_json(&payload1).await.unwrap();
p.send_serde_json(&payload2).await.unwrap();
p.send_serde_json(&payload3).await.unwrap();

for payload in [&payload1, &payload2, &payload3] {
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(payload),
delivery.payload_serde_json().unwrap().as_ref()
);
delivery.nack().await.unwrap();
}

// Give this some time because the reenqueuing can sleep for up to 500ms
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Expected messages should be on DLQ:
check_dlq(3).await;

// Redrive DLQ, receive from main queue, ack:
p.redrive_dlq().await.unwrap();

for payload in [&payload1, &payload2, &payload3] {
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(payload),
delivery.payload_serde_json().unwrap().as_ref()
);
delivery.ack().await.unwrap();
}
}

// A message without a `num_receives` field shouldn't
// cause issues:
#[tokio::test]
Expand Down

0 comments on commit ef7d82e

Please sign in to comment.