diff --git a/.github/workflows/loco-rs-ci.yml b/.github/workflows/loco-rs-ci.yml index 5129cfc2..c055fe22 100644 --- a/.github/workflows/loco-rs-ci.yml +++ b/.github/workflows/loco-rs-ci.yml @@ -44,6 +44,21 @@ jobs: permissions: contents: read + services: + postgres: + image: postgres + env: + POSTGRES_DB: postgres_test + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + ports: + - "5432:5432" + # Set health checks to wait until postgres has started + options: --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + steps: - name: Checkout the code uses: actions/checkout@v4 @@ -58,3 +73,5 @@ jobs: with: command: test args: --all-features --workspace --exclude loco-gen --exclude loco + env: + DATABASE_URL: postgres://postgres:postgres@localhost:5432/postgres_test diff --git a/Cargo.toml b/Cargo.toml index 0ee1de14..be586f6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,8 @@ cache_inmem = ["dep:moka"] bg_redis = ["dep:rusty-sidekiq", "dep:bb8"] bg_pg = ["dep:sqlx", "dep:ulid"] bg_sqlt = ["dep:sqlx", "dep:ulid"] +## Testing feature flags +integration_test = [] [dependencies] loco-gen = { version = "0.13.2", path = "./loco-gen" } @@ -197,3 +199,10 @@ tree-fs = { version = "0.2.1" } reqwest = { version = "0.12.7" } serial_test = "3.1.1" tower = { workspace = true, features = ["util"] } +sqlx = { version = "0.8.2", default-features = false, features = [ + "macros", + "json", + "postgres", + "chrono", + "sqlite", +] } diff --git a/README.ru.md b/README.ru.md index a4e8aae1..3a68237b 100644 --- a/README.ru.md +++ b/README.ru.md @@ -6,7 +6,7 @@

-🚂 *Loco* - Rust on Rails. +🚂 Loco is Rust on Rails.

@@ -45,7 +45,7 @@ ```sh cargo install loco -cargo install sea-orm-cli # Для работы с базами данных +cargo install sea-orm-cli # Only when DB is needed ``` @@ -56,13 +56,18 @@ cargo install sea-orm-cli # Для работы с базами данных ```sh ❯ loco new ✔ ❯ App name? · myapp -✔ ❯ What would you like to build? · SaaS app (with DB and user auth) +✔ ❯ What would you like to build? · Saas App with client side rendering ✔ ❯ Select a DB Provider · Sqlite ✔ ❯ Select your background worker type · Async (in-process tokio async tasks) -✔ ❯ Select an asset serving configuration · Client (configures assets for frontend serving) 🚂 Loco app generated successfully in: myapp/ + +- assets: You've selected `clientside` for your asset serving configuration. + +Next step, build your frontend: + $ cd frontend/ + $ npm install && npm run build ``` diff --git a/docs-site/content/docs/processing/task.md b/docs-site/content/docs/processing/task.md index 114d9f58..51d88c54 100644 --- a/docs-site/content/docs/processing/task.md +++ b/docs-site/content/docs/processing/task.md @@ -32,7 +32,17 @@ Generate the task: ```sh -cd ./examples/demo && cargo loco generate task --help +Generate a Task based on the given name + +Usage: demo_app-cli generate task [OPTIONS] + +Arguments: + Name of the thing to generate + +Options: + -e, --environment Specify the environment [default: development] + -h, --help Print help + -V, --version Print version ``` diff --git a/docs-site/content/docs/processing/workers.md b/docs-site/content/docs/processing/workers.md index 5809ed43..49ecf576 100644 --- a/docs-site/content/docs/processing/workers.md +++ b/docs-site/content/docs/processing/workers.md @@ -187,6 +187,52 @@ workers: mode: BackgroundQueue ``` +## Managing Workers From the UI +You can manage the jobs queue with the [Loco admin job project](https://github.com/loco-rs/admin-jobs). +![](https://github.com/loco-rs/admin-jobs/raw/main/media/screenshot.png) + +### Managing Job Queues via CLI + +The job queue management feature provides a powerful and flexible way to handle the lifecycle of jobs in your application. It allows you to cancel, clean up, remove outdated jobs, export job details, and import jobs, ensuring efficient and organized job processing. A programmatic sketch of the same operations follows the feature list below. + +## Features Overview + +- **Cancel Jobs** + Provides the ability to cancel specific jobs by name, updating their status to `cancelled`. This is useful for stopping jobs that are no longer needed or relevant, or for preventing them from being processed when a bug is detected. +- **Clean Up Jobs** + Enables the removal of jobs that have already been completed or cancelled. This helps maintain a clean and efficient job queue by eliminating unnecessary entries. +- **Purge Outdated Jobs** + Allows you to delete jobs based on their age, measured in days. This is particularly useful for maintaining a lean job queue by removing older, irrelevant jobs. + **Note**: You can use the `--dump` option to export job details to a file, manually modify the job parameters in the exported file, and then use the `import` feature to reintroduce the updated jobs into the system. +- **Export Job Details** + Supports exporting the details of all jobs to a specified location in file format. This feature is valuable for backups, audits, or further analysis. +- **Import Jobs** + Facilitates importing jobs from external files, making it easy to restore or add new jobs to the system. This ensures seamless integration of external job data into your application's workflow.
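The CLI commands shown below are backed by new management methods on the queue provider (`cancel_jobs`, `clear_by_status`, `clear_jobs_older_than`, `dump`, and `import` in `src/bgworker/mod.rs`), so the same maintenance can also be scripted from application code. Here is a minimal, hypothetical sketch, assuming a task or test that already holds an `AppContext` with a Postgres or SQLite queue configured and that `JobStatus` is reachable at `loco_rs::bgworker::JobStatus`; the worker name `SendWelcomeEmail`, the `job-dumps` folder, and the `maintain_queue` helper are illustrative only:

```rust
use std::path::Path;

use loco_rs::{app::AppContext, bgworker::JobStatus, Error, Result};

/// Illustrative helper: prune the jobs table and snapshot what remains.
async fn maintain_queue(ctx: &AppContext) -> Result<()> {
    // The queue provider is optional on the context; bail out if none is configured.
    let queue = ctx
        .queue_provider
        .as_ref()
        .ok_or_else(|| Error::string("no queue provider configured"))?;

    // Cancel every still-queued job registered under this (illustrative) worker name.
    queue.cancel_jobs("SendWelcomeEmail").await?;

    // Tidy: drop completed/cancelled jobs, then purge failed jobs older than 30 days.
    queue
        .clear_by_status(vec![JobStatus::Completed, JobStatus::Cancelled])
        .await?;
    queue
        .clear_jobs_older_than(30, &vec![JobStatus::Failed])
        .await?;

    // Export the remaining jobs to a timestamped YAML file; the folder is created if missing.
    let dump_file = queue.dump(Path::new("job-dumps"), None, None).await?;

    // A previously exported (and possibly hand-edited) file can be re-queued the same way.
    queue.import(dump_file.as_path()).await?;

    Ok(())
}
```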
+ +To access the job management commands, use the following CLI structure: + +```sh +Managing jobs queue + +Usage: demo_app-cli jobs [OPTIONS] + +Commands: + cancel Cancels jobs with the specified names, setting their status to `cancelled` + tidy Deletes jobs that are either completed or cancelled + purge Deletes jobs based on their age in days + dump Saves the details of all jobs to files in the specified folder + import Imports jobs from a file + help Print this message or the help of the given subcommand(s) + +Options: + -e, --environment Specify the environment [default: development] + -h, --help Print help + -V, --version Print version +``` + + + + ## Testing a Worker You can easily test your worker background jobs using `Loco`. Ensure that your worker is set to the `ForegroundBlocking` mode, which blocks the job, ensuring it runs synchronously. When testing the worker, the test will wait until your worker is completed, allowing you to verify if the worker accomplished its intended tasks. diff --git a/docs-site/content/docs/the-app/models.md b/docs-site/content/docs/the-app/models.md index 47637d28..a8f66df1 100644 --- a/docs-site/content/docs/the-app/models.md +++ b/docs-site/content/docs/the-app/models.md @@ -526,44 +526,31 @@ impl Hooks for App { This implementation ensures that the seed is executed when the seed function is called. Adjust the specifics based on your application's structure and requirements. -## Running seeds +## Managing Seed via CLI -The seed process is not executed automatically. You can trigger the seed process either through a task or during testing. +- **Reset the Database** + Clear all existing data before importing seed files. This is useful when you want to start with a fresh database state, ensuring no old data remains. +- **Dump Database Tables to Files** + Export the contents of your database tables to files. This feature allows you to back up the current state of your database or prepare data for reuse across environments. -### Using a Task - -1. Create a seeding task by following the instructions in the [Task Documentation](@/docs/processing/task.md). -2. 
Configure the task to execute the `seed` function, as demonstrated in the example below: - -```rust -use std::collections::BTreeMap; - -use async_trait::async_trait; -use loco_rs::{ - app::AppContext, - db, - task::{Task, TaskInfo}, - Result, -}; -use sea_orm::EntityTrait; +To access the seed commands, use the following CLI structure: + +```sh +Seed your database with initial data or dump tables to files -use crate::{app::App, models::_entities::users}; +Usage: demo_app-cli db seed [OPTIONS] -pub struct SeedData; -#[async_trait] -impl Task for SeedData { - fn task(&self) -> TaskInfo { - TaskInfo { - name: "seed".to_string(), - detail: "Seeding data".to_string(), - } - } - async fn run(&self, app_context: &AppContext, vars: &BTreeMap) -> Result<()> { - let path = std::path::Path::new("src/fixtures"); - db::run_app_seed::(&app_context.db, path).await - } -} +Options: + -r, --reset Clears all data in the database before seeding + -d, --dump Dumps all database tables to files + --dump-tables Specifies specific tables to dump + --from Specifies the folder containing seed files (defaults to 'src/fixtures') [default: src/fixtures] + -e, --environment Specify the environment [default: development] + -h, --help Print help + -V, --version Print version ``` + + ### Using a Test diff --git a/docs-site/content/docs/the-app/your-project.md b/docs-site/content/docs/the-app/your-project.md index 08347648..9cf4b619 100644 --- a/docs-site/content/docs/the-app/your-project.md +++ b/docs-site/content/docs/the-app/your-project.md @@ -48,7 +48,28 @@ cargo loco --help ```sh -cd ./examples/demo && cargo loco --help +The one-person framework for Rust + +Usage: demo_app-cli [OPTIONS] + +Commands: + start Start an app + db Perform DB operations + routes Describe all application endpoints + middleware Describe all application middlewares + task Run a custom task + jobs Managing jobs queue + scheduler Run the scheduler + generate code generation creates a set of files and code templates based on a predefined set of rules + doctor Validate and diagnose configurations + version Display the app version + watch Watch and restart the app + help Print this message or the help of the given subcommand(s) + +Options: + -e, --environment Specify the environment [default: development] + -h, --help Print help + -V, --version Print version ``` @@ -119,7 +140,22 @@ Scaffolding is an efficient and speedy method for generating key components of a See scaffold command: ```sh -cd ./examples/demo && cargo loco generate scaffold --help +Generates a CRUD scaffold, model and controller + +Usage: demo_app-cli generate scaffold [OPTIONS] [FIELDS]... + +Arguments: + Name of the thing to generate + [FIELDS]... Model fields, eg. 
title:string hits:int + +Options: + -k, --kind The kind of scaffold to generate [possible values: api, html, htmx] + --htmx Use HTMX scaffold + --html Use HTML scaffold + --api Use API scaffold + -e, --environment Specify the environment [default: development] + -h, --help Print help + -V, --version Print version ``` diff --git a/examples/demo/tests/cmd/cli.trycmd b/examples/demo/tests/cmd/cli.trycmd index cd5df6fd..ca64094b 100644 --- a/examples/demo/tests/cmd/cli.trycmd +++ b/examples/demo/tests/cmd/cli.trycmd @@ -11,6 +11,7 @@ Commands: routes Describe all application endpoints middleware Describe all application middlewares task Run a custom task + jobs Managing jobs queue scheduler Run the scheduler generate code generation creates a set of files and code templates based on a predefined set of rules doctor Validate and diagnose configurations diff --git a/snipdoc.yml b/snipdoc.yml index 18314538..a793cb85 100644 --- a/snipdoc.yml +++ b/snipdoc.yml @@ -138,3 +138,9 @@ snippets: cli-middleware-list: content: cargo loco middleware --config path: ./snipdoc.yml + jobs-help-command: + content: cd ./examples/demo && cargo loco jobs --help + path: ./snipdoc.yml + seed-help-command: + content: cd ./examples/demo && cargo loco db seed --help + path: ./snipdoc.yml diff --git a/src/bgworker/mod.rs b/src/bgworker/mod.rs index c5a765ed..be6b43b9 100644 --- a/src/bgworker/mod.rs +++ b/src/bgworker/mod.rs @@ -1,8 +1,15 @@ -use std::sync::Arc; +use std::{ + fs::File, + io::Write, + path::{Path, PathBuf}, + sync::Arc, +}; use async_trait::async_trait; -use serde::Serialize; -use tracing::{debug, error}; +#[cfg(feature = "cli")] +use clap::ValueEnum; +use serde::{Deserialize, Serialize}; +use serde_variant::to_variant_name; #[cfg(feature = "bg_pg")] pub mod pg; #[cfg(feature = "bg_redis")] @@ -19,6 +26,42 @@ use crate::{ Error, Result, }; +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[cfg_attr(feature = "cli", derive(ValueEnum))] +pub enum JobStatus { + #[serde(rename = "queued")] + Queued, + #[serde(rename = "processing")] + Processing, + #[serde(rename = "completed")] + Completed, + #[serde(rename = "failed")] + Failed, + #[serde(rename = "cancelled")] + Cancelled, +} + +impl std::str::FromStr for JobStatus { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "queued" => Ok(Self::Queued), + "processing" => Ok(Self::Processing), + "completed" => Ok(Self::Completed), + "failed" => Ok(Self::Failed), + "cancelled" => Ok(Self::Cancelled), + _ => Err(format!("Invalid status: {s}")), + } + } +} + +impl std::fmt::Display for JobStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + to_variant_name(self).expect("only enum supported").fmt(f) + } +} + // Queue struct now holds both a QueueProvider and QueueRegistrar pub enum Queue { #[cfg(feature = "bg_redis")] @@ -30,13 +73,13 @@ pub enum Queue { #[cfg(feature = "bg_pg")] Postgres( pg::PgPool, - std::sync::Arc>, + std::sync::Arc>, pg::RunOpts, ), #[cfg(feature = "bg_sqlt")] Sqlite( sqlt::SqlitePool, - std::sync::Arc>, + std::sync::Arc>, sqlt::RunOpts, ), None, @@ -55,7 +98,7 @@ impl Queue { queue: Option, args: A, ) -> Result<()> { - debug!(worker = class, "job enqueue"); + tracing::debug!(worker = class, "job enqueue"); match self { #[cfg(feature = "bg_redis")] Self::Redis(pool, _, _) => { @@ -103,7 +146,7 @@ impl Queue { &self, worker: W, ) -> Result<()> { - debug!(worker = W::class_name(), "register worker"); + tracing::debug!(worker = W::class_name(), "register worker"); match self { 
#[cfg(feature = "bg_redis")] Self::Redis(_, p, _) => { @@ -131,7 +174,7 @@ impl Queue { /// /// This function will return an error if fails pub async fn run(&self) -> Result<()> { - debug!("running background jobs"); + tracing::debug!("running background jobs"); match self { #[cfg(feature = "bg_redis")] Self::Redis(_, p, _) => { @@ -154,7 +197,7 @@ impl Queue { } } _ => { - error!( + tracing::error!( "no queue provider is configured: compile with at least one queue provider \ feature" ); @@ -169,7 +212,7 @@ impl Queue { /// /// This function will return an error if fails pub async fn setup(&self) -> Result<()> { - debug!("workers setup"); + tracing::debug!("workers setup"); match self { #[cfg(feature = "bg_redis")] Self::Redis(_, _, _) => {} @@ -192,7 +235,7 @@ impl Queue { /// /// This function will return an error if fails pub async fn clear(&self) -> Result<()> { - debug!("clearing job queues"); + tracing::debug!("clearing job"); match self { #[cfg(feature = "bg_redis")] Self::Redis(pool, _, _) => { @@ -217,7 +260,7 @@ impl Queue { /// /// This function will return an error if fails pub async fn ping(&self) -> Result<()> { - debug!("job queue ping requested"); + tracing::debug!("job queue ping requested"); match self { #[cfg(feature = "bg_redis")] Self::Redis(pool, _, _) => { @@ -254,7 +297,7 @@ impl Queue { /// Does not currently return an error, but the postgres or other future /// queue implementations might, so using Result here as return type. pub fn shutdown(&self) -> Result<()> { - println!("waiting for running jobs to finish..."); + tracing::debug!("waiting for running jobs to finish..."); match self { #[cfg(feature = "bg_redis")] Self::Redis(_, _, cancellation_token) => cancellation_token.cancel(), @@ -263,6 +306,229 @@ impl Queue { Ok(()) } + + async fn get_jobs( + &self, + status: Option<&Vec>, + age_days: Option, + ) -> Result { + tracing::debug!(status = ?status, age_days = ?age_days, "getting jobs"); + let jobs = match self { + #[cfg(feature = "bg_pg")] + Self::Postgres(pool, _, _) => { + let jobs = pg::get_jobs(pool, status, age_days) + .await + .map_err(Box::from)?; + serde_json::to_value(jobs)? + } + #[cfg(feature = "bg_sqlt")] + Self::Sqlite(pool, _, _) => { + let jobs = sqlt::get_jobs(pool, status, age_days) + .await + .map_err(Box::from)?; + + serde_json::to_value(jobs)? + } + #[cfg(feature = "bg_redis")] + Self::Redis(_, _, _) => { + tracing::error!("getting jobs for redis provider not implemented"); + return Err(Error::string( + "getting jobs not supported for redis provider", + )); + } + Self::None => { + tracing::error!( + "no queue provider is configured: compile with at least one queue provider \ + feature" + ); + return Err(Error::string("provider not configure")); + } + }; + + Ok(jobs) + } + + /// Cancels jobs based on the given job name for the configured queue provider. + /// + /// # Errors + /// - If no queue provider is configured, it will return an error indicating the lack of configuration. + /// - If the Redis provider is selected, it will return an error stating that cancellation is not supported. + /// - Any error in the underlying provider's cancellation logic will propagate from the respective function. 
+ /// + pub async fn cancel_jobs(&self, job_name: &str) -> Result<()> { + tracing::debug!(job_name = ?job_name, "cancel jobs"); + + match self { + #[cfg(feature = "bg_pg")] + Self::Postgres(pool, _, _) => pg::cancel_jobs_by_name(pool, job_name).await, + #[cfg(feature = "bg_sqlt")] + Self::Sqlite(pool, _, _) => sqlt::cancel_jobs_by_name(pool, job_name).await, + #[cfg(feature = "bg_redis")] + Self::Redis(_, _, _) => { + tracing::error!("canceling jobs for redis provider not implemented"); + Err(Error::string( + "canceling jobs not supported for redis provider", + )) + } + Self::None => { + tracing::error!( + "no queue provider is configured: compile with at least one queue provider \ + feature" + ); + Err(Error::string("provider not configure")) + } + } + } + + /// Clears jobs older than a specified number of days for the configured queue provider. + /// + /// # Errors + /// - If no queue provider is configured, it will return an error indicating the lack of configuration. + /// - If the Redis provider is selected, it will return an error stating that clearing jobs is not supported. + /// - Any error in the underlying provider's job clearing logic will propagate from the respective function. + /// + pub async fn clear_jobs_older_than( + &self, + age_days: i64, + status: &Vec, + ) -> Result<()> { + tracing::debug!(age_days = age_days, status = ?status, "cancel jobs with age"); + + match self { + #[cfg(feature = "bg_pg")] + Self::Postgres(pool, _, _) => { + pg::clear_jobs_older_than(pool, age_days, Some(status)).await + } + #[cfg(feature = "bg_sqlt")] + Self::Sqlite(pool, _, _) => { + sqlt::clear_jobs_older_than(pool, age_days, Some(status)).await + } + #[cfg(feature = "bg_redis")] + Self::Redis(_, _, _) => { + tracing::error!("clear jobs for redis provider not implemented"); + Err(Error::string("clear jobs not supported for redis provider")) + } + Self::None => { + tracing::error!( + "no queue provider is configured: compile with at least one queue provider \ + feature" + ); + Err(Error::string("provider not configure")) + } + } + } + + /// Clears jobs based on their status for the configured queue provider. + /// + /// # Errors + /// - If no queue provider is configured, it will return an error indicating the lack of configuration. + /// - If the Redis provider is selected, it will return an error stating that clearing jobs is not supported. + /// - Any error in the underlying provider's job clearing logic will propagate from the respective function. + pub async fn clear_by_status(&self, status: Vec) -> Result<()> { + tracing::debug!(status = ?status, "clear jobs by status"); + match self { + #[cfg(feature = "bg_pg")] + Self::Postgres(pool, _, _) => pg::clear_by_status(pool, status).await, + #[cfg(feature = "bg_sqlt")] + Self::Sqlite(pool, _, _) => sqlt::clear_by_status(pool, status).await, + #[cfg(feature = "bg_redis")] + Self::Redis(_, _, _) => { + tracing::error!("clear jobs for redis provider not implemented"); + Err(Error::string("clear jobs not supported for redis provider")) + } + Self::None => { + tracing::error!( + "no queue provider is configured: compile with at least one queue provider \ + feature" + ); + Err(Error::string("provider not configure")) + } + } + } + + /// Dumps the list of jobs to a YAML file at the specified path. + /// + /// This function retrieves jobs from the queue, optionally filtered by their status, and + /// writes the job data to a YAML file. + /// + /// # Errors + /// - If the specified path cannot be created, an error will be returned. 
+ /// - If the job retrieval or YAML serialization fails, an error will be returned. + /// - If there is an issue creating the dump file, an error will be returned + pub async fn dump( + &self, + path: &Path, + status: Option<&Vec>, + age_days: Option, + ) -> Result { + tracing::debug!(path = %path.display(), status = ?status, age_days = ?age_days, "dumping jobs"); + + if !path.exists() { + tracing::debug!(path = %path.display(), "folder not exists, creating..."); + std::fs::create_dir_all(path)?; + } + + let dump_file = path.join(format!( + "loco-dump-jobs-{}.yaml", + chrono::Utc::now().format("%Y-%m-%d-%H-%M-%S") + )); + + let jobs = self.get_jobs(status, age_days).await?; + + let data = serde_yaml::to_string(&jobs)?; + let mut file = File::create(&dump_file)?; + file.write_all(data.as_bytes())?; + + Ok(dump_file) + } + + /// Imports jobs from a YAML file into the configured queue provider. + /// + /// This function reads job data from a YAML file located at the specified `path` and imports + /// the jobs into the queue. + /// + /// # Errors + /// - If there is an issue opening or reading the YAML file, an error will be returned. + /// - If the queue provider is Redis or none, an error will be returned indicating the lack of support. + /// - If any issues occur while enqueuing the jobs, the function will return an error. + /// + pub async fn import(&self, path: &Path) -> Result<()> { + tracing::debug!(path = %path.display(), "import jobs"); + + match &self { + #[cfg(feature = "bg_pg")] + Self::Postgres(_, _, _) => { + let jobs: Vec = serde_yaml::from_reader(File::open(path)?)?; + for job in jobs { + self.enqueue(job.name.to_string(), None, job.data).await?; + } + + Ok(()) + } + #[cfg(feature = "bg_sqlt")] + Self::Sqlite(_, _, _) => { + let jobs: Vec = serde_yaml::from_reader(File::open(path)?)?; + for job in jobs { + self.enqueue(job.name.to_string(), None, job.data).await?; + } + Ok(()) + } + #[cfg(feature = "bg_redis")] + Self::Redis(_, _, _) => { + tracing::error!("import jobs for redis provider not implemented"); + Err(Error::string( + "getting jobs not supported for redis provider", + )) + } + Self::None => { + tracing::error!( + "no queue provider is configured: compile with at least one queue provider \ + feature" + ); + Err(Error::string("provider not configure")) + } + } + } } #[async_trait] @@ -294,7 +560,7 @@ pub trait BackgroundWorker: Send + if let Some(p) = &ctx.queue_provider { p.enqueue(Self::class_name(), Self::queue(), args).await?; } else { - error!( + tracing::error!( "perform_later: background queue is selected, but queue was not populated \ in context" ); @@ -307,7 +573,7 @@ pub trait BackgroundWorker: Send + let dx = ctx.clone(); tokio::spawn(async move { if let Err(err) = Self::build(&dx).perform(args).await { - error!(err = err.to_string(), "worker failed to perform job"); + tracing::error!(err = err.to_string(), "worker failed to perform job"); } }); } @@ -399,3 +665,104 @@ pub async fn create_queue_provider(config: &Config) -> Result> Ok(None) } } + +#[cfg(test)] +mod tests { + + use std::path::Path; + + use insta::assert_debug_snapshot; + + use super::*; + use crate::tests_cfg; + + fn sqlite_config(db_path: &Path) -> SqliteQueueConfig { + SqliteQueueConfig { + uri: format!( + "sqlite://{}?mode=rwc", + db_path.join("sample.sqlite").display() + ), + dangerously_flush: false, + enable_logging: false, + max_connections: 1, + min_connections: 1, + connect_timeout: 500, + idle_timeout: 500, + poll_interval_sec: 1, + num_workers: 1, + } + } + + #[tokio::test] + async 
fn can_dump_jobs() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let qcfg = sqlite_config(tree_fs.root.as_path()); + let queue = sqlt::create_provider(&qcfg) + .await + .expect("create sqlite queue"); + + let pool = sqlx::SqlitePool::connect(&qcfg.uri) + .await + .expect("connect to sqlite db"); + + queue.setup().await.expect("setup sqlite db"); + tests_cfg::queue::sqlite_seed_data(&pool).await; + + let dump_file = queue + .dump( + tree_fs.root.as_path(), + Some(&vec![JobStatus::Failed, JobStatus::Cancelled]), + None, + ) + .await + .expect("dump jobs"); + + assert_debug_snapshot!(std::fs::read_to_string(dump_file)); + } + + #[tokio::test] + async fn can_import_jobs_from_file() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let qcfg = sqlite_config(tree_fs.root.as_path()); + let queue = sqlt::create_provider(&qcfg) + .await + .expect("create sqlite queue"); + + let pool = sqlx::SqlitePool::connect(&qcfg.uri) + .await + .expect("connect to sqlite db"); + + queue.setup().await.expect("setup sqlite db"); + + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue") + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!(count, 0); + + queue + .import( + PathBuf::from("tests") + .join("fixtures") + .join("queue") + .join("jobs.yaml") + .as_path(), + ) + .await + .expect("import jobs"); + + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue") + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!(count, 14); + } +} diff --git a/src/bgworker/pg.rs b/src/bgworker/pg.rs index 786acbd9..f2dd888a 100644 --- a/src/bgworker/pg.rs +++ b/src/bgworker/pg.rs @@ -13,38 +13,39 @@ use tokio::{task::JoinHandle, time::sleep}; use tracing::{debug, error, trace}; use ulid::Ulid; -use super::{BackgroundWorker, Queue}; +use super::{BackgroundWorker, JobStatus, Queue}; use crate::{config::PostgresQueueConfig, Error, Result}; -type TaskId = String; -type TaskData = JsonValue; -type TaskStatus = String; +type JobId = String; +type JobData = JsonValue; -type TaskHandler = Box< +type JobHandler = Box< dyn Fn( - TaskId, - TaskData, + JobId, + JobData, ) -> Pin> + Send>> + Send + Sync, >; -#[derive(Debug, Deserialize, Serialize)] -struct Task { - pub id: TaskId, +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Job { + pub id: JobId, pub name: String, - #[allow(clippy::struct_field_names)] - pub task_data: TaskData, - pub status: TaskStatus, + #[serde(rename = "task_data")] + pub data: JobData, + pub status: JobStatus, pub run_at: DateTime, pub interval: Option, + pub created_at: Option>, + pub updated_at: Option>, } -pub struct TaskRegistry { - handlers: Arc>, +pub struct JobRegistry { + handlers: Arc>, } -impl TaskRegistry { - /// Creates a new `TaskRegistry`. +impl JobRegistry { + /// Creates a new `JobRegistry`. #[must_use] pub fn new() -> Self { Self { @@ -52,7 +53,7 @@ impl TaskRegistry { } } - /// Registers a task handler with the provided name. + /// Registers a job handler with the provided name.
/// # Errors /// Fails if cannot register worker pub fn register_worker(&mut self, name: String, worker: W) -> Result<()> @@ -62,11 +63,11 @@ impl TaskRegistry { for<'de> Args: Deserialize<'de>, { let worker = Arc::new(worker); - let wrapped_handler = move |_task_id: String, task_data: TaskData| { + let wrapped_handler = move |_job_id: String, job_data: JobData| { let w = worker.clone(); Box::pin(async move { - let args = serde_json::from_value::(task_data); + let args = serde_json::from_value::(job_data); match args { Ok(args) => w.perform(args).await, Err(err) => Err(err.into()), @@ -80,30 +81,30 @@ impl TaskRegistry { Ok(()) } - /// Returns a reference to the task handlers. + /// Returns a reference to the job handlers. #[must_use] - pub fn handlers(&self) -> &Arc> { + pub fn handlers(&self) -> &Arc> { &self.handlers } - /// Runs the task handlers with the provided number of workers. + /// Runs the job handlers with the provided number of workers. #[must_use] pub fn run(&self, pool: &PgPool, opts: &RunOpts) -> Vec> { - let mut tasks = Vec::new(); + let mut jobs = Vec::new(); let interval = opts.poll_interval_sec; for idx in 0..opts.num_workers { let handlers = self.handlers.clone(); let pool = pool.clone(); - let task = tokio::spawn(async move { + let job = tokio::spawn(async move { loop { trace!( pool_conns = pool.num_idle(), worker_num = idx, "pg workers stats" ); - let task_opt = match dequeue(&pool).await { + let job_opt = match dequeue(&pool).await { Ok(t) => t, Err(err) => { error!(err = err.to_string(), "cannot fetch from queue"); @@ -111,33 +112,33 @@ impl TaskRegistry { } }; - if let Some(task) = task_opt { - debug!(task_id = task.id, name = task.name, "working on task"); - if let Some(handler) = handlers.get(&task.name) { - match handler(task.id.clone(), task.task_data.clone()).await { + if let Some(job) = job_opt { + debug!(job_id = job.id, name = job.name, "working on job"); + if let Some(handler) = handlers.get(&job.name) { + match handler(job.id.clone(), job.data.clone()).await { Ok(()) => { if let Err(err) = - complete_task(&pool, &task.id, task.interval).await + complete_job(&pool, &job.id, job.interval).await { error!( err = err.to_string(), - task = ?task, - "cannot complete task" + job = ?job, + "cannot complete job" ); } } Err(err) => { - if let Err(err) = fail_task(&pool, &task.id, &err).await { + if let Err(err) = fail_job(&pool, &job.id, &err).await { error!( err = err.to_string(), - task = ?task, - "cannot fail task" + job = ?job, + "cannot fail job" ); } } } } else { - error!(task = task.name, "no handler found for task"); + error!(job = job.name, "no handler found for job"); } } else { sleep(Duration::from_secs(interval.into())).await; @@ -145,14 +146,14 @@ impl TaskRegistry { } }); - tasks.push(task); + jobs.push(job); } - tasks + jobs } } -impl Default for TaskRegistry { +impl Default for JobRegistry { fn default() -> Self { Self::new() } @@ -173,33 +174,34 @@ async fn connect(cfg: &PostgresQueueConfig) -> Result { Ok(pool) } -/// Initialize task tables +/// Initialize job tables /// /// # Errors /// /// This function will return an error if it fails pub async fn initialize_database(pool: &PgPool) -> Result<()> { debug!("pg worker: initialize database"); - sqlx::raw_sql( + sqlx::raw_sql(&format!( r" CREATE TABLE IF NOT EXISTS pg_loco_queue ( id VARCHAR NOT NULL, name VARCHAR NOT NULL, task_data JSONB NOT NULL, - status VARCHAR NOT NULL DEFAULT 'queued', + status VARCHAR NOT NULL DEFAULT '{}', run_at TIMESTAMPTZ NOT NULL, interval BIGINT, created_at 
TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); ", - ) + JobStatus::Queued + )) .execute(pool) .await?; Ok(()) } -/// Add a task +/// Add a job /// /// # Errors /// @@ -207,11 +209,11 @@ pub async fn initialize_database(pool: &PgPool) -> Result<()> { pub async fn enqueue( pool: &PgPool, name: &str, - task_data: TaskData, + data: JobData, run_at: DateTime, interval: Option, -) -> Result { - let task_data_json = serde_json::to_value(task_data)?; +) -> Result { + let data_json = serde_json::to_value(data)?; #[allow(clippy::cast_possible_truncation)] let interval_ms: Option = interval.map(|i| i.as_millis() as i64); @@ -222,7 +224,7 @@ pub async fn enqueue( $4, $5)", ) .bind(id.clone()) - .bind(task_data_json) + .bind(data_json) .bind(name) .bind(run_at) .bind(interval_ms) @@ -231,90 +233,164 @@ pub async fn enqueue( Ok(id) } -async fn dequeue(client: &PgPool) -> Result> { +async fn dequeue(client: &PgPool) -> Result> { let mut tx = client.begin().await?; let row = sqlx::query( "SELECT id, name, task_data, status, run_at, interval FROM pg_loco_queue WHERE status = \ - 'queued' AND run_at <= NOW() ORDER BY run_at LIMIT 1 FOR UPDATE SKIP LOCKED", + $1 AND run_at <= NOW() ORDER BY run_at LIMIT 1 FOR UPDATE SKIP LOCKED", ) - // avoid using FromRow because it requires the 'macros' feature, which nothing - // in our dep tree uses, so it'll create smaller, faster builds if we do this manually - .map(|row: PgRow| Task { - id: row.get("id"), - name: row.get("name"), - task_data: row.get("task_data"), - status: row.get("status"), - run_at: row.get("run_at"), - interval: row.get("interval"), - }) + .bind(JobStatus::Queued.to_string()) + .map(|row: PgRow| to_job(&row).ok()) .fetch_optional(&mut *tx) - .await?; + .await? + .flatten(); - if let Some(task) = row { - sqlx::query( - "UPDATE pg_loco_queue SET status = 'processing', updated_at = NOW() WHERE id = $1", - ) - .bind(&task.id) - .execute(&mut *tx) - .await?; + if let Some(job) = row { + sqlx::query("UPDATE pg_loco_queue SET status = $1, updated_at = NOW() WHERE id = $2") + .bind(JobStatus::Processing.to_string()) + .bind(&job.id) + .execute(&mut *tx) + .await?; tx.commit().await?; - Ok(Some(task)) + Ok(Some(job)) } else { Ok(None) } } -async fn complete_task(pool: &PgPool, task_id: &TaskId, interval_ms: Option) -> Result<()> { - if let Some(interval_ms) = interval_ms { - let next_run_at = Utc::now() + chrono::Duration::milliseconds(interval_ms); - sqlx::query( - "UPDATE pg_loco_queue SET status = 'queued', updated_at = NOW(), run_at = $1 WHERE id \ - = $2", - ) - .bind(next_run_at) - .bind(task_id) - .execute(pool) - .await?; - } else { - sqlx::query( - "UPDATE pg_loco_queue SET status = 'completed', updated_at = NOW() WHERE id = $1", - ) - .bind(task_id) - .execute(pool) - .await?; - } +async fn complete_job(pool: &PgPool, id: &JobId, interval_ms: Option) -> Result<()> { + let (status, run_at) = interval_ms.map_or_else( + || (JobStatus::Completed.to_string(), Utc::now()), + |interval_ms| { + ( + JobStatus::Queued.to_string(), + Utc::now() + chrono::Duration::milliseconds(interval_ms), + ) + }, + ); + + sqlx::query( + "UPDATE pg_loco_queue SET status = $1, updated_at = NOW(), run_at = $2 WHERE id = $3", + ) + .bind(status) + .bind(run_at) + .bind(id) + .execute(pool) + .await?; + Ok(()) } -async fn fail_task(pool: &PgPool, task_id: &TaskId, error: &crate::Error) -> Result<()> { +async fn fail_job(pool: &PgPool, id: &JobId, error: &crate::Error) -> Result<()> { let msg = error.to_string(); - error!(err = msg, 
"failed task"); + error!(err = msg, "failed job"); let error_json = serde_json::json!({ "error": msg }); sqlx::query( - "UPDATE pg_loco_queue SET status = 'failed', updated_at = NOW(), task_data = task_data || \ - $1::jsonb WHERE id = $2", + "UPDATE pg_loco_queue SET status = $1, updated_at = NOW(), task_data = task_data || \ + $2::jsonb WHERE id = $3", ) + .bind(JobStatus::Failed.to_string()) .bind(error_json) - .bind(task_id) + .bind(id) .execute(pool) .await?; Ok(()) } -/// Clear all tasks +/// Cancels jobs in the `pg_loco_queue` table by their name. +/// +/// This function updates the status of all jobs with the given `name` and a status of +/// [`JobStatus::Queued`] to [`JobStatus::Cancelled`]. The update also sets the `updated_at` timestamp to the +/// current time. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn cancel_jobs_by_name(pool: &PgPool, name: &str) -> Result<()> { + sqlx::query( + "UPDATE pg_loco_queue SET status = $1, updated_at = NOW() WHERE name = $2 AND status = $3", + ) + .bind(JobStatus::Cancelled.to_string()) + .bind(name) + .bind(JobStatus::Queued.to_string()) + .execute(pool) + .await?; + Ok(()) +} + +/// Clear all jobs /// /// # Errors /// /// This function will return an error if it fails pub async fn clear(pool: &PgPool) -> Result<()> { - sqlx::query("DELETE from pg_loco_queue") + sqlx::query("DELETE FROM pg_loco_queue") + .execute(pool) + .await?; + Ok(()) +} + +/// Deletes jobs from the `pg_loco_queue` table based on their status. +/// +/// This function removes all jobs with a status that matches any of the statuses provided +/// in the `status` argument. The statuses are checked against the `status` column in the +/// database, and any matching rows are deleted. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn clear_by_status(pool: &PgPool, status: Vec) -> Result<()> { + let status_in = status + .iter() + .map(std::string::ToString::to_string) + .collect::>(); + + sqlx::query("DELETE FROM pg_loco_queue WHERE status = ANY($1)") + .bind(status_in) .execute(pool) .await?; Ok(()) } +/// Deletes jobs from the `pg_loco_queue` table that are older than a specified number of days. +/// +/// This function removes jobs that have a `created_at` timestamp older than the provided +/// number of days. Additionally, if a `status` is provided, only jobs with a status matching +/// one of the provided values will be deleted. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn clear_jobs_older_than( + pool: &PgPool, + age_days: i64, + status: Option<&Vec>, +) -> Result<()> { + let mut query_builder = sqlx::query_builder::QueryBuilder::::new( + "DELETE FROM pg_loco_queue WHERE created_at < NOW() - INTERVAL '1 day' * ", + ); + + query_builder.push_bind(age_days); + + if let Some(status_list) = status { + if !status_list.is_empty() { + let status_in = status_list + .iter() + .map(|s| format!("'{s}'")) + .collect::>() + .join(","); + + query_builder.push(format!(" AND status IN ({status_in})")); + } + } + + query_builder.build().execute(pool).await?; + + Ok(()) +} + /// Ping system /// /// # Errors @@ -327,6 +403,68 @@ pub async fn ping(pool: &PgPool) -> Result<()> { Ok(()) } +/// Retrieves a list of jobs from the `pg_loco_queue` table in the database. +/// +/// This function queries the database for jobs, optionally filtering by their +/// `status`. If a status is provided, only jobs with statuses included in the +/// provided list will be fetched. 
If no status is provided, all jobs will be +/// returned. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn get_jobs( + pool: &PgPool, + status: Option<&Vec>, + age_days: Option, +) -> Result, sqlx::Error> { + let mut query = String::from("SELECT * FROM pg_loco_queue where true"); + + if let Some(status) = status { + let status_in = status + .iter() + .map(|s| format!("'{s}'")) + .collect::>() + .join(","); + query.push_str(&format!(" AND status in ({status_in})")); + } + + if let Some(age_days) = age_days { + query.push_str(&format!( + " AND created_at <= NOW() - INTERVAL '1 day' * {age_days}" + )); + } + + let rows = sqlx::query(&query).fetch_all(pool).await?; + Ok(rows.iter().filter_map(|row| to_job(row).ok()).collect()) +} + +/// Converts a row from the database into a [`Job`] object. +/// +/// This function takes a row from the `Postgres` database and manually extracts the necessary +/// fields to populate a [`Job`] object. +/// +/// **Note:** This function manually extracts values from the database row instead of using +/// the `FromRow` trait, which would require enabling the 'macros' feature in the dependencies. +/// The decision to avoid `FromRow` is made to keep the build smaller and faster, as the 'macros' +/// feature is unnecessary in the current dependency tree. +fn to_job(row: &PgRow) -> Result { + Ok(Job { + id: row.get("id"), + name: row.get("name"), + data: row.get("task_data"), + status: row.get::("status").parse().map_err(|err| { + let status: String = row.get("status"); + tracing::error!(status, err, "job status is unsupported"); + Error::string("invalid job status") + })?, + run_at: row.get("run_at"), + interval: row.get("interval"), + created_at: row.try_get("created_at").unwrap_or_default(), + updated_at: row.try_get("updated_at").unwrap_or_default(), + }) +} + #[derive(Debug)] pub struct RunOpts { pub num_workers: u32, @@ -340,7 +478,7 @@ pub struct RunOpts { /// This function will return an error if it fails pub async fn create_provider(qcfg: &PostgresQueueConfig) -> Result { let pool = connect(qcfg).await.map_err(Box::from)?; - let registry = TaskRegistry::new(); + let registry = JobRegistry::new(); Ok(Queue::Postgres( pool, Arc::new(tokio::sync::Mutex::new(registry)), @@ -350,3 +488,469 @@ pub async fn create_provider(qcfg: &PostgresQueueConfig) -> Result { }, )) } + +#[cfg(all(test, feature = "integration_test"))] +use serial_test::serial; +#[cfg(all(test, feature = "integration_test"))] +mod tests { + + use chrono::{NaiveDate, NaiveTime, TimeZone}; + use insta::{assert_debug_snapshot, with_settings}; + use sqlx::{query_as, FromRow}; + + use super::*; + use crate::tests_cfg; + + fn reduction() -> &'static [(&'static str, &'static str)] { + &[ + ("[A-Z0-9]{26}", ""), + ( + r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?Z", + "", + ), + ] + } + + #[derive(Debug, Serialize, FromRow)] + pub struct TableInfo { + pub table_schema: Option, + pub column_name: Option, + pub column_default: Option, + pub is_nullable: Option, + pub data_type: Option, + pub is_updatable: Option, + } + + async fn init() -> PgPool { + let qcfg = PostgresQueueConfig { + uri: std::env::var("DATABASE_URL") + .expect("environment variable 'DATABASE_URL' should exist"), + dangerously_flush: false, + enable_logging: false, + max_connections: 1, + min_connections: 1, + connect_timeout: 500, + idle_timeout: 500, + poll_interval_sec: 1, + num_workers: 1, + }; + + let pool = connect(&qcfg).await.unwrap(); + sqlx::raw_sql("DROP TABLE IF EXISTS pg_loco_queue;")
+ .execute(&pool) + .await + .expect("drop table if exists"); + + pool + } + + async fn get_all_jobs(pool: &PgPool) -> Vec { + sqlx::query("select * from pg_loco_queue") + .fetch_all(pool) + .await + .expect("get jobs") + .iter() + .filter_map(|row| to_job(row).ok()) + .collect() + } + + async fn get_job(pool: &PgPool, id: &str) -> Job { + sqlx::query(&format!("select * from pg_loco_queue where id = '{id}'")) + .fetch_all(pool) + .await + .expect("get jobs") + .first() + .and_then(|row| to_job(row).ok()) + .expect("job not found") + } + + #[tokio::test] + #[serial] + async fn can_initialize_database() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + + let table_info: Vec = query_as::<_, TableInfo>( + "SELECT * FROM information_schema.columns WHERE table_name = + 'pg_loco_queue'", + ) + .fetch_all(&pool) + .await + .unwrap(); + + assert_debug_snapshot!(table_info); + } + + #[tokio::test] + #[serial] + async fn can_enqueue() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 0); + + let run_at = Utc.from_utc_datetime( + &NaiveDate::from_ymd_opt(2023, 1, 15) + .unwrap() + .and_time(NaiveTime::from_hms_opt(12, 30, 0).unwrap()), + ); + + let job_data: JobData = serde_json::json!({"user_id": 1}); + assert!( + enqueue(&pool, "PasswordChangeNotification", job_data, run_at, None) + .await + .is_ok() + ); + + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 1); + with_settings!({ + filters => reduction().iter().map(|&(pattern, replacement)| + (pattern, replacement)), }, { + assert_debug_snapshot!(jobs); + }); + } + + #[tokio::test] + #[serial] + async fn can_dequeue() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + + let run_at = Utc.from_utc_datetime( + &NaiveDate::from_ymd_opt(2023, 1, 15) + .unwrap() + .and_time(NaiveTime::from_hms_opt(12, 30, 0).unwrap()), + ); + + let job_data: JobData = serde_json::json!({"user_id": 1}); + assert!( + enqueue(&pool, "PasswordChangeNotification", job_data, run_at, None) + .await + .is_ok() + ); + + let job_before_dequeue = get_all_jobs(&pool) + .await + .first() + .cloned() + .expect("gets first job"); + + assert_eq!(job_before_dequeue.status, JobStatus::Queued); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + assert!(dequeue(&pool).await.is_ok()); + + let job_after_dequeue = get_all_jobs(&pool) + .await + .first() + .cloned() + .expect("gets first job"); + + assert_ne!(job_after_dequeue.updated_at, job_before_dequeue.updated_at); + with_settings!({ + filters => reduction().iter().map(|&(pattern, replacement)| + (pattern, replacement)), }, { + assert_debug_snapshot!(job_after_dequeue); + }); + } + + #[tokio::test] + #[serial] + async fn can_complete_job_without_interval() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::postgres_seed_data(&pool).await; + + let job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA99").await; + + assert_eq!(job.status, JobStatus::Queued); + assert!(complete_job(&pool, &job.id, None).await.is_ok()); + + let job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA99").await; + + assert_eq!(job.status, JobStatus::Completed); + } + + #[tokio::test] + #[serial] + async fn can_complete_job_with_interval() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::postgres_seed_data(&pool).await; + + let before_complete_job = get_job(&pool, 
"01JDM0X8EVAM823JZBGKYNBA98").await; + + assert_eq!(before_complete_job.status, JobStatus::Completed); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + assert!(complete_job(&pool, &before_complete_job.id, Some(10)) + .await + .is_ok()); + + let after_complete_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA98").await; + + assert_ne!( + after_complete_job.updated_at, + before_complete_job.updated_at + ); + with_settings!({ + filters => reduction().iter().map(|&(pattern, replacement)| (pattern, + replacement)), }, { + assert_debug_snapshot!(after_complete_job); + }); + } + + #[tokio::test] + #[serial] + async fn can_fail_job() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::postgres_seed_data(&pool).await; + + let before_fail_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA97").await; + + std::thread::sleep(std::time::Duration::from_secs(1)); + + assert!(fail_job( + &pool, + &before_fail_job.id, + &crate::Error::string("some error") + ) + .await + .is_ok()); + + let after_fail_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA97").await; + + assert_ne!(after_fail_job.updated_at, before_fail_job.updated_at); + with_settings!({ + filters => reduction().iter().map(|&(pattern, replacement)| (pattern, + replacement)), }, { + assert_debug_snapshot!(after_fail_job); + }); + } + + #[tokio::test] + #[serial] + async fn can_cancel_job_by_name() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::postgres_seed_data(&pool).await; + + let count_cancelled_jobs = get_all_jobs(&pool) + .await + .iter() + .filter(|j| j.status == JobStatus::Cancelled) + .count(); + + assert_eq!(count_cancelled_jobs, 1); + + assert!(cancel_jobs_by_name(&pool, "UserAccountActivation") + .await + .is_ok()); + + let count_cancelled_jobs = get_all_jobs(&pool) + .await + .iter() + .filter(|j| j.status == JobStatus::Cancelled) + .count(); + + assert_eq!(count_cancelled_jobs, 2); + } + + #[tokio::test] + #[serial] + async fn can_clear() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::postgres_seed_data(&pool).await; + + let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM pg_loco_queue") + .fetch_one(&pool) + .await + .unwrap(); + + assert_ne!(job_count, 0); + + assert!(clear(&pool).await.is_ok()); + let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM pg_loco_queue") + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!(job_count, 0); + } + + #[tokio::test] + #[serial] + async fn can_clear_by_status() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::postgres_seed_data(&pool).await; + + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 14); + assert_eq!( + jobs.iter() + .filter(|j| j.status == JobStatus::Completed) + .count(), + 3 + ); + assert_eq!( + jobs.iter() + .filter(|j| j.status == JobStatus::Failed) + .count(), + 2 + ); + + assert!( + clear_by_status(&pool, vec![JobStatus::Completed, JobStatus::Failed]) + .await + .is_ok() + ); + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 9); + assert_eq!( + jobs.iter() + .filter(|j| j.status == JobStatus::Completed) + .count(), + 0 + ); + assert_eq!( + jobs.iter() + .filter(|j| j.status == JobStatus::Failed) + .count(), + 0 + ); + } + + #[tokio::test] + #[serial] + async fn can_clear_jobs_older_than() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + + sqlx::query( + r"INSERT 
INTO pg_loco_queue (id, name, task_data, status, run_at,created_at, updated_at) VALUES + ('job1', 'Test Job 1', '{}', 'queued', NOW(), NOW() - INTERVAL '15days', NOW()), + ('job2', 'Test Job 2', '{}', 'queued', NOW(),NOW() - INTERVAL '5 days', NOW()), + ('job3', 'Test Job 3', '{}','queued', NOW(), NOW(), NOW())" + ) + .execute(&pool) + .await + .unwrap(); + + assert_eq!(get_all_jobs(&pool).await.len(), 3); + assert!(clear_jobs_older_than(&pool, 10, None).await.is_ok()); + assert_eq!(get_all_jobs(&pool).await.len(), 2); + } + + #[tokio::test] + #[serial] + async fn can_clear_jobs_older_than_with_status() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + + sqlx::query( + r"INSERT INTO pg_loco_queue (id, name, task_data, status, run_at,created_at, updated_at) VALUES + ('job1', 'Test Job 1', '{}', 'completed', NOW(), NOW() - INTERVAL '20days', NOW()), + ('job2', 'Test Job 2', '{}', 'failed', NOW(),NOW() - INTERVAL '15 days', NOW()), + ('job3', 'Test Job 3', '{}', 'completed', NOW(),NOW() - INTERVAL '5 days', NOW()), + ('job4', 'Test Job 3', '{}','cancelled', NOW(), NOW(), NOW())" + ) + .execute(&pool) + .await + .unwrap(); + + assert_eq!(get_all_jobs(&pool).await.len(), 4); + assert!(clear_jobs_older_than( + &pool, + 10, + Some(&vec![JobStatus::Cancelled, JobStatus::Completed]) + ) + .await + .is_ok()); + + assert_eq!(get_all_jobs(&pool).await.len(), 3); + } + + #[tokio::test] + #[serial] + async fn can_get_jobs() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::postgres_seed_data(&pool).await; + + assert_eq!( + get_jobs(&pool, Some(&vec![JobStatus::Failed]), None) + .await + .expect("get jobs") + .len(), + 2 + ); + assert_eq!( + get_jobs( + &pool, + Some(&vec![JobStatus::Failed, JobStatus::Completed]), + None + ) + .await + .expect("get jobs") + .len(), + 5 + ); + assert_eq!( + get_jobs(&pool, None, None).await.expect("get jobs").len(), + 14 + ); + } + + #[tokio::test] + #[serial] + async fn can_get_jobs_with_age() { + let pool = init().await; + + assert!(initialize_database(&pool).await.is_ok()); + + sqlx::query( + r"INSERT INTO pg_loco_queue (id, name, task_data, status, run_at,created_at, updated_at) VALUES + ('job1', 'Test Job 1', '{}', 'completed', NOW(), NOW() - INTERVAL '20days', NOW()), + ('job2', 'Test Job 2', '{}', 'failed', NOW(),NOW() - INTERVAL '15 days', NOW()), + ('job3', 'Test Job 3', '{}', 'completed', NOW(),NOW() - INTERVAL '5 days', NOW()), + ('job4', 'Test Job 3', '{}','cancelled', NOW(), NOW(), NOW())" + ) + .execute(&pool) + .await + .unwrap(); + assert_eq!( + get_jobs( + &pool, + Some(&vec![JobStatus::Failed, JobStatus::Completed]), + Some(11) + ) + .await + .expect("get jobs") + .len(), + 2 + ); + } +} diff --git a/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_complete_job_with_interval.snap b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_complete_job_with_interval.snap new file mode 100644 index 00000000..80ae35fd --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_complete_job_with_interval.snap @@ -0,0 +1,22 @@ +--- +source: src/bgworker/pg.rs +expression: after_complete_job +--- +Job { + id: "", + name: "PasswordChangeNotification", + data: Object { + "change_time": String(""), + "email": String("user12@example.com"), + "user_id": Number(134), + }, + status: Queued, + run_at: , + interval: None, + created_at: Some( + , + ), + updated_at: Some( + , + ), +} diff --git 
a/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_dequeue.snap b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_dequeue.snap new file mode 100644 index 00000000..a59fe073 --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_dequeue.snap @@ -0,0 +1,20 @@ +--- +source: src/bgworker/pg.rs +expression: job_after_dequeue +--- +Job { + id: "", + name: "PasswordChangeNotification", + data: Object { + "user_id": Number(1), + }, + status: Processing, + run_at: , + interval: None, + created_at: Some( + , + ), + updated_at: Some( + , + ), +} diff --git a/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_enqueue.snap b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_enqueue.snap new file mode 100644 index 00000000..20dc978d --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_enqueue.snap @@ -0,0 +1,22 @@ +--- +source: src/bgworker/pg.rs +expression: jobs +--- +[ + Job { + id: "", + name: "PasswordChangeNotification", + data: Object { + "user_id": Number(1), + }, + status: Queued, + run_at: , + interval: None, + created_at: Some( + , + ), + updated_at: Some( + , + ), + }, +] diff --git a/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_fail_job.snap b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_fail_job.snap new file mode 100644 index 00000000..851f0f89 --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_fail_job.snap @@ -0,0 +1,23 @@ +--- +source: src/bgworker/pg.rs +expression: after_fail_job +--- +Job { + id: "", + name: "SendInvoice", + data: Object { + "email": String("user13@example.com"), + "error": String("some error"), + "invoice_id": String("INV-2024-01"), + "user_id": Number(135), + }, + status: Failed, + run_at: , + interval: None, + created_at: Some( + , + ), + updated_at: Some( + , + ), +} diff --git a/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_initialize_database.snap b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_initialize_database.snap new file mode 100644 index 00000000..3cbca818 --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__pg__tests__can_initialize_database.snap @@ -0,0 +1,156 @@ +--- +source: src/bgworker/pg.rs +expression: table_info +--- +[ + TableInfo { + table_schema: Some( + "public", + ), + column_name: Some( + "updated_at", + ), + column_default: Some( + "now()", + ), + is_nullable: Some( + "NO", + ), + data_type: Some( + "timestamp with time zone", + ), + is_updatable: Some( + "YES", + ), + }, + TableInfo { + table_schema: Some( + "public", + ), + column_name: Some( + "run_at", + ), + column_default: None, + is_nullable: Some( + "NO", + ), + data_type: Some( + "timestamp with time zone", + ), + is_updatable: Some( + "YES", + ), + }, + TableInfo { + table_schema: Some( + "public", + ), + column_name: Some( + "interval", + ), + column_default: None, + is_nullable: Some( + "YES", + ), + data_type: Some( + "bigint", + ), + is_updatable: Some( + "YES", + ), + }, + TableInfo { + table_schema: Some( + "public", + ), + column_name: Some( + "created_at", + ), + column_default: Some( + "now()", + ), + is_nullable: Some( + "NO", + ), + data_type: Some( + "timestamp with time zone", + ), + is_updatable: Some( + "YES", + ), + }, + TableInfo { + table_schema: Some( + "public", + ), + column_name: Some( + "task_data", + ), + column_default: None, + is_nullable: Some( + "NO", + ), + data_type: Some( + "jsonb", + ), + is_updatable: Some( + "YES", + ), + }, + TableInfo { + table_schema: Some( + "public", + ), + 
column_name: Some( + "name", + ), + column_default: None, + is_nullable: Some( + "NO", + ), + data_type: Some( + "character varying", + ), + is_updatable: Some( + "YES", + ), + }, + TableInfo { + table_schema: Some( + "public", + ), + column_name: Some( + "id", + ), + column_default: None, + is_nullable: Some( + "NO", + ), + data_type: Some( + "character varying", + ), + is_updatable: Some( + "YES", + ), + }, + TableInfo { + table_schema: Some( + "public", + ), + column_name: Some( + "status", + ), + column_default: Some( + "'queued'::character varying", + ), + is_nullable: Some( + "NO", + ), + data_type: Some( + "character varying", + ), + is_updatable: Some( + "YES", + ), + }, +] diff --git a/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_complete_job_with_interval.snap b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_complete_job_with_interval.snap new file mode 100644 index 00000000..c99cae44 --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_complete_job_with_interval.snap @@ -0,0 +1,22 @@ +--- +source: src/bgworker/sqlt.rs +expression: after_complete_job +--- +Job { + id: "", + name: "PasswordChangeNotification", + data: Object { + "change_time": String(""), + "email": String("user12@example.com"), + "user_id": Number(134), + }, + status: Queued, + run_at: , + interval: None, + created_at: Some( + , + ), + updated_at: Some( + , + ), +} diff --git a/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_dequeue.snap b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_dequeue.snap new file mode 100644 index 00000000..193d2507 --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_dequeue.snap @@ -0,0 +1,20 @@ +--- +source: src/bgworker/sqlt.rs +expression: job_after_dequeue +--- +Job { + id: "", + name: "PasswordChangeNotification", + data: Object { + "user_id": Number(1), + }, + status: Processing, + run_at: , + interval: None, + created_at: Some( + , + ), + updated_at: Some( + , + ), +} diff --git a/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_enqueue.snap b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_enqueue.snap new file mode 100644 index 00000000..80f38428 --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_enqueue.snap @@ -0,0 +1,22 @@ +--- +source: src/bgworker/sqlt.rs +expression: jobs +--- +[ + Job { + id: "", + name: "PasswordChangeNotification", + data: Object { + "user_id": Number(1), + }, + status: Queued, + run_at: , + interval: None, + created_at: Some( + , + ), + updated_at: Some( + , + ), + }, +] diff --git a/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_fail_job.snap b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_fail_job.snap new file mode 100644 index 00000000..ec76a9fe --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__can_fail_job.snap @@ -0,0 +1,23 @@ +--- +source: src/bgworker/sqlt.rs +expression: after_fail_job +--- +Job { + id: "", + name: "SendInvoice", + data: Object { + "email": String("user13@example.com"), + "error": String("some error"), + "invoice_id": String("INV-2024-01"), + "user_id": Number(135), + }, + status: Failed, + run_at: , + interval: None, + created_at: Some( + , + ), + updated_at: Some( + , + ), +} diff --git a/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__sqlt_loco_queue.snap b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__sqlt_loco_queue.snap new file mode 100644 index 00000000..030d9b98 --- /dev/null +++ 
b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__sqlt_loco_queue.snap @@ -0,0 +1,76 @@ +--- +source: src/bgworker/sqlt.rs +expression: table_info +--- +[ + TableInfo { + cid: 0, + name: "id", + _type: "TEXT", + notnull: true, + dflt_value: None, + pk: false, + }, + TableInfo { + cid: 1, + name: "name", + _type: "TEXT", + notnull: true, + dflt_value: None, + pk: false, + }, + TableInfo { + cid: 2, + name: "task_data", + _type: "JSON", + notnull: true, + dflt_value: None, + pk: false, + }, + TableInfo { + cid: 3, + name: "status", + _type: "TEXT", + notnull: true, + dflt_value: Some( + "'queued'", + ), + pk: false, + }, + TableInfo { + cid: 4, + name: "run_at", + _type: "TIMESTAMP", + notnull: true, + dflt_value: None, + pk: false, + }, + TableInfo { + cid: 5, + name: "interval", + _type: "INTEGER", + notnull: false, + dflt_value: None, + pk: false, + }, + TableInfo { + cid: 6, + name: "created_at", + _type: "TIMESTAMP", + notnull: true, + dflt_value: Some( + "CURRENT_TIMESTAMP", + ), + pk: false, + }, + TableInfo { + cid: 7, + name: "updated_at", + _type: "TIMESTAMP", + notnull: true, + dflt_value: Some( + "CURRENT_TIMESTAMP", + ), + pk: false, + }, +] diff --git a/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__sqlt_loco_queue_lock.snap b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__sqlt_loco_queue_lock.snap new file mode 100644 index 00000000..0b24e715 --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__sqlt__tests__sqlt_loco_queue_lock.snap @@ -0,0 +1,32 @@ +--- +source: src/bgworker/sqlt.rs +expression: table_info +--- +[ + TableInfo { + cid: 0, + name: "id", + _type: "INTEGER", + notnull: false, + dflt_value: None, + pk: true, + }, + TableInfo { + cid: 1, + name: "is_locked", + _type: "BOOLEAN", + notnull: true, + dflt_value: Some( + "FALSE", + ), + pk: false, + }, + TableInfo { + cid: 2, + name: "locked_at", + _type: "TIMESTAMP", + notnull: false, + dflt_value: None, + pk: false, + }, +] diff --git a/src/bgworker/snapshots/loco_rs__bgworker__tests__can_dump_jobs.snap b/src/bgworker/snapshots/loco_rs__bgworker__tests__can_dump_jobs.snap new file mode 100644 index 00000000..44f9ac2b --- /dev/null +++ b/src/bgworker/snapshots/loco_rs__bgworker__tests__can_dump_jobs.snap @@ -0,0 +1,7 @@ +--- +source: src/bgworker/mod.rs +expression: "std::fs::read_to_string(dump_file)" +--- +Ok( + "- created_at: 2024-11-28T08:03:25Z\n id: 01JDM0X8EVAM823JZBGKYNBA94\n interval: null\n name: DataBackup\n run_at: 2024-11-28T08:04:25Z\n status: cancelled\n task_data:\n backup_id: backup-12345\n email: user16@example.com\n user_id: 138\n updated_at: 2024-11-28T08:03:25Z\n- created_at: 2024-11-28T08:03:25Z\n id: 01JDM0X8EVAM823JZBGKYNBA96\n interval: null\n name: UserDeactivation\n run_at: 2024-11-28T08:04:25Z\n status: failed\n task_data:\n deactivation_reason: user requested\n email: user14@example.com\n user_id: 136\n updated_at: 2024-11-28T08:03:25Z\n- created_at: 2024-11-28T08:03:25Z\n id: 01JDM0X8EVAM823JZBGKYNBA87\n interval: null\n name: UserDeactivation\n run_at: 2024-11-28T08:04:25Z\n status: failed\n task_data:\n deactivation_reason: account inactive\n email: user24@example.com\n user_id: 146\n updated_at: 2024-11-28T08:03:25Z\n", +) diff --git a/src/bgworker/sqlt.rs b/src/bgworker/sqlt.rs index f1edf40c..4d9e99dc 100644 --- a/src/bgworker/sqlt.rs +++ b/src/bgworker/sqlt.rs @@ -7,44 +7,45 @@ use serde_json::Value as JsonValue; pub use sqlx::SqlitePool; use sqlx::{ sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow}, - ConnectOptions, Row, + ConnectOptions, 
QueryBuilder, Row, }; use tokio::{task::JoinHandle, time::sleep}; use tracing::{debug, error, trace}; use ulid::Ulid; -use super::{BackgroundWorker, Queue}; +use super::{BackgroundWorker, JobStatus, Queue}; use crate::{config::SqliteQueueConfig, Error, Result}; -type TaskId = String; -type TaskData = JsonValue; -type TaskStatus = String; +type JobId = String; +type JobData = JsonValue; -type TaskHandler = Box< +type JobHandler = Box< dyn Fn( - TaskId, - TaskData, + JobId, + JobData, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync, >; -#[derive(Debug, Deserialize, Serialize)] -struct Task { - pub id: TaskId, +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Job { + pub id: JobId, pub name: String, - #[allow(clippy::struct_field_names)] - pub task_data: TaskData, - pub status: TaskStatus, + #[serde(rename = "task_data")] + pub data: JobData, + pub status: JobStatus, pub run_at: DateTime<Utc>, pub interval: Option<i64>, + pub created_at: Option<DateTime<Utc>>, + pub updated_at: Option<DateTime<Utc>>, } -pub struct TaskRegistry { - handlers: Arc<HashMap<String, TaskHandler>>, +pub struct JobRegistry { + handlers: Arc<HashMap<String, JobHandler>>, } -impl TaskRegistry { - /// Creates a new `TaskRegistry`. +impl JobRegistry { + /// Creates a new `JobRegistry`. #[must_use] pub fn new() -> Self { Self { @@ -52,7 +53,7 @@ impl TaskRegistry { } } - /// Registers a task handler with the provided name. + /// Registers a job handler with the provided name. /// # Errors /// Fails if cannot register worker pub fn register_worker<W, Args>(&mut self, name: String, worker: W) -> Result<()> @@ -62,11 +63,11 @@ for<'de> Args: Deserialize<'de>, { let worker = Arc::new(worker); - let wrapped_handler = move |_task_id: String, task_data: TaskData| { + let wrapped_handler = move |_job_id: String, job_data: JobData| { let w = worker.clone(); Box::pin(async move { - let args = serde_json::from_value::<Args>(task_data); + let args = serde_json::from_value::<Args>(job_data); match args { Ok(args) => w.perform(args).await, Err(err) => Err(err.into()), @@ -80,30 +81,30 @@ Ok(()) } - /// Returns a reference to the task handlers. + /// Returns a reference to the job handlers. #[must_use] - pub fn handlers(&self) -> &Arc<HashMap<String, TaskHandler>> { + pub fn handlers(&self) -> &Arc<HashMap<String, JobHandler>> { &self.handlers } - /// Runs the task handlers with the provided number of workers. + /// Runs the job handlers with the provided number of workers.
#[must_use] pub fn run(&self, pool: &SqlitePool, opts: &RunOpts) -> Vec<JoinHandle<()>> { - let mut tasks = Vec::new(); + let mut jobs = Vec::new(); let interval = opts.poll_interval_sec; for idx in 0..opts.num_workers { let handlers = self.handlers.clone(); let pool = pool.clone(); - let task = tokio::spawn(async move { + let job: JoinHandle<()> = tokio::spawn(async move { loop { trace!( pool_conns = pool.num_idle(), worker_num = idx, "sqlite workers stats" ); - let task_opt = match dequeue(&pool).await { + let job_opt = match dequeue(&pool).await { Ok(t) => t, Err(err) => { error!(err = err.to_string(), "cannot fetch from queue"); @@ -111,33 +112,33 @@ } }; - if let Some(task) = task_opt { - debug!(task_id = task.id, name = task.name, "working on task"); - if let Some(handler) = handlers.get(&task.name) { - match handler(task.id.clone(), task.task_data.clone()).await { + if let Some(job) = job_opt { + debug!(job_id = job.id, name = job.name, "working on job"); + if let Some(handler) = handlers.get(&job.name) { + match handler(job.id.clone(), job.data.clone()).await { Ok(()) => { if let Err(err) = - complete_task(&pool, &task.id, task.interval).await + complete_job(&pool, &job.id, job.interval).await { error!( err = err.to_string(), - task = ?task, - "cannot complete task" + job = ?job, + "cannot complete job" ); } } Err(err) => { - if let Err(err) = fail_task(&pool, &task.id, &err).await { + if let Err(err) = fail_job(&pool, &job.id, &err).await { error!( err = err.to_string(), - task = ?task, - "cannot fail task" + job = ?job, + "cannot fail job" ); } } } } else { - error!(task = task.name, "no handler found for task"); + error!(job_name = job.name, "no handler found for job"); } } else { sleep(Duration::from_secs(interval.into())).await; @@ -145,14 +146,14 @@ } }); - tasks.push(task); + jobs.push(job); } - tasks + jobs } } -impl Default for TaskRegistry { +impl Default for JobRegistry { fn default() -> Self { Self::new() } @@ -173,7 +174,7 @@ async fn connect(cfg: &SqliteQueueConfig) -> Result<SqlitePool> { Ok(pool) } -/// Initialize task tables +/// Initialize job tables /// /// # Errors /// @@ -181,12 +182,12 @@ async fn connect(cfg: &SqliteQueueConfig) -> Result<SqlitePool> { pub async fn initialize_database(pool: &SqlitePool) -> Result<()> { debug!("sqlite worker: initialize database"); sqlx::query( - r" + &format!(r" CREATE TABLE IF NOT EXISTS sqlt_loco_queue ( id TEXT NOT NULL, name TEXT NOT NULL, task_data JSON NOT NULL, - status TEXT NOT NULL DEFAULT 'queued', + status TEXT NOT NULL DEFAULT '{}', run_at TIMESTAMP NOT NULL, interval INTEGER, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -202,14 +203,14 @@ pub async fn initialize_database(pool: &SqlitePool) -> Result<()> { INSERT OR IGNORE INTO sqlt_loco_queue_lock (id, is_locked) VALUES (1, FALSE); CREATE INDEX IF NOT EXISTS idx_sqlt_queue_status_run_at ON sqlt_loco_queue(status, run_at); - ", + ", JobStatus::Queued), ) .execute(pool) .await?; Ok(()) } -/// Add a task +/// Add a job /// /// # Errors /// @@ -217,11 +218,11 @@ pub async fn initialize_database(pool: &SqlitePool) -> Result<()> { pub async fn enqueue( pool: &SqlitePool, name: &str, - task_data: TaskData, + data: JobData, run_at: DateTime<Utc>, interval: Option<Duration>, -) -> Result<TaskId> { - let task_data_json = serde_json::to_value(task_data)?; +) -> Result<JobId> { + let data = serde_json::to_value(data)?; #[allow(clippy::cast_possible_truncation)] let interval_ms: Option<i64> = interval.map(|i| i.as_millis() as i64); @@ -232,7 +233,7 @@ pub async fn enqueue( DATETIME($4), $5)", )
.bind(id.clone()) - .bind(task_data_json) + .bind(data) .bind(name) .bind(run_at) .bind(interval_ms) @@ -241,7 +242,7 @@ pub async fn enqueue( Ok(id) } -async fn dequeue(client: &SqlitePool) -> Result<Option<Task>> { +async fn dequeue(client: &SqlitePool) -> Result<Option<Job>> { let mut tx = client.begin().await?; let acquired_write_lock = sqlx::query( @@ -263,29 +264,22 @@ async fn dequeue(client: &SqlitePool) -> Result<Option<Task>> { "SELECT id, name, task_data, status, run_at, interval FROM sqlt_loco_queue WHERE - status = 'queued' AND + status = ? AND run_at <= CURRENT_TIMESTAMP ORDER BY run_at LIMIT 1", ) - // avoid using FromRow because it requires the 'macros' feature, which nothing - // in our dep tree uses, so it'll create smaller, faster builds if we do this manually - .map(|row: SqliteRow| Task { - id: row.get("id"), - name: row.get("name"), - task_data: row.get("task_data"), - status: row.get("status"), - run_at: row.get("run_at"), - interval: row.get("interval"), - }) + .bind(JobStatus::Queued.to_string()) + .map(|row: SqliteRow| to_job(&row).ok()) .fetch_optional(&mut *tx) - .await?; + .await? + .flatten(); - if let Some(task) = row { + if let Some(job) = row { sqlx::query( - "UPDATE sqlt_loco_queue SET status = 'processing', updated_at = CURRENT_TIMESTAMP \ - WHERE id = $1", + "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2", ) - .bind(&task.id) + .bind(JobStatus::Processing.to_string()) + .bind(&job.id) .execute(&mut *tx) .await?; @@ -301,9 +295,9 @@ async fn dequeue(client: &SqlitePool) -> Result<Option<Task>> { tx.commit().await?; - Ok(Some(task)) + Ok(Some(job)) } else { - // Release the write lock, no task found + // Release the write lock, no job found sqlx::query( "UPDATE sqlt_loco_queue_lock SET is_locked = FALSE, @@ -318,57 +312,146 @@ } } -async fn complete_task( - pool: &SqlitePool, - task_id: &TaskId, - interval_ms: Option<i64>, -) -> Result<()> { +async fn complete_job(pool: &SqlitePool, id: &JobId, interval_ms: Option<i64>) -> Result<()> { if let Some(interval_ms) = interval_ms { let next_run_at = Utc::now() + chrono::Duration::milliseconds(interval_ms); sqlx::query( - "UPDATE sqlt_loco_queue SET status = 'queued', updated_at = CURRENT_TIMESTAMP, run_at \ - = DATETIME($1) WHERE id = $2", + "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP, run_at = \ + DATETIME($2) WHERE id = $3", ) + .bind(JobStatus::Queued.to_string()) .bind(next_run_at) - .bind(task_id) + .bind(id) .execute(pool) .await?; } else { sqlx::query( - "UPDATE sqlt_loco_queue SET status = 'completed', updated_at = CURRENT_TIMESTAMP \ - WHERE id = $1", + "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2", ) - .bind(task_id) + .bind(JobStatus::Completed.to_string()) + .bind(id) .execute(pool) .await?; } Ok(()) } -async fn fail_task(pool: &SqlitePool, task_id: &TaskId, error: &crate::Error) -> Result<()> { +async fn fail_job(pool: &SqlitePool, id: &JobId, error: &crate::Error) -> Result<()> { let msg = error.to_string(); - error!(err = msg, "failed task"); + error!(err = msg, "failed job"); let error_json = serde_json::json!({ "error": msg }); sqlx::query( - "UPDATE sqlt_loco_queue SET status = 'failed', updated_at = CURRENT_TIMESTAMP, task_data \ - = json_patch(task_data, $1) WHERE id = $2", + "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP, task_data = \ + json_patch(task_data, $2) WHERE id = $3", ) + .bind(JobStatus::Failed.to_string()) .bind(error_json) - .bind(task_id) + .bind(id)
.execute(pool) .await?; Ok(()) } -/// Clear all tasks +/// Cancels jobs in the `sqlt_loco_queue` table by their name. +/// +/// This function updates the status of all jobs with the given `name` and a status of +/// [`JobStatus::Queued`] to [`JobStatus::Cancelled`]. The update also sets the `updated_at` timestamp to the +/// current time. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn cancel_jobs_by_name(pool: &SqlitePool, name: &str) -> Result<()> { + sqlx::query( + "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE name = $2 \ + AND status = $3", + ) + .bind(JobStatus::Cancelled.to_string()) + .bind(name) + .bind(JobStatus::Queued.to_string()) + .execute(pool) + .await?; + Ok(()) +} + +/// Clear all jobs /// /// # Errors /// /// This function will return an error if it fails pub async fn clear(pool: &SqlitePool) -> Result<()> { - sqlx::query("DELETE from sqlt_loco_queue") - .execute(pool) - .await?; + // Clear all rows in the relevant tables + sqlx::query( + " + DELETE FROM sqlt_loco_queue; + DELETE FROM sqlt_loco_queue_lock; + ", + ) + .execute(pool) + .await?; + + Ok(()) +} + +/// Deletes jobs from the `sqlt_loco_queue` table based on their status. +/// +/// This function removes all jobs with a status that matches any of the statuses provided +/// in the `status` argument. The statuses are checked against the `status` column in the +/// database, and any matching rows are deleted. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn clear_by_status(pool: &SqlitePool, status: Vec<JobStatus>) -> Result<()> { + let status_in = status + .iter() + .map(|s| format!("'{s}'")) + .collect::<Vec<String>>() + .join(","); + + sqlx::query(&format!( + "DELETE FROM sqlt_loco_queue WHERE status IN ({status_in})" + )) + .execute(pool) + .await?; + + Ok(()) +} + +/// Deletes jobs from the `sqlt_loco_queue` table that are older than a specified number of days. +/// +/// This function removes jobs that have a `created_at` timestamp older than the provided +/// number of days. Additionally, if a `status` is provided, only jobs with a status matching +/// one of the provided values will be deleted. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn clear_jobs_older_than( + pool: &SqlitePool, + age_days: i64, + status: Option<&Vec<JobStatus>>, +) -> Result<()> { + let cutoff_date = Utc::now() - chrono::Duration::days(age_days); + let threshold_date = cutoff_date.format("%+").to_string(); + + let mut query_builder = + QueryBuilder::<sqlx::Sqlite>::new("DELETE FROM sqlt_loco_queue WHERE created_at <= "); + query_builder.push_bind(threshold_date); + + if let Some(status_list) = status { + if !status_list.is_empty() { + let status_in = status_list + .iter() + .map(|s| format!("'{s}'")) + .collect::<Vec<String>>() + .join(","); + + query_builder.push(format!(" AND status IN ({status_in})")); + } + } + + query_builder.build().execute(pool).await?; Ok(()) } @@ -397,7 +480,7 @@ pub struct RunOpts { /// This function will return an error if it fails pub async fn create_provider(qcfg: &SqliteQueueConfig) -> Result<Queue> { let pool = connect(qcfg).await.map_err(Box::from)?; - let registry = TaskRegistry::new(); + let registry = JobRegistry::new(); Ok(Queue::Sqlite( pool, Arc::new(tokio::sync::Mutex::new(registry)), @@ -407,3 +490,599 @@ pub async fn create_provider(qcfg: &SqliteQueueConfig) -> Result<Queue> { }, )) } + +/// Retrieves a list of jobs from the `sqlt_loco_queue` table in the database.
+/// +/// This function queries the database for jobs, optionally filtering by their +/// `status`. If a status is provided, only jobs with statuses included in the +/// provided list will be fetched. If no status is provided, all jobs will be +/// returned. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn get_jobs( + pool: &SqlitePool, + status: Option<&Vec<JobStatus>>, + age_days: Option<i64>, +) -> Result<Vec<Job>> { + let mut query = String::from("SELECT * FROM sqlt_loco_queue WHERE 1 = 1 "); + + if let Some(status) = status { + let status_in = status + .iter() + .map(|s| format!("'{s}'")) + .collect::<Vec<String>>() + .join(","); + query.push_str(&format!("AND status IN ({status_in}) ")); + } + + if let Some(age_days) = age_days { + let cutoff_date = Utc::now() - chrono::Duration::days(age_days); + let threshold_date = cutoff_date.format("%+").to_string(); + query.push_str(&format!("AND created_at <= '{threshold_date}' ")); + } + + let rows = sqlx::query(&query).fetch_all(pool).await?; + Ok(rows.iter().filter_map(|row| to_job(row).ok()).collect()) +} + +/// Converts a row from the database into a [`Job`] object. +/// +/// This function takes a row from the `SQLite` database and manually extracts the necessary +/// fields to populate a [`Job`] object. +/// +/// **Note:** This function manually extracts values from the database row instead of using +/// the `FromRow` trait, which would require enabling the 'macros' feature in the dependencies. +/// The decision to avoid `FromRow` is made to keep the build smaller and faster, as the 'macros' +/// feature is unnecessary in the current dependency tree. +fn to_job(row: &SqliteRow) -> Result<Job> { + Ok(Job { + id: row.get("id"), + name: row.get("name"), + data: row.get("task_data"), + status: row.get::<String, &str>("status").parse().map_err(|err| { + let status: String = row.get("status"); + tracing::error!(status, err, "job status is unsupported"); + Error::string("invalid job status") + })?, + run_at: row.get("run_at"), + interval: row.get("interval"), + created_at: row.try_get("created_at").unwrap_or_default(), + updated_at: row.try_get("updated_at").unwrap_or_default(), + }) +} + +#[cfg(test)] +mod tests { + + use std::path::Path; + + use chrono::{NaiveDate, NaiveTime, TimeZone}; + use insta::{assert_debug_snapshot, with_settings}; + use sqlx::{query_as, FromRow, Pool, Sqlite}; + + use super::*; + use crate::tests_cfg; + + #[derive(Debug, Serialize, FromRow)] + pub struct TableInfo { + cid: i32, + name: String, + #[sqlx(rename = "type")] + _type: String, + notnull: bool, + dflt_value: Option<String>, + pk: bool, + } + + #[derive(Debug, Serialize, FromRow)] + struct JobQueueLock { + id: i32, + is_locked: bool, + locked_at: Option<DateTime<Utc>>, + } + + fn reduction() -> &'static [(&'static str, &'static str)] { + &[ + ("[A-Z0-9]{26}", ""), + (r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", ""), + ] + } + + async fn init(db_path: &Path) -> Pool<Sqlite> { + let qcfg = SqliteQueueConfig { + uri: format!( + "sqlite://{}?mode=rwc", + db_path.join("sample.sqlite").display() + ), + dangerously_flush: false, + enable_logging: false, + max_connections: 1, + min_connections: 1, + connect_timeout: 500, + idle_timeout: 500, + poll_interval_sec: 1, + num_workers: 1, + }; + + let pool = connect(&qcfg).await.unwrap(); + sqlx::raw_sql( + r" + DROP TABLE IF EXISTS sqlt_loco_queue; + DROP TABLE IF EXISTS sqlt_loco_queue_lock; + ", + ) + .execute(&pool) + .await + .expect("drop table if exists"); + + pool + } + + async fn get_all_jobs(pool: &SqlitePool) -> Vec<Job> { + sqlx::query("select * from
sqlt_loco_queue") + .fetch_all(pool) + .await + .expect("get jobs") + .iter() + .filter_map(|row| to_job(row).ok()) + .collect() + } + + async fn get_job(pool: &SqlitePool, id: &str) -> Job { + sqlx::query(&format!("select * from sqlt_loco_queue where id = '{id}'")) + .fetch_all(pool) + .await + .expect("get jobs") + .first() + .and_then(|row| to_job(row).ok()) + .expect("job not found") + } + + #[tokio::test] + async fn can_initialize_database() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + + for table in ["sqlt_loco_queue", "sqlt_loco_queue_lock"] { + let table_info: Vec = + query_as::<_, TableInfo>(&format!("PRAGMA table_info({table})")) + .fetch_all(&pool) + .await + .unwrap(); + + assert_debug_snapshot!(table, table_info); + } + } + + #[tokio::test] + async fn can_enqueue() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 0); + + let run_at = Utc.from_utc_datetime( + &NaiveDate::from_ymd_opt(2023, 1, 15) + .unwrap() + .and_time(NaiveTime::from_hms_opt(12, 30, 0).unwrap()), + ); + + let job_data = serde_json::json!({"user_id": 1}); + assert!( + enqueue(&pool, "PasswordChangeNotification", job_data, run_at, None) + .await + .is_ok() + ); + + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 1); + with_settings!({ + filters => reduction().iter().map(|&(pattern, replacement)| (pattern, replacement)), + }, { + assert_debug_snapshot!(jobs); + }); + + // validate lock status + let job_lock: JobQueueLock = + query_as::<_, JobQueueLock>("select * from sqlt_loco_queue_lock") + .fetch_one(&pool) + .await + .unwrap(); + + assert!(!job_lock.is_locked); + } + + #[tokio::test] + async fn can_dequeue() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 0); + + let run_at = Utc.from_utc_datetime( + &NaiveDate::from_ymd_opt(2023, 1, 15) + .unwrap() + .and_time(NaiveTime::from_hms_opt(12, 30, 0).unwrap()), + ); + + let job_data = serde_json::json!({"user_id": 1}); + assert!( + enqueue(&pool, "PasswordChangeNotification", job_data, run_at, None) + .await + .is_ok() + ); + + let job_before_dequeue = get_all_jobs(&pool) + .await + .first() + .cloned() + .expect("gets first job"); + assert_eq!(job_before_dequeue.status, JobStatus::Queued); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + assert!(dequeue(&pool).await.is_ok()); + + let job_after_dequeue = get_all_jobs(&pool) + .await + .first() + .cloned() + .expect("gets first job"); + + assert_ne!(job_after_dequeue.updated_at, job_before_dequeue.updated_at); + with_settings!({ + filters => reduction().iter().map(|&(pattern, replacement)| (pattern, replacement)), + }, { + assert_debug_snapshot!(job_after_dequeue); + }); + } + + #[tokio::test] + async fn can_complete_job_without_interval() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + 
tests_cfg::queue::sqlite_seed_data(&pool).await; + + let job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA99").await; + + assert_eq!(job.status, JobStatus::Queued); + assert!(complete_job(&pool, &job.id, None).await.is_ok()); + + let job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA99").await; + + assert_eq!(job.status, JobStatus::Completed); + } + + #[tokio::test] + async fn can_complete_job_with_interval() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::sqlite_seed_data(&pool).await; + + let before_complete_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA98").await; + assert_eq!(before_complete_job.status, JobStatus::Completed); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + assert!(complete_job(&pool, &before_complete_job.id, Some(10)) + .await + .is_ok()); + + let after_complete_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA98").await; + + assert_ne!( + after_complete_job.updated_at, + before_complete_job.updated_at + ); + with_settings!({ + filters => reduction().iter().map(|&(pattern, replacement)| (pattern, replacement)), + }, { + assert_debug_snapshot!(after_complete_job); + }); + } + + #[tokio::test] + async fn can_fail_job() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::sqlite_seed_data(&pool).await; + + let before_fail_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA97").await; + + std::thread::sleep(std::time::Duration::from_secs(1)); + + assert!(fail_job( + &pool, + &before_fail_job.id, + &crate::Error::string("some error") + ) + .await + .is_ok()); + + let after_fail_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA97").await; + + assert_ne!(after_fail_job.updated_at, before_fail_job.updated_at); + with_settings!({ + filters => reduction().iter().map(|&(pattern, replacement)| (pattern, replacement)), + }, { + assert_debug_snapshot!(after_fail_job); + }); + } + + #[tokio::test] + async fn can_cancel_job_by_name() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::sqlite_seed_data(&pool).await; + + let count_cancelled_jobs = get_all_jobs(&pool) + .await + .iter() + .filter(|j| j.status == JobStatus::Cancelled) + .count(); + + assert_eq!(count_cancelled_jobs, 1); + + assert!(cancel_jobs_by_name(&pool, "UserAccountActivation") + .await + .is_ok()); + + let count_cancelled_jobs = get_all_jobs(&pool) + .await + .iter() + .filter(|j| j.status == JobStatus::Cancelled) + .count(); + + assert_eq!(count_cancelled_jobs, 2); + } + + #[tokio::test] + async fn can_clear() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::sqlite_seed_data(&pool).await; + + let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue") + .fetch_one(&pool) + .await + .unwrap(); + let lock_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue_lock") + .fetch_one(&pool) + .await + .unwrap(); + assert_ne!(job_count, 0); + assert_ne!(lock_count, 0); + + assert!(clear(&pool).await.is_ok()); + 
let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue") + .fetch_one(&pool) + .await + .unwrap(); + let lock_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue_lock") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(job_count, 0); + assert_eq!(lock_count, 0); + } + + #[tokio::test] + async fn can_clear_by_status() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::sqlite_seed_data(&pool).await; + + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 14); + assert_eq!( + jobs.iter() + .filter(|j| j.status == JobStatus::Completed) + .count(), + 3 + ); + assert_eq!( + jobs.iter() + .filter(|j| j.status == JobStatus::Failed) + .count(), + 2 + ); + + assert!( + clear_by_status(&pool, vec![JobStatus::Completed, JobStatus::Failed]) + .await + .is_ok() + ); + let jobs = get_all_jobs(&pool).await; + + assert_eq!(jobs.len(), 9); + assert_eq!( + jobs.iter() + .filter(|j| j.status == JobStatus::Completed) + .count(), + 0 + ); + assert_eq!( + jobs.iter() + .filter(|j| j.status == JobStatus::Failed) + .count(), + 0 + ); + } + + #[tokio::test] + async fn can_clear_jobs_older_than() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + + sqlx::query( + r"INSERT INTO sqlt_loco_queue (id, name, task_data, status,run_at, created_at, updated_at) VALUES + ('job1', 'Test Job 1', '{}', 'queued', CURRENT_TIMESTAMP,DATETIME('now', '-15 days'), CURRENT_TIMESTAMP), + ('job2', 'Test Job 2', '{}', 'queued', CURRENT_TIMESTAMP, DATETIME('now', '-5 days'), CURRENT_TIMESTAMP), + ('job3', 'Test Job 3', '{}', 'queued', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)", + ) + .execute(&pool) + .await + .unwrap(); + + assert_eq!(get_all_jobs(&pool).await.len(), 3); + assert!(clear_jobs_older_than(&pool, 10, None).await.is_ok()); + assert_eq!(get_all_jobs(&pool).await.len(), 2); + } + + #[tokio::test] + async fn can_clear_jobs_older_than_with_status() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + + sqlx::query( + r"INSERT INTO sqlt_loco_queue (id, name, task_data, status,run_at, created_at, updated_at) VALUES + ('job1', 'Test Job 1', '{}', 'completed', CURRENT_TIMESTAMP,DATETIME('now', '-20 days'), CURRENT_TIMESTAMP), + ('job2', 'Test Job 2', '{}', 'failed', CURRENT_TIMESTAMP,DATETIME('now', '-15 days'), CURRENT_TIMESTAMP), + ('job3', 'Test Job 3', '{}', 'completed', CURRENT_TIMESTAMP, DATETIME('now', '-5 days'), CURRENT_TIMESTAMP), + ('job4', 'Test Job 4', '{}', 'cancelled', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)", + ) + .execute(&pool) + .await + .unwrap(); + + assert_eq!(get_all_jobs(&pool).await.len(), 4); + assert!(clear_jobs_older_than( + &pool, + 10, + Some(&vec![JobStatus::Cancelled, JobStatus::Completed]) + ) + .await + .is_ok()); + + assert_eq!(get_all_jobs(&pool).await.len(), 3); + } + + #[tokio::test] + async fn can_get_jobs() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + assert!(initialize_database(&pool).await.is_ok()); + 
tests_cfg::queue::sqlite_seed_data(&pool).await; + + assert_eq!( + get_jobs(&pool, Some(&vec![JobStatus::Failed]), None) + .await + .expect("get jobs") + .len(), + 2 + ); + assert_eq!( + get_jobs( + &pool, + Some(&vec![JobStatus::Failed, JobStatus::Completed]), + None + ) + .await + .expect("get jobs") + .len(), + 5 + ); + assert_eq!( + get_jobs(&pool, None, None).await.expect("get jobs").len(), + 14 + ); + } + + #[tokio::test] + async fn can_get_jobs_with_age() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + assert!(initialize_database(&pool).await.is_ok()); + + sqlx::query( + r"INSERT INTO sqlt_loco_queue (id, name, task_data, status,run_at, created_at, updated_at) VALUES + ('job1', 'Test Job 1', '{}', 'completed', CURRENT_TIMESTAMP,DATETIME('now', '-20 days'), CURRENT_TIMESTAMP), + ('job2', 'Test Job 2', '{}', 'failed', CURRENT_TIMESTAMP,DATETIME('now', '-15 days'), CURRENT_TIMESTAMP), + ('job3', 'Test Job 3', '{}', 'completed', CURRENT_TIMESTAMP, DATETIME('now', '-5 days'), CURRENT_TIMESTAMP), + ('job4', 'Test Job 4', '{}', 'cancelled', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)", + ) + .execute(&pool) + .await + .unwrap(); + assert_eq!( + get_jobs( + &pool, + Some(&vec![JobStatus::Failed, JobStatus::Completed]), + Some(10) + ) + .await + .expect("get jobs") + .len(), + 2 + ); + } +} diff --git a/src/cli.rs b/src/cli.rs index bbcfa30e..d380c800 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -32,6 +32,7 @@ use loco_gen::{Component, ScaffoldKind}; use crate::{ app::{AppContext, Hooks}, + bgworker::JobStatus, boot::{ create_app, create_context, list_endpoints, list_middlewares, run_scheduler, run_task, start, RunDbCommand, ServeParams, StartMode, @@ -105,6 +106,12 @@ enum Commands { #[clap(value_parser = parse_key_val::)] params: Vec<(String, String)>, }, + #[cfg(any(feature = "bg_redis", feature = "bg_pg", feature = "bg_sqlt"))] + /// Managing jobs queue. + Jobs { + #[command(subcommand)] + command: JobsCommands, + }, /// Run the scheduler Scheduler { /// Run a specific job by its name. @@ -418,6 +425,49 @@ impl From for RunDbCommand { } } +#[cfg(any(feature = "bg_redis", feature = "bg_pg", feature = "bg_sqlt"))] +#[derive(Subcommand)] +enum JobsCommands { + /// Cancels jobs with the specified names, setting their status to + /// `cancelled`. + Cancel { + /// Names of jobs to cancel. + #[arg(long)] + name: String, + }, + /// Deletes jobs that are either completed or cancelled. + Tidy {}, + /// Deletes jobs based on their age in days. + Purge { + /// Deletes jobs with errors or cancelled, older than the specified maximum age in days. + #[arg(long, default_value_t = 90)] + max_age: i64, + /// Limits the jobs being saved to those with specific criteria like + /// completed or queued. + #[arg(long, use_value_delimiter = true)] + status: Option>, + /// Saves the details of jobs into a file before deleting them. + #[arg(long)] + dump: Option, + }, + /// Saves the details of all jobs to files in the specified folder. + Dump { + /// Limits the jobs being saved to those with specific criteria like + /// completed or queued. + #[arg(long, use_value_delimiter = true)] + status: Option>, + /// Folder to save the job files (default: current directory). + #[arg(short, long, default_value = ".")] + folder: PathBuf, + }, + /// Imports jobs from a file. + Import { + /// Path to the file containing job details to import. 
+ #[arg(short, long)] + file: PathBuf, + }, +} + /// Parse a single key-value pair fn parse_key_val( s: &str, @@ -524,6 +574,8 @@ pub async fn main() -> crate::Result<()> { run_db::(&app_context, command.into()).await?; } } + #[cfg(any(feature = "bg_redis", feature = "bg_pg", feature = "bg_sqlt"))] + Commands::Jobs { command } => handle_job_command::(command, &environment).await?, Commands::Routes {} => { let app_context = create_context::(&environment).await?; show_list_endpoints::(&app_context); @@ -689,6 +741,8 @@ pub async fn main() -> crate::Result<()> { let app_context = create_context::(&environment).await?; run_task::(&app_context, name.as_ref(), &vars).await?; } + #[cfg(any(feature = "bg_redis", feature = "bg_pg", feature = "bg_sqlt"))] + Commands::Jobs { command } => handle_job_command::(command, &environment).await?, Commands::Scheduler { name, config, @@ -745,3 +799,60 @@ fn show_list_endpoints(ctx: &AppContext) { fn create_root_span(environment: &Environment) -> tracing::Span { tracing::span!(tracing::Level::DEBUG, "app", environment = %environment) } + +#[cfg(any(feature = "bg_redis", feature = "bg_pg", feature = "bg_sqlt"))] +async fn handle_job_command( + command: JobsCommands, + environment: &Environment, +) -> crate::Result<()> { + let app_context = create_context::(environment).await?; + let queue = app_context.queue_provider.map_or_else( + || { + println!("queue not configured"); + exit(1); + }, + |queue_provider| queue_provider, + ); + + match &command { + JobsCommands::Cancel { name } => queue.cancel_jobs(name).await, + JobsCommands::Tidy {} => { + queue + .clear_by_status(vec![JobStatus::Completed, JobStatus::Cancelled]) + .await + } + JobsCommands::Purge { + max_age, + status, + dump, + } => { + let status = status.as_ref().map_or_else( + || { + vec![ + JobStatus::Failed, + JobStatus::Cancelled, + JobStatus::Queued, + JobStatus::Completed, + ] + }, + std::clone::Clone::clone, + ); + + if let Some(path) = dump { + let dump_path = queue + .dump(path.as_path(), Some(&status), Some(*max_age)) + .await?; + + println!("Jobs successfully dumped to: {}", dump_path.display()); + } + + queue.clear_jobs_older_than(*max_age, &status).await + } + JobsCommands::Dump { status, folder } => { + let dump_path = queue.dump(folder.as_path(), status.as_ref(), None).await?; + println!("Jobs successfully dumped to: {}", dump_path.display()); + Ok(()) + } + JobsCommands::Import { file } => queue.import(file.as_path()).await, + } +} diff --git a/src/errors.rs b/src/errors.rs index d26a101f..d5aef1c9 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -132,7 +132,7 @@ pub enum Error { #[error(transparent)] Redis(#[from] sidekiq::redis_rs::RedisError), - #[cfg(feature = "bg_pg")] + #[cfg(any(feature = "bg_pg", feature = "bg_sqlt"))] #[error(transparent)] Sqlx(#[from] sqlx::Error), diff --git a/src/tests_cfg/mod.rs b/src/tests_cfg/mod.rs index de88ab4e..f32873b3 100644 --- a/src/tests_cfg/mod.rs +++ b/src/tests_cfg/mod.rs @@ -1,6 +1,7 @@ -#[cfg(feature = "with-db")] -pub mod db; - pub mod app; pub mod config; +#[cfg(feature = "with-db")] +pub mod db; +#[cfg(any(feature = "bg_pg", feature = "bg_sqlt"))] +pub mod queue; pub mod task; diff --git a/src/tests_cfg/queue.rs b/src/tests_cfg/queue.rs new file mode 100644 index 00000000..8d59105f --- /dev/null +++ b/src/tests_cfg/queue.rs @@ -0,0 +1,85 @@ +#[cfg(any(feature = "bg_pg", feature = "bg_sqlt"))] +use crate::bgworker; +use std::path::PathBuf; + +#[cfg(feature = "bg_pg")] +/// # Panics +/// +/// This function will panic if it fails to prepare or 
insert the seed data, causing the tests to fail quickly +/// and preventing further test execution with incomplete setup. +pub async fn postgres_seed_data(pool: &sqlx::PgPool) { + let yaml_tasks = std::fs::read_to_string( + PathBuf::from("tests") + .join("fixtures") + .join("queue") + .join("jobs.yaml"), + ) + .expect("Failed to read YAML file"); + + let tasks: Vec = + serde_yaml::from_str(&yaml_tasks).expect("Failed to parse YAML"); + for task in tasks { + sqlx::query( + r" + INSERT INTO pg_loco_queue (id, name, task_data, status, run_at, interval, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, NULL, $6, $7) + ", + ) + .bind(task.id) + .bind(task.name) + .bind(task.data) + .bind(task.status.to_string()) + .bind(task.run_at) + .bind(task.created_at) + .bind(task.updated_at) + .execute(pool) + .await.expect("execute insert query"); + } +} + +#[cfg(feature = "bg_sqlt")] +/// # Panics +/// +/// This function will panic if it fails to prepare or insert the seed data, causing the tests to fail quickly +/// and preventing further test execution with incomplete setup. +pub async fn sqlite_seed_data(pool: &sqlx::Pool) { + let yaml_tasks = std::fs::read_to_string( + PathBuf::from("tests") + .join("fixtures") + .join("queue") + .join("jobs.yaml"), + ) + .expect("Failed to read YAML file"); + + let tasks: Vec = + serde_yaml::from_str(&yaml_tasks).expect("Failed to parse YAML"); + for task in tasks { + sqlx::query( + r" + INSERT INTO sqlt_loco_queue (id, name, task_data, status, run_at, interval, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, NULL, ?, ?) + " + ) + .bind(task.id) + .bind(task.name) + .bind(task.data.to_string()) + .bind(task.status.to_string()) + .bind(task.run_at) + .bind(task.created_at) + .bind(task.updated_at) + .execute(pool) + .await.expect("create row"); + } + + sqlx::query( + r" + INSERT INTO sqlt_loco_queue_lock (id, is_locked, locked_at) + VALUES (1, FALSE, NULL) + ON CONFLICT (id) DO NOTHING; + + ", + ) + .execute(pool) + .await + .expect("execute insert query"); +} diff --git a/tests/fixtures/queue/jobs.yaml b/tests/fixtures/queue/jobs.yaml new file mode 100644 index 00000000..a0eb461b --- /dev/null +++ b/tests/fixtures/queue/jobs.yaml @@ -0,0 +1,153 @@ +- id: "01JDM0X8EVAM823JZBGKYNBA99" + name: "UserAccountActivation" + task_data: + user_id: 133 + email: "user11@example.com" + activation_token: "abcdef123456" + status: "queued" + run_at: "2024-11-28T08:19:08Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA98" + name: "PasswordChangeNotification" + task_data: + user_id: 134 + email: "user12@example.com" + change_time: "2024-11-27T12:30:00Z" + status: "completed" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA97" + name: "SendInvoice" + task_data: + user_id: 135 + email: "user13@example.com" + invoice_id: "INV-2024-01" + status: "processing" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA96" + name: "UserDeactivation" + task_data: + user_id: 136 + email: "user14@example.com" + deactivation_reason: "user requested" + status: "failed" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA95" + name: "SubscriptionReminder" + task_data: + user_id: 137 + email: "user15@example.com" + renewal_date: "2024-12-01" + status: "queued" + run_at: 
"2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA94" + name: "DataBackup" + task_data: + backup_id: "backup-12345" + user_id: 138 + email: "user16@example.com" + status: "cancelled" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA93" + name: "SecurityAlert" + task_data: + user_id: 139 + email: "user17@example.com" + alert_type: "login attempt from new device" + status: "queued" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA92" + name: "WeeklyReportEmail" + task_data: + user_id: 140 + email: "user18@example.com" + report_period: "2024-11-20 to 2024-11-27" + status: "processing" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA91" + name: "AccountDeletion" + task_data: + user_id: 142 + email: "user20@example.com" + deletion_request_time: "2024-11-27T14:00:00Z" + status: "queued" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA90" + name: "UserAccountActivation" + task_data: + user_id: 143 + email: "user21@example.com" + activation_token: "xyz987654" + status: "completed" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA89" + name: "PasswordChangeNotification" + task_data: + user_id: 144 + email: "user22@example.com" + change_time: "2024-11-27T15:00:00Z" + status: "completed" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA88" + name: "SendInvoice" + task_data: + user_id: 145 + email: "user23@example.com" + invoice_id: "INV-2024-02" + status: "processing" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA87" + name: "UserDeactivation" + task_data: + user_id: 146 + email: "user24@example.com" + deactivation_reason: "account inactive" + status: "failed" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" + +- id: "01JDM0X8EVAM823JZBGKYNBA86" + name: "SubscriptionReminder" + task_data: + user_id: 147 + email: "user25@example.com" + renewal_date: "2024-12-05" + status: "queued" + run_at: "2024-11-28T08:04:25Z" + created_at: "2024-11-28T08:03:25Z" + updated_at: "2024-11-28T08:03:25Z" \ No newline at end of file