From 57efb61d8a7e993f97deab3d0a973ba85d03d810 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9o=20Coletta?= Date: Sun, 4 Feb 2024 15:27:48 +0100 Subject: [PATCH] tests: Add more run_once cases & refactor --- tests/helpers.rs | 49 ++++++++++- tests/run_task_list_once.rs | 161 ++++++++++++++++++++++++++++++------ 2 files changed, 185 insertions(+), 25 deletions(-) diff --git a/tests/helpers.rs b/tests/helpers.rs index ec7d5cc..b54fdee 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -1,13 +1,16 @@ +#![allow(dead_code)] + use chrono::{DateTime, Utc}; use graphile_worker::WorkerOptions; use serde_json::Value; use sqlx::postgres::PgConnectOptions; use sqlx::FromRow; use sqlx::PgPool; +use tokio::sync::Mutex; +use tokio::sync::OnceCell; use tokio::task::LocalSet; #[derive(FromRow, Debug)] -#[allow(dead_code)] pub struct Job { pub id: i64, pub job_queue_id: Option, @@ -76,7 +79,6 @@ impl TestDatabase { .expect("Failed to get jobs") } - #[allow(dead_code)] pub async fn get_job_queues(&self) -> Vec { sqlx::query_as( r#" @@ -93,6 +95,24 @@ impl TestDatabase { .await .expect("Failed to get job queues") } + + pub async fn make_jobs_run_now(&self, task_identifier: &str) { + sqlx::query( + r#" + update graphile_worker._private_jobs + set run_at = now() + where task_id = ( + select id + from graphile_worker._private_tasks + where identifier = $1 + ) + "#, + ) + .bind(task_identifier) + .execute(&self.test_pool) + .await + .expect("Failed to update jobs"); + } } async fn create_test_database() -> TestDatabase { @@ -155,3 +175,28 @@ where }) .await; } + +pub struct StaticCounter { + cell: OnceCell>, +} +async fn init_job_count() -> Mutex { + Mutex::new(0) +} +impl StaticCounter { + pub const fn new() -> Self { + Self { + cell: OnceCell::const_new(), + } + } + + pub async fn increment(&self) { + let cell = self.cell.get_or_init(init_job_count).await; + let mut count = cell.lock().await; + *count += 1; + } + + pub async fn get(&self) -> u32 { + let cell = self.cell.get_or_init(init_job_count).await; + *cell.lock().await + } +} diff --git a/tests/run_task_list_once.rs b/tests/run_task_list_once.rs index 1f7a8b4..b8007ba 100644 --- a/tests/run_task_list_once.rs +++ b/tests/run_task_list_once.rs @@ -1,27 +1,24 @@ +use crate::helpers::StaticCounter; use chrono::Utc; use graphile_worker::JobSpec; use serde_json::json; -use std::sync::OnceLock; -use tokio::sync::Mutex; mod helpers; #[tokio::test] async fn it_should_run_jobs() { - static JOB2_CALL_COUNT: OnceLock> = OnceLock::new(); - static JOB3_CALL_COUNT: OnceLock> = OnceLock::new(); + static JOB2_CALL_COUNT: StaticCounter = StaticCounter::new(); + static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new(); helpers::with_test_db(|test_db| async move { let worker = test_db .create_worker_options() .define_raw_job("job3", |_, _: serde_json::Value| async move { - let mut job_count = JOB3_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await; - *job_count += 1; + JOB3_CALL_COUNT.increment().await; Ok(()) as Result<(), ()> }) .define_raw_job("job2", |_, _: serde_json::Value| async move { - let mut job_count = JOB2_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await; - *job_count += 1; + JOB2_CALL_COUNT.increment().await; Ok(()) as Result<(), ()> }) .init() @@ -59,14 +56,8 @@ async fn it_should_run_jobs() { assert_eq!(job_queue.locked_by, None); worker.run_once().await.expect("Failed to run worker"); - assert_eq!( - *JOB3_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await, - 1 - ); - assert_eq!( - *JOB2_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await, - 0 - ); + assert_eq!(JOB3_CALL_COUNT.get().await, 1); + assert_eq!(JOB2_CALL_COUNT.get().await, 0); let jobs = test_db.get_jobs().await; assert_eq!(jobs.len(), 0); }) @@ -75,14 +66,13 @@ async fn it_should_run_jobs() { #[tokio::test] async fn it_should_schedule_errors_for_retry() { - static JOB3_CALL_COUNT: OnceLock> = OnceLock::new(); + static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new(); helpers::with_test_db(|test_db| async move { let worker = test_db .create_worker_options() .define_raw_job("job3", |_, _: serde_json::Value| async move { - let mut job_count = JOB3_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await; - *job_count += 1; + JOB3_CALL_COUNT.increment().await; Err("fail".to_string()) as Result<(), String> }) .init() @@ -125,10 +115,7 @@ async fn it_should_schedule_errors_for_retry() { } worker.run_once().await.expect("Failed to run worker"); - assert_eq!( - *JOB3_CALL_COUNT.get_or_init(|| Mutex::new(0)).lock().await, - 1 - ); + assert_eq!(JOB3_CALL_COUNT.get().await, 1); { let jobs = test_db.get_jobs().await; @@ -156,3 +143,131 @@ async fn it_should_schedule_errors_for_retry() { }) .await; } + +#[tokio::test] +async fn it_should_retry_jobs() { + static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new(); + + helpers::with_test_db(|test_db| async move { + let worker = test_db + .create_worker_options() + .define_raw_job("job3", |_, _: serde_json::Value| async move { + JOB3_CALL_COUNT.increment().await; + Err("fail 2".to_string()) as Result<(), String> + }) + .init() + .await + .expect("Failed to create worker"); + + { + let utils = worker.create_utils(); + + utils + .add_raw_job( + "job3", + json!({ "a": 1 }), + Some(JobSpec { + queue_name: Some("myqueue".to_string()), + ..Default::default() + }), + ) + .await + .expect("Failed to add job"); + } + + // Run the job (it will error) + worker.run_once().await.expect("Failed to run worker"); + assert_eq!(JOB3_CALL_COUNT.get().await, 1); + + // Should do nothing the second time, because it's queued for the future (assuming we run this fast enough afterwards!) + worker.run_once().await.expect("Failed to run worker"); + assert_eq!(JOB3_CALL_COUNT.get().await, 1); + + // Tell the job to run now + test_db.make_jobs_run_now("job3").await; + let start = Utc::now(); + worker.run_once().await.expect("Failed to run worker"); + assert_eq!(JOB3_CALL_COUNT.get().await, 2); + + // Should be rejected again + { + let jobs = test_db.get_jobs().await; + assert_eq!(jobs.len(), 1); + let job = &jobs[0]; + assert_eq!(job.task_identifier, "job3"); + assert_eq!(job.attempts, 2); + assert_eq!(job.max_attempts, 25); + assert_eq!( + job.last_error, + Some("TaskError(\"\\\"fail 2\\\"\")".to_string()) + ); + // It's the second attempt, so delay is exp(2) ~= 7.389 seconds + assert!(job.run_at > start + chrono::Duration::milliseconds(7389)); + assert!(job.run_at < Utc::now() + chrono::Duration::milliseconds(7389)); + + let job_queues = test_db.get_job_queues().await; + assert_eq!(job_queues.len(), 1); + let q = &job_queues[0]; + assert_eq!(q.queue_name, "myqueue"); + assert_eq!(q.job_count, 1); + assert_eq!(q.locked_at, None); + assert_eq!(q.locked_by, None); + } + }) + .await; +} + +#[tokio::test] +async fn it_should_supports_future_scheduled_jobs() { + static JOB3_CALL_COUNT: StaticCounter = StaticCounter::new(); + + helpers::with_test_db(|test_db| async move { + let worker = test_db + .create_worker_options() + .define_raw_job("job3", |_, _: serde_json::Value| async move { + JOB3_CALL_COUNT.increment().await; + Ok(()) as Result<(), ()> + }) + .init() + .await + .expect("Failed to create worker"); + + { + let utils = worker.create_utils(); + + utils + .add_raw_job( + "job3", + json!({ "a": 1 }), + Some(JobSpec { + run_at: Some(Utc::now() + chrono::Duration::seconds(3)), + ..Default::default() + }), + ) + .await + .expect("Failed to add job"); + } + + // Run all the jobs now (none should run) + worker.run_once().await.expect("Failed to run worker"); + assert_eq!(JOB3_CALL_COUNT.get().await, 0); + + // Still not ready + worker.run_once().await.expect("Failed to run worker"); + assert_eq!(JOB3_CALL_COUNT.get().await, 0); + + // Make the job ready + test_db.make_jobs_run_now("job3").await; + worker.run_once().await.expect("Failed to run worker"); + assert_eq!(JOB3_CALL_COUNT.get().await, 1); + + // It should be successful + { + let jobs = test_db.get_jobs().await; + assert_eq!(jobs.len(), 0); + let job_queues = test_db.get_job_queues().await; + assert_eq!(job_queues.len(), 0); + } + }) + .await; +}