Add custom storage backends #707

Merged (19 commits, Oct 23, 2023)
Changes from 13 commits
8 changes: 5 additions & 3 deletions Cargo.toml
@@ -15,13 +15,15 @@ exclude = ["fuzz/"]
crate-type = ["cdylib", "rlib"]

[build-dependencies]
pyo3-build-config = "0.20.0"
pyo3-build-config = { version = "0.20.0", optional = true }

[dependencies]
libc = "0.2.104"
log = {version = "0.4.17", optional = true }
pyo3 = {version = "0.20.0", features=["extension-module", "abi3-py37"], optional = true }

[target.'cfg(unix)'.dependencies]
libc = "0.2.104"

# Common test/bench dependencies
[dev-dependencies]
rand = "0.8"
@@ -45,7 +47,7 @@ io-uring = "0.6.2"

[features]
# This feature is still experimental, and is not considered stable
python = ["pyo3"]
python = [ "pyo3", "pyo3-build-config" ]
# Enables log messages
logging = ["log"]
# Enable cache hit metrics
8 changes: 8 additions & 0 deletions build.rs
@@ -1,3 +1,11 @@
fn main() {
if std::env::var("CARGO_CFG_FUZZING").is_ok()
&& std::env::var("CARGO_CFG_TARGET_OS").as_deref() == Ok("macos")
{
println!("cargo:rustc-cdylib-link-arg=-undefined");
println!("cargo:rustc-cdylib-link-arg=dynamic_lookup");
}

#[cfg(feature = "python")]
pyo3_build_config::add_extension_module_link_args();
}
59 changes: 54 additions & 5 deletions src/db.rs
@@ -10,11 +10,16 @@ use crate::{
StorageError,
};
use crate::{ReadTransaction, Result, WriteTransaction};
use std::fmt::{Display, Formatter};
use std::fmt::{Debug, Display, Formatter};

#[cfg(any(windows, unix, target_os = "wasi"))]
use std::fs::{File, OpenOptions};
use std::io;
#[cfg(any(windows, unix, target_os = "wasi"))]
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::ops::RangeFull;
#[cfg(any(windows, unix, target_os = "wasi"))]
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
@@ -26,6 +31,29 @@ use crate::transactions::SAVEPOINT_TABLE;
#[cfg(feature = "logging")]
use log::{info, warn};

/// Implements persistent storage for a database.
pub trait StorageBackend: 'static + Debug + Send + Sync {
/// Whether the current storage backend is of zero length.
fn is_empty(&self) -> Result<bool, io::Error> {
Ok(self.len()? == 0)
}

/// Gets the current length of the storage.
fn len(&self) -> Result<u64, io::Error>;

/// Reads the specified array of bytes from the storage.
fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, io::Error>;

/// Sets the length of the storage.
fn set_len(&self, len: u64) -> Result<(), io::Error>;

/// Syncs all buffered data with the persistent storage.
fn sync_data(&self, eventual: bool) -> Result<(), io::Error>;

/// Writes the specified array to the storage.
fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error>;
}

struct AtomicTransactionId {
inner: AtomicU64,
}
@@ -245,15 +273,22 @@ impl Database {
/// * if the file does not exist, or is an empty file, a new database will be initialized in it
/// * if the file is a valid redb database, it will be opened
/// * otherwise this function will return an error
#[cfg(any(windows, unix, target_os = "wasi"))]
pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
Self::builder().create(path)
}

/// Opens an existing redb database.
#[cfg(any(windows, unix, target_os = "wasi"))]
pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
Self::builder().open(path)
}

/// Opens a redb database for the given backend.
pub fn open_backend(backend: impl StorageBackend) -> Result<Database, DatabaseError> {
Self::builder().open_backend(backend)
}

pub(crate) fn start_write_transaction(&self) -> TransactionId {
let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
while live_write_transaction.is_some() {
@@ -608,7 +643,7 @@ impl Database {
}

fn new(
file: File,
file: Box<dyn StorageBackend>,
page_size: usize,
region_size: Option<u64>,
read_cache_size_bytes: usize,
@@ -785,6 +820,7 @@ impl Builder {
/// * if the file does not exist, or is an empty file, a new database will be initialized in it
/// * if the file is a valid redb database, it will be opened
/// * otherwise this function will return an error
#[cfg(any(windows, unix, target_os = "wasi"))]
pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
let file = OpenOptions::new()
.read(true)
@@ -793,7 +829,7 @@
.open(path)?;

Database::new(
file,
Box::new(crate::FileBackend::new(file)?),
self.page_size,
self.region_size,
self.read_cache_size_bytes,
@@ -802,6 +838,7 @@
}

/// Opens an existing redb database.
#[cfg(any(windows, unix, target_os = "wasi"))]
pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
let file = OpenOptions::new().read(true).write(true).open(path)?;

@@ -810,7 +847,7 @@
}

Database::new(
file,
Box::new(crate::FileBackend::new(file)?),
self.page_size,
None,
self.read_cache_size_bytes,
@@ -821,9 +858,21 @@
/// Open an existing or create a new database in the given `file`.
///
/// The file must be empty or contain a valid database.
#[cfg(any(windows, unix, target_os = "wasi"))]
pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
Database::new(
file,
Box::new(crate::FileBackend::new(file)?),
self.page_size,
self.region_size,
self.read_cache_size_bytes,
self.write_cache_size_bytes,
)
}

/// Opens the database with the given backend.
pub fn open_backend(&self, backend: impl StorageBackend) -> Result<Database, DatabaseError> {
Database::new(
Box::new(backend),
self.page_size,
self.region_size,
self.read_cache_size_bytes,
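Taken together, the src/db.rs changes above introduce the `StorageBackend` trait as the extension point and route every constructor through a `Box<dyn StorageBackend>`. As a rough illustration of how a caller could plug in custom storage, here is a minimal in-memory backend; the `InMemoryBackend` type and `open_in_memory` helper are hypothetical (this PR ships its own `MemoryBackend`), and they assume the trait exactly as shown in this diff.

```rust
use std::io;
use std::sync::RwLock;

use redb::{Database, DatabaseError, StorageBackend};

/// Hypothetical backend that keeps the whole database in a heap buffer.
#[derive(Debug, Default)]
struct InMemoryBackend {
    buffer: RwLock<Vec<u8>>,
}

impl StorageBackend for InMemoryBackend {
    fn len(&self) -> Result<u64, io::Error> {
        Ok(self.buffer.read().unwrap().len() as u64)
    }

    fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, io::Error> {
        // A production backend should return ErrorKind::UnexpectedEof instead
        // of panicking when the requested range is out of bounds.
        let buffer = self.buffer.read().unwrap();
        let start = offset as usize;
        Ok(buffer[start..start + len].to_vec())
    }

    fn set_len(&self, len: u64) -> Result<(), io::Error> {
        self.buffer.write().unwrap().resize(len as usize, 0);
        Ok(())
    }

    fn sync_data(&self, _eventual: bool) -> Result<(), io::Error> {
        // Nothing to do: the data never leaves memory.
        Ok(())
    }

    fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> {
        let mut buffer = self.buffer.write().unwrap();
        let end = offset as usize + data.len();
        if buffer.len() < end {
            buffer.resize(end, 0);
        }
        buffer[offset as usize..end].copy_from_slice(data);
        Ok(())
    }
}

fn open_in_memory() -> Result<Database, DatabaseError> {
    // Equivalent to Database::builder().open_backend(InMemoryBackend::default()).
    Database::open_backend(InMemoryBackend::default())
}
```

Because `Database::new` now takes a `Box<dyn StorageBackend>`, the same code path serves files, memory, and anything else that can satisfy these six methods.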
8 changes: 5 additions & 3 deletions src/lib.rs
@@ -54,8 +54,8 @@
//! [design]: https://github.com/cberner/redb/blob/master/docs/design.md

pub use db::{
Builder, Database, MultimapTableDefinition, MultimapTableHandle, TableDefinition, TableHandle,
UntypedMultimapTableHandle, UntypedTableHandle,
Builder, Database, MultimapTableDefinition, MultimapTableHandle, StorageBackend,
TableDefinition, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle,
};
pub use error::{
CommitError, CompactionError, DatabaseError, Error, SavepointError, StorageError, TableError,
@@ -66,7 +66,9 @@ pub use multimap_table::{
};
pub use table::{Drain, DrainFilter, Range, ReadOnlyTable, ReadableTable, Table};
pub use transactions::{DatabaseStats, Durability, ReadTransaction, WriteTransaction};
pub use tree_store::{AccessGuard, AccessGuardMut, Savepoint};
#[cfg(any(windows, unix, target_os = "wasi"))]
pub use tree_store::file_backend::FileBackend;
pub use tree_store::{AccessGuard, AccessGuardMut, MemoryBackend, Savepoint};
pub use types::{RedbKey, RedbValue, TypeName};

type Result<T = (), E = StorageError> = std::result::Result<T, E>;
2 changes: 1 addition & 1 deletion src/tree_store/mod.rs
@@ -12,7 +12,7 @@ pub(crate) use btree_base::{LeafAccessor, LeafMutator, RawLeafBuilder, BRANCH, L
pub(crate) use btree_iters::{
AllPageNumbersBtreeIter, BtreeDrain, BtreeDrainFilter, BtreeRangeIter,
};
pub use page_store::Savepoint;
pub use page_store::{file_backend, MemoryBackend, Savepoint};
pub(crate) use page_store::{
CachePriority, Page, PageHint, PageNumber, SerializedSavepoint, TransactionalMemory,
FILE_FORMAT_VERSION, MAX_VALUE_LENGTH, PAGE_SIZE,
60 changes: 8 additions & 52 deletions src/tree_store/page_store/cached_file.rs
@@ -1,14 +1,10 @@
use crate::tree_store::page_store::base::PageHint;
use crate::tree_store::page_store::file_lock::LockedFile;
use crate::tree_store::LEAF;
use crate::{DatabaseError, Result, StorageError};
use crate::{DatabaseError, Result, StorageBackend, StorageError};
use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::mem;
use std::ops::{Index, IndexMut};
#[cfg(any(target_os = "linux", all(unix, not(fuzzing))))]
use std::os::unix::io::AsRawFd;
use std::slice::SliceIndex;
#[cfg(any(fuzzing, test, feature = "cache_metrics"))]
use std::sync::atomic::AtomicU64;
@@ -225,7 +221,7 @@ impl PrioritizedWriteCache {
}

pub(super) struct PagedCachedFile {
file: LockedFile,
file: Box<dyn StorageBackend>,
page_size: u64,
max_read_cache_bytes: usize,
read_cache_bytes: AtomicUsize,
@@ -245,7 +241,7 @@ pub(super) struct PagedCachedFile {

impl PagedCachedFile {
pub(super) fn new(
file: File,
file: Box<dyn StorageBackend>,
page_size: u64,
max_read_cache_bytes: usize,
max_write_buffer_bytes: usize,
@@ -255,17 +251,8 @@ impl PagedCachedFile {
read_cache.push(RwLock::new(PrioritizedCache::new()));
}

let lock = LockedFile::new(file)?;

// Try to flush any pages in the page cache that are out of sync with disk.
// See here for why: <https://github.com/cberner/redb/issues/450>
#[cfg(target_os = "linux")]
unsafe {
libc::posix_fadvise64(lock.file().as_raw_fd(), 0, 0, libc::POSIX_FADV_DONTNEED);
}

Ok(Self {
file: lock,
file,
page_size,
max_read_cache_bytes,
read_cache_bytes: AtomicUsize::new(0),
@@ -284,7 +271,7 @@ impl PagedCachedFile {
}

pub(crate) fn raw_file_len(&self) -> Result<u64> {
Ok(self.file.file().metadata()?.len())
self.file.len().map_err(StorageError::from)
}

#[cfg(any(fuzzing, test))]
@@ -338,56 +325,25 @@ impl PagedCachedFile {
// TODO: be more fine-grained about this invalidation
self.invalidate_cache_all();

self.file.file().set_len(len).map_err(StorageError::from)
self.file.set_len(len).map_err(StorageError::from)
}

pub(super) fn flush(&self) -> Result {
pub(super) fn flush(&self, #[allow(unused_variables)] eventual: bool) -> Result {
self.check_fsync_failure()?;
self.flush_write_buffer()?;
// Disable fsync when fuzzing, since it doesn't test crash consistency
#[cfg(not(fuzzing))]
{
let res = self.file.file().sync_data().map_err(StorageError::from);
let res = self.file.sync_data(eventual).map_err(StorageError::from);
if res.is_err() {
self.set_fsync_failed(true);
// Try to flush any pages in the page cache that are out of sync with disk.
// See here for why: <https://github.com/cberner/redb/issues/450>
#[cfg(target_os = "linux")]
unsafe {
libc::posix_fadvise64(
self.file.file().as_raw_fd(),
0,
0,
libc::POSIX_FADV_DONTNEED,
);
}
return res;
}
}

Ok(())
}

pub(super) fn eventual_flush(&self) -> Result {
self.check_fsync_failure()?;

#[cfg(not(target_os = "macos"))]
{
self.flush()?;
}
#[cfg(all(target_os = "macos", not(fuzzing)))]
{
self.flush_write_buffer()?;
let code = unsafe { libc::fcntl(self.file.file().as_raw_fd(), libc::F_BARRIERFSYNC) };
if code == -1 {
self.set_fsync_failed(true);
return Err(io::Error::last_os_error().into());
}
}

Ok(())
}

// Make writes visible to readers, but does not guarantee any durability
pub(super) fn write_barrier(&self) -> Result {
self.flush_write_buffer()
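The last hunk folds `eventual_flush()` into a single `flush(eventual: bool)`: `PagedCachedFile` no longer issues platform-specific syscalls itself and instead forwards durability to the backend's `sync_data(eventual)`. The sketch below shows roughly where the removed macOS-specific logic could end up in a file-based backend; it is only an illustration reconstructed from the code deleted above (and it assumes the `libc` crate, already a dependency of this repo), not necessarily what this PR's `FileBackend` does.

```rust
use std::fs::File;
use std::io;
#[cfg(target_os = "macos")]
use std::os::unix::io::AsRawFd;

/// Sketch of a file-backed sync_data(eventual), mirroring the platform
/// branches that used to live in cached_file.rs.
fn sync_data(file: &File, eventual: bool) -> Result<(), io::Error> {
    #[cfg(target_os = "macos")]
    {
        if eventual {
            // F_BARRIERFSYNC orders prior writes without forcing them all the
            // way to stable storage, which is what an "eventual" flush wants.
            let code = unsafe { libc::fcntl(file.as_raw_fd(), libc::F_BARRIERFSYNC) };
            return if code == -1 {
                Err(io::Error::last_os_error())
            } else {
                Ok(())
            };
        }
    }
    #[cfg(not(target_os = "macos"))]
    let _ = eventual; // other platforms treat eventual and full flushes the same here

    // Everywhere else, and for non-eventual flushes on macOS, fall back to fdatasync.
    file.sync_data()
}
```

Moving these syscalls behind the trait keeps cached_file.rs free of OS-specific code, which is what makes non-file backends (and targets like wasm) possible at all.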
@@ -1,9 +1,9 @@
#[cfg(any(unix, target_os = "wasi"))]
mod unix;
#[cfg(any(unix, target_os = "wasi"))]
pub(super) use unix::LockedFile;
pub use unix::FileBackend;

#[cfg(windows)]
mod windows;
#[cfg(windows)]
pub(super) use windows::LockedFile;
pub use windows::FileBackend;