From ec240385d67e41b039e3f8ae196c4a43701c7560 Mon Sep 17 00:00:00 2001 From: Ivan Shumkov Date: Fri, 9 Feb 2024 02:42:08 +0700 Subject: [PATCH] feat: secondary rocksdb storage --- .../src/estimated_costs/average_case_costs.rs | 2 +- grovedb/src/lib.rs | 21 +- grovedb/tests/secondary_tests.rs | 71 ++++ merk/src/merk/chunks.rs | 4 +- merk/src/merk/mod.rs | 14 +- merk/src/merk/open.rs | 2 +- storage/src/rocksdb_storage/storage.rs | 261 +++++++++----- .../storage_context/context_immediate.rs | 237 +++++++----- .../storage_context/context_no_tx.rs | 326 ++++++++++------- .../storage_context/context_tx.rs | 336 +++++++++++------- .../storage_context/raw_iterator.rs | 333 ++++++++--------- storage/src/rocksdb_storage/test_utils.rs | 19 +- 12 files changed, 1017 insertions(+), 609 deletions(-) create mode 100644 grovedb/tests/secondary_tests.rs diff --git a/grovedb/src/estimated_costs/average_case_costs.rs b/grovedb/src/estimated_costs/average_case_costs.rs index d93b6451b..7a8a3101c 100644 --- a/grovedb/src/estimated_costs/average_case_costs.rs +++ b/grovedb/src/estimated_costs/average_case_costs.rs @@ -478,7 +478,7 @@ mod test { fn test_get_merk_node_average_case() { // Open a merk and insert 10 elements. let tmp_dir = TempDir::new().expect("cannot open tempdir"); - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let batch = StorageBatch::new(); diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 9ea955134..6d60ea991 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -246,12 +246,29 @@ pub type TransactionArg<'db, 'a> = Option<&'a Transaction<'db>>; #[cfg(feature = "full")] impl GroveDb { - /// Opens a given path + /// Opens a primary storage with a given path pub fn open>(path: P) -> Result { - let db = RocksDbStorage::default_rocksdb_with_path(path)?; + let db = RocksDbStorage::default_primary_rocksdb(path)?; Ok(GroveDb { db }) } + /// Open a secondary storage with given paths + pub fn open_secondary>( + primary_path: P, + secondary_storage: P, + ) -> Result { + let db = RocksDbStorage::default_secondary_rocksdb(primary_path, secondary_storage)?; + Ok(GroveDb { db }) + } + + /// Replicate recent changes from primary database + /// Available only for a secondary storage + pub fn try_to_catch_up_from_primary(&self) -> Result<(), Error> { + self.db.try_to_catch_up_from_primary()?; + + Ok(()) + } + /// Uses raw iter to delete GroveDB key values pairs from rocksdb pub fn wipe(&self) -> Result<(), Error> { self.db.wipe()?; diff --git a/grovedb/tests/secondary_tests.rs b/grovedb/tests/secondary_tests.rs new file mode 100644 index 000000000..9003689c6 --- /dev/null +++ b/grovedb/tests/secondary_tests.rs @@ -0,0 +1,71 @@ +use grovedb::GroveDb; +use tempfile::TempDir; + +#[test] +fn test_some_functionality() { + let primary_dir = TempDir::new().expect("should create temp dir"); + + let primary_grovedb = GroveDb::open(primary_dir.path()).expect("should open grovedb"); + + // Store value in primary + + let key = b"key"; + let value = vec![1, 2, 3]; + + primary_grovedb + .put_aux(key, &value, None, None) + .unwrap() + .expect("should put value to primary"); + + // Read value from primary + + let primary_value = primary_grovedb + .get_aux(key, None) + .unwrap() + .expect("should get value from primary") + .expect("value should exist on primary"); + + assert_eq!(value, primary_value); + + // Open secondary + + let secondary_dir = TempDir::new().expect("should create temp dir"); + + let secondary_grovedb = GroveDb::open_secondary(primary_dir.path(), secondary_dir.path()) + .expect("should open secondary"); + + // Read value on secondary + + let secondary_value = secondary_grovedb + .get_aux(key, None) + .unwrap() + .expect("should get value from secondary") + .expect("value from primary should exist on secondary"); + + assert_eq!(primary_value, secondary_value); + + // Update value on primary + + let primary_value2 = vec![4, 5, 6]; + + primary_grovedb + .put_aux(key, &primary_value2, None, None) + .unwrap() + .expect("should put value to primary"); + + // Catch up secondary + + secondary_grovedb + .try_to_catch_up_from_primary() + .expect("should catch up"); + + // Read updated value on secondary + + let secondary_value2 = secondary_grovedb + .get_aux(key, None) + .unwrap() + .expect("should get value from secondary") + .expect("value from primary should exist on secondary"); + + assert_eq!(primary_value2, secondary_value2); +} diff --git a/merk/src/merk/chunks.rs b/merk/src/merk/chunks.rs index 4f6564efe..ed6937804 100644 --- a/merk/src/merk/chunks.rs +++ b/merk/src/merk/chunks.rs @@ -268,7 +268,7 @@ mod tests { fn chunks_from_reopen() { let tmp_dir = TempDir::new().expect("cannot create tempdir"); let original_chunks = { - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let batch = StorageBatch::new(); let mut merk = Merk::open_base( @@ -307,7 +307,7 @@ mod tests { .collect::>() .into_iter() }; - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let merk = Merk::open_base( storage diff --git a/merk/src/merk/mod.rs b/merk/src/merk/mod.rs index 93c052a4c..4438977ba 100644 --- a/merk/src/merk/mod.rs +++ b/merk/src/merk/mod.rs @@ -739,7 +739,7 @@ mod test { #[test] fn reopen_check_root_hash() { let tmp_dir = TempDir::new().expect("cannot open tempdir"); - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let mut merk = Merk::open_base( storage @@ -763,7 +763,7 @@ mod test { #[test] fn test_get_node_cost() { let tmp_dir = TempDir::new().expect("cannot open tempdir"); - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let mut merk = Merk::open_base( storage @@ -807,7 +807,7 @@ mod test { let tmp_dir = TempDir::new().expect("cannot open tempdir"); let original_nodes = { - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let batch = StorageBatch::new(); let mut merk = Merk::open_base( @@ -846,7 +846,7 @@ mod test { nodes }; - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let merk = Merk::open_base( storage @@ -883,7 +883,7 @@ mod test { let tmp_dir = TempDir::new().expect("cannot open tempdir"); let original_nodes = { - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let batch = StorageBatch::new(); let mut merk = Merk::open_base( @@ -918,7 +918,7 @@ mod test { collect(&mut merk.storage.raw_iter(), &mut nodes); nodes }; - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let merk = Merk::open_base( storage @@ -939,7 +939,7 @@ mod test { #[test] fn update_node() { let tmp_dir = TempDir::new().expect("cannot open tempdir"); - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let batch = StorageBatch::new(); let mut merk = Merk::open_base( diff --git a/merk/src/merk/open.rs b/merk/src/merk/open.rs index af15d5969..ca7e3acb5 100644 --- a/merk/src/merk/open.rs +++ b/merk/src/merk/open.rs @@ -92,7 +92,7 @@ mod test { #[test] fn test_reopen_root_hash() { let tmp_dir = TempDir::new().expect("cannot open tempdir"); - let storage = RocksDbStorage::default_rocksdb_with_path(tmp_dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(tmp_dir.path()) .expect("cannot open rocksdb storage"); let test_prefix = [b"ayy"]; diff --git a/storage/src/rocksdb_storage/storage.rs b/storage/src/rocksdb_storage/storage.rs index a396b75fe..0cdb5e75d 100644 --- a/storage/src/rocksdb_storage/storage.rs +++ b/storage/src/rocksdb_storage/storage.rs @@ -48,6 +48,7 @@ use super::{ PrefixedRocksDbImmediateStorageContext, PrefixedRocksDbStorageContext, PrefixedRocksDbTransactionContext, }; +use crate::Error::StorageError; use crate::{ error, error::Error::{CostError, RocksDBError}, @@ -88,21 +89,33 @@ lazy_static! { }; } -/// Type alias for a database -pub(crate) type Db = OptimisticTransactionDB; +/// Non-transactional database that supports secondary instance +pub(crate) type NonTransactionalDb = rocksdb::DB; /// Type alias for a transaction -pub(crate) type Tx<'db> = Transaction<'db, Db>; +pub(crate) type Tx<'db> = Transaction<'db, OptimisticTransactionDB>; /// Storage which uses RocksDB as its backend. -pub struct RocksDbStorage { - db: OptimisticTransactionDB, +pub enum RocksDbStorage { + /// Primary storage + Primary(OptimisticTransactionDB), + /// Secondary storage + Secondary(NonTransactionalDb), +} + +macro_rules! call_with_db { + ($self:ident, $db:ident, $code:block) => { + match $self { + RocksDbStorage::Primary($db) => $code, + RocksDbStorage::Secondary($db) => $code, + } + }; } impl RocksDbStorage { /// Create RocksDb storage with default parameters using `path`. - pub fn default_rocksdb_with_path>(path: P) -> Result { - let db = Db::open_cf_descriptors( + pub fn default_primary_rocksdb>(path: P) -> Result { + let db = OptimisticTransactionDB::open_cf_descriptors( &DEFAULT_OPTS, &path, [ @@ -112,7 +125,39 @@ impl RocksDbStorage { ], ) .map_err(RocksDBError)?; - Ok(RocksDbStorage { db }) + + Ok(Self::Primary(db)) + } + + /// Create RocksDb storage with default parameters using `path`. + pub fn default_secondary_rocksdb>( + primary_path: P, + secondary_path: P, + ) -> Result { + let db = NonTransactionalDb::open_cf_descriptors_as_secondary( + &DEFAULT_OPTS, + &primary_path, + &secondary_path, + [ + ColumnFamilyDescriptor::new(AUX_CF_NAME, DEFAULT_OPTS.clone()), + ColumnFamilyDescriptor::new(ROOTS_CF_NAME, DEFAULT_OPTS.clone()), + ColumnFamilyDescriptor::new(META_CF_NAME, DEFAULT_OPTS.clone()), + ], + ) + .map_err(RocksDBError)?; + + Ok(Self::Secondary(db)) + } + + /// Replicate recent changes from primary database + /// Available only for a secondary storage + pub fn try_to_catch_up_from_primary(&self) -> Result<(), Error> { + match self { + RocksDbStorage::Primary(_) => { + Err(StorageError("primary storage doesn't catchup".to_string())) + } + RocksDbStorage::Secondary(db) => db.try_catch_up_with_primary().map_err(RocksDBError), + } } fn build_prefix_body(path: SubtreePath) -> (Vec, usize) @@ -207,7 +252,7 @@ impl RocksDbStorage { value, cost_info, } => { - db_batch.put_cf(cf_aux(&self.db), &key, &value); + db_batch.put_cf(self.cf_aux(), &key, &value); cost.seek_count += 1; cost_return_on_error_no_add!( &cost, @@ -226,7 +271,7 @@ impl RocksDbStorage { value, cost_info, } => { - db_batch.put_cf(cf_roots(&self.db), &key, &value); + db_batch.put_cf(self.cf_roots(), &key, &value); cost.seek_count += 1; // We only add costs for put root if they are set, otherwise it is free if cost_info.is_some() { @@ -248,7 +293,7 @@ impl RocksDbStorage { value, cost_info, } => { - db_batch.put_cf(cf_meta(&self.db), &key, &value); + db_batch.put_cf(self.cf_meta(), &key, &value); cost.seek_count += 1; cost_return_on_error_no_add!( &cost, @@ -276,7 +321,7 @@ impl RocksDbStorage { // lets get the values let value_len = cost_return_on_error_no_add!( &cost, - self.db.get(&key).map_err(RocksDBError) + call_with_db!(self, db, { db.get(&key).map_err(RocksDBError) }) ) .map(|x| x.len() as u32) .unwrap_or(0); @@ -292,7 +337,7 @@ impl RocksDbStorage { } } AbstractBatchOperation::DeleteAux { key, cost_info } => { - db_batch.delete_cf(cf_aux(&self.db), &key); + db_batch.delete_cf(self.cf_aux(), &key); // TODO: fix not atomic freed size computation if let Some(key_value_removed_bytes) = cost_info { @@ -303,7 +348,9 @@ impl RocksDbStorage { cost.seek_count += 2; let value_len = cost_return_on_error_no_add!( &cost, - self.db.get_cf(cf_aux(&self.db), &key).map_err(RocksDBError) + call_with_db!(self, db, { + db.get_cf(self.cf_aux(), &key).map_err(RocksDBError) + }) ) .map(|x| x.len() as u32) .unwrap_or(0); @@ -320,7 +367,7 @@ impl RocksDbStorage { } } AbstractBatchOperation::DeleteRoot { key, cost_info } => { - db_batch.delete_cf(cf_roots(&self.db), &key); + db_batch.delete_cf(self.cf_roots(), &key); // TODO: fix not atomic freed size computation if let Some(key_value_removed_bytes) = cost_info { @@ -331,9 +378,9 @@ impl RocksDbStorage { cost.seek_count += 2; let value_len = cost_return_on_error_no_add!( &cost, - self.db - .get_cf(cf_roots(&self.db), &key) - .map_err(RocksDBError) + call_with_db!(self, db, { + db.get_cf(self.cf_roots(), &key).map_err(RocksDBError) + }) ) .map(|x| x.len() as u32) .unwrap_or(0); @@ -350,7 +397,7 @@ impl RocksDbStorage { } } AbstractBatchOperation::DeleteMeta { key, cost_info } => { - db_batch.delete_cf(cf_meta(&self.db), &key); + db_batch.delete_cf(self.cf_meta(), &key); // TODO: fix not atomic freed size computation if let Some(key_value_removed_bytes) = cost_info { @@ -361,9 +408,9 @@ impl RocksDbStorage { cost.seek_count += 2; let value_len = cost_return_on_error_no_add!( &cost, - self.db - .get_cf(cf_meta(&self.db), &key) - .map_err(RocksDBError) + call_with_db!(self, db, { + db.get_cf(self.cf_meta(), &key).map_err(RocksDBError) + }) ) .map(|x| x.len() as u32) .unwrap_or(0); @@ -391,17 +438,24 @@ impl RocksDbStorage { pending_costs: OperationCost, transaction: Option<&::Transaction>, ) -> CostResult<(), Error> { - let result = match transaction { - None => self.db.write(db_batch), - Some(transaction) => transaction.rebuild_from_writebatch(&db_batch), - }; - - if result.is_ok() { - result.map_err(RocksDBError).wrap_with_cost(pending_costs) - } else { - result - .map_err(RocksDBError) - .wrap_with_cost(OperationCost::default()) + match self { + RocksDbStorage::Primary(db) => { + let result = match transaction { + None => db.write(db_batch), + Some(transaction) => transaction.rebuild_from_writebatch(&db_batch), + }; + + if result.is_ok() { + result.map_err(RocksDBError).wrap_with_cost(pending_costs) + } else { + result + .map_err(RocksDBError) + .wrap_with_cost(OperationCost::default()) + } + } + RocksDbStorage::Secondary(_) => { + unimplemented!("secondary storage does not support WriteBatchWithTransaction") + } } } @@ -418,30 +472,58 @@ impl RocksDbStorage { } fn wipe_column_family(&self, column_family_name: &str) -> Result<(), Error> { - let cf_handle = self - .db - .cf_handle(column_family_name) - .ok_or(Error::StorageError( + call_with_db!(self, db, { + let cf_handle = db.cf_handle(column_family_name).ok_or(Error::StorageError( "failed to get column family handle".to_string(), ))?; - let mut iter = self.db.raw_iterator_cf(&cf_handle); - iter.seek_to_first(); - while iter.valid() { - self.db.delete(iter.key().expect("should have key"))?; - iter.next() - } - Ok(()) + let mut iter = db.raw_iterator_cf(&cf_handle); + iter.seek_to_first(); + while iter.valid() { + db.delete(iter.key().expect("should have key"))?; + iter.next() + } + Ok(()) + }) + } + + /// Get auxiliary data column family + fn cf_aux(&self) -> &ColumnFamily { + call_with_db!(self, db, { + db.cf_handle(AUX_CF_NAME) + .expect("meta column family must exist") + }) + } + + /// Get trees roots data column family + fn cf_roots(&self) -> &ColumnFamily { + call_with_db!(self, db, { + db.cf_handle(ROOTS_CF_NAME) + .expect("meta column family must exist") + }) + } + + /// Get metadata column family + fn cf_meta(&self) -> &ColumnFamily { + call_with_db!(self, db, { + db.cf_handle(META_CF_NAME) + .expect("meta column family must exist") + }) } } impl<'db> Storage<'db> for RocksDbStorage { + type Transaction = Tx<'db>; type BatchStorageContext = PrefixedRocksDbStorageContext<'db>; type BatchTransactionalStorageContext = PrefixedRocksDbTransactionContext<'db>; type ImmediateStorageContext = PrefixedRocksDbImmediateStorageContext<'db>; - type Transaction = Tx<'db>; fn start_transaction(&'db self) -> Self::Transaction { - self.db.transaction() + match self { + RocksDbStorage::Primary(db) => db.transaction(), + RocksDbStorage::Secondary(_) => { + unimplemented!("secondary storage does not support transactions") + } + } } fn commit_transaction(&self, transaction: Self::Transaction) -> CostResult<(), Error> { @@ -456,8 +538,21 @@ impl<'db> Storage<'db> for RocksDbStorage { transaction.rollback().map_err(RocksDBError) } + fn commit_multi_context_batch( + &self, + batch: StorageBatch, + transaction: Option<&'db Self::Transaction>, + ) -> CostResult<(), Error> { + let mut cost = OperationCost::default(); + let (db_batch, pending_costs) = + cost_return_on_error!(&mut cost, self.build_write_batch(batch)); + + self.commit_db_write_batch(db_batch, pending_costs, transaction) + .add_cost(cost) + } + fn flush(&self) -> Result<(), Error> { - self.db.flush().map_err(RocksDBError) + call_with_db!(self, db, { db.flush() }).map_err(RocksDBError) } fn get_storage_context<'b, B>( @@ -468,8 +563,14 @@ impl<'db> Storage<'db> for RocksDbStorage { where B: AsRef<[u8]> + 'b, { - Self::build_prefix(path) - .map(|prefix| PrefixedRocksDbStorageContext::new(&self.db, prefix, batch)) + Self::build_prefix(path).map(|prefix| match self { + RocksDbStorage::Primary(db) => { + PrefixedRocksDbStorageContext::new_primary(db, prefix, batch) + } + RocksDbStorage::Secondary(db) => { + PrefixedRocksDbStorageContext::new_secondary(db, prefix, batch) + } + }) } fn get_transactional_storage_context<'b, B>( @@ -481,8 +582,13 @@ impl<'db> Storage<'db> for RocksDbStorage { where B: AsRef<[u8]> + 'b, { - Self::build_prefix(path).map(|prefix| { - PrefixedRocksDbTransactionContext::new(&self.db, transaction, prefix, batch) + Self::build_prefix(path).map(|prefix| match self { + RocksDbStorage::Primary(db) => { + PrefixedRocksDbTransactionContext::new_primary(db, transaction, prefix, batch) + } + RocksDbStorage::Secondary(db) => { + PrefixedRocksDbTransactionContext::new_secondary(db, prefix, batch) + } }) } @@ -494,22 +600,22 @@ impl<'db> Storage<'db> for RocksDbStorage { where B: AsRef<[u8]> + 'b, { - Self::build_prefix(path).map(|prefix| { - PrefixedRocksDbImmediateStorageContext::new(&self.db, transaction, prefix) + Self::build_prefix(path).map(|prefix| match self { + RocksDbStorage::Primary(db) => { + PrefixedRocksDbImmediateStorageContext::new_primary(db, transaction, prefix) + } + RocksDbStorage::Secondary(db) => { + PrefixedRocksDbImmediateStorageContext::new_secondary(db, prefix) + } }) } - fn commit_multi_context_batch( - &self, - batch: StorageBatch, - transaction: Option<&'db Self::Transaction>, - ) -> CostResult<(), Error> { - let mut cost = OperationCost::default(); - let (db_batch, pending_costs) = - cost_return_on_error!(&mut cost, self.build_write_batch(batch)); - - self.commit_db_write_batch(db_batch, pending_costs, transaction) - .add_cost(cost) + fn create_checkpoint>(&self, path: P) -> Result<(), Error> { + call_with_db!(self, db, { + Checkpoint::new(db) + .and_then(|x| x.create_checkpoint(path)) + .map_err(RocksDBError) + }) } fn get_storage_context_cost(path: &[L]) -> OperationCost { @@ -522,33 +628,6 @@ impl<'db> Storage<'db> for RocksDbStorage { OperationCost::with_hash_node_calls(blocks_num) } } - - fn create_checkpoint>(&self, path: P) -> Result<(), Error> { - Checkpoint::new(&self.db) - .and_then(|x| x.create_checkpoint(path)) - .map_err(RocksDBError) - } -} - -/// Get auxiliary data column family -fn cf_aux(storage: &Db) -> &ColumnFamily { - storage - .cf_handle(AUX_CF_NAME) - .expect("aux column family must exist") -} - -/// Get trees roots data column family -fn cf_roots(storage: &Db) -> &ColumnFamily { - storage - .cf_handle(ROOTS_CF_NAME) - .expect("roots column family must exist") -} - -/// Get metadata column family -fn cf_meta(storage: &Db) -> &ColumnFamily { - storage - .cf_handle(META_CF_NAME) - .expect("meta column family must exist") } #[cfg(test)] diff --git a/storage/src/rocksdb_storage/storage_context/context_immediate.rs b/storage/src/rocksdb_storage/storage_context/context_immediate.rs index f5785b0c1..67c2c57b1 100644 --- a/storage/src/rocksdb_storage/storage_context/context_immediate.rs +++ b/storage/src/rocksdb_storage/storage_context/context_immediate.rs @@ -33,61 +33,118 @@ use grovedb_costs::{ storage_cost::key_value_cost::KeyValueStorageCost, ChildrenSizesWithIsSumTree, CostResult, CostsExt, }; -use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode, WriteBatchWithTransaction}; +use rocksdb::{ColumnFamily, OptimisticTransactionDB, WriteBatchWithTransaction}; use super::{make_prefixed_key, PrefixedRocksDbBatch, PrefixedRocksDbRawIterator}; +use crate::rocksdb_storage::storage::NonTransactionalDb; use crate::{ error, error::Error::RocksDBError, - rocksdb_storage::storage::{Db, SubtreePrefix, Tx, AUX_CF_NAME, META_CF_NAME, ROOTS_CF_NAME}, + rocksdb_storage::storage::{SubtreePrefix, Tx, AUX_CF_NAME, META_CF_NAME, ROOTS_CF_NAME}, StorageContext, }; /// Storage context with a prefix applied to be used in a subtree to be used in /// transaction. -pub struct PrefixedRocksDbImmediateStorageContext<'db> { - storage: &'db Db, +pub enum PrefixedRocksDbImmediateStorageContext<'db> { + /// Primary storage context + Primary(PrefixedPrimaryRocksDbImmediateStorageContext<'db>), + /// Secondary storage context + Secondary(PrefixedSecondaryRocksDbImmediateStorageContext<'db>), +} + +pub struct PrefixedPrimaryRocksDbImmediateStorageContext<'db> { + storage: &'db OptimisticTransactionDB, transaction: &'db Tx<'db>, prefix: SubtreePrefix, } +pub struct PrefixedSecondaryRocksDbImmediateStorageContext<'db> { + storage: &'db NonTransactionalDb, + prefix: SubtreePrefix, +} + +macro_rules! call_with_storage_and_prefix { + ($self:ident, $storage:ident, $prefix:ident, $code:block) => { + match $self { + PrefixedRocksDbImmediateStorageContext::Primary(context) => { + let $storage = context.storage; + let $prefix = &context.prefix; + $code + } + PrefixedRocksDbImmediateStorageContext::Secondary(context) => { + let $storage = context.storage; + let $prefix = &context.prefix; + $code + } + } + }; +} + +macro_rules! call_with_storage_or_transaction_and_prefix { + ($self:ident, $storage:ident, $prefix:ident, $code:block) => { + match $self { + PrefixedRocksDbImmediateStorageContext::Primary(context) => { + let $storage = context.transaction; + let $prefix = &context.prefix; + $code + } + PrefixedRocksDbImmediateStorageContext::Secondary(context) => { + let $storage = context.storage; + let $prefix = &context.prefix; + $code + } + } + }; +} + impl<'db> PrefixedRocksDbImmediateStorageContext<'db> { /// Create a new prefixed transaction context instance - pub fn new(storage: &'db Db, transaction: &'db Tx<'db>, prefix: SubtreePrefix) -> Self { - PrefixedRocksDbImmediateStorageContext { + pub fn new_primary( + storage: &'db OptimisticTransactionDB, + transaction: &'db Tx<'db>, + prefix: SubtreePrefix, + ) -> Self { + let context = PrefixedPrimaryRocksDbImmediateStorageContext { storage, transaction, prefix, - } + }; + + Self::Primary(context) + } + + /// Create a new prefixed context instance + pub fn new_secondary(storage: &'db NonTransactionalDb, prefix: SubtreePrefix) -> Self { + let context = PrefixedSecondaryRocksDbImmediateStorageContext { storage, prefix }; + + Self::Secondary(context) } } impl<'db> PrefixedRocksDbImmediateStorageContext<'db> { /// Get auxiliary data column family fn cf_aux(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(AUX_CF_NAME) + call_with_storage_and_prefix!(self, storage, _prefix, { storage.cf_handle(AUX_CF_NAME) }) .expect("aux column family must exist") } /// Get trees roots data column family fn cf_roots(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(ROOTS_CF_NAME) + call_with_storage_and_prefix!(self, storage, _prefix, { storage.cf_handle(ROOTS_CF_NAME) }) .expect("roots column family must exist") } /// Get metadata column family fn cf_meta(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(META_CF_NAME) + call_with_storage_and_prefix!(self, storage, _prefix, { storage.cf_handle(META_CF_NAME) }) .expect("meta column family must exist") } } impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { type Batch = PrefixedRocksDbBatch<'db>; - type RawIterator = PrefixedRocksDbRawIterator>>; + type RawIterator = PrefixedRocksDbRawIterator<'db, Tx<'db>, NonTransactionalDb>; fn put>( &self, @@ -96,10 +153,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { _children_sizes: ChildrenSizesWithIsSumTree, _cost_info: Option, ) -> CostResult<(), Error> { - self.transaction - .put(make_prefixed_key(&self.prefix, &key), value) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.put(make_prefixed_key(prefix, &key), value) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn put_aux>( @@ -108,10 +166,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { value: &[u8], _cost_info: Option, ) -> CostResult<(), Error> { - self.transaction - .put_cf(self.cf_aux(), make_prefixed_key(&self.prefix, &key), value) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.put_cf(self.cf_aux(), make_prefixed_key(prefix, &key), value) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn put_root>( @@ -120,14 +179,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { value: &[u8], _cost_info: Option, ) -> CostResult<(), Error> { - self.transaction - .put_cf( - self.cf_roots(), - make_prefixed_key(&self.prefix, &key), - value, - ) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.put_cf(self.cf_roots(), make_prefixed_key(prefix, &key), value) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn put_meta>( @@ -136,10 +192,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { value: &[u8], _cost_info: Option, ) -> CostResult<(), Error> { - self.transaction - .put_cf(self.cf_meta(), make_prefixed_key(&self.prefix, &key), value) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.put_cf(self.cf_meta(), make_prefixed_key(prefix, &key), value) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn delete>( @@ -147,10 +204,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { key: K, _cost_info: Option, ) -> CostResult<(), Error> { - self.transaction - .delete(make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.delete(make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn delete_aux>( @@ -158,10 +216,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { key: K, _cost_info: Option, ) -> CostResult<(), Error> { - self.transaction - .delete_cf(self.cf_aux(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.delete_cf(self.cf_aux(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn delete_root>( @@ -169,10 +228,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { key: K, _cost_info: Option, ) -> CostResult<(), Error> { - self.transaction - .delete_cf(self.cf_roots(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.delete_cf(self.cf_roots(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn delete_meta>( @@ -180,61 +240,78 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbImmediateStorageContext<'db> { key: K, _cost_info: Option, ) -> CostResult<(), Error> { - self.transaction - .delete_cf(self.cf_meta(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.delete_cf(self.cf_meta(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn get>(&self, key: K) -> CostResult>, Error> { - self.transaction - .get(make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.get(make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn get_aux>(&self, key: K) -> CostResult>, Error> { - self.transaction - .get_cf(self.cf_aux(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.get_cf(self.cf_aux(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn get_root>(&self, key: K) -> CostResult>, Error> { - self.transaction - .get_cf(self.cf_roots(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.get_cf(self.cf_roots(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn get_meta>(&self, key: K) -> CostResult>, Error> { - self.transaction - .get_cf(self.cf_meta(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + call_with_storage_or_transaction_and_prefix!(self, storage, prefix, { + storage.get_cf(self.cf_meta(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()) } fn new_batch(&self) -> Self::Batch { - PrefixedRocksDbBatch { - prefix: self.prefix, - batch: WriteBatchWithTransaction::::default(), - cf_aux: self.cf_aux(), - cf_roots: self.cf_roots(), - cost_acc: Default::default(), - } + call_with_storage_and_prefix!(self, _storage, prefix, { + PrefixedRocksDbBatch { + prefix: *prefix, + batch: WriteBatchWithTransaction::::default(), + cf_aux: self.cf_aux(), + cf_roots: self.cf_roots(), + cost_acc: Default::default(), + } + }) } fn commit_batch(&self, batch: Self::Batch) -> CostResult<(), Error> { - self.transaction - .rebuild_from_writebatch(&batch.batch) - .map_err(RocksDBError) - .wrap_with_cost(Default::default()) + match self { + PrefixedRocksDbImmediateStorageContext::Primary(db) => db + .transaction + .rebuild_from_writebatch(&batch.batch) + .map_err(RocksDBError) + .wrap_with_cost(Default::default()), + PrefixedRocksDbImmediateStorageContext::Secondary(_) => { + unimplemented!("commit_batch is not supported for secondary storage") + } + } } fn raw_iter(&self) -> Self::RawIterator { - PrefixedRocksDbRawIterator { - prefix: self.prefix, - raw_iterator: self.transaction.raw_iterator(), + match self { + PrefixedRocksDbImmediateStorageContext::Primary(context) => { + Self::RawIterator::new_primary(context.prefix, context.transaction.raw_iterator()) + } + PrefixedRocksDbImmediateStorageContext::Secondary(context) => { + Self::RawIterator::new_secondary(context.prefix, context.storage.raw_iterator()) + } } } } diff --git a/storage/src/rocksdb_storage/storage_context/context_no_tx.rs b/storage/src/rocksdb_storage/storage_context/context_no_tx.rs index fd639a5a6..7c1a195be 100644 --- a/storage/src/rocksdb_storage/storage_context/context_no_tx.rs +++ b/storage/src/rocksdb_storage/storage_context/context_no_tx.rs @@ -33,61 +33,124 @@ use grovedb_costs::{ storage_cost::key_value_cost::KeyValueStorageCost, ChildrenSizesWithIsSumTree, CostResult, CostsExt, OperationCost, }; -use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode}; +use rocksdb::{ColumnFamily, OptimisticTransactionDB}; use super::{batch::PrefixedMultiContextBatchPart, make_prefixed_key, PrefixedRocksDbRawIterator}; +use crate::rocksdb_storage::storage::NonTransactionalDb; use crate::{ error, error::Error::RocksDBError, - rocksdb_storage::storage::{Db, SubtreePrefix, AUX_CF_NAME, META_CF_NAME, ROOTS_CF_NAME}, + rocksdb_storage::storage::{SubtreePrefix, AUX_CF_NAME, META_CF_NAME, ROOTS_CF_NAME}, StorageBatch, StorageContext, }; /// Storage context with a prefix applied to be used in a subtree to be used /// outside of transaction. -pub struct PrefixedRocksDbStorageContext<'db> { - storage: &'db Db, +pub struct PrefixedPrimaryRocksDbStorageContext<'db> { + storage: &'db OptimisticTransactionDB, prefix: SubtreePrefix, batch: Option<&'db StorageBatch>, } +// TODO: We can just use generic for storage instead of the second struct + +/// Storage context with a prefix applied to be used in a subtree to be used +/// outside of transaction. +pub struct PrefixedSecondaryRocksDbStorageContext<'db> { + pub(in crate::rocksdb_storage) storage: &'db NonTransactionalDb, + pub(in crate::rocksdb_storage) prefix: SubtreePrefix, + pub(in crate::rocksdb_storage) batch: Option<&'db StorageBatch>, +} + +/// Storage context with a prefix applied to be used in a subtree to be used +/// outside of transaction. +pub enum PrefixedRocksDbStorageContext<'db> { + /// Primary storage context + Primary(PrefixedPrimaryRocksDbStorageContext<'db>), + /// Secondary storage context + Secondary(PrefixedSecondaryRocksDbStorageContext<'db>), +} + +macro_rules! call_with_storage_prefix_and_batch { + ($self:ident, $storage:ident, $prefix:ident, $batch:ident, $code:block) => { + match $self { + PrefixedRocksDbStorageContext::Primary(context) => { + let $storage = context.storage; + let $prefix = &context.prefix; + let $batch = context.batch; + $code + } + PrefixedRocksDbStorageContext::Secondary(context) => { + let $storage = context.storage; + let $prefix = &context.prefix; + let $batch = context.batch; + $code + } + } + }; +} + impl<'db> PrefixedRocksDbStorageContext<'db> { - /// Create a new prefixed storage_cost context instance - pub fn new(storage: &'db Db, prefix: SubtreePrefix, batch: Option<&'db StorageBatch>) -> Self { - PrefixedRocksDbStorageContext { + /// Create a new prefixed context instance + pub fn new_primary( + storage: &'db OptimisticTransactionDB, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> Self { + let context = PrefixedPrimaryRocksDbStorageContext { storage, prefix, batch, - } + }; + + Self::Primary(context) + } + + /// Create a new prefixed context instance + pub fn new_secondary( + storage: &'db NonTransactionalDb, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> Self { + let context = PrefixedSecondaryRocksDbStorageContext { + storage, + prefix, + batch, + }; + + Self::Secondary(context) } } impl<'db> PrefixedRocksDbStorageContext<'db> { /// Get auxiliary data column family fn cf_aux(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(AUX_CF_NAME) - .expect("aux column family must exist") + call_with_storage_prefix_and_batch!(self, storage, _prefix, _batch, { + storage.cf_handle(AUX_CF_NAME) + }) + .expect("aux column family must exist") } /// Get trees roots data column family fn cf_roots(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(ROOTS_CF_NAME) - .expect("roots column family must exist") + call_with_storage_prefix_and_batch!(self, storage, _prefix, _batch, { + storage.cf_handle(ROOTS_CF_NAME) + }) + .expect("roots column family must exist") } /// Get metadata column family fn cf_meta(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(META_CF_NAME) - .expect("meta column family must exist") + call_with_storage_prefix_and_batch!(self, storage, _prefix, _batch, { + storage.cf_handle(META_CF_NAME) + }) + .expect("meta column family must exist") } } impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { type Batch = PrefixedMultiContextBatchPart; - type RawIterator = PrefixedRocksDbRawIterator>; + type RawIterator = PrefixedRocksDbRawIterator<'db, OptimisticTransactionDB, NonTransactionalDb>; fn put>( &self, @@ -96,14 +159,16 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { children_sizes: ChildrenSizesWithIsSumTree, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.put( - make_prefixed_key(&self.prefix, key), - value.to_vec(), - children_sizes, - cost_info, - ); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.put( + make_prefixed_key(prefix, key), + value.to_vec(), + children_sizes, + cost_info, + ); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } @@ -113,13 +178,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { value: &[u8], cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.put_aux( - make_prefixed_key(&self.prefix, key), - value.to_vec(), - cost_info, - ); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.put_aux(make_prefixed_key(prefix, key), value.to_vec(), cost_info); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } @@ -129,13 +192,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { value: &[u8], cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.put_root( - make_prefixed_key(&self.prefix, key), - value.to_vec(), - cost_info, - ); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.put_root(make_prefixed_key(prefix, key), value.to_vec(), cost_info); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } @@ -145,13 +206,12 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { value: &[u8], cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.put_meta( - make_prefixed_key(&self.prefix, key), - value.to_vec(), - cost_info, - ); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.put_meta(make_prefixed_key(prefix, key), value.to_vec(), cost_info); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } @@ -160,9 +220,12 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { key: K, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.delete(make_prefixed_key(&self.prefix, key), cost_info); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.delete(make_prefixed_key(prefix, key), cost_info); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } @@ -171,9 +234,12 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { key: K, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.delete_aux(make_prefixed_key(&self.prefix, key), cost_info); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.delete_aux(make_prefixed_key(prefix, key), cost_info); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } @@ -182,9 +248,12 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { key: K, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.delete_root(make_prefixed_key(&self.prefix, key), cost_info); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.delete_root(make_prefixed_key(prefix, key), cost_info); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } @@ -193,94 +262,109 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbStorageContext<'db> { key: K, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.delete_meta(make_prefixed_key(&self.prefix, key), cost_info); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.delete_meta(make_prefixed_key(prefix, key), cost_info); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } fn get>(&self, key: K) -> CostResult>, Error> { - self.storage - .get(make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_fn_cost(|value| OperationCost { - seek_count: 1, - storage_loaded_bytes: value - .as_ref() - .ok() - .and_then(Option::as_ref) - .map(|x| x.len() as u32) - .unwrap_or(0), - ..Default::default() - }) + call_with_storage_prefix_and_batch!(self, storage, prefix, _batch, { + storage.get(make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_fn_cost(|value| OperationCost { + seek_count: 1, + storage_loaded_bytes: value + .as_ref() + .ok() + .and_then(Option::as_ref) + .map(|x| x.len() as u32) + .unwrap_or(0), + ..Default::default() + }) } fn get_aux>(&self, key: K) -> CostResult>, Error> { - self.storage - .get_cf(self.cf_aux(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_fn_cost(|value| OperationCost { - seek_count: 1, - storage_loaded_bytes: value - .as_ref() - .ok() - .and_then(Option::as_ref) - .map(|x| x.len() as u32) - .unwrap_or(0), - ..Default::default() - }) + call_with_storage_prefix_and_batch!(self, storage, prefix, _batch, { + storage.get_cf(self.cf_aux(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_fn_cost(|value| OperationCost { + seek_count: 1, + storage_loaded_bytes: value + .as_ref() + .ok() + .and_then(Option::as_ref) + .map(|x| x.len() as u32) + .unwrap_or(0), + ..Default::default() + }) } fn get_root>(&self, key: K) -> CostResult>, Error> { - self.storage - .get_cf(self.cf_roots(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_fn_cost(|value| OperationCost { - seek_count: 1, - storage_loaded_bytes: value - .as_ref() - .ok() - .and_then(Option::as_ref) - .map(|x| x.len() as u32) - .unwrap_or(0), - ..Default::default() - }) + call_with_storage_prefix_and_batch!(self, storage, prefix, _batch, { + storage.get_cf(self.cf_roots(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_fn_cost(|value| OperationCost { + seek_count: 1, + storage_loaded_bytes: value + .as_ref() + .ok() + .and_then(Option::as_ref) + .map(|x| x.len() as u32) + .unwrap_or(0), + ..Default::default() + }) } fn get_meta>(&self, key: K) -> CostResult>, Error> { - self.storage - .get_cf(self.cf_meta(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_fn_cost(|value| OperationCost { - seek_count: 1, - storage_loaded_bytes: value - .as_ref() - .ok() - .and_then(Option::as_ref) - .map(|x| x.len() as u32) - .unwrap_or(0), - ..Default::default() - }) + call_with_storage_prefix_and_batch!(self, storage, prefix, _batch, { + storage.get_cf(self.cf_meta(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_fn_cost(|value| OperationCost { + seek_count: 1, + storage_loaded_bytes: value + .as_ref() + .ok() + .and_then(Option::as_ref) + .map(|x| x.len() as u32) + .unwrap_or(0), + ..Default::default() + }) } fn new_batch(&self) -> Self::Batch { - PrefixedMultiContextBatchPart { - prefix: self.prefix, - batch: StorageBatch::new(), - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, _batch, { + PrefixedMultiContextBatchPart { + prefix: *prefix, + batch: StorageBatch::new(), + } + }) } fn commit_batch(&self, batch: Self::Batch) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.merge(batch.batch); - } + call_with_storage_prefix_and_batch!(self, _storage, _prefix, self_batch, { + if let Some(existing_batch) = self_batch { + existing_batch.merge(batch.batch); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } fn raw_iter(&self) -> Self::RawIterator { - PrefixedRocksDbRawIterator { - prefix: self.prefix, - raw_iterator: self.storage.raw_iterator(), + match self { + PrefixedRocksDbStorageContext::Primary(context) => { + Self::RawIterator::new_primary(context.prefix, context.storage.raw_iterator()) + } + PrefixedRocksDbStorageContext::Secondary(context) => { + Self::RawIterator::new_secondary(context.prefix, context.storage.raw_iterator()) + } } } } diff --git a/storage/src/rocksdb_storage/storage_context/context_tx.rs b/storage/src/rocksdb_storage/storage_context/context_tx.rs index d5a480c38..fe3eee78b 100644 --- a/storage/src/rocksdb_storage/storage_context/context_tx.rs +++ b/storage/src/rocksdb_storage/storage_context/context_tx.rs @@ -33,39 +33,108 @@ use grovedb_costs::{ cost_return_on_error, storage_cost::key_value_cost::KeyValueStorageCost, ChildrenSizesWithIsSumTree, CostResult, CostsExt, OperationCost, }; -use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode}; +use rocksdb::{ColumnFamily, OptimisticTransactionDB}; use super::{batch::PrefixedMultiContextBatchPart, make_prefixed_key, PrefixedRocksDbRawIterator}; +use crate::rocksdb_storage::storage::NonTransactionalDb; +use crate::rocksdb_storage::storage_context::context_no_tx::PrefixedSecondaryRocksDbStorageContext; use crate::{ error, error::Error::RocksDBError, - rocksdb_storage::storage::{Db, SubtreePrefix, Tx, AUX_CF_NAME, META_CF_NAME, ROOTS_CF_NAME}, + rocksdb_storage::storage::{SubtreePrefix, Tx, AUX_CF_NAME, META_CF_NAME, ROOTS_CF_NAME}, RawIterator, StorageBatch, StorageContext, }; /// Storage context with a prefix applied to be used in a subtree to be used in /// transaction. -pub struct PrefixedRocksDbTransactionContext<'db> { - storage: &'db Db, +pub struct PrefixedPrimaryRocksDbTransactionContext<'db> { + storage: &'db OptimisticTransactionDB, transaction: &'db Tx<'db>, prefix: SubtreePrefix, batch: Option<&'db StorageBatch>, } +/// Prefixed rocks db transaction context +pub enum PrefixedRocksDbTransactionContext<'db> { + /// Primary storage context + Primary(PrefixedPrimaryRocksDbTransactionContext<'db>), + /// Secondary storage context + Secondary(PrefixedSecondaryRocksDbStorageContext<'db>), +} + +macro_rules! call_with_storage_prefix_and_batch { + ($self:ident, $storage:ident, $prefix:ident, $batch:ident, $code:block) => { + match $self { + PrefixedRocksDbTransactionContext::Primary(context) => { + let $storage = context.storage; + let $prefix = &context.prefix; + let $batch = context.batch; + + $code + } + PrefixedRocksDbTransactionContext::Secondary(context) => { + let $storage = context.storage; + let $prefix = &context.prefix; + let $batch = context.batch; + + $code + } + } + }; +} + +macro_rules! call_with_storage_or_transaction_prefix_and_batch { + ($self:ident, $storage:ident, $prefix:ident, $batch:ident, $code:expr) => { + match $self { + PrefixedRocksDbTransactionContext::Primary(context) => { + let $storage = context.transaction; + let $prefix = &context.prefix; + let $batch = context.batch; + + $code + } + PrefixedRocksDbTransactionContext::Secondary(context) => { + let $storage = context.storage; + let $prefix = &context.prefix; + let $batch = context.batch; + + $code + } + } + }; +} + impl<'db> PrefixedRocksDbTransactionContext<'db> { - /// Create a new prefixed transaction context instance - pub fn new( - storage: &'db Db, + /// Create a new prefixed context instance + pub fn new_primary( + storage: &'db OptimisticTransactionDB, transaction: &'db Tx<'db>, prefix: SubtreePrefix, batch: Option<&'db StorageBatch>, ) -> Self { - PrefixedRocksDbTransactionContext { + let context = PrefixedPrimaryRocksDbTransactionContext { storage, transaction, prefix, batch, - } + }; + + Self::Primary(context) + } + + /// Create a new prefixed context instance + pub fn new_secondary( + storage: &'db NonTransactionalDb, + prefix: SubtreePrefix, + batch: Option<&'db StorageBatch>, + ) -> Self { + let context = PrefixedSecondaryRocksDbStorageContext { + storage, + prefix, + batch, + }; + + Self::Secondary(context) } /// Clears all the data in the tree at the storage level @@ -92,29 +161,32 @@ impl<'db> PrefixedRocksDbTransactionContext<'db> { impl<'db> PrefixedRocksDbTransactionContext<'db> { /// Get auxiliary data column family fn cf_aux(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(AUX_CF_NAME) - .expect("aux column family must exist") + call_with_storage_prefix_and_batch!(self, storage, _prefix, _batch, { + storage.cf_handle(AUX_CF_NAME) + }) + .expect("aux column family must exist") } /// Get trees roots data column family fn cf_roots(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(ROOTS_CF_NAME) - .expect("roots column family must exist") + call_with_storage_prefix_and_batch!(self, storage, _prefix, _batch, { + storage.cf_handle(ROOTS_CF_NAME) + }) + .expect("roots column family must exist") } /// Get metadata column family fn cf_meta(&self) -> &'db ColumnFamily { - self.storage - .cf_handle(META_CF_NAME) - .expect("meta column family must exist") + call_with_storage_prefix_and_batch!(self, storage, _prefix, _batch, { + storage.cf_handle(META_CF_NAME) + }) + .expect("meta column family must exist") } } impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { type Batch = PrefixedMultiContextBatchPart; - type RawIterator = PrefixedRocksDbRawIterator>>; + type RawIterator = PrefixedRocksDbRawIterator<'db, Tx<'db>, NonTransactionalDb>; fn put>( &self, @@ -123,14 +195,17 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { children_sizes: ChildrenSizesWithIsSumTree, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.put( - make_prefixed_key(&self.prefix, key), - value.to_vec(), - children_sizes, - cost_info, - ); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch.as_ref() { + existing_batch.put( + make_prefixed_key(prefix, key), + value.to_vec(), + children_sizes, + cost_info, + ); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } @@ -140,13 +215,12 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { value: &[u8], cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.put_aux( - make_prefixed_key(&self.prefix, key), - value.to_vec(), - cost_info, - ); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch.as_ref() { + existing_batch.put_aux(make_prefixed_key(prefix, key), value.to_vec(), cost_info); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } @@ -156,13 +230,12 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { value: &[u8], cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.put_root( - make_prefixed_key(&self.prefix, key), - value.to_vec(), - cost_info, - ); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.put_root(make_prefixed_key(prefix, key), value.to_vec(), cost_info); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } @@ -172,13 +245,12 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { value: &[u8], cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.put_meta( - make_prefixed_key(&self.prefix, key), - value.to_vec(), - cost_info, - ); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.put_meta(make_prefixed_key(prefix, key), value.to_vec(), cost_info); + } + }); + Ok(()).wrap_with_cost(OperationCost::default()) } @@ -187,9 +259,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { key: K, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.delete(make_prefixed_key(&self.prefix, key), cost_info); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.delete(make_prefixed_key(prefix, key), cost_info); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } @@ -199,9 +273,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { key: K, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.delete_aux(make_prefixed_key(&self.prefix, key), cost_info); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.delete_aux(make_prefixed_key(prefix, key), cost_info); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } @@ -211,9 +287,11 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { key: K, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.delete_root(make_prefixed_key(&self.prefix, key), cost_info); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.delete_root(make_prefixed_key(prefix, key), cost_info); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } @@ -223,96 +301,110 @@ impl<'db> StorageContext<'db> for PrefixedRocksDbTransactionContext<'db> { key: K, cost_info: Option, ) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.delete_meta(make_prefixed_key(&self.prefix, key), cost_info); - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, batch, { + if let Some(existing_batch) = batch { + existing_batch.delete_meta(make_prefixed_key(prefix, key), cost_info); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } fn get>(&self, key: K) -> CostResult>, Error> { - self.transaction - .get(make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_fn_cost(|value| OperationCost { - seek_count: 1, - storage_loaded_bytes: value - .as_ref() - .ok() - .and_then(Option::as_ref) - .map(|x| x.len() as u32) - .unwrap_or(0), - ..Default::default() - }) + call_with_storage_or_transaction_prefix_and_batch!(self, storage, prefix, _batch, { + storage.get(make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_fn_cost(|value| OperationCost { + seek_count: 1, + storage_loaded_bytes: value + .as_ref() + .ok() + .and_then(Option::as_ref) + .map(|x| x.len() as u32) + .unwrap_or(0), + ..Default::default() + }) } fn get_aux>(&self, key: K) -> CostResult>, Error> { - self.transaction - .get_cf(self.cf_aux(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_fn_cost(|value| OperationCost { - seek_count: 1, - storage_loaded_bytes: value - .as_ref() - .ok() - .and_then(Option::as_ref) - .map(|x| x.len() as u32) - .unwrap_or(0), - ..Default::default() - }) + call_with_storage_or_transaction_prefix_and_batch!(self, storage, prefix, _batch, { + storage.get_cf(self.cf_aux(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_fn_cost(|value| OperationCost { + seek_count: 1, + storage_loaded_bytes: value + .as_ref() + .ok() + .and_then(Option::as_ref) + .map(|x| x.len() as u32) + .unwrap_or(0), + ..Default::default() + }) } fn get_root>(&self, key: K) -> CostResult>, Error> { - self.transaction - .get_cf(self.cf_roots(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_fn_cost(|value| OperationCost { - seek_count: 1, - storage_loaded_bytes: value - .as_ref() - .ok() - .and_then(Option::as_ref) - .map(|x| x.len() as u32) - .unwrap_or(0), - ..Default::default() - }) + call_with_storage_or_transaction_prefix_and_batch!(self, storage, prefix, _batch, { + storage.get_cf(self.cf_roots(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_fn_cost(|value| OperationCost { + seek_count: 1, + storage_loaded_bytes: value + .as_ref() + .ok() + .and_then(Option::as_ref) + .map(|x| x.len() as u32) + .unwrap_or(0), + ..Default::default() + }) } fn get_meta>(&self, key: K) -> CostResult>, Error> { - self.transaction - .get_cf(self.cf_meta(), make_prefixed_key(&self.prefix, key)) - .map_err(RocksDBError) - .wrap_fn_cost(|value| OperationCost { - seek_count: 1, - storage_loaded_bytes: value - .as_ref() - .ok() - .and_then(Option::as_ref) - .map(|x| x.len() as u32) - .unwrap_or(0), - ..Default::default() - }) + call_with_storage_or_transaction_prefix_and_batch!(self, storage, prefix, _batch, { + storage.get_cf(self.cf_meta(), make_prefixed_key(prefix, key)) + }) + .map_err(RocksDBError) + .wrap_fn_cost(|value| OperationCost { + seek_count: 1, + storage_loaded_bytes: value + .as_ref() + .ok() + .and_then(Option::as_ref) + .map(|x| x.len() as u32) + .unwrap_or(0), + ..Default::default() + }) } fn new_batch(&self) -> Self::Batch { - PrefixedMultiContextBatchPart { - prefix: self.prefix, - batch: StorageBatch::new(), - } + call_with_storage_prefix_and_batch!(self, _storage, prefix, _batch, { + PrefixedMultiContextBatchPart { + prefix: *prefix, + batch: StorageBatch::new(), + } + }) } fn commit_batch(&self, batch: Self::Batch) -> CostResult<(), Error> { - if let Some(existing_batch) = self.batch { - existing_batch.merge(batch.batch); - } + call_with_storage_prefix_and_batch!(self, _storage, _prefix, self_batch, { + if let Some(existing_batch) = self_batch { + existing_batch.merge(batch.batch); + } + }); Ok(()).wrap_with_cost(OperationCost::default()) } fn raw_iter(&self) -> Self::RawIterator { - PrefixedRocksDbRawIterator { - prefix: self.prefix, - raw_iterator: self.transaction.raw_iterator(), + match self { + PrefixedRocksDbTransactionContext::Primary(context) => { + Self::RawIterator::new_primary(context.prefix, context.transaction.raw_iterator()) + } + PrefixedRocksDbTransactionContext::Secondary(context) => { + Self::RawIterator::new_secondary(context.prefix, context.storage.raw_iterator()) + } } } } diff --git a/storage/src/rocksdb_storage/storage_context/raw_iterator.rs b/storage/src/rocksdb_storage/storage_context/raw_iterator.rs index a9d6cf4fe..709ed7935 100644 --- a/storage/src/rocksdb_storage/storage_context/raw_iterator.rs +++ b/storage/src/rocksdb_storage/storage_context/raw_iterator.rs @@ -29,181 +29,151 @@ //! Prefixed storage_cost raw iterator implementation for RocksDB backend. use grovedb_costs::{CostContext, CostsExt, OperationCost}; -use rocksdb::DBRawIteratorWithThreadMode; +use rocksdb::{DBAccess, DBRawIteratorWithThreadMode}; use super::make_prefixed_key; -use crate::{ - rocksdb_storage::storage::{Db, SubtreePrefix, Tx}, - RawIterator, -}; +use crate::{rocksdb_storage::storage::SubtreePrefix, RawIterator}; /// 256 bytes for the key and 32 bytes for the prefix const MAX_PREFIXED_KEY_LENGTH: u32 = 256 + 32; /// Raw iterator over prefixed storage_cost. -pub struct PrefixedRocksDbRawIterator { +pub struct PrefixedPrimaryRocksDbRawIterator<'db, D: DBAccess> { pub(super) prefix: SubtreePrefix, - pub(super) raw_iterator: I, + pub(super) raw_iterator: DBRawIteratorWithThreadMode<'db, D>, } -impl<'a> RawIterator for PrefixedRocksDbRawIterator> { - fn seek_to_first(&mut self) -> CostContext<()> { - self.raw_iterator.seek(self.prefix); - ().wrap_with_cost(OperationCost::with_seek_count(1)) - } - - fn seek_to_last(&mut self) -> CostContext<()> { - let mut prefix_vec = self.prefix.to_vec(); - for i in (0..prefix_vec.len()).rev() { - prefix_vec[i] = prefix_vec[i].wrapping_add(1); - if prefix_vec[i] != 0 { - // if it is == 0 then we need to go to next bit - break; - } - } - self.raw_iterator.seek_for_prev(prefix_vec); - ().wrap_with_cost(OperationCost::with_seek_count(1)) - } +// TODO: Why not just use the same structure? - fn seek>(&mut self, key: K) -> CostContext<()> { - self.raw_iterator.seek(make_prefixed_key(&self.prefix, key)); - ().wrap_with_cost(OperationCost::with_seek_count(1)) - } - - fn seek_for_prev>(&mut self, key: K) -> CostContext<()> { - self.raw_iterator - .seek_for_prev(make_prefixed_key(&self.prefix, key)); - ().wrap_with_cost(OperationCost::with_seek_count(1)) - } - - fn next(&mut self) -> CostContext<()> { - self.raw_iterator.next(); - ().wrap_with_cost(OperationCost::with_seek_count(1)) - } +/// Raw iterator over prefixed storage_cost. +pub struct PrefixedSecondaryRocksDbRawIterator<'db, D: DBAccess> { + pub(super) prefix: SubtreePrefix, + pub(super) raw_iterator: DBRawIteratorWithThreadMode<'db, D>, +} - fn prev(&mut self) -> CostContext<()> { - self.raw_iterator.prev(); - ().wrap_with_cost(OperationCost::with_seek_count(1)) - } +/// Raw iterator over prefixed storage_cost. +pub enum PrefixedRocksDbRawIterator<'db, PD: DBAccess, SD: DBAccess> { + /// Primary iterator + Primary(PrefixedPrimaryRocksDbRawIterator<'db, PD>), + /// Secondary iterator + Secondary(PrefixedSecondaryRocksDbRawIterator<'db, SD>), +} - fn value(&self) -> CostContext> { - let mut cost = OperationCost::default(); +macro_rules! call_with_mut_raw_interator_and_prefix { + ($self:ident, $raw_iterator:ident, $prefix:ident, $code:block) => { + match $self { + PrefixedRocksDbRawIterator::Primary(ref mut iterator) => { + let $raw_iterator = &mut iterator.raw_iterator; + let $prefix = &iterator.prefix; - let value = if self.valid().unwrap_add_cost(&mut cost) { - self.raw_iterator.value().map(|v| { - cost.storage_loaded_bytes += v.len() as u32; - v - }) - } else { - None - }; + $code + } + PrefixedRocksDbRawIterator::Secondary(ref mut iterator) => { + let $raw_iterator = &mut iterator.raw_iterator; + let $prefix = &iterator.prefix; - value.wrap_with_cost(cost) - } + $code + } + } + }; +} - fn key(&self) -> CostContext> { - let mut cost = OperationCost::default(); +macro_rules! call_with_raw_interator_and_prefix { + ($self:ident, $raw_iterator:ident, $prefix:ident, $code:block) => { + match $self { + PrefixedRocksDbRawIterator::Primary(iterator) => { + let $raw_iterator = &iterator.raw_iterator; + let $prefix = &iterator.prefix; - let value = match self.raw_iterator.key() { - Some(k) => { - // Even if we truncate prefix, loaded cost should be maximum for the whole - // function - if k.starts_with(&self.prefix) { - cost.storage_loaded_bytes += k.len() as u32; - Some(k.split_at(self.prefix.len()).1) - } else { - // we can think of the underlying storage layer as stacked blocks - // and a block is a collection of key value pairs with the - // same prefix. - // if we are at the last key in a block and we try to - // check for the next key, we should not add the next block's first key - // len() as that will make cost depend on the ordering of blocks. - // instead we should add a fixed sized cost for such boundary checks - cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; - None - } + $code } - None => { - // if we are at the last key in the last block we should also add - // a fixed sized cost rather than nothing, as a change in block ordering - // could move the last block to some other position. - cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; - None + PrefixedRocksDbRawIterator::Secondary(iterator) => { + let $raw_iterator = &iterator.raw_iterator; + let $prefix = &iterator.prefix; + + $code } - }; + } + }; +} - value.wrap_with_cost(cost) +impl<'db, PD: DBAccess, SD: DBAccess> PrefixedRocksDbRawIterator<'db, PD, SD> { + /// Create new primary iterator + pub fn new_primary( + prefix: SubtreePrefix, + raw_iterator: DBRawIteratorWithThreadMode<'db, PD>, + ) -> Self { + PrefixedRocksDbRawIterator::Primary(PrefixedPrimaryRocksDbRawIterator { + prefix, + raw_iterator, + }) } - fn valid(&self) -> CostContext { - let mut cost = OperationCost::default(); - - self.raw_iterator - .key() - .map(|k| { - if k.starts_with(&self.prefix) { - cost.storage_loaded_bytes += k.len() as u32; - true - } else { - // we can think of the underlying storage layer as stacked blocks - // and a block is a collection of key value pairs with the - // same prefix. - // if we are at the last key in a block and we try to - // check for the next key, we should not add the next block's first key - // len() as that will make cost depend on the ordering of blocks. - // instead we should add a fixed sized cost for such boundary checks - cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; - false - } - }) - .unwrap_or_else(|| { - // if we are at the last key in the last block we should also add - // a fixed sized cost rather than nothing, as a change in block ordering - // could move the last block to some other position. - cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; - false - }) - .wrap_with_cost(cost) + /// Create new secondary iterator + pub fn new_secondary( + prefix: SubtreePrefix, + raw_iterator: DBRawIteratorWithThreadMode<'db, SD>, + ) -> Self { + PrefixedRocksDbRawIterator::Secondary(PrefixedSecondaryRocksDbRawIterator { + prefix, + raw_iterator, + }) } } -impl<'a> RawIterator for PrefixedRocksDbRawIterator>> { +impl<'db, PD: DBAccess, SD: DBAccess> RawIterator for PrefixedRocksDbRawIterator<'db, PD, SD> { fn seek_to_first(&mut self) -> CostContext<()> { - self.raw_iterator.seek(self.prefix); + call_with_mut_raw_interator_and_prefix!(self, raw_iterator, prefix, { + raw_iterator.seek(prefix); + }); ().wrap_with_cost(OperationCost::with_seek_count(1)) } fn seek_to_last(&mut self) -> CostContext<()> { - let mut prefix_vec = self.prefix.to_vec(); - for i in (0..prefix_vec.len()).rev() { - prefix_vec[i] = prefix_vec[i].wrapping_add(1); - if prefix_vec[i] != 0 { - // if it is == 0 then we need to go to next bit - break; + call_with_mut_raw_interator_and_prefix!(self, raw_iterator, prefix, { + let mut prefix_vec = prefix.to_vec(); + for i in (0..prefix_vec.len()).rev() { + prefix_vec[i] = prefix_vec[i].wrapping_add(1); + if prefix_vec[i] != 0 { + // if it is == 0 then we need to go to next bit + break; + } } - } - self.raw_iterator.seek_for_prev(prefix_vec); + raw_iterator.seek_for_prev(prefix_vec); + }); + ().wrap_with_cost(OperationCost::with_seek_count(1)) } fn seek>(&mut self, key: K) -> CostContext<()> { - self.raw_iterator.seek(make_prefixed_key(&self.prefix, key)); + call_with_mut_raw_interator_and_prefix!(self, raw_iterator, prefix, { + raw_iterator.seek(make_prefixed_key(prefix, key)); + }); + ().wrap_with_cost(OperationCost::with_seek_count(1)) } fn seek_for_prev>(&mut self, key: K) -> CostContext<()> { - self.raw_iterator - .seek_for_prev(make_prefixed_key(&self.prefix, key)); + call_with_mut_raw_interator_and_prefix!(self, raw_iterator, prefix, { + raw_iterator.seek_for_prev(make_prefixed_key(prefix, key)); + }); + ().wrap_with_cost(OperationCost::with_seek_count(1)) } fn next(&mut self) -> CostContext<()> { - self.raw_iterator.next(); + call_with_mut_raw_interator_and_prefix!(self, raw_iterator, _prefix, { + raw_iterator.next(); + }); + ().wrap_with_cost(OperationCost::with_seek_count(1)) } fn prev(&mut self) -> CostContext<()> { - self.raw_iterator.prev(); + call_with_mut_raw_interator_and_prefix!(self, raw_iterator, _prefix, { + raw_iterator.prev(); + }); + ().wrap_with_cost(OperationCost::with_seek_count(1)) } @@ -211,9 +181,11 @@ impl<'a> RawIterator for PrefixedRocksDbRawIterator RawIterator for PrefixedRocksDbRawIterator CostContext> { let mut cost = OperationCost::default(); - let value = match self.raw_iterator.key() { - Some(k) => { - // Even if we truncate prefix, loaded cost should be maximum for the whole - // function - if k.starts_with(&self.prefix) { - cost.storage_loaded_bytes += k.len() as u32; - Some(k.split_at(self.prefix.len()).1) - } else { - // we can think of the underlying storage layer as stacked blocks - // and a block is a collection of key value pairs with the - // same prefix. - // if we are at the last key in a block and we try to - // check for the next key, we should not add the next block's first key - // len() as that will make cost depend on the ordering of blocks. - // instead we should add a fixed sized cost for such boundary checks + let value = call_with_raw_interator_and_prefix!(self, raw_iterator, prefix, { + match raw_iterator.key() { + Some(k) => { + // Even if we truncate prefix, loaded cost should be maximum for the whole + // function + if k.starts_with(prefix) { + cost.storage_loaded_bytes += k.len() as u32; + Some(k.split_at(prefix.len()).1) + } else { + // we can think of the underlying storage layer as stacked blocks + // and a block is a collection of key value pairs with the + // same prefix. + // if we are at the last key in a block and we try to + // check for the next key, we should not add the next block's first key + // len() as that will make cost depend on the ordering of blocks. + // instead we should add a fixed sized cost for such boundary checks + cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; + None + } + } + None => { + // if we are at the last key in the last block we should also add + // a fixed sized cost rather than nothing, as a change in block ordering + // could move the last block to some other position. cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; None } } - None => { - // if we are at the last key in the last block we should also add - // a fixed sized cost rather than nothing, as a change in block ordering - // could move the last block to some other position. - cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; - None - } - }; + }); value.wrap_with_cost(cost) } @@ -259,31 +233,34 @@ impl<'a> RawIterator for PrefixedRocksDbRawIterator CostContext { let mut cost = OperationCost::default(); - self.raw_iterator - .key() - .map(|k| { - if k.starts_with(&self.prefix) { - cost.storage_loaded_bytes += k.len() as u32; - true - } else { - // we can think of the underlying storage layer as stacked blocks - // and a block is a collection of key value pairs with the - // same prefix. - // if we are at the last key in a block and we try to - // check for the next key, we should not add the next block's first key - // len() as that will make cost depend on the ordering of blocks. - // instead we should add a fixed sized cost for such boundary checks + let value = call_with_raw_interator_and_prefix!(self, raw_iterator, prefix, { + raw_iterator + .key() + .map(|k| { + if k.starts_with(prefix) { + cost.storage_loaded_bytes += k.len() as u32; + true + } else { + // we can think of the underlying storage layer as stacked blocks + // and a block is a collection of key value pairs with the + // same prefix. + // if we are at the last key in a block and we try to + // check for the next key, we should not add the next block's first key + // len() as that will make cost depend on the ordering of blocks. + // instead we should add a fixed sized cost for such boundary checks + cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; + false + } + }) + .unwrap_or_else(|| { + // if we are at the last key in the last block we should also add + // a fixed sized cost rather than nothing, as a change in block ordering + // could move the last block to some other position. cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; false - } - }) - .unwrap_or_else(|| { - // if we are at the last key in the last block we should also add - // a fixed sized cost rather than nothing, as a change in block ordering - // could move the last block to some other position. - cost.storage_loaded_bytes += MAX_PREFIXED_KEY_LENGTH; - false - }) - .wrap_with_cost(cost) + }) + }); + + value.wrap_with_cost(cost) } } diff --git a/storage/src/rocksdb_storage/test_utils.rs b/storage/src/rocksdb_storage/test_utils.rs index 2f17b5d0c..d87fb21f2 100644 --- a/storage/src/rocksdb_storage/test_utils.rs +++ b/storage/src/rocksdb_storage/test_utils.rs @@ -28,7 +28,8 @@ //! Useful utilities for testing. -use std::{cell::Cell, ops::Deref}; +use std::cell::RefCell; +use std::ops::Deref; use tempfile::TempDir; @@ -36,7 +37,7 @@ use super::*; /// RocksDb storage with self-cleanup pub struct TempStorage { - dir: Cell, + dir: RefCell, storage: RocksDbStorage, } @@ -44,14 +45,24 @@ impl TempStorage { /// Create new `TempStorage` pub fn new() -> Self { let dir = TempDir::new().expect("cannot create tempir"); - let storage = RocksDbStorage::default_rocksdb_with_path(dir.path()) + let storage = RocksDbStorage::default_primary_rocksdb(dir.path()) .expect("cannot open rocksdb storage"); TempStorage { - dir: Cell::new(dir), + dir: RefCell::new(dir), storage, } } + /// Create secondary storage + pub fn secondary(&self) -> RocksDbStorage { + let dir = TempDir::new().expect("cannot create tempir"); + + let primary_dir = self.dir.borrow(); + + RocksDbStorage::default_secondary_rocksdb(primary_dir.path(), dir.path()) + .expect("cannot open rocksdb storage") + } + /// Simulate storage crash pub fn crash(&self) { drop(