From d51179ecffb8a181be5ae62a6cc3caad4514319d Mon Sep 17 00:00:00 2001 From: Christopher Berner Date: Mon, 23 Oct 2023 21:14:51 -0700 Subject: [PATCH] Refactor fuzzer to use a custom StorageBackend This removes a bunch of cfg(fuzzing) statements from the main codebase --- fuzz/fuzz_targets/fuzz_redb.rs | 104 +++++++++++-- src/db.rs | 176 ++++++++++++++-------- src/error.rs | 16 -- src/tree_store/page_store/cached_file.rs | 44 +----- src/tree_store/page_store/page_manager.rs | 5 - 5 files changed, 214 insertions(+), 131 deletions(-) diff --git a/fuzz/fuzz_targets/fuzz_redb.rs b/fuzz/fuzz_targets/fuzz_redb.rs index 304c2bee..4aafd210 100644 --- a/fuzz/fuzz_targets/fuzz_redb.rs +++ b/fuzz/fuzz_targets/fuzz_redb.rs @@ -1,13 +1,16 @@ #![no_main] use libfuzzer_sys::fuzz_target; -use redb::{AccessGuard, Database, Durability, Error, MultimapTable, MultimapTableDefinition, MultimapValue, ReadableMultimapTable, ReadableTable, Savepoint, Table, TableDefinition, WriteTransaction}; +use redb::{AccessGuard, Database, Durability, Error, MultimapTable, MultimapTableDefinition, MultimapValue, ReadableMultimapTable, ReadableTable, Savepoint, StorageBackend, Table, TableDefinition, WriteTransaction}; use std::collections::{BTreeMap, BTreeSet, HashSet}; -use std::io::{Read, Seek, SeekFrom}; +use std::fmt::Debug; +use std::io::{ErrorKind, Read, Seek, SeekFrom}; +use std::sync::atomic::{AtomicU64, Ordering}; use tempfile::NamedTempFile; mod common; use common::*; +use redb::backends::FileBackend; use crate::FuzzerSavepoint::{Ephemeral, NotYetDurablePersistent, Persistent}; // These slow down the fuzzer, so don't create too many @@ -16,6 +19,62 @@ const TABLE_DEF: TableDefinition = TableDefinition::new("fuzz_table" const MULTIMAP_TABLE_DEF: MultimapTableDefinition = MultimapTableDefinition::new("fuzz_multimap_table"); +#[derive(Debug)] +struct FuzzerBackend { + inner: FileBackend, + countdown: AtomicU64, +} + +impl FuzzerBackend { + fn new(backend: FileBackend, countdown: u64) -> Self { + Self { + inner: backend, + countdown: AtomicU64::new(countdown), + } + } + + fn check_countdown(&self) -> Result<(), std::io::Error> { + if self.countdown.load(Ordering::SeqCst) == 0 { + return Err(std::io::Error::from(ErrorKind::Other)); + } + + Ok(()) + } + + fn decrement_countdown(&self) -> Result<(), std::io::Error> { + if self.countdown.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| if x > 0 { Some(x - 1) } else { None } ).is_err() { + return Err(std::io::Error::from(ErrorKind::Other)); + } + + Ok(()) + } +} + +impl StorageBackend for FuzzerBackend { + fn len(&self) -> Result { + self.inner.len() + } + + fn read(&self, offset: u64, len: usize) -> Result, std::io::Error> { + self.check_countdown()?; + self.inner.read(offset, len) + } + + fn set_len(&self, len: u64) -> Result<(), std::io::Error> { + self.inner.set_len(len) + } + + fn sync_data(&self, _eventual: bool) -> Result<(), std::io::Error> { + // No-op. The fuzzer doesn't test crashes, so fsync is unnecessary + Ok(()) + } + + fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> { + self.decrement_countdown()?; + self.inner.write(offset, data) + } +} + enum FuzzerSavepoint { Ephemeral(Savepoint, BTreeMap), Persistent(u64, BTreeMap), @@ -355,7 +414,7 @@ fn handle_table_op(op: &FuzzOperation, reference: &mut BTreeMap, tab } drop(reference_iter); reference.retain(|x, _| (*x < start || *x >= end) || *x % modulus != 0); - // This is basically assert!(iter.next().is_none()), but we also allow an Err such as SimulatedIOFailure + // This is basically assert!(iter.next().is_none()), but we also allow an Err such as a simulated IO error if let Some(Ok((_, _))) = iter.next() { panic!(); } @@ -390,16 +449,35 @@ fn handle_table_op(op: &FuzzOperation, reference: &mut BTreeMap, tab Ok(()) } +fn is_simulated_io_error(err: &redb::Error) -> bool { + match err { + Error::Io(io_err) => { + matches!(io_err.kind(), ErrorKind::Other) + }, + _ => false + } +} + fn exec_table_crash_support(config: &FuzzConfig, apply: fn(&Database, &mut BTreeMap, &FuzzTransaction, &mut SavepointManager) -> Result<(), redb::Error>) -> Result<(), redb::Error> { let mut redb_file: NamedTempFile = NamedTempFile::new().unwrap(); + let backend = FuzzerBackend::new(FileBackend::new(redb_file.as_file().try_clone().unwrap())?, config.crash_after_ops.value); - let mut db = Database::builder() + let result = Database::builder() .set_page_size(config.page_size.value) .set_cache_size(config.cache_size.value) .set_region_size(config.region_size.value as u64) - .create(redb_file.path()) - .unwrap(); - db.set_crash_countdown(config.crash_after_ops.value); + .create_with_backend(backend); + let mut db = match result { + Ok(db) => db, + Err(err) => { + let err: redb::Error = err.into(); + if is_simulated_io_error(&err) { + return Ok(()); + } else { + return Err(err); + } + } + }; let mut savepoint_manager = SavepointManager::new(); let mut reference = BTreeMap::new(); @@ -414,7 +492,7 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(&Database, } } Err(err) => { - if matches!(err, Error::SimulatedIOFailure) { + if is_simulated_io_error(&err) { drop(db); savepoint_manager.crash(); non_durable_reference = reference.clone(); @@ -426,11 +504,12 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(&Database, assert_ne!(god_byte[0] & 2, 0); // Repair the database + let backend = FuzzerBackend::new(FileBackend::new(redb_file.as_file().try_clone().unwrap()).unwrap(), u64::MAX); db = Database::builder() .set_page_size(config.page_size.value) .set_cache_size(config.cache_size.value) .set_region_size(config.region_size.value as u64) - .create(redb_file.path()) + .create_with_backend(backend) .unwrap(); } else { return Err(err); @@ -440,7 +519,7 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(&Database, let result = apply(&db, &mut non_durable_reference, transaction, &mut savepoint_manager); if result.is_err() { - if matches!(result, Err(Error::SimulatedIOFailure)) { + if is_simulated_io_error(result.as_ref().err().unwrap()) { drop(db); savepoint_manager.crash(); non_durable_reference = reference.clone(); @@ -452,11 +531,12 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(&Database, assert_ne!(god_byte[0] & 2, 0); // Repair the database + let backend = FuzzerBackend::new(FileBackend::new(redb_file.as_file().try_clone().unwrap()).unwrap(), u64::MAX); db = Database::builder() .set_page_size(config.page_size.value) .set_cache_size(config.cache_size.value) .set_region_size(config.region_size.value as u64) - .create(redb_file.path()) + .create_with_backend(backend) .unwrap(); } else { return result; @@ -469,7 +549,7 @@ fn exec_table_crash_support(config: &FuzzConfig, apply: fn(&Database, match run_compaction(&mut db, &mut savepoint_manager) { Ok(_) => {} Err(err) => { - if !matches!(err, Error::SimulatedIOFailure) { + if !is_simulated_io_error(&err) { return Err(err); } } diff --git a/src/db.rs b/src/db.rs index 81b00092..cf8d0b1b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -36,12 +36,20 @@ pub trait StorageBackend: 'static + Debug + Send + Sync { fn len(&self) -> std::result::Result; /// Reads the specified array of bytes from the storage. + /// + /// If `len` + `offset` exceeds the length of the storage an appropriate `Error` should be returned or a panic may occur. fn read(&self, offset: u64, len: usize) -> std::result::Result, io::Error>; /// Sets the length of the storage. + /// + /// When extending the storage the new positions should be zero initialized. fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>; /// Syncs all buffered data with the persistent storage. + /// + /// If `eventual` is true, data may become persistent at some point after this call returns, + /// but the storage must gaurantee that a write barrier is inserted: i.e. all writes before this + /// call to `sync_data()` will become persistent before any writes that occur after. fn sync_data(&self, eventual: bool) -> std::result::Result<(), io::Error>; /// Writes the specified array to the storage. @@ -304,11 +312,6 @@ impl Database { &self.mem } - #[cfg(any(fuzzing, test))] - pub fn set_crash_countdown(&self, value: u64) { - self.mem.set_crash_countdown(value); - } - pub(crate) fn verify_primary_checksums(mem: &TransactionalMemory) -> Result { let fake_freed_pages = Arc::new(Mutex::new(vec![])); let table_tree = TableTree::new(mem.get_data_root(), mem, fake_freed_pages.clone()); @@ -876,10 +879,117 @@ impl std::fmt::Debug for Database { #[cfg(test)] mod test { + use crate::backends::FileBackend; use crate::{ - Database, DatabaseError, Durability, ReadableTable, StorageError, TableDefinition, + Database, DatabaseError, Durability, ReadableTable, StorageBackend, StorageError, + TableDefinition, }; use std::io::ErrorKind; + use std::sync::atomic::{AtomicU64, Ordering}; + + #[derive(Debug)] + struct FailingBackend { + inner: FileBackend, + countdown: AtomicU64, + } + + impl FailingBackend { + fn new(backend: FileBackend, countdown: u64) -> Self { + Self { + inner: backend, + countdown: AtomicU64::new(countdown), + } + } + + fn check_countdown(&self) -> Result<(), std::io::Error> { + if self.countdown.load(Ordering::SeqCst) == 0 { + return Err(std::io::Error::from(ErrorKind::Other)); + } + + Ok(()) + } + + fn decrement_countdown(&self) -> Result<(), std::io::Error> { + if self + .countdown + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { + if x > 0 { + Some(x - 1) + } else { + None + } + }) + .is_err() + { + return Err(std::io::Error::from(ErrorKind::Other)); + } + + Ok(()) + } + } + + impl StorageBackend for FailingBackend { + fn len(&self) -> Result { + self.inner.len() + } + + fn read(&self, offset: u64, len: usize) -> Result, std::io::Error> { + self.check_countdown()?; + self.inner.read(offset, len) + } + + fn set_len(&self, len: u64) -> Result<(), std::io::Error> { + self.inner.set_len(len) + } + + fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> { + self.check_countdown()?; + self.inner.sync_data(eventual) + } + + fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> { + self.decrement_countdown()?; + self.inner.write(offset, data) + } + } + + #[test] + fn crash_regression4() { + let tmpfile = crate::create_tempfile(); + + let backend = FailingBackend::new( + FileBackend::new(tmpfile.as_file().try_clone().unwrap()).unwrap(), + 23, + ); + let db = Database::builder() + .set_cache_size(12686) + .set_page_size(8 * 1024) + .set_region_size(32 * 4096) + .create_with_backend(backend) + .unwrap(); + + let table_def: TableDefinition = TableDefinition::new("x"); + + let tx = db.begin_write().unwrap(); + let _savepoint = tx.ephemeral_savepoint().unwrap(); + let _persistent_savepoint = tx.persistent_savepoint().unwrap(); + tx.commit().unwrap(); + let tx = db.begin_write().unwrap(); + { + let mut table = tx.open_table(table_def).unwrap(); + let _ = table.insert_reserve(118821, 360).unwrap(); + } + let result = tx.commit(); + assert!(result.is_err()); + + drop(db); + Database::builder() + .set_cache_size(1024 * 1024) + .set_page_size(8 * 1024) + .set_region_size(32 * 4096) + .create(tmpfile.path()) + .unwrap(); + } #[test] fn small_pages() { @@ -1062,60 +1172,6 @@ mod test { tx.abort().unwrap(); } - #[test] - fn crash_regression3() { - let tmpfile = crate::create_tempfile(); - - let db = Database::builder() - .set_cache_size(1024 * 1024) - .set_page_size(16 * 1024) - .set_region_size(32 * 4096) - .create(tmpfile.path()) - .unwrap(); - - let tx = db.begin_write().unwrap(); - let savepoint = tx.ephemeral_savepoint().unwrap(); - tx.commit().unwrap(); - - let mut tx = db.begin_write().unwrap(); - tx.restore_savepoint(&savepoint).unwrap(); - tx.commit().unwrap(); - } - - #[test] - fn crash_regression4() { - let tmpfile = crate::create_tempfile(); - - let db = Database::builder() - .set_cache_size(12686) - .set_page_size(8 * 1024) - .set_region_size(32 * 4096) - .create(tmpfile.path()) - .unwrap(); - db.set_crash_countdown(10); - - let table_def: TableDefinition = TableDefinition::new("x"); - - let tx = db.begin_write().unwrap(); - let _savepoint = tx.ephemeral_savepoint().unwrap(); - let _persistent_savepoint = tx.persistent_savepoint().unwrap(); - tx.commit().unwrap(); - let tx = db.begin_write().unwrap(); - { - let mut table = tx.open_table(table_def).unwrap(); - let _ = table.insert_reserve(118821, 360); - } - - drop(tx); - drop(db); - Database::builder() - .set_cache_size(1024 * 1024) - .set_page_size(8 * 1024) - .set_region_size(32 * 4096) - .create(tmpfile.path()) - .unwrap(); - } - #[test] fn dynamic_shrink() { let tmpfile = crate::create_tempfile(); diff --git a/src/error.rs b/src/error.rs index 639daf19..12490a05 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,9 +8,6 @@ use std::{io, panic}; #[derive(Debug)] #[non_exhaustive] pub enum StorageError { - /// For use by fuzzer only - #[cfg(any(fuzzing, test))] - SimulatedIOFailure, /// The Database is corrupted Corrupted(String), /// The value being inserted exceeds the maximum of 3GiB @@ -34,8 +31,6 @@ impl From for StorageError { impl From for Error { fn from(err: StorageError) -> Error { match err { - #[cfg(any(fuzzing, test))] - StorageError::SimulatedIOFailure => Error::SimulatedIOFailure, StorageError::Corrupted(msg) => Error::Corrupted(msg), StorageError::ValueTooLarge(x) => Error::ValueTooLarge(x), StorageError::Io(x) => Error::Io(x), @@ -47,10 +42,6 @@ impl From for Error { impl Display for StorageError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - #[cfg(any(fuzzing, test))] - StorageError::SimulatedIOFailure => { - write!(f, "Fuzzer: Simulated I/O error") - } StorageError::Corrupted(msg) => { write!(f, "DB corrupted: {msg}") } @@ -415,9 +406,6 @@ impl std::error::Error for CommitError {} #[derive(Debug)] #[non_exhaustive] pub enum Error { - /// For use by fuzzer only - #[cfg(any(fuzzing, test))] - SimulatedIOFailure, /// The Database is already open. Cannot acquire lock. DatabaseAlreadyOpen, /// This savepoint is invalid or cannot be created. @@ -474,10 +462,6 @@ impl From for Error { impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - #[cfg(any(fuzzing, test))] - Error::SimulatedIOFailure => { - write!(f, "Fuzzer: Simulated I/O error") - } Error::Corrupted(msg) => { write!(f, "DB corrupted: {msg}") } diff --git a/src/tree_store/page_store/cached_file.rs b/src/tree_store/page_store/cached_file.rs index 1a950baf..749fb1f6 100644 --- a/src/tree_store/page_store/cached_file.rs +++ b/src/tree_store/page_store/cached_file.rs @@ -6,7 +6,7 @@ use std::io; use std::mem; use std::ops::{Index, IndexMut}; use std::slice::SliceIndex; -#[cfg(any(fuzzing, test, feature = "cache_metrics"))] +#[cfg(feature = "cache_metrics")] use std::sync::atomic::AtomicU64; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; @@ -235,8 +235,6 @@ pub(super) struct PagedCachedFile { read_cache: Vec>, // TODO: maybe move this cache to WriteTransaction? write_buffer: Mutex, - #[cfg(any(fuzzing, test))] - crash_countdown: AtomicU64, } impl PagedCachedFile { @@ -265,8 +263,6 @@ impl PagedCachedFile { fsync_failed: Default::default(), read_cache, write_buffer: Mutex::new(PrioritizedWriteCache::new()), - #[cfg(any(fuzzing, test))] - crash_countdown: AtomicU64::new(u64::MAX), }) } @@ -274,11 +270,6 @@ impl PagedCachedFile { self.file.len().map_err(StorageError::from) } - #[cfg(any(fuzzing, test))] - pub(crate) fn set_crash_countdown(&self, value: u64) { - self.crash_countdown.store(value, Ordering::Release); - } - const fn lock_stripes() -> usize { 131 } @@ -293,18 +284,11 @@ impl PagedCachedFile { } #[inline] - #[cfg(not(fuzzing))] fn set_fsync_failed(&self, failed: bool) { self.fsync_failed.store(failed, Ordering::Release); } fn flush_write_buffer(&self) -> Result { - #[cfg(any(fuzzing, test))] - { - if self.crash_countdown.load(Ordering::Acquire) == 0 { - return Err(StorageError::SimulatedIOFailure); - } - } self.check_fsync_failure()?; let mut write_buffer = self.write_buffer.lock().unwrap(); @@ -331,14 +315,11 @@ impl PagedCachedFile { pub(super) fn flush(&self, #[allow(unused_variables)] eventual: bool) -> Result { self.check_fsync_failure()?; self.flush_write_buffer()?; - // Disable fsync when fuzzing, since it doesn't test crash consistency - #[cfg(not(fuzzing))] - { - let res = self.file.sync_data(eventual).map_err(StorageError::from); - if res.is_err() { - self.set_fsync_failed(true); - return res; - } + + let res = self.file.sync_data(eventual).map_err(StorageError::from); + if res.is_err() { + self.set_fsync_failed(true); + return res; } Ok(()) @@ -351,12 +332,6 @@ impl PagedCachedFile { // Read directly from the file, ignoring any cached data pub(super) fn read_direct(&self, offset: u64, len: usize) -> Result> { - #[cfg(any(fuzzing, test))] - { - if self.crash_countdown.load(Ordering::Acquire) == 0 { - return Err(StorageError::SimulatedIOFailure); - } - } self.check_fsync_failure()?; Ok(self.file.read(offset, len)?) } @@ -486,13 +461,6 @@ impl PagedCachedFile { } else { let previous = self.write_buffer_bytes.fetch_add(len, Ordering::AcqRel); if previous + len > self.max_write_buffer_bytes { - #[cfg(any(fuzzing, test))] - { - if self.crash_countdown.load(Ordering::Acquire) == 0 { - return Err(StorageError::SimulatedIOFailure); - } - self.crash_countdown.fetch_sub(1, Ordering::AcqRel); - } let mut removed_bytes = 0; while removed_bytes < len { if let Some((offset, buffer, removed_priority)) = lock.pop_lowest_priority() { diff --git a/src/tree_store/page_store/page_manager.rs b/src/tree_store/page_store/page_manager.rs index 126501ce..2254897e 100644 --- a/src/tree_store/page_store/page_manager.rs +++ b/src/tree_store/page_store/page_manager.rs @@ -272,11 +272,6 @@ impl TransactionalMemory { }) } - #[cfg(any(fuzzing, test))] - pub(crate) fn set_crash_countdown(&self, value: u64) { - self.storage.set_crash_countdown(value); - } - pub(crate) fn clear_read_cache(&mut self) { self.storage.invalidate_cache_all() }