forked from anoma/namada-indexer
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
101 additions
and
48 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,66 +1,119 @@ | ||
use deadpool_diesel::postgres::Object; | ||
use deadpool_diesel::InteractError; | ||
use diesel::prelude::*; // Add this for RunQueryDsl | ||
use diesel::sql_types::Bool; // Add this for the SQL query type | ||
use diesel_migrations::{ | ||
embed_migrations, EmbeddedMigrations, MigrationHarness, | ||
}; | ||
use std::{thread, time::Duration}; // Use thread::sleep instead of tokio::sleep | ||
use diesel::prelude::*; | ||
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; | ||
use std::{thread, time::Duration}; | ||
use thiserror::Error; | ||
use tracing::{debug, error, info, warn}; | ||
|
||
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); | ||
const MIGRATION_LOCK_ID: i64 = 42; // Arbitrary but consistent lock ID | ||
const MAX_RETRIES: u32 = 5; | ||
const BASE_RETRY_DELAY_SECS: u64 = 1; | ||
|
||
pub async fn run_migrations(conn: &Object) -> Result<(), InteractError> { | ||
let max_retries = 5; | ||
#[derive(Debug, Error)] | ||
pub enum MigrationError { | ||
#[error("Max retries ({0}) exceeded while attempting migrations")] | ||
MaxRetriesExceeded(u32), | ||
#[error("Database interaction error: {0}")] | ||
DbError(#[from] InteractError), | ||
#[error("Failed to acquire migration lock after {0} attempts")] | ||
LockAcquisitionFailed(u32), | ||
} | ||
|
||
pub async fn run_migrations(conn: &Object) -> Result<(), MigrationError> { | ||
let mut attempt = 0; | ||
|
||
while attempt < max_retries { | ||
let result = conn.interact(|transaction_conn| { | ||
// Try to acquire an advisory lock | ||
let lock_result: QueryResult<bool> = diesel::sql_query("SELECT pg_try_advisory_lock(1)") | ||
.get_result::<(bool,)>(transaction_conn) | ||
.map(|row| row.0); | ||
|
||
match lock_result { | ||
Ok(true) => { | ||
// We got the lock, try to run migrations | ||
let migration_result = transaction_conn.run_pending_migrations(MIGRATIONS); | ||
|
||
// Release the lock regardless of migration success | ||
let _ = diesel::sql_query("SELECT pg_advisory_unlock(1)") | ||
.execute(transaction_conn); | ||
|
||
migration_result.map(|_| ()) | ||
}, | ||
Ok(false) => { | ||
// Someone else has the lock, treat as temporary failure | ||
Ok(()) | ||
}, | ||
Err(e) => Err(e), | ||
} | ||
}) | ||
.await; | ||
while attempt < MAX_RETRIES { | ||
info!("Attempting database migration (attempt {}/{})", attempt + 1, MAX_RETRIES); | ||
|
||
match result { | ||
Ok(_) => return Ok(()), // Success or someone else running migrations | ||
match try_run_migration(conn).await { | ||
Ok(_) => { | ||
info!("Database migration completed successfully"); | ||
return Ok(()); | ||
} | ||
Err(e) => { | ||
if e.to_string().contains("already exists") { | ||
// If it's just a "prepared statement already exists" error, | ||
// migrations probably succeeded | ||
return Ok(()); | ||
if let MigrationError::DbError(ref db_error) = e { | ||
if db_error.to_string().contains("already exists") { | ||
info!("Migrations appear to be already applied"); | ||
return Ok(()); | ||
} | ||
} | ||
// For other errors, retry after delay | ||
|
||
attempt += 1; | ||
if attempt < max_retries { | ||
thread::sleep(Duration::from_secs(1 << attempt)); // Use thread::sleep | ||
if attempt < MAX_RETRIES { | ||
let delay = BASE_RETRY_DELAY_SECS << attempt; | ||
warn!("Migration attempt failed, retrying in {} seconds: {}", delay, e); | ||
thread::sleep(Duration::from_secs(delay)); | ||
continue; | ||
} | ||
return Err(e); | ||
|
||
error!("All migration attempts failed: {}", e); | ||
return Err(MigrationError::MaxRetriesExceeded(MAX_RETRIES)); | ||
} | ||
} | ||
} | ||
|
||
Err(InteractError::Error(Box::new(std::io::Error::new( // Fixed error creation | ||
std::io::ErrorKind::Other, | ||
"Max retries exceeded" | ||
)))) | ||
Err(MigrationError::MaxRetriesExceeded(MAX_RETRIES)) | ||
} | ||
|
||
async fn try_run_migration(conn: &Object) -> Result<(), MigrationError> { | ||
conn.interact(|transaction_conn| { | ||
// Try to acquire an advisory lock | ||
#[derive(QueryableByName)] | ||
struct LockResult { | ||
#[diesel(sql_type = diesel::sql_types::Bool)] | ||
pg_try_advisory_lock: bool, | ||
} | ||
|
||
let lock_acquired = diesel::sql_query( | ||
format!("SELECT pg_try_advisory_lock({}) as pg_try_advisory_lock", MIGRATION_LOCK_ID) | ||
) | ||
.get_result::<LockResult>(transaction_conn) | ||
.map(|r| r.pg_try_advisory_lock)?; | ||
|
||
if !lock_acquired { | ||
debug!("Migration lock is held by another process"); | ||
return Ok(()); | ||
} | ||
|
||
info!("Acquired migration lock, running migrations..."); | ||
|
||
// Use a transaction for the migration | ||
let result = transaction_conn | ||
.build_transaction() | ||
.read_write() | ||
.run(|conn| conn.run_pending_migrations(MIGRATIONS)); | ||
|
||
// Always release the lock, regardless of migration success | ||
let _ = diesel::sql_query( | ||
format!("SELECT pg_advisory_unlock({})", MIGRATION_LOCK_ID) | ||
) | ||
.execute(transaction_conn); | ||
|
||
match result { | ||
Ok(_) => { | ||
info!("Successfully ran migrations"); | ||
Ok(()) | ||
} | ||
Err(e) => { | ||
error!("Migration failed: {}", e); | ||
Err(e.into()) | ||
} | ||
} | ||
}) | ||
.await? | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use test_helpers::db::TestDb; | ||
|
||
#[tokio::test] | ||
async fn test_migration_lock_handling() { | ||
let db = TestDb::new(); | ||
let result = run_migrations(&db.get_connection()).await; | ||
assert!(result.is_ok()); | ||
} | ||
} |