diff --git a/src/cache/cache_container.rs b/src/cache/cache_container.rs deleted file mode 100644 index d9e1036..0000000 --- a/src/cache/cache_container.rs +++ /dev/null @@ -1,995 +0,0 @@ -//! CacheContainer module responsible for correct traversal of cache layers. -use std::cmp::Ordering; -use std::collections::{btree_map, HashMap}; -use std::iter::{Peekable, Rev}; - -use crate::cache::cache_container::DataLocation::Snapshot; -use crate::cache::change_set::ChangeSet; -use crate::cache::SnapshotId; -use crate::iterator::ScanDirection; -use crate::schema::{KeyCodec, ValueCodec}; -use crate::{Operation, RawDbIter, ReadOnlyLock, Schema, SchemaKey, SchemaValue, DB}; - -/// Holds a collection of [`ChangeSet`]'s associated with particular Snapshot -/// and knows how to traverse them. -/// Managed externally. -/// Should be managed carefully, because discrepancy between `snapshots` and `to_parent` leads to panic -/// Ideally owner of writable reference to parent nad owner of cache container manages both correctly. -#[derive(Debug)] -pub struct CacheContainer { - db: DB, - /// Set of [`ChangeSet`]s of data per individual database per snapshot - snapshots: HashMap, - /// Hierarchical - /// Shared between all CacheContainers and managed by StorageManager - to_parent: ReadOnlyLock>, -} - -impl CacheContainer { - /// Create CacheContainer pointing go given DB and Snapshot ID relations - pub fn new(db: DB, to_parent: ReadOnlyLock>) -> Self { - Self { - db, - snapshots: HashMap::new(), - to_parent, - } - } - - /// Create instance of snapshot manager, when it does not have connection to snapshots tree - /// So it only reads from database. - #[cfg(feature = "test-utils")] - pub fn orphan(db: DB) -> Self { - Self { - db, - snapshots: HashMap::new(), - to_parent: std::sync::Arc::new(std::sync::RwLock::new(Default::default())).into(), - } - } - - /// Adds Snapshot to the collection. - /// Please note that caller must update its own reference of `to_parent` - /// After adding snapshot. - /// Ideally these operations should be atomic - pub fn add_snapshot(&mut self, snapshot: ChangeSet) -> anyhow::Result<()> { - let snapshot_id = snapshot.id(); - if self.snapshots.contains_key(&snapshot_id) { - anyhow::bail!("Attempt to double save same snapshot id={}", snapshot_id); - } - self.snapshots.insert(snapshot_id, snapshot); - Ok(()) - } - - /// Removes snapshot from collection - /// This should happen **after** `to_parent` is updated - pub fn discard_snapshot(&mut self, snapshot_id: &SnapshotId) -> Option { - self.snapshots.remove(snapshot_id) - } - - /// Writes snapshot to the underlying database - /// Snapshot id should be removed from `to_parent` atomically. - pub fn commit_snapshot(&mut self, snapshot_id: &SnapshotId) -> anyhow::Result<()> { - let snapshot = self.snapshots.remove(snapshot_id).ok_or_else(|| { - anyhow::anyhow!("Attempt to commit unknown snapshot id={}", snapshot_id) - })?; - self.db.write_schemas(&snapshot.into()) - } - - /// Indicates, if CacheContainer has any snapshots in memory. - /// Does not mean that underlying database is empty. 
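Review note: the heart of the file deleted above is `CacheContainer::get`, which walks the `to_parent` chain from the queried snapshot toward the database and returns the first operation it finds along the way. A minimal standalone model of that walk, with the crate's generics, codecs, and locking stripped for brevity (illustrative only, not the crate's API):

```rust
use std::collections::HashMap;

type SnapshotId = u64;

enum Operation {
    Put { value: u64 },
    Delete,
}

/// Minimal model of `CacheContainer::get`: starting from the queried
/// snapshot, walk towards the database via `to_parent`, returning the first
/// recorded operation for `key`; fall through to the DB only if no snapshot
/// in the chain ever touched the key.
fn get(
    snapshots: &HashMap<SnapshotId, HashMap<u64, Operation>>,
    to_parent: &HashMap<SnapshotId, SnapshotId>,
    db: &HashMap<u64, u64>,
    mut snapshot_id: SnapshotId,
    key: u64,
) -> Option<u64> {
    while let Some(parent_id) = to_parent.get(&snapshot_id) {
        let parent = snapshots
            .get(parent_id)
            .expect("`snapshots` and `to_parent` must stay in sync");
        match parent.get(&key) {
            Some(Operation::Put { value }) => return Some(*value),
            // A deletion shadows both older snapshots and the database.
            Some(Operation::Delete) => return None,
            None => snapshot_id = *parent_id,
        }
    }
    db.get(&key).copied()
}
```

The early return on `Operation::Delete` is what lets a newer snapshot hide a key that still exists in an older snapshot or in the database.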
- pub fn is_empty(&self) -> bool { - self.snapshots.is_empty() - } - - /// Helper method to check if snapshot with given ID has been added already - pub fn contains_snapshot(&self, snapshot_id: &SnapshotId) -> bool { - self.snapshots.contains_key(snapshot_id) - } - - pub(crate) fn get( - &self, - mut snapshot_id: SnapshotId, - key: &impl KeyCodec, - ) -> anyhow::Result> { - while let Some(parent_snapshot_id) = self.to_parent.read().unwrap().get(&snapshot_id) { - let parent_snapshot = self - .snapshots - .get(parent_snapshot_id) - .expect("Inconsistency between `self.snapshots` and `self.to_parent`: snapshot is missing in container"); - - // Some operation has been found - if let Some(operation) = parent_snapshot.get(key)? { - return match operation { - Operation::Put { value } => Ok(Some(S::Value::decode_value(value)?)), - Operation::Delete => Ok(None), - }; - } - - snapshot_id = *parent_snapshot_id; - } - self.db.get(key) - } - - fn snapshot_iterators( - &self, - mut snapshot_id: SnapshotId, - ) -> Vec> { - let mut snapshot_iterators = vec![]; - let to_parent = self.to_parent.read().unwrap(); - while let Some(parent_snapshot_id) = to_parent.get(&snapshot_id) { - let parent_snapshot = self - .snapshots - .get(parent_snapshot_id) - .expect("Inconsistency between `self.snapshots` and `self.to_parent`"); - - snapshot_iterators.push(parent_snapshot.iter::()); - - snapshot_id = *parent_snapshot_id; - } - - snapshot_iterators.reverse(); - snapshot_iterators - } - - // Allow this unused method, for future use - #[allow(dead_code)] - pub(crate) fn iter( - &self, - snapshot_id: SnapshotId, - ) -> anyhow::Result>> { - let snapshot_iterators = self.snapshot_iterators::(snapshot_id); - let db_iter = self.db.raw_iter::(ScanDirection::Forward)?; - - Ok(CacheContainerIter::new( - db_iter, - snapshot_iterators, - ScanDirection::Forward, - )) - } - - /// Returns iterator over keys in given [`Schema`] among all snapshots and DB in reverse lexicographical order - pub(crate) fn rev_iter( - &self, - snapshot_id: SnapshotId, - ) -> anyhow::Result>>> { - let snapshot_iterators = self - .snapshot_iterators::(snapshot_id) - .into_iter() - .map(Iterator::rev) - .collect(); - let db_iter = self.db.raw_iter::(ScanDirection::Backward)?; - - Ok(CacheContainerIter::new( - db_iter, - snapshot_iterators, - ScanDirection::Backward, - )) - } - - pub(crate) fn iter_range( - &self, - mut snapshot_id: SnapshotId, - range: impl std::ops::RangeBounds + Clone, - ) -> anyhow::Result>> { - let mut snapshot_iterators = vec![]; - let to_parent = self.to_parent.read().unwrap(); - while let Some(parent_snapshot_id) = to_parent.get(&snapshot_id) { - let parent_snapshot = self - .snapshots - .get(parent_snapshot_id) - .expect("Inconsistency between `self.snapshots` and `self.to_parent`"); - - snapshot_iterators.push(parent_snapshot.iter_range::(range.clone())); - - snapshot_id = *parent_snapshot_id; - } - - snapshot_iterators.reverse(); - - let db_iter = self.db.raw_iter_range::(range, ScanDirection::Forward)?; - Ok(CacheContainerIter::new( - db_iter, - snapshot_iterators, - ScanDirection::Forward, - )) - } - - pub(crate) fn rev_iter_range( - &self, - mut snapshot_id: SnapshotId, - range: impl std::ops::RangeBounds + Clone, - ) -> anyhow::Result>>> { - let mut snapshot_iterators = vec![]; - let to_parent = self.to_parent.read().unwrap(); - while let Some(parent_snapshot_id) = to_parent.get(&snapshot_id) { - let parent_snapshot = self - .snapshots - .get(parent_snapshot_id) - .expect("Inconsistency between `self.snapshots` and 
`self.to_parent`"); - - snapshot_iterators.push(parent_snapshot.iter_range::(range.clone()).rev()); - snapshot_id = *parent_snapshot_id; - } - - snapshot_iterators.reverse(); - let db_iter = self - .db - .raw_iter_range::(range, ScanDirection::Backward)?; - - Ok(CacheContainerIter::new( - db_iter, - snapshot_iterators, - ScanDirection::Backward, - )) - } -} - -/// [`Iterator`] over keys in given [`Schema`] in all snapshots in reverse -/// lexicographical order. -pub(crate) struct CacheContainerIter<'a, SnapshotIter> -where - SnapshotIter: Iterator, -{ - db_iter: Peekable>, - snapshot_iterators: Vec>, - next_value_locations: Vec, - direction: ScanDirection, -} - -impl<'a, SnapshotIter> CacheContainerIter<'a, SnapshotIter> -where - SnapshotIter: Iterator, -{ - fn new( - db_iter: RawDbIter<'a>, - snapshot_iterators: Vec, - direction: ScanDirection, - ) -> Self { - let max_values_size = snapshot_iterators.len().checked_add(1).unwrap_or_default(); - Self { - db_iter: db_iter.peekable(), - snapshot_iterators: snapshot_iterators - .into_iter() - .map(|iter| iter.peekable()) - .collect(), - next_value_locations: Vec::with_capacity(max_values_size), - direction, - } - } -} - -#[derive(Debug)] -enum DataLocation { - Db, - // Index inside `snapshot_iterators` - Snapshot(usize), -} - -impl<'a, SnapshotIter> Iterator for CacheContainerIter<'a, SnapshotIter> -where - SnapshotIter: Iterator, -{ - type Item = (SchemaKey, SchemaValue); - - fn next(&mut self) -> Option { - loop { - let mut next_value: Option<&SchemaKey> = None; - self.next_value_locations.clear(); - if let Some((db_key, _)) = self.db_iter.peek() { - self.next_value_locations.push(DataLocation::Db); - next_value = Some(db_key); - }; - - for (idx, iter) in self.snapshot_iterators.iter_mut().enumerate() { - if let Some(&(peeked_key, _)) = iter.peek() { - match next_value { - None => { - self.next_value_locations.push(Snapshot(idx)); - next_value = Some(peeked_key); - } - Some(next_key) => match (&self.direction, peeked_key.cmp(next_key)) { - (ScanDirection::Backward, Ordering::Greater) - | (ScanDirection::Forward, Ordering::Less) => { - next_value = Some(peeked_key); - self.next_value_locations.clear(); - self.next_value_locations.push(Snapshot(idx)); - } - (_, Ordering::Equal) => { - self.next_value_locations.push(Snapshot(idx)); - } - _ => {} - }, - }; - } - } - - // Rightmost location is from the latest snapshot, so it has priority over all past snapshot/DB - if let Some(latest_next_location) = self.next_value_locations.pop() { - // Move all iterators to next value - for location in &self.next_value_locations { - match location { - DataLocation::Db => { - let _ = self.db_iter.next().unwrap(); - } - Snapshot(idx) => { - let _ = self.snapshot_iterators[*idx].next().unwrap(); - } - } - } - - // Handle next value - match latest_next_location { - DataLocation::Db => { - let (key, value) = self.db_iter.next().unwrap(); - return Some((key, value)); - } - Snapshot(idx) => { - let (key, operation) = self.snapshot_iterators[idx].next().unwrap(); - match operation { - Operation::Put { value } => { - return Some((key.to_vec(), value.to_vec())) - } - Operation::Delete => continue, - } - } - }; - } else { - break; - } - } - - None - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::sync::{Arc, RwLock, RwLockReadGuard}; - - use proptest::prelude::*; - - use crate::cache::cache_container::{CacheContainer, CacheContainerIter}; - use crate::cache::cache_db::CacheDb; - use crate::iterator::ScanDirection; - use 
crate::schema::{KeyDecoder, KeyEncoder, Schema, ValueCodec}; - use crate::test::TestField; - use crate::{define_schema, Operation, SchemaBatch, SchemaKey, SchemaValue, DB}; - - const DUMMY_STATE_CF: &str = "DummyStateCF"; - - define_schema!(DummyStateSchema, TestField, TestField, DUMMY_STATE_CF); - type S = DummyStateSchema; - - fn create_test_db(path: &std::path::Path) -> DB { - let tables = vec![DUMMY_STATE_CF.to_string()]; - let mut db_opts = rocksdb::Options::default(); - db_opts.create_if_missing(true); - db_opts.create_missing_column_families(true); - DB::open(path, "test_db", tables, &db_opts).unwrap() - } - - #[test] - fn test_empty() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let snapshot_manager = - CacheContainer::new(db, Arc::new(RwLock::new(HashMap::new())).into()); - assert!(snapshot_manager.is_empty()); - } - - #[test] - fn test_add_and_discard_snapshot() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - let snapshot_id = 1; - let db_snapshot = CacheDb::new(snapshot_id, snapshot_manager.clone().into()); - - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - assert!(!snapshot_manager.is_empty()); - snapshot_manager.discard_snapshot(&snapshot_id); - assert!(snapshot_manager.is_empty()); - } - } - - #[test] - #[should_panic(expected = "Attempt to double save same snapshot")] - fn test_add_twice() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - let snapshot_id = 1; - // Both share the same ID - let db_snapshot_1 = CacheDb::new(snapshot_id, snapshot_manager.clone().into()); - let db_snapshot_2 = CacheDb::new(snapshot_id, snapshot_manager.clone().into()); - - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot_1.into()).unwrap(); - assert!(!snapshot_manager.is_empty()); - snapshot_manager.add_snapshot(db_snapshot_2.into()).unwrap(); - } - } - - #[test] - #[should_panic(expected = "Attempt to commit unknown snapshot")] - fn test_commit_unknown() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - let mut cache_container = CacheContainer::new(db, to_parent.into()); - - cache_container.commit_snapshot(&1).unwrap(); - } - - #[test] - fn test_discard_unknown() { - // Discarding unknown snapshots are fine. - // As it possible that caller didn't save it previously. 
- let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - let mut cache_container = CacheContainer::new(db, to_parent.into()); - - cache_container.discard_snapshot(&1); - } - - #[test] - fn test_commit_snapshot() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - let snapshot_id = 1; - let db_snapshot = CacheDb::new(snapshot_id, snapshot_manager.clone().into()); - - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - let result = snapshot_manager.commit_snapshot(&snapshot_id); - assert!(result.is_ok()); - assert!(snapshot_manager.is_empty()); - } - } - - #[test] - fn test_query_unknown_snapshot_id() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - let snapshot_manager = CacheContainer::new(db, to_parent.into()); - assert_eq!(None, snapshot_manager.get::(1, &TestField(1)).unwrap()); - } - - #[test] - fn test_query_genesis_snapshot() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - - let one = TestField(1); - let two = TestField(2); - let three = TestField(3); - - let mut db_data = SchemaBatch::new(); - db_data.put::(&one, &one).unwrap(); - db_data.put::(&three, &three).unwrap(); - db.write_schemas(&db_data).unwrap(); - - let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - let db_snapshot = CacheDb::new(1, snapshot_manager.clone().into()); - db_snapshot.put::(&two, &two).unwrap(); - db_snapshot.delete::(&three).unwrap(); - - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - - // Effectively querying database: - assert_eq!(Some(one), snapshot_manager.get::(1, &one).unwrap()); - assert_eq!(None, snapshot_manager.get::(1, &two).unwrap()); - assert_eq!(Some(three), snapshot_manager.get::(1, &three).unwrap()); - } - } - - #[test] - fn test_query_lifecycle() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - { - // / -> 6 -> 7 - // DB -> 1 -> 2 -> 3 - // \ -> 4 -> 5 - let mut edit = to_parent.write().unwrap(); - edit.insert(3, 2); - edit.insert(2, 1); - edit.insert(4, 1); - edit.insert(5, 4); - edit.insert(6, 2); - edit.insert(7, 6); - } - - let f1 = TestField(1); - let f2 = TestField(2); - let f3 = TestField(3); - let f4 = TestField(4); - let f5 = TestField(5); - let f6 = TestField(6); - let f7 = TestField(7); - let f8 = TestField(8); - - let mut db_data = SchemaBatch::new(); - db_data.put::(&f1, &f1).unwrap(); - db.write_schemas(&db_data).unwrap(); - - let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - // Operations: - // | snapshot_id | key | operation | - // | DB | 1 | write(1) | - // | 1 | 2 | write(2) | - // | 1 | 3 | write(4) | - // | 2 | 1 | write(5) | - // | 2 | 2 | delete | - // | 4 | 3 | write(6) | - // | 6 | 1 | write(7) | - // | 6 | 2 | write(8) | - - // 1 - let db_snapshot = CacheDb::new(1, snapshot_manager.clone().into()); - db_snapshot.put::(&f2, &f2).unwrap(); - 
db_snapshot.put::(&f3, &f4).unwrap(); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // 2 - let db_snapshot = CacheDb::new(2, snapshot_manager.clone().into()); - db_snapshot.put::(&f1, &f5).unwrap(); - db_snapshot.delete::(&f2).unwrap(); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // 3 - let db_snapshot = CacheDb::new(3, snapshot_manager.clone().into()); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // 4 - let db_snapshot = CacheDb::new(4, snapshot_manager.clone().into()); - db_snapshot.put::(&f3, &f6).unwrap(); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // 5 - let db_snapshot = CacheDb::new(5, snapshot_manager.clone().into()); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // 6 - let db_snapshot = CacheDb::new(6, snapshot_manager.clone().into()); - db_snapshot.put::(&f1, &f7).unwrap(); - db_snapshot.put::(&f2, &f8).unwrap(); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // 7 - let db_snapshot = CacheDb::new(7, snapshot_manager.clone().into()); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // View: - // | from s_id | key | value | - // | 3 | 1 | 5 | - // | 3 | 2 | None | - // | 3 | 3 | 4 | - // | 5 | 1 | 1 | - // | 5 | 2 | 2 | - // | 5 | 3 | 6 | - // | 7 | 1 | 7 | - // | 7 | 2 | 8 | - // | 7 | 3 | 4 | - let snapshot_manager = snapshot_manager.read().unwrap(); - assert_eq!(Some(f5), snapshot_manager.get::(3, &f1).unwrap()); - assert_eq!(None, snapshot_manager.get::(3, &f2).unwrap()); - assert_eq!(Some(f4), snapshot_manager.get::(3, &f3).unwrap()); - assert_eq!(Some(f1), snapshot_manager.get::(5, &f1).unwrap()); - assert_eq!(Some(f2), snapshot_manager.get::(5, &f2).unwrap()); - assert_eq!(Some(f6), snapshot_manager.get::(5, &f3).unwrap()); - - assert_eq!(Some(f7), snapshot_manager.get::(7, &f1).unwrap()); - assert_eq!(Some(f8), snapshot_manager.get::(7, &f2).unwrap()); - assert_eq!(Some(f4), snapshot_manager.get::(7, &f3).unwrap()); - } - - fn collect_actual_values<'a, I: Iterator>( - iterator: CacheContainerIter<'a, I>, - ) -> Vec<(TestField, TestField)> { - iterator - .into_iter() - .map(|(k, v)| { - let key = <::Key as KeyDecoder>::decode_key(&k).unwrap(); - let value = <::Value as ValueCodec>::decode_value(&v).unwrap(); - (key, value) - }) - .collect() - } - - fn encode_key(field: &TestField) -> SchemaKey { - >::encode_key(field).unwrap() - } - - #[test] - fn test_iterator() { - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - { - // DB -> 1 -> 2 -> 3 - let mut edit = to_parent.write().unwrap(); - edit.insert(2, 1); - edit.insert(3, 2); - edit.insert(4, 3); - } - - let f0 = TestField(0); - let f1 = TestField(1); - let f2 = TestField(2); - let f3 = TestField(3); - let f4 = TestField(4); - let f5 = TestField(5); - let f6 = TestField(6); - let f7 = TestField(7); - let f8 = TestField(8); - let f9 = TestField(9); - let f10 = TestField(10); - let f11 = 
TestField(11); - let f12 = TestField(12); - - let mut db_data = SchemaBatch::new(); - db_data.put::(&f3, &f9).unwrap(); - db_data.put::(&f2, &f1).unwrap(); - db_data.put::(&f4, &f1).unwrap(); - db_data.put::(&f0, &f1).unwrap(); - db_data.put::(&f11, &f9).unwrap(); - db.write_schemas(&db_data).unwrap(); - // DB Data - // | key | value | - // | 3 | 9 | - // | 2 | 1 | - // | 4 | 1 | - // | 0 | 1 | - // | 11 | 9 | - - let snapshot_manager = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - // Operations: - // | snapshot_id | key | operation | - // | 1 | 1 | write(8) | - // | 1 | 5 | write(7) | - // | 1 | 8 | write(3) | - // | 1 | 4 | write(2) | - // | 2 | 10 | write(2) | - // | 2 | 9 | write(4) | - // | 2 | 4 | delete | - // | 2 | 2 | write(6) | - // | 3 | 8 | write(6) | - // | 3 | 9 | delete | - // | 3 | 12 | write(1) | - // | 3 | 1 | write(2) | - - // 1 - let db_snapshot = CacheDb::new(1, snapshot_manager.clone().into()); - db_snapshot.put::(&f1, &f8).unwrap(); - db_snapshot.put::(&f5, &f7).unwrap(); - db_snapshot.put::(&f8, &f3).unwrap(); - db_snapshot.put::(&f4, &f2).unwrap(); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // 2 - let db_snapshot = CacheDb::new(2, snapshot_manager.clone().into()); - db_snapshot.put::(&f10, &f2).unwrap(); - db_snapshot.put::(&f9, &f4).unwrap(); - db_snapshot.delete::(&f4).unwrap(); - db_snapshot.put::(&f2, &f6).unwrap(); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // 3 - let db_snapshot = CacheDb::new(3, snapshot_manager.clone().into()); - db_snapshot.put::(&f8, &f6).unwrap(); - db_snapshot.delete::(&f9).unwrap(); - db_snapshot.put::(&f12, &f1).unwrap(); - db_snapshot.put::(&f1, &f2).unwrap(); - { - let mut snapshot_manager = snapshot_manager.write().unwrap(); - snapshot_manager.add_snapshot(db_snapshot.into()).unwrap(); - } - - // Expected Order - // | key | value | - // | 12 | 1 | - // | 10 | 2 | - // | 8 | 6 | - // | 5 | 7 | - // | 3 | 9 | - // | 2 | 6 | - // | 1 | 2 | - - let snapshot_manager = snapshot_manager.read().unwrap(); - - // Full iterator - { - let mut expected_fields = vec![ - (f0, f1), - (f1, f2), - (f2, f6), - (f3, f9), - (f5, f7), - (f8, f6), - (f10, f2), - (f11, f9), - (f12, f1), - ]; - - let forward_iterator = snapshot_manager.iter::(4).unwrap(); - let actual_fields = collect_actual_values(forward_iterator); - - assert_eq!( - expected_fields, actual_fields, - "forward iterator is incorrect" - ); - - expected_fields.reverse(); - - let backward_iterator = snapshot_manager.rev_iter::(4).unwrap(); - let actual_fields = collect_actual_values(backward_iterator); - assert_eq!( - expected_fields, actual_fields, - "backward iterator is incorrect" - ); - } - - // Range iterator - { - let lower_bound = encode_key(&f2); - let upper_bound = encode_key(&f10); - - // Full range - let mut expected_fields = vec![(f2, f6), (f3, f9), (f5, f7), (f8, f6)]; - - let forward_iterator = snapshot_manager - .iter_range::(4, lower_bound.clone()..upper_bound.clone()) - .unwrap(); - let actual_fields = collect_actual_values(forward_iterator); - assert_eq!( - expected_fields, actual_fields, - "forward full range iterator is incorrect" - ); - let backward_iterator = snapshot_manager - .rev_iter_range::(4, lower_bound.clone()..upper_bound.clone()) - .unwrap(); - let actual_fields = collect_actual_values(backward_iterator); - expected_fields.reverse(); - assert_eq!( - 
expected_fields, actual_fields, - "backward full range iterator is incorrect" - ); - - // Only lower bound - let mut expected_fields = vec![ - (f2, f6), - (f3, f9), - (f5, f7), - (f8, f6), - (f10, f2), - (f11, f9), - (f12, f1), - ]; - let forward_iterator = snapshot_manager - .iter_range::(4, lower_bound.clone()..) - .unwrap(); - let actual_fields = collect_actual_values(forward_iterator); - assert_eq!( - expected_fields, actual_fields, - "forward low range iterator is incorrect" - ); - let backward_iterator = snapshot_manager - .rev_iter_range::(4, lower_bound..) - .unwrap(); - let actual_fields = collect_actual_values(backward_iterator); - expected_fields.reverse(); - assert_eq!( - expected_fields, actual_fields, - "backward low range iterator is incorrect" - ); - - // Only upper bound - let mut expected_fields = - vec![(f0, f1), (f1, f2), (f2, f6), (f3, f9), (f5, f7), (f8, f6)]; - let forward_iterator = snapshot_manager - .iter_range::(4, ..upper_bound.clone()) - .unwrap(); - let actual_fields = collect_actual_values(forward_iterator); - assert_eq!( - expected_fields, actual_fields, - "forward high range iterator is incorrect" - ); - let backward_iterator = snapshot_manager - .rev_iter_range::(4, ..upper_bound) - .unwrap(); - let actual_fields = collect_actual_values(backward_iterator); - expected_fields.reverse(); - assert_eq!( - expected_fields, actual_fields, - "backward high range iterator is incorrect" - ); - } - } - - fn test_iterator_ranges(numbers: Vec) { - // Spreads all numbers into 10 changesets, where first 3 are saved to the database - // Each number should be above 3 and below u32::MAX - 3 - assert!(numbers.len() >= 100); - - // Some numbers for ranges - let min = *numbers.iter().min().unwrap(); - let below_min = min - .checked_sub(3) - .expect("test input is not in defined range"); - let max = *numbers.iter().max().unwrap(); - let above_max = max - .checked_add(3) - .expect("test input is not in defined range"); - let middle = *numbers.get(numbers.len() / 2).unwrap(); - - let range_pairs = vec![ - (min, middle), - (below_min, middle), - (middle, max), - (middle, above_max), - (min, max), - (below_min, above_max), - ]; - - // Prepare cache container - let tempdir = tempfile::tempdir().unwrap(); - let db = create_test_db(tempdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - { - let mut to_parent = to_parent.write().unwrap(); - for id in 4..=10 { - to_parent.insert(id, id - 1); - } - } - - let cache_container = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - // Creating snapshots - let n_chunks = 10; // Number of chunks you want - - let chunk_size = (numbers.len() + n_chunks - 1) / n_chunks; // Calculate size of each chunk - let chunks: Vec<_> = numbers.chunks(chunk_size).map(|c| c.to_vec()).collect(); - - for (idx, chunk) in chunks.iter().enumerate() { - let snapshot_id = idx as u64; - let db_snapshot = CacheDb::new(snapshot_id, cache_container.clone().into()); - for item in chunk { - db_snapshot - .put::(&TestField(*item), &TestField(*item)) - .unwrap(); - } - - { - let mut cache_container = cache_container.write().unwrap(); - cache_container.add_snapshot(db_snapshot.into()).unwrap(); - if idx < 3 { - cache_container.commit_snapshot(&snapshot_id).unwrap(); - } - } - } - - for (low, high) in range_pairs { - let low = TestField(low); - let high = TestField(high); - let range = encode_key(&low)..encode_key(&high); - let range_inclusive = encode_key(&low)..=encode_key(&high); - let range_to = ..encode_key(&high); - let 
range_to_inclusive = ..=encode_key(&high); - let range_from = encode_key(&low)..; - let range_full = ..; - - check_range(cache_container.read().unwrap(), range); - check_range(cache_container.read().unwrap(), range_inclusive); - check_range(cache_container.read().unwrap(), range_to); - check_range(cache_container.read().unwrap(), range_to_inclusive); - check_range(cache_container.read().unwrap(), range_from); - check_range(cache_container.read().unwrap(), range_full); - } - } - - fn check_range + Clone>( - cache_container: RwLockReadGuard, - range: R, - ) { - let iterator_forward = cache_container.iter_range::(10, range.clone()).unwrap(); - validate_iterator(iterator_forward, range.clone(), ScanDirection::Forward); - let iterator_backward = cache_container - .rev_iter_range::(10, range.clone()) - .unwrap(); - validate_iterator(iterator_backward, range, ScanDirection::Backward); - } - - fn validate_iterator(iterator: I, range: R, direction: ScanDirection) - where - I: Iterator, - R: std::ops::RangeBounds, - { - let mut prev_key: Option = None; - for (key, _) in iterator { - assert!(range.contains(&key)); - let key = <::Key as KeyDecoder>::decode_key(&key).unwrap(); - if let Some(prev_key) = prev_key { - match direction { - ScanDirection::Forward => { - assert!(key.0 >= prev_key.0) - } - ScanDirection::Backward => { - assert!(key.0 <= prev_key.0) - } - }; - } - prev_key = Some(key); - } - } - - #[test] - fn check_proptest_case() { - let numbers: Vec = (10..=113).collect(); - test_iterator_ranges(numbers); - } - - proptest! { - #[test] - fn cache_container_iter_range(input in prop::collection::vec(4u32..10_000, 101..200)) { - test_iterator_ranges(input); - } - - #[test] - fn cache_container_iter_range_tight_values(input in prop::collection::vec(4u32..10, 101..200)) { - test_iterator_ranges(input); - } - - fn cache_container_iter_range_uniq_numbers(input in prop::collection::hash_set(4u32..10_000, 101..200)) { - let input: Vec = input.into_iter().collect(); - test_iterator_ranges(input); - } - } -} diff --git a/src/cache/cache_db.rs b/src/cache/cache_db.rs deleted file mode 100644 index 74f9c5b..0000000 --- a/src/cache/cache_db.rs +++ /dev/null @@ -1,609 +0,0 @@ -//! This module is main entry point into cache layering subsystem. -use std::iter::Peekable; -use std::sync::Mutex; - -use crate::cache::cache_container::CacheContainer; -use crate::cache::change_set::ChangeSet; -use crate::cache::{PaginatedResponse, SnapshotId}; -use crate::iterator::ScanDirection; -use crate::schema::KeyDecoder; -use crate::{ - KeyCodec, Operation, ReadOnlyLock, Schema, SchemaBatch, SchemaKey, SchemaValue, SeekKeyEncoder, - ValueCodec, -}; - -/// Cache layer that stores all writes locally and also able to access "previous" operations. 
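Review note: most read paths in the deleted `CacheDb` (`get_largest`, `get_prev`, `get_n_from_first_match`, `collect_in_range`) funnel into `CacheDbIter`, a two-way merge of the sorted local change set with the parent view, in which local entries win ties and local deletions suppress keys. A forward-only sketch of that merge over plain vectors (illustrative only, not the crate's types):

```rust
use std::cmp::Ordering;

enum Operation {
    Put { value: Vec<u8> },
    Delete,
}

/// Forward-only model of the `CacheDbIter` merge: both inputs are sorted by
/// key; on equal keys the local entry shadows the parent entry, and a local
/// `Delete` suppresses the key entirely.
fn merge_forward(
    local: Vec<(Vec<u8>, Operation)>,
    parent: Vec<(Vec<u8>, Vec<u8>)>,
) -> Vec<(Vec<u8>, Vec<u8>)> {
    enum Src {
        Local,
        Parent,
        Both,
    }

    let mut local = local.into_iter().peekable();
    let mut parent = parent.into_iter().peekable();
    let mut out = Vec::new();
    loop {
        // Decide which stream owns the next key.
        let src = match (local.peek(), parent.peek()) {
            (None, None) => return out,
            (None, Some(_)) => Src::Parent,
            (Some(_), None) => Src::Local,
            (Some((lk, _)), Some((pk, _))) => match lk.cmp(pk) {
                Ordering::Less => Src::Local,
                Ordering::Greater => Src::Parent,
                Ordering::Equal => Src::Both,
            },
        };
        if let Src::Parent = src {
            out.push(parent.next().unwrap());
            continue;
        }
        if let Src::Both = src {
            // Consume the shadowed parent entry so the streams stay aligned.
            parent.next();
        }
        match local.next().unwrap() {
            (key, Operation::Put { value }) => out.push((key, value)),
            (_, Operation::Delete) => {} // deleted locally: skip the key
        }
    }
}
```

For the backward direction the crate flips the key comparison, which is why the real `CacheDbIter` carries a `ScanDirection`.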
-#[derive(Debug)] -pub struct CacheDb { - local_cache: Mutex, - db: ReadOnlyLock, -} - -impl CacheDb { - /// Create new [`CacheDb`] pointing to given [`CacheContainer`] - pub fn new(id: SnapshotId, cache_container: ReadOnlyLock) -> Self { - Self { - local_cache: Mutex::new(ChangeSet::new(id)), - db: cache_container, - } - } - - /// Store a value in local cache - pub fn put( - &self, - key: &impl KeyCodec, - value: &impl ValueCodec, - ) -> anyhow::Result<()> { - self.local_cache - .lock() - .expect("Local ChangeSet lock must not be poisoned") - .operations - .put(key, value) - } - - /// Delete given key from local cache - pub fn delete(&self, key: &impl KeyCodec) -> anyhow::Result<()> { - self.local_cache - .lock() - .expect("Local ChangeSet lock must not be poisoned") - .operations - .delete(key) - } - - /// Writes many operations at once in local cache, atomically. - pub fn write_many(&self, batch: SchemaBatch) -> anyhow::Result<()> { - let mut inner = self - .local_cache - .lock() - .expect("Local SchemaBatch lock must not be poisoned"); - inner.operations.merge(batch); - Ok(()) - } - - /// Overwrites inner cache with new, while retaining reference to parent - pub fn overwrite_change_set(&self, other: CacheDb) { - let mut this_cache = self.local_cache.lock().unwrap(); - let other_cache = other.local_cache.into_inner().unwrap(); - *this_cache = other_cache; - } - - /// Get a value from current snapshot, its parents or underlying database - pub fn read(&self, key: &impl KeyCodec) -> anyhow::Result> { - // Some(Operation) means that key was touched, - // but in case of deletion we early return None - // Only in case of not finding operation for key, - // we go deeper - - // Hold local cache lock explicitly, so reads are atomic - let local_cache = self - .local_cache - .lock() - .expect("SchemaBatch lock should not be poisoned"); - - // 1. Check in cache - if let Some(operation) = local_cache.get(key)? { - return operation.decode_value::(); - } - - // 2. 
Check parent - let parent = self.db.read().expect("Parent lock must not be poisoned"); - parent.get::(local_cache.id(), key) - } - - /// Get a value from current snapshot, its parents or underlying database - pub async fn read_async( - &self, - key: &impl KeyCodec, - ) -> anyhow::Result> { - tokio::task::block_in_place(|| self.read(key)) - } - - /// Get value of largest key written value for given [`Schema`] - pub fn get_largest(&self) -> anyhow::Result> { - let change_set = self - .local_cache - .lock() - .expect("SchemaBatch lock must not be poisoned"); - let local_cache_iter = change_set.iter::().rev(); - - let parent = self.db.read().expect("Parent lock must not be poisoned"); - - let parent_iter = parent.rev_iter::(change_set.id())?; - - let mut combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { - local_cache_iter: local_cache_iter.peekable(), - parent_iter: parent_iter.peekable(), - direction: ScanDirection::Backward, - }; - - if let Some((key, value)) = combined_iter.next() { - let key = S::Key::decode_key(&key)?; - let value = S::Value::decode_value(&value)?; - return Ok(Some((key, value))); - } - - Ok(None) - } - - /// Get value of largest key written value for given [`Schema`] - pub async fn get_largest_async(&self) -> anyhow::Result> { - tokio::task::block_in_place(|| self.get_largest::()) - } - - /// Get largest value in [`Schema`] that is smaller than give `seek_key` - pub fn get_prev( - &self, - seek_key: &impl SeekKeyEncoder, - ) -> anyhow::Result> { - let seek_key = seek_key.encode_seek_key()?; - let range = ..=seek_key; - let change_set = self - .local_cache - .lock() - .expect("Local cache lock must not be poisoned"); - let local_cache_iter = change_set.iter_range::(range.clone()).rev(); - - let parent = self - .db - .read() - .expect("Parent snapshots lock must not be poisoned"); - let parent_iter = parent.rev_iter_range::(change_set.id(), range)?; - - let mut combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { - local_cache_iter: local_cache_iter.peekable(), - parent_iter: parent_iter.peekable(), - direction: ScanDirection::Backward, - }; - - if let Some((key, value)) = combined_iter.next() { - let key = S::Key::decode_key(&key)?; - let value = S::Value::decode_value(&value)?; - return Ok(Some((key, value))); - } - Ok(None) - } - - /// Get largest value in [`Schema`] that is smaller than give `seek_key` - pub async fn get_prev_async( - &self, - seek_key: &impl SeekKeyEncoder, - ) -> anyhow::Result> { - tokio::task::block_in_place(|| self.get_prev(seek_key)) - } - - /// Get `n` keys >= `seek_key` - pub fn get_n_from_first_match( - &self, - seek_key: &impl SeekKeyEncoder, - n: usize, - ) -> anyhow::Result> { - let seek_key = seek_key.encode_seek_key()?; - let range = seek_key..; - let change_set = self - .local_cache - .lock() - .expect("Local cache lock must not be poisoned"); - let local_cache_iter = change_set.iter_range::(range.clone()); - - let parent = self - .db - .read() - .expect("Parent snapshots lock must not be poisoned"); - let parent_iter = parent.iter_range::(change_set.id(), range)?; - - let mut combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { - local_cache_iter: local_cache_iter.peekable(), - parent_iter: parent_iter.peekable(), - direction: ScanDirection::Forward, - }; - - let results: Vec<(S::Key, S::Value)> = combined_iter - .by_ref() - .filter_map(|(key_bytes, value_bytes)| { - let key = S::Key::decode_key(&key_bytes).ok()?; - let value = S::Value::decode_value(&value_bytes).ok()?; - Some((key, value)) - }) - .take(n) - .collect(); - - let 
next_start_key: Option = combined_iter - .next() - .and_then(|(key_bytes, _)| S::Key::decode_key(&key_bytes).ok()); - - Ok(PaginatedResponse { - key_value: results, - next: next_start_key, - }) - } - - /// Get `n` keys >= `seek_key` - pub async fn get_n_from_first_match_async( - &self, - seek_key: &impl SeekKeyEncoder, - n: usize, - ) -> anyhow::Result> { - tokio::task::block_in_place(|| self.get_n_from_first_match(seek_key, n)) - } - - /// Get a clone of internal ChangeSet - pub fn clone_change_set(&self) -> ChangeSet { - let change_set = self - .local_cache - .lock() - .expect("Local change set lock is poisoned"); - change_set.clone() - } - - /// Collects all key-value pairs in given range, from smallest to largest. - pub fn collect_in_range>( - &self, - range: std::ops::Range, - ) -> anyhow::Result> { - let lower_bound = range.start.encode_seek_key()?; - let upper_bound = range.end.encode_seek_key()?; - let range = lower_bound..upper_bound; - let change_set = self - .local_cache - .lock() - .expect("Local cache lock must not be poisoned"); - let local_cache_iter = change_set.iter_range::(range.clone()); - - let parent = self - .db - .read() - .expect("Parent snapshots lock must not be poisoned"); - let parent_iter = parent.iter_range::(change_set.id(), range)?; - - let combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { - local_cache_iter: local_cache_iter.peekable(), - parent_iter: parent_iter.peekable(), - direction: ScanDirection::Forward, - }; - - let result = combined_iter - .map(|(key, value)| { - let key = S::Key::decode_key(&key).unwrap(); - let value = S::Value::decode_value(&value).unwrap(); - (key, value) - }) - .collect(); - - Ok(result) - } - - /// Collects all key-value pairs in given range, from smallest to largest. - pub async fn collect_in_range_async>( - &self, - range: std::ops::Range, - ) -> anyhow::Result> { - tokio::task::block_in_place(|| self.collect_in_range(range)) - } -} - -/// Iterator over [`CacheDb`] that combines local cache and parent iterators. -/// Prioritizes local cache over parent in case of equal keys. -/// This is because local cache operations are considered to be newer. -struct CacheDbIter<'a, LocalIter, ParentIter> -where - LocalIter: Iterator, - ParentIter: Iterator, -{ - local_cache_iter: Peekable, - parent_iter: Peekable, - direction: ScanDirection, -} - -impl<'a, LocalIter, ParentIter> Iterator for CacheDbIter<'a, LocalIter, ParentIter> -where - LocalIter: Iterator, - ParentIter: Iterator, -{ - type Item = (SchemaKey, SchemaValue); - - fn next(&mut self) -> Option { - loop { - let Some(&local) = self.local_cache_iter.peek() else { - // Local iterator is exhausted; all reads go to the parent. - return self.parent_iter.next(); - }; - - if let Some(parent) = self.parent_iter.peek() { - // Check if the next parent key has priority over the local key, based on - // the scan direction. - if ((local.0 > &parent.0) && (self.direction == ScanDirection::Forward)) - || ((local.0 < &parent.0) && (self.direction == ScanDirection::Backward)) - { - return self.parent_iter.next(); - } else if local.0 == &parent.0 { - // If keys are equal, we must consume the next parent value - // before we can read from the local cache (or they will - // get out of sync). - self.parent_iter.next(); - } - } - - // At this point, we've determined that we wish to read from the - // local cache. - self.local_cache_iter.next(); - match local.1 { - // No value to read from the local cache, skip it. 
- Operation::Delete => continue, - Operation::Put { value } => return Some((local.0.clone(), value.clone())), - } - } - } -} - -impl From for ChangeSet { - fn from(value: CacheDb) -> Self { - value - .local_cache - .into_inner() - .expect("Internal cache lock is poisoned") - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::path::Path; - use std::sync::{Arc, RwLock}; - - use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; - - use super::*; - use crate::cache::cache_db::CacheContainer; - use crate::schema::KeyEncoder; - use crate::test::{TestCompositeField, TestField}; - use crate::{define_schema, DB}; - define_schema!(TestSchema, TestCompositeField, TestField, "TestCF"); - - mod local_cache { - use super::*; - - #[test] - fn test_concurrent_operations() { - let tmpdir = tempfile::tempdir().unwrap(); - let db = open_db(tmpdir.path()); - - let cache_container = CacheContainer::orphan(db); - - let cache_db = Arc::new(CacheDb::new( - 0, - Arc::new(RwLock::new(cache_container)).into(), - )); - - let mut handles = vec![]; - for i in 0..10 { - let cache_db_clone = cache_db.clone(); - let handle = std::thread::spawn(move || { - for j in 0..10 { - let key = TestCompositeField(j + 1, j + 2, j + 3); - let value = TestField(i * 10 + j + 4); - cache_db_clone.put::(&key, &value).unwrap(); - } - }); - handles.push(handle); - } - - for handle in handles { - handle.join().expect("Thread panicked"); - } - for j in 0..10 { - let key = TestCompositeField(j + 1, j + 2, j + 3); - let value = cache_db.read::(&key).unwrap().unwrap().0; - let possible_range = (j + 4)..(100 + j + 4); - assert!(possible_range.contains(&value)); - let rem = value - (j + 4); - assert_eq!(0, rem % 10); - } - } - } - - fn encode_key(key: &TestCompositeField) -> SchemaKey { - >::encode_key(key).unwrap() - } - - fn encode_value(value: &TestField) -> SchemaValue { - >::encode_value(value).unwrap() - } - - fn open_db(dir: impl AsRef) -> DB { - let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME, TestSchema::COLUMN_FAMILY_NAME]; - let mut db_opts = rocksdb::Options::default(); - db_opts.create_if_missing(true); - db_opts.create_missing_column_families(true); - DB::open(dir, "test", column_families, &db_opts).expect("Failed to open DB.") - } - - #[test] - fn test_db_snapshot_iterator_empty() { - let tmpdir = tempfile::tempdir().unwrap(); - let db = open_db(tmpdir.path()); - - let local_cache = SchemaBatch::new(); - let local_cache_iter = local_cache.iter::().peekable(); - - let cache_container = CacheContainer::orphan(db); - let parent_iter = cache_container.iter::(0).unwrap().peekable(); - - let combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { - local_cache_iter, - parent_iter, - direction: ScanDirection::Forward, - }; - - let values: Vec<(SchemaKey, SchemaValue)> = combined_iter.collect(); - assert!(values.is_empty()) - } - - #[test] - fn test_db_snapshot_iterator_values() { - let tmpdir = tempfile::tempdir().unwrap(); - let db = open_db(tmpdir.path()); - - let k0 = TestCompositeField(0, 0, 0); - let k1 = TestCompositeField(0, 1, 0); - let k2 = TestCompositeField(0, 1, 2); - let k3 = TestCompositeField(3, 1, 0); - let k4 = TestCompositeField(3, 2, 0); - - let mut older_change_set = ChangeSet::new(0); - older_change_set - .operations - .put::(&k2, &TestField(2)) - .unwrap(); - older_change_set - .operations - .put::(&k1, &TestField(1)) - .unwrap(); - older_change_set - .operations - .put::(&k4, &TestField(4)) - .unwrap(); - older_change_set - .operations - .put::(&k3, &TestField(3)) - .unwrap(); - - let to_parent: 
Arc>> = - Arc::new(RwLock::new(Default::default())); - { - let mut parent = to_parent.write().unwrap(); - parent.insert(1, 0); - } - let mut cache_container = CacheContainer::new(db, to_parent.into()); - cache_container.add_snapshot(older_change_set).unwrap(); - - let parent_iter = cache_container.iter::(1).unwrap(); - - let mut local_change_set = ChangeSet::new(1); - - local_change_set - .operations - .delete::(&k3) - .unwrap(); - local_change_set - .operations - .put::(&k0, &TestField(100)) - .unwrap(); - local_change_set - .operations - .put::(&k1, &TestField(10)) - .unwrap(); - local_change_set - .operations - .put::(&k2, &TestField(20)) - .unwrap(); - - let local_cache_iter = local_change_set.iter::(); - - let combined_iter: CacheDbIter<'_, _, _> = CacheDbIter { - local_cache_iter: local_cache_iter.peekable(), - parent_iter: parent_iter.peekable(), - direction: ScanDirection::Forward, - }; - - let actual_values: Vec<(SchemaKey, SchemaValue)> = combined_iter.collect(); - let expected_values = vec![ - (encode_key(&k0), encode_value(&TestField(100))), - (encode_key(&k1), encode_value(&TestField(10))), - (encode_key(&k2), encode_value(&TestField(20))), - (encode_key(&k4), encode_value(&TestField(4))), - ]; - - assert_eq!(expected_values, actual_values); - } - - fn put_value(cache_db: &CacheDb, key: u32, value: u32) { - cache_db - .put::(&TestCompositeField(key, 0, 0), &TestField(value)) - .unwrap(); - } - - fn check_value(cache_db: &CacheDb, key: u32, expected_value: Option) { - let actual_value = cache_db - .read::(&TestCompositeField(key, 0, 0)) - .unwrap() - .map(|v| v.0); - assert_eq!(expected_value, actual_value); - } - - #[test] - fn test_iterators_complete() { - let tmpdir = tempfile::tempdir().unwrap(); - let db = open_db(tmpdir.path()); - - let to_parent: Arc>> = - Arc::new(RwLock::new(Default::default())); - { - let mut parent = to_parent.write().unwrap(); - parent.insert(2, 1); - parent.insert(3, 2); - parent.insert(4, 3); - } - - let cache_container = Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - let db_values = CacheDb::new(0, cache_container.clone().into()); - - put_value(&db_values, 2, 1); - put_value(&db_values, 4, 1); - put_value(&db_values, 3, 9); - { - let mut cache_container = cache_container.write().unwrap(); - cache_container.add_snapshot(db_values.into()).unwrap(); - cache_container.commit_snapshot(&0).unwrap(); - assert!(cache_container.is_empty()); - } - - let cache_db_1 = CacheDb::new(1, cache_container.clone().into()); - put_value(&cache_db_1, 1, 8); - put_value(&cache_db_1, 4, 2); - put_value(&cache_db_1, 5, 7); - put_value(&cache_db_1, 8, 3); - { - let mut cache_container = cache_container.write().unwrap(); - cache_container.add_snapshot(cache_db_1.into()).unwrap(); - } - let cache_db_2 = CacheDb::new(2, cache_container.clone().into()); - put_value(&cache_db_2, 2, 6); - cache_db_2 - .delete::(&TestCompositeField(4, 0, 0)) - .unwrap(); - put_value(&cache_db_2, 9, 4); - put_value(&cache_db_2, 10, 2); - { - let mut cache_container = cache_container.write().unwrap(); - cache_container.add_snapshot(cache_db_2.into()).unwrap(); - } - - let cache_db_3 = CacheDb::new(3, cache_container.into()); - put_value(&cache_db_3, 1, 2); - put_value(&cache_db_3, 8, 6); - cache_db_3 - .delete::(&TestCompositeField(9, 0, 0)) - .unwrap(); - put_value(&cache_db_3, 12, 1); - - // Checking - check_value(&cache_db_3, 5, Some(7)); - check_value(&cache_db_3, 4, None); - - let actual_kv_sorted = cache_db_3 - .collect_in_range::( - TestCompositeField(0, 0, 
0)..TestCompositeField(100, 0, 0), - ) - .unwrap() - .into_iter() - .map(|(k, v)| (k.0, v.0)) - .collect::>(); - - let expected_kv_sorted = vec![(1, 2), (2, 6), (3, 9), (5, 7), (8, 6), (10, 2), (12, 1)]; - - assert_eq!(expected_kv_sorted, actual_kv_sorted); - } - - // TODO: Proptest here -} diff --git a/src/cache/change_set.rs b/src/cache/change_set.rs deleted file mode 100644 index bbf36f1..0000000 --- a/src/cache/change_set.rs +++ /dev/null @@ -1,61 +0,0 @@ -//! Collection of writes in given snapshot/cache layer -//! Data in ChangeSet is written inside crate, public access allows only reading. -use std::collections::btree_map; - -use crate::cache::SnapshotId; -use crate::{KeyCodec, Operation, Schema, SchemaBatch, SchemaKey}; - -/// Iterator type returned by [`ChangeSet::iter`]. -pub type ChangeSetIter<'a> = btree_map::Iter<'a, SchemaKey, Operation>; -/// Range type returned by [`ChangeSet::iter_range`]. -pub type ChangeSetRange<'a> = btree_map::Range<'a, SchemaKey, Operation>; - -/// Collection of all writes with associated [`SnapshotId`] -#[derive(Debug, Clone)] -pub struct ChangeSet { - id: SnapshotId, - pub(crate) operations: SchemaBatch, -} - -impl ChangeSet { - pub(crate) fn new(id: SnapshotId) -> Self { - Self { - id, - operations: SchemaBatch::default(), - } - } - - /// Create new `ChangeSet - pub fn new_with_operations(id: SnapshotId, operations: SchemaBatch) -> Self { - Self { id, operations } - } - - /// Get value from its own cache - pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { - self.operations.get_operation(key) - } - - /// Get the ID of this [`ChangeSet`]. - pub fn id(&self) -> SnapshotId { - self.id - } - - /// Iterate over all operations in snapshot in lexicographic order - pub fn iter(&self) -> ChangeSetIter { - self.operations.iter::() - } - - /// Iterate over operations in lower_bound..upper_bound range in lexicographic order - pub fn iter_range( - &self, - range: impl std::ops::RangeBounds, - ) -> ChangeSetRange { - self.operations.iter_range::(range) - } -} - -impl From for SchemaBatch { - fn from(value: ChangeSet) -> Self { - value.operations - } -} diff --git a/src/cache/mod.rs b/src/cache/mod.rs index c92067e..23bb03c 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -2,14 +2,8 @@ use crate::Schema; -pub mod cache_container; -pub mod cache_db; -pub mod change_set; pub mod delta_reader; -/// Id of ChangeSet/snapshot/cache layer -pub type SnapshotId = u64; - /// Response for a paginated query which also includes the "next" key to pass. #[derive(Debug)] pub struct PaginatedResponse { diff --git a/src/config.rs b/src/config.rs index 0dd1024..33de290 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,3 @@ -use rocksdb::Options; - /// Port selected RocksDB options for tuning underlying rocksdb instance of our state db. /// The current default values are taken from Aptos. TODO: tune rocksdb for our workload. /// see @@ -31,8 +29,8 @@ impl Default for RocksdbConfig { } /// Generate [`rocksdb::Options`] corresponding to the given [`RocksdbConfig`]. 
-pub fn gen_rocksdb_options(config: &RocksdbConfig, readonly: bool) -> Options {
-    let mut db_opts = Options::default();
+pub fn gen_rocksdb_options(config: &RocksdbConfig, readonly: bool) -> rocksdb::Options {
+    let mut db_opts = rocksdb::Options::default();
     db_opts.set_max_open_files(config.max_open_files);
     db_opts.set_max_total_wal_size(config.max_total_wal_size);
     db_opts.set_max_background_jobs(config.max_background_jobs);
diff --git a/src/lib.rs b/src/lib.rs
index 80e64ec..2e8169b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -27,7 +27,6 @@ pub mod test;
 pub use config::{gen_rocksdb_options, RocksdbConfig};
 
 use std::path::Path;
-use std::sync::{Arc, LockResult, RwLock, RwLockReadGuard};
 
 use anyhow::format_err;
 use iterator::ScanDirection;
@@ -103,7 +102,7 @@ impl DB {
         Ok(Self::log_construct(name, inner))
     }
 
-    /// Open db in secondary mode. A secondary db is does not support writes, but can be dynamically caught up
+    /// Open db in secondary mode. A secondary db does not support writes, but can be dynamically caught up
     /// to the primary instance by a manual call. See
     /// for more details.
     pub fn open_cf_as_secondary<P: AsRef<Path>>(
@@ -413,7 +412,7 @@ pub enum CodecError {
     Io(#[from] std::io::Error),
 }
 
-/// For now we always use synchronous writes. This makes sure that once the operation returns
+/// For now, we always use synchronous writes. This makes sure that once the operation returns
 /// `Ok(())` the data is persisted even if the machine crashes. In the future we might consider
 /// selectively turning this off for some non-critical writes to improve performance.
 fn default_write_options() -> rocksdb::WriteOptions {
@@ -422,33 +421,6 @@
     opts
 }
 
-/// Wrapper around `RwLock` that only allows read access.
-/// This type implies that wrapped type suppose to be used only for reading.
-/// It is useful to indicate that user of this type can only do reading.
-/// This also implies that that inner `Arc<RwLock<T>>` is a clone and some other part can do writing.
-#[derive(Debug, Clone)]
-pub struct ReadOnlyLock<T> {
-    lock: Arc<RwLock<T>>,
-}
-
-impl<T> ReadOnlyLock<T> {
-    /// Create new [`ReadOnlyLock`] from [`Arc<RwLock<T>>`].
-    pub fn new(lock: Arc<RwLock<T>>) -> Self {
-        Self { lock }
-    }
-
-    /// Acquires a read lock on the underlying [`RwLock`].
-    pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
-        self.lock.read()
-    }
-}
-
-impl<T> From<Arc<RwLock<T>>> for ReadOnlyLock<T> {
-    fn from(value: Arc<RwLock<T>>) -> Self {
-        Self::new(value)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/schema.rs b/src/schema.rs
index 19b552a..0c83d57 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -15,7 +15,7 @@ use crate::CodecError;
 pub type ColumnFamilyName = &'static str;
 
 /// A [`Schema`] is a type-safe interface over a specific column family in a
-/// [`DB`](crate::DB). It always a key type ([`KeyCodec`]) and a value type ([`ValueCodec`]).
+/// [`DB`](crate::DB). It is always a key type ([`KeyCodec`]) and a value type ([`ValueCodec`]).
 pub trait Schema: Debug + Send + Sync + 'static + Sized {
     /// The column family name associated with this struct.
     /// Note: all schemas within the same SchemaDB must have distinct column family names.
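Review note: with `ReadOnlyLock` removed from `src/lib.rs`, `gen_rocksdb_options` is the main public helper touched here that survives. A hedged usage sketch; the `DB::open` signature and return type are inferred from the tests removed elsewhere in this diff, and the database name and column family below are placeholders:

```rust
use rockbound::{gen_rocksdb_options, RocksdbConfig, DB};

/// Hypothetical call site for the tuned options generator.
fn open_example_db(path: &std::path::Path) -> anyhow::Result<DB> {
    let config = RocksdbConfig::default();
    // `readonly = false`: options for a writable primary instance.
    let db_opts = gen_rocksdb_options(&config, false);
    DB::open(path, "example_db", vec!["ExampleCF".to_string()], &db_opts)
}
```

Callers that previously held a `ReadOnlyLock<T>` can keep a plain `Arc<RwLock<T>>` and call `.read()` on it directly; the wrapper only ever restricted its holder to reads.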
diff --git a/tests/iterator_test.rs b/tests/iterator_test.rs index a002138..5a3aacb 100644 --- a/tests/iterator_test.rs +++ b/tests/iterator_test.rs @@ -1,15 +1,10 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; -use std::sync::{Arc, RwLock}; - -use rockbound::cache::cache_container::CacheContainer; -use rockbound::cache::cache_db::CacheDb; use rockbound::schema::{KeyDecoder, KeyEncoder, ValueCodec}; use rockbound::test::{KeyPrefix1, KeyPrefix2, TestCompositeField, TestField}; use rockbound::{ - define_schema, Operation, ReadOnlyLock, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, DB, + define_schema, Operation, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, DB, }; use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; use tempfile::TempDir; @@ -365,200 +360,3 @@ fn test_schema_batch_iter_range() { ); assert_eq!(None, iter.next()); } - -#[test] -fn test_db_snapshot_get_last_value() { - let db = TestDB::new(); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - { - let mut to_parent = to_parent.write().unwrap(); - to_parent.insert(1, 0); - } - let manager = Arc::new(RwLock::new(CacheContainer::new(db.db, to_parent.into()))); - - let snapshot_1 = CacheDb::new(0, ReadOnlyLock::new(manager.clone())); - - { - let (largest_key_in_db, largest_value_in_db) = - snapshot_1.get_largest::().unwrap().unwrap(); - assert_eq!(TestCompositeField(2, 0, 2), largest_key_in_db); - assert_eq!(TestField(202), largest_value_in_db); - } - - let key_1 = TestCompositeField(8, 2, 3); - let value_1 = TestField(6); - - snapshot_1.put::(&key_1, &value_1).unwrap(); - - { - let (largest_key, largest_value) = snapshot_1 - .get_largest::() - .unwrap() - .expect("largest key-value pair should be found"); - assert_eq!(key_1, largest_key); - assert_eq!(value_1, largest_value); - } - - { - let mut manager = manager.write().unwrap(); - manager.add_snapshot(snapshot_1.into()).unwrap(); - } - - let snapshot_2 = CacheDb::new(1, ReadOnlyLock::new(manager)); - - { - let (latest_key, latest_value) = snapshot_2 - .get_largest::() - .unwrap() - .expect("largest key-value pair should be found"); - assert_eq!(key_1, latest_key); - assert_eq!(value_1, latest_value); - } - - let key_2 = TestCompositeField(8, 1, 3); - let value_2 = TestField(7); - snapshot_2.put::(&key_2, &value_2).unwrap(); - { - let (latest_key, latest_value) = snapshot_2 - .get_largest::() - .unwrap() - .expect("largest key-value pair should be found"); - assert_eq!(key_1, latest_key); - assert_eq!(value_1, latest_value); - } - - // Largest value from local is picked up - let key_3 = TestCompositeField(8, 3, 1); - let value_3 = TestField(8); - snapshot_2.put::(&key_3, &value_3).unwrap(); - { - let (latest_key, latest_value) = snapshot_2 - .get_largest::() - .unwrap() - .expect("largest key-value pair should be found"); - assert_eq!(key_3, latest_key); - assert_eq!(value_3, latest_value); - } - - // Deletion: Previous "largest" value is returned - snapshot_2.delete::(&key_3).unwrap(); - { - let (latest_key, latest_value) = snapshot_2 - .get_largest::() - .unwrap() - .expect("large key-value pair should be found"); - assert_eq!(key_1, latest_key); - assert_eq!(value_1, latest_value); - } -} - -#[test] -fn test_db_cache_container_get_prev_value() { - let tmpdir = tempfile::tempdir().unwrap(); - let db = open_inner_db(tmpdir.path()); - let to_parent = Arc::new(RwLock::new(HashMap::new())); - { - let mut to_parent = to_parent.write().unwrap(); - to_parent.insert(1, 0); - to_parent.insert(2, 1); - } - let cache_container = 
Arc::new(RwLock::new(CacheContainer::new(db, to_parent.into()))); - - // Snapshots 1 and 2 are to black box usages of parents iterator - let snapshot_1 = CacheDb::new(0, ReadOnlyLock::new(cache_container.clone())); - - let key_1 = TestCompositeField(8, 2, 3); - let key_2 = TestCompositeField(8, 2, 0); - let key_3 = TestCompositeField(8, 3, 2); - - assert!(snapshot_1.get_prev::(&key_1).unwrap().is_none()); - - snapshot_1.put::(&key_2, &TestField(10)).unwrap(); - snapshot_1.put::(&key_1, &TestField(1)).unwrap(); - snapshot_1 - .put::(&TestCompositeField(8, 1, 3), &TestField(11)) - .unwrap(); - snapshot_1 - .put::(&TestCompositeField(7, 2, 3), &TestField(12)) - .unwrap(); - snapshot_1 - .put::(&TestCompositeField(8, 2, 5), &TestField(13)) - .unwrap(); - snapshot_1.put::(&key_3, &TestField(14)).unwrap(); - - // Equal: - assert_eq!( - (key_1.clone(), TestField(1)), - snapshot_1.get_prev::(&key_1).unwrap().unwrap() - ); - // Previous: value from 8.2.0 - assert_eq!( - (key_2.clone(), TestField(10)), - snapshot_1 - .get_prev::(&TestCompositeField(8, 2, 1)) - .unwrap() - .unwrap() - ); - - { - let mut manager = cache_container.write().unwrap(); - manager.add_snapshot(snapshot_1.into()).unwrap(); - } - - let snapshot_2 = CacheDb::new(1, ReadOnlyLock::new(cache_container.clone())); - // Equal: - assert_eq!( - (key_1.clone(), TestField(1)), - snapshot_2.get_prev::(&key_1).unwrap().unwrap() - ); - // Previous: value from 8.2.0 - assert_eq!( - (key_2.clone(), TestField(10)), - snapshot_2 - .get_prev::(&TestCompositeField(8, 2, 1)) - .unwrap() - .unwrap() - ); - snapshot_2.put::(&key_2, &TestField(20)).unwrap(); - snapshot_2.put::(&key_1, &TestField(2)).unwrap(); - // Updated values are higher priority - assert_eq!( - (key_1.clone(), TestField(2)), - snapshot_2.get_prev::(&key_1).unwrap().unwrap() - ); - assert_eq!( - (key_2.clone(), TestField(20)), - snapshot_2 - .get_prev::(&TestCompositeField(8, 2, 1)) - .unwrap() - .unwrap() - ); - snapshot_2.delete::(&key_1).unwrap(); - assert_eq!( - (key_2.clone(), TestField(20)), - snapshot_2.get_prev::(&key_1).unwrap().unwrap() - ); - { - let mut manager = cache_container.write().unwrap(); - manager.add_snapshot(snapshot_2.into()).unwrap(); - } - let snapshot_3 = CacheDb::new(2, ReadOnlyLock::new(cache_container)); - assert_eq!( - (key_2.clone(), TestField(20)), - snapshot_3 - .get_prev::(&TestCompositeField(8, 2, 1)) - .unwrap() - .unwrap() - ); - assert_eq!( - (key_2, TestField(20)), - snapshot_3.get_prev::(&key_1).unwrap().unwrap() - ); - assert_eq!( - (key_3, TestField(14)), - snapshot_3 - .get_prev::(&TestCompositeField(8, 3, 4)) - .unwrap() - .unwrap() - ); -} diff --git a/tests/snapshot_test.rs b/tests/snapshot_test.rs deleted file mode 100644 index 6594d6e..0000000 --- a/tests/snapshot_test.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::collections::HashMap; -use std::path::Path; -use std::sync::{Arc, RwLock}; - -use rockbound::cache::cache_container::CacheContainer; -use rockbound::cache::cache_db::CacheDb; -use rockbound::schema::ColumnFamilyName; -use rockbound::test::TestField; -use rockbound::{define_schema, ReadOnlyLock, Schema, DB}; -use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; - -define_schema!(TestSchema1, TestField, TestField, "TestCF1"); - -type S = TestSchema1; - -fn get_column_families() -> Vec { - vec![DEFAULT_COLUMN_FAMILY_NAME, S::COLUMN_FAMILY_NAME] -} - -fn open_db(dir: impl AsRef) -> DB { - let mut db_opts = rocksdb::Options::default(); - db_opts.create_if_missing(true); - db_opts.create_missing_column_families(true); - DB::open(dir, "test", 
get_column_families(), &db_opts).expect("Failed to open DB.") -} - -#[test] -fn snapshot_lifecycle() { - let tmpdir = tempfile::tempdir().unwrap(); - let db = open_db(&tmpdir); - - let to_parent = Arc::new(RwLock::new(HashMap::new())); - { - let mut to_parent = to_parent.write().unwrap(); - to_parent.insert(1, 0); - to_parent.insert(2, 1); - } - let manager = Arc::new(RwLock::new(CacheContainer::new( - db, - to_parent.clone().into(), - ))); - - let key = TestField(1); - let value = TestField(1); - - let snapshot_1 = CacheDb::new(0, ReadOnlyLock::new(manager.clone())); - assert_eq!( - None, - snapshot_1.read::(&key).unwrap(), - "Incorrect value, should find nothing" - ); - - snapshot_1.put::(&key, &value).unwrap(); - assert_eq!( - Some(value), - snapshot_1.read::(&key).unwrap(), - "Incorrect value, should be fetched from local cache" - ); - { - let mut manager = manager.write().unwrap(); - manager.add_snapshot(snapshot_1.into()).unwrap(); - } - - // Snapshot 2: reads value from snapshot 1, then deletes it - let snapshot_2 = CacheDb::new(1, ReadOnlyLock::new(manager.clone())); - assert_eq!(Some(value), snapshot_2.read::(&key).unwrap()); - snapshot_2.delete::(&key).unwrap(); - assert_eq!(None, snapshot_2.read::(&key).unwrap()); - { - let mut manager = manager.write().unwrap(); - manager.add_snapshot(snapshot_2.into()).unwrap(); - let mut to_parent = to_parent.write().unwrap(); - to_parent.insert(1, 0); - } - - // Snapshot 3: gets empty result, event value is in some previous snapshots - let snapshot_3 = CacheDb::new(2, ReadOnlyLock::new(manager)); - { - let mut to_parent = to_parent.write().unwrap(); - to_parent.insert(2, 1); - } - assert_eq!(None, snapshot_3.read::(&key).unwrap()); -}
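Review note: the deleted code repeatedly warns that `snapshots` and `to_parent` must never drift apart ("discrepancy ... leads to panic", "Attempt to double save same snapshot", "Attempt to commit unknown snapshot"). A toy model of the bookkeeping discipline that `StorageManager` had to maintain externally, pairing both map updates in single methods (illustrative only, with a simplified change-set type):

```rust
use std::collections::HashMap;

type SnapshotId = u64;
type ChangeSet = Vec<(u64, u64)>; // simplified stand-in for the real type

/// Toy bookkeeping for the invariant the deleted `CacheContainer` depended
/// on: every id reachable through `to_parent` must also be present in
/// `snapshots`, so both maps are always updated together.
struct SnapshotTracker {
    snapshots: HashMap<SnapshotId, ChangeSet>,
    to_parent: HashMap<SnapshotId, SnapshotId>,
}

impl SnapshotTracker {
    /// Mirrors `add_snapshot` plus the caller's `to_parent` update.
    fn add(&mut self, id: SnapshotId, parent: SnapshotId, changes: ChangeSet) {
        assert!(
            self.snapshots.insert(id, changes).is_none(),
            "attempt to double save snapshot {id}"
        );
        self.to_parent.insert(id, parent);
    }

    /// Mirrors `commit_snapshot`: the change set leaves both maps at once,
    /// and the caller writes the returned operations to the database.
    fn commit(&mut self, id: SnapshotId) -> ChangeSet {
        self.to_parent.remove(&id);
        self.snapshots
            .remove(&id)
            .expect("attempt to commit unknown snapshot")
    }
}
```

Keeping the two updates in one place is exactly the coupling the deleted doc comments asked callers to maintain by hand, and presumably part of what the replacement `delta_reader` module is meant to simplify.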