Skip to content

Commit

Permalink
tests: Add more complex test cases for run_once
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Feb 4, 2024
1 parent 3f62ac2 commit 4a6f4a9
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 3 deletions.
3 changes: 2 additions & 1 deletion tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,11 @@ impl StaticCounter {
}
}

pub async fn increment(&self) {
pub async fn increment(&self) -> u32 {
let cell = self.cell.get_or_init(init_job_count).await;
let mut count = cell.lock().await;
*count += 1;
*count
}

pub async fn get(&self) -> u32 {
Expand Down
225 changes: 223 additions & 2 deletions tests/run_once.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::ops::Add;
use std::{ops::Add, rc::Rc};

use crate::helpers::StaticCounter;
use chrono::{Timelike, Utc};
use graphile_worker::JobSpec;
use graphile_worker::{JobSpec, WorkerContext};
use serde_json::json;
use tokio::{
sync::{oneshot, Mutex, OnceCell},
task::spawn_local,
};

mod helpers;

Expand Down Expand Up @@ -405,3 +409,220 @@ async fn it_schedules_a_new_job_if_existing_is_completed() {
})
.await;
}

#[tokio::test]
async fn schedules_a_new_job_if_existing_is_being_processed() {
static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new();
static RX1: OnceCell<Mutex<Option<oneshot::Receiver<()>>>> = OnceCell::const_new();
static RX2: OnceCell<Mutex<Option<oneshot::Receiver<()>>>> = OnceCell::const_new();

helpers::with_test_db(|test_db| async move {
let (tx1, rx1) = oneshot::channel::<()>();
let (tx2, rx2) = oneshot::channel::<()>();
RX1.set(Mutex::new(Some(rx1))).unwrap();
RX2.set(Mutex::new(Some(rx2))).unwrap();

let worker = test_db
.create_worker_options()
.define_raw_job("job3", move |_, _: serde_json::Value| async move {
let n = JOB3_CALL_COUNT.increment().await;
match n {
1 => {
let mut rx_opt = RX1.get().unwrap().lock().await;
if let Some(rx) = rx_opt.take() {
rx.await.unwrap();
}
}
2 => {
let mut rx_opt = RX2.get().unwrap().lock().await;
if let Some(rx) = rx_opt.take() {
rx.await.unwrap();
}
}
_ => unreachable!("Job3 should only be called twice"),
};
Ok(()) as Result<(), ()>
})
.init()
.await
.expect("Failed to create worker");

// Schedule the first job
worker
.create_utils()
.add_raw_job(
"job3",
json!({ "a": "first" }),
Some(JobSpec {
job_key: Some("abc".into()),
..Default::default()
}),
)
.await
.expect("Failed to add first job");

let worker = Rc::new(worker);

tracing::info!("Starting worker");
// Run the first job
let run_once_1 = spawn_local({
let worker = worker.clone();
async move {
worker.run_once().await.expect("Failed to run worker");
}
});

tracing::info!("Waiting for first job to be picked up");
// Ensure the first job is picked up by waiting for a small delay
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
assert_eq!(JOB3_CALL_COUNT.get().await, 1);

// Schedule a new job with the same key while the first one is being processed
worker
.create_utils()
.add_raw_job(
"job3",
json!({ "a": "second" }),
Some(JobSpec {
job_key: Some("abc".into()),
..Default::default()
}),
)
.await
.expect("Failed to add second job");

// Complete the first job
tx1.send(()).unwrap();

run_once_1.await.expect("Failed to run worker");

// Run the worker again to pick up the second job
let run_once_2 = spawn_local({
let worker = worker.clone();
async move {
worker.run_once().await.expect("Failed to run worker");
}
});

// Complete the second job
tx2.send(()).unwrap();

run_once_2.await.expect("Failed to run worker");

// Ensure both jobs have been processed
assert_eq!(JOB3_CALL_COUNT.get().await, 2);
})
.await;
}

#[tokio::test]
async fn schedules_a_new_job_if_the_existing_is_pending_retry() {
static JOB5_CALL_COUNT: StaticCounter = StaticCounter::new();

#[derive(serde::Deserialize, serde::Serialize)]
struct Job5Args {
succeed: bool,
}

#[graphile_worker::task]
async fn job5(args: Job5Args, _: WorkerContext) -> Result<(), String> {
JOB5_CALL_COUNT.increment().await;
if !args.succeed {
return Err("fail".to_string());
}

Ok(())
}

helpers::with_test_db(|test_db| async move {
let worker = test_db
.create_worker_options()
.define_job(job5)
.init()
.await
.expect("Failed to create worker");

// Schedule a job that is initially set to fail
worker
.create_utils()
.add_job::<job5>(
Job5Args { succeed: false },
Some(JobSpec {
job_key: Some("abc".into()),
..Default::default()
}),
)
.await
.expect("Failed to add job");

// Run the worker to process the job, which should fail
worker.run_once().await.expect("Failed to run worker");
assert_eq!(
JOB5_CALL_COUNT.get().await,
1,
"job5 should have been called once"
);

// Check that the job is scheduled for retry
let jobs = test_db.get_jobs().await;
assert_eq!(jobs.len(), 1, "There should be one job scheduled for retry");
let job = &jobs[0];
assert_eq!(job.attempts, 1, "The job should have one failed attempt");
let last_error = job
.last_error
.as_ref()
.expect("The job should have a last error");
assert!(
last_error.contains("fail"),
"The job's last error should contain 'fail'"
);

// No job should run now as it is scheduled for the future
worker.run_once().await.expect("Failed to run worker");
assert_eq!(
JOB5_CALL_COUNT.get().await,
1,
"job5 should still be called only once"
);

// Update the job to succeed
worker
.create_utils()
.add_job::<job5>(
Job5Args { succeed: true },
Some(JobSpec {
job_key: Some("abc".into()),
run_at: Some(Utc::now()),
..Default::default()
}),
)
.await
.expect("Failed to update job");
//
// Assert that the job was updated and not created as a new one
let updated_jobs = test_db.get_jobs().await;
assert_eq!(
updated_jobs.len(),
1,
"There should still be only one job in the database"
);
let updated_job = &updated_jobs[0];
assert_eq!(
updated_job.attempts, 0,
"The job's attempts should be reset to 0"
);
assert!(
updated_job.last_error.is_none(),
"The job's last error should be cleared"
);

// Run the worker again, and the job should now succeed
worker.run_once().await.expect("Failed to run worker");
assert_eq!(
JOB5_CALL_COUNT.get().await,
2,
"job5 should have been called twice"
);
})
.await;
}

0 comments on commit 4a6f4a9

Please sign in to comment.