Skip to content

Commit

Permalink
feat(node): Implement RedbStore (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
oblique authored Apr 15, 2024
1 parent 26cb2fe commit 8b964bc
Show file tree
Hide file tree
Showing 6 changed files with 685 additions and 20 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ libp2p = { workspace = true, features = [
"yamux",
"quic",
] }
redb = "2"

[target.'cfg(target_arch = "wasm32")'.dependencies]
backoff = { version = "0.4.0", features = ["wasm-bindgen"] }
Expand Down
43 changes: 35 additions & 8 deletions node/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Primitives related to the [`ExtendedHeader`] storage.
use std::fmt::Debug;
use std::io::{self, Cursor};
use std::io::Cursor;
use std::ops::{Bound, RangeBounds, RangeInclusive};

use async_trait::async_trait;
Expand All @@ -17,12 +17,16 @@ pub use in_memory_store::InMemoryStore;
#[cfg(target_arch = "wasm32")]
pub use indexed_db_store::IndexedDbStore;
#[cfg(not(target_arch = "wasm32"))]
pub use redb_store::RedbStore;
#[cfg(not(target_arch = "wasm32"))]
pub use sled_store::SledStore;

mod in_memory_store;
#[cfg(target_arch = "wasm32")]
mod indexed_db_store;
#[cfg(not(target_arch = "wasm32"))]
mod redb_store;
#[cfg(not(target_arch = "wasm32"))]
mod sled_store;

use crate::utils::validate_headers;
Expand Down Expand Up @@ -222,18 +226,14 @@ pub enum StoreError {
#[error("Stored data in inconsistent state, try reseting the store: {0}")]
StoredDataError(String),

/// Unrecoverable error reported by the backing store.
#[error("Persistent storage reported unrecoverable error: {0}")]
BackingStoreError(String),
/// Unrecoverable error reported by the database.
#[error("Database reported unrecoverable error: {0}")]
FatalDatabaseError(String),

/// An error propagated from the async executor.
#[error("Received error from executor: {0}")]
ExecutorError(String),

/// An error propagated from the IO operation.
#[error("Received io error from persistent storage: {0}")]
IoError(#[from] io::Error),

/// Failed to open the store.
#[error("Error opening store: {0}")]
OpenFailed(String),
Expand All @@ -243,6 +243,13 @@ pub enum StoreError {
InvalidHeadersRange,
}

#[cfg(not(target_arch = "wasm32"))]
impl From<tokio::task::JoinError> for StoreError {
fn from(error: tokio::task::JoinError) -> StoreError {
StoreError::ExecutorError(error.to_string())
}
}

#[derive(Message)]
struct RawSamplingMetadata {
#[prost(bool, tag = "1")]
Expand Down Expand Up @@ -429,6 +436,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_contains_height<S: Store>(
Expand All @@ -448,6 +456,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_empty_store<S: Store>(
Expand All @@ -470,6 +479,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_read_write<S: Store>(
Expand All @@ -491,6 +501,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_pregenerated_data<S: Store>(
Expand All @@ -516,6 +527,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_duplicate_insert<S: Store>(
Expand All @@ -537,6 +549,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_overwrite_height<S: Store>(
Expand All @@ -561,6 +574,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_overwrite_hash<S: Store>(
Expand All @@ -583,6 +597,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_append_range<S: Store>(
Expand All @@ -601,6 +616,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_append_gap_between_head<S: Store>(
Expand All @@ -626,6 +642,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_non_continuous_append<S: Store>(
Expand All @@ -650,6 +667,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_genesis_with_height<S: Store>(
Expand All @@ -669,6 +687,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_sampling_height_empty_store<S: Store>(
Expand All @@ -689,6 +708,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_sampling_height<S: Store>(
Expand Down Expand Up @@ -766,6 +786,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_sampling_merge<S: Store>(
Expand Down Expand Up @@ -826,6 +847,7 @@ mod tests {
#[rstest]
#[case::in_memory(new_in_memory_store())]
#[cfg_attr(not(target_arch = "wasm32"), case::sled(new_sled_store()))]
#[cfg_attr(not(target_arch = "wasm32"), case::redb(new_redb_store()))]
#[cfg_attr(target_arch = "wasm32", case::indexed_db(new_indexed_db_store()))]
#[self::test]
async fn test_sampled_cids<S: Store>(
Expand Down Expand Up @@ -938,6 +960,11 @@ mod tests {
SledStore::new(db).await.unwrap()
}

#[cfg(not(target_arch = "wasm32"))]
async fn new_redb_store() -> RedbStore {
RedbStore::in_memory().await.unwrap()
}

#[cfg(target_arch = "wasm32")]
async fn new_indexed_db_store() -> IndexedDbStore {
use std::sync::atomic::{AtomicU32, Ordering};
Expand Down
2 changes: 1 addition & 1 deletion node/src/store/indexed_db_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ impl From<rexie::Error> for StoreError {
use rexie::Error as E;
match error {
e @ E::AsyncChannelError => StoreError::ExecutorError(e.to_string()),
other => StoreError::BackingStoreError(other.to_string()),
other => StoreError::FatalDatabaseError(other.to_string()),
}
}
}
Expand Down
Loading

0 comments on commit 8b964bc

Please sign in to comment.