Skip to content

Commit

Permalink
Update tests to validate DLQ redrive
Browse files Browse the repository at this point in the history
  • Loading branch information
jaymell committed Sep 6, 2024
1 parent 98bc138 commit d0b518b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 36 deletions.
35 changes: 24 additions & 11 deletions omniqueue/tests/it/redis.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{
num::NonZeroUsize,
time::{Duration, Instant},
};

use omniqueue::backends::{
redis::{DeadLetterQueueConfig, RedisBackendBuilder},
RedisBackend, RedisConfig,
use std::time::{Duration, Instant};

use omniqueue::{
backends::{
redis::{DeadLetterQueueConfig, RedisBackendBuilder},
RedisBackend, RedisConfig,
},
Delivery,
};
use redis::{AsyncCommands, Client, Commands};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -343,7 +343,7 @@ async fn test_deadletter_config() {
async move {
let client = Client::open(ROOT_URL).unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
let mut res: Vec<String> = conn.lpop(&dlq_key, NonZeroUsize::new(100)).await.unwrap();
let mut res: Vec<String> = conn.lrange(&dlq_key, 0, 0).await.unwrap();
assert!(res.len() == asserted_len);
res.pop()
}
Expand Down Expand Up @@ -386,12 +386,16 @@ async fn test_deadletter_config() {
.await
.unwrap();

for _ in 0..max_receives {
let delivery = c.receive().await.unwrap();
let assert_delivery = |delivery: &Delivery| {
assert_eq!(
Some(&payload),
delivery.payload_serde_json().unwrap().as_ref()
);
};

for _ in 0..max_receives {
let delivery = c.receive().await.unwrap();
assert_delivery(&delivery);
delivery.nack().await.unwrap();
}

Expand All @@ -406,6 +410,15 @@ async fn test_deadletter_config() {
// Expected message should be on DLQ:
let res = check_dlq(1).await;
assert_eq!(payload_str, res.unwrap());

// Redrive DLQ, receive from main queue, ack:
p.redrive_dlq().await.unwrap();

let delivery = c.receive().await.unwrap();
assert_delivery(&delivery);
delivery.ack().await.unwrap();

check_dlq(0).await;
}

// A message without a `num_receives` field shouldn't
Expand Down
31 changes: 22 additions & 9 deletions omniqueue/tests/it/redis_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{
num::NonZeroUsize,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};

use omniqueue::backends::{
redis::DeadLetterQueueConfig, RedisBackend, RedisClusterBackendBuilder, RedisConfig,
use omniqueue::{
backends::{
redis::DeadLetterQueueConfig, RedisBackend, RedisClusterBackendBuilder, RedisConfig,
},
Delivery,
};
use redis::{cluster::ClusterClient, AsyncCommands, Commands};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -345,7 +345,7 @@ async fn test_deadletter_config() {
let client = client.clone();
async move {
let mut conn = client.get_async_connection().await.unwrap();
let mut res: Vec<String> = conn.lpop(&dlq_key, NonZeroUsize::new(100)).await.unwrap();
let mut res: Vec<String> = conn.lrange(&dlq_key, 0, 0).await.unwrap();
assert!(res.len() == asserted_len);
res.pop()
}
Expand Down Expand Up @@ -388,12 +388,16 @@ async fn test_deadletter_config() {
.await
.unwrap();

for _ in 0..max_receives {
let delivery = c.receive().await.unwrap();
let assert_delivery = |delivery: &Delivery| {
assert_eq!(
Some(&payload),
delivery.payload_serde_json().unwrap().as_ref()
);
};

for _ in 0..max_receives {
let delivery = c.receive().await.unwrap();
assert_delivery(&delivery);
delivery.nack().await.unwrap();
}

Expand All @@ -408,6 +412,15 @@ async fn test_deadletter_config() {
// Expected message should be on DLQ:
let res = check_dlq(1).await;
assert_eq!(payload_str, res.unwrap());

// Redrive DLQ, receive from main queue, ack:
p.redrive_dlq().await.unwrap();

let delivery = c.receive().await.unwrap();
assert_delivery(&delivery);
delivery.ack().await.unwrap();

check_dlq(0).await;
}

// A message without a `num_receives` field shouldn't
Expand Down
42 changes: 26 additions & 16 deletions omniqueue/tests/it/redis_fallback.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use core::str;
use std::{
num::NonZeroUsize,
time::{Duration, Instant},
};

use omniqueue::backends::{
redis::{DeadLetterQueueConfig, RedisBackendBuilder},
RedisBackend, RedisConfig,
use std::time::{Duration, Instant};

use omniqueue::{
backends::{
redis::{DeadLetterQueueConfig, RedisBackendBuilder},
RedisBackend, RedisConfig,
},
Delivery,
};
use redis::{AsyncCommands, Client, Commands};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -335,7 +335,7 @@ async fn test_deadletter_config() {
async move {
let client = Client::open(ROOT_URL).unwrap();
let mut conn = client.get_multiplexed_async_connection().await.unwrap();
let mut res: Vec<String> = conn.lpop(&dlq_key, NonZeroUsize::new(100)).await.unwrap();
let mut res: Vec<String> = conn.lrange(&dlq_key, 0, 0).await.unwrap();
assert!(res.len() == asserted_len);
res.pop()
}
Expand All @@ -351,13 +351,17 @@ async fn test_deadletter_config() {
// Test send to DLQ via `ack_deadline_ms` expiration:
p.send_serde_json(&payload).await.unwrap();

for _ in 0..max_receives {
check_dlq(0).await;
let delivery = c.receive().await.unwrap();
let assert_delivery = |delivery: &Delivery| {
assert_eq!(
Some(&payload),
delivery.payload_serde_json().unwrap().as_ref()
);
};

for _ in 0..max_receives {
check_dlq(0).await;
let delivery = c.receive().await.unwrap();
assert_delivery(&delivery);
}

// Give this some time because the reenqueuing can sleep for up to 500ms
Expand All @@ -372,6 +376,15 @@ async fn test_deadletter_config() {
let res = check_dlq(1).await;
assert_eq!(serde_json::to_string(&payload).unwrap(), res.unwrap());

// Redrive DLQ, receive from main queue, ack:
p.redrive_dlq().await.unwrap();

let delivery = c.receive().await.unwrap();
assert_delivery(&delivery);
delivery.ack().await.unwrap();

check_dlq(0).await;

/* This portion of test is flaky due to https://github.com/svix/omniqueue-rs/issues/102
// Test send to DLQ via explicit `nack`ing:
Expand All @@ -380,10 +393,7 @@ async fn test_deadletter_config() {
for _ in 0..max_receives {
check_dlq(0).await;
let delivery = c.receive().await.unwrap();
assert_eq!(
Some(&payload),
delivery.payload_serde_json().unwrap().as_ref()
);
assert_delivery(&delivery);
delivery.nack().await.unwrap();
}
Expand Down

0 comments on commit d0b518b

Please sign in to comment.