Skip to content

Commit

Permalink
Add custom storage backends (#707)
Browse files Browse the repository at this point in the history
* Add pluggable storage backends

* Run cargo fmt

* Impl debug for FileBackend

* Fix typo

* Implement debug for log feature

* Fix OSX compilation issues

* Run cargo fmt again

* Fix clippy issues

* Run cargo fmt

* Remove unused variable warning

* Remove useless conversion

* Fix fuzzing link issue on Mac

* Update build.rs

* Remove platform-dependent APIs and add unsupported backend

* Remove is_empty and run cargo fmt

* Update member names

* Run cargo fmt

* Fix return type and remove unused code
  • Loading branch information
DouglasDwyer authored Oct 23, 2023
1 parent 267b473 commit c8d98b7
Show file tree
Hide file tree
Showing 16 changed files with 372 additions and 186 deletions.
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ exclude = ["fuzz/"]
crate-type = ["cdylib", "rlib"]

[build-dependencies]
pyo3-build-config = "0.20.0"
pyo3-build-config = { version = "0.20.0", optional = true }

[dependencies]
libc = "0.2.104"
log = {version = "0.4.17", optional = true }
pyo3 = {version = "0.20.0", features=["extension-module", "abi3-py37"], optional = true }

[target.'cfg(unix)'.dependencies]
libc = "0.2.104"

# Common test/bench dependencies
[dev-dependencies]
rand = "0.8"
Expand All @@ -45,7 +47,7 @@ io-uring = "0.6.2"

[features]
# This feature is still experimental, and is not considered stable
python = ["pyo3"]
python = [ "pyo3", "pyo3-build-config" ]
# Enables log messages
logging = ["log"]
# Enable cache hit metrics
Expand Down
8 changes: 8 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
/// Build-script entry point.
///
/// Emits extra cdylib linker arguments when building for fuzzing on macOS and,
/// when the `python` feature is enabled, the pyo3 extension-module link arguments.
fn main() {
    let fuzzing_enabled = std::env::var("CARGO_CFG_FUZZING").is_ok();
    let target_os = std::env::var("CARGO_CFG_TARGET_OS");

    if fuzzing_enabled && target_os.as_deref() == Ok("macos") {
        // The two arguments together form `-undefined dynamic_lookup`.
        for link_arg in &["-undefined", "dynamic_lookup"] {
            println!("cargo:rustc-cdylib-link-arg={}", link_arg);
        }
    }

    #[cfg(feature = "python")]
    pyo3_build_config::add_extension_module_link_args();
}
42 changes: 37 additions & 5 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use crate::{
StorageError,
};
use crate::{ReadTransaction, Result, WriteTransaction};
use std::fmt::{Display, Formatter};
use std::fmt::{Debug, Display, Formatter};

use std::fs::{File, OpenOptions};
use std::io;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::ops::RangeFull;
Expand All @@ -26,6 +28,25 @@ use crate::transactions::SAVEPOINT_TABLE;
#[cfg(feature = "logging")]
use log::{info, warn};

/// A pluggable medium that provides persistent storage for a database.
///
/// Every method takes `&self`, so an implementation that needs to mutate its
/// own state must do so through interior mutability.
// `len` reports the size of the storage, not an element count, so no
// matching `is_empty` is provided.
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: Send + Sync + Debug + 'static {
    /// Returns the current length of the storage.
    fn len(&self) -> io::Result<u64>;

    /// Reads `len` bytes from the storage, starting at `offset`.
    fn read(&self, offset: u64, len: usize) -> io::Result<Vec<u8>>;

    /// Changes the length of the storage to `len`.
    fn set_len(&self, len: u64) -> io::Result<()>;

    /// Synchronizes all buffered data with the persistent storage.
    ///
    /// NOTE(review): `eventual` presumably asks for a weaker "eventually
    /// durable" sync; confirm against the concrete backends.
    fn sync_data(&self, eventual: bool) -> io::Result<()>;

    /// Writes `data` to the storage, starting at `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> io::Result<()>;
}

struct AtomicTransactionId {
inner: AtomicU64,
}
Expand Down Expand Up @@ -608,7 +629,7 @@ impl Database {
}

fn new(
file: File,
file: Box<dyn StorageBackend>,
page_size: usize,
region_size: Option<u64>,
read_cache_size_bytes: usize,
Expand Down Expand Up @@ -793,7 +814,7 @@ impl Builder {
.open(path)?;

Database::new(
file,
Box::new(crate::FileBackend::new(file)?),
self.page_size,
self.region_size,
self.read_cache_size_bytes,
Expand All @@ -810,7 +831,7 @@ impl Builder {
}

Database::new(
file,
Box::new(crate::FileBackend::new(file)?),
self.page_size,
None,
self.read_cache_size_bytes,
Expand All @@ -823,7 +844,18 @@ impl Builder {
/// The file must be empty or contain a valid database.
pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
Database::new(
file,
Box::new(crate::FileBackend::new(file)?),
self.page_size,
self.region_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
)
}

/// Open an existing or create a new database with the given backend.
pub fn create_backend(&self, backend: impl StorageBackend) -> Result<Database, DatabaseError> {
Database::new(
Box::new(backend),
self.page_size,
self.region_size,
self.read_cache_size_bytes,
Expand Down
7 changes: 4 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
//! [design]: https://github.com/cberner/redb/blob/master/docs/design.md
pub use db::{
Builder, Database, MultimapTableDefinition, MultimapTableHandle, TableDefinition, TableHandle,
UntypedMultimapTableHandle, UntypedTableHandle,
Builder, Database, MultimapTableDefinition, MultimapTableHandle, StorageBackend,
TableDefinition, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle,
};
pub use error::{
CommitError, CompactionError, DatabaseError, Error, SavepointError, StorageError, TableError,
Expand All @@ -66,7 +66,8 @@ pub use multimap_table::{
};
pub use table::{Drain, DrainFilter, Range, ReadOnlyTable, ReadableTable, Table};
pub use transactions::{DatabaseStats, Durability, ReadTransaction, WriteTransaction};
pub use tree_store::{AccessGuard, AccessGuardMut, Savepoint};
pub use tree_store::file_backend::FileBackend;
pub use tree_store::{AccessGuard, AccessGuardMut, InMemoryBackend, Savepoint};
pub use types::{RedbKey, RedbValue, TypeName};

type Result<T = (), E = StorageError> = std::result::Result<T, E>;
Expand Down
2 changes: 1 addition & 1 deletion src/tree_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub(crate) use btree_base::{LeafAccessor, LeafMutator, RawLeafBuilder, BRANCH, L
pub(crate) use btree_iters::{
AllPageNumbersBtreeIter, BtreeDrain, BtreeDrainFilter, BtreeRangeIter,
};
pub use page_store::Savepoint;
pub use page_store::{file_backend, InMemoryBackend, Savepoint};
pub(crate) use page_store::{
CachePriority, Page, PageHint, PageNumber, SerializedSavepoint, TransactionalMemory,
FILE_FORMAT_VERSION, MAX_VALUE_LENGTH, PAGE_SIZE,
Expand Down
60 changes: 8 additions & 52 deletions src/tree_store/page_store/cached_file.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use crate::tree_store::page_store::base::PageHint;
use crate::tree_store::page_store::file_lock::LockedFile;
use crate::tree_store::LEAF;
use crate::{DatabaseError, Result, StorageError};
use crate::{DatabaseError, Result, StorageBackend, StorageError};
use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::mem;
use std::ops::{Index, IndexMut};
#[cfg(any(target_os = "linux", all(unix, not(fuzzing))))]
use std::os::unix::io::AsRawFd;
use std::slice::SliceIndex;
#[cfg(any(fuzzing, test, feature = "cache_metrics"))]
use std::sync::atomic::AtomicU64;
Expand Down Expand Up @@ -225,7 +221,7 @@ impl PrioritizedWriteCache {
}

pub(super) struct PagedCachedFile {
file: LockedFile,
file: Box<dyn StorageBackend>,
page_size: u64,
max_read_cache_bytes: usize,
read_cache_bytes: AtomicUsize,
Expand All @@ -245,7 +241,7 @@ pub(super) struct PagedCachedFile {

impl PagedCachedFile {
pub(super) fn new(
file: File,
file: Box<dyn StorageBackend>,
page_size: u64,
max_read_cache_bytes: usize,
max_write_buffer_bytes: usize,
Expand All @@ -255,17 +251,8 @@ impl PagedCachedFile {
read_cache.push(RwLock::new(PrioritizedCache::new()));
}

let lock = LockedFile::new(file)?;

// Try to flush any pages in the page cache that are out of sync with disk.
// See here for why: <https://github.com/cberner/redb/issues/450>
#[cfg(target_os = "linux")]
unsafe {
libc::posix_fadvise64(lock.file().as_raw_fd(), 0, 0, libc::POSIX_FADV_DONTNEED);
}

Ok(Self {
file: lock,
file,
page_size,
max_read_cache_bytes,
read_cache_bytes: AtomicUsize::new(0),
Expand All @@ -284,7 +271,7 @@ impl PagedCachedFile {
}

pub(crate) fn raw_file_len(&self) -> Result<u64> {
Ok(self.file.file().metadata()?.len())
self.file.len().map_err(StorageError::from)
}

#[cfg(any(fuzzing, test))]
Expand Down Expand Up @@ -338,56 +325,25 @@ impl PagedCachedFile {
// TODO: be more fine-grained about this invalidation
self.invalidate_cache_all();

self.file.file().set_len(len).map_err(StorageError::from)
self.file.set_len(len).map_err(StorageError::from)
}

pub(super) fn flush(&self) -> Result {
pub(super) fn flush(&self, #[allow(unused_variables)] eventual: bool) -> Result {
self.check_fsync_failure()?;
self.flush_write_buffer()?;
// Disable fsync when fuzzing, since it doesn't test crash consistency
#[cfg(not(fuzzing))]
{
let res = self.file.file().sync_data().map_err(StorageError::from);
let res = self.file.sync_data(eventual).map_err(StorageError::from);
if res.is_err() {
self.set_fsync_failed(true);
// Try to flush any pages in the page cache that are out of sync with disk.
// See here for why: <https://github.com/cberner/redb/issues/450>
#[cfg(target_os = "linux")]
unsafe {
libc::posix_fadvise64(
self.file.file().as_raw_fd(),
0,
0,
libc::POSIX_FADV_DONTNEED,
);
}
return res;
}
}

Ok(())
}

pub(super) fn eventual_flush(&self) -> Result {
self.check_fsync_failure()?;

#[cfg(not(target_os = "macos"))]
{
self.flush()?;
}
#[cfg(all(target_os = "macos", not(fuzzing)))]
{
self.flush_write_buffer()?;
let code = unsafe { libc::fcntl(self.file.file().as_raw_fd(), libc::F_BARRIERFSYNC) };
if code == -1 {
self.set_fsync_failed(true);
return Err(io::Error::last_os_error().into());
}
}

Ok(())
}

// Make writes visible to readers, but does not guarantee any durability
pub(super) fn write_barrier(&self) -> Result {
self.flush_write_buffer()
Expand Down
14 changes: 14 additions & 0 deletions src/tree_store/page_store/file_backend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Select the `FileBackend` implementation at compile time based on the target.
// Exactly one of the three cfg-gated module/re-export pairs below is compiled.

// Unix targets and WASI share the `unix` module.
#[cfg(any(unix, target_os = "wasi"))]
mod unix;
#[cfg(any(unix, target_os = "wasi"))]
pub use unix::FileBackend;

// Windows targets use the `windows` module.
#[cfg(windows)]
mod windows;
#[cfg(windows)]
pub use windows::FileBackend;

// Every other target falls back to the `unsupported` module.
#[cfg(not(any(windows, unix, target_os = "wasi")))]
mod unsupported;
#[cfg(not(any(windows, unix, target_os = "wasi")))]
pub use unsupported::FileBackend;
Loading

0 comments on commit c8d98b7

Please sign in to comment.