Skip to content

Commit

Permalink
feat: clear subtree (#272)
Browse files Browse the repository at this point in the history
* wip

* fmt

* modifications

---------

Co-authored-by: Quantum Explorer <[email protected]>
  • Loading branch information
iammadab and QuantumExplorer authored Sep 29, 2023
1 parent eade3e0 commit 3eb67eb
Show file tree
Hide file tree
Showing 2 changed files with 319 additions and 2 deletions.
4 changes: 4 additions & 0 deletions grovedb/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ pub enum Error {
/// Deleting non empty tree
DeletingNonEmptyTree(&'static str),

#[error("clearing tree with subtrees not allowed error: {0}")]
/// Clearing tree with subtrees not allowed
ClearingTreeWithSubtreesNotAllowed(&'static str),

// Client allowed errors
#[error("just in time element flags client error: {0}")]
/// Just in time element flags client error
Expand Down
317 changes: 315 additions & 2 deletions grovedb/src/operations/delete/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use grovedb_costs::{
storage_cost::removal::{StorageRemovedBytes, StorageRemovedBytes::BasicStorageRemoval},
CostResult, CostsExt, OperationCost,
};
use grovedb_merk::{proofs::Query, KVIterator};
#[cfg(feature = "full")]
use grovedb_merk::{Error as MerkError, Merk, MerkOptions};
use grovedb_path::SubtreePath;
Expand All @@ -55,13 +56,37 @@ use grovedb_storage::{
Storage, StorageBatch, StorageContext,
};

use crate::util::merk_optional_tx_path_not_empty;
#[cfg(feature = "full")]
use crate::{
batch::{GroveDbOp, Op},
util::{storage_context_optional_tx, storage_context_with_parent_optional_tx},
Element, ElementFlags, Error, GroveDb, Transaction, TransactionArg,
};
use crate::{raw_decode, util::merk_optional_tx_path_not_empty};

#[cfg(feature = "full")]
#[derive(Clone)]
/// Clear options
pub struct ClearOptions {
/// Check for Subtrees
pub check_for_subtrees: bool,
/// Allow deleting non empty trees if we check for subtrees
pub allow_deleting_subtrees: bool,
/// If we check for subtrees, and we don't allow deleting and there are
/// some, should we error?
pub trying_to_clear_with_subtrees_returns_error: bool,
}

#[cfg(feature = "full")]
impl Default for ClearOptions {
fn default() -> Self {
ClearOptions {
check_for_subtrees: true,
allow_deleting_subtrees: false,
trying_to_clear_with_subtrees_returns_error: true,
}
}
}

#[cfg(feature = "full")]
#[derive(Clone)]
Expand Down Expand Up @@ -138,6 +163,180 @@ impl GroveDb {
})
}

/// Delete all elements in a specified subtree
/// Returns if we successfully cleared the subtree
fn clear_subtree<'b, B, P>(
&self,
path: P,
options: Option<ClearOptions>,
transaction: TransactionArg,
) -> Result<bool, Error>
where
B: AsRef<[u8]> + 'b,
P: Into<SubtreePath<'b, B>>,
{
self.clear_subtree_with_costs(path, options, transaction)
.unwrap()
}

/// Delete all elements in a specified subtree and get back costs
/// Warning: The costs for this operation are not yet correct, hence we
/// should keep this private for now
/// Returns if we successfully cleared the subtree
fn clear_subtree_with_costs<'b, B, P>(
&self,
path: P,
options: Option<ClearOptions>,
transaction: TransactionArg,
) -> CostResult<bool, Error>
where
B: AsRef<[u8]> + 'b,
P: Into<SubtreePath<'b, B>>,
{
let subtree_path: SubtreePath<B> = path.into();
let mut cost = OperationCost::default();
let batch = StorageBatch::new();

let options = options.unwrap_or_default();

if let Some(transaction) = transaction {
let mut merk_to_clear = cost_return_on_error!(
&mut cost,
self.open_transactional_merk_at_path(
subtree_path.clone(),
transaction,
Some(&batch)
)
);

if options.check_for_subtrees {
let mut all_query = Query::new();
all_query.insert_all();

let mut element_iterator =
KVIterator::new(merk_to_clear.storage.raw_iter(), &all_query).unwrap();

// delete all nested subtrees
while let Some((key, element_value)) =
element_iterator.next_kv().unwrap_add_cost(&mut cost)
{
let element = raw_decode(&element_value).unwrap();
if element.is_tree() {
if options.allow_deleting_subtrees {
cost_return_on_error!(
&mut cost,
self.delete(
subtree_path.clone(),
key.as_slice(),
Some(DeleteOptions {
allow_deleting_non_empty_trees: true,
deleting_non_empty_trees_returns_error: false,
..Default::default()
}),
Some(transaction),
)
);
} else if options.trying_to_clear_with_subtrees_returns_error {
return Err(Error::ClearingTreeWithSubtreesNotAllowed(
"options do not allow to clear this merk tree as it contains \
subtrees",
))
.wrap_with_cost(cost);
} else {
return Ok(false).wrap_with_cost(cost);
}
}
}
}

// delete non subtree values
cost_return_on_error!(&mut cost, merk_to_clear.clear().map_err(Error::MerkError));

// propagate changes
let mut merk_cache: HashMap<SubtreePath<B>, Merk<PrefixedRocksDbTransactionContext>> =
HashMap::default();
merk_cache.insert(subtree_path.clone(), merk_to_clear);
cost_return_on_error!(
&mut cost,
self.propagate_changes_with_transaction(
merk_cache,
subtree_path.clone(),
transaction,
&batch,
)
);
} else {
let mut merk_to_clear = cost_return_on_error!(
&mut cost,
self.open_non_transactional_merk_at_path(subtree_path.clone(), Some(&batch))
);

if options.check_for_subtrees {
let mut all_query = Query::new();
all_query.insert_all();

let mut element_iterator =
KVIterator::new(merk_to_clear.storage.raw_iter(), &all_query).unwrap();

// delete all nested subtrees
while let Some((key, element_value)) =
element_iterator.next_kv().unwrap_add_cost(&mut cost)
{
let element = raw_decode(&element_value).unwrap();
if options.allow_deleting_subtrees {
if element.is_tree() {
cost_return_on_error!(
&mut cost,
self.delete(
subtree_path.clone(),
key.as_slice(),
Some(DeleteOptions {
allow_deleting_non_empty_trees: true,
deleting_non_empty_trees_returns_error: false,
..Default::default()
}),
None
)
);
}
} else if options.trying_to_clear_with_subtrees_returns_error {
return Err(Error::ClearingTreeWithSubtreesNotAllowed(
"options do not allow to clear this merk tree as it contains subtrees",
))
.wrap_with_cost(cost);
} else {
return Ok(false).wrap_with_cost(cost);
}
}
}

// delete non subtree values
cost_return_on_error!(&mut cost, merk_to_clear.clear().map_err(Error::MerkError));

// propagate changes
let mut merk_cache: HashMap<SubtreePath<B>, Merk<PrefixedRocksDbStorageContext>> =
HashMap::default();
merk_cache.insert(subtree_path.clone(), merk_to_clear);
cost_return_on_error!(
&mut cost,
self.propagate_changes_without_transaction(
merk_cache,
subtree_path.clone(),
&batch,
)
);
}

cost_return_on_error!(
&mut cost,
self.db
.commit_multi_context_batch(batch, transaction)
.map_err(Into::into)
);

Ok(true).wrap_with_cost(cost)
}

/// Delete element with sectional storage function
pub fn delete_with_sectional_storage_function<B: AsRef<[u8]>>(
&self,
Expand Down Expand Up @@ -738,7 +937,7 @@ mod tests {
use pretty_assertions::assert_eq;

use crate::{
operations::delete::{delete_up_tree::DeleteUpTreeOptions, DeleteOptions},
operations::delete::{delete_up_tree::DeleteUpTreeOptions, ClearOptions, DeleteOptions},
tests::{
common::EMPTY_PATH, make_empty_grovedb, make_test_grovedb, ANOTHER_TEST_LEAF, TEST_LEAF,
},
Expand Down Expand Up @@ -1445,4 +1644,118 @@ mod tests {
}
);
}

#[test]
fn test_subtree_clear() {
let element = Element::new_item(b"ayy".to_vec());

let db = make_test_grovedb();

// Insert some nested subtrees
db.insert(
[TEST_LEAF].as_ref(),
b"key1",
Element::empty_tree(),
None,
None,
)
.unwrap()
.expect("successful subtree 1 insert");
db.insert(
[TEST_LEAF, b"key1"].as_ref(),
b"key2",
Element::empty_tree(),
None,
None,
)
.unwrap()
.expect("successful subtree 2 insert");

// Insert an element into subtree
db.insert(
[TEST_LEAF, b"key1", b"key2"].as_ref(),
b"key3",
element,
None,
None,
)
.unwrap()
.expect("successful value insert");
db.insert(
[TEST_LEAF].as_ref(),
b"key4",
Element::empty_tree(),
None,
None,
)
.unwrap()
.expect("successful subtree 3 insert");

let key1_tree = db
.get([TEST_LEAF].as_ref(), b"key1", None)
.unwrap()
.unwrap();
assert!(!matches!(key1_tree, Element::Tree(None, _)));
let key1_merk = db
.open_non_transactional_merk_at_path([TEST_LEAF, b"key1"].as_ref().into(), None)
.unwrap()
.unwrap();
assert_ne!(key1_merk.root_hash().unwrap(), [0; 32]);

let root_hash_before_clear = db.root_hash(None).unwrap().unwrap();
db.clear_subtree([TEST_LEAF, b"key1"].as_ref(), None, None)
.expect_err("unable to delete subtree");

let success = db
.clear_subtree(
[TEST_LEAF, b"key1"].as_ref(),
Some(ClearOptions {
check_for_subtrees: true,
allow_deleting_subtrees: false,
trying_to_clear_with_subtrees_returns_error: false,
}),
None,
)
.expect("expected no error");
assert!(!success);

let success = db
.clear_subtree(
[TEST_LEAF, b"key1"].as_ref(),
Some(ClearOptions {
check_for_subtrees: true,
allow_deleting_subtrees: true,
trying_to_clear_with_subtrees_returns_error: false,
}),
None,
)
.expect("unable to delete subtree");

assert!(success);

assert!(matches!(
db.get([TEST_LEAF, b"key1"].as_ref(), b"key2", None)
.unwrap(),
Err(Error::PathKeyNotFound(_))
));
assert!(matches!(
db.get([TEST_LEAF, b"key1", b"key2"].as_ref(), b"key3", None)
.unwrap(),
Err(Error::PathParentLayerNotFound(_))
));
let key1_tree = db
.get([TEST_LEAF].as_ref(), b"key1", None)
.unwrap()
.unwrap();
assert!(matches!(key1_tree, Element::Tree(None, _)));

let key1_merk = db
.open_non_transactional_merk_at_path([TEST_LEAF, b"key1"].as_ref().into(), None)
.unwrap()
.unwrap();
assert_eq!(key1_merk.root_hash().unwrap(), [0; 32]);

let root_hash_after_clear = db.root_hash(None).unwrap().unwrap();
assert_ne!(root_hash_before_clear, root_hash_after_clear);
}
}

0 comments on commit 3eb67eb

Please sign in to comment.