forked from anoma/namada-indexer
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
54 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,60 @@ | ||
use deadpool_diesel::postgres::Object; | ||
use deadpool_diesel::InteractError; | ||
use diesel_migrations::{ | ||
embed_migrations, EmbeddedMigrations, MigrationHarness, | ||
}; | ||
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; | ||
use std::time::Duration; | ||
use tokio::time::sleep; | ||
|
||
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); | ||
|
||
pub async fn run_migrations(conn: &Object) -> Result<(), InteractError> { | ||
conn.interact(|transaction_conn| { | ||
transaction_conn | ||
.run_pending_migrations(MIGRATIONS) | ||
.expect("Failed to run all migrations"); | ||
}) | ||
.await | ||
.map(|_| ()) | ||
let max_retries = 5; | ||
let mut attempt = 0; | ||
|
||
while attempt < max_retries { | ||
let result = conn.interact(|transaction_conn| { | ||
// Try to acquire an advisory lock | ||
let lock_result: Result<bool, _> = diesel::sql_query("SELECT pg_try_advisory_lock(1)") | ||
.get_result(transaction_conn) | ||
.map(|row: (bool,)| row.0); | ||
|
||
match lock_result { | ||
Ok(true) => { | ||
// We got the lock, try to run migrations | ||
let migration_result = transaction_conn.run_pending_migrations(MIGRATIONS); | ||
|
||
// Release the lock regardless of migration success | ||
let _ = diesel::sql_query("SELECT pg_advisory_unlock(1)") | ||
.execute(transaction_conn); | ||
|
||
migration_result.map(|_| ()) | ||
}, | ||
Ok(false) => { | ||
// Someone else has the lock, treat as temporary failure | ||
Ok(()) | ||
}, | ||
Err(e) => Err(e), | ||
} | ||
}) | ||
.await; | ||
|
||
match result { | ||
Ok(_) => return Ok(()), // Success or someone else running migrations | ||
Err(e) => { | ||
if e.to_string().contains("already exists") { | ||
// If it's just a "prepared statement already exists" error, | ||
// migrations probably succeeded | ||
return Ok(()); | ||
} | ||
// For other errors, retry after delay | ||
attempt += 1; | ||
if attempt < max_retries { | ||
sleep(Duration::from_secs(1 << attempt)).await; // Exponential backoff | ||
continue; | ||
} | ||
return Err(e); | ||
} | ||
} | ||
} | ||
|
||
Err(InteractError::Message("Max retries exceeded".into())) | ||
} |