Skip to content

Commit

Permalink
Add option to set a repair handler callback
Browse files Browse the repository at this point in the history
This allows the user to abort the repair process and receive progress
notifications
  • Loading branch information
cberner committed Oct 15, 2023
1 parent 187d212 commit c3dcc7d
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 8 deletions.
90 changes: 85 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,10 @@ impl Database {
return Ok(true);
}

Self::do_repair(&mut self.mem)?;
Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
DatabaseError::Storage(storage_err) => storage_err,
_ => unreachable!(),
})?;
self.mem.begin_writable()?;

Ok(false)
Expand Down Expand Up @@ -542,19 +545,35 @@ impl Database {
Ok(())
}

fn do_repair(mem: &mut TransactionalMemory) -> Result {
fn do_repair(
mem: &mut TransactionalMemory,
repair_callback: &(dyn Fn(&mut RepairHandle) + 'static),
) -> Result<(), DatabaseError> {
if !Self::verify_primary_checksums(mem)? {
// 0.3 because the repair takes 3 full scans and the first is done now
let mut handle = RepairHandle::new(0.3);
repair_callback(&mut handle);
if handle.aborted() {
return Err(DatabaseError::RepairAborted);
}

mem.repair_primary_corrupted();
// We need to invalidate the userspace cache, because walking the tree in verify_primary_checksums() may
// have poisoned it with pages that just got rolled back by repair_primary_corrupted(), since
// that rolls back a partially committed transaction.
mem.clear_read_cache();
if !Self::verify_primary_checksums(mem)? {
return Err(StorageError::Corrupted(
return Err(DatabaseError::Storage(StorageError::Corrupted(
"Failed to repair database. All roots are corrupted".to_string(),
));
)));
}
}
// 0.6 because the repair takes 3 full scans and the second is done now
let mut handle = RepairHandle::new(0.6);
repair_callback(&mut handle);
if handle.aborted() {
return Err(DatabaseError::RepairAborted);
}

mem.begin_repair()?;

Expand Down Expand Up @@ -582,6 +601,13 @@ impl Database {
};
drop(freed_table);

// 0.9 because the repair takes 3 full scans and the third is done now. There is just some system tables left
let mut handle = RepairHandle::new(0.9);
repair_callback(&mut handle);
if handle.aborted() {
return Err(DatabaseError::RepairAborted);
}

let system_root = mem.get_system_root();
if let Some((root, _)) = system_root {
Self::mark_tables_recursive(root, mem, false)?;
Expand Down Expand Up @@ -613,6 +639,7 @@ impl Database {
region_size: Option<u64>,
read_cache_size_bytes: usize,
write_cache_size_bytes: usize,
repair_callback: &(dyn Fn(&mut RepairHandle) + 'static),
) -> Result<Self, DatabaseError> {
#[cfg(feature = "logging")]
let file_path = format!("{:?}", &file);
Expand All @@ -628,7 +655,12 @@ impl Database {
if mem.needs_repair()? {
#[cfg(feature = "logging")]
warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
Self::do_repair(&mut mem)?;
let mut handle = RepairHandle::new(0.0);
repair_callback(&mut handle);
if handle.aborted() {
return Err(DatabaseError::RepairAborted);
}
Self::do_repair(&mut mem, repair_callback)?;
}

mem.begin_writable()?;
Expand Down Expand Up @@ -720,12 +752,41 @@ impl Database {
}
}

pub struct RepairHandle {
progress: f64,
aborted: bool,
}

impl RepairHandle {
pub(crate) fn new(progress: f64) -> Self {
Self {
progress,
aborted: false,
}
}

pub(crate) fn aborted(&self) -> bool {
self.aborted
}

/// Abort the repair process. The coorresponding call to [Builder::open] or [Builder::create] will return an error
pub fn abort(&mut self) {
self.aborted = true;
}

/// Returns an estimate of the repair progress in the range [0.0, 1.0). At 1.0 the repair is complete.
pub fn progress(&self) -> f64 {
self.progress
}
}

/// Configuration builder of a redb [Database].
pub struct Builder {
page_size: usize,
region_size: Option<u64>,
read_cache_size_bytes: usize,
write_cache_size_bytes: usize,
repair_callback: Box<dyn Fn(&mut RepairHandle)>,
}

impl Builder {
Expand All @@ -746,12 +807,28 @@ impl Builder {
read_cache_size_bytes: 0,
// TODO: Default should probably take into account the total system memory
write_cache_size_bytes: 0,
repair_callback: Box::new(|_| {}),
};

result.set_cache_size(1024 * 1024 * 1024);
result
}

/// Set a callback which will be invoked periodically in the event that the database file needs
/// to be repaired.
///
/// The [RepairHandle] argument can be used to control the repair process.
///
/// If the database file needs repair, the callback will be invoked at least once.
/// There is no upper limit on the number of times it may be called.
pub fn set_repair_callback(
&mut self,
callback: impl Fn(&mut RepairHandle) + 'static,
) -> &mut Self {
self.repair_callback = Box::new(callback);
self
}

/// Set the internal page size of the database
///
/// Valid values are powers of two, greater than or equal to 512
Expand Down Expand Up @@ -798,6 +875,7 @@ impl Builder {
self.region_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
&self.repair_callback,
)
}

Expand All @@ -815,6 +893,7 @@ impl Builder {
None,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
&self.repair_callback,
)
}

Expand All @@ -828,6 +907,7 @@ impl Builder {
self.region_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
&self.repair_callback,
)
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ impl std::error::Error for TableError {}
pub enum DatabaseError {
/// The Database is already open. Cannot acquire lock.
DatabaseAlreadyOpen,
/// [crate::RepairHandle::abort] was called.
RepairAborted,
/// The database file is in an old file format and must be manually upgraded
UpgradeRequired(u8),
/// Error from underlying storage
Expand All @@ -206,6 +208,7 @@ impl From<DatabaseError> for Error {
fn from(err: DatabaseError) -> Error {
match err {
DatabaseError::DatabaseAlreadyOpen => Error::DatabaseAlreadyOpen,
DatabaseError::RepairAborted => Error::RepairAborted,
DatabaseError::UpgradeRequired(x) => Error::UpgradeRequired(x),
DatabaseError::Storage(storage) => storage.into(),
}
Expand All @@ -230,6 +233,9 @@ impl Display for DatabaseError {
DatabaseError::UpgradeRequired(actual) => {
write!(f, "Manual upgrade required. Expected file format version {FILE_FORMAT_VERSION}, but file is version {actual}")
}
DatabaseError::RepairAborted => {
write!(f, "Database repair aborted.")
}
DatabaseError::DatabaseAlreadyOpen => {
write!(f, "Database already open. Cannot acquire lock.")
}
Expand Down Expand Up @@ -425,6 +431,8 @@ pub enum Error {
/// Savepoints become invalid when an older savepoint is restored after it was created,
/// and savepoints cannot be created if the transaction is "dirty" (any tables have been opened)
InvalidSavepoint,
/// [crate::RepairHandle::abort] was called.
RepairAborted,
/// A persistent savepoint exists
PersistentSavepointExists,
/// An Ephemeral savepoint exists
Expand Down Expand Up @@ -533,6 +541,9 @@ impl Display for Error {
Error::DatabaseAlreadyOpen => {
write!(f, "Database already open. Cannot acquire lock.")
}
Error::RepairAborted => {
write!(f, "Database repair aborted.")
}
Error::PersistentSavepointExists => {
write!(
f,
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
//! [design]: https://github.com/cberner/redb/blob/master/docs/design.md
pub use db::{
Builder, Database, MultimapTableDefinition, MultimapTableHandle, TableDefinition, TableHandle,
UntypedMultimapTableHandle, UntypedTableHandle,
Builder, Database, MultimapTableDefinition, MultimapTableHandle, RepairHandle, TableDefinition,
TableHandle, UntypedMultimapTableHandle, UntypedTableHandle,
};
pub use error::{
CommitError, CompactionError, DatabaseError, Error, SavepointError, StorageError, TableError,
Expand Down
33 changes: 32 additions & 1 deletion src/tree_store/page_store/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ mod test {
use crate::tree_store::page_store::TransactionalMemory;
#[cfg(not(target_os = "windows"))]
use crate::StorageError;
use crate::{Database, ReadableTable};
use crate::{Database, DatabaseError, ReadableTable};
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom, Write};
use std::mem::size_of;
Expand Down Expand Up @@ -559,6 +559,37 @@ mod test {
Database::open(tmpfile.path()).unwrap();
}

#[test]
fn abort_repair() {
let tmpfile = crate::create_tempfile();
let db = Database::builder().create(tmpfile.path()).unwrap();
drop(db);

let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(tmpfile.path())
.unwrap();

file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
let mut buffer = [0u8; 1];
file.read_exact(&mut buffer).unwrap();
file.seek(SeekFrom::Start(GOD_BYTE_OFFSET as u64)).unwrap();
buffer[0] |= RECOVERY_REQUIRED;
file.write_all(&buffer).unwrap();

assert!(TransactionalMemory::new(file, PAGE_SIZE, None, 0, 0)
.unwrap()
.needs_repair()
.unwrap());

let err = Database::builder()
.set_repair_callback(|handle| handle.abort())
.open(tmpfile.path())
.unwrap_err();
assert!(matches!(err, DatabaseError::RepairAborted));
}

#[test]
fn repair_insert_reserve_regression() {
let tmpfile = crate::create_tempfile();
Expand Down

0 comments on commit c3dcc7d

Please sign in to comment.