diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 57f68d33..f3b2dcc4 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -240,6 +240,25 @@ pub type Transaction<'db> = >::Transaction; #[cfg(feature = "full")] pub type TransactionArg<'db, 'a> = Option<&'a Transaction<'db>>; +/// Type alias for the return type of the `verify_merk_and_submerks` and +/// `verify_grovedb` functions. It represents a mapping of paths (as vectors of +/// vectors of bytes) to a tuple of three cryptographic hashes: the root hash, +/// the combined value hash, and the expected value hash. +#[cfg(feature = "full")] +type VerificationIssues = HashMap>, (CryptoHash, CryptoHash, CryptoHash)>; + +/// Type alias for the return type of the `open_merk_for_replication` function. +/// It represents a tuple containing: +/// - A `Merk` instance with a prefixed RocksDB immediate storage context. +/// - An optional `root_key`, represented as a vector of bytes. +/// - A boolean indicating whether the Merk is a sum tree. +#[cfg(feature = "full")] +type OpenedMerkForReplication<'tx> = ( + Merk>, + Option>, + bool, +); + #[cfg(feature = "full")] impl GroveDb { /// Opens a given path @@ -330,6 +349,46 @@ impl GroveDb { } } + fn open_transactional_merk_by_prefix<'db>( + &'db self, + prefix: SubtreePrefix, + root_key: Option>, + is_sum_tree: bool, + tx: &'db Transaction, + batch: Option<&'db StorageBatch>, + grove_version: &GroveVersion, + ) -> CostResult>, Error> { + let mut cost = OperationCost::default(); + let storage = self + .db + .get_transactional_storage_context_by_subtree_prefix(prefix, batch, tx) + .unwrap_add_cost(&mut cost); + if root_key.is_some() { + Merk::open_layered_with_root_key( + storage, + root_key, + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ) + .map_err(|_| { + Error::CorruptedData( + "cannot open a subtree by prefix with given root key".to_owned(), + ) + }) + .add_cost(cost) + } else { + Merk::open_base( + storage, + false, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ) + .map_err(|_| Error::CorruptedData("cannot open a root subtree by prefix".to_owned())) + .add_cost(cost) + } + } + /// Opens a Merk at given path for with direct write access. Intended for /// replication purposes. fn open_merk_for_replication<'tx, 'db: 'tx, 'b, B>( @@ -337,7 +396,7 @@ impl GroveDb { path: SubtreePath<'b, B>, tx: &'tx Transaction<'db>, grove_version: &GroveVersion, - ) -> Result>, Error> + ) -> Result, Error> where B: AsRef<[u8]> + 'b, { @@ -364,31 +423,39 @@ impl GroveDb { .unwrap()?; let is_sum_tree = element.is_sum_tree(); if let Element::Tree(root_key, _) | Element::SumTree(root_key, ..) = element { - Merk::open_layered_with_root_key( - storage, + Ok(( + Merk::open_layered_with_root_key( + storage, + root_key.clone(), + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ) + .map_err(|_| { + Error::CorruptedData("cannot open a subtree with given root key".to_owned()) + }) + .unwrap()?, root_key, is_sum_tree, - Some(&Element::value_defined_cost_for_serialized_value), - grove_version, - ) - .map_err(|_| { - Error::CorruptedData("cannot open a subtree with given root key".to_owned()) - }) - .unwrap() + )) } else { Err(Error::CorruptedPath( "cannot open a subtree as parent exists but is not a tree".to_string(), )) } } else { - Merk::open_base( - storage, + Ok(( + Merk::open_base( + storage, + false, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned())) + .unwrap()?, + None, false, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned())) - .unwrap() + )) } } @@ -398,7 +465,7 @@ impl GroveDb { path: SubtreePath<'b, B>, batch: Option<&'db StorageBatch>, grove_version: &GroveVersion, - ) -> CostResult, Error> + ) -> CostResult>, Error> where B: AsRef<[u8]> + 'b, { @@ -458,6 +525,45 @@ impl GroveDb { } } + fn open_non_transactional_merk_by_prefix<'db>( + &'db self, + prefix: SubtreePrefix, + root_key: Option>, + is_sum_tree: bool, + batch: Option<&'db StorageBatch>, + grove_version: &GroveVersion, + ) -> CostResult>, Error> { + let mut cost = OperationCost::default(); + let storage = self + .db + .get_storage_context_by_subtree_prefix(prefix, batch) + .unwrap_add_cost(&mut cost); + if root_key.is_some() { + Merk::open_layered_with_root_key( + storage, + root_key, + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ) + .map_err(|_| { + Error::CorruptedData( + "cannot open a subtree by prefix with given root key".to_owned(), + ) + }) + .add_cost(cost) + } else { + Merk::open_base( + storage, + false, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ) + .map_err(|_| Error::CorruptedData("cannot open a root subtree by prefix".to_owned())) + .add_cost(cost) + } + } + /// Creates a checkpoint pub fn create_checkpoint>(&self, path: P) -> Result<(), Error> { self.db.create_checkpoint(path).map_err(|e| e.into()) @@ -935,7 +1041,7 @@ impl GroveDb { verify_references: bool, allow_cache: bool, grove_version: &GroveVersion, - ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { + ) -> Result { if let Some(transaction) = transaction { let root_merk = self .open_transactional_merk_at_path( @@ -979,7 +1085,7 @@ impl GroveDb { verify_references: bool, allow_cache: bool, grove_version: &GroveVersion, - ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { + ) -> Result { let mut all_query = Query::new(); all_query.insert_all(); @@ -1123,7 +1229,7 @@ impl GroveDb { verify_references: bool, allow_cache: bool, grove_version: &GroveVersion, - ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { + ) -> Result { let mut all_query = Query::new(); all_query.insert_all(); diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 876fe62c..1cd50519 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -1,268 +1,72 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt, -}; - -use grovedb_merk::{ - ed::Encode, - merk::restore::Restorer, - proofs::{Decoder, Op}, - tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, - ChunkProducer, -}; +mod state_sync_session; + +use std::pin::Pin; + +use grovedb_merk::{tree::hash::CryptoHash, ChunkProducer}; use grovedb_path::SubtreePath; -use grovedb_storage::rocksdb_storage::RocksDbStorage; -#[rustfmt::skip] -use grovedb_storage::rocksdb_storage::storage_context::context_immediate::PrefixedRocksDbImmediateStorageContext; use grovedb_version::{check_grovedb_v0, error::GroveVersionError, version::GroveVersion}; -use crate::{replication, Error, GroveDb, Transaction, TransactionArg}; +pub use self::state_sync_session::MultiStateSyncSession; +use crate::{Error, GroveDb, TransactionArg}; -pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +/// Type alias representing a chunk identifier in the state synchronization +/// process. +/// +/// - `SubtreePrefix`: The prefix of the subtree (32 bytes). +/// - `Option>`: The root key, which may be `None` if not present. +/// - `bool`: Indicates whether the tree is a sum tree. +/// - `Vec`: The chunk ID representing traversal instructions. +pub type ChunkIdentifier = (crate::SubtreePrefix, Option>, bool, Vec); pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; -#[derive(Default)] -struct SubtreeStateSyncInfo<'db> { - // Current Chunk restorer - restorer: Option>>, - // Set of global chunk ids requested to be fetched and pending for processing. For the - // description of global chunk id check fetch_chunk(). - pending_chunks: BTreeSet>, - // Number of processed chunks in current prefix (Path digest) - num_processed_chunks: usize, -} - -// Struct governing state sync -pub struct MultiStateSyncInfo<'db> { - // Map of current processing subtrees - // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo - current_prefixes: BTreeMap>, - // Set of processed prefixes (Path digests) - processed_prefixes: BTreeSet, - // Root app_hash - app_hash: [u8; 32], - // Version of state sync protocol, - version: u16, -} - -impl<'db> Default for MultiStateSyncInfo<'db> { - fn default() -> Self { - Self { - current_prefixes: BTreeMap::new(), - processed_prefixes: BTreeSet::new(), - app_hash: [0; 32], - version: CURRENT_STATE_SYNC_VERSION, - } - } -} - -// Struct containing information about current subtrees found in GroveDB -pub struct SubtreesMetadata { - // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent - // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree - // elem_value_hash are needed when verifying the new constructed subtree after wards. - pub data: BTreeMap>, CryptoHash, CryptoHash)>, -} - -impl SubtreesMetadata { - pub fn new() -> SubtreesMetadata { - SubtreesMetadata { - data: BTreeMap::new(), - } - } -} - -impl Default for SubtreesMetadata { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for SubtreesMetadata { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - for (prefix, metadata) in self.data.iter() { - let metadata_path = &metadata.0; - let metadata_path_str = util_path_to_string(metadata_path); - writeln!( - f, - " prefix:{:?} -> path:{:?}", - hex::encode(prefix), - metadata_path_str - )?; - } - Ok(()) - } -} - -// Converts a path into a human-readable string (for debugging) -pub fn util_path_to_string(path: &[Vec]) -> Vec { - let mut subtree_path_str: Vec = vec![]; - for subtree in path { - let string = std::str::from_utf8(subtree).expect("should be able to convert path"); - subtree_path_str.push( - string - .parse() - .expect("should be able to parse path to string"), - ); - } - subtree_path_str -} - -// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] -pub fn util_split_global_chunk_id( - global_chunk_id: &[u8], - app_hash: &[u8], -) -> Result<(crate::SubtreePrefix, Vec), Error> { - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); - } - - if global_chunk_id == app_hash { - let array_of_zeros: [u8; 32] = [0; 32]; - let root_chunk_prefix_key: crate::SubtreePrefix = array_of_zeros; - return Ok((root_chunk_prefix_key, vec![])); - } - - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: crate::SubtreePrefix = array; - Ok((chunk_prefix_key, chunk_id.to_vec())) -} - -pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { - let mut res = vec![]; - for op in chunk { - op.encode_into(&mut res) - .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; +#[cfg(feature = "full")] +impl GroveDb { + pub fn start_syncing_session(&self, app_hash: [u8; 32]) -> Pin> { + MultiStateSyncSession::new(self.start_transaction(), app_hash) } - Ok(res) -} -pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { - let decoder = Decoder::new(&chunk); - let mut res = vec![]; - for op in decoder { - match op { - Ok(op) => res.push(op), + pub fn commit_session(&self, session: Pin>) -> Result<(), Error> { + match self.commit_transaction(session.into_transaction()).value { + Ok(_) => Ok(()), Err(e) => { - return Err(Error::CorruptedData(format!( - "unable to decode chunk: {}", - e - ))); + // Log the error or handle it as needed + eprintln!("Failed to commit session: {:?}", e); + Err(e) } } } - Ok(res) -} -#[cfg(feature = "full")] -impl GroveDb { - // Returns the discovered subtrees found recursively along with their associated - // metadata Params: - // tx: Transaction. Function returns the data by opening merks at given tx. - // TODO: Add a SubTreePath as param and start searching from that path instead - // of root (as it is now) - pub fn get_subtrees_metadata( - &self, - tx: TransactionArg, - grove_version: &GroveVersion, - ) -> Result { - check_grovedb_v0!( - "is_empty_tree", - grove_version - .grovedb_versions - .replication - .get_subtrees_metadata - ); - let mut subtrees_metadata = SubtreesMetadata::new(); - - let subtrees_root = self - .find_subtrees(&SubtreePath::empty(), tx, grove_version) - .value?; - for subtree in subtrees_root.into_iter() { - let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap(); - - let current_path = SubtreePath::from(path); - - match (current_path.derive_parent(), subtree.last()) { - (Some((parent_path, _)), Some(parent_key)) => match tx { - None => { - let parent_merk = self - .open_non_transactional_merk_at_path(parent_path, None, grove_version) - .value?; - if let Ok(Some((elem_value, elem_value_hash))) = parent_merk - .get_value_and_value_hash( - parent_key, - true, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .value - { - let actual_value_hash = value_hash(&elem_value).unwrap(); - subtrees_metadata.data.insert( - prefix, - (current_path.to_vec(), actual_value_hash, elem_value_hash), - ); - } - } - Some(t) => { - let parent_merk = self - .open_transactional_merk_at_path(parent_path, t, None, grove_version) - .value?; - if let Ok(Some((elem_value, elem_value_hash))) = parent_merk - .get_value_and_value_hash( - parent_key, - true, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .value - { - let actual_value_hash = value_hash(&elem_value).unwrap(); - subtrees_metadata.data.insert( - prefix, - (current_path.to_vec(), actual_value_hash, elem_value_hash), - ); - } - } - }, - _ => { - subtrees_metadata.data.insert( - prefix, - ( - current_path.to_vec(), - CryptoHash::default(), - CryptoHash::default(), - ), - ); - } - } - } - Ok(subtrees_metadata) - } - - // Fetch a chunk by global chunk id (should be called by ABCI when - // LoadSnapshotChunk method is called) Params: - // global_chunk_id: Global chunk id in the following format: - // [SUBTREE_PREFIX:CHUNK_ID] SUBTREE_PREFIX: 32 bytes (mandatory) (All zeros - // = Root subtree) CHUNK_ID: 0.. bytes (optional) Traversal instructions to - // the root of the given chunk. Traversal instructions are "1" for left, and - // "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization - // as a subtree can be big hence traversal instructions for the deepest chunks - // tx: Transaction. Function returns the data by opening merks at given tx. - // Returns the Chunk proof operators for the requested chunk encoded in bytes + /// Fetch a chunk by global chunk ID (should be called by ABCI when the + /// `LoadSnapshotChunk` method is invoked). + /// + /// # Parameters + /// - `global_chunk_id`: Global chunk ID in the following format: + /// `[SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID]` + /// - **SUBTREE_PREFIX**: 32 bytes (mandatory) - All zeros indicate the + /// Root subtree. + /// - **SIZE_ROOT_KEY**: 1 byte - Size of `ROOT_KEY` in bytes. + /// - **ROOT_KEY**: `SIZE_ROOT_KEY` bytes (optional). + /// - **IS_SUM_TREE**: 1 byte (mandatory) - Marks if the tree is a sum + /// tree or not. + /// - **CHUNK_ID**: 0 or more bytes (optional) - Traversal instructions to + /// the root of the given chunk. Traversal instructions are represented + /// as "1" for left and "0" for right. + /// - TODO: Compact `CHUNK_ID` into a bitset for size optimization as a + /// subtree can be large, and traversal instructions for the deepest + /// chunks could consume significant space. + /// + /// - `transaction`: The transaction used to fetch the chunk. + /// - `version`: The version of the state sync protocol. + /// - `grove_version`: The version of GroveDB. + /// + /// # Returns + /// Returns the chunk proof operators for the requested chunk, encoded as + /// bytes. pub fn fetch_chunk( &self, global_chunk_id: &[u8], - tx: TransactionArg, + transaction: TransactionArg, version: u16, grove_version: &GroveVersion, ) -> Result, Error> { @@ -277,99 +81,147 @@ impl GroveDb { )); } - let root_app_hash = self.root_hash(tx, grove_version).value?; - let (chunk_prefix, chunk_id) = - replication::util_split_global_chunk_id(global_chunk_id, &root_app_hash)?; - - let subtrees_metadata = self.get_subtrees_metadata(tx, grove_version)?; - - match subtrees_metadata.data.get(&chunk_prefix) { - Some(path_data) => { - let subtree = &path_data.0; - let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - - match tx { - None => { - let merk = self - .open_non_transactional_merk_at_path(path.into(), None, grove_version) - .value?; - - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let chunk_producer_res = ChunkProducer::new(&merk); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(&chunk_id, grove_version); - match chunk_res { - Ok((chunk, _)) => match util_encode_vec_ops(chunk) { - Ok(op_bytes) => Ok(op_bytes), - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - }, - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - } - } - Err(_) => Err(Error::CorruptedData( - "Unable to create Chunk producer".to_string(), - )), - } - } - Some(t) => { - let merk = self - .open_transactional_merk_at_path(path.into(), t, None, grove_version) - .value?; - - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let chunk_producer_res = ChunkProducer::new(&merk); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(&chunk_id, grove_version); - match chunk_res { - Ok((chunk, _)) => match util_encode_vec_ops(chunk) { - Ok(op_bytes) => Ok(op_bytes), - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - }, - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - } - } - Err(_) => Err(Error::CorruptedData( - "Unable to create Chunk producer".to_string(), - )), - } - } - } + let root_app_hash = self.root_hash(transaction, grove_version).value?; + let (chunk_prefix, root_key, is_sum_tree, chunk_id) = + utils::decode_global_chunk_id(global_chunk_id, &root_app_hash)?; + + // TODO: Refactor this by writing fetch_chunk_inner (as only merk constructor + // and type are different) + if let Some(tx) = transaction { + let merk = self + .open_transactional_merk_by_prefix( + chunk_prefix, + root_key, + is_sum_tree, + tx, + None, + grove_version, + ) + .value + .map_err(|e| { + Error::CorruptedData(format!( + "failed to open merk by prefix tx:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); } - None => Err(Error::CorruptedData("Prefix not found".to_string())), + + let mut chunk_producer = ChunkProducer::new(&merk).map_err(|e| { + Error::CorruptedData(format!( + "failed to create chunk producer by prefix tx:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + let (chunk, _) = chunk_producer + .chunk(&chunk_id, grove_version) + .map_err(|e| { + Error::CorruptedData(format!( + "failed to apply chunk:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + let op_bytes = utils::encode_vec_ops(chunk).map_err(|e| { + Error::CorruptedData(format!( + "failed to encode chunk ops:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + Ok(op_bytes) + } else { + let merk = self + .open_non_transactional_merk_by_prefix( + chunk_prefix, + root_key, + is_sum_tree, + None, + grove_version, + ) + .value + .map_err(|e| { + Error::CorruptedData(format!( + "failed to open merk by prefix non-tx:{} with:{}", + e, + hex::encode(chunk_prefix) + )) + })?; + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); + } + + let mut chunk_producer = ChunkProducer::new(&merk).map_err(|e| { + Error::CorruptedData(format!( + "failed to create chunk producer by prefix non-tx:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + let (chunk, _) = chunk_producer + .chunk(&chunk_id, grove_version) + .map_err(|e| { + Error::CorruptedData(format!( + "failed to apply chunk:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + let op_bytes = utils::encode_vec_ops(chunk).map_err(|e| { + Error::CorruptedData(format!( + "failed to encode chunk ops:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + Ok(op_bytes) } } - // Starts a state sync process (should be called by ABCI when OfferSnapshot - // method is called) Params: - // state_sync_info: Consumed StateSyncInfo - // app_hash: Snapshot's AppHash - // tx: Transaction for the state sync - // Returns the StateSyncInfo transferring ownership back to the caller) - pub fn start_snapshot_syncing<'db>( - &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, + /// Starts a state synchronization process for a snapshot with the given + /// `app_hash` root hash. This method should be called by ABCI when the + /// `OfferSnapshot` method is invoked. + /// + /// # Parameters + /// - `app_hash`: The root hash of the application state to synchronize. + /// - `version`: The version of the state sync protocol to use. + /// - `grove_version`: The version of GroveDB being used. + /// + /// # Returns + /// - `Ok(Pin>)`: A pinned, boxed + /// `MultiStateSyncSession` representing the new sync session. This + /// session allows for managing the synchronization process. + /// - `Err(Error)`: An error indicating why the state sync process could not + /// be started. + /// + /// # Behavior + /// - Initiates the state synchronization process by preparing the necessary + /// data and resources. + /// - Returns the first set of global chunk IDs that can be fetched from + /// available sources. + /// - A new sync session is created and managed internally, facilitating + /// further synchronization. + /// + /// # Usage + /// This method is typically called as part of the ABCI `OfferSnapshot` + /// workflow when a new snapshot synchronization process is required to + /// bring the application state up to date. + /// + /// # Notes + /// - The returned `MultiStateSyncSession` is pinned because its lifetime + /// may depend on asynchronous operations or other system resources that + /// require it to remain immovable in memory. + /// - Ensure that `app_hash` corresponds to a valid snapshot to avoid + /// errors. + pub fn start_snapshot_syncing( + &self, app_hash: CryptoHash, - tx: &'db Transaction, version: u16, grove_version: &GroveVersion, - ) -> Result { + ) -> Result>, Error> { check_grovedb_v0!( "start_snapshot_syncing", grove_version @@ -383,277 +235,216 @@ impl GroveDb { "Unsupported state sync protocol version".to_string(), )); } - if version != state_sync_info.version { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); + + let root_prefix = [0u8; 32]; + + let mut session = self.start_syncing_session(app_hash); + + session.add_subtree_sync_info( + self, + SubtreePath::empty(), + app_hash, + None, + root_prefix, + grove_version, + )?; + + Ok(session) + } +} + +pub(crate) mod utils { + use grovedb_merk::{ + ed::Encode, + proofs::{Decoder, Op}, + }; + + use crate::{replication::ChunkIdentifier, Error}; + + /// Converts a path, represented as a slice of byte vectors (`&[Vec]`), + /// into a human-readable string representation for debugging purposes. + /// + /// # Parameters + /// - `path`: A slice of byte vectors where each vector represents a segment + /// of the path. + /// + /// # Returns + /// - `Vec`: A vector of strings where each string is a + /// human-readable representation of a corresponding segment in the input + /// path. If a segment contains invalid UTF-8, it is replaced with the + /// placeholder string `""`. + /// + /// # Behavior + /// - Each byte vector in the path is interpreted as a UTF-8 string. If the + /// conversion fails, the placeholder `""` is used instead. + /// - This function is primarily intended for debugging and logging. + /// + /// # Notes + /// - This function does not handle or normalize paths; it only provides a + /// human-readable representation. + /// - Be cautious when using this for paths that might contain sensitive + /// data, as the output could be logged. + pub fn path_to_string(path: &[Vec]) -> Vec { + let mut subtree_path_str: Vec = vec![]; + for subtree in path { + let string = std::str::from_utf8(subtree).unwrap_or(""); + subtree_path_str.push(string.to_string()); } + subtree_path_str + } - if !state_sync_info.current_prefixes.is_empty() - || !state_sync_info.processed_prefixes.is_empty() - { - return Err(Error::InternalError( - "GroveDB has already started a snapshot syncing".to_string(), + /// Decodes a given global chunk ID into its components: + /// `[SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID]`. + /// + /// # Parameters + /// - `global_chunk_id`: A byte slice representing the global chunk ID to + /// decode. + /// - `app_hash`: The application hash, which may be required for validation + /// or context. + /// + /// # Returns + /// - `Ok(ChunkIdentifier)`: A tuple containing the decoded components: + /// - `SUBTREE_PREFIX`: A 32-byte prefix of the subtree. + /// - `SIZE_ROOT_KEY`: Size of the root key (derived from `ROOT_KEY` + /// length). + /// - `ROOT_KEY`: Optional root key as a byte vector. + /// - `IS_SUM_TREE`: A boolean indicating whether the tree is a sum tree. + /// - `CHUNK_ID`: Traversal instructions as a byte vector. + /// - `Err(Error)`: An error if the global chunk ID could not be decoded. + pub fn decode_global_chunk_id( + global_chunk_id: &[u8], + app_hash: &[u8], + ) -> Result { + let chunk_prefix_length: usize = 32; + if global_chunk_id.len() < chunk_prefix_length { + return Err(Error::CorruptedData( + "expected global chunk id of at least 32 length".to_string(), )); } - println!( - " starting:{:?}...", - replication::util_path_to_string(&[]) - ); - - let mut root_prefix_state_sync_info = SubtreeStateSyncInfo::default(); - let root_prefix = [0u8; 32]; - if let Ok(merk) = self.open_merk_for_replication(SubtreePath::empty(), tx, grove_version) { - let restorer = Restorer::new(merk, app_hash, None); - root_prefix_state_sync_info.restorer = Some(restorer); - root_prefix_state_sync_info.pending_chunks.insert(vec![]); - state_sync_info - .current_prefixes - .insert(root_prefix, root_prefix_state_sync_info); - state_sync_info.app_hash = app_hash; - } else { - return Err(Error::InternalError( - "Unable to open merk for replication".to_string(), - )); + if global_chunk_id == app_hash { + let root_chunk_prefix_key: crate::SubtreePrefix = [0u8; 32]; + return Ok((root_chunk_prefix_key, None, false, vec![])); } - Ok(state_sync_info) - } + let (chunk_prefix_key, remaining) = global_chunk_id.split_at(chunk_prefix_length); - // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is - // called) Params: - // state_sync_info: Consumed MultiStateSyncInfo - // global_chunk_id: Global chunk id - // chunk: Chunk proof operators encoded in bytes - // tx: Transaction for the state sync - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the MultiStateSyncInfo transferring ownership back to the caller) - pub fn apply_chunk<'db>( - &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, - global_chunk_id: &[u8], - chunk: Vec, - tx: &'db Transaction, - version: u16, - grove_version: &GroveVersion, - ) -> Result<(Vec>, MultiStateSyncInfo), Error> { - check_grovedb_v0!( - "apply_chunk", - grove_version.grovedb_versions.replication.apply_chunk - ); - // For now, only CURRENT_STATE_SYNC_VERSION is supported - if version != CURRENT_STATE_SYNC_VERSION { + let root_key_size_length: usize = 1; + if remaining.len() < root_key_size_length { return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), + "unable to decode root key size".to_string(), )); } - if version != state_sync_info.version { + let (root_key_size, remaining) = remaining.split_at(root_key_size_length); + if remaining.len() < root_key_size[0] as usize { return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), + "unable to decode root key".to_string(), )); } - - let mut next_chunk_ids = vec![]; - - let (chunk_prefix, chunk_id) = - replication::util_split_global_chunk_id(global_chunk_id, &state_sync_info.app_hash)?; - - if state_sync_info.current_prefixes.is_empty() { - return Err(Error::InternalError( - "GroveDB is not in syncing mode".to_string(), + let (root_key, remaining) = remaining.split_at(root_key_size[0] as usize); + let is_sum_tree_length: usize = 1; + if remaining.len() < is_sum_tree_length { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), )); } - if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { - if let Ok((res, mut new_subtree_state_sync)) = - self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk, grove_version) - { - if !res.is_empty() { - for local_chunk_id in res.iter() { - let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(local_chunk_id.to_vec()); - next_chunk_ids.push(next_global_chunk_id); - } - - // re-insert subtree_state_sync in state_sync_info - state_sync_info - .current_prefixes - .insert(chunk_prefix, new_subtree_state_sync); - Ok((next_chunk_ids, state_sync_info)) - } else { - if !new_subtree_state_sync.pending_chunks.is_empty() { - // re-insert subtree_state_sync in state_sync_info - state_sync_info - .current_prefixes - .insert(chunk_prefix, new_subtree_state_sync); - return Ok((vec![], state_sync_info)); - } - - // Subtree is finished. We can save it. - match new_subtree_state_sync.restorer.take() { - None => Err(Error::InternalError( - "Unable to finalize subtree".to_string(), - )), - Some(restorer) => { - if (new_subtree_state_sync.num_processed_chunks > 0) - && (restorer.finalize(grove_version).is_err()) - { - return Err(Error::InternalError( - "Unable to finalize Merk".to_string(), - )); - } - state_sync_info.processed_prefixes.insert(chunk_prefix); - - // Subtree was successfully save. Time to discover new subtrees that - // need to be processed - let subtrees_metadata = - self.get_subtrees_metadata(Some(tx), grove_version)?; - if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { - println!( - " path:{:?} done (num_processed_chunks:{:?})", - replication::util_path_to_string(&value.0), - new_subtree_state_sync.num_processed_chunks - ); - } - - if let Ok((res, new_state_sync_info)) = self.discover_subtrees( - state_sync_info, - subtrees_metadata, - tx, - grove_version, - ) { - next_chunk_ids.extend(res); - Ok((next_chunk_ids, new_state_sync_info)) - } else { - Err(Error::InternalError( - "Unable to discover Subtrees".to_string(), - )) - } - } - } - } - } else { - Err(Error::InternalError( - "Unable to process incoming chunk".to_string(), - )) - } + let (is_sum_tree, chunk_id) = remaining.split_at(is_sum_tree_length); + + let subtree_prefix: crate::SubtreePrefix = chunk_prefix_key + .try_into() + .map_err(|_| Error::CorruptedData("unable to construct subtree".to_string()))?; + + if !root_key.is_empty() { + Ok(( + subtree_prefix, + Some(root_key.to_vec()), + is_sum_tree[0] != 0, + chunk_id.to_vec(), + )) } else { - Err(Error::InternalError("Invalid incoming prefix".to_string())) + Ok((subtree_prefix, None, is_sum_tree[0] != 0, chunk_id.to_vec())) } } - // Apply a chunk using the given SubtreeStateSyncInfo - // state_sync_info: Consumed SubtreeStateSyncInfo - // chunk_id: Local chunk id - // chunk_data: Chunk proof operators encoded in bytes - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the SubtreeStateSyncInfo transferring ownership back to the caller) - fn apply_inner_chunk<'db>( - &'db self, - mut state_sync_info: SubtreeStateSyncInfo<'db>, - chunk_id: &[u8], - chunk_data: Vec, - grove_version: &GroveVersion, - ) -> Result<(Vec>, SubtreeStateSyncInfo), Error> { + /// Encodes the given components into a global chunk ID in the format: + /// `[SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID]`. + /// + /// # Parameters + /// - `subtree_prefix`: A 32-byte array representing the prefix of the + /// subtree. + /// - `root_key_opt`: An optional root key as a byte vector. + /// - `is_sum_tree`: A boolean indicating whether the tree is a sum tree. + /// - `chunk_id`: A byte vector representing the traversal instructions. + /// + /// # Returns + /// - A `Vec` containing the encoded global chunk ID. + pub fn encode_global_chunk_id( + subtree_prefix: [u8; blake3::OUT_LEN], + root_key_opt: Option>, + is_sum_tree: bool, + chunk_id: Vec, + ) -> Vec { let mut res = vec![]; - match &mut state_sync_info.restorer { - Some(restorer) => { - if !state_sync_info.pending_chunks.contains(chunk_id) { - return Err(Error::InternalError( - "Incoming global_chunk_id not expected".to_string(), - )); - } - state_sync_info.pending_chunks.remove(chunk_id); - if !chunk_data.is_empty() { - match util_decode_vec_ops(chunk_data) { - Ok(ops) => { - match restorer.process_chunk(chunk_id, ops, grove_version) { - Ok(next_chunk_ids) => { - state_sync_info.num_processed_chunks += 1; - for next_chunk_id in next_chunk_ids { - state_sync_info - .pending_chunks - .insert(next_chunk_id.clone()); - res.push(next_chunk_id); - } - } - _ => { - return Err(Error::InternalError( - "Unable to process incoming chunk".to_string(), - )); - } - }; - } - Err(_) => { - return Err(Error::CorruptedData( - "Unable to decode incoming chunk".to_string(), - )); - } - } - } - } - _ => { - return Err(Error::InternalError( - "Invalid internal state (restorer".to_string(), - )); - } + res.extend(subtree_prefix); + + if let Some(root_key) = root_key_opt { + res.push(root_key.len() as u8); + res.extend(root_key); + } else { + res.push(0u8); + } + + let mut is_sum_tree_v = 0u8; + if is_sum_tree { + is_sum_tree_v = 1u8; } + res.push(is_sum_tree_v); + + res.extend(chunk_id.to_vec()); - Ok((res, state_sync_info)) + res } - // Prepares SubtreeStateSyncInfos for the freshly discovered subtrees in - // subtrees_metadata and returns the root global chunk ids for all of those - // new subtrees. state_sync_info: Consumed MultiStateSyncInfo - // subtrees_metadata: Metadata about discovered subtrees - // chunk_data: Chunk proof operators - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the MultiStateSyncInfo transferring ownership back to the caller) - fn discover_subtrees<'db>( - &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, - subtrees_metadata: SubtreesMetadata, - tx: &'db Transaction, - grove_version: &GroveVersion, - ) -> Result<(Vec>, MultiStateSyncInfo), Error> { + /// Encodes a vector of operations (`Vec`) into a byte vector. + /// + /// # Parameters + /// - `chunk`: A vector of `Op` operations to be encoded. + /// + /// # Returns + /// - `Ok(Vec)`: A byte vector representing the encoded operations. + /// - `Err(Error)`: An error if the encoding process fails. + pub fn encode_vec_ops(chunk: Vec) -> Result, Error> { let mut res = vec![]; + for op in chunk { + op.encode_into(&mut res) + .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; + } + Ok(res) + } - for (prefix, prefix_metadata) in &subtrees_metadata.data { - if !state_sync_info.processed_prefixes.contains(prefix) - && !state_sync_info.current_prefixes.contains_key(prefix) - { - let (current_path, s_actual_value_hash, s_elem_value_hash) = &prefix_metadata; - - let subtree_path: Vec<&[u8]> = - current_path.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - println!( - " path:{:?} starting...", - replication::util_path_to_string(&prefix_metadata.0) - ); - - let mut subtree_state_sync_info = SubtreeStateSyncInfo::default(); - if let Ok(merk) = self.open_merk_for_replication(path.into(), tx, grove_version) { - let restorer = - Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash)); - subtree_state_sync_info.restorer = Some(restorer); - subtree_state_sync_info.pending_chunks.insert(vec![]); - - state_sync_info - .current_prefixes - .insert(*prefix, subtree_state_sync_info); - - let root_chunk_prefix = prefix.to_vec(); - res.push(root_chunk_prefix.to_vec()); - } else { - return Err(Error::InternalError( - "Unable to open Merk for replication".to_string(), - )); + /// Decodes a byte vector into a vector of operations (`Vec`). + /// + /// # Parameters + /// - `chunk`: A byte vector representing encoded operations. + /// + /// # Returns + /// - `Ok(Vec)`: A vector of decoded `Op` operations. + /// - `Err(Error)`: An error if the decoding process fails. + pub fn decode_vec_ops(chunk: Vec) -> Result, Error> { + let decoder = Decoder::new(&chunk); + let mut res = vec![]; + for op in decoder { + match op { + Ok(op) => res.push(op), + Err(e) => { + return Err(Error::CorruptedData(format!( + "unable to decode chunk: {}", + e + ))); } } } - - Ok((res, state_sync_info)) + Ok(res) } } diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs new file mode 100644 index 00000000..59d93316 --- /dev/null +++ b/grovedb/src/replication/state_sync_session.rs @@ -0,0 +1,618 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt, + marker::PhantomPinned, + pin::Pin, +}; + +use grovedb_merk::{ + tree::{kv::ValueDefinedCostType, value_hash}, + CryptoHash, Restorer, +}; +use grovedb_path::SubtreePath; +use grovedb_storage::{ + rocksdb_storage::{PrefixedRocksDbImmediateStorageContext, RocksDbStorage}, + StorageContext, +}; +use grovedb_version::version::GroveVersion; + +use super::{ + utils::{decode_vec_ops, encode_global_chunk_id, path_to_string}, + CURRENT_STATE_SYNC_VERSION, +}; +use crate::{replication, Element, Error, GroveDb, Transaction}; + +pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; + +/// Struct governing the state synchronization of one subtree. +struct SubtreeStateSyncInfo<'db> { + /// Current Chunk restorer + restorer: Restorer>, + + /// Set of global chunk ids requested to be fetched and pending for + /// processing. For the description of global chunk id check + /// fetch_chunk(). + pending_chunks: BTreeSet>, + + /// Tree root key + root_key: Option>, + + /// Is Sum tree? + is_sum_tree: bool, + + /// Path of current tree + current_path: Vec>, + + /// Number of processed chunks in current prefix (Path digest) + num_processed_chunks: usize, +} + +impl SubtreeStateSyncInfo<'_> { + /// Applies a chunk using the given `SubtreeStateSyncInfo`. + /// + /// # Parameters + /// - `chunk_id`: A byte slice representing the local chunk ID to be + /// applied. + /// - `chunk_data`: A vector of bytes containing the chunk proof operators, + /// encoded as bytes. + /// - `grove_version`: A reference to the `GroveVersion` being used for + /// synchronization. + /// + /// # Returns + /// - `Ok(Vec>)`: A vector of global chunk IDs (each represented as + /// a vector of bytes) that can be fetched from sources for further + /// synchronization. Ownership of the `SubtreeStateSyncInfo` is + /// transferred back to the caller. + /// - `Err(Error)`: An error if the chunk cannot be applied. + /// + /// # Behavior + /// - The function consumes the provided `SubtreeStateSyncInfo` to apply the + /// given chunk. + /// - Once the chunk is applied, the function calculates and returns the + /// next set of global chunk IDs required for further state + /// synchronization. + /// + /// # Usage + /// This function is called as part of the state sync process to apply + /// received chunks and advance the synchronization state. + /// + /// # Notes + /// - Ensure that the `chunk_data` is correctly encoded and matches the + /// expected format. + /// - The function modifies the state of the synchronization process, so it + /// must be used carefully to maintain correctness. + fn apply_inner_chunk( + &mut self, + chunk_id: &[u8], + chunk_data: Vec, + grove_version: &GroveVersion, + ) -> Result>, Error> { + let mut res = vec![]; + + if !self.pending_chunks.contains(chunk_id) { + return Err(Error::InternalError( + "Incoming global_chunk_id not expected".to_string(), + )); + } + self.pending_chunks.remove(chunk_id); + if !chunk_data.is_empty() { + match decode_vec_ops(chunk_data) { + Ok(ops) => { + match self.restorer.process_chunk(chunk_id, ops, grove_version) { + Ok(next_chunk_ids) => { + self.num_processed_chunks += 1; + for next_chunk_id in next_chunk_ids { + self.pending_chunks.insert(next_chunk_id.clone()); + res.push(next_chunk_id); + } + } + _ => { + return Err(Error::InternalError( + "Unable to process incoming chunk".to_string(), + )); + } + }; + } + Err(_) => { + return Err(Error::CorruptedData( + "Unable to decode incoming chunk".to_string(), + )); + } + } + } + + Ok(res) + } +} + +impl<'tx> SubtreeStateSyncInfo<'tx> { + pub fn new(restorer: Restorer>) -> Self { + SubtreeStateSyncInfo { + restorer, + root_key: None, + is_sum_tree: false, + pending_chunks: Default::default(), + current_path: vec![], + num_processed_chunks: 0, + } + } +} + +/// Struct governing the state synchronization process. +pub struct MultiStateSyncSession<'db> { + /// Map of currently processing subtrees. + /// Keys are `SubtreePrefix` (path digests), and values are + /// `SubtreeStateSyncInfo` for each subtree. + current_prefixes: BTreeMap>, + + /// Set of processed prefixes, represented as `SubtreePrefix` (path + /// digests). + processed_prefixes: BTreeSet, + + /// Root application hash (`app_hash`). + app_hash: [u8; 32], + + /// Version of the state synchronization protocol. + pub(crate) version: u16, + + /// Transaction used for the synchronization process. + /// This is placed last to ensure it is dropped last. + transaction: Transaction<'db>, + + /// Marker to ensure this struct is not moved in memory. + _pin: PhantomPinned, +} + +impl<'db> MultiStateSyncSession<'db> { + /// Initializes a new state sync session. + pub fn new(transaction: Transaction<'db>, app_hash: [u8; 32]) -> Pin> { + Box::pin(MultiStateSyncSession { + transaction, + current_prefixes: Default::default(), + processed_prefixes: Default::default(), + app_hash, + version: CURRENT_STATE_SYNC_VERSION, + _pin: PhantomPinned, + }) + } + + pub fn is_empty(&self) -> bool { + self.current_prefixes.is_empty() + } + + pub fn is_sync_completed(&self) -> bool { + for (_, subtree_state_info) in self.current_prefixes.iter() { + if !subtree_state_info.pending_chunks.is_empty() { + return false; + } + } + + true + } + + pub fn into_transaction(self: Pin>) -> Transaction<'db> { + // SAFETY: the struct isn't used anymore and no one will refer to transaction + // address again + unsafe { Pin::into_inner_unchecked(self) }.transaction + } + + /// Adds synchronization information for a subtree into the current + /// synchronization session. + /// + /// This function interacts with a `GroveDb` database to open a Merk tree at + /// the specified path, calculate and verify its cryptographic hashes, + /// and update the session state with the relevant synchronization + /// information. The function generates and returns the global chunk ID for + /// the subtree. + /// + /// # Parameters + /// - `self`: A pinned, boxed instance of the `MultiStateSyncSession`. + /// - `db`: A reference to the `GroveDb` instance. + /// - `path`: The path to the subtree as a `SubtreePath`. + /// - `hash`: The expected cryptographic hash of the subtree. + /// - `actual_hash`: An optional actual cryptographic hash to compare + /// against the expected hash. + /// - `chunk_prefix`: A 32-byte prefix used for identifying chunks in the + /// synchronization process. + /// - `grove_version`: The GroveDB version to use for processing. + /// + /// # Returns + /// - `Ok(Vec)`: On success, returns the encoded global chunk ID for the + /// subtree. + /// - `Err(Error)`: If the Merk tree cannot be opened or synchronization + /// information cannot be added. + /// + /// # Errors + /// This function returns an error if: + /// - The Merk tree at the specified path cannot be opened. + /// - Any synchronization-related operations fail. + /// - Internal errors occur during processing. + /// + /// # Safety + /// - This function uses unsafe code to create a reference to the + /// transaction. Ensure that the transaction is properly managed and the + /// lifetime guarantees are respected. + pub fn add_subtree_sync_info<'b, B: AsRef<[u8]>>( + self: &mut Pin>>, + db: &'db GroveDb, + path: SubtreePath<'b, B>, + hash: CryptoHash, + actual_hash: Option, + chunk_prefix: [u8; 32], + grove_version: &GroveVersion, + ) -> Result, Error> { + let transaction_ref: &'db Transaction<'db> = unsafe { + let tx: &Transaction<'db> = &self.as_ref().transaction; + &*(tx as *const _) + }; + + if let Ok((merk, root_key, is_sum_tree)) = + db.open_merk_for_replication(path.clone(), transaction_ref, grove_version) + { + let restorer = Restorer::new(merk, hash, actual_hash); + let mut sync_info = SubtreeStateSyncInfo::new(restorer); + sync_info.pending_chunks.insert(vec![]); + sync_info.root_key = root_key.clone(); + sync_info.is_sum_tree = is_sum_tree; + sync_info.current_path = path.to_vec(); + self.as_mut() + .current_prefixes() + .insert(chunk_prefix, sync_info); + Ok(encode_global_chunk_id( + chunk_prefix, + root_key, + is_sum_tree, + vec![], + )) + } else { + Err(Error::InternalError( + "Unable to open merk for replication".to_string(), + )) + } + } + + fn current_prefixes( + self: Pin<&mut MultiStateSyncSession<'db>>, + ) -> &mut BTreeMap> { + // SAFETY: no memory-sensitive assumptions are made about fields except the + // `transaciton` so it will be safe to modify them + &mut unsafe { self.get_unchecked_mut() }.current_prefixes + } + + fn processed_prefixes( + self: Pin<&mut MultiStateSyncSession<'db>>, + ) -> &mut BTreeSet { + // SAFETY: no memory-sensitive assumptions are made about fields except the + // `transaciton` so it will be safe to modify them + &mut unsafe { self.get_unchecked_mut() }.processed_prefixes + } + + /// Applies a chunk during the state synchronization process. + /// This method should be called by ABCI when the `ApplySnapshotChunk` + /// method is invoked. + /// + /// # Parameters + /// - `self`: A pinned mutable reference to the `MultiStateSyncSession`. + /// - `db`: A reference to the `GroveDb` instance used for synchronization. + /// - `global_chunk_id`: A byte slice representing the global chunk ID being + /// applied. + /// - `chunk`: A vector of bytes containing the encoded proof for the chunk. + /// - `version`: The state synchronization protocol version being used. + /// - `grove_version`: A reference to the `GroveVersion` specifying the + /// GroveDB version. + /// + /// # Returns + /// - `Ok(Vec>)`: A vector of global chunk IDs (each represented as + /// a vector of bytes) that can be fetched from sources for further + /// synchronization. + /// - `Err(Error)`: An error if the chunk application fails or if the chunk + /// proof is invalid. + /// + /// # Behavior + /// - This method applies the given chunk using the provided + /// `global_chunk_id` and its corresponding proof data (`chunk`). + /// - Once the chunk is applied successfully, it calculates and returns the + /// next set of global chunk IDs required for further synchronization. + /// + /// # Notes + /// - Ensure the `chunk` is correctly encoded and matches the expected proof + /// format. + /// - This function modifies the state of the synchronization session, so it + /// must be used carefully to maintain correctness and avoid errors. + /// - The pinned `self` ensures that the session cannot be moved in memory, + /// preserving consistency during the synchronization process. + pub fn apply_chunk( + self: &mut Pin>>, + db: &'db GroveDb, + global_chunk_id: &[u8], + chunk: Vec, + version: u16, + grove_version: &GroveVersion, + ) -> Result>, Error> { + // For now, only CURRENT_STATE_SYNC_VERSION is supported + if version != CURRENT_STATE_SYNC_VERSION { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + if version != self.version { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + + let mut next_chunk_ids = vec![]; + + let (chunk_prefix, _, _, chunk_id) = + replication::utils::decode_global_chunk_id(global_chunk_id, &self.app_hash)?; + + if self.is_empty() { + return Err(Error::InternalError( + "GroveDB is not in syncing mode".to_string(), + )); + } + + let current_prefixes = self.as_mut().current_prefixes(); + let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else { + return Err(Error::InternalError( + "Unable to process incoming chunk".to_string(), + )); + }; + let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk, grove_version) else { + return Err(Error::InternalError("Invalid incoming prefix".to_string())); + }; + + if !res.is_empty() { + for local_chunk_id in res.iter() { + next_chunk_ids.push(encode_global_chunk_id( + chunk_prefix, + subtree_state_sync.root_key.clone(), + subtree_state_sync.is_sum_tree, + local_chunk_id.clone(), + )); + } + + Ok(next_chunk_ids) + } else { + if !subtree_state_sync.pending_chunks.is_empty() { + return Ok(vec![]); + } + + let completed_path = subtree_state_sync.current_path.clone(); + + // Subtree is finished. We can save it. + if subtree_state_sync.num_processed_chunks > 0 { + if let Some(prefix_data) = current_prefixes.remove(&chunk_prefix) { + if let Err(err) = prefix_data.restorer.finalize(grove_version) { + return Err(Error::InternalError(format!( + "Unable to finalize Merk: {:?}", + err + ))); + } + } else { + return Err(Error::InternalError(format!( + "Prefix {:?} does not exist in current_prefixes", + chunk_prefix + ))); + } + } + + self.as_mut().processed_prefixes().insert(chunk_prefix); + + let new_subtrees_metadata = + self.discover_new_subtrees_metadata(db, &completed_path, grove_version)?; + + if let Ok(res) = + self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version) + { + next_chunk_ids.extend(res); + Ok(next_chunk_ids) + } else { + Err(Error::InternalError( + "Unable to discover Subtrees".to_string(), + )) + } + } + } + + /// Discovers new subtrees at the given path that need to be synchronized. + /// + /// # Parameters + /// - `self`: A pinned mutable reference to the `MultiStateSyncSession`. + /// - `db`: A reference to the `GroveDb` instance being used for + /// synchronization. + /// - `path_vec`: A vector of byte vectors representing the path where + /// subtrees should be discovered. + /// - `grove_version`: A reference to the `GroveVersion` specifying the + /// GroveDB version. + /// + /// # Returns + /// - `Ok(SubtreesMetadata)`: Metadata about the discovered subtrees, + /// including information necessary for their synchronization. + /// - `Err(Error)`: An error if the discovery process fails. + /// + /// # Behavior + /// - This function traverses the specified `path_vec` in the database and + /// identifies subtrees that are not yet synchronized. + /// - Returns metadata about these subtrees, which can be used to initiate + /// or manage the synchronization process. + /// + /// # Notes + /// - The `path_vec` should represent a valid path in the GroveDB where + /// subtrees are expected to exist. + /// - Ensure that the GroveDB instance (`db`) and Grove version + /// (`grove_version`) are compatible and up-to-date to avoid errors during + /// discovery. + /// - The function modifies the state of the synchronization session, so it + /// should be used carefully to maintain session integrity. + fn discover_new_subtrees_metadata( + self: &mut Pin>>, + db: &'db GroveDb, + path_vec: &[Vec], + grove_version: &GroveVersion, + ) -> Result { + let transaction_ref: &'db Transaction<'db> = unsafe { + let tx: &Transaction<'db> = &self.as_ref().transaction; + &*(tx as *const _) + }; + let subtree_path: Vec<&[u8]> = path_vec.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + let merk = db + .open_transactional_merk_at_path(path.into(), transaction_ref, None, grove_version) + .value + .map_err(|e| Error::CorruptedData(format!("failed to open merk by path-tx:{}", e)))?; + if merk.is_empty_tree().unwrap() { + return Ok(SubtreesMetadata::default()); + } + let mut subtree_keys = BTreeSet::new(); + + let mut raw_iter = Element::iterator(merk.storage.raw_iter()).unwrap(); + while let Some((key, value)) = raw_iter.next_element(grove_version).unwrap().unwrap() { + if value.is_any_tree() { + subtree_keys.insert(key.to_vec()); + } + } + + let mut subtrees_metadata = SubtreesMetadata::new(); + for subtree_key in &subtree_keys { + if let Ok(Some((elem_value, elem_value_hash))) = merk + .get_value_and_value_hash( + subtree_key.as_slice(), + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .value + { + let actual_value_hash = value_hash(&elem_value).unwrap(); + let mut new_path = path_vec.to_vec(); + new_path.push(subtree_key.to_vec()); + + let subtree_path: Vec<&[u8]> = new_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap(); + + subtrees_metadata.data.insert( + prefix, + (new_path.to_vec(), actual_value_hash, elem_value_hash), + ); + } + } + + Ok(subtrees_metadata) + } + + /// Prepares a synchronization session for the newly discovered subtrees and + /// returns the global chunk IDs of those subtrees. + /// + /// # Parameters + /// - `self`: A pinned mutable reference to the `MultiStateSyncSession`. + /// - `db`: A reference to the `GroveDb` instance used for managing the + /// synchronization process. + /// - `subtrees_metadata`: Metadata about the discovered subtrees that + /// require synchronization. + /// - `grove_version`: A reference to the `GroveVersion` specifying the + /// GroveDB version. + /// + /// # Returns + /// - `Ok(Vec>)`: A vector of global chunk IDs (each represented as + /// a vector of bytes) corresponding to the newly discovered subtrees. + /// These IDs can be fetched from sources to continue the synchronization + /// process. + /// - `Err(Error)`: An error if the synchronization session could not be + /// prepared or if processing the metadata fails. + /// + /// # Behavior + /// - Initializes the synchronization state for the newly discovered + /// subtrees based on the provided metadata. + /// - Calculates and returns the global chunk IDs of these subtrees, + /// enabling further state synchronization. + /// + /// # Notes + /// - Ensure that the `subtrees_metadata` accurately reflects the subtrees + /// requiring synchronization. + /// - This function modifies the state of the synchronization session to + /// include the new subtrees. + /// - Proper handling of the returned global chunk IDs is essential to + /// ensure seamless state synchronization. + fn prepare_sync_state_sessions( + self: &mut Pin>>, + db: &'db GroveDb, + subtrees_metadata: SubtreesMetadata, + grove_version: &GroveVersion, + ) -> Result>, Error> { + let mut res = vec![]; + + for (prefix, prefix_metadata) in &subtrees_metadata.data { + if !self.processed_prefixes.contains(prefix) + && !self.current_prefixes.contains_key(prefix) + { + let (current_path, actual_value_hash, elem_value_hash) = &prefix_metadata; + + let subtree_path: Vec<&[u8]> = + current_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + + let next_chunks_ids = self.add_subtree_sync_info( + db, + path.into(), + *elem_value_hash, + Some(*actual_value_hash), + *prefix, + grove_version, + )?; + + res.push(next_chunks_ids); + } + } + + Ok(res) + } +} + +/// Struct containing metadata about the current subtrees found in GroveDB. +/// This metadata is used during the state synchronization process to track +/// discovered subtrees and verify their integrity after they are constructed. +pub struct SubtreesMetadata { + /// A map where: + /// - **Key**: `SubtreePrefix` (the path digest of the subtree). + /// - **Value**: A tuple containing: + /// - `Vec>`: The actual path of the subtree in GroveDB. + /// - `CryptoHash`: The parent subtree's actual value hash. + /// - `CryptoHash`: The parent subtree's element value hash. + /// + /// The `parent subtree actual_value_hash` and `parent subtree + /// elem_value_hash` are required to verify the integrity of the newly + /// constructed subtree after synchronization. + pub data: BTreeMap>, CryptoHash, CryptoHash)>, +} + +impl SubtreesMetadata { + pub fn new() -> SubtreesMetadata { + SubtreesMetadata { + data: BTreeMap::new(), + } + } +} + +impl Default for SubtreesMetadata { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for SubtreesMetadata { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for (prefix, metadata) in self.data.iter() { + let metadata_path = &metadata.0; + let metadata_path_str = path_to_string(metadata_path); + writeln!( + f, + " prefix:{:?} -> path:{:?}", + hex::encode(prefix), + metadata_path_str, + )?; + } + Ok(()) + } +} diff --git a/storage/src/rocksdb_storage/storage.rs b/storage/src/rocksdb_storage/storage.rs index 8a91d4f4..44510694 100644 --- a/storage/src/rocksdb_storage/storage.rs +++ b/storage/src/rocksdb_storage/storage.rs @@ -58,7 +58,7 @@ use crate::{ const BLAKE_BLOCK_LEN: usize = 64; -pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +pub type SubtreePrefix = [u8; blake3::OUT_LEN]; fn blake_block_count(len: usize) -> usize { if len == 0 { @@ -472,6 +472,15 @@ impl<'db> Storage<'db> for RocksDbStorage { .map(|prefix| PrefixedRocksDbStorageContext::new(&self.db, prefix, batch)) } + fn get_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> CostContext { + PrefixedRocksDbStorageContext::new(&self.db, prefix, batch) + .wrap_with_cost(OperationCost::default()) + } + fn get_transactional_storage_context<'b, B>( &'db self, path: SubtreePath<'b, B>, @@ -486,6 +495,16 @@ impl<'db> Storage<'db> for RocksDbStorage { }) } + fn get_transactional_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + transaction: &'db Self::Transaction, + ) -> CostContext { + PrefixedRocksDbTransactionContext::new(&self.db, transaction, prefix, batch) + .wrap_with_cost(OperationCost::default()) + } + fn get_immediate_storage_context<'b, B>( &'db self, path: SubtreePath<'b, B>, @@ -499,6 +518,15 @@ impl<'db> Storage<'db> for RocksDbStorage { }) } + fn get_immediate_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + transaction: &'db Self::Transaction, + ) -> CostContext { + PrefixedRocksDbImmediateStorageContext::new(&self.db, transaction, prefix) + .wrap_with_cost(OperationCost::default()) + } + fn commit_multi_context_batch( &self, batch: StorageBatch, diff --git a/storage/src/storage.rs b/storage/src/storage.rs index 5ef26e06..2795cfc2 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -43,6 +43,8 @@ use grovedb_visualize::visualize_to_vec; use crate::{worst_case_costs::WorstKeyLength, Error}; +pub type SubtreePrefix = [u8; blake3::OUT_LEN]; + /// Top-level storage_cost abstraction. /// Should be able to hold storage_cost connection and to start transaction when /// needed. All query operations will be exposed using [StorageContext]. @@ -89,6 +91,14 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make storage context for a subtree with prefix, keeping all write + /// operations inside a `batch` if provided. + fn get_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> CostContext; + /// Make context for a subtree on transactional data, keeping all write /// operations inside a `batch` if provided. fn get_transactional_storage_context<'b, B>( @@ -100,6 +110,15 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make context for a subtree by prefix on transactional data, keeping all + /// write operations inside a `batch` if provided. + fn get_transactional_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + transaction: &'db Self::Transaction, + ) -> CostContext; + /// Make context for a subtree on transactional data that will apply all /// operations straight to the storage. fn get_immediate_storage_context<'b, B>( @@ -110,6 +129,14 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make context for a subtree by prefix on transactional data that will + /// apply all operations straight to the storage. + fn get_immediate_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + transaction: &'db Self::Transaction, + ) -> CostContext; + /// Creates a database checkpoint in a specified path fn create_checkpoint>(&self, path: P) -> Result<(), Error>; diff --git a/tutorials/Cargo.toml b/tutorials/Cargo.toml index 8084a248..7fd4ee81 100644 --- a/tutorials/Cargo.toml +++ b/tutorials/Cargo.toml @@ -14,6 +14,7 @@ grovedb-visualize = { path = "../visualize" } rand = "0.8.5" hex = "0.4" +blake3 = "1.5.1" [workspace] diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index ceeec2f2..74374c33 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -1,11 +1,11 @@ use std::collections::VecDeque; use std::path::Path; +use std::time::{Duration, Instant}; use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction}; use grovedb::reference_path::ReferencePathType; use rand::{distributions::Alphanumeric, Rng, }; use grovedb::element::SumValue; use grovedb::replication::CURRENT_STATE_SYNC_VERSION; -use grovedb::replication::MultiStateSyncInfo; use grovedb_version::version::GroveVersion; const MAIN_ΚΕΥ: &[u8] = b"key_main"; @@ -18,6 +18,8 @@ const KEY_INT_REF_0: &[u8] = b"key_int_ref_0"; const KEY_INT_A: &[u8] = b"key_sum_0"; const ROOT_PATH: &[&[u8]] = &[]; +pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; + // Allow insertions to overwrite trees // This is necessary so the tutorial can be rerun easily const INSERT_OPTIONS: Option = Some(InsertOptions { @@ -37,14 +39,14 @@ fn populate_db(grovedb_path: String, grove_version: &GroveVersion) -> GroveDb { let tx = db.start_transaction(); let batch_size = 50; - for i in 0..=5 { + for i in 0..=100 { insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_0], i * batch_size, i * batch_size + batch_size - 1, &tx, &grove_version); } let _ = db.commit_transaction(tx); let tx = db.start_transaction(); let batch_size = 50; - for i in 0..=5 { + for i in 0..=100 { insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_1], i * batch_size, i * batch_size + batch_size - 1, &tx, &grove_version); } let _ = db.commit_transaction(tx); @@ -98,15 +100,8 @@ fn main() { let root_hash_destination = db_destination.root_hash(None, grove_version).unwrap().unwrap(); println!("root_hash_destination: {:?}", hex::encode(root_hash_destination)); - println!("\n######### source_subtree_metadata of db_source"); - let subtrees_metadata_source = db_source.get_subtrees_metadata(None, grove_version).unwrap(); - println!("{:?}", subtrees_metadata_source); - println!("\n######### db_checkpoint_0 -> db_destination state sync"); - let state_info = MultiStateSyncInfo::default(); - let tx = db_destination.start_transaction(); - sync_db_demo(&db_checkpoint_0, &db_destination, state_info, &tx, &grove_version).unwrap(); - db_destination.commit_transaction(tx).unwrap().expect("expected to commit transaction"); + sync_db_demo(&db_checkpoint_0, &db_destination, &grove_version).unwrap(); println!("\n######### verify db_destination"); let incorrect_hashes = db_destination.verify_grovedb(None, true, false, grove_version).unwrap(); @@ -246,24 +241,33 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec, grove_version: &GroveVer fn sync_db_demo( source_db: &GroveDb, target_db: &GroveDb, - state_sync_info: MultiStateSyncInfo, - target_tx: &Transaction, grove_version: &GroveVersion, ) -> Result<(), grovedb::Error> { + let start_time = Instant::now(); let app_hash = source_db.root_hash(None, grove_version).value.unwrap(); - let mut state_sync_info = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION, grove_version)?; + let mut session = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION, grove_version)?; let mut chunk_queue : VecDeque> = VecDeque::new(); // The very first chunk to fetch is always identified by the root app_hash chunk_queue.push_back(app_hash.to_vec()); + let mut num_chunks = 0; while let Some(chunk_id) = chunk_queue.pop_front() { + num_chunks += 1; let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION, grove_version)?; - let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, chunk_id.as_slice(), ops, target_tx, CURRENT_STATE_SYNC_VERSION, grove_version)?; - state_sync_info = new_state_sync_info; + + let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION, grove_version)?; chunk_queue.extend(more_chunks); } + println!("num_chunks: {}", num_chunks); + + if session.is_sync_completed() { + target_db.commit_session(session).expect("failed to commit session"); + } + let elapsed = start_time.elapsed(); + println!("state_synced in {:.2?}", elapsed); + Ok(()) }