diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 323491dc..359020c6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -83,7 +83,7 @@ jobs: uses: nanasess/setup-chromedriver@v2 - name: Build (wasm32-unknown-unknown) - run: cargo build --all --target=wasm32-unknown-unknown --features=wasm-bindgen + run: cargo build --all --target=wasm32-unknown-unknown --all-features - name: Test proto crate run: wasm-pack test --node proto @@ -97,6 +97,9 @@ jobs: - name: Test node-wasm crate run: wasm-pack test --headless --chrome node-wasm + - name: Test blockstore crate + run: wasm-pack test --headless --chrome blockstore --all-features + - name: Build and pack node-wasm run: wasm-pack build --release --target web node-wasm && wasm-pack pack node-wasm diff --git a/Cargo.lock b/Cargo.lock index 2f887328..1b8ebf7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -655,13 +655,19 @@ dependencies = [ "async-trait", "cid", "dashmap", + "js-sys", "lru", "multihash", "multihash-codetable", + "rexie", + "rstest", + "send_wrapper 0.6.0", "sled", "tempfile", "thiserror", "tokio", + "wasm-bindgen", + "wasm-bindgen-test", ] [[package]] @@ -1683,6 +1689,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "gloo-timers" version = "0.2.6" @@ -3879,6 +3891,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "relative-path" +version = "1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e898588f33fdd5b9420719948f9f2a32c922a246964576f71ba7f24f80610fbc" + [[package]] name = "resolv-conf" version = "0.7.0" @@ -3952,6 +3970,35 @@ dependencies = [ "rustc-hex", ] +[[package]] +name = "rstest" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version 0.4.0", +] + +[[package]] +name = "rstest_macros" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d428f8247852f894ee1be110b375111b586d4fa431f6c46e64ba5a0dcccbe605" +dependencies = [ + "cfg-if", + "glob", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version 0.4.0", + "syn 2.0.48", + "unicode-ident", +] + [[package]] name = "rtnetlink" version = "0.10.1" diff --git a/blockstore/Cargo.toml b/blockstore/Cargo.toml index 32467fa9..71b9bebf 100644 --- a/blockstore/Cargo.toml +++ b/blockstore/Cargo.toml @@ -19,23 +19,36 @@ cid = "0.11.0" dashmap = "5.5.3" lru = { version = "0.12.2", optional = true } multihash = "0.19.1" +thiserror = "1.0.40" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] # Upgrading this dependency invalidates existing persistent dbs. # Those can be restored by migrating between versions: # https://docs.rs/sled/latest/sled/struct.Db.html#examples-1 sled = { version = "0.34.7", optional = true } tokio = { version = "1.29.0", features = ["macros", "rt"], optional = true } -thiserror = "1.0.40" + +[target.'cfg(target_arch = "wasm32")'.dependencies] +js-sys = { version = "0.3.68", optional = true } +rexie = { version = "0.5.0", optional = true } +send_wrapper = { version = "0.6.0", features = ["futures"], optional = true } +wasm-bindgen = { version = "0.2.91", optional = true } [dev-dependencies] +rstest = "0.18.2" tokio = { version = "1.29.0", features = ["macros", "rt"] } tempfile = "3.10" # doc-tests multihash-codetable = { version = "0.1.1", features = ["digest", "sha2"] } +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen-test = "0.3.41" + [features] lru = ["dep:lru"] sled = ["dep:sled", "dep:tokio"] +indexeddb = ["dep:js-sys", "dep:rexie", "dep:send_wrapper", "dep:wasm-bindgen"] [package.metadata.docs.rs] rustdoc-args = ["--cfg", "docs_rs"] diff --git a/blockstore/src/in_memory_blockstore.rs b/blockstore/src/in_memory_blockstore.rs index 74d0d0b3..1abc5e82 100644 --- a/blockstore/src/in_memory_blockstore.rs +++ b/blockstore/src/in_memory_blockstore.rs @@ -60,210 +60,3 @@ impl Default for InMemoryBlockstore::new(); - - let cid = CidGeneric::<128>::read_bytes( - [ - 0x01, // CIDv1 - 0x01, // CID codec = 1 - 0x02, // code = 2 - 0x03, // len = 3 - 1, 2, 3, // hash - ] - .as_ref(), - ) - .unwrap(); - let data = [0xCD; 512]; - - store.put_keyed(&cid, &data).await.unwrap(); - - let retrieved_data = store.get(&cid).await.unwrap().unwrap(); - assert_eq!(data, retrieved_data.as_ref()); - - let another_cid = CidGeneric::<128>::default(); - let missing_block = store.get(&another_cid).await.unwrap(); - assert_eq!(missing_block, None); - } - - #[tokio::test] - async fn test_duplicate_insert() { - let cid0 = CidGeneric::<128>::read_bytes( - [ - 0x01, // CIDv1 - 0x11, // CID codec - 0x22, // multihash code - 0x02, // len = 2 - 0, 0, // hash - ] - .as_ref(), - ) - .unwrap(); - let cid1 = CidGeneric::<128>::read_bytes( - [ - 0x01, // CIDv1 - 0x33, // CID codec - 0x44, // multihash code - 0x02, // len = 2 - 0, 1, // hash - ] - .as_ref(), - ) - .unwrap(); - - let store = InMemoryBlockstore::<128>::new(); - - store.put_keyed(&cid0, &[0x01; 1]).await.unwrap(); - store.put_keyed(&cid1, &[0x02; 1]).await.unwrap(); - let insert_err = store.put_keyed(&cid1, &[0x03; 1]).await.unwrap_err(); - assert!(matches!(insert_err, BlockstoreError::CidExists)); - } - - #[tokio::test] - async fn different_cid_size() { - let cid_bytes = [ - 0x01, // CIDv1 - 0x11, // CID codec - 0x22, // multihash code - 0x02, // len = 2 - 0, 0, // hash - ]; - let cid0 = CidGeneric::<6>::read_bytes(cid_bytes.as_ref()).unwrap(); - let cid1 = CidGeneric::<32>::read_bytes(cid_bytes.as_ref()).unwrap(); - let cid2 = CidGeneric::<64>::read_bytes(cid_bytes.as_ref()).unwrap(); - let cid3 = CidGeneric::<65>::read_bytes(cid_bytes.as_ref()).unwrap(); - let cid4 = CidGeneric::<128>::read_bytes(cid_bytes.as_ref()).unwrap(); - - let data = [0x99; 5]; - - let store = InMemoryBlockstore::<64>::new(); - store.put_keyed(&cid0, data.as_ref()).await.unwrap(); - - let data0 = store.get(&cid0).await.unwrap().unwrap(); - assert_eq!(data, data0.as_ref()); - let data1 = store.get(&cid1).await.unwrap().unwrap(); - assert_eq!(data, data1.as_ref()); - let data2 = store.get(&cid2).await.unwrap().unwrap(); - assert_eq!(data, data2.as_ref()); - let data3 = store.get(&cid3).await.unwrap().unwrap(); - assert_eq!(data, data3.as_ref()); - let data4 = store.get(&cid4).await.unwrap().unwrap(); - assert_eq!(data, data4.as_ref()); - } - - #[tokio::test] - async fn too_large_cid() { - let cid = CidGeneric::<32>::read_bytes( - [ - 0x01, // CIDv1 - 0x11, // CID codec - 0x22, // multihash code - 0x10, // len = 16 - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - ] - .as_ref(), - ) - .unwrap(); - - let store = InMemoryBlockstore::<8>::new(); - let insert_err = store.put_keyed(&cid, [0x00, 1].as_ref()).await.unwrap_err(); - assert!(matches!(insert_err, BlockstoreError::CidTooLong)); - - let insert_err = store.get(&cid).await.unwrap_err(); - assert!(matches!(insert_err, BlockstoreError::CidTooLong)); - } - - #[tokio::test] - async fn test_block_insert() { - let block = TestBlock([0, 1, 2, 3]); - - let store = InMemoryBlockstore::<8>::new(); - store.put(block).await.unwrap(); - let retrieved_block = store.get(&block.cid().unwrap()).await.unwrap().unwrap(); - assert_eq!(block.data(), &retrieved_block); - } - - #[tokio::test] - async fn test_multiple_blocks_insert() { - let blocks = [ - TestBlock([0, 0, 0, 0]), - TestBlock([0, 0, 0, 1]), - TestBlock([0, 0, 1, 0]), - TestBlock([0, 0, 1, 1]), - TestBlock([0, 1, 0, 0]), - TestBlock([0, 1, 0, 1]), - TestBlock([0, 1, 1, 0]), - TestBlock([0, 1, 1, 1]), - ]; - let uninserted_blocks = [ - TestBlock([1, 0, 0, 0]), - TestBlock([1, 0, 0, 1]), - TestBlock([1, 0, 1, 0]), - TestBlock([1, 1, 0, 1]), - ]; - - let store = InMemoryBlockstore::<8>::new(); - store.put_many(blocks).await.unwrap(); - - for b in blocks { - let cid = b.cid().unwrap(); - assert!(store.has(&cid).await.unwrap()); - let retrieved_block = store.get(&cid).await.unwrap().unwrap(); - assert_eq!(b.data(), &retrieved_block); - } - - for b in uninserted_blocks { - let cid = b.cid().unwrap(); - assert!(!store.has(&cid).await.unwrap()); - assert!(store.get(&cid).await.unwrap().is_none()); - } - } - - #[tokio::test] - async fn test_multiple_keyed() { - let blocks = [[0], [1], [2], [3]]; - let cids = [ - // 4 different arbitrary CIDs - TestBlock([0, 0, 0, 1]).cid().unwrap(), - TestBlock([0, 0, 0, 2]).cid().unwrap(), - TestBlock([0, 0, 0, 3]).cid().unwrap(), - TestBlock([0, 0, 0, 4]).cid().unwrap(), - ]; - let pairs = zip(cids, blocks); - - let store = InMemoryBlockstore::<8>::new(); - store.put_many_keyed(pairs.clone()).await.unwrap(); - - for (cid, block) in pairs { - let retrieved_block = store.get(&cid).await.unwrap().unwrap(); - assert_eq!(block.as_ref(), &retrieved_block); - } - } - - const TEST_CODEC: u64 = 0x0A; - const TEST_MH_CODE: u64 = 0x0A; - - #[derive(Debug, PartialEq, Clone, Copy)] - struct TestBlock(pub [u8; 4]); - - impl Block<8> for TestBlock { - fn cid(&self) -> StdResult, CidError> { - let mh = Multihash::wrap(TEST_MH_CODE, &self.0).unwrap(); - Ok(CidGeneric::new_v1(TEST_CODEC, mh)) - } - - fn data(&self) -> &[u8] { - &self.0 - } - } -} diff --git a/blockstore/src/indexed_db_blockstore.rs b/blockstore/src/indexed_db_blockstore.rs new file mode 100644 index 00000000..288bf440 --- /dev/null +++ b/blockstore/src/indexed_db_blockstore.rs @@ -0,0 +1,157 @@ +use cid::CidGeneric; +use js_sys::Uint8Array; +use rexie::{KeyRange, ObjectStore, Rexie, Store, TransactionMode}; +use send_wrapper::SendWrapper; +use wasm_bindgen::{JsCast, JsValue}; + +use crate::{Blockstore, BlockstoreError, Result}; + +/// indexeddb version, needs to be incremented on every schema change +const DB_VERSION: u32 = 1; + +const BLOCK_STORE: &str = "BLOCKSTORE.BLOCKS"; + +/// A [`Blockstore`] implementation backed by an `IndexedDb` database. +#[derive(Debug)] +pub struct IndexedDbBlockstore { + db: SendWrapper, +} + +impl IndexedDbBlockstore { + /// Create or open a [`IndexedDbBlockstore`] with a given name. + /// + /// # Example + /// ``` + /// # async fn example() -> Result<(), Box> { + /// use blockstore::IndexedDbBlockstore; + /// + /// let blockstore = IndexedDbBlockstore::new("blocks").await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn new(name: &str) -> Result { + let rexie = Rexie::builder(name) + .version(DB_VERSION) + .add_object_store(ObjectStore::new(BLOCK_STORE).auto_increment(false)) + .build() + .await + .map_err(|e| BlockstoreError::BackingStoreError(e.to_string()))?; + + Ok(Self { + db: SendWrapper::new(rexie), + }) + } + + async fn get(&self, cid: &CidGeneric) -> Result>> { + let cid = Uint8Array::from(cid.to_bytes().as_ref()); + + let tx = self + .db + .transaction(&[BLOCK_STORE], TransactionMode::ReadOnly)?; + let blocks = tx.store(BLOCK_STORE)?; + let block = blocks.get(&cid).await?; + + if block.is_undefined() { + Ok(None) + } else { + let arr = block.dyn_ref::().ok_or_else(|| { + BlockstoreError::StorageCorrupted(format!( + "expected 'Uint8Array', got '{}'", + block + .js_typeof() + .as_string() + .expect("typeof must be a string") + )) + })?; + Ok(Some(arr.to_vec())) + } + } + + async fn put(&self, cid: &CidGeneric, data: &[u8]) -> Result<()> { + let cid = Uint8Array::from(cid.to_bytes().as_ref()); + let data = Uint8Array::from(data); + + let tx = self + .db + .transaction(&[BLOCK_STORE], TransactionMode::ReadWrite)?; + let blocks = tx.store(BLOCK_STORE)?; + + if !has_key(&blocks, &cid).await? { + blocks.add(&data, Some(&cid)).await?; + Ok(()) + } else { + Err(BlockstoreError::CidExists) + } + } + + async fn has(&self, cid: &CidGeneric) -> Result { + let cid = Uint8Array::from(cid.to_bytes().as_ref()); + + let tx = self + .db + .transaction(&[BLOCK_STORE], TransactionMode::ReadOnly)?; + let blocks = tx.store(BLOCK_STORE)?; + + has_key(&blocks, &cid).await + } +} + +#[cfg_attr(not(docs_rs), async_trait::async_trait)] +impl Blockstore for IndexedDbBlockstore { + async fn get(&self, cid: &CidGeneric) -> Result>> { + let fut = SendWrapper::new(self.get(cid)); + fut.await + } + + async fn put_keyed(&self, cid: &CidGeneric, data: &[u8]) -> Result<()> { + let fut = SendWrapper::new(self.put(cid, data)); + fut.await + } + + async fn has(&self, cid: &CidGeneric) -> Result { + let fut = SendWrapper::new(self.has(cid)); + fut.await + } +} + +impl From for BlockstoreError { + fn from(value: rexie::Error) -> Self { + BlockstoreError::BackingStoreError(value.to_string()) + } +} + +async fn has_key(store: &Store, key: &JsValue) -> Result { + let key_range = KeyRange::only(key)?; + let count = store.count(Some(&key_range)).await?; + Ok(count > 0) +} + +#[cfg(test)] +mod tests { + use wasm_bindgen_test::{wasm_bindgen_test, wasm_bindgen_test_configure}; + + use crate::tests::cid_v1; + + use super::*; + + wasm_bindgen_test_configure!(run_in_browser); + + #[wasm_bindgen_test] + async fn store_persists() { + let store_name = "indexeddb-blockstore-test-persistent"; + Rexie::delete(store_name).await.unwrap(); + + let store = IndexedDbBlockstore::new(store_name).await.unwrap(); + let cid = cid_v1::<64>(b"1"); + let data = b"data"; + + store.put_keyed(&cid, data).await.unwrap(); + + store.db.take().close(); + + let store = IndexedDbBlockstore::new(store_name).await.unwrap(); + let received = store.get(&cid).await.unwrap(); + + assert_eq!(received, Some(data.to_vec())); + } +} diff --git a/blockstore/src/lib.rs b/blockstore/src/lib.rs index 937aa826..2e7021e8 100644 --- a/blockstore/src/lib.rs +++ b/blockstore/src/lib.rs @@ -9,17 +9,22 @@ use crate::block::{Block, CidError}; /// Utilities related to computing CID for the inserted data pub mod block; mod in_memory_blockstore; +#[cfg(all(target_arch = "wasm32", feature = "indexeddb"))] +mod indexed_db_blockstore; #[cfg(feature = "lru")] mod lru_blockstore; -#[cfg(feature = "sled")] +#[cfg(all(not(target_arch = "wasm32"), feature = "sled"))] mod sled_blockstore; pub use crate::in_memory_blockstore::InMemoryBlockstore; +#[cfg(all(target_arch = "wasm32", feature = "indexeddb"))] +#[cfg_attr(docs_rs, doc(cfg(all(target_arch = "wasm32", feature = "indexeddb"))))] +pub use crate::indexed_db_blockstore::IndexedDbBlockstore; #[cfg(feature = "lru")] #[cfg_attr(docs_rs, doc(cfg(feature = "lru")))] pub use crate::lru_blockstore::LruBlockstore; -#[cfg(feature = "sled")] -#[cfg_attr(docs_rs, doc(cfg(feature = "sled")))] +#[cfg(all(not(target_arch = "wasm32"), feature = "sled"))] +#[cfg_attr(docs_rs, doc(cfg(all(not(target_arch = "wasm32"), feature = "sled"))))] pub use crate::sled_blockstore::SledBlockstore; /// Error returned when performing operations on [`Blockstore`] @@ -50,7 +55,7 @@ pub enum BlockstoreError { BackingStoreError(String), } -type Result = std::result::Result; +type Result = std::result::Result; /// An IPLD blockstore capable of holding arbitrary data indexed by CID. /// @@ -125,3 +130,286 @@ pub(crate) fn convert_cid( Ok(cid) } + +#[cfg(test)] +pub(crate) mod tests { + use rstest::rstest; + // rstest only supports attributes which last segment is `test` + // https://docs.rs/rstest/0.18.2/rstest/attr.rstest.html#inject-test-attribute + #[cfg(not(target_arch = "wasm32"))] + use tokio::test; + #[cfg(target_arch = "wasm32")] + use wasm_bindgen_test::wasm_bindgen_test as test; + + use super::*; + + const TEST_CODEC: u64 = 0x0A; + const TEST_MH_CODE: u64 = 0x0A; + + #[rstest] + #[case(new_in_memory::<64>())] + #[cfg_attr(feature = "lru", case(new_lru::<64>()))] + #[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))] + #[cfg_attr( + all(target_arch = "wasm32", feature = "indexeddb"), + case(new_indexeddb()) + )] + #[self::test] + async fn test_insert_get( + #[case] + #[future(awt)] + store: B, + ) { + let cid = cid_v1::<64>(b"1"); + let data = b"3"; + store.put_keyed(&cid, data).await.unwrap(); + + let retrieved_data = store.get(&cid).await.unwrap().unwrap(); + assert_eq!(&retrieved_data, data); + assert!(store.has(&cid).await.unwrap()); + + let another_cid = cid_v1::<64>(b"2"); + let missing_block = store.get(&another_cid).await.unwrap(); + assert_eq!(missing_block, None); + assert!(!store.has(&another_cid).await.unwrap()); + } + + #[rstest] + #[case(new_in_memory::<64>())] + #[cfg_attr(feature = "lru", case(new_lru::<64>()))] + #[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))] + #[cfg_attr( + all(target_arch = "wasm32", feature = "indexeddb"), + case(new_indexeddb()) + )] + #[self::test] + async fn test_duplicate_insert( + #[case] + #[future(awt)] + store: B, + ) { + let cid0 = cid_v1::<64>(b"1"); + let cid1 = cid_v1::<64>(b"2"); + + store.put_keyed(&cid0, b"1").await.unwrap(); + store.put_keyed(&cid1, b"2").await.unwrap(); + + let insert_err = store.put_keyed(&cid1, b"3").await.unwrap_err(); + assert!(matches!(insert_err, BlockstoreError::CidExists)); + } + + #[rstest] + #[case(new_in_memory::<128>())] + #[cfg_attr(feature = "lru", case(new_lru::<128>()))] + #[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))] + #[cfg_attr( + all(target_arch = "wasm32", feature = "indexeddb"), + case(new_indexeddb()) + )] + #[self::test] + async fn different_cid_size( + #[case] + #[future(awt)] + store: B, + ) { + let cid0 = cid_v1::<32>(b"1"); + let cid1 = cid_v1::<64>(b"1"); + let cid2 = cid_v1::<128>(b"1"); + let data = b"2"; + + store.put_keyed(&cid0, data).await.unwrap(); + + let received = store.get(&cid0).await.unwrap().unwrap(); + assert_eq!(&received, data); + let received = store.get(&cid1).await.unwrap().unwrap(); + assert_eq!(&received, data); + let received = store.get(&cid2).await.unwrap().unwrap(); + assert_eq!(&received, data); + } + + #[rstest] + #[case(new_in_memory::<8>())] + #[cfg_attr(feature = "lru", case(new_lru::<8>()))] + #[self::test] + async fn too_large_cid( + #[case] + #[future(awt)] + store: B, + ) { + let small_cid = cid_v1::<64>([1u8; 8]); + let big_cid = cid_v1::<64>([1u8; 64]); + + store.put_keyed(&small_cid, b"1").await.unwrap(); + let put_err = store.put_keyed(&big_cid, b"1").await.unwrap_err(); + assert!(matches!(put_err, BlockstoreError::CidTooLong)); + + store.get(&small_cid).await.unwrap(); + let get_err = store.get(&big_cid).await.unwrap_err(); + assert!(matches!(get_err, BlockstoreError::CidTooLong)); + + store.has(&small_cid).await.unwrap(); + let has_err = store.has(&big_cid).await.unwrap_err(); + assert!(matches!(has_err, BlockstoreError::CidTooLong)); + } + + #[rstest] + #[case(new_in_memory::<8>())] + #[cfg_attr(feature = "lru", case(new_lru::<8>()))] + #[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))] + #[cfg_attr( + all(target_arch = "wasm32", feature = "indexeddb"), + case(new_indexeddb()) + )] + #[self::test] + async fn test_block_insert( + #[case] + #[future(awt)] + store: B, + ) { + let block = TestBlock([0, 1, 2, 3]); + + store.put(block).await.unwrap(); + let retrieved_block = store.get(&block.cid().unwrap()).await.unwrap().unwrap(); + assert_eq!(block.data(), &retrieved_block); + } + + #[rstest] + #[case(new_in_memory::<8>())] + #[cfg_attr(feature = "lru", case(new_lru::<8>()))] + #[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))] + #[cfg_attr( + all(target_arch = "wasm32", feature = "indexeddb"), + case(new_indexeddb()) + )] + #[self::test] + async fn test_multiple_blocks_insert( + #[case] + #[future(awt)] + store: B, + ) { + let blocks = [ + TestBlock([0, 0, 0, 0]), + TestBlock([0, 0, 0, 1]), + TestBlock([0, 0, 1, 0]), + TestBlock([0, 0, 1, 1]), + TestBlock([0, 1, 0, 0]), + TestBlock([0, 1, 0, 1]), + TestBlock([0, 1, 1, 0]), + TestBlock([0, 1, 1, 1]), + ]; + let uninserted_blocks = [ + TestBlock([1, 0, 0, 0]), + TestBlock([1, 0, 0, 1]), + TestBlock([1, 0, 1, 0]), + TestBlock([1, 1, 0, 1]), + ]; + + store.put_many(blocks).await.unwrap(); + + for b in blocks { + let cid = b.cid().unwrap(); + assert!(store.has(&cid).await.unwrap()); + let retrieved_block = store.get(&cid).await.unwrap().unwrap(); + assert_eq!(b.data(), &retrieved_block); + } + + for b in uninserted_blocks { + let cid = b.cid().unwrap(); + assert!(!store.has(&cid).await.unwrap()); + assert!(store.get(&cid).await.unwrap().is_none()); + } + } + + #[rstest] + #[case(new_in_memory::<8>())] + #[cfg_attr(feature = "lru", case(new_lru::<8>()))] + #[cfg_attr(all(not(target_arch = "wasm32"), feature = "sled"), case(new_sled()))] + #[cfg_attr( + all(target_arch = "wasm32", feature = "indexeddb"), + case(new_indexeddb()) + )] + #[self::test] + async fn test_multiple_keyed( + #[case] + #[future(awt)] + store: B, + ) { + let blocks = [[0], [1], [2], [3]]; + let cids = [ + // 4 different arbitrary CIDs + TestBlock([0, 0, 0, 1]).cid().unwrap(), + TestBlock([0, 0, 0, 2]).cid().unwrap(), + TestBlock([0, 0, 0, 3]).cid().unwrap(), + TestBlock([0, 0, 0, 4]).cid().unwrap(), + ]; + let pairs = std::iter::zip(cids, blocks); + + store.put_many_keyed(pairs.clone()).await.unwrap(); + + for (cid, block) in pairs { + let retrieved_block = store.get(&cid).await.unwrap().unwrap(); + assert_eq!(block.as_ref(), &retrieved_block); + } + } + + async fn new_in_memory() -> InMemoryBlockstore { + InMemoryBlockstore::new() + } + + #[cfg(feature = "lru")] + async fn new_lru() -> LruBlockstore { + LruBlockstore::new(std::num::NonZeroUsize::new(128).unwrap()) + } + + #[cfg(all(not(target_arch = "wasm32"), feature = "sled"))] + async fn new_sled() -> SledBlockstore { + let path = tempfile::TempDir::with_prefix("sled-blockstore-test") + .unwrap() + .into_path(); + + let db = tokio::task::spawn_blocking(move || { + sled::Config::default() + .path(path) + .temporary(true) + .create_new(true) + .open() + .unwrap() + }) + .await + .unwrap(); + + SledBlockstore::new(db).await.unwrap() + } + + #[cfg(all(target_arch = "wasm32", feature = "indexeddb"))] + async fn new_indexeddb() -> IndexedDbBlockstore { + use std::sync::atomic::{AtomicU32, Ordering}; + + static NAME: AtomicU32 = AtomicU32::new(0); + + let name = NAME.fetch_add(1, Ordering::SeqCst); + let name = format!("indexeddb-blockstore-test-{name}"); + + // the db's don't seem to persist but for extra safety make a cleanup + rexie::Rexie::delete(&name).await.unwrap(); + IndexedDbBlockstore::new(&name).await.unwrap() + } + + #[derive(Debug, PartialEq, Clone, Copy)] + struct TestBlock(pub [u8; 4]); + + impl Block<8> for TestBlock { + fn cid(&self) -> Result, CidError> { + let mh = Multihash::wrap(TEST_MH_CODE, &self.0).unwrap(); + Ok(CidGeneric::new_v1(TEST_CODEC, mh)) + } + + fn data(&self) -> &[u8] { + &self.0 + } + } + + pub(crate) fn cid_v1(data: impl AsRef<[u8]>) -> CidGeneric { + CidGeneric::new_v1(1, Multihash::wrap(1, data.as_ref()).unwrap()) + } +} diff --git a/blockstore/src/lru_blockstore.rs b/blockstore/src/lru_blockstore.rs index 120c6011..27894d75 100644 --- a/blockstore/src/lru_blockstore.rs +++ b/blockstore/src/lru_blockstore.rs @@ -3,7 +3,7 @@ use std::{num::NonZeroUsize, sync::Mutex}; use cid::CidGeneric; use lru::LruCache; -use crate::{convert_cid, Blockstore, Result}; +use crate::{convert_cid, Blockstore, BlockstoreError, Result}; /// An LRU cached [`Blockstore`]. pub struct LruBlockstore { @@ -30,8 +30,12 @@ impl Blockstore for LruBlockstore(&self, cid: &CidGeneric, data: &[u8]) -> Result<()> { let cid = convert_cid(cid)?; let mut cache = self.cache.lock().expect("lock failed"); - cache.put(cid, data.to_vec()); - Ok(()) + if !cache.contains(&cid) { + cache.put(cid, data.to_vec()); + Ok(()) + } else { + Err(BlockstoreError::CidExists) + } } async fn has(&self, cid: &CidGeneric) -> Result { @@ -43,19 +47,23 @@ impl Blockstore for LruBlockstore::new(NonZeroUsize::new(2).unwrap()); - let cid1 = Cid::new_v1(1, Multihash::wrap(2, &[1]).unwrap()); - let cid2 = Cid::new_v1(1, Multihash::wrap(2, &[2]).unwrap()); - let cid3 = Cid::new_v1(1, Multihash::wrap(2, &[3]).unwrap()); + let cid1 = cid_v1::<64>(b"1"); + let cid2 = cid_v1::<64>(b"2"); + let cid3 = cid_v1::<64>(b"3"); store.put_keyed(&cid1, b"1").await.unwrap(); assert_eq!(store.get(&cid1).await.unwrap().unwrap(), b"1"); diff --git a/blockstore/src/sled_blockstore.rs b/blockstore/src/sled_blockstore.rs index 51ec5e7d..0d9dc5c9 100644 --- a/blockstore/src/sled_blockstore.rs +++ b/blockstore/src/sled_blockstore.rs @@ -6,13 +6,13 @@ use sled::{Db, Error as SledError, Tree}; use tokio::task::spawn_blocking; use tokio::task::JoinError; -use crate::{convert_cid, Blockstore, BlockstoreError, Result}; +use crate::{Blockstore, BlockstoreError, Result}; const BLOCKS_TREE_ID: &[u8] = b"BLOCKSTORE.BLOCKS"; /// A [`Blockstore`] implementation backed by a [`sled`] database. #[derive(Debug)] -pub struct SledBlockstore { +pub struct SledBlockstore { inner: Arc, } @@ -22,7 +22,7 @@ struct Inner { blocks: Tree, } -impl SledBlockstore { +impl SledBlockstore { /// Create or open a [`SledBlockstore`] in a given sled [`Db`]. /// /// # Example @@ -32,7 +32,7 @@ impl SledBlockstore { /// use tokio::task::spawn_blocking; /// /// let db = spawn_blocking(|| sled::open("path/to/db")).await??; - /// let blockstore = SledBlockstore::<64>::new(db).await?; + /// let blockstore = SledBlockstore::new(db).await?; /// # Ok(()) /// # } /// ``` @@ -48,45 +48,36 @@ impl SledBlockstore { } async fn get(&self, cid: &CidGeneric) -> Result>> { - let cid = convert_cid::(cid)?; let inner = self.inner.clone(); + let cid = cid.to_bytes(); - spawn_blocking(move || { - let key = cid.to_bytes(); - Ok(inner.blocks.get(key)?.map(|bytes| bytes.to_vec())) - }) - .await? + spawn_blocking(move || Ok(inner.blocks.get(cid)?.map(|bytes| bytes.to_vec()))).await? } async fn put(&self, cid: &CidGeneric, data: &[u8]) -> Result<()> { - let cid = convert_cid::(cid)?; let inner = self.inner.clone(); + let cid = cid.to_bytes(); let data = data.to_vec(); spawn_blocking(move || { - let key = cid.to_bytes(); inner .blocks - .compare_and_swap(key, None as Option<&[u8]>, Some(data))? + .compare_and_swap(cid, None as Option<&[u8]>, Some(data))? .or(Err(BlockstoreError::CidExists)) }) .await? } async fn has(&self, cid: &CidGeneric) -> Result { - let cid = convert_cid::(cid)?; let inner = self.inner.clone(); + let cid = cid.to_bytes(); - spawn_blocking(move || { - let key = cid.to_bytes(); - Ok(inner.blocks.contains_key(key)?) - }) - .await? + spawn_blocking(move || Ok(inner.blocks.contains_key(cid)?)).await? } } #[cfg_attr(not(docs_rs), async_trait::async_trait)] -impl Blockstore for SledBlockstore { +impl Blockstore for SledBlockstore { async fn get(&self, cid: &CidGeneric) -> Result>> { self.get(cid).await } @@ -123,96 +114,44 @@ impl From for BlockstoreError { #[cfg(test)] mod tests { - use cid::Cid; - use multihash::Multihash; + use std::path::Path; + + use crate::tests::cid_v1; use super::*; #[tokio::test] - async fn test_insert_get() { + async fn store_persists() { + let path = tempfile::TempDir::with_prefix("sled-blockstore-test") + .unwrap() + .into_path(); + + let store = new_sled_blockstore(&path).await; let cid = cid_v1::<64>(b"1"); - let data = b"3"; + let data = b"data"; - let store = temp_blockstore::<64>().await; store.put_keyed(&cid, data).await.unwrap(); - let retrieved_data = store.get(&cid).await.unwrap().unwrap(); - assert_eq!(&retrieved_data, data); - assert!(store.has(&cid).await.unwrap()); - - let another_cid = Cid::default(); - let missing_block = store.get(&another_cid).await.unwrap(); - assert_eq!(missing_block, None); - assert!(!store.has(&another_cid).await.unwrap()); - } - - #[tokio::test] - async fn test_duplicate_insert() { - let cid0 = cid_v1::<64>(b"1"); - let cid1 = cid_v1::<64>(b"2"); - - let store = temp_blockstore::<64>().await; + spawn_blocking(move || drop(store)).await.unwrap(); - store.put_keyed(&cid0, b"1").await.unwrap(); - store.put_keyed(&cid1, b"2").await.unwrap(); + let store = new_sled_blockstore(&path).await; + let received = store.get(&cid).await.unwrap(); - let insert_err = store.put_keyed(&cid1, b"3").await.unwrap_err(); - assert!(matches!(insert_err, BlockstoreError::CidExists)); + assert_eq!(received, Some(data.to_vec())); } - #[tokio::test] - async fn different_cid_size() { - let cid0 = cid_v1::<32>(b"1"); - let cid1 = cid_v1::<64>(b"1"); - let cid2 = cid_v1::<128>(b"1"); - let data = b"2"; - - let store = temp_blockstore::<128>().await; - store.put_keyed(&cid0, data).await.unwrap(); - - let received = store.get(&cid0).await.unwrap().unwrap(); - assert_eq!(&received, data); - let received = store.get(&cid1).await.unwrap().unwrap(); - assert_eq!(&received, data); - let received = store.get(&cid2).await.unwrap().unwrap(); - assert_eq!(&received, data); - } - - #[tokio::test] - async fn too_large_cid() { - let small_cid = cid_v1::<64>([1u8; 8]); - let big_cid = cid_v1::<64>([1u8; 64]); - - let store = temp_blockstore::<8>().await; - - store.put_keyed(&small_cid, b"1").await.unwrap(); - let put_err = store.put_keyed(&big_cid, b"1").await.unwrap_err(); - assert!(matches!(put_err, BlockstoreError::CidTooLong)); - - store.get(&small_cid).await.unwrap(); - let get_err = store.get(&big_cid).await.unwrap_err(); - assert!(matches!(get_err, BlockstoreError::CidTooLong)); - - store.has(&small_cid).await.unwrap(); - let has_err = store.has(&big_cid).await.unwrap_err(); - assert!(matches!(has_err, BlockstoreError::CidTooLong)); - } - - fn cid_v1(data: impl AsRef<[u8]>) -> CidGeneric { - CidGeneric::new_v1(1, Multihash::wrap(1, data.as_ref()).unwrap()) - } - - async fn temp_blockstore() -> SledBlockstore { - let test_dir = tempfile::TempDir::with_prefix("sled-blockstore-test") - .unwrap() - .into_path(); - - let db = sled::Config::default() - .path(test_dir) - .temporary(true) - .create_new(true) - .open() - .unwrap(); + async fn new_sled_blockstore(path: impl AsRef) -> SledBlockstore { + let path = path.as_ref().to_owned(); + let db = tokio::task::spawn_blocking(move || { + sled::Config::default() + .path(path) + .temporary(false) + .create_new(false) + .open() + .unwrap() + }) + .await + .unwrap(); SledBlockstore::new(db).await.unwrap() } diff --git a/node-wasm/src/node.rs b/node-wasm/src/node.rs index 98575ca8..8f288ec5 100644 --- a/node-wasm/src/node.rs +++ b/node-wasm/src/node.rs @@ -6,7 +6,7 @@ use celestia_types::{hash::Hash, ExtendedHeader}; use js_sys::Array; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; -use lumina_node::blockstore::InMemoryBlockstore; +use lumina_node::blockstore::IndexedDbBlockstore; use lumina_node::network::{canonical_network_bootnodes, network_genesis, network_id}; use lumina_node::node::{Node, NodeConfig}; use lumina_node::store::{IndexedDbStore, Store}; @@ -213,11 +213,14 @@ impl WasmNodeConfig { } } - async fn into_node_config(self) -> Result> { + async fn into_node_config(self) -> Result> { let network_id = network_id(self.network.into()); let store = IndexedDbStore::new(network_id) .await .js_context("Failed to open the store")?; + let blockstore = IndexedDbBlockstore::new(&format!("{network_id}-blockstore")) + .await + .js_context("Failed to open the blockstore")?; let p2p_local_keypair = Keypair::generate_ed25519(); @@ -234,7 +237,7 @@ impl WasmNodeConfig { p2p_bootnodes, p2p_local_keypair, p2p_listen_on: vec![], - blockstore: InMemoryBlockstore::new(), + blockstore, store, }) } diff --git a/node/Cargo.toml b/node/Cargo.toml index de914002..22d133df 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -19,7 +19,6 @@ categories = [ ] [dependencies] -blockstore = { workspace = true } celestia-proto = { workspace = true } celestia-tendermint-proto = { workspace = true } celestia-types = { workspace = true } @@ -68,6 +67,7 @@ libp2p = { workspace = true, features = [ [target.'cfg(target_arch = "wasm32")'.dependencies] backoff = { version = "0.4.0", features = ["wasm-bindgen"] } +blockstore = { workspace = true, features = ["indexeddb"] } celestia-types = { workspace = true, features = ["wasm-bindgen"] } getrandom = { version = "0.2.10", features = ["js"] } gloo-timers = { version = "0.3.0", features = ["futures"] } diff --git a/node/src/blockstore.rs b/node/src/blockstore.rs index f8c62fe9..6409cd2e 100644 --- a/node/src/blockstore.rs +++ b/node/src/blockstore.rs @@ -8,7 +8,13 @@ use crate::p2p::MAX_MH_SIZE; pub type InMemoryBlockstore = blockstore::InMemoryBlockstore; #[cfg(not(target_arch = "wasm32"))] -/// A [`SledBlockstore`] with maximum multihash size used by lumina. +/// A [`SledBlockstore`]. /// /// [`SledBlockstore`]: blockstore::SledBlockstore -pub type SledBlockstore = blockstore::SledBlockstore; +pub type SledBlockstore = blockstore::SledBlockstore; + +#[cfg(target_arch = "wasm32")] +/// An [`IndexedDbBlockstore`]. +/// +/// [`IndexedDbBlockstore`]: blockstore::IndexedDbBlockstore +pub type IndexedDbBlockstore = blockstore::IndexedDbBlockstore;