From c3dcc7d831c93e8841ae42a129efed47e07520c5 Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Thu, 12 Oct 2023 16:12:50 -0700 Subject: [PATCH] Add option to set a repair handler callback This allows the user to abort the repair process and receive progress notifications --- src/db.rs | 90 +++++++++++++++++++++++++++-- src/error.rs | 11 ++++ src/lib.rs | 4 +- src/tree_store/page_store/header.rs | 33 ++++++++++- 4 files changed, 130 insertions(+), 8 deletions(-) diff --git a/src/db.rs b/src/db.rs index de8b1226..e0886f24 100644 --- a/src/db.rs +++ b/src/db.rs @@ -327,7 +327,10 @@ impl Database { return Ok(true); } - Self::do_repair(&mut self.mem)?; + Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err { + DatabaseError::Storage(storage_err) => storage_err, + _ => unreachable!(), + })?; self.mem.begin_writable()?; Ok(false) @@ -542,19 +545,35 @@ impl Database { Ok(()) } - fn do_repair(mem: &mut TransactionalMemory) -> Result { + fn do_repair( + mem: &mut TransactionalMemory, + repair_callback: &(dyn Fn(&mut RepairHandle) + 'static), + ) -> Result<(), DatabaseError> { if !Self::verify_primary_checksums(mem)? { + // 0.3 because the repair takes 3 full scans and the first is done now + let mut handle = RepairHandle::new(0.3); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + mem.repair_primary_corrupted(); // We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may // have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since // that rolls back a partially committed transaction. mem.clear_read_cache(); if !Self::verify_primary_checksums(mem)? { - return Err(StorageError::Corrupted( + return Err(DatabaseError::Storage(StorageError::Corrupted( "Failed to repair database. 
All roots are corrupted".to_string(), - )); + ))); } } + // 0.6 because the repair takes 3 full scans and the second is done now + let mut handle = RepairHandle::new(0.6); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } mem.begin_repair()?; @@ -582,6 +601,13 @@ impl Database { }; drop(freed_table); + // 0.9 because the repair takes 3 full scans and the third is done now. There are just some system tables left + let mut handle = RepairHandle::new(0.9); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + let system_root = mem.get_system_root(); if let Some((root, _)) = system_root { Self::mark_tables_recursive(root, mem, false)?; @@ -613,6 +639,7 @@ impl Database { region_size: Option, read_cache_size_bytes: usize, write_cache_size_bytes: usize, + repair_callback: &(dyn Fn(&mut RepairHandle) + 'static), ) -> Result { #[cfg(feature = "logging")] let file_path = format!("{:?}", &file); @@ -628,7 +655,12 @@ impl Database { if mem.needs_repair()? { #[cfg(feature = "logging")] warn!("Database {:?} not shutdown cleanly. Repairing", &file_path); - Self::do_repair(&mut mem)?; + let mut handle = RepairHandle::new(0.0); + repair_callback(&mut handle); + if handle.aborted() { + return Err(DatabaseError::RepairAborted); + } + Self::do_repair(&mut mem, repair_callback)?; } mem.begin_writable()?; @@ -720,12 +752,41 @@ impl Database { } } +pub struct RepairHandle { + progress: f64, + aborted: bool, +} + +impl RepairHandle { + pub(crate) fn new(progress: f64) -> Self { + Self { + progress, + aborted: false, + } + } + + pub(crate) fn aborted(&self) -> bool { + self.aborted + } + + /// Abort the repair process. The corresponding call to [Builder::open] or [Builder::create] will return an error + pub fn abort(&mut self) { + self.aborted = true; + } + + /// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete. 
+ pub fn progress(&self) -> f64 { + self.progress + } +} + /// Configuration builder of a redb [Database]. pub struct Builder { page_size: usize, region_size: Option, read_cache_size_bytes: usize, write_cache_size_bytes: usize, + repair_callback: Box, } impl Builder { @@ -746,12 +807,28 @@ impl Builder { read_cache_size_bytes: 0, // TODO: Default should probably take into account the total system memory write_cache_size_bytes: 0, + repair_callback: Box::new(|_| {}), }; result.set_cache_size(1024 * 1024 * 1024); result } + /// Set a callback which will be invoked periodically in the event that the database file needs + /// to be repaired. + /// + /// The [RepairHandle] argument can be used to control the repair process. + /// + /// If the database file needs repair, the callback will be invoked at least once. + /// There is no upper limit on the number of times it may be called. + pub fn set_repair_callback( + &mut self, + callback: impl Fn(&mut RepairHandle) + 'static, + ) -> &mut Self { + self.repair_callback = Box::new(callback); + self + } + /// Set the internal page size of the database /// /// Valid values are powers of two, greater than or equal to 512 @@ -798,6 +875,7 @@ impl Builder { self.region_size, self.read_cache_size_bytes, self.write_cache_size_bytes, + &self.repair_callback, ) } @@ -815,6 +893,7 @@ impl Builder { None, self.read_cache_size_bytes, self.write_cache_size_bytes, + &self.repair_callback, ) } @@ -828,6 +907,7 @@ impl Builder { self.region_size, self.read_cache_size_bytes, self.write_cache_size_bytes, + &self.repair_callback, ) } } diff --git a/src/error.rs b/src/error.rs index 639daf19..6c551f8b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -196,6 +196,8 @@ impl std::error::Error for TableError {} pub enum DatabaseError { /// The Database is already open. Cannot acquire lock. DatabaseAlreadyOpen, + /// [crate::RepairHandle::abort] was called. 
+ RepairAborted, /// The database file is in an old file format and must be manually upgraded UpgradeRequired(u8), /// Error from underlying storage @@ -206,6 +208,7 @@ impl From for Error { fn from(err: DatabaseError) -> Error { match err { DatabaseError::DatabaseAlreadyOpen => Error::DatabaseAlreadyOpen, + DatabaseError::RepairAborted => Error::RepairAborted, DatabaseError::UpgradeRequired(x) => Error::UpgradeRequired(x), DatabaseError::Storage(storage) => storage.into(), } @@ -230,6 +233,9 @@ impl Display for DatabaseError { DatabaseError::UpgradeRequired(actual) => { write!(f, "Manual upgrade required. Expected file format version {FILE_FORMAT_VERSION}, but file is version {actual}") } + DatabaseError::RepairAborted => { + write!(f, "Database repair aborted.") + } DatabaseError::DatabaseAlreadyOpen => { write!(f, "Database already open. Cannot acquire lock.") } @@ -425,6 +431,8 @@ pub enum Error { /// Savepoints become invalid when an older savepoint is restored after it was created, /// and savepoints cannot be created if the transaction is "dirty" (any tables have been opened) InvalidSavepoint, + /// [crate::RepairHandle::abort] was called. + RepairAborted, /// A persistent savepoint exists PersistentSavepointExists, /// An Ephemeral savepoint exists @@ -533,6 +541,9 @@ impl Display for Error { Error::DatabaseAlreadyOpen => { write!(f, "Database already open. Cannot acquire lock.") } + Error::RepairAborted => { + write!(f, "Database repair aborted.") + } Error::PersistentSavepointExists => { write!( f, diff --git a/src/lib.rs b/src/lib.rs index 1abf7c16..ca586e67 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,8 +54,8 @@ //! 
[design]: https://github.com/cberner/redb/blob/master/docs/design.md pub use db::{ - Builder, Database, MultimapTableDefinition, MultimapTableHandle, TableDefinition, TableHandle, - UntypedMultimapTableHandle, UntypedTableHandle, + Builder, Database, MultimapTableDefinition, MultimapTableHandle, RepairHandle, TableDefinition, + TableHandle, UntypedMultimapTableHandle, UntypedTableHandle, }; pub use error::{ CommitError, CompactionError, DatabaseError, Error, SavepointError, StorageError, TableError, diff --git a/src/tree_store/page_store/header.rs b/src/tree_store/page_store/header.rs index 4bd43116..268220df 100644 --- a/src/tree_store/page_store/header.rs +++ b/src/tree_store/page_store/header.rs @@ -413,7 +413,7 @@ mod test { use crate::tree_store::page_store::TransactionalMemory; #[cfg(not(target_os = "windows"))] use crate::StorageError; - use crate::{Database, ReadableTable}; + use crate::{Database, DatabaseError, ReadableTable}; use std::fs::OpenOptions; use std::io::{Read, Seek, SeekFrom, Write}; use std::mem::size_of; @@ -559,6 +559,37 @@ mod test { Database::open(tmpfile.path()).unwrap(); } + #[test] + fn abort_repair() { + let tmpfile = crate::create_tempfile(); + let db = Database::builder().create(tmpfile.path()).unwrap(); + drop(db); + + let mut file = OpenOptions::new() + .read(true) + .write(true) + .open(tmpfile.path()) + .unwrap(); + + file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap(); + let mut buffer = [0u8; 1]; + file.read_exact(&mut buffer).unwrap(); + file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap(); + buffer[0] |= RECOVERY_REQUIRED; + file.write_all(&buffer).unwrap(); + + assert!(TransactionalMemory::new(file, PAGE_SIZE, None, 0, 0) + .unwrap() + .needs_repair() + .unwrap()); + + let err = Database::builder() + .set_repair_callback(|handle| handle.abort()) + .open(tmpfile.path()) + .unwrap_err(); + assert!(matches!(err, DatabaseError::RepairAborted)); + } + #[test] fn repair_insert_reserve_regression() { let tmpfile = 
crate::create_tempfile();