From 22f18dd6c664341860425a19cae1c1049aa13c7d Mon Sep 17 00:00:00 2001
From: Christopher Berner
Date: Sun, 28 Jul 2024 17:44:08 -0700
Subject: [PATCH] Make all I/O errors fatal. Fixes potential corruption

If a transient I/O error occurred during updates to a btree, but no I/O
error then occurred when the Database was dropped, it was possible for
the file to be flushed with a clean recovery flag and a broken
transaction, leading to corruption.
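A minimal sketch of the new behavior (illustrative only, not part of the
patch; `failing_backend` stands for any StorageBackend test double that
can be made to return I/O errors on demand):

    let db = Database::builder()
        .create_with_backend(failing_backend)
        .unwrap();
    // The commit that hits the backend error fails with Io ...
    assert!(matches!(
        db.begin_write().unwrap().commit().unwrap_err(),
        CommitError::Storage(StorageError::Io(_))
    ));
    // ... and every later attempt fails fast with PreviousIo, even if
    // the backend has recovered, until the Database is re-opened.
    assert!(matches!(
        db.begin_write().unwrap().commit().unwrap_err(),
        CommitError::Storage(StorageError::PreviousIo)
    ));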
---
 src/db.rs                                |  62 +++++++++++--
 src/error.rs                             |  16 ++++
 src/tree_store/page_store/cached_file.rs | 108 ++++++++++++++++-------
 3 files changed, 147 insertions(+), 39 deletions(-)

diff --git a/src/db.rs b/src/db.rs
index 76e5b4e5..105b6b0a 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1004,23 +1004,24 @@ impl std::fmt::Debug for Database {
 mod test {
     use crate::backends::FileBackend;
     use crate::{
-        Database, DatabaseError, Durability, ReadableTable, StorageBackend, StorageError,
-        TableDefinition,
+        CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
+        StorageError, TableDefinition,
     };
-    use std::io::ErrorKind;
+    use std::io::{ErrorKind, Read, Seek, SeekFrom};
     use std::sync::atomic::{AtomicU64, Ordering};
+    use std::sync::Arc;
 
     #[derive(Debug)]
     struct FailingBackend {
         inner: FileBackend,
-        countdown: AtomicU64,
+        countdown: Arc<AtomicU64>,
     }
 
     impl FailingBackend {
         fn new(backend: FileBackend, countdown: u64) -> Self {
             Self {
                 inner: backend,
-                countdown: AtomicU64::new(countdown),
+                countdown: Arc::new(AtomicU64::new(countdown)),
             }
         }
 
@@ -1114,6 +1115,57 @@ mod test {
             .unwrap();
     }
 
+    #[test]
+    fn transient_io_error() {
+        let tmpfile = crate::create_tempfile();
+
+        let backend = FailingBackend::new(
+            FileBackend::new(tmpfile.as_file().try_clone().unwrap()).unwrap(),
+            u64::MAX,
+        );
+        let countdown = backend.countdown.clone();
+        let db = Database::builder()
+            .set_cache_size(0)
+            .create_with_backend(backend)
+            .unwrap();
+
+        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");
+
+        // Create some garbage
+        let tx = db.begin_write().unwrap();
+        {
+            let mut table = tx.open_table(table_def).unwrap();
+            table.insert(0, 0).unwrap();
+        }
+        tx.commit().unwrap();
+        let tx = db.begin_write().unwrap();
+        {
+            let mut table = tx.open_table(table_def).unwrap();
+            table.insert(0, 1).unwrap();
+        }
+        tx.commit().unwrap();
+
+        let tx = db.begin_write().unwrap();
+        // Cause an error in the commit
+        countdown.store(0, Ordering::SeqCst);
+        let result = tx.commit().err().unwrap();
+        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
+        let result = db.begin_write().unwrap().commit().err().unwrap();
+        assert!(matches!(
+            result,
+            CommitError::Storage(StorageError::PreviousIo)
+        ));
+        // Simulate a transient error
+        countdown.store(u64::MAX, Ordering::SeqCst);
+        drop(db);
+
+        // Check that recovery flag is set, even though the error has "cleared"
+        tmpfile.as_file().seek(SeekFrom::Start(9)).unwrap();
+        let mut god_byte = vec![0u8];
+        assert_eq!(tmpfile.as_file().read(&mut god_byte).unwrap(), 1);
+        assert_ne!(god_byte[0] & 2, 0);
+    }
+
     #[test]
     fn small_pages() {
         let tmpfile = crate::create_tempfile();
diff --git a/src/error.rs b/src/error.rs
index 46e97d48..b9e6ce43 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -13,6 +13,7 @@ pub enum StorageError {
     /// The value being inserted exceeds the maximum of 3GiB
     ValueTooLarge(usize),
     Io(io::Error),
+    PreviousIo,
     LockPoisoned(&'static panic::Location<'static>),
 }
 
@@ -34,6 +35,7 @@ impl From<StorageError> for Error {
             StorageError::Corrupted(msg) => Error::Corrupted(msg),
             StorageError::ValueTooLarge(x) => Error::ValueTooLarge(x),
             StorageError::Io(x) => Error::Io(x),
+            StorageError::PreviousIo => Error::PreviousIo,
             StorageError::LockPoisoned(location) => Error::LockPoisoned(location),
         }
     }
@@ -55,6 +57,12 @@ impl Display for StorageError {
             StorageError::Io(err) => {
                 write!(f, "I/O error: {err}")
             }
+            StorageError::PreviousIo => {
+                write!(
+                    f,
+                    "Previous I/O error occurred. Please close and re-open the database."
+                )
+            }
             StorageError::LockPoisoned(location) => {
                 write!(f, "Poisoned internal lock: {location}")
             }
@@ -472,6 +480,8 @@ pub enum Error {
     // mutable references to the same dirty pages, or multiple mutable references via insert_reserve()
     TableAlreadyOpen(String, &'static panic::Location<'static>),
     Io(io::Error),
+    /// A previous IO error occurred. The database must be closed and re-opened
+    PreviousIo,
     LockPoisoned(&'static panic::Location<'static>),
     /// The transaction is still referenced by a table or other object
     ReadTransactionStillInUse(ReadTransaction),
@@ -541,6 +551,12 @@ impl Display for Error {
             Error::Io(err) => {
                 write!(f, "I/O error: {err}")
             }
+            Error::PreviousIo => {
+                write!(
+                    f,
+                    "Previous I/O error occurred. Please close and re-open the database."
+                )
+            }
             Error::LockPoisoned(location) => {
                 write!(f, "Poisoned internal lock: {location}")
             }
diff --git a/src/tree_store/page_store/cached_file.rs b/src/tree_store/page_store/cached_file.rs
index c98a31f9..62712301 100644
--- a/src/tree_store/page_store/cached_file.rs
+++ b/src/tree_store/page_store/cached_file.rs
@@ -2,7 +2,6 @@ use crate::tree_store::page_store::base::PageHint;
 use crate::tree_store::LEAF;
 use crate::{DatabaseError, Result, StorageBackend, StorageError};
 use std::collections::BTreeMap;
-use std::io;
 use std::ops::{Index, IndexMut};
 use std::slice::SliceIndex;
 #[cfg(feature = "cache_metrics")]
@@ -213,8 +212,76 @@ impl PrioritizedWriteCache {
     }
 }
 
-pub(super) struct PagedCachedFile {
+#[derive(Debug)]
+struct CheckedBackend {
     file: Box<dyn StorageBackend>,
+    io_failed: AtomicBool,
+}
+
+impl CheckedBackend {
+    fn new(file: Box<dyn StorageBackend>) -> Self {
+        Self {
+            file,
+            io_failed: AtomicBool::new(false),
+        }
+    }
+
+    fn check_failure(&self) -> Result<()> {
+        if self.io_failed.load(Ordering::Acquire) {
+            Err(StorageError::PreviousIo)
+        } else {
+            Ok(())
+        }
+    }
+
+    fn len(&self) -> Result<u64> {
+        self.check_failure()?;
+        let result = self.file.len();
+        if result.is_err() {
+            self.io_failed.store(true, Ordering::Release);
+        }
+        result.map_err(StorageError::from)
+    }
+
+    fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
+        self.check_failure()?;
+        let result = self.file.read(offset, len);
+        if result.is_err() {
+            self.io_failed.store(true, Ordering::Release);
+        }
+        result.map_err(StorageError::from)
+    }
+
+    fn set_len(&self, len: u64) -> Result<()> {
+        self.check_failure()?;
+        let result = self.file.set_len(len);
+        if result.is_err() {
+            self.io_failed.store(true, Ordering::Release);
+        }
+        result.map_err(StorageError::from)
+    }
+
+    fn sync_data(&self, eventual: bool) -> Result<()> {
+        self.check_failure()?;
+        let result = self.file.sync_data(eventual);
+        if result.is_err() {
+            self.io_failed.store(true, Ordering::Release);
+        }
+        result.map_err(StorageError::from)
+    }
+
+    fn write(&self, offset: u64, data: &[u8]) -> Result<()> {
+        self.check_failure()?;
+        let result = self.file.write(offset, data);
+        if result.is_err() {
+            self.io_failed.store(true, Ordering::Release);
+        }
+        result.map_err(StorageError::from)
+    }
+}
+
+pub(super) struct PagedCachedFile {
+    file: CheckedBackend,
     page_size: u64,
     max_read_cache_bytes: usize,
     read_cache_bytes: AtomicUsize,
@@ -224,7 +291,6 @@ pub(super) struct PagedCachedFile {
     reads_total: AtomicU64,
     #[cfg(feature = "cache_metrics")]
     reads_hits: AtomicU64,
-    fsync_failed: AtomicBool,
     read_cache: Box<[RwLock<PrioritizedCache>]>,
     // TODO: maybe move this cache to WriteTransaction?
     write_buffer: Arc<Mutex<PrioritizedWriteCache>>,
@@ -242,7 +308,7 @@ impl PagedCachedFile {
             .collect();
 
         Ok(Self {
-            file,
+            file: CheckedBackend::new(file),
             page_size,
             max_read_cache_bytes,
             read_cache_bytes: AtomicUsize::new(0),
@@ -252,36 +318,20 @@ impl PagedCachedFile {
             reads_total: Default::default(),
             #[cfg(feature = "cache_metrics")]
             reads_hits: Default::default(),
-            fsync_failed: Default::default(),
             read_cache,
             write_buffer: Arc::new(Mutex::new(PrioritizedWriteCache::new())),
         })
     }
 
     pub(crate) fn raw_file_len(&self) -> Result<u64> {
-        self.file.len().map_err(StorageError::from)
+        self.file.len()
     }
 
     const fn lock_stripes() -> u64 {
         131
     }
 
-    #[inline]
-    fn check_fsync_failure(&self) -> Result<()> {
-        if self.fsync_failed.load(Ordering::Acquire) {
-            Err(StorageError::Io(io::Error::from(io::ErrorKind::Other)))
-        } else {
-            Ok(())
-        }
-    }
-
-    #[inline]
-    fn set_fsync_failed(&self, failed: bool) {
-        self.fsync_failed.store(failed, Ordering::Release);
-    }
-
     fn flush_write_buffer(&self) -> Result {
-        self.check_fsync_failure()?;
         let mut write_buffer = self.write_buffer.lock().unwrap();
 
         for (offset, buffer) in write_buffer.cache.iter() {
@@ -333,20 +383,13 @@ impl PagedCachedFile {
         // TODO: be more fine-grained about this invalidation
         self.invalidate_cache_all();
 
-        self.file.set_len(len).map_err(StorageError::from)
+        self.file.set_len(len)
     }
 
     pub(super) fn flush(&self, #[allow(unused_variables)] eventual: bool) -> Result {
-        self.check_fsync_failure()?;
         self.flush_write_buffer()?;
 
-        let res = self.file.sync_data(eventual).map_err(StorageError::from);
-        if res.is_err() {
-            self.set_fsync_failed(true);
-            return res;
-        }
-
-        Ok(())
+        self.file.sync_data(eventual)
     }
 
     // Make writes visible to readers, but does not guarantee any durability
@@ -356,8 +399,7 @@ impl PagedCachedFile {
 
     // Read directly from the file, ignoring any cached data
    pub(super) fn read_direct(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
-        self.check_fsync_failure()?;
-        Ok(self.file.read(offset, len)?)
+        self.file.read(offset, len)
     }
 
     // Read with caching. Caller must not read overlapping ranges without first calling invalidate_cache().
@@ -369,7 +411,6 @@ impl PagedCachedFile {
         hint: PageHint,
         cache_policy: impl Fn(&[u8]) -> CachePriority,
     ) -> Result<Arc<[u8]>> {
-        self.check_fsync_failure()?;
         debug_assert_eq!(0, offset % self.page_size);
         #[cfg(feature = "cache_metrics")]
         self.reads_total.fetch_add(1, Ordering::AcqRel);
@@ -457,7 +498,6 @@ impl PagedCachedFile {
         overwrite: bool,
         cache_policy: impl Fn(&[u8]) -> CachePriority,
     ) -> Result<WritablePage> {
-        self.check_fsync_failure()?;
         assert_eq!(0, offset % self.page_size);
         let mut lock = self.write_buffer.lock().unwrap();
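The CheckedBackend introduced above is an instance of a general poisoning
pattern. A minimal standalone sketch of the same idea, with hypothetical
names (CheckedFile, run) and std::io in place of redb's Result and
StorageBackend types:

    use std::io;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Reduction of the CheckedBackend idea: the first I/O failure
    /// permanently poisons the handle, so a later, apparently successful
    /// operation cannot make a broken state look healthy.
    struct CheckedFile<F> {
        inner: F,
        io_failed: AtomicBool,
    }

    impl<F> CheckedFile<F> {
        fn new(inner: F) -> Self {
            Self {
                inner,
                io_failed: AtomicBool::new(false),
            }
        }

        fn run<T>(&self, op: impl FnOnce(&F) -> io::Result<T>) -> io::Result<T> {
            // Fail fast if any earlier operation failed.
            if self.io_failed.load(Ordering::Acquire) {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "previous I/O error; close and re-open",
                ));
            }
            let result = op(&self.inner);
            if result.is_err() {
                // Poison permanently; Release pairs with the Acquire load above.
                self.io_failed.store(true, Ordering::Release);
            }
            result
        }
    }

Routing every backend call through a single check like this is what lets
the patch delete the fsync-only check_fsync_failure() machinery: a
transient error during one commit can no longer be masked by an
error-free flush when the Database is dropped, so the recovery flag in
the god byte stays set until a clean re-open.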