From 91ea7dfb51816a5bed65b78897476bcee973eb92 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Fri, 20 Dec 2024 10:39:09 +0200 Subject: [PATCH 1/7] feat: optimization refactor --- grovedb/src/lib.rs | 93 ++- grovedb/src/replication.rs | 773 +++++------------- grovedb/src/replication/state_sync_session.rs | 433 ++++++++++ storage/src/rocksdb_storage/storage.rs | 30 +- storage/src/storage.rs | 27 + tutorials/Cargo.toml | 1 + tutorials/src/bin/replication.rs | 47 +- 7 files changed, 812 insertions(+), 592 deletions(-) create mode 100644 grovedb/src/replication/state_sync_session.rs diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 57f68d33..0a743029 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -186,6 +186,7 @@ use grovedb_merk::{ tree::{combine_hash, value_hash}, BatchEntry, CryptoHash, KVIterator, Merk, }; +use grovedb_merk::ChunkProducer; #[cfg(feature = "full")] use grovedb_path::SubtreePath; #[cfg(feature = "full")] @@ -220,6 +221,7 @@ use crate::operations::proof::util::hex_to_ascii; use crate::util::{root_merk_optional_tx, storage_context_optional_tx}; #[cfg(feature = "full")] use crate::Error::MerkError; +use crate::replication::util_encode_vec_ops; #[cfg(feature = "full")] type Hash = [u8; 32]; @@ -330,6 +332,44 @@ impl GroveDb { } } + fn open_transactional_merk_by_prefix<'db>( + &'db self, + prefix: SubtreePrefix, + root_key: Option>, + is_sum_tree: bool, + tx: &'db Transaction, + batch: Option<&'db StorageBatch>, + grove_version: &GroveVersion, + ) -> CostResult, Error> + { + let mut cost = OperationCost::default(); + let storage = self + .db + .get_transactional_storage_context_by_subtree_prefix(prefix, batch, tx) + .unwrap_add_cost(&mut cost); + if root_key.is_some() { + Merk::open_layered_with_root_key( + storage, + root_key, + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ).map_err(|_| { + Error::CorruptedData("cannot open a subtree by prefix with given root key".to_owned()) + }).add_cost(cost) + } + else { + Merk::open_base( + storage, + false, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ).map_err(|_| { + Error::CorruptedData("cannot open a root subtree by prefix".to_owned()) + }).add_cost(cost) + } + } + /// Opens a Merk at given path for with direct write access. Intended for /// replication purposes. fn open_merk_for_replication<'tx, 'db: 'tx, 'b, B>( @@ -337,7 +377,7 @@ impl GroveDb { path: SubtreePath<'b, B>, tx: &'tx Transaction<'db>, grove_version: &GroveVersion, - ) -> Result>, Error> + ) -> Result<(Merk>, Option>, bool), Error> where B: AsRef<[u8]> + 'b, { @@ -364,9 +404,10 @@ impl GroveDb { .unwrap()?; let is_sum_tree = element.is_sum_tree(); if let Element::Tree(root_key, _) | Element::SumTree(root_key, ..) = element { + Ok(( Merk::open_layered_with_root_key( storage, - root_key, + root_key.clone(), is_sum_tree, Some(&Element::value_defined_cost_for_serialized_value), grove_version, @@ -374,13 +415,17 @@ impl GroveDb { .map_err(|_| { Error::CorruptedData("cannot open a subtree with given root key".to_owned()) }) - .unwrap() + .unwrap()?, + root_key, + is_sum_tree + )) } else { Err(Error::CorruptedPath( "cannot open a subtree as parent exists but is not a tree".to_string(), )) } } else { + Ok(( Merk::open_base( storage, false, @@ -388,7 +433,10 @@ impl GroveDb { grove_version, ) .map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned())) - .unwrap() + .unwrap()?, + None, + false + )) } } @@ -458,6 +506,43 @@ impl GroveDb { } } + fn open_non_transactional_merk_by_prefix<'db>( + &'db self, + prefix: SubtreePrefix, + root_key: Option>, + is_sum_tree: bool, + batch: Option<&'db StorageBatch>, + grove_version: &GroveVersion, + ) -> CostResult, Error> + { + let mut cost = OperationCost::default(); + let storage = self + .db + .get_storage_context_by_subtree_prefix(prefix, batch) + .unwrap_add_cost(&mut cost); + if root_key.is_some() { + Merk::open_layered_with_root_key( + storage, + root_key, + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ).map_err(|_| { + Error::CorruptedData("cannot open a subtree by prefix with given root key".to_owned()) + }).add_cost(cost) + } + else { + Merk::open_base( + storage, + false, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ).map_err(|_| { + Error::CorruptedData("cannot open a root subtree by prefix".to_owned()) + }).add_cost(cost) + } + } + /// Creates a checkpoint pub fn create_checkpoint>(&self, path: P) -> Result<(), Error> { self.db.create_checkpoint(path).map_err(|e| e.into()) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 876fe62c..902e8c1e 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -1,11 +1,9 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt, -}; +mod state_sync_session; + +use std::pin::Pin; use grovedb_merk::{ ed::Encode, - merk::restore::Restorer, proofs::{Decoder, Op}, tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, ChunkProducer, @@ -16,237 +14,21 @@ use grovedb_storage::rocksdb_storage::RocksDbStorage; use grovedb_storage::rocksdb_storage::storage_context::context_immediate::PrefixedRocksDbImmediateStorageContext; use grovedb_version::{check_grovedb_v0, error::GroveVersionError, version::GroveVersion}; -use crate::{replication, Error, GroveDb, Transaction, TransactionArg}; - -pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +pub use self::state_sync_session::MultiStateSyncSession; +use self::state_sync_session::SubtreesMetadata; +use crate::{Error, GroveDb, TransactionArg}; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; -#[derive(Default)] -struct SubtreeStateSyncInfo<'db> { - // Current Chunk restorer - restorer: Option>>, - // Set of global chunk ids requested to be fetched and pending for processing. For the - // description of global chunk id check fetch_chunk(). - pending_chunks: BTreeSet>, - // Number of processed chunks in current prefix (Path digest) - num_processed_chunks: usize, -} - -// Struct governing state sync -pub struct MultiStateSyncInfo<'db> { - // Map of current processing subtrees - // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo - current_prefixes: BTreeMap>, - // Set of processed prefixes (Path digests) - processed_prefixes: BTreeSet, - // Root app_hash - app_hash: [u8; 32], - // Version of state sync protocol, - version: u16, -} - -impl<'db> Default for MultiStateSyncInfo<'db> { - fn default() -> Self { - Self { - current_prefixes: BTreeMap::new(), - processed_prefixes: BTreeSet::new(), - app_hash: [0; 32], - version: CURRENT_STATE_SYNC_VERSION, - } - } -} - -// Struct containing information about current subtrees found in GroveDB -pub struct SubtreesMetadata { - // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent - // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree - // elem_value_hash are needed when verifying the new constructed subtree after wards. - pub data: BTreeMap>, CryptoHash, CryptoHash)>, -} - -impl SubtreesMetadata { - pub fn new() -> SubtreesMetadata { - SubtreesMetadata { - data: BTreeMap::new(), - } - } -} - -impl Default for SubtreesMetadata { - fn default() -> Self { - Self::new() - } -} - -impl fmt::Debug for SubtreesMetadata { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - for (prefix, metadata) in self.data.iter() { - let metadata_path = &metadata.0; - let metadata_path_str = util_path_to_string(metadata_path); - writeln!( - f, - " prefix:{:?} -> path:{:?}", - hex::encode(prefix), - metadata_path_str - )?; - } - Ok(()) - } -} - -// Converts a path into a human-readable string (for debugging) -pub fn util_path_to_string(path: &[Vec]) -> Vec { - let mut subtree_path_str: Vec = vec![]; - for subtree in path { - let string = std::str::from_utf8(subtree).expect("should be able to convert path"); - subtree_path_str.push( - string - .parse() - .expect("should be able to parse path to string"), - ); - } - subtree_path_str -} - -// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] -pub fn util_split_global_chunk_id( - global_chunk_id: &[u8], - app_hash: &[u8], -) -> Result<(crate::SubtreePrefix, Vec), Error> { - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); - } - - if global_chunk_id == app_hash { - let array_of_zeros: [u8; 32] = [0; 32]; - let root_chunk_prefix_key: crate::SubtreePrefix = array_of_zeros; - return Ok((root_chunk_prefix_key, vec![])); - } - - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: crate::SubtreePrefix = array; - Ok((chunk_prefix_key, chunk_id.to_vec())) -} - -pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { - let mut res = vec![]; - for op in chunk { - op.encode_into(&mut res) - .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; - } - Ok(res) -} - -pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { - let decoder = Decoder::new(&chunk); - let mut res = vec![]; - for op in decoder { - match op { - Ok(op) => res.push(op), - Err(e) => { - return Err(Error::CorruptedData(format!( - "unable to decode chunk: {}", - e - ))); - } - } - } - Ok(res) -} - #[cfg(feature = "full")] impl GroveDb { - // Returns the discovered subtrees found recursively along with their associated - // metadata Params: - // tx: Transaction. Function returns the data by opening merks at given tx. - // TODO: Add a SubTreePath as param and start searching from that path instead - // of root (as it is now) - pub fn get_subtrees_metadata( - &self, - tx: TransactionArg, - grove_version: &GroveVersion, - ) -> Result { - check_grovedb_v0!( - "is_empty_tree", - grove_version - .grovedb_versions - .replication - .get_subtrees_metadata - ); - let mut subtrees_metadata = SubtreesMetadata::new(); - - let subtrees_root = self - .find_subtrees(&SubtreePath::empty(), tx, grove_version) - .value?; - for subtree in subtrees_root.into_iter() { - let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap(); - - let current_path = SubtreePath::from(path); - - match (current_path.derive_parent(), subtree.last()) { - (Some((parent_path, _)), Some(parent_key)) => match tx { - None => { - let parent_merk = self - .open_non_transactional_merk_at_path(parent_path, None, grove_version) - .value?; - if let Ok(Some((elem_value, elem_value_hash))) = parent_merk - .get_value_and_value_hash( - parent_key, - true, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .value - { - let actual_value_hash = value_hash(&elem_value).unwrap(); - subtrees_metadata.data.insert( - prefix, - (current_path.to_vec(), actual_value_hash, elem_value_hash), - ); - } - } - Some(t) => { - let parent_merk = self - .open_transactional_merk_at_path(parent_path, t, None, grove_version) - .value?; - if let Ok(Some((elem_value, elem_value_hash))) = parent_merk - .get_value_and_value_hash( - parent_key, - true, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .value - { - let actual_value_hash = value_hash(&elem_value).unwrap(); - subtrees_metadata.data.insert( - prefix, - (current_path.to_vec(), actual_value_hash, elem_value_hash), - ); - } - } - }, - _ => { - subtrees_metadata.data.insert( - prefix, - ( - current_path.to_vec(), - CryptoHash::default(), - CryptoHash::default(), - ), - ); - } - } - } - Ok(subtrees_metadata) + pub fn start_syncing_session(&self, app_hash: [u8; 32]) -> Pin> { + MultiStateSyncSession::new(self.start_transaction(), app_hash) + } + + pub fn commit_session(&self, session: Pin>) { + // we do not care about the cost + let _ = self.commit_transaction(session.into_transaction()); } // Fetch a chunk by global chunk id (should be called by ABCI when @@ -262,7 +44,7 @@ impl GroveDb { pub fn fetch_chunk( &self, global_chunk_id: &[u8], - tx: TransactionArg, + transaction: TransactionArg, version: u16, grove_version: &GroveVersion, ) -> Result, Error> { @@ -277,99 +59,80 @@ impl GroveDb { )); } - let root_app_hash = self.root_hash(tx, grove_version).value?; - let (chunk_prefix, chunk_id) = - replication::util_split_global_chunk_id(global_chunk_id, &root_app_hash)?; - - let subtrees_metadata = self.get_subtrees_metadata(tx, grove_version)?; - - match subtrees_metadata.data.get(&chunk_prefix) { - Some(path_data) => { - let subtree = &path_data.0; - let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - - match tx { - None => { - let merk = self - .open_non_transactional_merk_at_path(path.into(), None, grove_version) - .value?; - - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let chunk_producer_res = ChunkProducer::new(&merk); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(&chunk_id, grove_version); - match chunk_res { - Ok((chunk, _)) => match util_encode_vec_ops(chunk) { - Ok(op_bytes) => Ok(op_bytes), - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - }, - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - } - } - Err(_) => Err(Error::CorruptedData( - "Unable to create Chunk producer".to_string(), - )), - } - } - Some(t) => { - let merk = self - .open_transactional_merk_at_path(path.into(), t, None, grove_version) - .value?; - - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let chunk_producer_res = ChunkProducer::new(&merk); - match chunk_producer_res { - Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(&chunk_id, grove_version); - match chunk_res { - Ok((chunk, _)) => match util_encode_vec_ops(chunk) { - Ok(op_bytes) => Ok(op_bytes), - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - }, - Err(_) => Err(Error::CorruptedData( - "Unable to create to load chunk".to_string(), - )), - } - } - Err(_) => Err(Error::CorruptedData( - "Unable to create Chunk producer".to_string(), - )), - } - } + let root_app_hash = self.root_hash(transaction, grove_version).value?; + let (chunk_prefix, root_key, is_sum_tree, chunk_id) = + util_split_global_chunk_id_2(global_chunk_id, &root_app_hash)?; + + // TODO: Refactor this by writing fetch_chunk_inner (as only merk constructor and type are different) + match transaction { + None => { + let merk = self.open_non_transactional_merk_by_prefix(chunk_prefix, + root_key, + is_sum_tree, None, grove_version) + .value + .map_err(|e| Error::CorruptedData( + format!("failed to open merk by prefix non-tx:{} with:{}", e, hex::encode(chunk_prefix)), + ))?; + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); } + if merk.height().is_none() { + return Ok(vec![]); + } + + let mut chunk_producer = ChunkProducer::new(&merk) + .map_err(|e| Error::CorruptedData( + format!("failed to create chunk producer by prefix non-tx:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + let ((chunk,_)) = chunk_producer.chunk(&chunk_id, grove_version) + .map_err(|e| Error::CorruptedData( + format!("failed to apply chunk:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + let op_bytes = util_encode_vec_ops(chunk) + .map_err(|e| Error::CorruptedData( + format!("failed to encode chunk ops:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + Ok(op_bytes) + } + Some(tx) => { + let merk = self.open_transactional_merk_by_prefix(chunk_prefix, + root_key, + is_sum_tree, tx, None, grove_version) + .value + .map_err(|e| Error::CorruptedData( + format!("failed to open merk by prefix tx:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); + } + + let mut chunk_producer = ChunkProducer::new(&merk) + .map_err(|e| Error::CorruptedData( + format!("failed to create chunk producer by prefix tx:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + let ((chunk,_)) = chunk_producer.chunk(&chunk_id, grove_version) + .map_err(|e| Error::CorruptedData( + format!("failed to apply chunk:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + let op_bytes = util_encode_vec_ops(chunk) + .map_err(|e| Error::CorruptedData( + format!("failed to encode chunk ops:{} with:{}", hex::encode(chunk_prefix), e), + ))?; + Ok(op_bytes) } - None => Err(Error::CorruptedData("Prefix not found".to_string())), } } - // Starts a state sync process (should be called by ABCI when OfferSnapshot - // method is called) Params: - // state_sync_info: Consumed StateSyncInfo - // app_hash: Snapshot's AppHash - // tx: Transaction for the state sync - // Returns the StateSyncInfo transferring ownership back to the caller) + /// Starts a state sync process of a snapshot with `app_hash` root hash, + /// should be called by ABCI when OfferSnapshot method is called. + /// Returns the first set of global chunk ids that can be fetched from + /// sources and a new sync session. pub fn start_snapshot_syncing<'db>( &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, app_hash: CryptoHash, - tx: &'db Transaction, version: u16, grove_version: &GroveVersion, - ) -> Result { + ) -> Result>>, Error> { check_grovedb_v0!( "start_snapshot_syncing", grove_version @@ -383,277 +146,145 @@ impl GroveDb { "Unsupported state sync protocol version".to_string(), )); } - if version != state_sync_info.version { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } - - if !state_sync_info.current_prefixes.is_empty() - || !state_sync_info.processed_prefixes.is_empty() - { - return Err(Error::InternalError( - "GroveDB has already started a snapshot syncing".to_string(), - )); - } - println!( - " starting:{:?}...", - replication::util_path_to_string(&[]) - ); + println!(" starting:{:?}...", util_path_to_string(&[])); - let mut root_prefix_state_sync_info = SubtreeStateSyncInfo::default(); let root_prefix = [0u8; 32]; - if let Ok(merk) = self.open_merk_for_replication(SubtreePath::empty(), tx, grove_version) { - let restorer = Restorer::new(merk, app_hash, None); - root_prefix_state_sync_info.restorer = Some(restorer); - root_prefix_state_sync_info.pending_chunks.insert(vec![]); - state_sync_info - .current_prefixes - .insert(root_prefix, root_prefix_state_sync_info); - state_sync_info.app_hash = app_hash; - } else { - return Err(Error::InternalError( - "Unable to open merk for replication".to_string(), - )); - } - Ok(state_sync_info) + let mut session = self.start_syncing_session(app_hash); + + session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix, grove_version)?; + + Ok(session) } +} - // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is - // called) Params: - // state_sync_info: Consumed MultiStateSyncInfo - // global_chunk_id: Global chunk id - // chunk: Chunk proof operators encoded in bytes - // tx: Transaction for the state sync - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the MultiStateSyncInfo transferring ownership back to the caller) - pub fn apply_chunk<'db>( - &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, - global_chunk_id: &[u8], - chunk: Vec, - tx: &'db Transaction, - version: u16, - grove_version: &GroveVersion, - ) -> Result<(Vec>, MultiStateSyncInfo), Error> { - check_grovedb_v0!( - "apply_chunk", - grove_version.grovedb_versions.replication.apply_chunk +// Converts a path into a human-readable string (for debugging) +pub fn util_path_to_string(path: &[Vec]) -> Vec { + let mut subtree_path_str: Vec = vec![]; + for subtree in path { + let string = std::str::from_utf8(subtree).expect("should be able to convert path"); + subtree_path_str.push( + string + .parse() + .expect("should be able to parse path to string"), ); - // For now, only CURRENT_STATE_SYNC_VERSION is supported - if version != CURRENT_STATE_SYNC_VERSION { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } - if version != state_sync_info.version { - return Err(Error::CorruptedData( - "Unsupported state sync protocol version".to_string(), - )); - } + } + subtree_path_str +} + +pub fn util_split_global_chunk_id_2( + global_chunk_id: &[u8], + app_hash: &[u8], +) -> Result<(crate::SubtreePrefix, Option>, bool, Vec), Error> { + //println!("got>{}", hex::encode(global_chunk_id)); + let chunk_prefix_length: usize = 32; + if global_chunk_id.len() < chunk_prefix_length { + return Err(Error::CorruptedData( + "expected global chunk id of at least 32 length".to_string(), + )); + } - let mut next_chunk_ids = vec![]; + if global_chunk_id == app_hash { + let root_chunk_prefix_key: crate::SubtreePrefix = [0u8; 32]; + return Ok((root_chunk_prefix_key, None, false, vec![])); + } - let (chunk_prefix, chunk_id) = - replication::util_split_global_chunk_id(global_chunk_id, &state_sync_info.app_hash)?; + let (chunk_prefix_key, remaining) = global_chunk_id.split_at(chunk_prefix_length); - if state_sync_info.current_prefixes.is_empty() { - return Err(Error::InternalError( - "GroveDB is not in syncing mode".to_string(), - )); - } - if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { - if let Ok((res, mut new_subtree_state_sync)) = - self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk, grove_version) - { - if !res.is_empty() { - for local_chunk_id in res.iter() { - let mut next_global_chunk_id = chunk_prefix.to_vec(); - next_global_chunk_id.extend(local_chunk_id.to_vec()); - next_chunk_ids.push(next_global_chunk_id); - } - - // re-insert subtree_state_sync in state_sync_info - state_sync_info - .current_prefixes - .insert(chunk_prefix, new_subtree_state_sync); - Ok((next_chunk_ids, state_sync_info)) - } else { - if !new_subtree_state_sync.pending_chunks.is_empty() { - // re-insert subtree_state_sync in state_sync_info - state_sync_info - .current_prefixes - .insert(chunk_prefix, new_subtree_state_sync); - return Ok((vec![], state_sync_info)); - } - - // Subtree is finished. We can save it. - match new_subtree_state_sync.restorer.take() { - None => Err(Error::InternalError( - "Unable to finalize subtree".to_string(), - )), - Some(restorer) => { - if (new_subtree_state_sync.num_processed_chunks > 0) - && (restorer.finalize(grove_version).is_err()) - { - return Err(Error::InternalError( - "Unable to finalize Merk".to_string(), - )); - } - state_sync_info.processed_prefixes.insert(chunk_prefix); - - // Subtree was successfully save. Time to discover new subtrees that - // need to be processed - let subtrees_metadata = - self.get_subtrees_metadata(Some(tx), grove_version)?; - if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) { - println!( - " path:{:?} done (num_processed_chunks:{:?})", - replication::util_path_to_string(&value.0), - new_subtree_state_sync.num_processed_chunks - ); - } - - if let Ok((res, new_state_sync_info)) = self.discover_subtrees( - state_sync_info, - subtrees_metadata, - tx, - grove_version, - ) { - next_chunk_ids.extend(res); - Ok((next_chunk_ids, new_state_sync_info)) - } else { - Err(Error::InternalError( - "Unable to discover Subtrees".to_string(), - )) - } - } - } - } - } else { - Err(Error::InternalError( - "Unable to process incoming chunk".to_string(), - )) - } - } else { - Err(Error::InternalError("Invalid incoming prefix".to_string())) - } + let root_key_size_length: usize = 1; + if remaining.len() < root_key_size_length { + return Err(Error::CorruptedData( + "unable to decode root key size".to_string(), + )); } + let (root_key_size, remaining) = remaining.split_at(root_key_size_length); + if remaining.len() < root_key_size[0] as usize { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), + )); + } + let (root_key, remaining) = remaining.split_at(root_key_size[0] as usize); + let is_sum_tree_length: usize = 1; + if remaining.len() < is_sum_tree_length { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), + )); + } + let (is_sum_tree, chunk_id) = remaining.split_at(is_sum_tree_length); - // Apply a chunk using the given SubtreeStateSyncInfo - // state_sync_info: Consumed SubtreeStateSyncInfo - // chunk_id: Local chunk id - // chunk_data: Chunk proof operators encoded in bytes - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the SubtreeStateSyncInfo transferring ownership back to the caller) - fn apply_inner_chunk<'db>( - &'db self, - mut state_sync_info: SubtreeStateSyncInfo<'db>, - chunk_id: &[u8], - chunk_data: Vec, - grove_version: &GroveVersion, - ) -> Result<(Vec>, SubtreeStateSyncInfo), Error> { - let mut res = vec![]; - - match &mut state_sync_info.restorer { - Some(restorer) => { - if !state_sync_info.pending_chunks.contains(chunk_id) { - return Err(Error::InternalError( - "Incoming global_chunk_id not expected".to_string(), - )); - } - state_sync_info.pending_chunks.remove(chunk_id); - if !chunk_data.is_empty() { - match util_decode_vec_ops(chunk_data) { - Ok(ops) => { - match restorer.process_chunk(chunk_id, ops, grove_version) { - Ok(next_chunk_ids) => { - state_sync_info.num_processed_chunks += 1; - for next_chunk_id in next_chunk_ids { - state_sync_info - .pending_chunks - .insert(next_chunk_id.clone()); - res.push(next_chunk_id); - } - } - _ => { - return Err(Error::InternalError( - "Unable to process incoming chunk".to_string(), - )); - } - }; - } - Err(_) => { - return Err(Error::CorruptedData( - "Unable to decode incoming chunk".to_string(), - )); - } - } - } - } - _ => { - return Err(Error::InternalError( - "Invalid internal state (restorer".to_string(), - )); - } - } + let subtree_prefix: crate::SubtreePrefix = chunk_prefix_key.try_into() + .map_err(|_| { + Error::CorruptedData( + "unable to construct subtree".to_string(), + ) + })?; - Ok((res, state_sync_info)) + if !root_key.is_empty() { + Ok((subtree_prefix, Some(root_key.to_vec()), is_sum_tree[0] != 0, chunk_id.to_vec())) + } + else { + Ok((subtree_prefix, None, is_sum_tree[0] != 0, chunk_id.to_vec())) } +} - // Prepares SubtreeStateSyncInfos for the freshly discovered subtrees in - // subtrees_metadata and returns the root global chunk ids for all of those - // new subtrees. state_sync_info: Consumed MultiStateSyncInfo - // subtrees_metadata: Metadata about discovered subtrees - // chunk_data: Chunk proof operators - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the MultiStateSyncInfo transferring ownership back to the caller) - fn discover_subtrees<'db>( - &'db self, - mut state_sync_info: MultiStateSyncInfo<'db>, - subtrees_metadata: SubtreesMetadata, - tx: &'db Transaction, - grove_version: &GroveVersion, - ) -> Result<(Vec>, MultiStateSyncInfo), Error> { - let mut res = vec![]; - - for (prefix, prefix_metadata) in &subtrees_metadata.data { - if !state_sync_info.processed_prefixes.contains(prefix) - && !state_sync_info.current_prefixes.contains_key(prefix) - { - let (current_path, s_actual_value_hash, s_elem_value_hash) = &prefix_metadata; - - let subtree_path: Vec<&[u8]> = - current_path.iter().map(|vec| vec.as_slice()).collect(); - let path: &[&[u8]] = &subtree_path; - println!( - " path:{:?} starting...", - replication::util_path_to_string(&prefix_metadata.0) - ); - - let mut subtree_state_sync_info = SubtreeStateSyncInfo::default(); - if let Ok(merk) = self.open_merk_for_replication(path.into(), tx, grove_version) { - let restorer = - Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash)); - subtree_state_sync_info.restorer = Some(restorer); - subtree_state_sync_info.pending_chunks.insert(vec![]); - - state_sync_info - .current_prefixes - .insert(*prefix, subtree_state_sync_info); - - let root_chunk_prefix = prefix.to_vec(); - res.push(root_chunk_prefix.to_vec()); - } else { - return Err(Error::InternalError( - "Unable to open Merk for replication".to_string(), - )); - } - } - } +// Create the given global chunk id into [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] +pub fn util_create_global_chunk_id_2( + subtree_prefix: [u8; blake3::OUT_LEN], + root_key_opt: Option>, + is_sum_tree:bool, + chunk_id: Vec +) -> (Vec){ + let mut res = vec![]; - Ok((res, state_sync_info)) + res.extend(subtree_prefix); + + let mut root_key_len = 0u8; + let mut root_key_vec = vec![]; + if let Some(root_key) = root_key_opt { + res.push(root_key.len() as u8); + res.extend(root_key.clone()); + root_key_len = root_key.len() as u8; + root_key_vec = root_key; + } + else { + res.push(0u8); + } + + let mut is_sum_tree_v = 0u8; + if is_sum_tree { + is_sum_tree_v = 1u8; } + res.push(is_sum_tree_v); + + + res.extend(chunk_id.to_vec()); + //println!("snd>{}|{}|{}|{}|{:?}", hex::encode(res.clone()), root_key_len, hex::encode(root_key_vec), is_sum_tree_v, chunk_id); + res } + +pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { + let mut res = vec![]; + for op in chunk { + op.encode_into(&mut res) + .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; + } + Ok(res) +} + +pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { + let decoder = Decoder::new(&chunk); + let mut res = vec![]; + for op in decoder { + match op { + Ok(op) => res.push(op), + Err(e) => { + return Err(Error::CorruptedData(format!( + "unable to decode chunk: {}", + e + ))); + } + } + } + Ok(res) +} \ No newline at end of file diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs new file mode 100644 index 00000000..f401a49b --- /dev/null +++ b/grovedb/src/replication/state_sync_session.rs @@ -0,0 +1,433 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + fmt, + marker::PhantomPinned, + pin::Pin, +}; +use std::fs::Metadata; +use grovedb_costs::CostsExt; + +use grovedb_merk::{CryptoHash, Restorer}; +use grovedb_merk::tree::kv::ValueDefinedCostType; +use grovedb_merk::tree::value_hash; +use grovedb_path::SubtreePath; +use grovedb_storage::rocksdb_storage::{PrefixedRocksDbImmediateStorageContext, RocksDbStorage}; +use grovedb_storage::StorageContext; +use grovedb_version::version::GroveVersion; +use super::{util_decode_vec_ops, CURRENT_STATE_SYNC_VERSION, util_create_global_chunk_id_2}; +use crate::{replication::util_path_to_string, Error, GroveDb, Transaction, replication, TransactionArg, Element}; +use crate::util::storage_context_optional_tx; + +pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; + +pub(crate) type SubtreeMetadata = (SubtreePrefix, Vec>, /*Option>, bool,*/ CryptoHash, CryptoHash); + +struct SubtreeStateSyncInfo<'db> { + /// Current Chunk restorer + restorer: Restorer>, + /// Set of global chunk ids requested to be fetched and pending for + /// processing. For the description of global chunk id check + /// fetch_chunk(). + current_root_key: Option>, + is_sum_tree: bool, + pending_chunks: BTreeSet>, + current_path: Vec>, + /// Number of processed chunks in current prefix (Path digest) + num_processed_chunks: usize, +} + +impl<'db> SubtreeStateSyncInfo<'db> { + pub fn get_current_path(&self) -> Vec> { + self.current_path.clone() + } + // Apply a chunk using the given SubtreeStateSyncInfo + // state_sync_info: Consumed SubtreeStateSyncInfo + // chunk_id: Local chunk id + // chunk_data: Chunk proof operators encoded in bytes + // Returns the next set of global chunk ids that can be fetched from sources (+ + // the SubtreeStateSyncInfo transferring ownership back to the caller) + fn apply_inner_chunk( + &mut self, + chunk_id: &[u8], + chunk_data: Vec, + grove_version: &GroveVersion, + ) -> Result>, Error> { + let mut res = vec![]; + + if !self.pending_chunks.contains(chunk_id) { + return Err(Error::InternalError( + "Incoming global_chunk_id not expected".to_string(), + )); + } + self.pending_chunks.remove(chunk_id); + if !chunk_data.is_empty() { + match util_decode_vec_ops(chunk_data) { + Ok(ops) => { + match self.restorer.process_chunk(chunk_id, ops, grove_version) { + Ok(next_chunk_ids) => { + self.num_processed_chunks += 1; + for next_chunk_id in next_chunk_ids { + self.pending_chunks.insert(next_chunk_id.clone()); + res.push(next_chunk_id); + } + } + _ => { + return Err(Error::InternalError("Unable to process incoming chunk".to_string())); + } + }; + } + Err(_) => { + return Err(Error::CorruptedData( + "Unable to decode incoming chunk".to_string(), + )); + } + } + } + + Ok(res) + } +} + +impl<'tx> SubtreeStateSyncInfo<'tx> { + pub fn new(restorer: Restorer>) -> Self { + SubtreeStateSyncInfo { + restorer, + current_root_key: None, + is_sum_tree: false, + pending_chunks: Default::default(), + current_path: vec![], + num_processed_chunks: 0, + } + } +} + +// Struct governing state sync +pub struct MultiStateSyncSession<'db> { + // Map of current processing subtrees + // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo + current_prefixes: BTreeMap>, + // Set of processed prefixes (Path digests) + processed_prefixes: BTreeSet, + // Root app_hash + app_hash: [u8; 32], + // Version of state sync protocol, + pub(crate) version: u16, + // Transaction goes last to be dropped last as well + transaction: Transaction<'db>, + _pin: PhantomPinned, +} + +impl<'db> MultiStateSyncSession<'db> { + /// Initializes a new state sync session. + pub fn new(transaction: Transaction<'db>, app_hash: [u8; 32]) -> Pin> { + Box::pin(MultiStateSyncSession { + transaction, + current_prefixes: Default::default(), + processed_prefixes: Default::default(), + app_hash, + version: CURRENT_STATE_SYNC_VERSION, + _pin: PhantomPinned, + }) + } + + pub fn is_empty(&self) -> bool { + self.current_prefixes.is_empty() + } + + pub fn is_sync_completed(&self) -> bool { + for (_, subtree_state_info) in self.current_prefixes.iter() { + if !subtree_state_info.pending_chunks.is_empty() { + return false; + } + } + return true; + } + + pub fn into_transaction(self: Pin>) -> Transaction<'db> { + // SAFETY: the struct isn't used anymore and no one will refer to transaction + // address again + unsafe { Pin::into_inner_unchecked(self) }.transaction + } + + pub fn add_subtree_sync_info<'b, B: AsRef<[u8]>>( + self: &mut Pin>>, + db: &'db GroveDb, + path: SubtreePath<'b, B>, + hash: CryptoHash, + actual_hash: Option, + chunk_prefix: [u8; 32], + grove_version: &GroveVersion, + ) -> Result<(Vec), Error> { + // SAFETY: we get an immutable reference of a transaction that stays behind + // `Pin` so this reference shall remain valid for the whole session + // object lifetime. + let transaction_ref: &'db Transaction<'db> = unsafe { + let tx: &mut Transaction<'db> = + &mut Pin::into_inner_unchecked(self.as_mut()).transaction; + &*(tx as *mut _) + }; + + if let Ok((merk, root_key, is_sum_tree)) = db.open_merk_for_replication(path.clone(), transaction_ref, grove_version) { + let restorer = Restorer::new(merk, hash, actual_hash); + let mut sync_info = SubtreeStateSyncInfo::new(restorer); + sync_info.pending_chunks.insert(vec![]); + sync_info.current_root_key = root_key.clone(); + sync_info.is_sum_tree = is_sum_tree; + sync_info.current_path = path.to_vec(); + self.as_mut() + .current_prefixes() + .insert(chunk_prefix, sync_info); + let x = util_create_global_chunk_id_2(chunk_prefix, root_key, is_sum_tree, vec![]); + Ok((x)) + } else { + Err(Error::InternalError("Unable to open merk for replication".to_string())) + } + } + + fn current_prefixes( + self: Pin<&mut MultiStateSyncSession<'db>>, + ) -> &mut BTreeMap> { + // SAFETY: no memory-sensitive assumptions are made about fields except the + // `transaciton` so it will be safe to modify them + &mut unsafe { self.get_unchecked_mut() }.current_prefixes + } + + fn processed_prefixes( + self: Pin<&mut MultiStateSyncSession<'db>>, + ) -> &mut BTreeSet { + // SAFETY: no memory-sensitive assumptions are made about fields except the + // `transaciton` so it will be safe to modify them + &mut unsafe { self.get_unchecked_mut() }.processed_prefixes + } + + /// Applies a chunk, shuold be called by ABCI when `ApplySnapshotChunk` + /// method is called. `chunk` is a pair of global chunk id and an + /// encoded proof. + pub fn apply_chunk( + self: &mut Pin>>, + db: &'db GroveDb, + global_chunk_id: &[u8], + chunk: Vec, + version: u16, + grove_version: &GroveVersion, + ) -> Result>, Error> { + // For now, only CURRENT_STATE_SYNC_VERSION is supported + if version != CURRENT_STATE_SYNC_VERSION { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + if version != self.version { + return Err(Error::CorruptedData( + "Unsupported state sync protocol version".to_string(), + )); + } + + let mut next_chunk_ids = vec![]; + + // [OLD_WAY] + //let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id, self.app_hash)?; + // [NEW_WAY] + let (chunk_prefix, key_root, is_summ_tree, chunk_id) = replication::util_split_global_chunk_id_2(global_chunk_id, &self.app_hash)?; + + if self.is_empty() { + return Err(Error::InternalError("GroveDB is not in syncing mode".to_string())); + } + + let current_prefixes = self.as_mut().current_prefixes(); + let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else { + return Err(Error::InternalError("Unable to process incoming chunk".to_string())); + }; + let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk, grove_version) else { + return Err(Error::InternalError("Invalid incoming prefix".to_string())); + }; + + if !res.is_empty() { + for local_chunk_id in res.iter() { + // [NEW_WAY] + let x = util_create_global_chunk_id_2(chunk_prefix, subtree_state_sync.current_root_key.clone(), subtree_state_sync.is_sum_tree.clone(), local_chunk_id.clone()); + next_chunk_ids.push(x); + // [OLD_WAY] + //let mut next_global_chunk_id = chunk_prefix.to_vec(); + //next_global_chunk_id.extend(local_chunk_id.to_vec()); + //next_chunk_ids.push(next_global_chunk_id); + } + + Ok(next_chunk_ids) + } else { + if !subtree_state_sync.pending_chunks.is_empty() { + return Ok(vec![]); + } + + let completed_path = subtree_state_sync.get_current_path(); + + // Subtree is finished. We can save it. + if (subtree_state_sync.num_processed_chunks > 0) + && (current_prefixes + .remove(&chunk_prefix) + .expect("prefix exists") + .restorer + .finalize(grove_version) + .is_err()) + { + return Err(Error::InternalError("Unable to finalize Merk".to_string())); + } + + self.as_mut().processed_prefixes().insert(chunk_prefix); + + println!(" finished tree: {:?}", util_path_to_string(completed_path.as_slice())); + let new_subtrees_metadata = self.discover_new_subtrees_metadata(db, completed_path.to_vec(), grove_version)?; + + if let Ok(res) = self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version) { + next_chunk_ids.extend(res); + Ok(next_chunk_ids) + } else { + Err(Error::InternalError("Unable to discover Subtrees".to_string())) + } + } + } + + fn discover_new_subtrees_metadata( + self: &mut Pin>>, + db: &'db GroveDb, + path_vec: Vec>, + grove_version: &GroveVersion, + ) -> Result { + let transaction_ref: &'db Transaction<'db> = unsafe { + let tx: &mut Transaction<'db> = + &mut Pin::into_inner_unchecked(self.as_mut()).transaction; + &*(tx as *mut _) + }; + let subtree_path: Vec<&[u8]> = path_vec.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + let merk = db.open_transactional_merk_at_path(path.into(), transaction_ref, None, grove_version) + .value + .map_err(|e| Error::CorruptedData( + format!("failed to open merk by path-tx:{}", e), + ))?; + if merk.is_empty_tree().unwrap() { + return Ok(SubtreesMetadata::default()); + } + let mut subtree_keys = BTreeSet::new(); + + let mut raw_iter = Element::iterator(merk.storage.raw_iter()).unwrap(); + while let Some((key, value)) = raw_iter.next_element(grove_version).unwrap().unwrap() { + if value.is_any_tree() { + subtree_keys.insert(key.to_vec()); + } + } + + let mut subtrees_metadata = SubtreesMetadata::new(); + for subtree_key in &subtree_keys { + if let Ok(Some((elem_value, elem_value_hash))) = merk + .get_value_and_value_hash( + subtree_key.as_slice(), + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .value + { + let actual_value_hash = value_hash(&elem_value).unwrap(); + let mut new_path = path_vec.to_vec(); + new_path.push(subtree_key.to_vec()); + + let subtree_path: Vec<&[u8]> = new_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap(); + + println!(" detected {:?} prefix:{}", util_path_to_string(&new_path), hex::encode(prefix)); + + subtrees_metadata.data.insert( + prefix, + (new_path.to_vec(), actual_value_hash, elem_value_hash), + ); + } + } + + Ok((subtrees_metadata)) + } + + /// Prepares sync session for the freshly discovered subtrees and returns + /// global chunk ids of those new subtrees. + fn prepare_sync_state_sessions( + self: &mut Pin>>, + db: &'db GroveDb, + subtrees_metadata: SubtreesMetadata, + grove_version: &GroveVersion, + ) -> Result>, Error> { + let mut res = vec![]; + + for (prefix, prefix_metadata) in &subtrees_metadata.data { + if !self.processed_prefixes.contains(prefix) + && !self.current_prefixes.contains_key(prefix) + { + let (current_path, actual_value_hash, elem_value_hash) = &prefix_metadata; + + let subtree_path: Vec<&[u8]> = + current_path.iter().map(|vec| vec.as_slice()).collect(); + let path: &[&[u8]] = &subtree_path; + println!( + " path:{:?} starting...", + util_path_to_string(&prefix_metadata.0) + ); + + let x = self.add_subtree_sync_info( + db, + path.into(), + elem_value_hash.clone(), + Some(actual_value_hash.clone()), + prefix.clone(), + grove_version + )?; + + // [NEW_WAY] + res.push(x); + // [OLD_WAY] + //let root_chunk_prefix = prefix.to_vec(); + //res.push(root_chunk_prefix.to_vec()); + //res.push(prefix.to_vec()); + } + } + + Ok(res) + } +} + +// Struct containing information about current subtrees found in GroveDB +pub struct SubtreesMetadata { + // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent + // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree + // elem_value_hash are needed when verifying the new constructed subtree after wards. + pub data: BTreeMap>, CryptoHash, CryptoHash)>, +} + +impl SubtreesMetadata { + pub fn new() -> SubtreesMetadata { + SubtreesMetadata { + data: BTreeMap::new(), + } + } +} + +impl Default for SubtreesMetadata { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for SubtreesMetadata { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for (prefix, metadata) in self.data.iter() { + let metadata_path = &metadata.0; + let metadata_path_str = util_path_to_string(metadata_path); + writeln!( + f, + " prefix:{:?} -> path:{:?}", + hex::encode(prefix), + metadata_path_str, + )?; + } + Ok(()) + } +} \ No newline at end of file diff --git a/storage/src/rocksdb_storage/storage.rs b/storage/src/rocksdb_storage/storage.rs index 8a91d4f4..d1833707 100644 --- a/storage/src/rocksdb_storage/storage.rs +++ b/storage/src/rocksdb_storage/storage.rs @@ -58,7 +58,7 @@ use crate::{ const BLAKE_BLOCK_LEN: usize = 64; -pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +pub type SubtreePrefix = [u8; blake3::OUT_LEN]; fn blake_block_count(len: usize) -> usize { if len == 0 { @@ -472,6 +472,15 @@ impl<'db> Storage<'db> for RocksDbStorage { .map(|prefix| PrefixedRocksDbStorageContext::new(&self.db, prefix, batch)) } + fn get_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> CostContext + { + PrefixedRocksDbStorageContext::new(&self.db, prefix, batch).wrap_with_cost(OperationCost::default()) + } + fn get_transactional_storage_context<'b, B>( &'db self, path: SubtreePath<'b, B>, @@ -486,6 +495,16 @@ impl<'db> Storage<'db> for RocksDbStorage { }) } + fn get_transactional_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + transaction: &'db Self::Transaction, + ) -> CostContext + { + PrefixedRocksDbTransactionContext::new(&self.db, transaction, prefix, batch).wrap_with_cost(OperationCost::default()) + } + fn get_immediate_storage_context<'b, B>( &'db self, path: SubtreePath<'b, B>, @@ -499,6 +518,15 @@ impl<'db> Storage<'db> for RocksDbStorage { }) } + fn get_immediate_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + transaction: &'db Self::Transaction, + ) -> CostContext + { + PrefixedRocksDbImmediateStorageContext::new(&self.db, transaction, prefix).wrap_with_cost(OperationCost::default()) + } + fn commit_multi_context_batch( &self, batch: StorageBatch, diff --git a/storage/src/storage.rs b/storage/src/storage.rs index 5ef26e06..019b0e48 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -43,6 +43,8 @@ use grovedb_visualize::visualize_to_vec; use crate::{worst_case_costs::WorstKeyLength, Error}; +pub type SubtreePrefix = [u8; blake3::OUT_LEN]; + /// Top-level storage_cost abstraction. /// Should be able to hold storage_cost connection and to start transaction when /// needed. All query operations will be exposed using [StorageContext]. @@ -89,6 +91,14 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make storage context for a subtree with prefix, keeping all write + /// operations inside a `batch` if provided. + fn get_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> CostContext; + /// Make context for a subtree on transactional data, keeping all write /// operations inside a `batch` if provided. fn get_transactional_storage_context<'b, B>( @@ -100,6 +110,15 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make context for a subtree by prefix on transactional data, keeping all write + /// operations inside a `batch` if provided. + fn get_transactional_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + transaction: &'db Self::Transaction, + ) -> CostContext; + /// Make context for a subtree on transactional data that will apply all /// operations straight to the storage. fn get_immediate_storage_context<'b, B>( @@ -110,6 +129,14 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; + /// Make context for a subtree by prefix on transactional data that will apply all + /// operations straight to the storage. + fn get_immediate_storage_context_by_subtree_prefix( + &'db self, + prefix: SubtreePrefix, + transaction: &'db Self::Transaction, + ) -> CostContext; + /// Creates a database checkpoint in a specified path fn create_checkpoint>(&self, path: P) -> Result<(), Error>; diff --git a/tutorials/Cargo.toml b/tutorials/Cargo.toml index 8084a248..7fd4ee81 100644 --- a/tutorials/Cargo.toml +++ b/tutorials/Cargo.toml @@ -14,6 +14,7 @@ grovedb-visualize = { path = "../visualize" } rand = "0.8.5" hex = "0.4" +blake3 = "1.5.1" [workspace] diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index ceeec2f2..a8227fe5 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -1,12 +1,13 @@ use std::collections::VecDeque; use std::path::Path; +use std::time::{Duration, Instant}; use grovedb::{operations::insert::InsertOptions, Element, GroveDb, PathQuery, Query, Transaction}; use grovedb::reference_path::ReferencePathType; use rand::{distributions::Alphanumeric, Rng, }; use grovedb::element::SumValue; use grovedb::replication::CURRENT_STATE_SYNC_VERSION; -use grovedb::replication::MultiStateSyncInfo; use grovedb_version::version::GroveVersion; +use grovedb::replication::MultiStateSyncSession; const MAIN_ΚΕΥ: &[u8] = b"key_main"; const MAIN_ΚΕΥ_EMPTY: &[u8] = b"key_main_empty"; @@ -18,6 +19,8 @@ const KEY_INT_REF_0: &[u8] = b"key_int_ref_0"; const KEY_INT_A: &[u8] = b"key_sum_0"; const ROOT_PATH: &[&[u8]] = &[]; +pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; + // Allow insertions to overwrite trees // This is necessary so the tutorial can be rerun easily const INSERT_OPTIONS: Option = Some(InsertOptions { @@ -37,14 +40,14 @@ fn populate_db(grovedb_path: String, grove_version: &GroveVersion) -> GroveDb { let tx = db.start_transaction(); let batch_size = 50; - for i in 0..=5 { + for i in 0..=100 { insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_0], i * batch_size, i * batch_size + batch_size - 1, &tx, &grove_version); } let _ = db.commit_transaction(tx); let tx = db.start_transaction(); let batch_size = 50; - for i in 0..=5 { + for i in 0..=100 { insert_range_values_db(&db, &[MAIN_ΚΕΥ, KEY_INT_1], i * batch_size, i * batch_size + batch_size - 1, &tx, &grove_version); } let _ = db.commit_transaction(tx); @@ -98,15 +101,8 @@ fn main() { let root_hash_destination = db_destination.root_hash(None, grove_version).unwrap().unwrap(); println!("root_hash_destination: {:?}", hex::encode(root_hash_destination)); - println!("\n######### source_subtree_metadata of db_source"); - let subtrees_metadata_source = db_source.get_subtrees_metadata(None, grove_version).unwrap(); - println!("{:?}", subtrees_metadata_source); - println!("\n######### db_checkpoint_0 -> db_destination state sync"); - let state_info = MultiStateSyncInfo::default(); - let tx = db_destination.start_transaction(); - sync_db_demo(&db_checkpoint_0, &db_destination, state_info, &tx, &grove_version).unwrap(); - db_destination.commit_transaction(tx).unwrap().expect("expected to commit transaction"); + sync_db_demo(&db_checkpoint_0, &db_destination, &grove_version).unwrap(); println!("\n######### verify db_destination"); let incorrect_hashes = db_destination.verify_grovedb(None, true, false, grove_version).unwrap(); @@ -246,24 +242,43 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec, grove_version: &GroveVer fn sync_db_demo( source_db: &GroveDb, target_db: &GroveDb, - state_sync_info: MultiStateSyncInfo, - target_tx: &Transaction, grove_version: &GroveVersion, ) -> Result<(), grovedb::Error> { + let start_time = Instant::now(); let app_hash = source_db.root_hash(None, grove_version).value.unwrap(); - let mut state_sync_info = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION, grove_version)?; + let mut session = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION, grove_version)?; let mut chunk_queue : VecDeque> = VecDeque::new(); // The very first chunk to fetch is always identified by the root app_hash chunk_queue.push_back(app_hash.to_vec()); + let mut num_chunks = 0; + let mut duration_sum_fetch: Duration = Duration::ZERO; + let mut duration_sum_apply: Duration = Duration::ZERO; while let Some(chunk_id) = chunk_queue.pop_front() { + num_chunks += 1; + let start_time_fetch = Instant::now(); let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION, grove_version)?; - let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, chunk_id.as_slice(), ops, target_tx, CURRENT_STATE_SYNC_VERSION, grove_version)?; - state_sync_info = new_state_sync_info; + let elapsed_fetch = start_time_fetch.elapsed(); + duration_sum_fetch += elapsed_fetch; + + let start_time_apply = Instant::now(); + let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION, grove_version)?; + let elapsed_apply = start_time_apply.elapsed(); + duration_sum_apply += elapsed_apply; chunk_queue.extend(more_chunks); } + println!("num_chunks: {}", num_chunks); + println!("duration_sum_fetch: {}", duration_sum_fetch.as_secs_f64()); + println!("duration_sum_apply: {}", duration_sum_apply.as_secs_f64()); + + if session.is_sync_completed() { + target_db.commit_session(session); + } + let elapsed = start_time.elapsed(); + println!("state_synced in {:.2?}", elapsed); + Ok(()) } From e18a10fee748495bd325b7e63c7d307631b7cedb Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 23 Dec 2024 14:18:06 +0200 Subject: [PATCH 2/7] fix: display non-utf8 path --- grovedb/src/replication.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 902e8c1e..91d39d16 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -163,11 +163,9 @@ impl GroveDb { pub fn util_path_to_string(path: &[Vec]) -> Vec { let mut subtree_path_str: Vec = vec![]; for subtree in path { - let string = std::str::from_utf8(subtree).expect("should be able to convert path"); + let string = std::str::from_utf8(&subtree).unwrap_or_else(|_| ""); subtree_path_str.push( - string - .parse() - .expect("should be able to parse path to string"), + string.to_string() ); } subtree_path_str From f9379234ac8ca5f6976bf11edb34c4ac6b333053 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 23 Dec 2024 14:18:35 +0200 Subject: [PATCH 3/7] remove check on height --- grovedb/src/replication.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 91d39d16..7f08c16a 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -76,9 +76,6 @@ impl GroveDb { if merk.is_empty_tree().unwrap() { return Ok(vec![]); } - if merk.height().is_none() { - return Ok(vec![]); - } let mut chunk_producer = ChunkProducer::new(&merk) .map_err(|e| Error::CorruptedData( From 70460ebd2f9e510003811c99243a513905edf552 Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Thu, 26 Dec 2024 12:11:44 +0200 Subject: [PATCH 4/7] refactor: various refactoring and comments --- grovedb/src/replication.rs | 29 ++++---- grovedb/src/replication/state_sync_session.rs | 74 +++++++++---------- tutorials/src/bin/replication.rs | 11 --- 3 files changed, 48 insertions(+), 66 deletions(-) diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 7f08c16a..96e1f18f 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -5,17 +5,13 @@ use std::pin::Pin; use grovedb_merk::{ ed::Encode, proofs::{Decoder, Op}, - tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash}, + tree::{hash::CryptoHash}, ChunkProducer, }; use grovedb_path::SubtreePath; -use grovedb_storage::rocksdb_storage::RocksDbStorage; -#[rustfmt::skip] -use grovedb_storage::rocksdb_storage::storage_context::context_immediate::PrefixedRocksDbImmediateStorageContext; use grovedb_version::{check_grovedb_v0, error::GroveVersionError, version::GroveVersion}; pub use self::state_sync_session::MultiStateSyncSession; -use self::state_sync_session::SubtreesMetadata; use crate::{Error, GroveDb, TransactionArg}; pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; @@ -34,12 +30,18 @@ impl GroveDb { // Fetch a chunk by global chunk id (should be called by ABCI when // LoadSnapshotChunk method is called) Params: // global_chunk_id: Global chunk id in the following format: - // [SUBTREE_PREFIX:CHUNK_ID] SUBTREE_PREFIX: 32 bytes (mandatory) (All zeros - // = Root subtree) CHUNK_ID: 0.. bytes (optional) Traversal instructions to + // [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] + // SUBTREE_PREFIX: 32 bytes (mandatory) (All zeros = Root subtree) + // SIZE_ROOT_KEY: 1 byte: Size of ROOT_KEY in bytes + // ROOT_KEY: SIZE_ROOT_KEY bytes (optional) + // IS_SUM_TREE: 1 byte (mandatory) marks if the tree is a sum tree or not + // CHUNK_ID: 0.. bytes (optional) Traversal instructions to // the root of the given chunk. Traversal instructions are "1" for left, and // "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization // as a subtree can be big hence traversal instructions for the deepest chunks - // tx: Transaction. Function returns the data by opening merks at given tx. + // transaction: Transaction. Function returns the data by opening merks at given tx. + // version: Version of state sync protocol version + // grove_version: Version of groveDB // Returns the Chunk proof operators for the requested chunk encoded in bytes pub fn fetch_chunk( &self, @@ -61,7 +63,7 @@ impl GroveDb { let root_app_hash = self.root_hash(transaction, grove_version).value?; let (chunk_prefix, root_key, is_sum_tree, chunk_id) = - util_split_global_chunk_id_2(global_chunk_id, &root_app_hash)?; + util_split_global_chunk_id(global_chunk_id, &root_app_hash)?; // TODO: Refactor this by writing fetch_chunk_inner (as only merk constructor and type are different) match transaction { @@ -168,11 +170,11 @@ pub fn util_path_to_string(path: &[Vec]) -> Vec { subtree_path_str } -pub fn util_split_global_chunk_id_2( +// Splits the given global chunk id into [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] +pub fn util_split_global_chunk_id( global_chunk_id: &[u8], app_hash: &[u8], ) -> Result<(crate::SubtreePrefix, Option>, bool, Vec), Error> { - //println!("got>{}", hex::encode(global_chunk_id)); let chunk_prefix_length: usize = 32; if global_chunk_id.len() < chunk_prefix_length { return Err(Error::CorruptedData( @@ -224,7 +226,7 @@ pub fn util_split_global_chunk_id_2( } // Create the given global chunk id into [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] -pub fn util_create_global_chunk_id_2( +pub fn util_create_global_chunk_id( subtree_prefix: [u8; blake3::OUT_LEN], root_key_opt: Option>, is_sum_tree:bool, @@ -252,9 +254,8 @@ pub fn util_create_global_chunk_id_2( } res.push(is_sum_tree_v); - res.extend(chunk_id.to_vec()); - //println!("snd>{}|{}|{}|{}|{:?}", hex::encode(res.clone()), root_key_len, hex::encode(root_key_vec), is_sum_tree_v, chunk_id); + res } diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index f401a49b..1545cca6 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -4,8 +4,6 @@ use std::{ marker::PhantomPinned, pin::Pin, }; -use std::fs::Metadata; -use grovedb_costs::CostsExt; use grovedb_merk::{CryptoHash, Restorer}; use grovedb_merk::tree::kv::ValueDefinedCostType; @@ -14,13 +12,12 @@ use grovedb_path::SubtreePath; use grovedb_storage::rocksdb_storage::{PrefixedRocksDbImmediateStorageContext, RocksDbStorage}; use grovedb_storage::StorageContext; use grovedb_version::version::GroveVersion; -use super::{util_decode_vec_ops, CURRENT_STATE_SYNC_VERSION, util_create_global_chunk_id_2}; -use crate::{replication::util_path_to_string, Error, GroveDb, Transaction, replication, TransactionArg, Element}; -use crate::util::storage_context_optional_tx; +use super::{util_decode_vec_ops, CURRENT_STATE_SYNC_VERSION, util_create_global_chunk_id}; +use crate::{replication::util_path_to_string, Error, GroveDb, Transaction, replication, Element}; pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; -pub(crate) type SubtreeMetadata = (SubtreePrefix, Vec>, /*Option>, bool,*/ CryptoHash, CryptoHash); +pub(crate) type SubtreeMetadata = (SubtreePrefix, Vec>, CryptoHash, CryptoHash); struct SubtreeStateSyncInfo<'db> { /// Current Chunk restorer @@ -28,9 +25,12 @@ struct SubtreeStateSyncInfo<'db> { /// Set of global chunk ids requested to be fetched and pending for /// processing. For the description of global chunk id check /// fetch_chunk(). - current_root_key: Option>, - is_sum_tree: bool, pending_chunks: BTreeSet>, + /// Tree root key + root_key: Option>, + /// Is Sum tree? + is_sum_tree: bool, + /// Path of current tree current_path: Vec>, /// Number of processed chunks in current prefix (Path digest) num_processed_chunks: usize, @@ -40,6 +40,7 @@ impl<'db> SubtreeStateSyncInfo<'db> { pub fn get_current_path(&self) -> Vec> { self.current_path.clone() } + // Apply a chunk using the given SubtreeStateSyncInfo // state_sync_info: Consumed SubtreeStateSyncInfo // chunk_id: Local chunk id @@ -92,7 +93,7 @@ impl<'tx> SubtreeStateSyncInfo<'tx> { pub fn new(restorer: Restorer>) -> Self { SubtreeStateSyncInfo { restorer, - current_root_key: None, + root_key: None, is_sum_tree: false, pending_chunks: Default::default(), current_path: vec![], @@ -140,7 +141,8 @@ impl<'db> MultiStateSyncSession<'db> { return false; } } - return true; + + true } pub fn into_transaction(self: Pin>) -> Transaction<'db> { @@ -171,14 +173,13 @@ impl<'db> MultiStateSyncSession<'db> { let restorer = Restorer::new(merk, hash, actual_hash); let mut sync_info = SubtreeStateSyncInfo::new(restorer); sync_info.pending_chunks.insert(vec![]); - sync_info.current_root_key = root_key.clone(); + sync_info.root_key = root_key.clone(); sync_info.is_sum_tree = is_sum_tree; sync_info.current_path = path.to_vec(); self.as_mut() .current_prefixes() .insert(chunk_prefix, sync_info); - let x = util_create_global_chunk_id_2(chunk_prefix, root_key, is_sum_tree, vec![]); - Ok((x)) + Ok((util_create_global_chunk_id(chunk_prefix, root_key, is_sum_tree, vec![]))) } else { Err(Error::InternalError("Unable to open merk for replication".to_string())) } @@ -225,10 +226,7 @@ impl<'db> MultiStateSyncSession<'db> { let mut next_chunk_ids = vec![]; - // [OLD_WAY] - //let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id, self.app_hash)?; - // [NEW_WAY] - let (chunk_prefix, key_root, is_summ_tree, chunk_id) = replication::util_split_global_chunk_id_2(global_chunk_id, &self.app_hash)?; + let (chunk_prefix, _, _, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id, &self.app_hash)?; if self.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode".to_string())); @@ -244,13 +242,7 @@ impl<'db> MultiStateSyncSession<'db> { if !res.is_empty() { for local_chunk_id in res.iter() { - // [NEW_WAY] - let x = util_create_global_chunk_id_2(chunk_prefix, subtree_state_sync.current_root_key.clone(), subtree_state_sync.is_sum_tree.clone(), local_chunk_id.clone()); - next_chunk_ids.push(x); - // [OLD_WAY] - //let mut next_global_chunk_id = chunk_prefix.to_vec(); - //next_global_chunk_id.extend(local_chunk_id.to_vec()); - //next_chunk_ids.push(next_global_chunk_id); + next_chunk_ids.push(util_create_global_chunk_id(chunk_prefix, subtree_state_sync.root_key.clone(), subtree_state_sync.is_sum_tree.clone(), local_chunk_id.clone())); } Ok(next_chunk_ids) @@ -262,15 +254,20 @@ impl<'db> MultiStateSyncSession<'db> { let completed_path = subtree_state_sync.get_current_path(); // Subtree is finished. We can save it. - if (subtree_state_sync.num_processed_chunks > 0) - && (current_prefixes - .remove(&chunk_prefix) - .expect("prefix exists") - .restorer - .finalize(grove_version) - .is_err()) - { - return Err(Error::InternalError("Unable to finalize Merk".to_string())); + if subtree_state_sync.num_processed_chunks > 0 { + if let Some(prefix_data) = current_prefixes.remove(&chunk_prefix) { + if let Err(err) = prefix_data.restorer.finalize(grove_version) { + return Err(Error::InternalError(format!( + "Unable to finalize Merk: {:?}", + err + ))); + } + } else { + return Err(Error::InternalError(format!( + "Prefix {:?} does not exist in current_prefixes", + chunk_prefix + ))); + } } self.as_mut().processed_prefixes().insert(chunk_prefix); @@ -336,7 +333,7 @@ impl<'db> MultiStateSyncSession<'db> { let path: &[&[u8]] = &subtree_path; let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap(); - println!(" detected {:?} prefix:{}", util_path_to_string(&new_path), hex::encode(prefix)); + println!(" discovered {:?} prefix:{}", util_path_to_string(&new_path), hex::encode(prefix)); subtrees_metadata.data.insert( prefix, @@ -372,7 +369,7 @@ impl<'db> MultiStateSyncSession<'db> { util_path_to_string(&prefix_metadata.0) ); - let x = self.add_subtree_sync_info( + let next_chunks_ids = self.add_subtree_sync_info( db, path.into(), elem_value_hash.clone(), @@ -381,12 +378,7 @@ impl<'db> MultiStateSyncSession<'db> { grove_version )?; - // [NEW_WAY] - res.push(x); - // [OLD_WAY] - //let root_chunk_prefix = prefix.to_vec(); - //res.push(root_chunk_prefix.to_vec()); - //res.push(prefix.to_vec()); + res.push(next_chunks_ids); } } diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index a8227fe5..96cb867f 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -7,7 +7,6 @@ use rand::{distributions::Alphanumeric, Rng, }; use grovedb::element::SumValue; use grovedb::replication::CURRENT_STATE_SYNC_VERSION; use grovedb_version::version::GroveVersion; -use grovedb::replication::MultiStateSyncSession; const MAIN_ΚΕΥ: &[u8] = b"key_main"; const MAIN_ΚΕΥ_EMPTY: &[u8] = b"key_main_empty"; @@ -254,24 +253,14 @@ fn sync_db_demo( chunk_queue.push_back(app_hash.to_vec()); let mut num_chunks = 0; - let mut duration_sum_fetch: Duration = Duration::ZERO; - let mut duration_sum_apply: Duration = Duration::ZERO; while let Some(chunk_id) = chunk_queue.pop_front() { num_chunks += 1; - let start_time_fetch = Instant::now(); let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION, grove_version)?; - let elapsed_fetch = start_time_fetch.elapsed(); - duration_sum_fetch += elapsed_fetch; - let start_time_apply = Instant::now(); let more_chunks = session.apply_chunk(&target_db, chunk_id.as_slice(), ops, CURRENT_STATE_SYNC_VERSION, grove_version)?; - let elapsed_apply = start_time_apply.elapsed(); - duration_sum_apply += elapsed_apply; chunk_queue.extend(more_chunks); } println!("num_chunks: {}", num_chunks); - println!("duration_sum_fetch: {}", duration_sum_fetch.as_secs_f64()); - println!("duration_sum_apply: {}", duration_sum_apply.as_secs_f64()); if session.is_sync_completed() { target_db.commit_session(session); From 995aec74fff77f86f23ba86e41279ba7a1b546ae Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 30 Dec 2024 14:34:53 +0200 Subject: [PATCH 5/7] refactor: suggestions and richer doc --- grovedb/src/lib.rs | 99 ++-- grovedb/src/replication.rs | 558 ++++++++++++------ grovedb/src/replication/state_sync_session.rs | 311 +++++++--- storage/src/rocksdb_storage/storage.rs | 18 +- storage/src/storage.rs | 8 +- tutorials/src/bin/replication.rs | 2 +- 6 files changed, 673 insertions(+), 323 deletions(-) diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 0a743029..111a113f 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -186,7 +186,6 @@ use grovedb_merk::{ tree::{combine_hash, value_hash}, BatchEntry, CryptoHash, KVIterator, Merk, }; -use grovedb_merk::ChunkProducer; #[cfg(feature = "full")] use grovedb_path::SubtreePath; #[cfg(feature = "full")] @@ -221,7 +220,6 @@ use crate::operations::proof::util::hex_to_ascii; use crate::util::{root_merk_optional_tx, storage_context_optional_tx}; #[cfg(feature = "full")] use crate::Error::MerkError; -use crate::replication::util_encode_vec_ops; #[cfg(feature = "full")] type Hash = [u8; 32]; @@ -340,8 +338,7 @@ impl GroveDb { tx: &'db Transaction, batch: Option<&'db StorageBatch>, grove_version: &GroveVersion, - ) -> CostResult, Error> - { + ) -> CostResult>, Error> { let mut cost = OperationCost::default(); let storage = self .db @@ -354,19 +351,22 @@ impl GroveDb { is_sum_tree, Some(&Element::value_defined_cost_for_serialized_value), grove_version, - ).map_err(|_| { - Error::CorruptedData("cannot open a subtree by prefix with given root key".to_owned()) - }).add_cost(cost) - } - else { + ) + .map_err(|_| { + Error::CorruptedData( + "cannot open a subtree by prefix with given root key".to_owned(), + ) + }) + .add_cost(cost) + } else { Merk::open_base( storage, false, Some(&Element::value_defined_cost_for_serialized_value), grove_version, - ).map_err(|_| { - Error::CorruptedData("cannot open a root subtree by prefix".to_owned()) - }).add_cost(cost) + ) + .map_err(|_| Error::CorruptedData("cannot open a root subtree by prefix".to_owned())) + .add_cost(cost) } } @@ -377,7 +377,14 @@ impl GroveDb { path: SubtreePath<'b, B>, tx: &'tx Transaction<'db>, grove_version: &GroveVersion, - ) -> Result<(Merk>, Option>, bool), Error> + ) -> Result< + ( + Merk>, + Option>, + bool, + ), + Error, + > where B: AsRef<[u8]> + 'b, { @@ -405,19 +412,19 @@ impl GroveDb { let is_sum_tree = element.is_sum_tree(); if let Element::Tree(root_key, _) | Element::SumTree(root_key, ..) = element { Ok(( - Merk::open_layered_with_root_key( - storage, - root_key.clone(), - is_sum_tree, - Some(&Element::value_defined_cost_for_serialized_value), - grove_version, - ) - .map_err(|_| { - Error::CorruptedData("cannot open a subtree with given root key".to_owned()) - }) - .unwrap()?, + Merk::open_layered_with_root_key( + storage, + root_key.clone(), + is_sum_tree, + Some(&Element::value_defined_cost_for_serialized_value), + grove_version, + ) + .map_err(|_| { + Error::CorruptedData("cannot open a subtree with given root key".to_owned()) + }) + .unwrap()?, root_key, - is_sum_tree + is_sum_tree, )) } else { Err(Error::CorruptedPath( @@ -426,16 +433,16 @@ impl GroveDb { } } else { Ok(( - Merk::open_base( - storage, - false, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned())) - .unwrap()?, + Merk::open_base( + storage, + false, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .map_err(|_| Error::CorruptedData("cannot open a the root subtree".to_owned())) + .unwrap()?, None, - false + false, )) } } @@ -446,7 +453,7 @@ impl GroveDb { path: SubtreePath<'b, B>, batch: Option<&'db StorageBatch>, grove_version: &GroveVersion, - ) -> CostResult, Error> + ) -> CostResult>, Error> where B: AsRef<[u8]> + 'b, { @@ -513,8 +520,7 @@ impl GroveDb { is_sum_tree: bool, batch: Option<&'db StorageBatch>, grove_version: &GroveVersion, - ) -> CostResult, Error> - { + ) -> CostResult>, Error> { let mut cost = OperationCost::default(); let storage = self .db @@ -527,19 +533,22 @@ impl GroveDb { is_sum_tree, Some(&Element::value_defined_cost_for_serialized_value), grove_version, - ).map_err(|_| { - Error::CorruptedData("cannot open a subtree by prefix with given root key".to_owned()) - }).add_cost(cost) - } - else { + ) + .map_err(|_| { + Error::CorruptedData( + "cannot open a subtree by prefix with given root key".to_owned(), + ) + }) + .add_cost(cost) + } else { Merk::open_base( storage, false, Some(&Element::value_defined_cost_for_serialized_value), grove_version, - ).map_err(|_| { - Error::CorruptedData("cannot open a root subtree by prefix".to_owned()) - }).add_cost(cost) + ) + .map_err(|_| Error::CorruptedData("cannot open a root subtree by prefix".to_owned())) + .add_cost(cost) } } diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 96e1f18f..b1a3e02f 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -2,18 +2,22 @@ mod state_sync_session; use std::pin::Pin; -use grovedb_merk::{ - ed::Encode, - proofs::{Decoder, Op}, - tree::{hash::CryptoHash}, - ChunkProducer, -}; +use grovedb_merk::{tree::hash::CryptoHash, ChunkProducer}; use grovedb_path::SubtreePath; use grovedb_version::{check_grovedb_v0, error::GroveVersionError, version::GroveVersion}; pub use self::state_sync_session::MultiStateSyncSession; use crate::{Error, GroveDb, TransactionArg}; +/// Type alias representing a chunk identifier in the state synchronization +/// process. +/// +/// - `SubtreePrefix`: The prefix of the subtree (32 bytes). +/// - `Option>`: The root key, which may be `None` if not present. +/// - `bool`: Indicates whether the tree is a sum tree. +/// - `Vec`: The chunk ID representing traversal instructions. +pub type ChunkIdentifier = (crate::SubtreePrefix, Option>, bool, Vec); + pub const CURRENT_STATE_SYNC_VERSION: u16 = 1; #[cfg(feature = "full")] @@ -22,27 +26,43 @@ impl GroveDb { MultiStateSyncSession::new(self.start_transaction(), app_hash) } - pub fn commit_session(&self, session: Pin>) { - // we do not care about the cost - let _ = self.commit_transaction(session.into_transaction()); + pub fn commit_session(&self, session: Pin>) -> Result<(), Error> { + match self.commit_transaction(session.into_transaction()).value { + Ok(_) => Ok(()), + Err(e) => { + // Log the error or handle it as needed + eprintln!("Failed to commit session: {:?}", e); + Err(e) + } + } } - // Fetch a chunk by global chunk id (should be called by ABCI when - // LoadSnapshotChunk method is called) Params: - // global_chunk_id: Global chunk id in the following format: - // [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] - // SUBTREE_PREFIX: 32 bytes (mandatory) (All zeros = Root subtree) - // SIZE_ROOT_KEY: 1 byte: Size of ROOT_KEY in bytes - // ROOT_KEY: SIZE_ROOT_KEY bytes (optional) - // IS_SUM_TREE: 1 byte (mandatory) marks if the tree is a sum tree or not - // CHUNK_ID: 0.. bytes (optional) Traversal instructions to - // the root of the given chunk. Traversal instructions are "1" for left, and - // "0" for right. TODO: Compact CHUNK_ID into bitset for size optimization - // as a subtree can be big hence traversal instructions for the deepest chunks - // transaction: Transaction. Function returns the data by opening merks at given tx. - // version: Version of state sync protocol version - // grove_version: Version of groveDB - // Returns the Chunk proof operators for the requested chunk encoded in bytes + /// Fetch a chunk by global chunk ID (should be called by ABCI when the + /// `LoadSnapshotChunk` method is invoked). + /// + /// # Parameters + /// - `global_chunk_id`: Global chunk ID in the following format: + /// `[SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID]` + /// - **SUBTREE_PREFIX**: 32 bytes (mandatory) - All zeros indicate the + /// Root subtree. + /// - **SIZE_ROOT_KEY**: 1 byte - Size of `ROOT_KEY` in bytes. + /// - **ROOT_KEY**: `SIZE_ROOT_KEY` bytes (optional). + /// - **IS_SUM_TREE**: 1 byte (mandatory) - Marks if the tree is a sum + /// tree or not. + /// - **CHUNK_ID**: 0 or more bytes (optional) - Traversal instructions to + /// the root of the given chunk. Traversal instructions are represented + /// as "1" for left and "0" for right. + /// - TODO: Compact `CHUNK_ID` into a bitset for size optimization as a + /// subtree can be large, and traversal instructions for the deepest + /// chunks could consume significant space. + /// + /// - `transaction`: The transaction used to fetch the chunk. + /// - `version`: The version of the state sync protocol. + /// - `grove_version`: The version of GroveDB. + /// + /// # Returns + /// Returns the chunk proof operators for the requested chunk, encoded as + /// bytes. pub fn fetch_chunk( &self, global_chunk_id: &[u8], @@ -63,75 +83,145 @@ impl GroveDb { let root_app_hash = self.root_hash(transaction, grove_version).value?; let (chunk_prefix, root_key, is_sum_tree, chunk_id) = - util_split_global_chunk_id(global_chunk_id, &root_app_hash)?; - - // TODO: Refactor this by writing fetch_chunk_inner (as only merk constructor and type are different) - match transaction { - None => { - let merk = self.open_non_transactional_merk_by_prefix(chunk_prefix, - root_key, - is_sum_tree, None, grove_version) - .value - .map_err(|e| Error::CorruptedData( - format!("failed to open merk by prefix non-tx:{} with:{}", e, hex::encode(chunk_prefix)), - ))?; - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - - let mut chunk_producer = ChunkProducer::new(&merk) - .map_err(|e| Error::CorruptedData( - format!("failed to create chunk producer by prefix non-tx:{} with:{}", hex::encode(chunk_prefix), e), - ))?; - let ((chunk,_)) = chunk_producer.chunk(&chunk_id, grove_version) - .map_err(|e| Error::CorruptedData( - format!("failed to apply chunk:{} with:{}", hex::encode(chunk_prefix), e), - ))?; - let op_bytes = util_encode_vec_ops(chunk) - .map_err(|e| Error::CorruptedData( - format!("failed to encode chunk ops:{} with:{}", hex::encode(chunk_prefix), e), - ))?; - Ok(op_bytes) + utils::decode_global_chunk_id(global_chunk_id, &root_app_hash)?; + + // TODO: Refactor this by writing fetch_chunk_inner (as only merk constructor + // and type are different) + if let Some(tx) = transaction { + let merk = self + .open_transactional_merk_by_prefix( + chunk_prefix, + root_key, + is_sum_tree, + tx, + None, + grove_version, + ) + .value + .map_err(|e| { + Error::CorruptedData(format!( + "failed to open merk by prefix tx:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); } - Some(tx) => { - let merk = self.open_transactional_merk_by_prefix(chunk_prefix, - root_key, - is_sum_tree, tx, None, grove_version) - .value - .map_err(|e| Error::CorruptedData( - format!("failed to open merk by prefix tx:{} with:{}", hex::encode(chunk_prefix), e), - ))?; - if merk.is_empty_tree().unwrap() { - return Ok(vec![]); - } - let mut chunk_producer = ChunkProducer::new(&merk) - .map_err(|e| Error::CorruptedData( - format!("failed to create chunk producer by prefix tx:{} with:{}", hex::encode(chunk_prefix), e), - ))?; - let ((chunk,_)) = chunk_producer.chunk(&chunk_id, grove_version) - .map_err(|e| Error::CorruptedData( - format!("failed to apply chunk:{} with:{}", hex::encode(chunk_prefix), e), - ))?; - let op_bytes = util_encode_vec_ops(chunk) - .map_err(|e| Error::CorruptedData( - format!("failed to encode chunk ops:{} with:{}", hex::encode(chunk_prefix), e), - ))?; - Ok(op_bytes) + let mut chunk_producer = ChunkProducer::new(&merk).map_err(|e| { + Error::CorruptedData(format!( + "failed to create chunk producer by prefix tx:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + let (chunk, _) = chunk_producer + .chunk(&chunk_id, grove_version) + .map_err(|e| { + Error::CorruptedData(format!( + "failed to apply chunk:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + let op_bytes = utils::encode_vec_ops(chunk).map_err(|e| { + Error::CorruptedData(format!( + "failed to encode chunk ops:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + Ok(op_bytes) + } else { + let merk = self + .open_non_transactional_merk_by_prefix( + chunk_prefix, + root_key, + is_sum_tree, + None, + grove_version, + ) + .value + .map_err(|e| { + Error::CorruptedData(format!( + "failed to open merk by prefix non-tx:{} with:{}", + e, + hex::encode(chunk_prefix) + )) + })?; + if merk.is_empty_tree().unwrap() { + return Ok(vec![]); } + + let mut chunk_producer = ChunkProducer::new(&merk).map_err(|e| { + Error::CorruptedData(format!( + "failed to create chunk producer by prefix non-tx:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + let (chunk, _) = chunk_producer + .chunk(&chunk_id, grove_version) + .map_err(|e| { + Error::CorruptedData(format!( + "failed to apply chunk:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + let op_bytes = utils::encode_vec_ops(chunk).map_err(|e| { + Error::CorruptedData(format!( + "failed to encode chunk ops:{} with:{}", + hex::encode(chunk_prefix), + e + )) + })?; + Ok(op_bytes) } } - /// Starts a state sync process of a snapshot with `app_hash` root hash, - /// should be called by ABCI when OfferSnapshot method is called. - /// Returns the first set of global chunk ids that can be fetched from - /// sources and a new sync session. - pub fn start_snapshot_syncing<'db>( - &'db self, + /// Starts a state synchronization process for a snapshot with the given + /// `app_hash` root hash. This method should be called by ABCI when the + /// `OfferSnapshot` method is invoked. + /// + /// # Parameters + /// - `app_hash`: The root hash of the application state to synchronize. + /// - `version`: The version of the state sync protocol to use. + /// - `grove_version`: The version of GroveDB being used. + /// + /// # Returns + /// - `Ok(Pin>)`: A pinned, boxed + /// `MultiStateSyncSession` representing the new sync session. This + /// session allows for managing the synchronization process. + /// - `Err(Error)`: An error indicating why the state sync process could not + /// be started. + /// + /// # Behavior + /// - Initiates the state synchronization process by preparing the necessary + /// data and resources. + /// - Returns the first set of global chunk IDs that can be fetched from + /// available sources. + /// - A new sync session is created and managed internally, facilitating + /// further synchronization. + /// + /// # Usage + /// This method is typically called as part of the ABCI `OfferSnapshot` + /// workflow when a new snapshot synchronization process is required to + /// bring the application state up to date. + /// + /// # Notes + /// - The returned `MultiStateSyncSession` is pinned because its lifetime + /// may depend on asynchronous operations or other system resources that + /// require it to remain immovable in memory. + /// - Ensure that `app_hash` corresponds to a valid snapshot to avoid + /// errors. + pub fn start_snapshot_syncing( + &self, app_hash: CryptoHash, version: u16, grove_version: &GroveVersion, - ) -> Result>>, Error> { + ) -> Result>, Error> { check_grovedb_v0!( "start_snapshot_syncing", grove_version @@ -146,141 +236,217 @@ impl GroveDb { )); } - println!(" starting:{:?}...", util_path_to_string(&[])); + println!(" starting:{:?}...", utils::path_to_string(&[])); let root_prefix = [0u8; 32]; let mut session = self.start_syncing_session(app_hash); - session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix, grove_version)?; + session.add_subtree_sync_info( + self, + SubtreePath::empty(), + app_hash, + None, + root_prefix, + grove_version, + )?; Ok(session) } } -// Converts a path into a human-readable string (for debugging) -pub fn util_path_to_string(path: &[Vec]) -> Vec { - let mut subtree_path_str: Vec = vec![]; - for subtree in path { - let string = std::str::from_utf8(&subtree).unwrap_or_else(|_| ""); - subtree_path_str.push( - string.to_string() - ); +pub(crate) mod utils { + use grovedb_merk::{ + ed::Encode, + proofs::{Decoder, Op}, + }; + + use crate::{replication::ChunkIdentifier, Error}; + + /// Converts a path, represented as a slice of byte vectors (`&[Vec]`), + /// into a human-readable string representation for debugging purposes. + /// + /// # Parameters + /// - `path`: A slice of byte vectors where each vector represents a segment + /// of the path. + /// + /// # Returns + /// - `Vec`: A vector of strings where each string is a + /// human-readable representation of a corresponding segment in the input + /// path. If a segment contains invalid UTF-8, it is replaced with the + /// placeholder string `""`. + /// + /// # Behavior + /// - Each byte vector in the path is interpreted as a UTF-8 string. If the + /// conversion fails, the placeholder `""` is used instead. + /// - This function is primarily intended for debugging and logging. + /// + /// # Notes + /// - This function does not handle or normalize paths; it only provides a + /// human-readable representation. + /// - Be cautious when using this for paths that might contain sensitive + /// data, as the output could be logged. + pub fn path_to_string(path: &[Vec]) -> Vec { + let mut subtree_path_str: Vec = vec![]; + for subtree in path { + let string = std::str::from_utf8(subtree).unwrap_or(""); + subtree_path_str.push(string.to_string()); + } + subtree_path_str } - subtree_path_str -} -// Splits the given global chunk id into [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] -pub fn util_split_global_chunk_id( - global_chunk_id: &[u8], - app_hash: &[u8], -) -> Result<(crate::SubtreePrefix, Option>, bool, Vec), Error> { - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); - } + /// Decodes a given global chunk ID into its components: + /// `[SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID]`. + /// + /// # Parameters + /// - `global_chunk_id`: A byte slice representing the global chunk ID to + /// decode. + /// - `app_hash`: The application hash, which may be required for validation + /// or context. + /// + /// # Returns + /// - `Ok(ChunkIdentifier)`: A tuple containing the decoded components: + /// - `SUBTREE_PREFIX`: A 32-byte prefix of the subtree. + /// - `SIZE_ROOT_KEY`: Size of the root key (derived from `ROOT_KEY` + /// length). + /// - `ROOT_KEY`: Optional root key as a byte vector. + /// - `IS_SUM_TREE`: A boolean indicating whether the tree is a sum tree. + /// - `CHUNK_ID`: Traversal instructions as a byte vector. + /// - `Err(Error)`: An error if the global chunk ID could not be decoded. + pub fn decode_global_chunk_id( + global_chunk_id: &[u8], + app_hash: &[u8], + ) -> Result { + let chunk_prefix_length: usize = 32; + if global_chunk_id.len() < chunk_prefix_length { + return Err(Error::CorruptedData( + "expected global chunk id of at least 32 length".to_string(), + )); + } - if global_chunk_id == app_hash { - let root_chunk_prefix_key: crate::SubtreePrefix = [0u8; 32]; - return Ok((root_chunk_prefix_key, None, false, vec![])); - } + if global_chunk_id == app_hash { + let root_chunk_prefix_key: crate::SubtreePrefix = [0u8; 32]; + return Ok((root_chunk_prefix_key, None, false, vec![])); + } - let (chunk_prefix_key, remaining) = global_chunk_id.split_at(chunk_prefix_length); + let (chunk_prefix_key, remaining) = global_chunk_id.split_at(chunk_prefix_length); - let root_key_size_length: usize = 1; - if remaining.len() < root_key_size_length { - return Err(Error::CorruptedData( - "unable to decode root key size".to_string(), - )); - } - let (root_key_size, remaining) = remaining.split_at(root_key_size_length); - if remaining.len() < root_key_size[0] as usize { - return Err(Error::CorruptedData( - "unable to decode root key".to_string(), - )); - } - let (root_key, remaining) = remaining.split_at(root_key_size[0] as usize); - let is_sum_tree_length: usize = 1; - if remaining.len() < is_sum_tree_length { - return Err(Error::CorruptedData( - "unable to decode root key".to_string(), - )); + let root_key_size_length: usize = 1; + if remaining.len() < root_key_size_length { + return Err(Error::CorruptedData( + "unable to decode root key size".to_string(), + )); + } + let (root_key_size, remaining) = remaining.split_at(root_key_size_length); + if remaining.len() < root_key_size[0] as usize { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), + )); + } + let (root_key, remaining) = remaining.split_at(root_key_size[0] as usize); + let is_sum_tree_length: usize = 1; + if remaining.len() < is_sum_tree_length { + return Err(Error::CorruptedData( + "unable to decode root key".to_string(), + )); + } + let (is_sum_tree, chunk_id) = remaining.split_at(is_sum_tree_length); + + let subtree_prefix: crate::SubtreePrefix = chunk_prefix_key + .try_into() + .map_err(|_| Error::CorruptedData("unable to construct subtree".to_string()))?; + + if !root_key.is_empty() { + Ok(( + subtree_prefix, + Some(root_key.to_vec()), + is_sum_tree[0] != 0, + chunk_id.to_vec(), + )) + } else { + Ok((subtree_prefix, None, is_sum_tree[0] != 0, chunk_id.to_vec())) + } } - let (is_sum_tree, chunk_id) = remaining.split_at(is_sum_tree_length); - let subtree_prefix: crate::SubtreePrefix = chunk_prefix_key.try_into() - .map_err(|_| { - Error::CorruptedData( - "unable to construct subtree".to_string(), - ) - })?; + /// Encodes the given components into a global chunk ID in the format: + /// `[SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID]`. + /// + /// # Parameters + /// - `subtree_prefix`: A 32-byte array representing the prefix of the + /// subtree. + /// - `root_key_opt`: An optional root key as a byte vector. + /// - `is_sum_tree`: A boolean indicating whether the tree is a sum tree. + /// - `chunk_id`: A byte vector representing the traversal instructions. + /// + /// # Returns + /// - A `Vec` containing the encoded global chunk ID. + pub fn encode_global_chunk_id( + subtree_prefix: [u8; blake3::OUT_LEN], + root_key_opt: Option>, + is_sum_tree: bool, + chunk_id: Vec, + ) -> Vec { + let mut res = vec![]; + + res.extend(subtree_prefix); + + if let Some(root_key) = root_key_opt { + res.push(root_key.len() as u8); + res.extend(root_key); + } else { + res.push(0u8); + } - if !root_key.is_empty() { - Ok((subtree_prefix, Some(root_key.to_vec()), is_sum_tree[0] != 0, chunk_id.to_vec())) - } - else { - Ok((subtree_prefix, None, is_sum_tree[0] != 0, chunk_id.to_vec())) - } -} + let mut is_sum_tree_v = 0u8; + if is_sum_tree { + is_sum_tree_v = 1u8; + } + res.push(is_sum_tree_v); -// Create the given global chunk id into [SUBTREE_PREFIX:SIZE_ROOT_KEY:ROOT_KEY:IS_SUM_TREE:CHUNK_ID] -pub fn util_create_global_chunk_id( - subtree_prefix: [u8; blake3::OUT_LEN], - root_key_opt: Option>, - is_sum_tree:bool, - chunk_id: Vec -) -> (Vec){ - let mut res = vec![]; - - res.extend(subtree_prefix); - - let mut root_key_len = 0u8; - let mut root_key_vec = vec![]; - if let Some(root_key) = root_key_opt { - res.push(root_key.len() as u8); - res.extend(root_key.clone()); - root_key_len = root_key.len() as u8; - root_key_vec = root_key; - } - else { - res.push(0u8); - } + res.extend(chunk_id.to_vec()); - let mut is_sum_tree_v = 0u8; - if is_sum_tree { - is_sum_tree_v = 1u8; + res } - res.push(is_sum_tree_v); - - res.extend(chunk_id.to_vec()); - - res -} -pub fn util_encode_vec_ops(chunk: Vec) -> Result, Error> { - let mut res = vec![]; - for op in chunk { - op.encode_into(&mut res) - .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; + /// Encodes a vector of operations (`Vec`) into a byte vector. + /// + /// # Parameters + /// - `chunk`: A vector of `Op` operations to be encoded. + /// + /// # Returns + /// - `Ok(Vec)`: A byte vector representing the encoded operations. + /// - `Err(Error)`: An error if the encoding process fails. + pub fn encode_vec_ops(chunk: Vec) -> Result, Error> { + let mut res = vec![]; + for op in chunk { + op.encode_into(&mut res) + .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?; + } + Ok(res) } - Ok(res) -} -pub fn util_decode_vec_ops(chunk: Vec) -> Result, Error> { - let decoder = Decoder::new(&chunk); - let mut res = vec![]; - for op in decoder { - match op { - Ok(op) => res.push(op), - Err(e) => { - return Err(Error::CorruptedData(format!( - "unable to decode chunk: {}", - e - ))); + /// Decodes a byte vector into a vector of operations (`Vec`). + /// + /// # Parameters + /// - `chunk`: A byte vector representing encoded operations. + /// + /// # Returns + /// - `Ok(Vec)`: A vector of decoded `Op` operations. + /// - `Err(Error)`: An error if the decoding process fails. + pub fn decode_vec_ops(chunk: Vec) -> Result, Error> { + let decoder = Decoder::new(&chunk); + let mut res = vec![]; + for op in decoder { + match op { + Ok(op) => res.push(op), + Err(e) => { + return Err(Error::CorruptedData(format!( + "unable to decode chunk: {}", + e + ))); + } } } + Ok(res) } - Ok(res) -} \ No newline at end of file +} diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs index 1545cca6..439ffc3b 100644 --- a/grovedb/src/replication/state_sync_session.rs +++ b/grovedb/src/replication/state_sync_session.rs @@ -5,48 +5,86 @@ use std::{ pin::Pin, }; -use grovedb_merk::{CryptoHash, Restorer}; -use grovedb_merk::tree::kv::ValueDefinedCostType; -use grovedb_merk::tree::value_hash; +use grovedb_merk::{ + tree::{kv::ValueDefinedCostType, value_hash}, + CryptoHash, Restorer, +}; use grovedb_path::SubtreePath; -use grovedb_storage::rocksdb_storage::{PrefixedRocksDbImmediateStorageContext, RocksDbStorage}; -use grovedb_storage::StorageContext; +use grovedb_storage::{ + rocksdb_storage::{PrefixedRocksDbImmediateStorageContext, RocksDbStorage}, + StorageContext, +}; use grovedb_version::version::GroveVersion; -use super::{util_decode_vec_ops, CURRENT_STATE_SYNC_VERSION, util_create_global_chunk_id}; -use crate::{replication::util_path_to_string, Error, GroveDb, Transaction, replication, Element}; -pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +use super::{ + utils::{decode_vec_ops, encode_global_chunk_id, path_to_string}, + CURRENT_STATE_SYNC_VERSION, +}; +use crate::{replication, Element, Error, GroveDb, Transaction}; -pub(crate) type SubtreeMetadata = (SubtreePrefix, Vec>, CryptoHash, CryptoHash); +pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN]; +/// Struct governing the state synchronization of one subtree. struct SubtreeStateSyncInfo<'db> { /// Current Chunk restorer restorer: Restorer>, + /// Set of global chunk ids requested to be fetched and pending for /// processing. For the description of global chunk id check /// fetch_chunk(). pending_chunks: BTreeSet>, + /// Tree root key root_key: Option>, + /// Is Sum tree? is_sum_tree: bool, + /// Path of current tree current_path: Vec>, + /// Number of processed chunks in current prefix (Path digest) num_processed_chunks: usize, } -impl<'db> SubtreeStateSyncInfo<'db> { +impl SubtreeStateSyncInfo<'_> { pub fn get_current_path(&self) -> Vec> { self.current_path.clone() } - // Apply a chunk using the given SubtreeStateSyncInfo - // state_sync_info: Consumed SubtreeStateSyncInfo - // chunk_id: Local chunk id - // chunk_data: Chunk proof operators encoded in bytes - // Returns the next set of global chunk ids that can be fetched from sources (+ - // the SubtreeStateSyncInfo transferring ownership back to the caller) + /// Applies a chunk using the given `SubtreeStateSyncInfo`. + /// + /// # Parameters + /// - `chunk_id`: A byte slice representing the local chunk ID to be + /// applied. + /// - `chunk_data`: A vector of bytes containing the chunk proof operators, + /// encoded as bytes. + /// - `grove_version`: A reference to the `GroveVersion` being used for + /// synchronization. + /// + /// # Returns + /// - `Ok(Vec>)`: A vector of global chunk IDs (each represented as + /// a vector of bytes) that can be fetched from sources for further + /// synchronization. Ownership of the `SubtreeStateSyncInfo` is + /// transferred back to the caller. + /// - `Err(Error)`: An error if the chunk cannot be applied. + /// + /// # Behavior + /// - The function consumes the provided `SubtreeStateSyncInfo` to apply the + /// given chunk. + /// - Once the chunk is applied, the function calculates and returns the + /// next set of global chunk IDs required for further state + /// synchronization. + /// + /// # Usage + /// This function is called as part of the state sync process to apply + /// received chunks and advance the synchronization state. + /// + /// # Notes + /// - Ensure that the `chunk_data` is correctly encoded and matches the + /// expected format. + /// - The function modifies the state of the synchronization process, so it + /// must be used carefully to maintain correctness. fn apply_inner_chunk( &mut self, chunk_id: &[u8], @@ -62,7 +100,7 @@ impl<'db> SubtreeStateSyncInfo<'db> { } self.pending_chunks.remove(chunk_id); if !chunk_data.is_empty() { - match util_decode_vec_ops(chunk_data) { + match decode_vec_ops(chunk_data) { Ok(ops) => { match self.restorer.process_chunk(chunk_id, ops, grove_version) { Ok(next_chunk_ids) => { @@ -73,7 +111,9 @@ impl<'db> SubtreeStateSyncInfo<'db> { } } _ => { - return Err(Error::InternalError("Unable to process incoming chunk".to_string())); + return Err(Error::InternalError( + "Unable to process incoming chunk".to_string(), + )); } }; } @@ -102,19 +142,28 @@ impl<'tx> SubtreeStateSyncInfo<'tx> { } } -// Struct governing state sync +/// Struct governing the state synchronization process. pub struct MultiStateSyncSession<'db> { - // Map of current processing subtrees - // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo + /// Map of currently processing subtrees. + /// Keys are `SubtreePrefix` (path digests), and values are + /// `SubtreeStateSyncInfo` for each subtree. current_prefixes: BTreeMap>, - // Set of processed prefixes (Path digests) + + /// Set of processed prefixes, represented as `SubtreePrefix` (path + /// digests). processed_prefixes: BTreeSet, - // Root app_hash + + /// Root application hash (`app_hash`). app_hash: [u8; 32], - // Version of state sync protocol, + + /// Version of the state synchronization protocol. pub(crate) version: u16, - // Transaction goes last to be dropped last as well + + /// Transaction used for the synchronization process. + /// This is placed last to ensure it is dropped last. transaction: Transaction<'db>, + + /// Marker to ensure this struct is not moved in memory. _pin: PhantomPinned, } @@ -159,17 +208,15 @@ impl<'db> MultiStateSyncSession<'db> { actual_hash: Option, chunk_prefix: [u8; 32], grove_version: &GroveVersion, - ) -> Result<(Vec), Error> { - // SAFETY: we get an immutable reference of a transaction that stays behind - // `Pin` so this reference shall remain valid for the whole session - // object lifetime. + ) -> Result, Error> { let transaction_ref: &'db Transaction<'db> = unsafe { - let tx: &mut Transaction<'db> = - &mut Pin::into_inner_unchecked(self.as_mut()).transaction; - &*(tx as *mut _) + let tx: &Transaction<'db> = &self.as_ref().transaction; + &*(tx as *const _) }; - if let Ok((merk, root_key, is_sum_tree)) = db.open_merk_for_replication(path.clone(), transaction_ref, grove_version) { + if let Ok((merk, root_key, is_sum_tree)) = + db.open_merk_for_replication(path.clone(), transaction_ref, grove_version) + { let restorer = Restorer::new(merk, hash, actual_hash); let mut sync_info = SubtreeStateSyncInfo::new(restorer); sync_info.pending_chunks.insert(vec![]); @@ -179,9 +226,16 @@ impl<'db> MultiStateSyncSession<'db> { self.as_mut() .current_prefixes() .insert(chunk_prefix, sync_info); - Ok((util_create_global_chunk_id(chunk_prefix, root_key, is_sum_tree, vec![]))) + Ok(encode_global_chunk_id( + chunk_prefix, + root_key, + is_sum_tree, + vec![], + )) } else { - Err(Error::InternalError("Unable to open merk for replication".to_string())) + Err(Error::InternalError( + "Unable to open merk for replication".to_string(), + )) } } @@ -201,9 +255,40 @@ impl<'db> MultiStateSyncSession<'db> { &mut unsafe { self.get_unchecked_mut() }.processed_prefixes } - /// Applies a chunk, shuold be called by ABCI when `ApplySnapshotChunk` - /// method is called. `chunk` is a pair of global chunk id and an - /// encoded proof. + /// Applies a chunk during the state synchronization process. + /// This method should be called by ABCI when the `ApplySnapshotChunk` + /// method is invoked. + /// + /// # Parameters + /// - `self`: A pinned mutable reference to the `MultiStateSyncSession`. + /// - `db`: A reference to the `GroveDb` instance used for synchronization. + /// - `global_chunk_id`: A byte slice representing the global chunk ID being + /// applied. + /// - `chunk`: A vector of bytes containing the encoded proof for the chunk. + /// - `version`: The state synchronization protocol version being used. + /// - `grove_version`: A reference to the `GroveVersion` specifying the + /// GroveDB version. + /// + /// # Returns + /// - `Ok(Vec>)`: A vector of global chunk IDs (each represented as + /// a vector of bytes) that can be fetched from sources for further + /// synchronization. + /// - `Err(Error)`: An error if the chunk application fails or if the chunk + /// proof is invalid. + /// + /// # Behavior + /// - This method applies the given chunk using the provided + /// `global_chunk_id` and its corresponding proof data (`chunk`). + /// - Once the chunk is applied successfully, it calculates and returns the + /// next set of global chunk IDs required for further synchronization. + /// + /// # Notes + /// - Ensure the `chunk` is correctly encoded and matches the expected proof + /// format. + /// - This function modifies the state of the synchronization session, so it + /// must be used carefully to maintain correctness and avoid errors. + /// - The pinned `self` ensures that the session cannot be moved in memory, + /// preserving consistency during the synchronization process. pub fn apply_chunk( self: &mut Pin>>, db: &'db GroveDb, @@ -226,15 +311,20 @@ impl<'db> MultiStateSyncSession<'db> { let mut next_chunk_ids = vec![]; - let (chunk_prefix, _, _, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id, &self.app_hash)?; + let (chunk_prefix, _, _, chunk_id) = + replication::utils::decode_global_chunk_id(global_chunk_id, &self.app_hash)?; if self.is_empty() { - return Err(Error::InternalError("GroveDB is not in syncing mode".to_string())); + return Err(Error::InternalError( + "GroveDB is not in syncing mode".to_string(), + )); } let current_prefixes = self.as_mut().current_prefixes(); let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else { - return Err(Error::InternalError("Unable to process incoming chunk".to_string())); + return Err(Error::InternalError( + "Unable to process incoming chunk".to_string(), + )); }; let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk, grove_version) else { return Err(Error::InternalError("Invalid incoming prefix".to_string())); @@ -242,7 +332,12 @@ impl<'db> MultiStateSyncSession<'db> { if !res.is_empty() { for local_chunk_id in res.iter() { - next_chunk_ids.push(util_create_global_chunk_id(chunk_prefix, subtree_state_sync.root_key.clone(), subtree_state_sync.is_sum_tree.clone(), local_chunk_id.clone())); + next_chunk_ids.push(encode_global_chunk_id( + chunk_prefix, + subtree_state_sync.root_key.clone(), + subtree_state_sync.is_sum_tree, + local_chunk_id.clone(), + )); } Ok(next_chunk_ids) @@ -272,18 +367,56 @@ impl<'db> MultiStateSyncSession<'db> { self.as_mut().processed_prefixes().insert(chunk_prefix); - println!(" finished tree: {:?}", util_path_to_string(completed_path.as_slice())); - let new_subtrees_metadata = self.discover_new_subtrees_metadata(db, completed_path.to_vec(), grove_version)?; + println!( + " finished tree: {:?}", + path_to_string(completed_path.as_slice()) + ); + let new_subtrees_metadata = + self.discover_new_subtrees_metadata(db, completed_path.to_vec(), grove_version)?; - if let Ok(res) = self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version) { + if let Ok(res) = + self.prepare_sync_state_sessions(db, new_subtrees_metadata, grove_version) + { next_chunk_ids.extend(res); Ok(next_chunk_ids) } else { - Err(Error::InternalError("Unable to discover Subtrees".to_string())) + Err(Error::InternalError( + "Unable to discover Subtrees".to_string(), + )) } } } + /// Discovers new subtrees at the given path that need to be synchronized. + /// + /// # Parameters + /// - `self`: A pinned mutable reference to the `MultiStateSyncSession`. + /// - `db`: A reference to the `GroveDb` instance being used for + /// synchronization. + /// - `path_vec`: A vector of byte vectors representing the path where + /// subtrees should be discovered. + /// - `grove_version`: A reference to the `GroveVersion` specifying the + /// GroveDB version. + /// + /// # Returns + /// - `Ok(SubtreesMetadata)`: Metadata about the discovered subtrees, + /// including information necessary for their synchronization. + /// - `Err(Error)`: An error if the discovery process fails. + /// + /// # Behavior + /// - This function traverses the specified `path_vec` in the database and + /// identifies subtrees that are not yet synchronized. + /// - Returns metadata about these subtrees, which can be used to initiate + /// or manage the synchronization process. + /// + /// # Notes + /// - The `path_vec` should represent a valid path in the GroveDB where + /// subtrees are expected to exist. + /// - Ensure that the GroveDB instance (`db`) and Grove version + /// (`grove_version`) are compatible and up-to-date to avoid errors during + /// discovery. + /// - The function modifies the state of the synchronization session, so it + /// should be used carefully to maintain session integrity. fn discover_new_subtrees_metadata( self: &mut Pin>>, db: &'db GroveDb, @@ -291,17 +424,15 @@ impl<'db> MultiStateSyncSession<'db> { grove_version: &GroveVersion, ) -> Result { let transaction_ref: &'db Transaction<'db> = unsafe { - let tx: &mut Transaction<'db> = - &mut Pin::into_inner_unchecked(self.as_mut()).transaction; - &*(tx as *mut _) + let tx: &Transaction<'db> = &self.as_ref().transaction; + &*(tx as *const _) }; let subtree_path: Vec<&[u8]> = path_vec.iter().map(|vec| vec.as_slice()).collect(); let path: &[&[u8]] = &subtree_path; - let merk = db.open_transactional_merk_at_path(path.into(), transaction_ref, None, grove_version) + let merk = db + .open_transactional_merk_at_path(path.into(), transaction_ref, None, grove_version) .value - .map_err(|e| Error::CorruptedData( - format!("failed to open merk by path-tx:{}", e), - ))?; + .map_err(|e| Error::CorruptedData(format!("failed to open merk by path-tx:{}", e)))?; if merk.is_empty_tree().unwrap() { return Ok(SubtreesMetadata::default()); } @@ -333,7 +464,11 @@ impl<'db> MultiStateSyncSession<'db> { let path: &[&[u8]] = &subtree_path; let prefix = RocksDbStorage::build_prefix(path.as_ref().into()).unwrap(); - println!(" discovered {:?} prefix:{}", util_path_to_string(&new_path), hex::encode(prefix)); + println!( + " discovered {:?} prefix:{}", + path_to_string(&new_path), + hex::encode(prefix) + ); subtrees_metadata.data.insert( prefix, @@ -342,11 +477,42 @@ impl<'db> MultiStateSyncSession<'db> { } } - Ok((subtrees_metadata)) + Ok(subtrees_metadata) } - /// Prepares sync session for the freshly discovered subtrees and returns - /// global chunk ids of those new subtrees. + /// Prepares a synchronization session for the newly discovered subtrees and + /// returns the global chunk IDs of those subtrees. + /// + /// # Parameters + /// - `self`: A pinned mutable reference to the `MultiStateSyncSession`. + /// - `db`: A reference to the `GroveDb` instance used for managing the + /// synchronization process. + /// - `subtrees_metadata`: Metadata about the discovered subtrees that + /// require synchronization. + /// - `grove_version`: A reference to the `GroveVersion` specifying the + /// GroveDB version. + /// + /// # Returns + /// - `Ok(Vec>)`: A vector of global chunk IDs (each represented as + /// a vector of bytes) corresponding to the newly discovered subtrees. + /// These IDs can be fetched from sources to continue the synchronization + /// process. + /// - `Err(Error)`: An error if the synchronization session could not be + /// prepared or if processing the metadata fails. + /// + /// # Behavior + /// - Initializes the synchronization state for the newly discovered + /// subtrees based on the provided metadata. + /// - Calculates and returns the global chunk IDs of these subtrees, + /// enabling further state synchronization. + /// + /// # Notes + /// - Ensure that the `subtrees_metadata` accurately reflects the subtrees + /// requiring synchronization. + /// - This function modifies the state of the synchronization session to + /// include the new subtrees. + /// - Proper handling of the returned global chunk IDs is essential to + /// ensure seamless state synchronization. fn prepare_sync_state_sessions( self: &mut Pin>>, db: &'db GroveDb, @@ -366,16 +532,16 @@ impl<'db> MultiStateSyncSession<'db> { let path: &[&[u8]] = &subtree_path; println!( " path:{:?} starting...", - util_path_to_string(&prefix_metadata.0) + path_to_string(&prefix_metadata.0) ); let next_chunks_ids = self.add_subtree_sync_info( db, path.into(), - elem_value_hash.clone(), - Some(actual_value_hash.clone()), - prefix.clone(), - grove_version + *elem_value_hash, + Some(*actual_value_hash), + *prefix, + grove_version, )?; res.push(next_chunks_ids); @@ -386,11 +552,20 @@ impl<'db> MultiStateSyncSession<'db> { } } -// Struct containing information about current subtrees found in GroveDB +/// Struct containing metadata about the current subtrees found in GroveDB. +/// This metadata is used during the state synchronization process to track +/// discovered subtrees and verify their integrity after they are constructed. pub struct SubtreesMetadata { - // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent - // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree - // elem_value_hash are needed when verifying the new constructed subtree after wards. + /// A map where: + /// - **Key**: `SubtreePrefix` (the path digest of the subtree). + /// - **Value**: A tuple containing: + /// - `Vec>`: The actual path of the subtree in GroveDB. + /// - `CryptoHash`: The parent subtree's actual value hash. + /// - `CryptoHash`: The parent subtree's element value hash. + /// + /// The `parent subtree actual_value_hash` and `parent subtree + /// elem_value_hash` are required to verify the integrity of the newly + /// constructed subtree after synchronization. pub data: BTreeMap>, CryptoHash, CryptoHash)>, } @@ -412,7 +587,7 @@ impl fmt::Debug for SubtreesMetadata { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { for (prefix, metadata) in self.data.iter() { let metadata_path = &metadata.0; - let metadata_path_str = util_path_to_string(metadata_path); + let metadata_path_str = path_to_string(metadata_path); writeln!( f, " prefix:{:?} -> path:{:?}", @@ -422,4 +597,4 @@ impl fmt::Debug for SubtreesMetadata { } Ok(()) } -} \ No newline at end of file +} diff --git a/storage/src/rocksdb_storage/storage.rs b/storage/src/rocksdb_storage/storage.rs index d1833707..44510694 100644 --- a/storage/src/rocksdb_storage/storage.rs +++ b/storage/src/rocksdb_storage/storage.rs @@ -476,9 +476,9 @@ impl<'db> Storage<'db> for RocksDbStorage { &'db self, prefix: SubtreePrefix, batch: Option<&'db StorageBatch>, - ) -> CostContext - { - PrefixedRocksDbStorageContext::new(&self.db, prefix, batch).wrap_with_cost(OperationCost::default()) + ) -> CostContext { + PrefixedRocksDbStorageContext::new(&self.db, prefix, batch) + .wrap_with_cost(OperationCost::default()) } fn get_transactional_storage_context<'b, B>( @@ -500,9 +500,9 @@ impl<'db> Storage<'db> for RocksDbStorage { prefix: SubtreePrefix, batch: Option<&'db StorageBatch>, transaction: &'db Self::Transaction, - ) -> CostContext - { - PrefixedRocksDbTransactionContext::new(&self.db, transaction, prefix, batch).wrap_with_cost(OperationCost::default()) + ) -> CostContext { + PrefixedRocksDbTransactionContext::new(&self.db, transaction, prefix, batch) + .wrap_with_cost(OperationCost::default()) } fn get_immediate_storage_context<'b, B>( @@ -522,9 +522,9 @@ impl<'db> Storage<'db> for RocksDbStorage { &'db self, prefix: SubtreePrefix, transaction: &'db Self::Transaction, - ) -> CostContext - { - PrefixedRocksDbImmediateStorageContext::new(&self.db, transaction, prefix).wrap_with_cost(OperationCost::default()) + ) -> CostContext { + PrefixedRocksDbImmediateStorageContext::new(&self.db, transaction, prefix) + .wrap_with_cost(OperationCost::default()) } fn commit_multi_context_batch( diff --git a/storage/src/storage.rs b/storage/src/storage.rs index 019b0e48..2795cfc2 100644 --- a/storage/src/storage.rs +++ b/storage/src/storage.rs @@ -110,8 +110,8 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; - /// Make context for a subtree by prefix on transactional data, keeping all write - /// operations inside a `batch` if provided. + /// Make context for a subtree by prefix on transactional data, keeping all + /// write operations inside a `batch` if provided. fn get_transactional_storage_context_by_subtree_prefix( &'db self, prefix: SubtreePrefix, @@ -129,8 +129,8 @@ pub trait Storage<'db> { where B: AsRef<[u8]> + 'b; - /// Make context for a subtree by prefix on transactional data that will apply all - /// operations straight to the storage. + /// Make context for a subtree by prefix on transactional data that will + /// apply all operations straight to the storage. fn get_immediate_storage_context_by_subtree_prefix( &'db self, prefix: SubtreePrefix, diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index 96cb867f..74374c33 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -263,7 +263,7 @@ fn sync_db_demo( println!("num_chunks: {}", num_chunks); if session.is_sync_completed() { - target_db.commit_session(session); + target_db.commit_session(session).expect("failed to commit session"); } let elapsed = start_time.elapsed(); println!("state_synced in {:.2?}", elapsed); From dfcef7a4f590fc1cbf23bd868c247cf43fc04a9c Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 30 Dec 2024 15:00:16 +0200 Subject: [PATCH 6/7] refactor: better type alias --- grovedb/src/lib.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 111a113f..16298cc8 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -240,6 +240,18 @@ pub type Transaction<'db> = >::Transaction; #[cfg(feature = "full")] pub type TransactionArg<'db, 'a> = Option<&'a Transaction<'db>>; +/// Type alias for the return type of the `verify_merk_and_submerks` and `verify_grovedb` functions. +/// It represents a mapping of paths (as vectors of vectors of bytes) to a tuple +/// of three cryptographic hashes: the root hash, the combined value hash, and the expected value hash. +type VerificationIssues = HashMap>, (CryptoHash, CryptoHash, CryptoHash)>; + +/// Type alias for the return type of the `open_merk_for_replication` function. +/// It represents a tuple containing: +/// - A `Merk` instance with a prefixed RocksDB immediate storage context. +/// - An optional `root_key`, represented as a vector of bytes. +/// - A boolean indicating whether the Merk is a sum tree. +type OpenedMerkForReplication<'tx> = (Merk>, Option>, bool); + #[cfg(feature = "full")] impl GroveDb { /// Opens a given path @@ -377,14 +389,7 @@ impl GroveDb { path: SubtreePath<'b, B>, tx: &'tx Transaction<'db>, grove_version: &GroveVersion, - ) -> Result< - ( - Merk>, - Option>, - bool, - ), - Error, - > + ) -> Result, Error> where B: AsRef<[u8]> + 'b, { @@ -1029,7 +1034,7 @@ impl GroveDb { verify_references: bool, allow_cache: bool, grove_version: &GroveVersion, - ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { + ) -> Result { if let Some(transaction) = transaction { let root_merk = self .open_transactional_merk_at_path( @@ -1073,7 +1078,7 @@ impl GroveDb { verify_references: bool, allow_cache: bool, grove_version: &GroveVersion, - ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { + ) -> Result { let mut all_query = Query::new(); all_query.insert_all(); @@ -1217,7 +1222,7 @@ impl GroveDb { verify_references: bool, allow_cache: bool, grove_version: &GroveVersion, - ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { + ) -> Result { let mut all_query = Query::new(); all_query.insert_all(); From 5098940256b5bfc1bf214871d1158f6ee21a844c Mon Sep 17 00:00:00 2001 From: Odysseas Gabrielides Date: Mon, 30 Dec 2024 15:05:02 +0200 Subject: [PATCH 7/7] fix: build for verify feature --- grovedb/src/lib.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 16298cc8..f3b2dcc4 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -240,9 +240,11 @@ pub type Transaction<'db> = >::Transaction; #[cfg(feature = "full")] pub type TransactionArg<'db, 'a> = Option<&'a Transaction<'db>>; -/// Type alias for the return type of the `verify_merk_and_submerks` and `verify_grovedb` functions. -/// It represents a mapping of paths (as vectors of vectors of bytes) to a tuple -/// of three cryptographic hashes: the root hash, the combined value hash, and the expected value hash. +/// Type alias for the return type of the `verify_merk_and_submerks` and +/// `verify_grovedb` functions. It represents a mapping of paths (as vectors of +/// vectors of bytes) to a tuple of three cryptographic hashes: the root hash, +/// the combined value hash, and the expected value hash. +#[cfg(feature = "full")] type VerificationIssues = HashMap>, (CryptoHash, CryptoHash, CryptoHash)>; /// Type alias for the return type of the `open_merk_for_replication` function. @@ -250,7 +252,12 @@ type VerificationIssues = HashMap>, (CryptoHash, CryptoHash, CryptoH /// - A `Merk` instance with a prefixed RocksDB immediate storage context. /// - An optional `root_key`, represented as a vector of bytes. /// - A boolean indicating whether the Merk is a sum tree. -type OpenedMerkForReplication<'tx> = (Merk>, Option>, bool); +#[cfg(feature = "full")] +type OpenedMerkForReplication<'tx> = ( + Merk>, + Option>, + bool, +); #[cfg(feature = "full")] impl GroveDb {