From f1d50081f364e16d43c1a779c41eab94c09c92d3 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Fri, 23 Aug 2024 20:20:00 +0300 Subject: [PATCH] fetch and janitor run migrations --- rust/cyclotron-core/src/lib.rs | 3 +++ rust/cyclotron-core/src/ops/meta.rs | 10 +++++++++- rust/cyclotron-core/src/worker.rs | 10 +++++++++- rust/cyclotron-fetch/src/main.rs | 2 ++ rust/cyclotron-fetch/tests/fetch.rs | 8 ++++++++ rust/cyclotron-janitor/src/janitor.rs | 4 ++++ rust/cyclotron-janitor/src/main.rs | 2 ++ 7 files changed, 37 insertions(+), 2 deletions(-) diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index dde31d7d3e1e2..3e8b523aedcd8 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -36,6 +36,9 @@ mod config; pub use config::ManagerConfig; pub use config::PoolConfig; +// Meta +pub use ops::meta::run_migrations; + // Some data is shared between workers and janitors on a given shard, using // the metadata table. These keys are reserved for that purpose diff --git a/rust/cyclotron-core/src/ops/meta.rs b/rust/cyclotron-core/src/ops/meta.rs index 7257b1f6b36dc..883c901276351 100644 --- a/rust/cyclotron-core/src/ops/meta.rs +++ b/rust/cyclotron-core/src/ops/meta.rs @@ -1,4 +1,4 @@ -use sqlx::postgres::PgQueryResult; +use sqlx::{postgres::PgQueryResult, PgPool}; use uuid::Uuid; use crate::error::QueueError; @@ -24,3 +24,11 @@ pub fn throw_if_no_rows(res: PgQueryResult, job: Uuid, lock: Uuid) -> Result<(), Ok(()) } } + +/// Run the latest cyclotron migrations. Panics if the migrations can't be run - failure to run migrations is purposefully fatal. +pub async fn run_migrations(pool: &PgPool) { + sqlx::migrate!("./migrations") + .run(pool) + .await + .expect("Failed to run migrations"); +} diff --git a/rust/cyclotron-core/src/worker.rs b/rust/cyclotron-core/src/worker.rs index b99be77f3f52e..03a4c19bdab83 100644 --- a/rust/cyclotron-core/src/worker.rs +++ b/rust/cyclotron-core/src/worker.rs @@ -6,7 +6,10 @@ use std::sync::Mutex; use uuid::Uuid; use crate::{ - ops::worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat}, + ops::{ + meta::run_migrations, + worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat}, + }, Job, JobState, JobUpdate, PoolConfig, QueueError, }; @@ -49,6 +52,11 @@ impl Worker { } } + /// Run the latest cyclotron migrations. Panics if the migrations can't be run - failure to run migrations is purposefully fatal. + pub async fn run_migrations(&self) { + run_migrations(&self.pool).await; + } + /// Dequeues jobs from the queue, and returns them. Job sorting happens at the queue level, /// workers can't provide any filtering or sorting criteria - queue managers decide which jobs are run, /// workers just run them. diff --git a/rust/cyclotron-fetch/src/main.rs b/rust/cyclotron-fetch/src/main.rs index 7e32ca2929384..c0c02c6f5404b 100644 --- a/rust/cyclotron-fetch/src/main.rs +++ b/rust/cyclotron-fetch/src/main.rs @@ -75,6 +75,8 @@ async fn main() { .await .expect("failed to create app context"); + context.worker.run_migrations().await; + let http_server = tokio::spawn(listen(app, bind)); let worker_loop = tokio::spawn(worker_loop(context)); diff --git a/rust/cyclotron-fetch/tests/fetch.rs b/rust/cyclotron-fetch/tests/fetch.rs index 0fe7d565eca19..18a7469c6e7fe 100644 --- a/rust/cyclotron-fetch/tests/fetch.rs +++ b/rust/cyclotron-fetch/tests/fetch.rs @@ -13,6 +13,14 @@ use utils::{ mod utils; +#[sqlx::test(migrations = "../cyclotron-core/migrations")] +pub async fn test_run_migrations(db: PgPool) { + // This is a no-op, since the db sqlx::test gives use already has the migrations run, but it asserts that the migrations + // being run repeatedly doesn't cause any issues, and that the migrations being run are the same as the ones in the core + let context = get_app_test_context(db).await; + context.worker.run_migrations().await; +} + #[sqlx::test(migrations = "../cyclotron-core/migrations")] pub async fn test_completes_fetch(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); diff --git a/rust/cyclotron-janitor/src/janitor.rs b/rust/cyclotron-janitor/src/janitor.rs index 19b6bf404888b..60796e9f7e11b 100644 --- a/rust/cyclotron-janitor/src/janitor.rs +++ b/rust/cyclotron-janitor/src/janitor.rs @@ -56,6 +56,10 @@ impl Janitor { } } + pub async fn run_migrations(&self) { + cyclotron_core::run_migrations(&self.pool).await; + } + pub async fn run_once(&self) -> Result { info!("Running janitor loop"); let _loop_start = common_metrics::timing_guard(RUN_TIME, &self.metrics_labels); diff --git a/rust/cyclotron-janitor/src/main.rs b/rust/cyclotron-janitor/src/main.rs index e46a158c58d54..0db35e52b8bc5 100644 --- a/rust/cyclotron-janitor/src/main.rs +++ b/rust/cyclotron-janitor/src/main.rs @@ -70,6 +70,8 @@ async fn main() { .await .expect("failed to create janitor"); + janitor.run_migrations().await; + let janitor_liveness = liveness .register( "janitor".to_string(),