diff --git a/orm/src/migrations.rs b/orm/src/migrations.rs index 64df5e47e..d81f851df 100644 --- a/orm/src/migrations.rs +++ b/orm/src/migrations.rs @@ -1,66 +1,119 @@ use deadpool_diesel::postgres::Object; use deadpool_diesel::InteractError; -use diesel::prelude::*; // Add this for RunQueryDsl -use diesel::sql_types::Bool; // Add this for the SQL query type -use diesel_migrations::{ - embed_migrations, EmbeddedMigrations, MigrationHarness, -}; -use std::{thread, time::Duration}; // Use thread::sleep instead of tokio::sleep +use diesel::prelude::*; +use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; +use std::{thread, time::Duration}; +use thiserror::Error; +use tracing::{debug, error, info, warn}; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); +const MIGRATION_LOCK_ID: i64 = 42; // Arbitrary but consistent lock ID +const MAX_RETRIES: u32 = 5; +const BASE_RETRY_DELAY_SECS: u64 = 1; -pub async fn run_migrations(conn: &Object) -> Result<(), InteractError> { - let max_retries = 5; +#[derive(Debug, Error)] +pub enum MigrationError { + #[error("Max retries ({0}) exceeded while attempting migrations")] + MaxRetriesExceeded(u32), + #[error("Database interaction error: {0}")] + DbError(#[from] InteractError), + #[error("Failed to acquire migration lock after {0} attempts")] + LockAcquisitionFailed(u32), +} + +pub async fn run_migrations(conn: &Object) -> Result<(), MigrationError> { let mut attempt = 0; - while attempt < max_retries { - let result = conn.interact(|transaction_conn| { - // Try to acquire an advisory lock - let lock_result: QueryResult = diesel::sql_query("SELECT pg_try_advisory_lock(1)") - .get_result::<(bool,)>(transaction_conn) - .map(|row| row.0); - - match lock_result { - Ok(true) => { - // We got the lock, try to run migrations - let migration_result = transaction_conn.run_pending_migrations(MIGRATIONS); - - // Release the lock regardless of migration success - let _ = diesel::sql_query("SELECT pg_advisory_unlock(1)") - .execute(transaction_conn); - - migration_result.map(|_| ()) - }, - Ok(false) => { - // Someone else has the lock, treat as temporary failure - Ok(()) - }, - Err(e) => Err(e), - } - }) - .await; + while attempt < MAX_RETRIES { + info!("Attempting database migration (attempt {}/{})", attempt + 1, MAX_RETRIES); - match result { - Ok(_) => return Ok(()), // Success or someone else running migrations + match try_run_migration(conn).await { + Ok(_) => { + info!("Database migration completed successfully"); + return Ok(()); + } Err(e) => { - if e.to_string().contains("already exists") { - // If it's just a "prepared statement already exists" error, - // migrations probably succeeded - return Ok(()); + if let MigrationError::DbError(ref db_error) = e { + if db_error.to_string().contains("already exists") { + info!("Migrations appear to be already applied"); + return Ok(()); + } } - // For other errors, retry after delay + attempt += 1; - if attempt < max_retries { - thread::sleep(Duration::from_secs(1 << attempt)); // Use thread::sleep + if attempt < MAX_RETRIES { + let delay = BASE_RETRY_DELAY_SECS << attempt; + warn!("Migration attempt failed, retrying in {} seconds: {}", delay, e); + thread::sleep(Duration::from_secs(delay)); continue; } - return Err(e); + + error!("All migration attempts failed: {}", e); + return Err(MigrationError::MaxRetriesExceeded(MAX_RETRIES)); } } } - Err(InteractError::Error(Box::new(std::io::Error::new( // Fixed error creation - std::io::ErrorKind::Other, - "Max retries exceeded" - )))) + Err(MigrationError::MaxRetriesExceeded(MAX_RETRIES)) +} + +async fn try_run_migration(conn: &Object) -> Result<(), MigrationError> { + conn.interact(|transaction_conn| { + // Try to acquire an advisory lock + #[derive(QueryableByName)] + struct LockResult { + #[diesel(sql_type = diesel::sql_types::Bool)] + pg_try_advisory_lock: bool, + } + + let lock_acquired = diesel::sql_query( + format!("SELECT pg_try_advisory_lock({}) as pg_try_advisory_lock", MIGRATION_LOCK_ID) + ) + .get_result::(transaction_conn) + .map(|r| r.pg_try_advisory_lock)?; + + if !lock_acquired { + debug!("Migration lock is held by another process"); + return Ok(()); + } + + info!("Acquired migration lock, running migrations..."); + + // Use a transaction for the migration + let result = transaction_conn + .build_transaction() + .read_write() + .run(|conn| conn.run_pending_migrations(MIGRATIONS)); + + // Always release the lock, regardless of migration success + let _ = diesel::sql_query( + format!("SELECT pg_advisory_unlock({})", MIGRATION_LOCK_ID) + ) + .execute(transaction_conn); + + match result { + Ok(_) => { + info!("Successfully ran migrations"); + Ok(()) + } + Err(e) => { + error!("Migration failed: {}", e); + Err(e.into()) + } + } + }) + .await? +} + +#[cfg(test)] +mod tests { + use super::*; + use test_helpers::db::TestDb; + + #[tokio::test] + async fn test_migration_lock_handling() { + let db = TestDb::new(); + let result = run_migrations(&db.get_connection()).await; + assert!(result.is_ok()); + } }