Skip to content

Commit

Permalink
feat!(blockstore): add sled blockstore (#217)
Browse files Browse the repository at this point in the history
* feat(blockstore): add sled blockstore

* feat!(blockstore): remove PartialEq from BlockstoreError

* feat!(blockstore): require Send and Sync bounds on Blockstore

* remove OpenFailed error

* Update blockstore/src/sled_blockstore.rs

Co-authored-by: Mikołaj Florkiewicz <[email protected]>
Signed-off-by: Maciej Zwoliński <[email protected]>

* add new error variants unconditionally

---------

Signed-off-by: Maciej Zwoliński <[email protected]>
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
zvolin and fl0rek authored Feb 9, 2024
1 parent 6bb6ecb commit 5616400
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions blockstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@ cid = "0.11.0"
dashmap = "5.5.3"
lru = { version = "0.12.2", optional = true }
multihash = "0.19.1"
# Upgrading this dependency invalidates existing persistent dbs.
# Those can be restored by migrating between versions:
# https://docs.rs/sled/latest/sled/struct.Db.html#examples-1
sled = { version = "0.34.7", optional = true }
tokio = { version = "1.29.0", features = ["macros", "rt"], optional = true }
thiserror = "1.0.40"

[dev-dependencies]
tokio = { version = "1.29.0", features = ["macros", "rt"] }
tempfile = "3.10"

# doc-tests
multihash-codetable = { version = "0.1.1", features = ["digest", "sha2"] }

[features]
lru = ["dep:lru"]
sled = ["dep:sled", "dep:tokio"]

[package.metadata.docs.rs]
rustdoc-args = ["--cfg", "docs_rs"]
Expand Down
6 changes: 3 additions & 3 deletions blockstore/src/in_memory_blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ mod tests {
store.put_keyed(&cid0, &[0x01; 1]).await.unwrap();
store.put_keyed(&cid1, &[0x02; 1]).await.unwrap();
let insert_err = store.put_keyed(&cid1, &[0x03; 1]).await.unwrap_err();
assert_eq!(insert_err, BlockstoreError::CidExists);
assert!(matches!(insert_err, BlockstoreError::CidExists));
}

#[tokio::test]
Expand Down Expand Up @@ -177,10 +177,10 @@ mod tests {

let store = InMemoryBlockstore::<8>::new();
let insert_err = store.put_keyed(&cid, [0x00, 1].as_ref()).await.unwrap_err();
assert_eq!(insert_err, BlockstoreError::CidTooLong);
assert!(matches!(insert_err, BlockstoreError::CidTooLong));

let insert_err = store.get(&cid).await.unwrap_err();
assert_eq!(insert_err, BlockstoreError::CidTooLong);
assert!(matches!(insert_err, BlockstoreError::CidTooLong));
}

#[tokio::test]
Expand Down
21 changes: 19 additions & 2 deletions blockstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,19 @@ pub mod block;
mod in_memory_blockstore;
#[cfg(feature = "lru")]
mod lru_blockstore;
#[cfg(feature = "sled")]
mod sled_blockstore;

pub use crate::in_memory_blockstore::InMemoryBlockstore;
#[cfg(feature = "lru")]
#[cfg_attr(docs_rs, doc(cfg(feature = "lru")))]
pub use crate::lru_blockstore::LruBlockstore;
#[cfg(feature = "sled")]
#[cfg_attr(docs_rs, doc(cfg(feature = "sled")))]
pub use crate::sled_blockstore::SledBlockstore;

/// Error returned when performing operations on [`Blockstore`]
#[derive(Debug, PartialEq, Error)]
#[derive(Debug, Error)]
pub enum BlockstoreError {
/// Provided CID already exists in blockstore when trying to insert it
#[error("CID already exists in the store")]
Expand All @@ -31,6 +36,18 @@ pub enum BlockstoreError {
/// Error occured when trying to compute CID.
#[error("Error generating CID: {0}")]
CidError(#[from] CidError),

/// An error propagated from the IO operation.
#[error("Received io error from persistent storage: {0}")]
IoError(#[from] std::io::Error),

/// Storage corrupted. Try reseting the blockstore.
#[error("Stored data in inconsistent state, try reseting the store: {0}")]
StorageCorrupted(String),

/// Unrecoverable error reported by the backing store.
#[error("Persistent storage reported unrecoverable error: {0}")]
BackingStoreError(String),
}

type Result<T> = std::result::Result<T, BlockstoreError>;
Expand All @@ -42,7 +59,7 @@ type Result<T> = std::result::Result<T, BlockstoreError>;
///
/// [`CidTooLong`]: BlockstoreError::CidTooLong
#[cfg_attr(not(docs_rs), async_trait::async_trait)]
pub trait Blockstore {
pub trait Blockstore: Send + Sync {
/// Gets the block from the blockstore
async fn get<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<Option<Vec<u8>>>;

Expand Down
219 changes: 219 additions & 0 deletions blockstore/src/sled_blockstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
use std::io;
use std::sync::Arc;

use cid::CidGeneric;
use sled::{Db, Error as SledError, Tree};
use tokio::task::spawn_blocking;
use tokio::task::JoinError;

use crate::{convert_cid, Blockstore, BlockstoreError, Result};

const BLOCKS_TREE_ID: &[u8] = b"BLOCKSTORE.BLOCKS";

/// A [`Blockstore`] implementation backed by a [`sled`] database.
#[derive(Debug)]
pub struct SledBlockstore<const MAX_MULTIHASH_SIZE: usize> {
inner: Arc<Inner>,
}

#[derive(Debug)]
struct Inner {
_db: Db,
blocks: Tree,
}

impl<const MAX_MULTIHASH_SIZE: usize> SledBlockstore<MAX_MULTIHASH_SIZE> {
/// Create or open a [`SledBlockstore`] in a given sled [`Db`].
///
/// # Example
/// ```
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use blockstore::SledBlockstore;
/// use tokio::task::spawn_blocking;
///
/// let db = spawn_blocking(|| sled::open("path/to/db")).await??;
/// let blockstore = SledBlockstore::<64>::new(db).await?;
/// # Ok(())
/// # }
/// ```
pub async fn new(db: Db) -> Result<Self> {
spawn_blocking(move || {
let blocks = db.open_tree(BLOCKS_TREE_ID)?;

Ok(Self {
inner: Arc::new(Inner { _db: db, blocks }),
})
})
.await?
}

async fn get<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<Option<Vec<u8>>> {
let cid = convert_cid::<S, MAX_MULTIHASH_SIZE>(cid)?;
let inner = self.inner.clone();

spawn_blocking(move || {
let key = cid.to_bytes();
Ok(inner.blocks.get(key)?.map(|bytes| bytes.to_vec()))
})
.await?
}

async fn put<const S: usize>(&self, cid: &CidGeneric<S>, data: &[u8]) -> Result<()> {
let cid = convert_cid::<S, MAX_MULTIHASH_SIZE>(cid)?;
let inner = self.inner.clone();
let data = data.to_vec();

spawn_blocking(move || {
let key = cid.to_bytes();
inner
.blocks
.compare_and_swap(key, None as Option<&[u8]>, Some(data))?
.or(Err(BlockstoreError::CidExists))
})
.await?
}

async fn has<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<bool> {
let cid = convert_cid::<S, MAX_MULTIHASH_SIZE>(cid)?;
let inner = self.inner.clone();

spawn_blocking(move || {
let key = cid.to_bytes();
Ok(inner.blocks.contains_key(key)?)
})
.await?
}
}

#[cfg_attr(not(docs_rs), async_trait::async_trait)]
impl<const MAX_MULTIHASH_SIZE: usize> Blockstore for SledBlockstore<MAX_MULTIHASH_SIZE> {
async fn get<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<Option<Vec<u8>>> {
self.get(cid).await
}

async fn put_keyed<const S: usize>(&self, cid: &CidGeneric<S>, data: &[u8]) -> Result<()> {
self.put(cid, data).await
}

async fn has<const S: usize>(&self, cid: &CidGeneric<S>) -> Result<bool> {
self.has(cid).await
}
}

// divide errors into recoverable and not avoiding directly relying on passing sled types
impl From<SledError> for BlockstoreError {
fn from(error: SledError) -> BlockstoreError {
match error {
e @ SledError::CollectionNotFound(_) | e @ SledError::Corruption { .. } => {
BlockstoreError::StorageCorrupted(e.to_string())
}
e @ SledError::Unsupported(_) | e @ SledError::ReportableBug(_) => {
BlockstoreError::BackingStoreError(e.to_string())
}
SledError::Io(e) => e.into(),
}
}
}

impl From<JoinError> for BlockstoreError {
fn from(error: JoinError) -> BlockstoreError {
io::Error::from(error).into()
}
}

#[cfg(test)]
mod tests {
use cid::Cid;
use multihash::Multihash;

use super::*;

#[tokio::test]
async fn test_insert_get() {
let cid = cid_v1::<64>(b"1");
let data = b"3";

let store = temp_blockstore::<64>().await;
store.put_keyed(&cid, data).await.unwrap();

let retrieved_data = store.get(&cid).await.unwrap().unwrap();
assert_eq!(&retrieved_data, data);
assert!(store.has(&cid).await.unwrap());

let another_cid = Cid::default();
let missing_block = store.get(&another_cid).await.unwrap();
assert_eq!(missing_block, None);
assert!(!store.has(&another_cid).await.unwrap());
}

#[tokio::test]
async fn test_duplicate_insert() {
let cid0 = cid_v1::<64>(b"1");
let cid1 = cid_v1::<64>(b"2");

let store = temp_blockstore::<64>().await;

store.put_keyed(&cid0, b"1").await.unwrap();
store.put_keyed(&cid1, b"2").await.unwrap();

let insert_err = store.put_keyed(&cid1, b"3").await.unwrap_err();
assert!(matches!(insert_err, BlockstoreError::CidExists));
}

#[tokio::test]
async fn different_cid_size() {
let cid0 = cid_v1::<32>(b"1");
let cid1 = cid_v1::<64>(b"1");
let cid2 = cid_v1::<128>(b"1");
let data = b"2";

let store = temp_blockstore::<128>().await;
store.put_keyed(&cid0, data).await.unwrap();

let received = store.get(&cid0).await.unwrap().unwrap();
assert_eq!(&received, data);
let received = store.get(&cid1).await.unwrap().unwrap();
assert_eq!(&received, data);
let received = store.get(&cid2).await.unwrap().unwrap();
assert_eq!(&received, data);
}

#[tokio::test]
async fn too_large_cid() {
let small_cid = cid_v1::<64>([1u8; 8]);
let big_cid = cid_v1::<64>([1u8; 64]);

let store = temp_blockstore::<8>().await;

store.put_keyed(&small_cid, b"1").await.unwrap();
let put_err = store.put_keyed(&big_cid, b"1").await.unwrap_err();
assert!(matches!(put_err, BlockstoreError::CidTooLong));

store.get(&small_cid).await.unwrap();
let get_err = store.get(&big_cid).await.unwrap_err();
assert!(matches!(get_err, BlockstoreError::CidTooLong));

store.has(&small_cid).await.unwrap();
let has_err = store.has(&big_cid).await.unwrap_err();
assert!(matches!(has_err, BlockstoreError::CidTooLong));
}

fn cid_v1<const S: usize>(data: impl AsRef<[u8]>) -> CidGeneric<S> {
CidGeneric::new_v1(1, Multihash::wrap(1, data.as_ref()).unwrap())
}

async fn temp_blockstore<const SIZE: usize>() -> SledBlockstore<SIZE> {
let test_dir = tempfile::TempDir::with_prefix("sled-blockstore-test")
.unwrap()
.into_path();

let db = sled::Config::default()
.path(test_dir)
.temporary(true)
.create_new(true)
.open()
.unwrap();

SledBlockstore::new(db).await.unwrap()
}
}

0 comments on commit 5616400

Please sign in to comment.