Skip to content

Commit

Permalink
Make all I/O errors fatal. Fixes potential corruption
Browse files Browse the repository at this point in the history
If a transient I/O error occurred during the updates to a btree, but
then I/O errors did not occur when the Database was dropped. It was
possible for the file to be flushed with a clean recovery flag and
broken transaction, leading to corruption.

After an I/O all operations that touch the StorageBackend will now
return StorageError::PreviousIo
  • Loading branch information
cberner committed Jul 29, 2024
1 parent 4f05d1f commit dc6f44b
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 39 deletions.
62 changes: 57 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,23 +1004,24 @@ impl std::fmt::Debug for Database {
mod test {
use crate::backends::FileBackend;
use crate::{
Database, DatabaseError, Durability, ReadableTable, StorageBackend, StorageError,
TableDefinition,
CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
StorageError, TableDefinition,
};
use std::io::ErrorKind;
use std::io::{ErrorKind, Read, Seek, SeekFrom};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Debug)]
struct FailingBackend {
inner: FileBackend,
countdown: AtomicU64,
countdown: Arc<AtomicU64>,
}

impl FailingBackend {
fn new(backend: FileBackend, countdown: u64) -> Self {
Self {
inner: backend,
countdown: AtomicU64::new(countdown),
countdown: Arc::new(AtomicU64::new(countdown)),
}
}

Expand Down Expand Up @@ -1114,6 +1115,57 @@ mod test {
.unwrap();
}

#[test]
fn transient_io_error() {
let tmpfile = crate::create_tempfile();

let backend = FailingBackend::new(
FileBackend::new(tmpfile.as_file().try_clone().unwrap()).unwrap(),
u64::MAX,
);
let countdown = backend.countdown.clone();
let db = Database::builder()
.set_cache_size(0)
.create_with_backend(backend)
.unwrap();

let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

// Create some garbage
let tx = db.begin_write().unwrap();
{
let mut table = tx.open_table(table_def).unwrap();
table.insert(0, 0).unwrap();
}
tx.commit().unwrap();
let tx = db.begin_write().unwrap();
{
let mut table = tx.open_table(table_def).unwrap();
table.insert(0, 1).unwrap();
}
tx.commit().unwrap();

let tx = db.begin_write().unwrap();
// Cause an error in the commit
countdown.store(0, Ordering::SeqCst);
let result = tx.commit().err().unwrap();
assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
let result = db.begin_write().unwrap().commit().err().unwrap();
assert!(matches!(
result,
CommitError::Storage(StorageError::PreviousIo)
));
// Simulate a transient error
countdown.store(u64::MAX, Ordering::SeqCst);
drop(db);

// Check that recovery flag is set, even though the error has "cleared"
tmpfile.as_file().seek(SeekFrom::Start(9)).unwrap();
let mut god_byte = vec![0u8];
assert_eq!(tmpfile.as_file().read(&mut god_byte).unwrap(), 1);
assert_ne!(god_byte[0] & 2, 0);
}

#[test]
fn small_pages() {
let tmpfile = crate::create_tempfile();
Expand Down
16 changes: 16 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub enum StorageError {
/// The value being inserted exceeds the maximum of 3GiB
ValueTooLarge(usize),
Io(io::Error),
PreviousIo,
LockPoisoned(&'static panic::Location<'static>),
}

Expand All @@ -34,6 +35,7 @@ impl From<StorageError> for Error {
StorageError::Corrupted(msg) => Error::Corrupted(msg),
StorageError::ValueTooLarge(x) => Error::ValueTooLarge(x),
StorageError::Io(x) => Error::Io(x),
StorageError::PreviousIo => Error::PreviousIo,
StorageError::LockPoisoned(location) => Error::LockPoisoned(location),
}
}
Expand All @@ -55,6 +57,12 @@ impl Display for StorageError {
StorageError::Io(err) => {
write!(f, "I/O error: {err}")
}
StorageError::PreviousIo => {
write!(
f,
"Previous I/O error occurred. Please close and re-open the database."
)
}
StorageError::LockPoisoned(location) => {
write!(f, "Poisoned internal lock: {location}")
}
Expand Down Expand Up @@ -472,6 +480,8 @@ pub enum Error {
// mutable references to the same dirty pages, or multiple mutable references via insert_reserve()
TableAlreadyOpen(String, &'static panic::Location<'static>),
Io(io::Error),
/// A previous IO error occurred. The database must be closed and re-opened
PreviousIo,
LockPoisoned(&'static panic::Location<'static>),
/// The transaction is still referenced by a table or other object
ReadTransactionStillInUse(ReadTransaction),
Expand Down Expand Up @@ -541,6 +551,12 @@ impl Display for Error {
Error::Io(err) => {
write!(f, "I/O error: {err}")
}
Error::PreviousIo => {
write!(
f,
"Previous I/O error occurred. Please close and re-open the database."
)
}
Error::LockPoisoned(location) => {
write!(f, "Poisoned internal lock: {location}")
}
Expand Down
108 changes: 74 additions & 34 deletions src/tree_store/page_store/cached_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::tree_store::page_store::base::PageHint;
use crate::tree_store::LEAF;
use crate::{DatabaseError, Result, StorageBackend, StorageError};
use std::collections::BTreeMap;
use std::io;
use std::ops::{Index, IndexMut};
use std::slice::SliceIndex;
#[cfg(feature = "cache_metrics")]
Expand Down Expand Up @@ -213,8 +212,76 @@ impl PrioritizedWriteCache {
}
}

pub(super) struct PagedCachedFile {
#[derive(Debug)]
struct CheckedBackend {
file: Box<dyn StorageBackend>,
io_failed: AtomicBool,
}

impl CheckedBackend {
fn new(file: Box<dyn StorageBackend>) -> Self {
Self {
file,
io_failed: AtomicBool::new(false),
}
}

fn check_failure(&self) -> Result<()> {
if self.io_failed.load(Ordering::Acquire) {
Err(StorageError::PreviousIo)
} else {
Ok(())
}
}

fn len(&self) -> Result<u64> {
self.check_failure()?;
let result = self.file.len();
if result.is_err() {
self.io_failed.store(true, Ordering::Release);
}
result.map_err(StorageError::from)
}

fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
self.check_failure()?;
let result = self.file.read(offset, len);
if result.is_err() {
self.io_failed.store(true, Ordering::Release);
}
result.map_err(StorageError::from)
}

fn set_len(&self, len: u64) -> Result<()> {
self.check_failure()?;
let result = self.file.set_len(len);
if result.is_err() {
self.io_failed.store(true, Ordering::Release);
}
result.map_err(StorageError::from)
}

fn sync_data(&self, eventual: bool) -> Result<()> {
self.check_failure()?;
let result = self.file.sync_data(eventual);
if result.is_err() {
self.io_failed.store(true, Ordering::Release);
}
result.map_err(StorageError::from)
}

fn write(&self, offset: u64, data: &[u8]) -> Result<()> {
self.check_failure()?;
let result = self.file.write(offset, data);
if result.is_err() {
self.io_failed.store(true, Ordering::Release);
}
result.map_err(StorageError::from)
}
}

pub(super) struct PagedCachedFile {
file: CheckedBackend,
page_size: u64,
max_read_cache_bytes: usize,
read_cache_bytes: AtomicUsize,
Expand All @@ -224,7 +291,6 @@ pub(super) struct PagedCachedFile {
reads_total: AtomicU64,
#[cfg(feature = "cache_metrics")]
reads_hits: AtomicU64,
fsync_failed: AtomicBool,
read_cache: Box<[RwLock<PrioritizedCache>]>,
// TODO: maybe move this cache to WriteTransaction?
write_buffer: Arc<Mutex<PrioritizedWriteCache>>,
Expand All @@ -242,7 +308,7 @@ impl PagedCachedFile {
.collect();

Ok(Self {
file,
file: CheckedBackend::new(file),
page_size,
max_read_cache_bytes,
read_cache_bytes: AtomicUsize::new(0),
Expand All @@ -252,36 +318,20 @@ impl PagedCachedFile {
reads_total: Default::default(),
#[cfg(feature = "cache_metrics")]
reads_hits: Default::default(),
fsync_failed: Default::default(),
read_cache,
write_buffer: Arc::new(Mutex::new(PrioritizedWriteCache::new())),
})
}

pub(crate) fn raw_file_len(&self) -> Result<u64> {
self.file.len().map_err(StorageError::from)
self.file.len()
}

const fn lock_stripes() -> u64 {
131
}

#[inline]
fn check_fsync_failure(&self) -> Result<()> {
if self.fsync_failed.load(Ordering::Acquire) {
Err(StorageError::Io(io::Error::from(io::ErrorKind::Other)))
} else {
Ok(())
}
}

#[inline]
fn set_fsync_failed(&self, failed: bool) {
self.fsync_failed.store(failed, Ordering::Release);
}

fn flush_write_buffer(&self) -> Result {
self.check_fsync_failure()?;
let mut write_buffer = self.write_buffer.lock().unwrap();

for (offset, buffer) in write_buffer.cache.iter() {
Expand Down Expand Up @@ -333,20 +383,13 @@ impl PagedCachedFile {
// TODO: be more fine-grained about this invalidation
self.invalidate_cache_all();

self.file.set_len(len).map_err(StorageError::from)
self.file.set_len(len)
}

pub(super) fn flush(&self, #[allow(unused_variables)] eventual: bool) -> Result {
self.check_fsync_failure()?;
self.flush_write_buffer()?;

let res = self.file.sync_data(eventual).map_err(StorageError::from);
if res.is_err() {
self.set_fsync_failed(true);
return res;
}

Ok(())
self.file.sync_data(eventual)
}

// Make writes visible to readers, but does not guarantee any durability
Expand All @@ -356,8 +399,7 @@ impl PagedCachedFile {

// Read directly from the file, ignoring any cached data
pub(super) fn read_direct(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
self.check_fsync_failure()?;
Ok(self.file.read(offset, len)?)
self.file.read(offset, len)
}

// Read with caching. Caller must not read overlapping ranges without first calling invalidate_cache().
Expand All @@ -369,7 +411,6 @@ impl PagedCachedFile {
hint: PageHint,
cache_policy: impl Fn(&[u8]) -> CachePriority,
) -> Result<Arc<[u8]>> {
self.check_fsync_failure()?;
debug_assert_eq!(0, offset % self.page_size);
#[cfg(feature = "cache_metrics")]
self.reads_total.fetch_add(1, Ordering::AcqRel);
Expand Down Expand Up @@ -457,7 +498,6 @@ impl PagedCachedFile {
overwrite: bool,
cache_policy: impl Fn(&[u8]) -> CachePriority,
) -> Result<WritablePage> {
self.check_fsync_failure()?;
assert_eq!(0, offset % self.page_size);
let mut lock = self.write_buffer.lock().unwrap();

Expand Down

0 comments on commit dc6f44b

Please sign in to comment.