Skip to content

Commit

Permalink
tests: Add the first integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
leo91000 committed Feb 1, 2024
1 parent 4061d97 commit 039bc15
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 12 deletions.
18 changes: 14 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ jobs:
test:
timeout-minutes: 20
runs-on: ubuntu-22.04
services:
postgres:
# Minimum supported PostgreSQL version is 13; CI tests against 16
image: postgres:16
env:
POSTGRES_USER: testuser
POSTGRES_PASSWORD: testpassword
POSTGRES_DB: testdb
ports:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -30,7 +41,6 @@ jobs:
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: Test
uses: actions-rs/cargo@v1
with:
command: test
args: --all
run: cargo test --all
env:
DATABASE_URL: postgres://testuser:testpassword@localhost:5432/testdb
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,4 @@ cfg-if = "1.0.0"
[dev-dependencies]
tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uuid = { version = "1.7.0", features = ["v7"] }
16 changes: 14 additions & 2 deletions crates/migrations/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,23 @@ where
Ok(())
}

#[derive(FromRow, Default)]
/// Row describing the most recent migration applied to the database.
/// Deserialized via `sqlx::FromRow`, so field names must match the
/// column names returned by the query in `get_last_migration`.
#[derive(FromRow, Debug)]
pub struct LastMigration {
    /// PostgreSQL `server_version_num` as a string (e.g. "120000" for 12.0).
    server_version_num: String,
    /// Id of the last applied migration; `None` when none has run yet.
    id: Option<i32>,
    /// Highest breaking-migration id applied so far, if any.
    biggest_breaking_id: Option<i32>,
}

/// Fallback state used when the worker schema does not exist yet
/// (`get_last_migration` returns `Default::default()` after installing the
/// schema on error code 42P01): no migration recorded, server version
/// hardcoded to "120000" (PostgreSQL 12).
// NOTE(review): this hardcoded version bypasses the real server-version
// check on a fresh install, and the CI config states the minimum is 13 —
// confirm `check_postgres_version` accepts "120000" and consider querying
// the actual `server_version_num` instead of defaulting it.
impl Default for LastMigration {
    fn default() -> Self {
        Self {
            server_version_num: String::from("120000"),
            id: None,
            biggest_breaking_id: None,
        }
    }
}

/// Returns the last migration that was run against the database.
/// It also installs the Graphile Worker schema if it doesn't exist.
async fn get_last_migration<'e, E>(
Expand Down Expand Up @@ -92,7 +102,7 @@ where

if code == "42P01" {
install_schema(executor.clone(), escaped_schema).await?;
Default::default()
return Ok(Default::default());
}

return Err(MigrateError::SqlError(SqlxError::Database(e)));
Expand All @@ -118,8 +128,10 @@ where
E: PgExecutor<'e> + Acquire<'e, Database = Postgres> + Send + Sync + Clone,
{
let last_migration = get_last_migration(&executor, escaped_schema).await?;
dbg!(5, &last_migration);

check_postgres_version(&last_migration.server_version_num)?;
dbg!(6, &last_migration);
let latest_migration = last_migration.id;
let latest_breaking_migration = last_migration.biggest_breaking_id;

Expand Down
2 changes: 2 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set dotenv-load

test:
cargo test --all

Expand Down
12 changes: 6 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
mod builder;
pub mod builder;
pub mod errors;
mod helpers;
mod runner;
mod sql;
mod streams;
mod utils;
pub mod helpers;
pub mod runner;
pub mod sql;
pub mod streams;
pub mod utils;

pub use crate::sql::add_job::JobSpec;
pub use graphile_worker_crontab_parser::parse_crontab;
Expand Down
130 changes: 130 additions & 0 deletions tests/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use chrono::{DateTime, Utc};
use graphile_worker::WorkerOptions;
use serde_json::Value;
use sqlx::postgres::PgConnectOptions;
use sqlx::FromRow;
use sqlx::PgPool;
use tokio::task::LocalSet;

/// Test-side mirror of a row in `graphile_worker._private_jobs`, plus the
/// columns joined in by `TestDatabase::get_jobs`. Deserialized with
/// `sqlx::FromRow`; fields must match the query's column names.
#[derive(FromRow, Debug)]
#[allow(dead_code)]
pub struct Job {
    pub id: i64,
    /// FK into `_private_job_queues`; `None` for unqueued jobs.
    pub job_queue_id: Option<i32>,
    /// Joined from `_private_job_queues.queue_name` (left join, so optional).
    pub queue_name: Option<String>,
    pub payload: serde_json::Value,
    pub priority: i16,
    pub run_at: DateTime<Utc>,
    pub attempts: i16,
    pub max_attempts: i16,
    pub last_error: Option<String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Job key for dedup/replace semantics, when set.
    pub key: Option<String>,
    pub revision: i32,
    pub locked_at: Option<DateTime<Utc>>,
    pub locked_by: Option<String>,
    /// Stored as a JSON object keyed by flag name (see forbidden_flags test).
    pub flags: Option<Value>,
    /// FK into `_private_tasks`.
    pub task_id: i32,
}

/// Handle to a throwaway database provisioned for a single test.
///
/// `source_pool` is connected to the original `DATABASE_URL` database and is
/// used to create and drop the test database; `test_pool` is connected to
/// the freshly created database whose name is held in `name`.
#[derive(Clone)]
pub struct TestDatabase {
    pub source_pool: PgPool,
    pub test_pool: PgPool,
    pub name: String,
}

impl TestDatabase {
    /// Tears down the test database: closes the test pool first (so its
    /// connections are gone), then drops the database via the source pool.
    /// `WITH (FORCE)` (PostgreSQL 13+) terminates any lingering sessions.
    async fn drop(&self) {
        self.test_pool.close().await;
        // Identifiers cannot be bound as parameters; `name` is generated
        // internally from a UUID, so interpolation is safe here.
        sqlx::query(&format!("DROP DATABASE {} WITH (FORCE)", self.name))
            .execute(&self.source_pool)
            .await
            .expect("Failed to drop test database");
    }

    /// Worker options preconfigured for this test database:
    /// its pool, the `graphile_worker` schema, and a concurrency of 4.
    pub fn create_worker_options(&self) -> WorkerOptions {
        WorkerOptions::default()
            .pg_pool(self.test_pool.clone())
            .schema("graphile_worker")
            .concurrency(4)
    }

    /// Fetches every job in the worker's private tables, with task identifier
    /// and queue name joined in, ordered by job id. Panics on query failure
    /// (acceptable in test helpers).
    pub async fn get_jobs(&self) -> Vec<Job> {
        sqlx::query_as(
            r#"
                select jobs.*, identifier as task_identifier, job_queues.queue_name as queue_name
                from graphile_worker._private_jobs as jobs
                left join graphile_worker._private_tasks as tasks on jobs.task_id = tasks.id
                left join graphile_worker._private_job_queues as job_queues on jobs.job_queue_id = job_queues.id
                order by jobs.id asc
            "#
        )
        .fetch_all(&self.test_pool)
        .await
        .expect("Failed to get jobs")
    }
}

/// Provisions a uniquely named throwaway database for one test run.
///
/// Connects to `DATABASE_URL`, creates `__test_graphile_worker_<uuid>`, and
/// returns a `TestDatabase` holding single-connection pools for both the
/// source database and the new one. Panics via `expect` on any failure,
/// which is acceptable in test setup code.
async fn create_test_database() -> TestDatabase {
    let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let pg_conn_options: PgConnectOptions = db_url.parse().expect("Failed to parse DATABASE_URL");

    let pg_pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(1)
        .connect_with(pg_conn_options.clone())
        .await
        .expect("Failed to connect to database");

    // UUIDv7 keeps names unique (and roughly time-ordered); `simple()` drops
    // the dashes, which are not valid in an unquoted SQL identifier.
    let db_id = uuid::Uuid::now_v7();
    let db_name = format!("__test_graphile_worker_{}", db_id.simple());

    // Identifiers cannot be bound as query parameters; the name is generated
    // locally from a UUID, so string interpolation is safe here.
    sqlx::query(&format!("CREATE DATABASE {}", db_name))
        .execute(&pg_pool)
        .await
        .expect("Failed to create test database");

    // NOTE(review): a previous version also ran `CREATE SCHEMA <db_name>`
    // against the *source* pool. That schema landed in the wrong database,
    // was never used (the worker is configured with the `graphile_worker`
    // schema, which the migrations install themselves), and was never
    // dropped — leaking one schema per test run. Removed.

    let test_options = pg_conn_options.database(&db_name);

    let test_pool = sqlx::postgres::PgPoolOptions::new()
        .max_connections(1)
        .connect_with(test_options)
        .await
        .expect("Failed to connect to test database");

    TestDatabase {
        source_pool: pg_pool,
        test_pool,
        name: db_name,
    }
}

/// Runs `test_fn` against a freshly provisioned database, guaranteeing
/// cleanup.
///
/// The closure executes inside a tokio `LocalSet`, so it may hold non-`Send`
/// state. The test body runs in its own local task: if it panics, the
/// database is still dropped before the panic is propagated via `expect`.
pub async fn with_test_db<F, Fut>(test_fn: F)
where
    F: FnOnce(TestDatabase) -> Fut + 'static,
    Fut: std::future::Future<Output = ()>,
{
    let local = LocalSet::new();
    local
        .run_until(async move {
            let db = create_test_database().await;
            let db_for_task = db.clone();
            // Isolate the test body in a task so its panic cannot skip the
            // cleanup below; the JoinHandle captures the panic instead.
            let outcome =
                tokio::task::spawn_local(async move { test_fn(db_for_task).await }).await;
            db.drop().await;
            outcome.expect("Test failed");
        })
        .await;
}
46 changes: 46 additions & 0 deletions tests/forbidden_flags.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use graphile_worker::JobSpec;
use serde_json::json;

mod common;

/// Adding a job with `flags: ["a", "b"]` must store them as a JSON object
/// keyed by flag name (`{"a": true, "b": true}`), and the worker must still
/// be able to run the job.
#[tokio::test]
async fn it_supports_the_flags_api() {
    common::with_test_db(|test_db| async move {
        let worker = test_db
            .create_worker_options()
            // `Ok::<(), ()>` pins the handler's Result type directly,
            // replacing the earlier trivial-cast ascriptions
            // (`"job3" as &str`, `Ok(()) as Result<(), ()>`), which the
            // `trivial_casts` lint flags as removable.
            .define_raw_job("job3", |_, _: serde_json::Value| async move {
                Ok::<(), ()>(())
            })
            .init()
            .await
            .expect("Failed to create worker");
        let utils = worker.create_utils();

        utils
            .add_raw_job(
                "job3",
                json!({ "a": 1 }),
                Some(JobSpec {
                    flags: Some(vec!["a".to_string(), "b".to_string()]),
                    ..Default::default()
                }),
            )
            .await
            .expect("Failed to add job");
        // Release the utils' resources before inspecting the database.
        drop(utils);

        let jobs = test_db.get_jobs().await;

        assert_eq!(jobs.len(), 1);
        // Flags are persisted as an object keyed by flag name, not a list.
        assert_eq!(
            jobs[0].flags,
            Some(json!({
                "a": true,
                "b": true,
            }))
        );

        worker.run_once().await.expect("Failed to run worker");
    })
    .await;
}

0 comments on commit 039bc15

Please sign in to comment.