diff --git a/Cargo.lock b/Cargo.lock index c0d78ab9..24fe5ca4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -658,6 +658,8 @@ dependencies = [ "lru", "multihash", "multihash-codetable", + "sled", + "tempfile", "thiserror", "tokio", ] diff --git a/blockstore/Cargo.toml b/blockstore/Cargo.toml index b3ce8022..32467fa9 100644 --- a/blockstore/Cargo.toml +++ b/blockstore/Cargo.toml @@ -19,16 +19,23 @@ cid = "0.11.0" dashmap = "5.5.3" lru = { version = "0.12.2", optional = true } multihash = "0.19.1" +# Upgrading this dependency invalidates existing persistent dbs. +# Those can be restored by migrating between versions: +# https://docs.rs/sled/latest/sled/struct.Db.html#examples-1 +sled = { version = "0.34.7", optional = true } +tokio = { version = "1.29.0", features = ["macros", "rt"], optional = true } thiserror = "1.0.40" [dev-dependencies] tokio = { version = "1.29.0", features = ["macros", "rt"] } +tempfile = "3.10" # doc-tests multihash-codetable = { version = "0.1.1", features = ["digest", "sha2"] } [features] lru = ["dep:lru"] +sled = ["dep:sled", "dep:tokio"] [package.metadata.docs.rs] rustdoc-args = ["--cfg", "docs_rs"] diff --git a/blockstore/src/in_memory_blockstore.rs b/blockstore/src/in_memory_blockstore.rs index 38b3b5df..74d0d0b3 100644 --- a/blockstore/src/in_memory_blockstore.rs +++ b/blockstore/src/in_memory_blockstore.rs @@ -126,7 +126,7 @@ mod tests { store.put_keyed(&cid0, &[0x01; 1]).await.unwrap(); store.put_keyed(&cid1, &[0x02; 1]).await.unwrap(); let insert_err = store.put_keyed(&cid1, &[0x03; 1]).await.unwrap_err(); - assert_eq!(insert_err, BlockstoreError::CidExists); + assert!(matches!(insert_err, BlockstoreError::CidExists)); } #[tokio::test] @@ -177,10 +177,10 @@ mod tests { let store = InMemoryBlockstore::<8>::new(); let insert_err = store.put_keyed(&cid, [0x00, 1].as_ref()).await.unwrap_err(); - assert_eq!(insert_err, BlockstoreError::CidTooLong); + assert!(matches!(insert_err, BlockstoreError::CidTooLong)); let insert_err = store.get(&cid).await.unwrap_err(); - assert_eq!(insert_err, BlockstoreError::CidTooLong); + assert!(matches!(insert_err, BlockstoreError::CidTooLong)); } #[tokio::test] diff --git a/blockstore/src/lib.rs b/blockstore/src/lib.rs index 19d0cc4b..937aa826 100644 --- a/blockstore/src/lib.rs +++ b/blockstore/src/lib.rs @@ -11,14 +11,19 @@ pub mod block; mod in_memory_blockstore; #[cfg(feature = "lru")] mod lru_blockstore; +#[cfg(feature = "sled")] +mod sled_blockstore; pub use crate::in_memory_blockstore::InMemoryBlockstore; #[cfg(feature = "lru")] #[cfg_attr(docs_rs, doc(cfg(feature = "lru")))] pub use crate::lru_blockstore::LruBlockstore; +#[cfg(feature = "sled")] +#[cfg_attr(docs_rs, doc(cfg(feature = "sled")))] +pub use crate::sled_blockstore::SledBlockstore; /// Error returned when performing operations on [`Blockstore`] -#[derive(Debug, PartialEq, Error)] +#[derive(Debug, Error)] pub enum BlockstoreError { /// Provided CID already exists in blockstore when trying to insert it #[error("CID already exists in the store")] @@ -31,6 +36,18 @@ pub enum BlockstoreError { /// Error occured when trying to compute CID. #[error("Error generating CID: {0}")] CidError(#[from] CidError), + + /// An error propagated from the IO operation. + #[error("Received io error from persistent storage: {0}")] + IoError(#[from] std::io::Error), + + /// Storage corrupted. Try reseting the blockstore. + #[error("Stored data in inconsistent state, try reseting the store: {0}")] + StorageCorrupted(String), + + /// Unrecoverable error reported by the backing store. + #[error("Persistent storage reported unrecoverable error: {0}")] + BackingStoreError(String), } type Result = std::result::Result; @@ -42,7 +59,7 @@ type Result = std::result::Result; /// /// [`CidTooLong`]: BlockstoreError::CidTooLong #[cfg_attr(not(docs_rs), async_trait::async_trait)] -pub trait Blockstore { +pub trait Blockstore: Send + Sync { /// Gets the block from the blockstore async fn get(&self, cid: &CidGeneric) -> Result>>; diff --git a/blockstore/src/sled_blockstore.rs b/blockstore/src/sled_blockstore.rs new file mode 100644 index 00000000..51ec5e7d --- /dev/null +++ b/blockstore/src/sled_blockstore.rs @@ -0,0 +1,219 @@ +use std::io; +use std::sync::Arc; + +use cid::CidGeneric; +use sled::{Db, Error as SledError, Tree}; +use tokio::task::spawn_blocking; +use tokio::task::JoinError; + +use crate::{convert_cid, Blockstore, BlockstoreError, Result}; + +const BLOCKS_TREE_ID: &[u8] = b"BLOCKSTORE.BLOCKS"; + +/// A [`Blockstore`] implementation backed by a [`sled`] database. +#[derive(Debug)] +pub struct SledBlockstore { + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + _db: Db, + blocks: Tree, +} + +impl SledBlockstore { + /// Create or open a [`SledBlockstore`] in a given sled [`Db`]. + /// + /// # Example + /// ``` + /// # async fn example() -> Result<(), Box> { + /// use blockstore::SledBlockstore; + /// use tokio::task::spawn_blocking; + /// + /// let db = spawn_blocking(|| sled::open("path/to/db")).await??; + /// let blockstore = SledBlockstore::<64>::new(db).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn new(db: Db) -> Result { + spawn_blocking(move || { + let blocks = db.open_tree(BLOCKS_TREE_ID)?; + + Ok(Self { + inner: Arc::new(Inner { _db: db, blocks }), + }) + }) + .await? + } + + async fn get(&self, cid: &CidGeneric) -> Result>> { + let cid = convert_cid::(cid)?; + let inner = self.inner.clone(); + + spawn_blocking(move || { + let key = cid.to_bytes(); + Ok(inner.blocks.get(key)?.map(|bytes| bytes.to_vec())) + }) + .await? + } + + async fn put(&self, cid: &CidGeneric, data: &[u8]) -> Result<()> { + let cid = convert_cid::(cid)?; + let inner = self.inner.clone(); + let data = data.to_vec(); + + spawn_blocking(move || { + let key = cid.to_bytes(); + inner + .blocks + .compare_and_swap(key, None as Option<&[u8]>, Some(data))? + .or(Err(BlockstoreError::CidExists)) + }) + .await? + } + + async fn has(&self, cid: &CidGeneric) -> Result { + let cid = convert_cid::(cid)?; + let inner = self.inner.clone(); + + spawn_blocking(move || { + let key = cid.to_bytes(); + Ok(inner.blocks.contains_key(key)?) + }) + .await? + } +} + +#[cfg_attr(not(docs_rs), async_trait::async_trait)] +impl Blockstore for SledBlockstore { + async fn get(&self, cid: &CidGeneric) -> Result>> { + self.get(cid).await + } + + async fn put_keyed(&self, cid: &CidGeneric, data: &[u8]) -> Result<()> { + self.put(cid, data).await + } + + async fn has(&self, cid: &CidGeneric) -> Result { + self.has(cid).await + } +} + +// divide errors into recoverable and not avoiding directly relying on passing sled types +impl From for BlockstoreError { + fn from(error: SledError) -> BlockstoreError { + match error { + e @ SledError::CollectionNotFound(_) | e @ SledError::Corruption { .. } => { + BlockstoreError::StorageCorrupted(e.to_string()) + } + e @ SledError::Unsupported(_) | e @ SledError::ReportableBug(_) => { + BlockstoreError::BackingStoreError(e.to_string()) + } + SledError::Io(e) => e.into(), + } + } +} + +impl From for BlockstoreError { + fn from(error: JoinError) -> BlockstoreError { + io::Error::from(error).into() + } +} + +#[cfg(test)] +mod tests { + use cid::Cid; + use multihash::Multihash; + + use super::*; + + #[tokio::test] + async fn test_insert_get() { + let cid = cid_v1::<64>(b"1"); + let data = b"3"; + + let store = temp_blockstore::<64>().await; + store.put_keyed(&cid, data).await.unwrap(); + + let retrieved_data = store.get(&cid).await.unwrap().unwrap(); + assert_eq!(&retrieved_data, data); + assert!(store.has(&cid).await.unwrap()); + + let another_cid = Cid::default(); + let missing_block = store.get(&another_cid).await.unwrap(); + assert_eq!(missing_block, None); + assert!(!store.has(&another_cid).await.unwrap()); + } + + #[tokio::test] + async fn test_duplicate_insert() { + let cid0 = cid_v1::<64>(b"1"); + let cid1 = cid_v1::<64>(b"2"); + + let store = temp_blockstore::<64>().await; + + store.put_keyed(&cid0, b"1").await.unwrap(); + store.put_keyed(&cid1, b"2").await.unwrap(); + + let insert_err = store.put_keyed(&cid1, b"3").await.unwrap_err(); + assert!(matches!(insert_err, BlockstoreError::CidExists)); + } + + #[tokio::test] + async fn different_cid_size() { + let cid0 = cid_v1::<32>(b"1"); + let cid1 = cid_v1::<64>(b"1"); + let cid2 = cid_v1::<128>(b"1"); + let data = b"2"; + + let store = temp_blockstore::<128>().await; + store.put_keyed(&cid0, data).await.unwrap(); + + let received = store.get(&cid0).await.unwrap().unwrap(); + assert_eq!(&received, data); + let received = store.get(&cid1).await.unwrap().unwrap(); + assert_eq!(&received, data); + let received = store.get(&cid2).await.unwrap().unwrap(); + assert_eq!(&received, data); + } + + #[tokio::test] + async fn too_large_cid() { + let small_cid = cid_v1::<64>([1u8; 8]); + let big_cid = cid_v1::<64>([1u8; 64]); + + let store = temp_blockstore::<8>().await; + + store.put_keyed(&small_cid, b"1").await.unwrap(); + let put_err = store.put_keyed(&big_cid, b"1").await.unwrap_err(); + assert!(matches!(put_err, BlockstoreError::CidTooLong)); + + store.get(&small_cid).await.unwrap(); + let get_err = store.get(&big_cid).await.unwrap_err(); + assert!(matches!(get_err, BlockstoreError::CidTooLong)); + + store.has(&small_cid).await.unwrap(); + let has_err = store.has(&big_cid).await.unwrap_err(); + assert!(matches!(has_err, BlockstoreError::CidTooLong)); + } + + fn cid_v1(data: impl AsRef<[u8]>) -> CidGeneric { + CidGeneric::new_v1(1, Multihash::wrap(1, data.as_ref()).unwrap()) + } + + async fn temp_blockstore() -> SledBlockstore { + let test_dir = tempfile::TempDir::with_prefix("sled-blockstore-test") + .unwrap() + .into_path(); + + let db = sled::Config::default() + .path(test_dir) + .temporary(true) + .create_new(true) + .open() + .unwrap(); + + SledBlockstore::new(db).await.unwrap() + } +}