Skip to content

Commit

Permalink
tests: Add more run_once cases & refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Feb 4, 2024
1 parent 3d0af07 commit 57efb61
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 25 deletions.
49 changes: 47 additions & 2 deletions tests/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![allow(dead_code)]

use chrono::{DateTime, Utc};
use graphile_worker::WorkerOptions;
use serde_json::Value;
use sqlx::postgres::PgConnectOptions;
use sqlx::FromRow;
use sqlx::PgPool;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
use tokio::task::LocalSet;

#[derive(FromRow, Debug)]
#[allow(dead_code)]
pub struct Job {
pub id: i64,
pub job_queue_id: Option<i32>,
Expand Down Expand Up @@ -76,7 +79,6 @@ impl TestDatabase {
.expect("Failed to get jobs")
}

#[allow(dead_code)]
pub async fn get_job_queues(&self) -> Vec<JobQueue> {
sqlx::query_as(
r#"
Expand All @@ -93,6 +95,24 @@ impl TestDatabase {
.await
.expect("Failed to get job queues")
}

pub async fn make_jobs_run_now(&self, task_identifier: &str) {
sqlx::query(
r#"
update graphile_worker._private_jobs
set run_at = now()
where task_id = (
select id
from graphile_worker._private_tasks
where identifier = $1
)
"#,
)
.bind(task_identifier)
.execute(&self.test_pool)
.await
.expect("Failed to update jobs");
}
}

async fn create_test_database() -> TestDatabase {
Expand Down Expand Up @@ -155,3 +175,28 @@ where
})
.await;
}

pub struct StaticCounter {
cell: OnceCell<Mutex<u32>>,
}
async fn init_job_count() -> Mutex<u32> {
Mutex::new(0)
}
impl StaticCounter {
pub const fn new() -> Self {
Self {
cell: OnceCell::const_new(),
}
}

pub async fn increment(&self) {
let cell = self.cell.get_or_init(init_job_count).await;
let mut count = cell.lock().await;
*count += 1;
}

pub async fn get(&self) -> u32 {
let cell = self.cell.get_or_init(init_job_count).await;
*cell.lock().await
}
}
161 changes: 138 additions & 23 deletions tests/run_task_list_once.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
use crate::helpers::StaticCounter;
use chrono::Utc;
use graphile_worker::JobSpec;
use serde_json::json;
use std::sync::OnceLock;
use tokio::sync::Mutex;

mod helpers;

#[tokio::test]
async fn it_should_run_jobs() {
static JOB2_CALL_COUNT: OnceLock<Mutex<u32>> = OnceLock::new();
static JOB3_CALL_COUNT: OnceLock<Mutex<u32>> = OnceLock::new();
static JOB2_CALL_COUNT: StaticCounter = StaticCounter::new();
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.define_raw_job("job3", |_, _: serde_json::Value| async move {
let mut job_count = JOB3_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await;
*job_count += 1;
JOB3_CALL_COUNT.increment().await;
Ok(()) as Result<(), ()>
})
.define_raw_job("job2", |_, _: serde_json::Value| async move {
let mut job_count = JOB2_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await;
*job_count += 1;
JOB2_CALL_COUNT.increment().await;
Ok(()) as Result<(), ()>
})
.init()
Expand Down Expand Up @@ -59,14 +56,8 @@ async fn it_should_run_jobs() {
assert_eq!(job_queue.locked_by, None);

worker.run_once().await.expect("Failed to run worker");
assert_eq!(
*JOB3_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await,
1
);
assert_eq!(
*JOB2_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await,
0
);
assert_eq!(JOB3_CALL_COUNT.get().await, 1);
assert_eq!(JOB2_CALL_COUNT.get().await, 0);
let jobs = test_db.get_jobs().await;
assert_eq!(jobs.len(), 0);
})
Expand All @@ -75,14 +66,13 @@ async fn it_should_run_jobs() {

#[tokio::test]
async fn it_should_schedule_errors_for_retry() {
static JOB3_CALL_COUNT: OnceLock<Mutex<u32>> = OnceLock::new();
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.define_raw_job("job3", |_, _: serde_json::Value| async move {
let mut job_count = JOB3_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await;
*job_count += 1;
JOB3_CALL_COUNT.increment().await;
Err("fail".to_string()) as Result<(), String>
})
.init()
Expand Down Expand Up @@ -125,10 +115,7 @@ async fn it_should_schedule_errors_for_retry() {
}

worker.run_once().await.expect("Failed to run worker");
assert_eq!(
*JOB3_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await,
1
);
assert_eq!(JOB3_CALL_COUNT.get().await, 1);

{
let jobs = test_db.get_jobs().await;
Expand Down Expand Up @@ -156,3 +143,131 @@ async fn it_should_schedule_errors_for_retry() {
})
.await;
}

#[tokio::test]
async fn it_should_retry_jobs() {
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.define_raw_job("job3", |_, _: serde_json::Value| async move {
JOB3_CALL_COUNT.increment().await;
Err("fail 2".to_string()) as Result<(), String>
})
.init()
.await
.expect("Failed to create worker");

{
let utils = worker.create_utils();

utils
.add_raw_job(
"job3",
json!({ "a": 1 }),
Some(JobSpec {
queue_name: Some("myqueue".to_string()),
..Default::default()
}),
)
.await
.expect("Failed to add job");
}

// Run the job (it will error)
worker.run_once().await.expect("Failed to run worker");
assert_eq!(JOB3_CALL_COUNT.get().await, 1);

// Should do nothing the second time, because it's queued for the future (assuming we run this fast enough afterwards!)
worker.run_once().await.expect("Failed to run worker");
assert_eq!(JOB3_CALL_COUNT.get().await, 1);

// Tell the job to run now
test_db.make_jobs_run_now("job3").await;
let start = Utc::now();
worker.run_once().await.expect("Failed to run worker");
assert_eq!(JOB3_CALL_COUNT.get().await, 2);

// Should be rejected again
{
let jobs = test_db.get_jobs().await;
assert_eq!(jobs.len(), 1);
let job = &jobs[0];
assert_eq!(job.task_identifier, "job3");
assert_eq!(job.attempts, 2);
assert_eq!(job.max_attempts, 25);
assert_eq!(
job.last_error,
Some("TaskError(\"\\\"fail 2\\\"\")".to_string())
);
// It's the second attempt, so delay is exp(2) ~= 7.389 seconds
assert!(job.run_at > start + chrono::Duration::milliseconds(7389));
assert!(job.run_at < Utc::now() + chrono::Duration::milliseconds(7389));

let job_queues = test_db.get_job_queues().await;
assert_eq!(job_queues.len(), 1);
let q = &job_queues[0];
assert_eq!(q.queue_name, "myqueue");
assert_eq!(q.job_count, 1);
assert_eq!(q.locked_at, None);
assert_eq!(q.locked_by, None);
}
})
.await;
}

#[tokio::test]
async fn it_should_supports_future_scheduled_jobs() {
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();

helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.define_raw_job("job3", |_, _: serde_json::Value| async move {
JOB3_CALL_COUNT.increment().await;
Ok(()) as Result<(), ()>
})
.init()
.await
.expect("Failed to create worker");

{
let utils = worker.create_utils();

utils
.add_raw_job(
"job3",
json!({ "a": 1 }),
Some(JobSpec {
run_at: Some(Utc::now() + chrono::Duration::seconds(3)),
..Default::default()
}),
)
.await
.expect("Failed to add job");
}

// Run all the jobs now (none should run)
worker.run_once().await.expect("Failed to run worker");
assert_eq!(JOB3_CALL_COUNT.get().await, 0);

// Still not ready
worker.run_once().await.expect("Failed to run worker");
assert_eq!(JOB3_CALL_COUNT.get().await, 0);

// Make the job ready
test_db.make_jobs_run_now("job3").await;
worker.run_once().await.expect("Failed to run worker");
assert_eq!(JOB3_CALL_COUNT.get().await, 1);

// It should be successful
{
let jobs = test_db.get_jobs().await;
assert_eq!(jobs.len(), 0);
let job_queues = test_db.get_job_queues().await;
assert_eq!(job_queues.len(), 0);
}
})
.await;
}

0 comments on commit 57efb61

Please sign in to comment.