From cd475898fa275bcf46c448f7a15c272fba13c5e0 Mon Sep 17 00:00:00 2001 From: Evgeny Fomin Date: Thu, 7 Nov 2024 13:12:57 +0100 Subject: [PATCH] wip --- costs/src/context.rs | 5 +- grovedb/src/element/insert.rs | 50 ++++++++++- grovedb/src/element/mod.rs | 52 +++++++++++ grovedb/src/merk_cache.rs | 159 +++++++++++++++++++++++++--------- 4 files changed, 221 insertions(+), 45 deletions(-) diff --git a/costs/src/context.rs b/costs/src/context.rs index 374466ef..8224cb88 100644 --- a/costs/src/context.rs +++ b/costs/src/context.rs @@ -179,8 +179,9 @@ impl CostsExt for T {} /// 1. Early termination on error; /// 2. Because of 1, `Result` is removed from the equation; /// 3. `CostContext` is removed too because it is added to external cost -/// accumulator; 4. Early termination uses external cost accumulator so previous -/// costs won't be lost. +/// accumulator; +/// 4. Early termination uses external cost accumulator so previous costs won't +/// be lost. #[macro_export] macro_rules! cost_return_on_error { ( &mut $cost:ident, $($body:tt)+ ) => { diff --git a/grovedb/src/element/insert.rs b/grovedb/src/element/insert.rs index 432ff2f0..5c381e52 100644 --- a/grovedb/src/element/insert.rs +++ b/grovedb/src/element/insert.rs @@ -190,6 +190,38 @@ impl Element { } } + #[cfg(feature = "full")] + /// Promote `Element` to referenced variant in case the old one was already + /// referenced. + fn promote_to_referenced_variant(self, old_element: &mut Element) -> Result { + if let Some(refs) = old_element.take_backward_references() { + // Since variants with backward references are publicly available, we still have + // to address them, meaning filling in the actual information about references + // from the database by discarding user input. + + match self { + Element::Item(value, flags) + | Element::ItemWithBackwardsReferences(value, _, flags) => { + Ok(Element::ItemWithBackwardsReferences(value, refs, flags)) + } + Element::Reference(ref_path, max_hops, flags) + | Element::BidirectionalReference(ref_path, _, max_hops, flags) => Ok( + Element::BidirectionalReference(ref_path, refs, max_hops, flags), + ), + Element::SumItem(sum, flags) + | Element::SumItemWithBackwardsReferences(sum, _, flags) => { + Ok(Element::SumItemWithBackwardsReferences(sum, refs, flags)) + } + + Element::Tree(..) | Element::SumTree(..) => Err(Error::NotSupported( + "cannot insert subtree in place of a referenced item".to_owned(), + )), + } + } else { + Ok(self) + } + } + #[cfg(feature = "full")] /// Insert an element in Merk under a key if the value is different from /// what already exists; path should be resolved and proper Merk should @@ -198,8 +230,10 @@ impl Element { /// will be committed on the transaction commit. /// The bool represents if we indeed inserted. /// If the value changed we return the old element. + // TODO: a combo of `bool` and `Option::Some` in case `bool` equals + // to true could be covered just by `Option`. pub fn insert_if_changed_value<'db, S: StorageContext<'db>>( - &self, + self, merk: &mut Merk, key: &[u8], options: Option, @@ -214,18 +248,26 @@ impl Element { ); let mut cost = OperationCost::default(); - let previous_element = cost_return_on_error!( + let mut previous_element = cost_return_on_error!( &mut cost, Self::get_optional_from_storage(&merk.storage, key, grove_version) ); + let to_insert = if let Some(prev) = previous_element.as_mut() { + cost_return_on_error_no_add!(cost, self.promote_to_referenced_variant(prev)) + } else { + self + }; let needs_insert = match &previous_element { None => true, - Some(previous_element) => previous_element != self, + Some(previous_element) => !previous_element.eq_no_backreferences(&to_insert), }; if !needs_insert { Ok((false, None)).wrap_with_cost(cost) } else { - cost_return_on_error!(&mut cost, self.insert(merk, key, options, grove_version)); + cost_return_on_error!( + &mut cost, + to_insert.insert(merk, key, options, grove_version) + ); Ok((true, previous_element)).wrap_with_cost(cost) } } diff --git a/grovedb/src/element/mod.rs b/grovedb/src/element/mod.rs index 691a15a4..25ec964f 100644 --- a/grovedb/src/element/mod.rs +++ b/grovedb/src/element/mod.rs @@ -22,6 +22,7 @@ pub use query::QueryOptions; mod serialize; #[cfg(any(feature = "full", feature = "verify"))] use std::fmt; +use std::mem; use bincode::{Decode, Encode}; #[cfg(any(feature = "full", feature = "verify"))] @@ -306,6 +307,57 @@ impl Element { ); crate::value_hash(&bytes).map(Result::Ok) } + + /// Returns backward references if the `Element` in question participates in + /// bidirectional referencing machinery. + pub(crate) fn take_backward_references( + &mut self, + ) -> Option> { + match self { + Element::BidirectionalReference(_, refs, ..) + | Element::ItemWithBackwardsReferences(_, refs, ..) + | Element::SumItemWithBackwardsReferences(_, refs, ..) + if !refs.is_empty() => + { + Some(mem::take(refs)) + } + _ => None, + } + } + + /// Checks elements for equality ignoring backreferences part. + pub(crate) fn eq_no_backreferences(&self, other: &Self) -> bool { + use Element::*; + + match (self, other) { + ( + Item(value_left, flags_left) + | ItemWithBackwardsReferences(value_left, _, flags_left), + Item(value_right, flags_right) + | ItemWithBackwardsReferences(value_right, _, flags_right), + ) => value_left == value_right && flags_left == flags_right, + ( + SumItem(sum_left, flags_left) + | SumItemWithBackwardsReferences(sum_left, _, flags_left), + SumItem(sum_right, flags_right) + | SumItemWithBackwardsReferences(sum_right, _, flags_right), + ) => sum_left == sum_right && flags_left == flags_right, + ( + Reference(ref_path_left, max_hops_left, flags_left) + | BidirectionalReference(ref_path_left, _, max_hops_left, flags_left), + Reference(ref_path_right, max_hops_right, flags_right) + | BidirectionalReference(ref_path_right, _, max_hops_right, flags_right), + ) => { + ref_path_left == ref_path_right + && max_hops_left == max_hops_right + && flags_left == flags_right + } + (left @ Tree(..), right @ Tree(..)) => left == right, + (left @ SumTree(..), right @ SumTree(..)) => left == right, + + _ => false, + } + } } #[cfg(any(feature = "full", feature = "visualize"))] diff --git a/grovedb/src/merk_cache.rs b/grovedb/src/merk_cache.rs index 3ed25653..6353a430 100644 --- a/grovedb/src/merk_cache.rs +++ b/grovedb/src/merk_cache.rs @@ -2,6 +2,8 @@ //! after usage automatically. use std::{ + borrow::Cow, + cell::RefCell, collections::{btree_map::Entry, BTreeMap, HashSet}, mem::{self, MaybeUninit}, ops::Deref, @@ -17,39 +19,39 @@ use crate::{Element, Error, GroveDb, Transaction}; type TxMerk<'db> = Merk>; +type Key = Vec; + struct CachedMerk<'db> { to_propagate: bool, merk: TxMerk<'db>, } -/// Merk caching structure. -/// -/// Since we usually postpone all writes to the very end with a single RocksDB -/// batch all intermediate changes to subtrees might not be tracked if we reopen -/// those Merks, so it's better to have them cached and proceed through the same -/// structure. Eventually we'll have enough info at the same place to perform -/// necessary propagations as well. -// SAFETY: please consult with other safety docs here before doing any changes -pub(crate) struct MerkCache<'db, 'b, B> { +type UpdatedReferences<'b, B> = RefCell, Vec)>>; + +/// Helper struct to split `MerkCache` into independent parts to allow splitting +/// borrows, meaning a dependency on the storage part that shall have a certain +/// lifetime won't clash with another dependency unrelated to the storage part. +struct MerkCacheStorage<'db, 'b, B> { + /// Subtrees opened during usage of this cache structure, the wrapper also + /// marks those that were changed. merks: BTreeMap, CachedMerk<'db>>, + /// GroveDb provides a storage to open Merks against. db: &'db GroveDb, + /// Nowadays GroveDb operates solely on transactional storage contexts. tx: &'db Transaction<'db>, + /// The `MerkCache` finalization result is a `StorageBatch` of operations. + /// It's then up to the user what actions to take on that result until + /// no further changes can be made to the storage. batch: &'static StorageBatch, - version: &'db GroveVersion, } -impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { - pub(crate) fn new<'tx>( - db: &'db GroveDb, - tx: &'db Transaction<'db>, - version: &'db GroveVersion, - ) -> Self { - MerkCache { +impl<'db, 'b, B: AsRef<[u8]>> MerkCacheStorage<'db, 'b, B> { + fn new(db: &'db GroveDb, tx: &'db Transaction<'db>) -> Self { + MerkCacheStorage { + merks: Default::default(), db, - tx: &tx, - version, + tx, batch: Box::leak(Box::new(StorageBatch::default())), - merks: Default::default(), } } @@ -59,6 +61,7 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { fn get_merk_mut_internal<'c>( &'c mut self, path: SubtreePath<'b, B>, + version: &GroveVersion, ) -> CostResult<&'c mut CachedMerk<'db>, Error> { let mut cost = Default::default(); @@ -71,7 +74,7 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { e.key().clone(), self.tx, Some(self.batch), - self.version + version ) ); Ok(e.insert(CachedMerk { @@ -82,6 +85,36 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { } } } +} + +/// Merk caching structure. +/// +/// Since we usually postpone all writes to the very end with a single RocksDB +/// batch all intermediate changes to subtrees might not be tracked if we reopen +/// those Merks, so it's better to have them cached and proceed through the same +/// structure. Eventually we'll have enough info at the same place to perform +/// necessary propagations as well. +// SAFETY: please consult with other safety docs here before doing any changes +pub(crate) struct MerkCache<'db, 'b, B> { + storage: MerkCacheStorage<'db, 'b, B>, + /// References require different kind of propagation and we track pointed to + /// values to update references. + updated_references: UpdatedReferences<'b, B>, + version: &'db GroveVersion, +} + +impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { + pub(crate) fn new<'tx>( + db: &'db GroveDb, + tx: &'db Transaction<'db>, + version: &'db GroveVersion, + ) -> Self { + MerkCache { + storage: MerkCacheStorage::new(db, tx), + version, + updated_references: Default::default(), + } + } /// Returns an array of mutable references to different Merks, where each /// element in the array corresponds to a unique Merk based on its @@ -93,8 +126,8 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { pub(crate) fn get_multi_mut<'c, const N: usize>( &'c mut self, paths: [SubtreePath<'b, B>; N], - ) -> CostResult<[MerkHandle<'db, 'c>; N], Error> { - let mut result_uninit = [const { MaybeUninit::>::uninit() }; N]; + ) -> CostResult<[MerkHandle<'db, 'c, 'b, B>; N], Error> { + let mut result_uninit = [const { MaybeUninit::>::uninit() }; N]; let mut cost = Default::default(); let unique_args: HashSet<_> = paths.iter().collect(); @@ -110,10 +143,17 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { // sure no overlapping memory will be referenced. let merk_ref = unsafe { MerkHandle::new( - (cost_return_on_error!(&mut cost, self.get_merk_mut_internal(path)) - as *mut CachedMerk<'db>) + (cost_return_on_error!( + &mut cost, + self.storage + .get_merk_mut_internal(path.clone(), self.version) + ) as *mut CachedMerk<'db>) .as_mut::<'c>() .expect("not a null pointer"), + UpdatedReferenceHandle { + path, + updated_references: &self.updated_references, + }, &self.version, ) }; @@ -125,7 +165,9 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { // N in our case. `mem::transmute` would represent it better, however, // due to poor support of const generics in stable Rust we bypass // compile-time size checks with pointer casts. - let result = unsafe { (&result_uninit as *const _ as *const [MerkHandle; N]).read() }; + let result = unsafe { + (&result_uninit as *const _ as *const [MerkHandle<'db, 'c, 'b, B>; N]).read() + }; mem::forget(result_uninit); Ok(result).wrap_with_cost(cost) @@ -134,7 +176,7 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { /// Summarizes all performed operations on this `MerkCache` with necessary /// propagations into a `Storagebatch`. pub(crate) fn finalize(mut self) -> CostResult, Error> { - let batch_ptr = self.batch as *const _; + let batch_ptr = self.storage.batch as *const _; // Propagate updated subtrees' hashes up to the root and dropping all possible // batch users: @@ -150,6 +192,11 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { propagation_result.map_ok(|_| result_batch) } + /// Perform propagation of references' chains marked as changed. + fn propagate_updated_references(&mut self) -> CostResult<(), Error> { + todo!() + } + /// Finalizes each Merk starting from the deepest subtrees, updating hashes /// up to the root. fn propagate_updated_merks(mut self) -> CostResult<(), Error> { @@ -159,7 +206,7 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { // Picking Merks one by one as long as they have a parent while let Some((parent_path, parent_key, subtree)) = - self.merks.pop_first().and_then(|(path, subtree)| { + self.storage.merks.pop_first().and_then(|(path, subtree)| { path.derive_parent() .map(|(parent_path, parent_key)| (parent_path, parent_key, subtree)) }) @@ -169,8 +216,10 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { continue; } - let parent_subtree = - cost_return_on_error!(&mut cost, self.get_merk_mut_internal(parent_path)); + let parent_subtree = cost_return_on_error!( + &mut cost, + self.storage.get_merk_mut_internal(parent_path, version) + ); parent_subtree.to_propagate = true; let (root_hash, root_key, root_sum) = cost_return_on_error!( &mut cost, @@ -194,15 +243,30 @@ impl<'db, 'b, B: AsRef<[u8]>> MerkCache<'db, 'b, B> { } /// Handle to a cached Merk. -pub(crate) struct MerkHandle<'db, 'c> { +pub(crate) struct MerkHandle<'db, 'c, 'b, B> { merk: &'c mut TxMerk<'db>, version: &'db GroveVersion, to_propagate: &'c mut bool, + updated_reference_handle: UpdatedReferenceHandle<'c, 'b, B>, +} + +/// Helper struct to signal `MerkCache` about updated references. +struct UpdatedReferenceHandle<'c, 'b, B> { + path: SubtreePath<'b, B>, + updated_references: &'c UpdatedReferences<'b, B>, +} + +impl<'c, 'b, B: AsRef<[u8]>> UpdatedReferenceHandle<'c, 'b, B> { + fn mark_updated_reference(&self, key: Key) { + self.updated_references + .borrow_mut() + .insert((self.path.clone(), key)); + } } /// It is allowed to dereference `MerkHandle` to regular Merks but in a /// non-mutable way since we want to track what have been done to those Merks. -impl<'db, 'c> Deref for MerkHandle<'db, 'c> { +impl<'db, 'c, 'b, B> Deref for MerkHandle<'db, 'c, 'b, B> { type Target = TxMerk<'db>; fn deref(&self) -> &Self::Target { @@ -210,23 +274,40 @@ impl<'db, 'c> Deref for MerkHandle<'db, 'c> { } } -impl<'db, 'c> MerkHandle<'db, 'c> { +impl<'db, 'c, 'b, B: AsRef<[u8]>> MerkHandle<'db, 'c, 'b, B> { pub(crate) fn insert( &mut self, - key: impl AsRef<[u8]>, + key: &[u8], element: Element, options: Option, ) -> CostResult<(), Error> { - element - .insert(self.merk, key, options, self.version) - .for_ok(|_| *self.to_propagate = true) + let mut costs = Default::default(); + if let (_, Some(mut old_element)) = cost_return_on_error!( + &mut costs, + element.insert_if_changed_value(self.merk, key, options, self.version) + ) { + // In case the item that was changed has been referenced, we indicate that + // references should be propagated after + if old_element.take_backward_references().is_some() { + self.updated_reference_handle + .mark_updated_reference(key.to_vec()); + } + } + *self.to_propagate = true; + + Ok(()).wrap_with_cost(costs) } - fn new(cached_merk: &'c mut CachedMerk<'db>, version: &'db GroveVersion) -> Self { + fn new( + cached_merk: &'c mut CachedMerk<'db>, + updated_reference_handle: UpdatedReferenceHandle<'c, 'b, B>, + version: &'db GroveVersion, + ) -> Self { Self { merk: &mut cached_merk.merk, version, to_propagate: &mut cached_merk.to_propagate, + updated_reference_handle, } } } @@ -235,7 +316,7 @@ impl<'db, 'c> MerkHandle<'db, 'c> { mod tests { use grovedb_costs::OperationCost; use grovedb_path::SubtreePath; - use grovedb_storage::{Storage, StorageBatch}; + use grovedb_storage::Storage; use grovedb_version::version::GroveVersion; use super::MerkCache;