diff --git a/tests/helpers.rs b/tests/helpers.rs index 8a64ec1..02b054a 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -192,10 +192,11 @@ impl StaticCounter { } } - pub async fn increment(&self) { + pub async fn increment(&self) -> u32 { let cell = self.cell.get_or_init(init_job_count).await; let mut count = cell.lock().await; *count += 1; + *count } pub async fn get(&self) -> u32 { diff --git a/tests/run_once.rs b/tests/run_once.rs index e8f4bd3..dfa59df 100644 --- a/tests/run_once.rs +++ b/tests/run_once.rs @@ -1,9 +1,13 @@ -use std::ops::Add; +use std::{ops::Add, rc::Rc}; use crate::helpers::StaticCounter; use chrono::{Timelike, Utc}; -use graphile_worker::JobSpec; +use graphile_worker::{JobSpec, WorkerContext}; use serde_json::json; +use tokio::{ + sync::{oneshot, Mutex, OnceCell}, + task::spawn_local, +}; mod helpers; @@ -405,3 +409,220 @@ async fn it_schedules_a_new_job_if_existing_is_completed() { }) .await; } + +#[tokio::test] +async fn schedules_a_new_job_if_existing_is_being_processed() { + static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new(); + static RX1: OnceCell>>> = OnceCell::const_new(); + static RX2: OnceCell>>> = OnceCell::const_new(); + + helpers::with_test_db(|test_db| async move { + let (tx1, rx1) = oneshot::channel::<()>(); + let (tx2, rx2) = oneshot::channel::<()>(); + RX1.set(Mutex::new(Some(rx1))).unwrap(); + RX2.set(Mutex::new(Some(rx2))).unwrap(); + + let worker = test_db + .create_worker_options() + .define_raw_job("job3", move |_, _: serde_json::Value| async move { + let n = JOB3_CALL_COUNT.increment().await; + match n { + 1 => { + let mut rx_opt = RX1.get().unwrap().lock().await; + if let Some(rx) = rx_opt.take() { + rx.await.unwrap(); + } + } + 2 => { + let mut rx_opt = RX2.get().unwrap().lock().await; + if let Some(rx) = rx_opt.take() { + rx.await.unwrap(); + } + } + _ => unreachable!("Job3 should only be called twice"), + }; + Ok(()) as Result<(), ()> + }) + .init() + .await + .expect("Failed to create worker"); + + // Schedule the first job + worker + .create_utils() + .add_raw_job( + "job3", + json!({ "a": "first" }), + Some(JobSpec { + job_key: Some("abc".into()), + ..Default::default() + }), + ) + .await + .expect("Failed to add first job"); + + let worker = Rc::new(worker); + + tracing::info!("Starting worker"); + // Run the first job + let run_once_1 = spawn_local({ + let worker = worker.clone(); + async move { + worker.run_once().await.expect("Failed to run worker"); + } + }); + + tracing::info!("Waiting for first job to be picked up"); + // Ensure the first job is picked up by waiting for a small delay + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + assert_eq!(JOB3_CALL_COUNT.get().await, 1); + + // Schedule a new job with the same key while the first one is being processed + worker + .create_utils() + .add_raw_job( + "job3", + json!({ "a": "second" }), + Some(JobSpec { + job_key: Some("abc".into()), + ..Default::default() + }), + ) + .await + .expect("Failed to add second job"); + + // Complete the first job + tx1.send(()).unwrap(); + + run_once_1.await.expect("Failed to run worker"); + + // Run the worker again to pick up the second job + let run_once_2 = spawn_local({ + let worker = worker.clone(); + async move { + worker.run_once().await.expect("Failed to run worker"); + } + }); + + // Complete the second job + tx2.send(()).unwrap(); + + run_once_2.await.expect("Failed to run worker"); + + // Ensure both jobs have been processed + assert_eq!(JOB3_CALL_COUNT.get().await, 2); + }) + .await; +} + +#[tokio::test] +async fn schedules_a_new_job_if_the_existing_is_pending_retry() { + static JOB5_CALL_COUNT: StaticCounter = StaticCounter::new(); + + #[derive(serde::Deserialize, serde::Serialize)] + struct Job5Args { + succeed: bool, + } + + #[graphile_worker::task] + async fn job5(args: Job5Args, _: WorkerContext) -> Result<(), String> { + JOB5_CALL_COUNT.increment().await; + if !args.succeed { + return Err("fail".to_string()); + } + + Ok(()) + } + + helpers::with_test_db(|test_db| async move { + let worker = test_db + .create_worker_options() + .define_job(job5) + .init() + .await + .expect("Failed to create worker"); + + // Schedule a job that is initially set to fail + worker + .create_utils() + .add_job::( + Job5Args { succeed: false }, + Some(JobSpec { + job_key: Some("abc".into()), + ..Default::default() + }), + ) + .await + .expect("Failed to add job"); + + // Run the worker to process the job, which should fail + worker.run_once().await.expect("Failed to run worker"); + assert_eq!( + JOB5_CALL_COUNT.get().await, + 1, + "job5 should have been called once" + ); + + // Check that the job is scheduled for retry + let jobs = test_db.get_jobs().await; + assert_eq!(jobs.len(), 1, "There should be one job scheduled for retry"); + let job = &jobs[0]; + assert_eq!(job.attempts, 1, "The job should have one failed attempt"); + let last_error = job + .last_error + .as_ref() + .expect("The job should have a last error"); + assert!( + last_error.contains("fail"), + "The job's last error should contain 'fail'" + ); + + // No job should run now as it is scheduled for the future + worker.run_once().await.expect("Failed to run worker"); + assert_eq!( + JOB5_CALL_COUNT.get().await, + 1, + "job5 should still be called only once" + ); + + // Update the job to succeed + worker + .create_utils() + .add_job::( + Job5Args { succeed: true }, + Some(JobSpec { + job_key: Some("abc".into()), + run_at: Some(Utc::now()), + ..Default::default() + }), + ) + .await + .expect("Failed to update job"); + // + // Assert that the job was updated and not created as a new one + let updated_jobs = test_db.get_jobs().await; + assert_eq!( + updated_jobs.len(), + 1, + "There should still be only one job in the database" + ); + let updated_job = &updated_jobs[0]; + assert_eq!( + updated_job.attempts, 0, + "The job's attempts should be reset to 0" + ); + assert!( + updated_job.last_error.is_none(), + "The job's last error should be cleared" + ); + + // Run the worker again, and the job should now succeed + worker.run_once().await.expect("Failed to run worker"); + assert_eq!( + JOB5_CALL_COUNT.get().await, + 2, + "job5 should have been called twice" + ); + }) + .await; +}